2026 সালে পাইথন কনকারেন্সিতে তিনটি ভিন্ন মডেল রয়েছে — থ্রেড, প্রসেস এবং অ্যাসিঙ্ক — প্রতিটি আলাদা সমস্যা সমাধান করে। প্রতিটি কখন ব্যবহার করতে হবে এবং কীভাবে সেগুলিকে কার্যকরভাবে একত্রিত করতে হবে তা বোঝা হল পারফরম্যান্স পাইথন অ্যাপ্লিকেশন লেখার চাবিকাঠি। এই নির্দেশিকা বিভ্রান্তি স্পষ্ট করে এবং ব্যবহারিক নিদর্শন দেখায়।
📋 Table of Contents
জিআইএল এবং এর অর্থ কী
Global Interpreter Lock (GIL):
- CPython executes one thread at a time (even on multi-core)
- Threads release GIL during I/O operations (network, disk)
- Threads DON'T help for CPU-intensive Python code
When to use each:
- asyncio → I/O-bound: network calls, database, file
- threading → I/O-bound: simple cases, legacy libraries
- multiprocessing → CPU-bound: computation, data processing
- concurrent.futures → unified interface for threads/processes
Python 3.13 Free-Threaded Mode (experimental):
- Remove GIL entirely!
- True parallel thread execution
- Performance gains for CPU-bound multi-threaded code
- Enable: python3.13t (separate build)
থ্রেডিং – সরল একযোগে
import threading
import queue
from typing import Callable
# Thread pool for I/O-bound tasks
def download_files(urls: list[str], max_workers: int = 10) -> list[bytes]:
results = [None] * len(urls)
errors = []
lock = threading.Lock()
def download(index: int, url: str):
try:
import requests
data = requests.get(url, timeout=30).content
with lock:
results[index] = data
except Exception as e:
with lock:
errors.append((url, str(e)))
threads = []
for i, url in enumerate(urls):
t = threading.Thread(target=download, args=(i, url), daemon=True)
threads.append(t)
t.start()
for t in threads:
t.join(timeout=60)
return results, errors
# Thread-safe queue for producer/consumer
def process_items(items: list, worker_fn: Callable, num_workers: int = 5):
q: queue.Queue = queue.Queue()
results = []
lock = threading.Lock()
def worker():
while True:
item = q.get()
if item is None:
break
result = worker_fn(item)
with lock:
results.append(result)
q.task_done()
# Start workers
workers = [threading.Thread(target=worker, daemon=True) for _ in range(num_workers)]
for w in workers: w.start()
# Add items
for item in items: q.put(item)
# Stop workers
for _ in workers: q.put(None)
for w in workers: w.join()
return results
মাল্টিপ্রসেসিং — CPU-বাউন্ড
import multiprocessing
from functools import partial
def process_chunk(chunk: list, func) -> list:
return [func(item) for item in chunk]
def parallel_map(items: list, func, num_processes: int = None) -> list:
if num_processes is None:
num_processes = multiprocessing.cpu_count()
chunk_size = max(1, len(items) // num_processes)
chunks = [items[i:i+chunk_size] for i in range(0, len(items), chunk_size)]
with multiprocessing.Pool(num_processes) as pool:
results = pool.map(partial(process_chunk, func=func), chunks)
return [item for chunk_result in results for item in chunk_result]
# Example: CPU-intensive image processing
def resize_image(path: str) -> str:
from PIL import Image
img = Image.open(path)
img.thumbnail((800, 600))
output = path.replace('.jpg', '_thumb.jpg')
img.save(output)
return output
image_paths = glob.glob("images/*.jpg")
thumbnails = parallel_map(image_paths, resize_image)
print(f"Processed {len(thumbnails)} images")
concurrent.futures — ইউনিফাইড ইন্টারফেস
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import requests
def fetch_url(url: str) -> dict:
r = requests.get(url, timeout=10)
return {"url": url, "status": r.status_code, "size": len(r.content)}
urls = [f"https://api.example.com/item/{i}" for i in range(100)]
# ThreadPoolExecutor — I/O bound (network, disk)
with ThreadPoolExecutor(max_workers=20) as executor:
futures = {executor.submit(fetch_url, url): url for url in urls}
for future in as_completed(futures):
url = futures[future]
try:
result = future.result()
print(f"OK: {result['url']} ({result['size']} bytes)")
except Exception as e:
print(f"Error: {url} — {e}")
# ProcessPoolExecutor — CPU bound
def heavy_computation(n: int) -> int:
return sum(i * i for i in range(n))
with ProcessPoolExecutor() as executor:
results = list(executor.map(heavy_computation, range(100, 10100, 100)))
print(f"Sum: {sum(results)}")
অ্যাসিনসিও + মাল্টিপ্রসেসিং মিশ্রিত করা
import asyncio
from concurrent.futures import ProcessPoolExecutor
executor = ProcessPoolExecutor()
def cpu_intensive_task(data: bytes) -> bytes:
# Runs in separate process, doesn't block event loop
import zlib
return zlib.compress(data, level=9)
async def process_upload(file_data: bytes) -> bytes:
loop = asyncio.get_event_loop()
# Run CPU task in process pool without blocking async loop
compressed = await loop.run_in_executor(executor, cpu_intensive_task, file_data)
return compressed
async def handle_uploads(files: list[bytes]) -> list[bytes]:
return await asyncio.gather(*[process_upload(f) for f in files])
# Also useful for blocking libraries
async def use_blocking_library():
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(
None, # use default ThreadPoolExecutor
lambda: blocking_requests_call("https://api.example.com")
)
return result
সিদ্ধান্ত নির্দেশিকা
| পরিস্থিতি | সমাধান | Why |
|---|---|---|
| ওয়েব সার্ভার অনেক অনুরোধ হ্যান্ডলিং | asyncio (FastAPI/aiohttp) | প্রতিটি অনুরোধ I/O এ অপেক্ষা করে |
| একসাথে 100টি ফাইল ডাউনলোড করুন | asyncio বা ThreadPoolExecutor | নেটওয়ার্ক I/O আবদ্ধ |
| 1GB ছবি প্রসেস করুন | প্রসেসপুল এক্সিকিউটর | CPU আবদ্ধ, GIL বাইপাস |
| FastAPI তে ব্যাকগ্রাউন্ড টাস্ক | asyncio.create_task বা নির্বাহক | ইভেন্ট লুপ ব্লক করবেন না |
| বিদ্যমান সিঙ্ক্রোনাস কোডবেস | ThreadPoolExecutor | অ্যাসিনসিওর চেয়ে কম রিফ্যাক্টরিং |
| ডেটা সায়েন্স/এমএল | ProcessPoolExecutor বা joblib | CPU+মেমরি নিবিড় |
2026 সালে পাইথন কনকারেন্সি: নতুন I/O-হেভি কোডের জন্য asyncio, CPU- নিবিড় কাজগুলির জন্য ProcessPoolExecutor, ব্লকিং লাইব্রেরিগুলিকে একীভূত করার জন্য ThreadPoolExecutor। পরীক্ষামূলক ফ্রি-থ্রেডেড CPython (3.13+) ভবিষ্যতে CPU-বাউন্ড থ্রেডিংয়ের জন্য ক্যালকুলাস পরিবর্তন করতে পারে। আপাতত: আপনার বাধা (I/O বনাম CPU) জানুন এবং সেই অনুযায়ী বেছে নিন।
🔗 Share this article
✍️ Leave a Comment