
Python asyncio Complete Guide 2026: Async/Await, Tasks & Real Patterns


Python’s asyncio module lets you write concurrent code with the async/await syntax. In 2026, asyncio underpins frameworks such as FastAPI, Starlette, and aiohttp. This guide goes from the event loop basics to patterns you can use in real projects.

What Is asyncio?

asyncio uses a single-threaded event loop to manage concurrent tasks. That makes it a good fit for network I/O and API calls, where a program spends most of its time waiting rather than computing.

import asyncio

async def main():
    print('Hello asyncio!')
    await asyncio.sleep(1)
    print('Done!')

asyncio.run(main())

async/await Syntax

Use async def to define a coroutine. Use await to pause until a result is ready.

import asyncio

async def fetch_data(url: str) -> str:
    await asyncio.sleep(0.5)  # simulate network
    return f'Data from {url}'

async def main():
    result = await fetch_data('https://api.example.com')
    print(result)

asyncio.run(main())
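
One detail the example above relies on: calling a coroutine function does not run it. The call returns a coroutine object, and the body only executes once that object is awaited (or scheduled as a task). A minimal sketch, where the names answer and demo are made up for illustration:

```python
import asyncio
import inspect

async def answer() -> int:
    # Hypothetical coroutine used only for this demonstration.
    return 42

async def demo() -> tuple[bool, int]:
    coro = answer()                      # creates a coroutine object; the body has not run yet
    is_coro = inspect.iscoroutine(coro)  # True: we hold a coroutine, not an int
    value = await coro                   # awaiting runs the body and yields its return value
    return is_coro, value

if __name__ == '__main__':
    print(asyncio.run(demo()))  # (True, 42)
```

Forgetting the await is a common source of "coroutine was never awaited" warnings.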

Running Tasks Concurrently

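asyncio.gather() runs multiple coroutines concurrently and returns their results in the order they were passed in. The total time is roughly that of the slowest task, not the sum: three 1-second tasks would finish in about 1 second, and the tasks below (1.0, 2.0, and 0.5 seconds) finish in about 2 seconds.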

import asyncio, time

async def task(name: str, delay: float):
    print(f'Start {name}')
    await asyncio.sleep(delay)
    return name

async def main():
    start = time.time()
    results = await asyncio.gather(
        task('A', 1.0),
        task('B', 2.0),
        task('C', 0.5),
    )
    print(f'Done in {time.time()-start:.2f}s: {results}')
    # ~2.0s (the longest delay), not 3.5s (the sum of all delays)

asyncio.run(main())
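
gather() is not the only way to overlap work. As a rough sketch, asyncio.create_task() schedules a coroutine on the event loop and returns a Task that can be awaited later. The helper names work and run_with_tasks are illustrative and not part of the example above:

```python
import asyncio

async def work(name: str, delay: float) -> str:
    # Hypothetical stand-in for an I/O-bound call.
    await asyncio.sleep(delay)
    return name

async def run_with_tasks() -> list[str]:
    # create_task() schedules both coroutines; they start running
    # as soon as this coroutine yields control at an await.
    slow = asyncio.create_task(work('slow', 0.2))
    fast = asyncio.create_task(work('fast', 0.1))
    # The two sleeps overlap, so the total wait is about 0.2s
    # rather than 0.3s, regardless of the order we await them in.
    return [await slow, await fast]

if __name__ == '__main__':
    print(asyncio.run(run_with_tasks()))  # ['slow', 'fast']
```

Tasks are handy when you want to start something in the background and collect the result further down in the same function.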

asyncio with aiohttp

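Because the requests overlap, fetching many URLs concurrently takes roughly as long as the slowest single response rather than the sum of all of them. The example below fetches five URLs that each take about one second to respond. Note that aiohttp is a third-party package and must be installed separately.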

import asyncio, aiohttp

async def fetch(session, url):
    async with session.get(url) as resp:
        return await resp.text()

async def main():
    urls = ['https://httpbin.org/delay/1'] * 5
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(*[fetch(session, u) for u in urls])
    print(f'Fetched {len(results)} pages')

asyncio.run(main())
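
Firing off a very large number of requests at once can overwhelm the remote server or your own connection pool, so a cap on in-flight requests is often useful. The sketch below assumes asyncio.Semaphore as the limiter and replaces the real HTTP call with asyncio.sleep() so it runs without a network. The names fake_fetch and fetch_all, and the limit of 3, are illustrative choices and do not come from the aiohttp example:

```python
import asyncio

async def fake_fetch(url: str, sem: asyncio.Semaphore, stats: dict) -> str:
    # Hypothetical fetch: asyncio.sleep() stands in for the network round trip.
    async with sem:  # waits here while `limit` fetches are already in flight
        stats['active'] += 1
        stats['peak'] = max(stats['peak'], stats['active'])
        await asyncio.sleep(0.01)
        stats['active'] -= 1
    return f'body of {url}'

async def fetch_all(urls: list[str], limit: int = 3) -> tuple[list[str], int]:
    sem = asyncio.Semaphore(limit)
    stats = {'active': 0, 'peak': 0}  # tracks how many fetches ran at once
    results = await asyncio.gather(*(fake_fetch(u, sem, stats) for u in urls))
    return results, stats['peak']

if __name__ == '__main__':
    urls = [f'https://example.com/{i}' for i in range(10)]
    bodies, peak = asyncio.run(fetch_all(urls))
    print(f'Fetched {len(bodies)} pages, at most {peak} at a time')
```

With a real client, the same semaphore would wrap the session.get() call.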

asyncio.Queue for Producer-Consumer

import asyncio

async def producer(q: asyncio.Queue):
    for i in range(5):
        await q.put(f'item-{i}')
        await asyncio.sleep(0.2)
    await q.put(None)  # sentinel

async def consumer(q: asyncio.Queue):
    while True:
        item = await q.get()  # waits until the producer has put something
        if item is None:      # sentinel received: the producer is done
            break
        print(f'Consumed {item}')

async def main():
    q = asyncio.Queue(maxsize=3)  # bounded: put() waits while the queue is full
    await asyncio.gather(producer(q), consumer(q))

asyncio.run(main())

Error Handling

import asyncio

async def risky(n):
    if n == 2: raise ValueError(f'Task {n} failed!')
    return f'Task {n} OK'

async def main():
    results = await asyncio.gather(
        *[risky(i) for i in range(4)],
        return_exceptions=True
    )
    for r in results:
        # With return_exceptions=True, failures come back as values in the list
        if isinstance(r, Exception):
            print('Error:', r)
        else:
            print(r)

asyncio.run(main())

Best Practices

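  • Never call time.sleep() inside a coroutine, because it blocks the whole event loop; use await asyncio.sleep() instead
  • Avoid blocking I/O inside coroutines; use an async library or move the call to a thread with asyncio.to_thread()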
  • Use asyncio.run() as the entry point
  • Use asyncio.TaskGroup (Python 3.11+) for structured concurrency
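  • Debug with PYTHONASYNCIODEBUG=1 or asyncio.run(main(), debug=True) to surface slow callbacks and coroutines that were never awaited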
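
The first two points can be sketched as follows. When a blocking call cannot be avoided, one option is asyncio.to_thread() (Python 3.9+), which runs it in a worker thread so the event loop stays free. The functions blocking_io, heartbeat, and offload are hypothetical names for this example, and time.sleep() stands in for any blocking library call:

```python
import asyncio
import time

def blocking_io(seconds: float) -> str:
    # Ordinary synchronous function; time.sleep() blocks the thread it runs on.
    time.sleep(seconds)
    return 'blocking call finished'

async def heartbeat(ticks: list[int]) -> None:
    # Runs on the event loop; it can only make progress if the loop is not blocked.
    for i in range(3):
        ticks.append(i)
        await asyncio.sleep(0.01)

async def offload() -> tuple[str, list[int]]:
    ticks: list[int] = []
    # to_thread() runs blocking_io in a worker thread, so heartbeat
    # keeps ticking on the event loop in the meantime.
    result, _ = await asyncio.gather(
        asyncio.to_thread(blocking_io, 0.1),
        heartbeat(ticks),
    )
    return result, ticks

if __name__ == '__main__':
    print(asyncio.run(offload()))  # ('blocking call finished', [0, 1, 2])
```

This helps with blocking I/O; CPU-heavy work is still limited by the GIL and is usually better served by a process pool.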

Conclusion

asyncio is Python’s answer to Node.js-style concurrency. Master asyncio.gather(), add queues when you need to decouple producers from consumers, and move to TaskGroup for cleaner error boundaries. It is well suited to APIs, scrapers, and microservices.
