🌐 Detecting your location…
📢 Advertisement — Configure AdSense in Appearance → Customize → AdSense Settings

دليل التزامن في بايثون 2026: الخيوط، وعدم التزامن، والمعالجة المتعددة

⏱️4 min read  ·  729 words

يتضمن التزامن في بايثون في عام 2026 ثلاثة نماذج مختلفة – الخيوط، والعمليات، وغير المتزامن – كل منها يحل مشاكل مختلفة. إن فهم متى يتم استخدام كل منهما، وكيفية الجمع بينهما بشكل فعال، هو المفتاح لكتابة تطبيقات بايثون عالية الأداء. يوضح هذا الدليل الالتباس ويظهر الأنماط العملية.

GIL وماذا يعني

Global Interpreter Lock (GIL):
- CPython executes one thread at a time (even on multi-core)
- Threads release GIL during I/O operations (network, disk)
- Threads DON'T help for CPU-intensive Python code

When to use each:
- asyncio    → I/O-bound: network calls, database, file
- threading  → I/O-bound: simple cases, legacy libraries
- multiprocessing → CPU-bound: computation, data processing
- concurrent.futures → unified interface for threads/processes

Python 3.13 Free-Threaded Mode (experimental):
- Remove GIL entirely!
- True parallel thread execution
- Performance gains for CPU-bound multi-threaded code
- Enable: python3.13t (separate build)

خيوط – التزامن البسيط

import threading
import queue
from typing import Callable

# Thread pool for I/O-bound tasks
def download_files(urls: list[str], max_workers: int = 10) -> list[bytes]:
    results = [None] * len(urls)
    errors = []
    lock = threading.Lock()

    def download(index: int, url: str):
        try:
            import requests
            data = requests.get(url, timeout=30).content
            with lock:
                results[index] = data
        except Exception as e:
            with lock:
                errors.append((url, str(e)))

    threads = []
    for i, url in enumerate(urls):
        t = threading.Thread(target=download, args=(i, url), daemon=True)
        threads.append(t)
        t.start()

    for t in threads:
        t.join(timeout=60)

    return results, errors

# Thread-safe queue for producer/consumer
def process_items(items: list, worker_fn: Callable, num_workers: int = 5):
    q: queue.Queue = queue.Queue()
    results = []
    lock = threading.Lock()

    def worker():
        while True:
            item = q.get()
            if item is None:
                break
            result = worker_fn(item)
            with lock:
                results.append(result)
            q.task_done()

    # Start workers
    workers = [threading.Thread(target=worker, daemon=True) for _ in range(num_workers)]
    for w in workers: w.start()

    # Add items
    for item in items: q.put(item)

    # Stop workers
    for _ in workers: q.put(None)
    for w in workers: w.join()

    return results

المعالجة المتعددة – مرتبطة بوحدة المعالجة المركزية (CPU).

import multiprocessing
from functools import partial

def process_chunk(chunk: list, func) -> list:
    return [func(item) for item in chunk]

def parallel_map(items: list, func, num_processes: int = None) -> list:
    if num_processes is None:
        num_processes = multiprocessing.cpu_count()

    chunk_size = max(1, len(items) // num_processes)
    chunks = [items[i:i+chunk_size] for i in range(0, len(items), chunk_size)]

    with multiprocessing.Pool(num_processes) as pool:
        results = pool.map(partial(process_chunk, func=func), chunks)

    return [item for chunk_result in results for item in chunk_result]

# Example: CPU-intensive image processing
def resize_image(path: str) -> str:
    from PIL import Image
    img = Image.open(path)
    img.thumbnail((800, 600))
    output = path.replace('.jpg', '_thumb.jpg')
    img.save(output)
    return output

image_paths = glob.glob("images/*.jpg")
thumbnails = parallel_map(image_paths, resize_image)
print(f"Processed {len(thumbnails)} images")

concurrent.futures – الواجهة الموحدة

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import requests

def fetch_url(url: str) -> dict:
    r = requests.get(url, timeout=10)
    return {"url": url, "status": r.status_code, "size": len(r.content)}

urls = [f"https://api.example.com/item/{i}" for i in range(100)]

# ThreadPoolExecutor — I/O bound (network, disk)
with ThreadPoolExecutor(max_workers=20) as executor:
    futures = {executor.submit(fetch_url, url): url for url in urls}

    for future in as_completed(futures):
        url = futures[future]
        try:
            result = future.result()
            print(f"OK: {result['url']} ({result['size']} bytes)")
        except Exception as e:
            print(f"Error: {url} — {e}")

# ProcessPoolExecutor — CPU bound
def heavy_computation(n: int) -> int:
    return sum(i * i for i in range(n))

with ProcessPoolExecutor() as executor:
    results = list(executor.map(heavy_computation, range(100, 10100, 100)))
    print(f"Sum: {sum(results)}")

خلط غير متزامن + معالجة متعددة

import asyncio
from concurrent.futures import ProcessPoolExecutor

executor = ProcessPoolExecutor()

def cpu_intensive_task(data: bytes) -> bytes:
    # Runs in separate process, doesn't block event loop
    import zlib
    return zlib.compress(data, level=9)

async def process_upload(file_data: bytes) -> bytes:
    loop = asyncio.get_event_loop()
    # Run CPU task in process pool without blocking async loop
    compressed = await loop.run_in_executor(executor, cpu_intensive_task, file_data)
    return compressed

async def handle_uploads(files: list[bytes]) -> list[bytes]:
    return await asyncio.gather(*[process_upload(f) for f in files])

# Also useful for blocking libraries
async def use_blocking_library():
    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(
        None,  # use default ThreadPoolExecutor
        lambda: blocking_requests_call("https://api.example.com")
    )
    return result

دليل القرار

الموقف حل Why
خادم الويب يتعامل مع العديد من الطلبات غير متزامن (FastAPI/aiohttp) ينتظر كل طلب على I/O
تحميل 100 ملف في وقت واحد asyncio أو ThreadPoolExecutor شبكة الإدخال/الإخراج مرتبطة
معالجة 1 جيجابايت من الصور ProcessPoolExecutor وحدة المعالجة المركزية مرتبطة، وتتجاوز GIL
مهمة الخلفية في FastAPI asyncio.create_task أو المنفذ لا تمنع حلقة الحدث
قاعدة التعليمات البرمجية المتزامنة الموجودة ThreadPoolExecutor إعادة بيع ديون أقل من asyncio
علم البيانات/تعلم الآلة ProcessPoolExecutor أو joblib وحدة المعالجة المركزية + الذاكرة المكثفة

تزامن Python في 2026: asyncio لرموز الإدخال/الإخراج الجديدة الثقيلة، ProcessPoolExecutor للمهام التي تتطلب وحدة المعالجة المركزية (CPU) المكثفة، ThreadPoolExecutor لدمج مكتبات الحظر. قد يؤدي الإصدار التجريبي الحر CPython (3.13+) إلى تغيير حساب التفاضل والتكامل للترابط المرتبط بوحدة المعالجة المركزية (CPU) في المستقبل. في الوقت الحالي: تعرف على عنق الزجاجة لديك (الإدخال/الإخراج مقابل وحدة المعالجة المركزية) واختر وفقًا لذلك.

✍️ Leave a Comment

Your email address will not be published. Required fields are marked *

🌐 Read in:🇬🇧 English🇩🇪 Deutsch🇧🇷 Português🇸🇦 العربية🇮🇳 हिन्दी🇧🇩 বাংলা