পাইথন অ্যাসিঙ্কিও 2026 সালে একটি সম্পূর্ণ অ্যাসিঙ্ক ইকোসিস্টেমে পরিণত হয়েছে৷ মৌলিক async/await-এর বাইরে, প্রোডাকশন অ্যাপ্লিকেশনগুলি উন্নত প্যাটার্ন ব্যবহার করে: টাস্ক গ্রুপ, সেমাফোর, অ্যাসিঙ্ক সারি, ইভেন্ট-চালিত আর্কিটেকচার এবং স্ট্রাকচার্ড কনকারেন্সি৷ এই নির্দেশিকাটি সেই প্যাটার্নগুলিকে কভার করে যা অ্যাসিঙ্ক পাইথন কোডকে শক্তিশালী এবং রক্ষণাবেক্ষণযোগ্য করে তোলে।
📋 Table of Contents
টাস্ক গ্রুপ — স্ট্রাকচার্ড কনকারেন্সি
import asyncio
import httpx
from typing import Any
# Python 3.11+ TaskGroup — cancel all if one fails
async def fetch_all_data(urls: list[str]) -> list[dict]:
results = []
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(fetch_url(url)) for url in urls]
# If ANY task raises, ALL are cancelled — no partial results
return [t.result() for t in tasks]
async def fetch_url(url: str) -> dict:
    """Download ``url`` and return its decoded JSON body tagged with the URL.

    A short-lived ``httpx.AsyncClient`` is opened for this single request,
    which is given a 10 second timeout.

    Returns:
        ``{"url": <url>, "data": <decoded JSON>}``.
    """
    async with httpx.AsyncClient() as http:
        response = await http.get(url, timeout=10)
        payload = response.json()
    return {"url": url, "data": payload}
# vs asyncio.gather (partial results possible)
async def fetch_gather(urls: list[str]) -> list[Any]:
    """Fetch all URLs with ``asyncio.gather``, tolerating individual failures.

    Every request is issued through one shared client. Because
    ``return_exceptions=True`` is used, a failed request does not abort the
    others; its position in the result is filled with ``None`` instead.

    Args:
        urls: URLs to request.

    Returns:
        A list aligned with ``urls``: the decoded JSON body for each request
        that completed, ``None`` for each request that raised or was
        cancelled.
    """
    async with httpx.AsyncClient() as client:
        requests = [client.get(url) for url in urls]
        outcomes = await asyncio.gather(*requests, return_exceptions=True)
    # A cancelled child is reported as CancelledError, which derives from
    # BaseException rather than Exception, so test against BaseException to
    # avoid calling .json() on an exception object.
    return [
        None if isinstance(outcome, BaseException) else outcome.json()
        for outcome in outcomes
    ]
Semaphores — হার সীমাবদ্ধতা
import asyncio
import httpx
from dataclasses import dataclass
@dataclass
class RateLimitedFetcher:
    """Fetch JSON over HTTP with a cap on concurrency and on request rate.

    Concurrency is bounded by a semaphore. The request rate is bounded by
    handing out start "slots" spaced ``1 / requests_per_second`` apart, so
    the limit holds across all tasks together rather than per task.

    Raises:
        ValueError: At construction, if ``requests_per_second`` is not
            positive.
    """

    # Maximum number of requests in flight at the same time.
    max_concurrent: int = 10
    # Maximum number of requests started per second, across all tasks.
    requests_per_second: float = 5.0

    def __post_init__(self):
        if self.requests_per_second <= 0:
            raise ValueError("requests_per_second must be positive")
        self._semaphore = asyncio.Semaphore(self.max_concurrent)
        # Event-loop clock time at which the next request may start.
        self._next_slot = 0.0

    async def _wait_for_slot(self) -> None:
        """Reserve the next start slot and sleep until it arrives."""
        now = asyncio.get_running_loop().time()
        slot = max(now, self._next_slot)
        # No await happens between reading and writing _next_slot, so
        # concurrent tasks can never reserve the same slot.
        self._next_slot = slot + 1.0 / self.requests_per_second
        if slot > now:
            await asyncio.sleep(slot - now)

    async def fetch(self, client: "httpx.AsyncClient", url: str) -> dict:
        """GET ``url`` through ``client`` and return the decoded JSON body.

        Waits for a free concurrency slot first, then for the next rate
        slot; the request itself uses a 30 second timeout.
        """
        async with self._semaphore:  # at most max_concurrent in flight
            await self._wait_for_slot()  # global rate limit
            response = await client.get(url, timeout=30)
            return response.json()

    async def fetch_all(self, urls: list[str]) -> list[dict]:
        """Fetch every URL through one shared client.

        Returns:
            Decoded JSON bodies in the same order as ``urls``. The first
            request that raises makes the whole call raise.
        """
        async with httpx.AsyncClient() as client:
            return await asyncio.gather(*(self.fetch(client, url) for url in urls))
