🌐 Detecting your location…
📢 Advertisement — Configure AdSense in Appearance → Customize → AdSense Settings

Python Context Managers Complete Guide 2026: with, contextlib and async

⏱️5 min read  ·  991 words

Python context managers are one of the language’s most elegant and underused features. The with statement guarantees cleanup code runs even when exceptions occur. In 2026, context managers are used everywhere from file handling to database connections, HTTP sessions, and async operations. This complete guide covers all patterns.

How Context Managers Work

# The with statement calls __enter__ and __exit__
with open("file.txt") as f:
    data = f.read()
# f.close() called automatically, even on exception

# Equivalent to:
f = open("file.txt")
try:
    data = f.read()
finally:
    f.close()

# Multiple context managers
with open("input.txt") as infile, open("output.txt", "w") as outfile:
    for line in infile:
        outfile.write(line.upper())

# Nested (older style, avoid)
with open("input.txt") as infile:
    with open("output.txt", "w") as outfile:
        pass

Creating Context Managers

class DatabaseConnection:
    def __init__(self, url: str):
        self.url = url
        self.conn = None

    def __enter__(self):
        import psycopg2
        self.conn = psycopg2.connect(self.url)
        self.cursor = self.conn.cursor()
        return self.cursor  # what 'as' variable receives

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is None:
            self.conn.commit()   # commit if no exception
        else:
            self.conn.rollback() # rollback on exception
        self.cursor.close()
        self.conn.close()
        return False  # False = don't suppress the exception

# Usage
with DatabaseConnection("postgresql://localhost/mydb") as cursor:
    cursor.execute("INSERT INTO users (name) VALUES (%s)", ("Alice",))
    # commit() called automatically on exit
    # rollback() called if exception occurs

contextlib.contextmanager — The Decorator Way

from contextlib import contextmanager
import time, logging

logger = logging.getLogger(__name__)

@contextmanager
def timer(name: str = ""):
    start = time.perf_counter()
    try:
        yield  # code inside 'with' block runs here
    finally:
        elapsed = time.perf_counter() - start
        logger.info(f"{name}: {elapsed*1000:.2f}ms")

# Usage
with timer("database query"):
    results = db.execute_slow_query()

@contextmanager
def transaction(db):
    try:
        yield db
        db.commit()
    except Exception:
        db.rollback()
        raise

@contextmanager
def temporary_file(suffix=".tmp"):
    import tempfile, os
    path = None
    try:
        fd, path = tempfile.mkstemp(suffix=suffix)
        os.close(fd)
        yield path
    finally:
        if path and os.path.exists(path):
            os.unlink(path)

with temporary_file(".json") as tmp:
    with open(tmp, "w") as f:
        json.dump(data, f)
    process_file(tmp)
# file is deleted here, even on exception

Async Context Managers

import asyncio
from contextlib import asynccontextmanager
import httpx

@asynccontextmanager
async def http_session(base_url: str, timeout: float = 30.0):
    async with httpx.AsyncClient(base_url=base_url, timeout=timeout) as client:
        yield client

async def fetch_data():
    async with http_session("https://api.example.com") as client:
        r = await client.get("/users")
        return r.json()

# Async database context manager
@asynccontextmanager
async def async_transaction(pool):
    async with pool.acquire() as conn:
        async with conn.transaction():
            yield conn

async def create_user(pool, email: str):
    async with async_transaction(pool) as conn:
        user_id = await conn.fetchval(
            "INSERT INTO users (email) VALUES ($1) RETURNING id", email
        )
        await conn.execute(
            "INSERT INTO user_settings (user_id) VALUES ($1)", user_id
        )
        return user_id
# Transaction auto-commits or rolls back

FastAPI Lifespan — Application Context Manager

from fastapi import FastAPI
from contextlib import asynccontextmanager
import asyncpg

db_pool = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    global db_pool
    db_pool = await asyncpg.create_pool(
        dsn="postgresql://localhost/mydb",
        min_size=5, max_size=20
    )
    print("Database pool created")

    yield  # App runs here

    # Shutdown
    await db_pool.close()
    print("Database pool closed")

app = FastAPI(lifespan=lifespan)

@app.get("/users")
async def list_users():
    async with db_pool.acquire() as conn:
        return await conn.fetch("SELECT id, name FROM users")

contextlib Utilities

from contextlib import suppress, redirect_stdout, nullcontext, ExitStack
import io, os

# suppress — ignore specific exceptions
with suppress(FileNotFoundError):
    os.remove("temp_file.txt")  # OK if doesn't exist

# redirect_stdout — capture print output
buffer = io.StringIO()
with redirect_stdout(buffer):
    print("This goes to buffer, not terminal")
output = buffer.getvalue()

# nullcontext — conditional context manager
def process(data, lock=None):
    ctx = lock if lock is not None else nullcontext()
    with ctx:
        process_data(data)

# ExitStack — dynamic number of context managers
with ExitStack() as stack:
    files = [stack.enter_context(open(f)) for f in filenames]
    # All files open, all closed on exit regardless of errors
    data = [f.read() for f in files]

# Re-entrant context manager
from contextlib import AbstractContextManager
import threading

class ThreadSafeDB(AbstractContextManager):
    def __init__(self):
        self._lock = threading.RLock()  # Reentrant lock

    def __enter__(self):
        self._lock.acquire()
        return self

    def __exit__(self, *args):
        self._lock.release()
        return False

Real-World Patterns

# Feature flag context manager
@contextmanager
def feature_flag(flag_name: str, user_id: int):
    enabled = check_feature_flag(flag_name, user_id)
    try:
        yield enabled
    finally:
        if enabled:
            track_feature_usage(flag_name, user_id)

with feature_flag("new_checkout", user.id) as new_checkout:
    if new_checkout:
        result = new_checkout_flow()
    else:
        result = legacy_checkout_flow()

# Retry context manager
@contextmanager
def retry(max_attempts: int = 3, exceptions=(Exception,)):
    last_error = None
    for attempt in range(max_attempts):
        try:
            yield attempt
            break  # success — exit loop
        except exceptions as e:
            last_error = e
            if attempt < max_attempts - 1:
                time.sleep(2 ** attempt)
    else:
        raise last_error

# Note: retry CM is tricky because yield can't be resumed after exception
# Better with a function decorator (see decorators guide)

Python context managers are the right tool for any resource that needs cleanup — files, connections, locks, timers, transactions. Use @contextmanager for simple cases, implement __enter__/__exit__ for reusable classes, and ExitStack for dynamic resources. Async context managers work identically with async with and @asynccontextmanager.

✍️ Leave a Comment

Your email address will not be published. Required fields are marked *

🌐 Read in:🇬🇧 English🇩🇪 Deutsch🇧🇷 Português🇸🇦 العربية🇮🇳 हिन्दी🇧🇩 বাংলা