Python Concurrency Guide 2026: Threads, asyncio, and Multiprocessing

Python concurrency in 2026 involves three different models (threads, processes, and async), each solving a different problem. Understanding when to use each one, and how to combine them effectively, is the key to writing high-performance Python applications. This guide cuts through the confusion and shows practical patterns.

The GIL and What It Means

Global Interpreter Lock (GIL):
- CPython executes Python bytecode in one thread at a time (even on multi-core)
- Threads release GIL during I/O operations (network, disk)
- Threads DON'T help for CPU-intensive Python code
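
The second point above can be checked with a small experiment. The sketch below uses `time.sleep` as a stand-in for a blocking I/O call, on the assumption that it releases the GIL the same way a socket read would. The helper names `fake_io` and `run_in_threads` are illustrative and not part of the original guide.

```python
import threading
import time

def fake_io(delay: float) -> None:
    # time.sleep releases the GIL, standing in for a network or disk wait
    time.sleep(delay)

def run_in_threads(num_threads: int, delay: float) -> float:
    """Start num_threads threads that each wait `delay` seconds; return wall time."""
    threads = [threading.Thread(target=fake_io, args=(delay,)) for _ in range(num_threads)]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start

elapsed = run_in_threads(num_threads=5, delay=0.2)
# The five waits overlap, so the total is close to one wait rather than five
print(f"5 threads x 0.2 s of waiting took {elapsed:.2f} s")
```

Replacing the sleep with a pure-Python loop would be expected to remove the overlap on a standard (GIL-enabled) CPython build, which is the third point in the list.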

When to use each:
- asyncio    → I/O-bound: network calls, database, file
- threading  → I/O-bound: simple cases, legacy libraries
- multiprocessing → CPU-bound: computation, data processing
- concurrent.futures → unified interface for threads/processes
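
As a rough illustration of the asyncio entry in the list above, here is a minimal sketch of bounded concurrent I/O. `asyncio.sleep` is a placeholder for a real awaitable network call, and `fetch`, `fetch_all`, and `max_concurrent` are hypothetical names chosen for this example.

```python
import asyncio

async def fetch(item_id: int, limit: asyncio.Semaphore) -> str:
    # The semaphore caps how many "requests" are in flight at once
    async with limit:
        await asyncio.sleep(0.05)  # placeholder for an awaitable network call
        return f"item-{item_id}"

async def fetch_all(ids: list[int], max_concurrent: int = 10) -> list[str]:
    limit = asyncio.Semaphore(max_concurrent)
    # gather returns results in the same order as the awaitables passed in
    return await asyncio.gather(*(fetch(i, limit) for i in ids))

results = asyncio.run(fetch_all(list(range(20))))
print(results[:3])  # ['item-0', 'item-1', 'item-2']
```

All twenty coroutines run on a single thread; the concurrency comes from each one yielding control while it waits.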

Python 3.13 Free-Threaded Mode (experimental):
- Removes the GIL entirely
- True parallel thread execution
- Performance gains for CPU-bound multi-threaded code
- Enable: python3.13t (separate build)
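
To find out which kind of interpreter a script is running on, something like the following sketch may help. It assumes the `Py_GIL_DISABLED` build variable and `sys._is_gil_enabled()` (available from Python 3.13) behave as described in the CPython documentation; `gil_status` is a hypothetical helper name.

```python
import sys
import sysconfig

def gil_status() -> dict[str, bool]:
    # Py_GIL_DISABLED is 1 on free-threaded builds and 0 or None otherwise
    free_threaded_build = bool(sysconfig.get_config_var("Py_GIL_DISABLED"))
    # sys._is_gil_enabled() exists on Python 3.13+; older versions always have a GIL
    check = getattr(sys, "_is_gil_enabled", None)
    gil_enabled = check() if check is not None else True
    return {"free_threaded_build": free_threaded_build, "gil_enabled": gil_enabled}

status = gil_status()
print(status)
```

The two values can differ: a free-threaded build can still run with the GIL switched back on, for example when an extension module that does not declare free-threading support is imported.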

threading – Simple Concurrency

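import threading
import queue
from typing import Callable

import requests

# Threads for I/O-bound tasks, capped at max_workers concurrent downloads
def download_files(
    urls: list[str], max_workers: int = 10
) -> tuple[list[bytes | None], list[tuple[str, str]]]:
    results: list[bytes | None] = [None] * len(urls)
    errors: list[tuple[str, str]] = []
    lock = threading.Lock()
    slots = threading.BoundedSemaphore(max_workers)

    def download(index: int, url: str) -> None:
        # Wait for a free slot so at most max_workers requests run at once
        with slots:
            try:
                data = requests.get(url, timeout=30).content
                with lock:
                    results[index] = data
            except Exception as e:
                with lock:
                    errors.append((url, str(e)))

    threads = []
    for i, url in enumerate(urls):
        t = threading.Thread(target=download, args=(i, url), daemon=True)
        threads.append(t)
        t.start()

    # A download that misses the join timeout leaves None in its results slot
    for t in threads:
        t.join(timeout=60)

    return results, errors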

# Thread-safe queue for producer/consumer
def process_items(items: list, worker_fn: Callable, num_workers: int = 5) -> list:
    q: queue.Queue = queue.Queue()
    results = []
    lock = threading.Lock()
    stop = object()  # unique sentinel, so None remains a valid item

    def worker():
        while True:
            item = q.get()
            if item is stop:
                break
            result = worker_fn(item)
            with lock:
                results.append(result)
            q.task_done()

    # Start workers
    workers = [threading.Thread(target=worker, daemon=True) for _ in range(num_workers)]
    for w in workers:
        w.start()

    # Add items
    for item in items:
        q.put(item)

    # Stop workers: one sentinel per worker
    for _ in workers:
        q.put(stop)
    for w in workers:
        w.join()

    # Results arrive in completion order, not input order
    return results

multiprocessing – CPU-Bound Work

import glob
import multiprocessing
from functools import partial

def process_chunk(chunk: list, func) -> list:
    return [func(item) for item in chunk]

def parallel_map(items: list, func, num_processes: int | None = None) -> list:
    if num_processes is None:
        num_processes = multiprocessing.cpu_count()

    chunk_size = max(1, len(items) // num_processes)
    chunks = [items[i:i+chunk_size] for i in range(0, len(items), chunk_size)]

    with multiprocessing.Pool(num_processes) as pool:
        results = pool.map(partial(process_chunk, func=func), chunks)

    return [item for chunk_result in results for item in chunk_result]

# Example: CPU-intensive image processing
def resize_image(path: str) -> str:
    from PIL import Image
    img = Image.open(path)
    img.thumbnail((800, 600))
    output = path.replace('.jpg', '_thumb.jpg')
    img.save(output)
    return output

# The guard keeps worker processes from re-running this block on import
if __name__ == "__main__":
    image_paths = glob.glob("images/*.jpg")
    thumbnails = parallel_map(image_paths, resize_image)
    print(f"Processed {len(thumbnails)} images")
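
The chunking arithmetic in `parallel_map` is easier to follow with concrete numbers. The sketch below isolates it in a helper (`floor_chunks`, a hypothetical name) and compares it with a round-up variant (`ceil_chunks`) that is not from the guide and is shown only as one possible alternative.

```python
import math

def floor_chunks(items: list, num_processes: int) -> list[list]:
    # Same arithmetic as parallel_map: floor division, at least 1 item per chunk
    chunk_size = max(1, len(items) // num_processes)
    return [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]

def ceil_chunks(items: list, num_processes: int) -> list[list]:
    # Variation (not from the guide): round up so there are at most num_processes chunks
    chunk_size = max(1, math.ceil(len(items) / num_processes))
    return [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]

items = list(range(10))
print(floor_chunks(items, 4))  # [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]
print(ceil_chunks(items, 4))   # [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
```

With floor division, 10 items across 4 processes produce 5 chunks, so one process picks up a second chunk. That is harmless, though `Pool.map` also accepts a `chunksize` argument that may make manual chunking unnecessary.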

concurrent.futures – Unified Interface

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import requests

def fetch_url(url: str) -> dict:
    r = requests.get(url, timeout=10)
    return {"url": url, "status": r.status_code, "size": len(r.content)}

urls = [f"https://api.example.com/item/{i}" for i in range(100)]

# ThreadPoolExecutor — I/O bound (network, disk)
with ThreadPoolExecutor(max_workers=20) as executor:
    futures = {executor.submit(fetch_url, url): url for url in urls}

    for future in as_completed(futures):
        url = futures[future]
        try:
            result = future.result()
            print(f"OK: {result['url']} ({result['size']} bytes)")
        except Exception as e:
            print(f"Error: {url} — {e}")

# ProcessPoolExecutor: CPU-bound work
def heavy_computation(n: int) -> int:
    return sum(i * i for i in range(n))

# The guard is required where workers start via spawn or forkserver
if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        results = list(executor.map(heavy_computation, range(100, 10100, 100)))
        print(f"Sum: {sum(results)}")
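
The two ways of collecting results used above differ in ordering, which is easy to miss. A minimal sketch, assuming `time.sleep` as a placeholder for a blocking call and `slow_echo` as a hypothetical helper:

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def slow_echo(delay: float) -> float:
    time.sleep(delay)  # placeholder for a blocking call
    return delay

delays = [0.6, 0.05, 0.3]

with ThreadPoolExecutor(max_workers=3) as executor:
    # executor.map yields results in input order, whichever task finishes first
    in_input_order = list(executor.map(slow_echo, delays))

    # as_completed yields futures as they finish, so the order follows the delays
    futures = [executor.submit(slow_echo, d) for d in delays]
    in_finish_order = [f.result() for f in as_completed(futures)]

print(in_input_order)   # [0.6, 0.05, 0.3]
print(in_finish_order)  # shortest delay first, assuming all three tasks start together
```

`executor.map` tends to suit cases where results must line up with inputs; `as_completed` tends to suit cases where each result should be handled as soon as it is ready.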

Mixing asyncio + multiprocessing

import asyncio
from concurrent.futures import ProcessPoolExecutor

executor = ProcessPoolExecutor()

def cpu_intensive_task(data: bytes) -> bytes:
    # Runs in separate process, doesn't block event loop
    import zlib
    return zlib.compress(data, level=9)

async def process_upload(file_data: bytes) -> bytes:
    loop = asyncio.get_running_loop()
    # Run CPU task in process pool without blocking async loop
    compressed = await loop.run_in_executor(executor, cpu_intensive_task, file_data)
    return compressed

async def handle_uploads(files: list[bytes]) -> list[bytes]:
    return await asyncio.gather(*[process_upload(f) for f in files])

# Also useful for blocking libraries
# blocking_requests_call is a placeholder for any synchronous function
async def use_blocking_library():
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(
        None,  # use default ThreadPoolExecutor
        lambda: blocking_requests_call("https://api.example.com")
    )
    return result
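
On Python 3.9 and later, `asyncio.to_thread` offers a shorter spelling of the default-executor pattern above. The sketch below is one possible way to use it, with `time.sleep` standing in for a synchronous library call and `blocking_call` as a hypothetical function.

```python
import asyncio
import time

def blocking_call(seconds: float) -> str:
    time.sleep(seconds)  # stands in for a synchronous library call
    return f"slept {seconds}s"

async def main() -> tuple[list[str], float]:
    start = time.perf_counter()
    # Each call runs in the default thread pool; the event loop stays free
    results = await asyncio.gather(
        asyncio.to_thread(blocking_call, 0.2),
        asyncio.to_thread(blocking_call, 0.2),
        asyncio.to_thread(blocking_call, 0.2),
    )
    return results, time.perf_counter() - start

results, elapsed = asyncio.run(main())
print(results, f"{elapsed:.2f}s")
```

The three calls overlap, so the total should be close to a single 0.2 s wait. This only helps for blocking calls that release the GIL; CPU-bound work still belongs in a process pool.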

Decision Guide

| Situation | Solution | Why |
|---|---|---|
| Web server handling many requests | asyncio (FastAPI/aiohttp) | Each request waits on I/O |
| Download 100 files concurrently | asyncio or ThreadPoolExecutor | Network I/O bound |
| Process 1 GB of images | ProcessPoolExecutor | CPU-bound, bypasses the GIL |
| Background task in FastAPI | asyncio.create_task or executor | Don't block the event loop |
| Existing synchronous codebase | ThreadPoolExecutor | Less refactoring than asyncio |
| Data science/ML | ProcessPoolExecutor or joblib | CPU and memory intensive |

Python concurrency in 2026: asyncio for new I/O-heavy code, ProcessPoolExecutor for CPU-intensive tasks, ThreadPoolExecutor for integrating blocking libraries. Experimental free-threaded CPython (3.13+) may change the calculus for CPU-bound threading in the future. For now: know your bottleneck (I/O vs CPU) and choose accordingly.