# Usage
# NOTE(review): `urls` is not defined in this snippet — presumably a
# list[str] supplied by the surrounding code; confirm before running.
fetcher = RateLimitedFetcher(max_concurrent=10, requests_per_second=5)
# asyncio.run() drives fetch_all() to completion and returns its result.
results = asyncio.run(fetcher.fetch_all(urls))
অ্যাসিঙ্ক সারি — প্রযোজক/ভোক্তা
import asyncio
import httpx
from dataclasses import dataclass
async def crawl_urls(seed_urls: list[str], max_workers: int = 20):
    """Crawl outward from ``seed_urls`` with a fixed pool of worker tasks.

    Each URL is fetched at most once. Every response is decoded as JSON and
    any URLs listed under its ``"links"`` key are queued for crawling too.
    A failing URL is reported with ``print`` and skipped.

    Args:
        seed_urls: URLs to start from.
        max_workers: Number of concurrent worker tasks; must be at least 1.

    Returns:
        A list of ``{"url": ..., "data": ...}`` dicts, one per URL that was
        fetched and decoded successfully, in completion order.

    Raises:
        ValueError: If ``max_workers`` is less than 1.
    """
    if max_workers < 1:
        raise ValueError("max_workers must be at least 1")

    queue: asyncio.Queue[str] = asyncio.Queue()
    visited: set[str] = set()
    results: list[dict] = []

    # Seed the queue
    for url in seed_urls:
        await queue.put(url)

    async def worker(client: httpx.AsyncClient):
        while True:
            # Block until work arrives. Exiting on a momentarily empty queue
            # would retire idle workers while another worker is still
            # fetching a page that is about to enqueue more links.
            url = await queue.get()
            try:
                if url in visited:
                    continue
                visited.add(url)
                r = await client.get(url, timeout=10)
                data = r.json()
                results.append({"url": url, "data": data})
                # Discover new URLs
                for link in data.get("links", []):
                    if link not in visited:
                        await queue.put(link)
            except Exception as e:
                print(f"Error fetching {url}: {e}")
            finally:
                # Runs for skipped, failed and successful URLs alike, so
                # queue.join() can account for every item.
                queue.task_done()

    async with httpx.AsyncClient() as client:
        workers = [asyncio.create_task(worker(client)) for _ in range(max_workers)]
        try:
            # Returns once every queued URL has been marked done.
            await queue.join()
        finally:
            for w in workers:
                w.cancel()
            # Let the cancellations land before the client is closed.
            await asyncio.gather(*workers, return_exceptions=True)
    return results
Async কনটেক্সট ম্যানেজার এবং ইটারেটর
from contextlib import asynccontextmanager
import asyncpg
# Database connection pool context
@asynccontextmanager
async def get_db_pool(dsn: str):
    """Yield an asyncpg pool for ``dsn`` and close it when the caller is done.

    The pool is created with ``min_size=5`` and ``max_size=20`` and is
    closed when the ``async with`` body exits, normally or by exception.
    """
    # The pool object is itself an async context manager: entering it
    # initialises the pool and leaving it closes the pool.
    async with asyncpg.create_pool(dsn, min_size=5, max_size=20) as db_pool:
        yield db_pool
