
Python’s asyncio module enables writing concurrent code using the async/await syntax. In 2026, asyncio powers FastAPI, Starlette, and aiohttp. This guide covers everything from event loops to real-world patterns.
📋 Table of Contents
What Is asyncio?
asyncio uses a single-threaded event loop to manage concurrent tasks — perfect for network I/O and APIs where you spend most time waiting, not computing.
import asyncio
async def main():
print('Hello asyncio!')
await asyncio.sleep(1)
print('Done!')
asyncio.run(main())
async/await Syntax
Use async def to define a coroutine. Use await to pause until a result is ready.
import asyncio
async def fetch_data(url: str) -> str:
await asyncio.sleep(0.5) # simulate network
return f'Data from {url}'
async def main():
result = await fetch_data('https://api.example.com')
print(result)
asyncio.run(main())
Running Tasks Concurrently
asyncio.gather() runs multiple coroutines concurrently. Three 1-second tasks finish in ~1 second total, not 3.
import asyncio, time
async def task(name: str, delay: float):
print(f'Start {name}')
await asyncio.sleep(delay)
return name
async def main():
start = time.time()
results = await asyncio.gather(
task('A', 1.0),
task('B', 2.0),
task('C', 0.5),
)
print(f'Done in {time.time()-start:.2f}s: {results}')
# ~2.0s not 3.5s
asyncio.run(main())
asyncio with aiohttp
Fetch 100 URLs concurrently in roughly the same time as fetching one.
import asyncio, aiohttp
async def fetch(session, url):
async with session.get(url) as resp:
return await resp.text()
async def main():
urls = ['https://httpbin.org/delay/1'] * 5
async with aiohttp.ClientSession() as session:
results = await asyncio.gather(*[fetch(session, u) for u in urls])
print(f'Fetched {len(results)} pages')
asyncio.run(main())
asyncio.Queue for Producer-Consumer
import asyncio
async def producer(q: asyncio.Queue):
for i in range(5):
await q.put(f'item-{i}')
await asyncio.sleep(0.2)
await q.put(None) # sentinel
async def consumer(q: asyncio.Queue):
while True:
item = await q.get()
if item is None: break
print(f'Consumed {item}')
async def main():
q = asyncio.Queue(maxsize=3)
await asyncio.gather(producer(q), consumer(q))
asyncio.run(main())
Error Handling
import asyncio
async def risky(n):
if n == 2: raise ValueError(f'Task {n} failed!')
return f'Task {n} OK'
async def main():
results = await asyncio.gather(
*[risky(i) for i in range(4)],
return_exceptions=True
)
for r in results:
print('Error:', r if isinstance(r, Exception) else r)
asyncio.run(main())
Best Practices
- Never use
time.sleep()— useawait asyncio.sleep() - Avoid blocking I/O inside coroutines
- Use
asyncio.run()as the entry point - Use
asyncio.TaskGroup(Python 3.11+) for structured concurrency - Debug with
PYTHONASYNCIODEBUG=1
Conclusion
asyncio is Python’s answer to Node.js-style concurrency. Master asyncio.gather(), add queues when needed, and move to TaskGroup for cleaner error boundaries. Perfect for APIs, scrapers, and microservices.
📚 You might also like
🔗 Share this article




✍️ Leave a Comment