🌐 Detecting your location…
📢 Advertisement — Configure AdSense in Appearance → Customize → AdSense Settings

Python AsyncIO Advanced Patterns 2026: Task Groups, Queues and Structured Concurrency

⏱️5 min read  ·  1,086 words

Python asyncio has matured into a complete async ecosystem in 2026. Beyond basic async/await, production applications use advanced patterns: task groups, semaphores, async queues, event-driven architectures, and structured concurrency. This guide covers patterns that make async Python code robust and maintainable.

Task Groups — Structured Concurrency

import asyncio
import httpx
from typing import Any

# Python 3.11+ TaskGroup — cancel all if one fails
async def fetch_all_data(urls: list[str]) -> list[dict]:
    results = []
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(fetch_url(url)) for url in urls]
    # If ANY task raises, ALL are cancelled — no partial results
    return [t.result() for t in tasks]

async def fetch_url(url: str) -> dict:
    """GET ``url`` and return its JSON body together with the URL.

    A fresh ``httpx.AsyncClient`` is opened for the request and closed before
    returning. The request uses a 10-second timeout.

    Returns:
        ``{"url": <url>, "data": <decoded JSON body>}``.
    """
    async with httpx.AsyncClient() as session:
        response = await session.get(url, timeout=10)
        payload = response.json()
    return {"url": url, "data": payload}

