
Python’s asyncio library is the backbone of many high-throughput Python services shipping in 2026, from FastAPI APIs to Discord bots to web scrapers pulling thousands of pages a minute. This guide walks through the core mental model: event loops, coroutines, tasks, structured concurrency with TaskGroup, and the handful of mistakes that quietly tank performance in production.
📋 Table of Contents
- What asyncio Actually Solves
- The Event Loop, Explained
- Coroutines vs Tasks
- gather() vs TaskGroup
- Async Context Managers and Locks
- The Blocking Call Trap
- Timeouts and Cancellation
- Real Example: Concurrent API Fetcher
- Common Mistakes
- Frequently Asked Questions
- Keep Building
What asyncio Actually Solves
asyncio exists for one job: running many I/O-bound operations concurrently on a single thread, without the overhead of threads or processes. When your program spends most of its time waiting — for a database response, an HTTP reply, a file read — asyncio lets it switch to other work during that wait instead of sitting idle.
This is fundamentally different from CPU-bound parallelism. asyncio does not give you more CPU cores; it gives you better utilization of a single core while waiting on I/O. For CPU-heavy work (image processing, number crunching), you still want multiprocessing or a native extension.
The Event Loop, Explained
At the center of every asyncio program is the event loop — a single-threaded scheduler that runs coroutines, hands off control when one awaits something, and resumes it when the awaited result is ready.
import asyncio

async def say_hello():
    print("start")
    await asyncio.sleep(1)
    print("end")

asyncio.run(say_hello())
asyncio.run() creates a fresh event loop, runs the coroutine to completion, then closes the loop. It’s the correct top-level entry point for any asyncio script — avoid the older get_event_loop().run_until_complete() pattern unless you have a specific reason to manage the loop manually.
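To make the hand-off concrete, here is a minimal sketch of two coroutines sharing one loop. The names worker and main and the delays are illustrative assumptions, not part of any API. Each coroutine records when it starts and ends, so the returned list shows the loop switching to the second coroutine while the first is still waiting.

```python
import asyncio

async def worker(name, delay, log):
    log.append(f"{name} start")
    await asyncio.sleep(delay)  # control returns to the event loop here
    log.append(f"{name} end")

async def main():
    log = []
    # "a" starts first but waits longer, so "b" finishes before it.
    await asyncio.gather(worker("a", 0.2, log), worker("b", 0.05, log))
    return log

print(asyncio.run(main()))
```

Both coroutines start before either one finishes, which is the interleaving the event loop provides on a single thread.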
Coroutines vs Tasks
A coroutine object (created by calling an async def function) does nothing on its own; it’s a paused generator-like object waiting to be driven. Awaiting it runs it inline, sequentially. A Task wraps a coroutine and schedules it on the event loop right away, so it starts running as soon as the current coroutine yields control and then proceeds concurrently with other tasks.
import asyncio
import time

async def fetch(n):
    await asyncio.sleep(1)
    return n * 2

async def sequential():
    start = time.perf_counter()
    results = [await fetch(i) for i in range(5)]
    print("sequential:", time.perf_counter() - start)
    return results

async def concurrent():
    start = time.perf_counter()
    tasks = [asyncio.create_task(fetch(i)) for i in range(5)]
    results = await asyncio.gather(*tasks)
    print("concurrent:", time.perf_counter() - start)
    return results

asyncio.run(sequential())  # ~5 seconds
asyncio.run(concurrent())  # ~1 second
The sequential version awaits each call one at a time, so the five one-second sleeps add up to five seconds. The concurrent version schedules all five as tasks up front, so they all sleep concurrently and the whole thing finishes in roughly one second.
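The difference between a bare coroutine object and a Task is easier to see with a log of what runs when. The sketch below assumes the default (non-eager) task factory; work and main are illustrative names. Creating the coroutine runs nothing, creating the task only schedules it, and the body executes once main yields to the loop.

```python
import asyncio

async def work(log):
    log.append("work ran")
    return 42

async def main():
    log = []
    coro = work(log)                  # nothing inside work() has run yet
    log.append("coroutine created")
    task = asyncio.create_task(coro)  # scheduled, but not started yet
    log.append("task created")
    await asyncio.sleep(0)            # yield once; the loop now runs the task
    log.append("after yield")
    result = await task               # already finished, returns immediately
    return log, result

print(asyncio.run(main()))
```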
gather() vs TaskGroup
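asyncio.gather() has been the standard way to run multiple coroutines concurrently since Python 3.4, but it has a sharp edge: if one task raises, gather() propagates that exception to the caller right away, yet the other awaitables are not cancelled and keep running in the background. Passing return_exceptions=True does not stop them either; it makes gather() wait for everything and hand back exceptions as ordinary results that you have to inspect manually. That’s a common source of orphaned tasks and unhandled exception warnings in production logs.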
Python 3.11 introduced asyncio.TaskGroup, a structured concurrency primitive that fixes this properly: if any child task fails, the group cancels all remaining sibling tasks and raises an ExceptionGroup containing every failure.
import asyncio

async def risky(n):
    if n == 2:
        raise ValueError("boom at " + str(n))
    await asyncio.sleep(1)
    return n

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            results = [tg.create_task(risky(i)) for i in range(4)]
    except* ValueError as eg:
        for exc in eg.exceptions:
            print("caught:", exc)

asyncio.run(main())
The except* syntax handles ExceptionGroup instances, letting you process every failure that occurred across the group rather than just the first one. If you’re targeting Python 3.11 or later, prefer TaskGroup over raw gather() for anything beyond trivial fire-and-forget concurrency.
Async Context Managers and Locks
Shared mutable state across concurrent tasks still needs protection, even on a single thread, because an await point can hand control to another task mid-operation. asyncio.Lock guards critical sections the same way threading.Lock does for threads.
import asyncio

counter = 0
lock = asyncio.Lock()

async def increment():
    global counter
    async with lock:
        current = counter
        await asyncio.sleep(0)  # simulate a yield point
        counter = current + 1

async def main():
    await asyncio.gather(*(increment() for _ in range(100)))
    print(counter)  # always 100, never less

asyncio.run(main())
Without the lock, the await asyncio.sleep(0) yield point would let another task interleave between reading and writing counter, causing lost updates — the classic race condition, just on one thread instead of many.
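For comparison, the sketch below runs the same read, yield, write sequence with and without a lock. It keeps the counter in a dict rather than a global so both variants can run in one script; the function names are illustrative assumptions.

```python
import asyncio

async def unsafe_increment(state):
    current = state["counter"]
    await asyncio.sleep(0)          # other tasks read the stale value here
    state["counter"] = current + 1

async def safe_increment(state, lock):
    async with lock:                # only one task at a time in this block
        current = state["counter"]
        await asyncio.sleep(0)
        state["counter"] = current + 1

async def run_unsafe(n):
    state = {"counter": 0}
    await asyncio.gather(*(unsafe_increment(state) for _ in range(n)))
    return state["counter"]

async def run_safe(n):
    state = {"counter": 0}
    lock = asyncio.Lock()
    await asyncio.gather(*(safe_increment(state, lock) for _ in range(n)))
    return state["counter"]

print("without lock:", asyncio.run(run_unsafe(100)))  # loses updates
print("with lock:", asyncio.run(run_safe(100)))
```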
The Blocking Call Trap
This is the single biggest source of “my asyncio app is slow” bug reports. The event loop is single-threaded. Any synchronous call that blocks — time.sleep(), a blocking requests.get(), a CPU-heavy loop, an unbuffered file read on a slow disk — freezes the entire loop, including every other task waiting on it.
# BAD: blocks the whole event loop for 2 seconds
import time

async def bad():
    time.sleep(2)

# GOOD: yields control back to the loop
import asyncio

async def good():
    await asyncio.sleep(2)

# GOOD: offload a real blocking call to a thread
# (some_blocking_function, arg1, and arg2 are placeholders for your own code)
async def good_blocking_lib():
    result = await asyncio.to_thread(some_blocking_function, arg1, arg2)
    return result
Use asyncio.to_thread() (3.9+) for blocking I/O from third-party libraries that don’t have an async equivalent. For CPU-bound work, use loop.run_in_executor() with a ProcessPoolExecutor instead; on standard CPython builds, threads won’t speed up CPU-bound Python code because of the GIL.
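One way to check that an offloaded call really leaves the loop free is to run a heartbeat task next to it. In this sketch, blocking_io is a stand-in for a blocking library call, and the tick interval and durations are arbitrary assumptions. If the loop were blocked, the heartbeat would record a single tick instead of several.

```python
import asyncio
import time

def blocking_io(seconds):
    time.sleep(seconds)  # stands in for a blocking third-party call
    return "done"

async def heartbeat(ticks, interval):
    while True:
        ticks.append(time.perf_counter())
        await asyncio.sleep(interval)

async def main():
    ticks = []
    hb = asyncio.create_task(heartbeat(ticks, 0.02))
    # The blocking call runs in a worker thread; the loop keeps ticking.
    result = await asyncio.to_thread(blocking_io, 0.2)
    hb.cancel()
    try:
        await hb
    except asyncio.CancelledError:
        pass
    return result, len(ticks)

print(asyncio.run(main()))
```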
Timeouts and Cancellation
Production code needs timeouts on every external call. Python 3.11 added asyncio.timeout(), a context manager that is often cleaner than the older asyncio.wait_for(), especially when one deadline should cover several awaits.
import asyncio

async def slow_call():
    await asyncio.sleep(10)

async def main():
    try:
        async with asyncio.timeout(2):
            await slow_call()
    except TimeoutError:
        print("operation timed out after 2s")

asyncio.run(main())
When a timeout fires, asyncio cancels the work running inside the block by raising asyncio.CancelledError at its current await, and asyncio.timeout() then converts that cancellation into the TimeoutError you catch outside. If your coroutine wraps resources that need cleanup (open connections, file handles), use try/finally or an async context manager so cancellation doesn’t leak resources.
Real Example: Concurrent API Fetcher
Here’s a complete, production-shaped example: fetching multiple URLs concurrently with a connection limit, a per-request timeout, and structured error handling using aiohttp.
import asyncio
import aiohttp

async def fetch_one(session, url, semaphore):
    async with semaphore:
        try:
            async with asyncio.timeout(5):
                async with session.get(url) as resp:
                    return url, resp.status, await resp.text()
        except (TimeoutError, aiohttp.ClientError) as e:
            return url, None, str(e)

async def fetch_all(urls, max_concurrent=10):
    semaphore = asyncio.Semaphore(max_concurrent)
    async with aiohttp.ClientSession() as session:
        async with asyncio.TaskGroup() as tg:
            tasks = [tg.create_task(fetch_one(session, u, semaphore)) for u in urls]
        return [t.result() for t in tasks]

urls = ["https://example.com"] * 50
results = asyncio.run(fetch_all(urls))
print(len(results), "requests completed")
The Semaphore caps how many requests run at once, which protects both your client and the target server from being overwhelmed by hundreds of simultaneous connections. This pattern — session reuse, semaphore-bounded concurrency, per-request timeout, structured task group — is close to what you’d see in a production scraper or API aggregator.
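The semaphore's effect can be checked without touching the network. The sketch below replaces the HTTP request with a short sleep and tracks how many jobs are inside the guarded block at once; limited_job, run_jobs, and the state dict are illustrative assumptions.

```python
import asyncio

async def limited_job(semaphore, state):
    async with semaphore:
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.01)  # stands in for the real request
        state["active"] -= 1

async def run_jobs(n_jobs, max_concurrent):
    semaphore = asyncio.Semaphore(max_concurrent)
    state = {"active": 0, "peak": 0}
    await asyncio.gather(*(limited_job(semaphore, state) for _ in range(n_jobs)))
    return state["peak"]

# 20 jobs are launched, but the peak never exceeds the limit of 5.
print(asyncio.run(run_jobs(20, 5)))
```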
Common Mistakes
- Forgetting to await a coroutine. Calling an async def function without await just creates a coroutine object and does nothing. Python will warn “coroutine was never awaited”, but it’s easy to miss in noisy logs.
- Creating tasks without keeping a reference. The event loop only holds a weak reference to tasks created with asyncio.create_task(); if the task object gets garbage collected before it finishes, it can disappear mid-execution. Store tasks in a list or set until they complete.
- Mixing sync and async database drivers. Using a blocking driver (like plain psycopg2) inside async code defeats the purpose. Use an async driver (asyncpg, aiomysql) or wrap the call with to_thread().
- Not setting timeouts. A single hung network call without a timeout can stall an entire request handler indefinitely under load.
- Overusing asyncio for CPU-bound work. If your bottleneck is computation, not waiting, asyncio adds complexity without adding speed.
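For the task-reference mistake, a common pattern is to park each background task in a set and remove it when it finishes. This is a sketch; spawn, job, and background_tasks are names chosen here for illustration.

```python
import asyncio

background_tasks = set()

async def job(n, results):
    await asyncio.sleep(0.01)
    results.append(n)

def spawn(coro):
    task = asyncio.create_task(coro)
    background_tasks.add(task)                        # strong reference
    task.add_done_callback(background_tasks.discard)  # drop it when done
    return task

async def main():
    results = []
    for i in range(3):
        spawn(job(i, results))
    await asyncio.sleep(0.1)  # give the background tasks time to finish
    return sorted(results), len(background_tasks)

print(asyncio.run(main()))
```

The set keeps every task alive while it runs, and the done callback stops the set from growing without bound.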
Frequently Asked Questions
Is asyncio still relevant in 2026?
Yes. asyncio remains the standard way to write concurrent I/O-bound Python code, and it underpins major frameworks like FastAPI, aiohttp, and Starlette. Newer features like TaskGroup and exception groups in Python 3.11+ have made it more ergonomic, not less relevant.
Should I use asyncio or threading?
Use asyncio for I/O-bound work with many concurrent connections (network calls, database queries, file I/O). Use threading for blocking third-party libraries that don’t support async. Use multiprocessing for CPU-bound work since asyncio does not bypass the GIL.
What is the difference between asyncio.gather and TaskGroup?
asyncio.gather() collects results from multiple coroutines, but when one raises it does not cancel the others, so they can keep running unobserved. TaskGroup (Python 3.11+) is a structured concurrency primitive that automatically cancels sibling tasks when one fails and raises an ExceptionGroup, making error handling far more predictable.
Why does my asyncio program freeze?
The most common cause is calling a blocking, synchronous function (like time.sleep, a blocking requests.get, or heavy CPU work) inside a coroutine. That blocks the single event loop thread and stalls every other task. Use asyncio.sleep, async HTTP clients, or run_in_executor for blocking calls.
Can I mix asyncio with synchronous code?
Yes, via asyncio.to_thread() (Python 3.9+) or loop.run_in_executor() to offload blocking calls to a thread pool without blocking the event loop. You can also call asyncio.run() from synchronous entry points to bridge into async code.
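As a rough sketch of the second direction, a synchronous wrapper can hide an async function behind a plain call. fetch_value and fetch_value_sync are hypothetical names, and the wrapper only works when no event loop is already running in the current thread.

```python
import asyncio

async def fetch_value(x):
    await asyncio.sleep(0.01)  # stands in for real async I/O
    return x * 2

def fetch_value_sync(x):
    # Bridge from sync code into async code. asyncio.run() raises
    # RuntimeError if called from inside a running event loop.
    return asyncio.run(fetch_value(x))

print(fetch_value_sync(21))
```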
Do I need asyncio for a small script?
Usually not. If you’re making one or two sequential API calls, synchronous code is simpler and easier to debug. Reach for asyncio when you have many concurrent I/O operations where overlapping wait time actually matters.
Keep Building
asyncio rewards a clear mental model more than memorized syntax: understand the event loop, know when something blocks, and lean on structured concurrency primitives like TaskGroup for anything beyond a quick script. Bookmark this guide and come back to the fetcher example next time you need a concurrent client built right the first time.