# Async generator for streaming large datasets
async def stream_records(pool: asyncpg.Pool, table: str, batch_size: int = 1000):
    """Yield every row of ``table`` as a dict, reading ``batch_size`` rows at a time.

    Rows are read through a server-side cursor inside one transaction
    instead of repeated ``LIMIT``/``OFFSET`` queries. Without an
    ``ORDER BY`` the row order is not guaranteed to be stable between
    queries, so offset paging can skip or repeat rows, and every page has
    to step over all rows before its offset again.

    Args:
        pool: Pool from which one connection is borrowed for the whole
            iteration.
        table: Table name. It is interpolated into the SQL text, so it must
            come from trusted code, never from user input.
        batch_size: Number of rows the cursor prefetches per round trip.

    Yields:
        One ``dict`` per row.
    """
    # NOTE: identifiers cannot be passed as query parameters, which is why
    # ``table`` is spliced into the statement.
    query = f"SELECT * FROM {table}"
    async with pool.acquire() as conn:
        # asyncpg cursors can only be used inside a transaction.
        async with conn.transaction():
            async for row in conn.cursor(query, prefetch=batch_size):
                yield dict(row)
# Usage
async def process_large_table(
    dsn: str = "postgresql://localhost/mydb",
    table: str = "events",
):
    """Stream every row of ``table`` and pass each one to ``process_record``.

    Args:
        dsn: Connection string for the database; defaults to the local
            ``mydb`` database.
        table: Table to stream; defaults to ``events``.
    """
    async with get_db_pool(dsn) as pool:
        async for record in stream_records(pool, table):
            # NOTE(review): process_record is not defined in this snippet —
            # presumably an async callable provided elsewhere.
            await process_record(record)
asyncio.Event দিয়ে ইভেন্ট-চালিত প্যাটার্ন
import asyncio
class DataPipeline:
    """Move items from an async source to concurrent handlers via a bounded queue.

    Two events coordinate the tasks: ``_data_ready`` lets idle consumers
    sleep until the queue may hold items again, and ``_shutdown`` tells
    them that no further items will arrive.
    """

    def __init__(self):
        # Set while the queue may hold items; consumers wait on it when idle.
        self._data_ready = asyncio.Event()
        # Set once the producer has finished, successfully or not.
        self._shutdown = asyncio.Event()
        # Bounded so that put() holds the producer back when consumers lag.
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=100)

    async def producer(self, source):
        """Queue every item of the async iterable ``source``, then signal shutdown."""
        try:
            async for item in source:
                await self._queue.put(item)
                self._data_ready.set()
        finally:
            self._shutdown.set()
            # Wake consumers that are idle on a cleared _data_ready (empty
            # source, or queue drained before the source ended); otherwise
            # they would never observe the shutdown.
            self._data_ready.set()

    async def consumer(self, handler):
        """Pass queued items to ``handler`` until the queue is empty after shutdown."""
        while True:
            await self._data_ready.wait()
            try:
                item = self._queue.get_nowait()
            except asyncio.QueueEmpty:
                if self._shutdown.is_set():
                    break
                self._data_ready.clear()  # reset until more data arrives
                continue
            try:
                await handler(item)
            finally:
                self._queue.task_done()

    async def run(self, source, handler, num_consumers: int = 5):
        """Run one producer and ``num_consumers`` consumers to completion.

        Args:
            source: Async iterable of items.
            handler: Async callable invoked once per item.
            num_consumers: Number of concurrent consumer tasks.

        Raises:
            Whatever ``source`` or ``handler`` raises.
        """
        consumers = [asyncio.create_task(self.consumer(handler)) for _ in range(num_consumers)]
        try:
            await self.producer(source)
        except BaseException:
            # Do not leave consumer tasks running unattended after a failure.
            for task in consumers:
                task.cancel()
            await asyncio.gather(*consumers, return_exceptions=True)
            raise
        await asyncio.gather(*consumers)
টাইমআউট এবং বাতিলকরণ
import asyncio
async def with_retry(coro_factory, max_retries: int = 3, timeout: float = 10.0):
last_error = None
for attempt in range(max_retries):
try:
async with asyncio.timeout(timeout):
return await coro_factory()
except TimeoutError:
last_error = TimeoutError(f"Timed out after {timeout}s (attempt {attempt+1})")
except asyncio.CancelledError:
raise # Always propagate cancellation
except Exception as e:
last_error = e
await asyncio.sleep(2 ** attempt) # exponential backoff
raise last_error
# Clean cancellation pattern
async def cancellable_task(cancel_event: asyncio.Event):
    """Call ``do_work()`` about every 0.1s until ``cancel_event`` is set.

    If the task is cancelled instead, ``cleanup()`` is awaited and the
    ``CancelledError`` is re-raised so the cancellation still takes effect.
    """
    try:
        while True:
            if cancel_event.is_set():
                break
            await asyncio.sleep(0.1)
            await do_work()
    except asyncio.CancelledError:
        # Tidy up first, then let the cancellation continue upwards.
        await cleanup()
        raise