# vs asyncio.gather (partial results possible)
async def fetch_gather(urls: list[str]) -> list[Any]:
    """Fetch all URLs with ``asyncio.gather``, tolerating individual failures.

    Unlike a task group, one failing request does not cancel the others: each
    failure is collected in place of a response and reported as ``None``.

    Args:
        urls: URLs to GET through one shared ``httpx.AsyncClient``.

    Returns:
        A list aligned with ``urls``: the decoded JSON body for each request
        that completed, ``None`` for each request that raised.
    """
    async with httpx.AsyncClient() as client:
        tasks = [client.get(url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        # Test against BaseException, not Exception: a cancelled request is
        # returned as a CancelledError instance, which derives from
        # BaseException and would otherwise reach .json() and crash.
        return [None if isinstance(r, BaseException) else r.json() for r in results]

Semaphores — Rate Limiting

import asyncio
import httpx
from dataclasses import dataclass

@dataclass
class RateLimitedFetcher:
    """Fetch JSON over HTTP with a concurrency cap and a request-rate cap.

    Two independent limits apply to every request:

    * at most ``max_concurrent`` requests are in flight at once (semaphore);
    * request start times are spaced at least ``1 / requests_per_second``
      seconds apart (start-time scheduling).

    Raises:
        ValueError: if ``max_concurrent`` is below 1 or
            ``requests_per_second`` is not positive.
    """

    max_concurrent: int = 10
    requests_per_second: float = 5.0

    def __post_init__(self):
        # Fail fast: a zero-slot semaphore would block every fetch forever and
        # a non-positive rate has no meaningful spacing.
        if self.max_concurrent < 1:
            raise ValueError("max_concurrent must be at least 1")
        if self.requests_per_second <= 0:
            raise ValueError("requests_per_second must be positive")
        self._semaphore = asyncio.Semaphore(self.max_concurrent)
        # Event-loop time at which the next request is allowed to start.
        self._next_slot = 0.0

    async def _wait_for_slot(self) -> None:
        """Reserve the next start slot and sleep until it arrives."""
        now = asyncio.get_running_loop().time()
        slot = max(now, self._next_slot)
        # The reservation is made before any await, so concurrent callers
        # cannot claim the same slot.
        self._next_slot = slot + 1.0 / self.requests_per_second
        if slot > now:
            await asyncio.sleep(slot - now)

    async def fetch(self, client: httpx.AsyncClient, url: str) -> dict:
        """GET ``url`` with ``client`` under both limits; return the JSON body."""
        async with self._semaphore:  # concurrency cap
            # A fixed sleep here would not limit the rate: every task holding
            # a semaphore slot would sleep in parallel and then fire together.
            await self._wait_for_slot()
            r = await client.get(url, timeout=30)
            return r.json()

    async def fetch_all(self, urls: list[str]) -> list[dict]:
        """Fetch all ``urls`` through one shared client, results in input order.

        The first exception raised by any fetch propagates to the caller.
        """
        async with httpx.AsyncClient() as client:
            tasks = [self.fetch(client, url) for url in urls]
            return await asyncio.gather(*tasks)

# Usage
# NOTE(review): `urls` is not defined anywhere in this snippet; it must be
# bound to a list of URL strings before these lines run.
fetcher = RateLimitedFetcher(max_concurrent=10, requests_per_second=5)
# Runs fetch_all() to completion on a new event loop and keeps its return value.
results = asyncio.run(fetcher.fetch_all(urls))

Async Queue — Producer/Consumer

import asyncio
import httpx
from dataclasses import dataclass

async def crawl_urls(seed_urls: list[str], max_workers: int = 20):
    """Crawl from ``seed_urls`` with a pool of queue-driven workers.

    Each fetched document is decoded as JSON; every entry of its ``"links"``
    value (if present) is queued for crawling. A URL is fetched at most once.
    Fetch or decode errors are printed and the URL is skipped.

    Args:
        seed_urls: URLs used to seed the work queue.
        max_workers: Number of concurrent worker tasks (at least 1).

    Returns:
        A list of ``{"url": ..., "data": ...}`` dicts, one per URL fetched
        successfully, in completion order.

    Raises:
        ValueError: if ``max_workers`` is below 1.
    """
    if max_workers < 1:
        # With no workers nothing ever calls task_done(), so join() would hang.
        raise ValueError("max_workers must be at least 1")

    queue: asyncio.Queue[str] = asyncio.Queue()
    visited: set[str] = set()
    results: list[dict] = []

    # Seed the queue (unbounded, so put_nowait never fails).
    for url in seed_urls:
        queue.put_nowait(url)

    async def worker(client: httpx.AsyncClient):
        while True:
            # Block for work rather than quitting on an empty queue: the queue
            # is often empty while other workers are still discovering links,
            # and workers that quit early would never help with them.
            # Workers are stopped by cancellation once queue.join() returns.
            url = await queue.get()
            try:
                if url in visited:
                    continue
                visited.add(url)

                r = await client.get(url, timeout=10)
                data = r.json()
                results.append({"url": url, "data": data})

                # Discover new URLs
                for link in data.get("links", []):
                    if link not in visited:
                        queue.put_nowait(link)
            except Exception as e:
                print(f"Error fetching {url}: {e}")
            finally:
                # Exactly one task_done() per get(), on every path.
                queue.task_done()

    async with httpx.AsyncClient() as client:
        workers = [asyncio.create_task(worker(client)) for _ in range(max_workers)]
        try:
            await queue.join()
        finally:
            for w in workers:
                w.cancel()
            # Let the cancellations finish before the client is closed.
            await asyncio.gather(*workers, return_exceptions=True)

    return results

Async Context Managers and Iterators

from contextlib import asynccontextmanager
import asyncpg

# Database connection pool context
@asynccontextmanager
async def get_db_pool(dsn: str, *, min_size: int = 5, max_size: int = 20):
    """Open an asyncpg connection pool for the duration of an ``async with``.

    Args:
        dsn: Connection string passed to ``asyncpg.create_pool``.
        min_size: Passed through as the pool's ``min_size`` (default 5).
        max_size: Passed through as the pool's ``max_size`` (default 20).

    Yields:
        The pool returned by ``asyncpg.create_pool``. It is closed when the
        block exits, whether normally or through an exception.
    """
    pool = await asyncpg.create_pool(dsn, min_size=min_size, max_size=max_size)
    try:
        yield pool
    finally:
        await pool.close()

# Async generator for streaming large datasets
async def stream_records(pool: asyncpg.Pool, table: str, batch_size: int = 1000):
    """Stream every row of ``table`` as a dict without loading it all at once.

    Rows are read through a server-side cursor inside one transaction, so the
    table is scanned once. Paging with LIMIT/OFFSET and no ORDER BY gives no
    stable row order between queries (rows can be skipped or repeated) and
    re-reads the skipped prefix on every batch.

    Args:
        pool: asyncpg pool to borrow one connection from for the whole scan.
        table: Table name, optionally schema-qualified (``schema.table``).
            Each dot-separated part must be a plain identifier.
        batch_size: Number of rows the cursor prefetches per round trip.

    Yields:
        Each row converted with ``dict(row)``.

    Raises:
        ValueError: if ``batch_size`` is below 1 or ``table`` is not a plain
            (optionally schema-qualified) identifier.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    # Identifiers cannot be sent as query parameters, so the name has to be
    # interpolated into the SQL text; reject anything that is not a plain
    # identifier to keep that interpolation injection-safe.
    if not all(part.isidentifier() for part in table.split(".")):
        raise ValueError(f"Invalid table name: {table!r}")

    async with pool.acquire() as conn:
        # asyncpg cursors can only be used inside a transaction.
        async with conn.transaction():
            async for row in conn.cursor(f"SELECT * FROM {table}", prefetch=batch_size):
                yield dict(row)

# Usage
async def process_large_table():
    """Stream the ``events`` table and pass each record to ``process_record``.

    Records are awaited one at a time, in the order ``stream_records`` yields
    them; the pool is opened for the duration of the scan.
    """
    dsn = "postgresql://localhost/mydb"
    async with get_db_pool(dsn) as db_pool:
        event_rows = stream_records(db_pool, "events")
        async for event in event_rows:
            await process_record(event)

Event-Driven with asyncio.Event

import asyncio

class DataPipeline:
    """Bounded producer/consumer pipeline coordinated with ``asyncio.Event``.

    ``_data_ready`` is set whenever the queue may hold items (and permanently
    once the producer has finished); ``_shutdown`` is set when the producer
    will add no more items. Consumers exit once the queue is empty and
    ``_shutdown`` is set.
    """

    def __init__(self):
        self._data_ready = asyncio.Event()
        self._shutdown = asyncio.Event()
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=100)

    async def producer(self, source):
        """Copy every item of the async iterable ``source`` into the queue."""
        try:
            async for item in source:
                await self._queue.put(item)
                self._data_ready.set()
        finally:
            # Signal shutdown even if the source fails, and wake idle
            # consumers: they block on _data_ready, so setting only _shutdown
            # would leave them waiting forever once the queue has been
            # drained (or if the source was empty).
            self._shutdown.set()
            self._data_ready.set()

    async def consumer(self, handler):
        """Await ``handler(item)`` for queued items until shutdown and empty."""
        while True:
            await self._data_ready.wait()

            try:
                item = self._queue.get_nowait()
            except asyncio.QueueEmpty:
                if self._shutdown.is_set():
                    break
                # No await between the emptiness check and clear(), so the
                # producer cannot enqueue an item in between and be missed.
                self._data_ready.clear()  # reset until more data arrives
                continue

            try:
                await handler(item)
            finally:
                # Keep the queue's unfinished-task count right on failure too.
                self._queue.task_done()

    async def run(self, source, handler, num_consumers: int = 5):
        """Run one producer and ``num_consumers`` consumers to completion.

        Returns once every item from ``source`` has been handled. If the
        source or a handler raises, all remaining tasks are cancelled and the
        first exception is re-raised.
        """
        tasks = [asyncio.create_task(self.producer(source))]
        tasks.extend(
            asyncio.create_task(self.consumer(handler)) for _ in range(num_consumers)
        )
        try:
            await asyncio.gather(*tasks)
        except BaseException:
            # Without this, a producer blocked on a full queue would wait
            # forever for consumers that have already failed.
            for task in tasks:
                task.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)
            raise

Timeouts and Cancellation

import asyncio

async def with_retry(coro_factory, max_retries: int = 3, timeout: float = 10.0):
    last_error = None
    for attempt in range(max_retries):
        try:
            async with asyncio.timeout(timeout):
                return await coro_factory()
        except TimeoutError:
            last_error = TimeoutError(f"Timed out after {timeout}s (attempt {attempt+1})")
        except asyncio.CancelledError:
            raise  # Always propagate cancellation
        except Exception as e:
            last_error = e
        await asyncio.sleep(2 ** attempt)  # exponential backoff
    raise last_error

# Clean cancellation pattern
async def cancellable_task(cancel_event: asyncio.Event):
    """Call ``do_work`` roughly every 0.1 s until asked to stop, then clean up.

    The loop ends when ``cancel_event`` is set, when the task is cancelled, or
    when ``do_work`` raises. ``cleanup`` is awaited on all three paths; the
    cancellation or error then continues to propagate.

    Args:
        cancel_event: Set this event to stop the loop cooperatively.
    """
    try:
        while not cancel_event.is_set():
            await asyncio.sleep(0.1)
            await do_work()
    finally:
        # finally rather than `except CancelledError`, so stopping through the
        # event (or a do_work failure) cleans up exactly like cancellation
        # does. The in-flight exception is re-raised automatically afterwards.
        await cleanup()

# Shield a coroutine from cancellation
# Strong references to in-flight saves: the event loop keeps only weak
# references to tasks, so a shielded task that nothing else refers to could be
# garbage-collected before it finishes.
_pending_saves: set = set()


async def critical_operation():
    """Run ``save_to_database`` so that cancelling the caller cannot abort it.

    If the task awaiting this coroutine is cancelled, the await below still
    raises ``CancelledError`` here, but the save itself keeps running in the
    background until it completes.
    """
    save_task = asyncio.ensure_future(save_to_database())
    _pending_saves.add(save_task)
    # Drop the reference once the save is done.
    save_task.add_done_callback(_pending_saves.discard)
    await asyncio.shield(save_task)

asyncio with FastAPI — Advanced

from fastapi import FastAPI, BackgroundTasks, WebSocket
import asyncio

# Application instance that the @app.post / @app.websocket decorators below
# register their handlers on.
app = FastAPI()

# Background task with asyncio
@app.post("/process")
async def start_process(data: dict, background_tasks: BackgroundTasks):
    """Register ``long_running_process(data)`` as a background task and acknowledge.

    Returns:
        ``{"status": "started", "id": data["id"]}``. A payload without an
        ``"id"`` key raises ``KeyError``.
    """
    background_tasks.add_task(long_running_process, data)
    job_id = data["id"]
    return {"status": "started", "id": job_id}

async def long_running_process(data: dict):
    """Await ``step1``, ``step2`` and ``step3`` on ``data``, in that order.

    All three stages share a single 300-second budget; exceeding it raises
    ``TimeoutError``.
    """
    stages = (step1, step2, step3)
    async with asyncio.timeout(300):  # 5 min max
        for stage in stages:
            await stage(data)

# WebSocket with asyncio
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
    """Serve one WebSocket client with independent receive and send loops.

    Incoming text messages go to ``process_message`` together with
    ``send_queue``; whatever is placed on ``send_queue`` is written back to
    the socket by a separate sender task. The handler returns once the
    receive loop ends.
    """
    await websocket.accept()
    send_queue: asyncio.Queue = asyncio.Queue()

    async def receive_messages():
        try:
            while True:
                msg = await websocket.receive_text()
                await process_message(client_id, msg, send_queue)
        except Exception:
            # Any failure in the receive loop ends the session quietly.
            # NOTE(review): this presumably targets client disconnects, but it
            # also hides errors raised by process_message; consider narrowing.
            pass

    async def send_messages():
        while True:
            msg = await send_queue.get()
            await websocket.send_text(msg)
            send_queue.task_done()

    async with asyncio.TaskGroup() as tg:
        sender = tg.create_task(send_messages())
        try:
            await receive_messages()
        finally:
            # The sender waits on send_queue.get() forever, so without an
            # explicit cancel the task group (and this handler) would never
            # finish after the receive loop ends. A directly cancelled child
            # is not treated as a failure by the task group.
            sender.cancel()

Python asyncio advanced patterns in 2026: use TaskGroup for structured concurrency (cancel-on-error), Semaphore for capping concurrency (combine it with request pacing when you need true rate limiting), asyncio.Queue for producer-consumer, and asyncio.timeout() for per-operation timeouts. Always propagate CancelledError — suppressing it breaks cooperative cancellation, so timeouts and task groups can no longer stop the task.

✍️ Leave a Comment

Your email address will not be published. Required fields are marked *

🌐 Read in: 🇬🇧 English · 🇩🇪 Deutsch · 🇧🇷 Português · 🇸🇦 العربية · 🇮🇳 हिन्दी · 🇧🇩 বাংলা