🌐 Detecting your location…
📢 Advertisement — Configure AdSense in Appearance → Customize → AdSense Settings

Guia completo 2026 dos gerenciadores de contexto Python: com, contextlib e async

⏱️5 min read  ·  948 words

Os gerenciadores de contexto Python são um dos recursos mais elegantes e subutilizados da linguagem. OwithA instrução garante que o código de limpeza seja executado mesmo quando ocorrerem exceções. Em 2026, os gerenciadores de contexto serão usados ​​em todos os lugares, desde manipulação de arquivos até conexões de banco de dados, sessões HTTP e operações assíncronas. Este guia completo cobre todos os padrões.

Como funcionam os gerenciadores de contexto

# The with statement calls __enter__ and __exit__
with open("file.txt") as f:
    data = f.read()
# f.close() called automatically, even on exception

# Equivalent to:
f = open("file.txt")
try:
    data = f.read()
finally:
    f.close()

# Multiple context managers
with open("input.txt") as infile, open("output.txt", "w") as outfile:
    for line in infile:
        outfile.write(line.upper())

# Nested (older style, avoid)
with open("input.txt") as infile:
    with open("output.txt", "w") as outfile:
        pass

Criando Gerenciadores de Contexto

class DatabaseConnection:
    def __init__(self, url: str):
        self.url = url
        self.conn = None

    def __enter__(self):
        import psycopg2
        self.conn = psycopg2.connect(self.url)
        self.cursor = self.conn.cursor()
        return self.cursor  # what 'as' variable receives

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is None:
            self.conn.commit()   # commit if no exception
        else:
            self.conn.rollback() # rollback on exception
        self.cursor.close()
        self.conn.close()
        return False  # False = don't suppress the exception

# Usage
with DatabaseConnection("postgresql://localhost/mydb") as cursor:
    cursor.execute("INSERT INTO users (name) VALUES (%s)", ("Alice",))
    # commit() called automatically on exit
    # rollback() called if exception occurs

contextlib.contextmanager – O jeito do decorador

from contextlib import contextmanager
import time, logging

logger = logging.getLogger(__name__)

@contextmanager
def timer(name: str = ""):
    start = time.perf_counter()
    try:
        yield  # code inside 'with' block runs here
    finally:
        elapsed = time.perf_counter() - start
        logger.info(f"{name}: {elapsed*1000:.2f}ms")

# Usage
with timer("database query"):
    results = db.execute_slow_query()

@contextmanager
def transaction(db):
    try:
        yield db
        db.commit()
    except Exception:
        db.rollback()
        raise

@contextmanager
def temporary_file(suffix=".tmp"):
    import tempfile, os
    path = None
    try:
        fd, path = tempfile.mkstemp(suffix=suffix)
        os.close(fd)
        yield path
    finally:
        if path and os.path.exists(path):
            os.unlink(path)

with temporary_file(".json") as tmp:
    with open(tmp, "w") as f:
        json.dump(data, f)
    process_file(tmp)
# file is deleted here, even on exception

Gerenciadores de contexto assíncronos

import asyncio
from contextlib import asynccontextmanager
import httpx

@asynccontextmanager
async def http_session(base_url: str, timeout: float = 30.0):
    async with httpx.AsyncClient(base_url=base_url, timeout=timeout) as client:
        yield client

async def fetch_data():
    async with http_session("https://api.example.com") as client:
        r = await client.get("/users")
        return r.json()

# Async database context manager
@asynccontextmanager
async def async_transaction(pool):
    async with pool.acquire() as conn:
        async with conn.transaction():
            yield conn

async def create_user(pool, email: str):
    async with async_transaction(pool) as conn:
        user_id = await conn.fetchval(
            "INSERT INTO users (email) VALUES ($1) RETURNING id", email
        )
        await conn.execute(
            "INSERT INTO user_settings (user_id) VALUES ($1)", user_id
        )
        return user_id
# Transaction auto-commits or rolls back

FastAPI Lifespan — Gerenciador de contexto de aplicativo

from fastapi import FastAPI
from contextlib import asynccontextmanager
import asyncpg

db_pool = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    global db_pool
    db_pool = await asyncpg.create_pool(
        dsn="postgresql://localhost/mydb",
        min_size=5, max_size=20
    )
    print("Database pool created")

    yield  # App runs here

    # Shutdown
    await db_pool.close()
    print("Database pool closed")

app = FastAPI(lifespan=lifespan)

@app.get("/users")
async def list_users():
    async with db_pool.acquire() as conn:
        return await conn.fetch("SELECT id, name FROM users")

Utilitários contextlib

from contextlib import suppress, redirect_stdout, nullcontext, ExitStack
import io, os

# suppress — ignore specific exceptions
with suppress(FileNotFoundError):
    os.remove("temp_file.txt")  # OK if doesn't exist

# redirect_stdout — capture print output
buffer = io.StringIO()
with redirect_stdout(buffer):
    print("This goes to buffer, not terminal")
output = buffer.getvalue()

# nullcontext — conditional context manager
def process(data, lock=None):
    ctx = lock if lock is not None else nullcontext()
    with ctx:
        process_data(data)

# ExitStack — dynamic number of context managers
with ExitStack() as stack:
    files = [stack.enter_context(open(f)) for f in filenames]
    # All files open, all closed on exit regardless of errors
    data = [f.read() for f in files]

# Re-entrant context manager
from contextlib import AbstractContextManager
import threading

class ThreadSafeDB(AbstractContextManager):
    def __init__(self):
        self._lock = threading.RLock()  # Reentrant lock

    def __enter__(self):
        self._lock.acquire()
        return self

    def __exit__(self, *args):
        self._lock.release()
        return False

Padrões do mundo real

# Feature flag context manager
@contextmanager
def feature_flag(flag_name: str, user_id: int):
    enabled = check_feature_flag(flag_name, user_id)
    try:
        yield enabled
    finally:
        if enabled:
            track_feature_usage(flag_name, user_id)

with feature_flag("new_checkout", user.id) as new_checkout:
    if new_checkout:
        result = new_checkout_flow()
    else:
        result = legacy_checkout_flow()

# Retry context manager
@contextmanager
def retry(max_attempts: int = 3, exceptions=(Exception,)):
    last_error = None
    for attempt in range(max_attempts):
        try:
            yield attempt
            break  # success — exit loop
        except exceptions as e:
            last_error = e
            if attempt < max_attempts - 1:
                time.sleep(2 ** attempt)
    else:
        raise last_error

# Note: retry CM is tricky because yield can't be resumed after exception
# Better with a function decorator (see decorators guide)

Os gerenciadores de contexto Python são a ferramenta certa para qualquer recurso que precise de limpeza – arquivos, conexões, bloqueios, temporizadores, transações. Usar@contextmanagerpara casos simples, implemente__enter__/__exit__para classes reutilizáveis, eExitStackpara recursos dinâmicos. Os gerenciadores de contexto assíncronos funcionam de forma idêntica comasync with and @asynccontextmanager.

✍️ Leave a Comment

Your email address will not be published. Required fields are marked *

🌐 Read in:🇬🇧 English🇩🇪 Deutsch🇧🇷 Português🇸🇦 العربية🇮🇳 हिन्दी🇧🇩 বাংলা