# Shield a coroutine from cancellation
# Strong references to in-flight shielded saves: the event loop only keeps
# weak references to tasks, so an otherwise unreferenced task could be
# garbage-collected before it finishes.
_shielded_saves: set = set()


async def critical_operation():
    """Run ``save_to_database()`` so that cancelling the caller does not cancel it.

    ``asyncio.shield`` protects only the inner task. If the task awaiting
    this coroutine is cancelled, ``CancelledError`` is still raised here;
    the save itself keeps running in the background.
    """
    save_task = asyncio.ensure_future(save_to_database())
    _shielded_saves.add(save_task)
    # Drop the reference again once the save has finished.
    save_task.add_done_callback(_shielded_saves.discard)
    await asyncio.shield(save_task)
ফাস্টএপিআই-এর সাথে অ্যাসিঙ্কিও — উন্নত
from fastapi import FastAPI, BackgroundTasks, WebSocket
import asyncio
# Application object; the route decorators in this file register handlers on it.
app = FastAPI()
# Background task with asyncio
@app.post("/process")
async def start_process(data: dict, background_tasks: BackgroundTasks):
    """Schedule ``long_running_process`` for ``data`` and acknowledge at once.

    Returns:
        ``{"status": "started", "id": <data["id"]>}``. A payload without an
        ``"id"`` key raises ``KeyError``.
    """
    # Register the job first; the response is built afterwards.
    background_tasks.add_task(long_running_process, data)
    job_id = data["id"]
    response = {"status": "started", "id": job_id}
    return response
async def long_running_process(data: dict, timeout_seconds: float = 300):
    """Run ``step1``, ``step2`` and ``step3`` on ``data`` under one overall deadline.

    Args:
        data: Payload passed unchanged to each step.
        timeout_seconds: Time limit for all three steps together; defaults
            to 300 seconds (5 minutes).

    Raises:
        TimeoutError: If the steps do not finish within ``timeout_seconds``.
    """
    async with asyncio.timeout(timeout_seconds):  # 5 min max by default
        await step1(data)
        await step2(data)
        await step3(data)
# WebSocket with asyncio
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
    """Serve one WebSocket client with independent receive and send loops.

    Incoming text messages are passed to ``process_message`` together with
    a queue; every message placed on that queue is sent back to the client.
    The handler returns once the receive loop ends, for example when the
    client disconnects.
    """
    import logging

    from fastapi import WebSocketDisconnect

    await websocket.accept()
    send_queue: asyncio.Queue = asyncio.Queue()

    async def receive_messages():
        try:
            while True:
                msg = await websocket.receive_text()
                await process_message(client_id, msg, send_queue)
        except WebSocketDisconnect:
            # The client closed the connection: the normal way for this
            # loop to end.
            pass
        except Exception:
            # Still best-effort, but leave a trace instead of failing silently.
            logging.getLogger(__name__).exception(
                "WebSocket receive loop failed for client %s", client_id
            )

    async def send_messages():
        while True:
            msg = await send_queue.get()
            await websocket.send_text(msg)
            send_queue.task_done()

    async with asyncio.TaskGroup() as tg:
        receiver = tg.create_task(receive_messages())
        sender = tg.create_task(send_messages())
        # The sender waits on the queue forever, so stop it as soon as the
        # receiver is done; otherwise the group would never finish. A child
        # task that is cancelled directly does not count as a group failure.
        receiver.add_done_callback(lambda _finished: sender.cancel())
2026 সালে Python asyncio অ্যাডভান্স প্যাটার্ন: স্ট্রাকচার্ড কনকারেন্সির জন্য TaskGroup ব্যবহার করুন (বাতিল-অন-ত্রুটি), হার সীমিত করার জন্য Semaphore, প্রযোজক-ভোক্তার জন্য asyncio.Queue এবং প্রতি-অপারেশন টাইমআউটের জন্য asyncio.timeout() ব্যবহার করুন। সর্বদা CancelledError প্রচার করুন — এটিকে দমন করা সমবায় সময়সূচীকে ভঙ্গ করে।
🔗 Share this article
✍️ Leave a Comment