
PythonsasynchronDas Modul ermöglicht das Schreiben gleichzeitigen Codes mithilfe der Async/Await-Syntax. Im Jahr 2026 unterstützt Asyncio FastAPI, Starlette und aiohttp. Dieser Leitfaden deckt alles ab, von Ereignisschleifen bis hin zu realen Mustern.
📋 Table of Contents
Was ist Asyncio?
asyncio verwendet eine Single-Thread-Ereignisschleife, um gleichzeitige Aufgaben zu verwalten – perfekt für Netzwerk-E/A und APIs, bei denen Sie die meiste Zeit mit Warten und nicht mit Rechnen verbringen.
import asyncio
async def main():
print('Hello asyncio!')
await asyncio.sleep(1)
print('Done!')
asyncio.run(main())
async/await-Syntax
Verwenden Sieasync defum eine Coroutine zu definieren. Verwenden Sieawaitpausieren, bis ein Ergebnis vorliegt.
import asyncio
async def fetch_data(url: str) -> str:
await asyncio.sleep(0.5) # simulate network
return f'Data from {url}'
async def main():
result = await fetch_data('https://api.example.com')
print(result)
asyncio.run(main())
Gleichzeitiges Ausführen von Aufgaben
asyncio.gather()führt mehrere Coroutinen gleichzeitig aus. Drei 1-Sekunden-Aufgaben werden in insgesamt etwa 1 Sekunde abgeschlossen, nicht in 3.
import asyncio, time
async def task(name: str, delay: float):
print(f'Start {name}')
await asyncio.sleep(delay)
return name
async def main():
start = time.time()
results = await asyncio.gather(
task('A', 1.0),
task('B', 2.0),
task('C', 0.5),
)
print(f'Done in {time.time()-start:.2f}s: {results}')
# ~2.0s not 3.5s
asyncio.run(main())
asynchron mit aiohttp
Rufen Sie 100 URLs gleichzeitig ab, und zwar ungefähr in der gleichen Zeit wie das Abrufen einer URL.
import asyncio, aiohttp
async def fetch(session, url):
async with session.get(url) as resp:
return await resp.text()
async def main():
urls = ['https://httpbin.org/delay/1'] * 5
async with aiohttp.ClientSession() as session:
results = await asyncio.gather(*[fetch(session, u) for u in urls])
print(f'Fetched {len(results)} pages')
asyncio.run(main())
asyncio.Queue für Producer-Consumer
import asyncio
async def producer(q: asyncio.Queue):
for i in range(5):
await q.put(f'item-{i}')
await asyncio.sleep(0.2)
await q.put(None) # sentinel
async def consumer(q: asyncio.Queue):
while True:
item = await q.get()
if item is None: break
print(f'Consumed {item}')
async def main():
q = asyncio.Queue(maxsize=3)
await asyncio.gather(producer(q), consumer(q))
asyncio.run(main())
Fehlerbehandlung
import asyncio
async def risky(n):
if n == 2: raise ValueError(f'Task {n} failed!')
return f'Task {n} OK'
async def main():
results = await asyncio.gather(
*[risky(i) for i in range(4)],
return_exceptions=True
)
for r in results:
print('Error:', r if isinstance(r, Exception) else r)
asyncio.run(main())
Best Practices
- Verwenden Sie niemals
time.sleep()– verwenden Sieawait asyncio.sleep() - Vermeiden Sie das Blockieren von E/A innerhalb von Coroutinen
- Verwenden Sie
asyncio.run()als Einstiegspunkt - Verwenden Sie
asyncio.TaskGroup(Python 3.11+) für strukturierte Parallelität - Debuggen Sie mit
PYTHONASYNCIODEBUG=1
Fazit
asyncio ist Pythons Antwort auf die Parallelität im Node.js-Stil. Meisterasyncio.gather(), fügen Sie bei Bedarf Warteschlangen hinzu und wechseln Sie zur TaskGroup, um klarere Fehlergrenzen zu erhalten. Perfekt für APIs, Scraper und Microservices.
🔗 Share this article
✍️ Leave a Comment