
بايثونغير متزامنتتيح الوحدة كتابة التعليمات البرمجية المتزامنة باستخدام بناء الجملة غير المتزامن/الانتظار. في عام 2026، سيعمل نظام asyncio على تشغيل FastAPI وStarlette وaiohttp. يغطي هذا الدليل كل شيء بدءًا من حلقات الأحداث وحتى أنماط العالم الحقيقي.
📋 Table of Contents
ما هو التزامن؟
يستخدم asyncio حلقة حدث ذات ترابط واحد لإدارة المهام المتزامنة – وهو مثالي للإدخال/الإخراج للشبكة وواجهات برمجة التطبيقات حيث تقضي معظم الوقت في الانتظار، وليس الحوسبة.
import asyncio
async def main():
print('Hello asyncio!')
await asyncio.sleep(1)
print('Done!')
asyncio.run(main())
غير متزامن/في انتظار بناء الجملة
استخدمasync defلتحديد كوروتين. استخدمawaitللتوقف حتى تصبح النتيجة جاهزة.
import asyncio
async def fetch_data(url: str) -> str:
await asyncio.sleep(0.5) # simulate network
return f'Data from {url}'
async def main():
result = await fetch_data('https://api.example.com')
print(result)
asyncio.run(main())
تشغيل المهام بشكل متزامن
asyncio.gather()يدير coroutines متعددة في وقت واحد. ثلاث مهام مدتها ثانية واحدة تنتهي في إجمالي ثانية واحدة تقريبًا، وليس 3.
import asyncio, time
async def task(name: str, delay: float):
print(f'Start {name}')
await asyncio.sleep(delay)
return name
async def main():
start = time.time()
results = await asyncio.gather(
task('A', 1.0),
task('B', 2.0),
task('C', 0.5),
)
print(f'Done in {time.time()-start:.2f}s: {results}')
# ~2.0s not 3.5s
asyncio.run(main())
التزامن مع aiohttp
جلب 100 عنوان URL بشكل متزامن في نفس الوقت تقريبًا الذي يتم فيه جلب عنوان URL.
import asyncio, aiohttp
async def fetch(session, url):
async with session.get(url) as resp:
return await resp.text()
async def main():
urls = ['https://httpbin.org/delay/1'] * 5
async with aiohttp.ClientSession() as session:
results = await asyncio.gather(*[fetch(session, u) for u in urls])
print(f'Fetched {len(results)} pages')
asyncio.run(main())
asyncio.Queue للمنتج والمستهلك
import asyncio
async def producer(q: asyncio.Queue):
for i in range(5):
await q.put(f'item-{i}')
await asyncio.sleep(0.2)
await q.put(None) # sentinel
async def consumer(q: asyncio.Queue):
while True:
item = await q.get()
if item is None: break
print(f'Consumed {item}')
async def main():
q = asyncio.Queue(maxsize=3)
await asyncio.gather(producer(q), consumer(q))
asyncio.run(main())
معالجة الأخطاء
import asyncio
async def risky(n):
if n == 2: raise ValueError(f'Task {n} failed!')
return f'Task {n} OK'
async def main():
results = await asyncio.gather(
*[risky(i) for i in range(4)],
return_exceptions=True
)
for r in results:
print('Error:', r if isinstance(r, Exception) else r)
asyncio.run(main())
أفضل الممارسات
- لا تستخدم أبدًا
time.sleep()– استخدمawait asyncio.sleep() - تجنب حظر الإدخال/الإخراج داخل coroutines
- استخدم
asyncio.run()كنقطة الدخول - استخدم
asyncio.TaskGroup(Python 3.11+) للتزامن المنظم - تصحيح الأخطاء باستخدام
PYTHONASYNCIODEBUG=1
الخلاصة
asyncio هو رد بايثون على التزامن على نمط Node.js. ماسترasyncio.gather()وأضف قوائم الانتظار عند الحاجة، وانتقل إلى TaskGroup للحصول على حدود أكثر وضوحًا للأخطاء. مثالي لواجهات برمجة التطبيقات والكاشطات والخدمات الصغيرة.
🔗 Share this article
✍️ Leave a Comment