بايثونasyncioالمكتبة هي العمود الفقري لكل خدمة Python عالية الإنتاجية التي يتم شحنها في عام 2026 – بدءًا من واجهات برمجة تطبيقات FastAPI إلى روبوتات Discord إلى كاشطات الويب التي تسحب آلاف الصفحات في الدقيقة. يستعرض هذا الدليل النموذج العقلي بأكمله: حلقات الأحداث، والروتينات، والمهام، والتزامن المنظم معTaskGroupوحفنة من الأخطاء التي تؤثر بهدوء على الأداء في الإنتاج.
📋 Table of Contents
- جدول المحتويات
- ما الذي يحله التزامن بالفعل
- حلقة الحدث، موضحة
- Coroutines مقابل المهام
- لقد كانت الطريقة القياسية لتشغيل عدة كوروتينات بشكل متزامن منذ إصدار Python 3.4، ولكنها تتمتع بميزة حادة: إذا ظهرت مهمة واحدة، فإن المهام الأخرى تستمر في العمل في الخلفية ما لم تجتاز
- يمكن للنقطة تسليم التحكم إلى مهمة أخرى في منتصف العملية.
- ، حجب
- Timeouts and Cancellation
- مثال حقيقي: جلب واجهة برمجة التطبيقات المتزامنة
- نسيان انتظار كوروتين.
- هل لا يزال التزامن مناسبًا في عام 2026؟
- كيفية بناء REST API باستخدام FastAPI
- يكافئ asyncio النموذج العقلي الواضح أكثر من بناء الجملة المحفوظ: فهم حلقة الحدث، ومعرفة متى يتم حظر شيء ما، والاعتماد على أساسيات التزامن المنظمة مثل TaskGroup لأي شيء يتجاوز البرنامج النصي السريع. قم بوضع إشارة مرجعية على هذا الدليل وارجع إلى مثال أداة الجلب في المرة القادمة التي تحتاج فيها إلى إنشاء عميل متزامن بشكل صحيح في المرة الأولى.
ما الذي يحله التزامن بالفعل
يوجد عدم التزامن لمهمة واحدة: تشغيل العديد من العمليات المرتبطة بالإدخال/الإخراج بشكل متزامن على مؤشر ترابط واحد، دون تحميل مؤشرات الترابط أو العمليات. عندما يقضي برنامجك معظم وقته في الانتظار – لاستجابة قاعدة البيانات، أو رد HTTP، أو قراءة ملف – يتيح له asyncio التبديل إلى عمل آخر أثناء هذا الانتظار بدلاً من الجلوس خاملاً.
وهذا يختلف بشكل أساسي عن التوازي المرتبط بوحدة المعالجة المركزية. لا يوفر لك asyncio المزيد من نوى وحدة المعالجة المركزية؛ فهو يمنحك استخدامًا أفضل لنواة واحدة أثناء انتظار الإدخال/الإخراج. بالنسبة للعمل الذي يتطلب الكثير من وحدة المعالجة المركزية (معالجة الصور ومعالجة الأرقام)، ما زلت تريدmultiprocessingأو امتداد أصلي.
حلقة الحدث، موضحة
في قلب كل برنامج غير متزامن توجد حلقة الحدث – وهي عبارة عن برنامج جدولة أحادي الخيط يقوم بتشغيل إجراءات روتينية، ويترك التحكم عندما ينتظر المرء شيئًا ما، ويستأنفه عندما تصبح النتيجة المنتظرة جاهزة.
import asyncio
async def say_hello():
print("start")
await asyncio.sleep(1)
print("end")
asyncio.run(say_hello())
asyncio.run()ينشئ حلقة حدث جديدة، ويشغل الكوروتين حتى الاكتمال، ثم يغلق الحلقة. إنها نقطة الإدخال الصحيحة ذات المستوى الأعلى لأي برنامج نصي غير متزامن – تجنب الأقدمget_event_loop().run_until_complete()النمط ما لم يكن لديك سبب محدد لإدارة الحلقة يدويًا.
Coroutines مقابل المهام
كائن coroutine (الذي تم إنشاؤه عن طريق استدعاء دالة ||||) لا يفعل شيئًا من تلقاء نفسه – إنه كائن يشبه المولد متوقف مؤقتًا في انتظار تشغيله. في انتظار تشغيله بشكل مضمن، بالتتابع. أasync defالمهمةيقوم بتغليف coroutine وجدولة تشغيله بشكل متزامن في حلقة الحدث على الفور.المهام.py
import asyncio
import time
async def fetch(n):
await asyncio.sleep(1)
return n * 2
async def sequential():
start = time.perf_counter()
results = [await fetch(i) for i in range(5)]
print("sequential:", time.perf_counter() - start)
return results
async def concurrent():
start = time.perf_counter()
tasks = [asyncio.create_task(fetch(i)) for i in range(5)]
results = await asyncio.gather(*tasks)
print("concurrent:", time.perf_counter() - start)
return results
asyncio.run(sequential()) # ~5 seconds
asyncio.run(concurrent()) # ~1 second
جمع () مقابل مجموعة المهام
لقد كانت الطريقة القياسية لتشغيل عدة كوروتينات بشكل متزامن منذ إصدار Python 3.4، ولكنها تتمتع بميزة حادة: إذا ظهرت مهمة واحدة، فإن المهام الأخرى تستمر في العمل في الخلفية ما لم تجتاز
asyncio.gather()ومعالجة الأخطاء يدويًا. يعد هذا مصدرًا شائعًا للمهام المعزولة وتحذيرات الاستثناءات غير المعالجة في سجلات الإنتاج.return_exceptions=Trueتقديم بايثون 3.11
، وهو بدائي متزامن منظم يعمل على إصلاح هذا بشكل صحيح: إذا فشلت أي مهمة فرعية، تقوم المجموعة بإلغاء جميع المهام الشقيقة المتبقية وتثيرasyncio.TaskGroupتحتوي على كل فشلExceptionGroupTaskgroup.py
import asyncio
async def risky(n):
if n == 2:
raise ValueError("boom at " + str(n))
await asyncio.sleep(1)
return n
async def main():
try:
async with asyncio.TaskGroup() as tg:
results = [tg.create_task(risky(i)) for i in range(4)]
except* ValueError as eg:
for exc in eg.exceptions:
print("caught:", exc)
asyncio.run(main())
الحالات، مما يتيح لك معالجة كل فشل حدث عبر المجموعة بدلاً من الفشل الأول فقط. إذا كنت تستهدف Python 3.11 أو الإصدارات الأحدث، فاخترexcept*على الخامExceptionGroupلأي شيء يتجاوز التزامن التافه.TaskGroupمديرو السياق غير المتزامن والأقفالgather()لا تزال الحالة المشتركة القابلة للتغيير عبر المهام المتزامنة بحاجة إلى الحماية، حتى على مؤشر ترابط واحد، لأن
يمكن للنقطة تسليم التحكم إلى مهمة أخرى في منتصف العملية.
حراسة الأقسام الهامة بنفس الطريقةawaitيفعل للخيوط.asyncio.Locklock_example.pythreading.Lock📋 نسخ
import asyncio
counter = 0
lock = asyncio.Lock()
async def increment():
global counter
async with lock:
current = counter
await asyncio.sleep(0) # simulate a yield point
counter = current + 1
async def main():
await asyncio.gather(*(increment() for _ in range(100)))
print(counter) # always 100, never less
asyncio.run(main())
، مما يتسبب في فقدان التحديثات – حالة السباق الكلاسيكية، على مؤشر ترابط واحد فقط بدلاً من العديد.await asyncio.sleep(0)فخ حظر المكالماتcounterهذا هو المصدر الأكبر لتقارير الأخطاء “تطبيق asyncio الخاص بي بطيء”. حلقة الحدث ذات خيط واحد. أي مكالمة متزامنة تحظر —
، حجب
، حلقة مثقلة بوحدة المعالجة المركزية (CPU)، ملف غير مخزن مؤقتًا يتم قراءته على قرص بطيء – يؤدي إلى تجميد الحلقة بأكملها، بما في ذلك كل مهمة أخرى تنتظرها.time.sleep()bad_vs_good.pyrequests.get()📋 نسخ
# BAD: blocks the whole event loop for 2 seconds
import time
async def bad():
time.sleep(2)
# GOOD: yields control back to the loop
import asyncio
async def good():
await asyncio.sleep(2)
# GOOD: offload a real blocking call to a thread
async def good_blocking_lib():
result = await asyncio.to_thread(some_blocking_function, arg1, arg2)
return result
معasyncio.to_thread()بدلاً من ذلك — لن تساعد سلاسل العمليات في العمل المرتبط بوحدة المعالجة المركزية (CPU) بسبب GIL.loop.run_in_executor()المهلات والإلغاءProcessPoolExecutorيحتاج رمز الإنتاج إلى مهلات لكل مكالمة خارجية. تمت إضافة بايثون 3.11
Timeouts and Cancellation
Production code needs timeouts on every external call. Python 3.11 addedasyncio.timeout()كبديل نظيف لمدير السياق للأقدمasyncio.wait_for().
import asyncio
async def slow_call():
await asyncio.sleep(10)
async def main():
try:
async with asyncio.timeout(2):
await slow_call()
except TimeoutError:
print("operation timed out after 2s")
asyncio.run(main())
عندما تنطلق المهلة، يلغي المزامنة المهمة الداخلية عن طريق رفعasyncio.CancelledErrorداخله. إذا كان الكوروتين الخاص بك يغلف الموارد التي تحتاج إلى التنظيف (الاتصالات المفتوحة، ومقابض الملفات)، فاستخدمtry/finallyأو مدير سياق غير متزامن حتى لا يؤدي الإلغاء إلى تسرب الموارد.
مثال حقيقي: جلب واجهة برمجة التطبيقات المتزامنة
فيما يلي مثال كامل على شكل إنتاج: جلب عناوين URL متعددة بشكل متزامن مع حد الاتصال، ومهلة لكل طلب، ومعالجة الأخطاء المنظمة باستخدامaiohttp.
import asyncio
import aiohttp
async def fetch_one(session, url, semaphore):
async with semaphore:
try:
async with asyncio.timeout(5):
async with session.get(url) as resp:
return url, resp.status, await resp.text()
except (TimeoutError, aiohttp.ClientError) as e:
return url, None, str(e)
async def fetch_all(urls, max_concurrent=10):
semaphore = asyncio.Semaphore(max_concurrent)
async with aiohttp.ClientSession() as session:
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(fetch_one(session, u, semaphore)) for u in urls]
return [t.result() for t in tasks]
urls = ["https://example.com"] * 50
results = asyncio.run(fetch_all(urls))
print(len(results), "requests completed")
|||| يحدد عدد الطلبات التي يتم تشغيلها في وقت واحد، مما يحمي كلاً من العميل والخادم الهدف من الإرهاق بمئات الاتصالات المتزامنة. هذا النمط – إعادة استخدام الجلسة، والتزامن المحدود بالإشارة، ومهلة كل طلب، ومجموعة المهام المنظمة – قريب مما تراه في مكشطة الإنتاج أو مجمع واجهة برمجة التطبيقات.Semaphoreأخطاء شائعة
نسيان انتظار كوروتين.
- استدعاءدالة بدون
async defما عليك سوى إنشاء كائن coroutine وعدم القيام بأي شيء – ستحذر Python من أن “coroutine لم يتم انتظاره أبدًا” ولكن من السهل تفويته في السجلات المزعجة.awaitإنشاء المهام دون الاحتفاظ بمرجع. - لا يحمل إلا مرجعًا ضعيفًا داخليًا؛ إذا حصل كائن المهمة على البيانات المهملة قبل أن تنتهي، فيمكن إلغاؤها بصمت. قم بتخزين المهام في قائمة أو قم بتعيينها حتى تكتمل.
asyncio.create_task()خلط برامج تشغيل قاعدة البيانات المتزامنة وغير المتزامنة. - استخدام برنامج تشغيل الحظر (مثل عادي) داخل التعليمات البرمجية غير المتزامنة يتعارض مع الغرض – استخدم برنامج تشغيل غير متزامن (
psycopg2,asyncpg) أو قم بلف المكالمة بـaiomysql.to_thread()عدم تحديد المهلات. - يمكن لمكالمة شبكة معلقة واحدة دون انتهاء المهلة أن تؤدي إلى تعطيل معالج الطلب بأكمله إلى أجل غير مسمى تحت التحميل.الإفراط في استخدام المزامنة للعمل المرتبط بوحدة المعالجة المركزية.
- إذا كان عنق الزجاجة لديك هو الحساب، وليس الانتظار، فإن asyncio يضيف التعقيد دون إضافة السرعة.الأسئلة المتداولة
هل لا يزال التزامن مناسبًا في عام 2026؟
نعم. يظل asyncio هو الطريقة القياسية لكتابة كود Python المرتبط بالإدخال/الإخراج المتزامن، وهو يدعم أطر العمل الرئيسية مثل FastAPI، وaiohttp، وStarlette. الميزات الأحدث مثل TaskGroup ومجموعات الاستثناء في Python 3.11+ جعلتها أكثر راحة، وليست أقل أهمية.
هل يجب علي استخدام المزامنة أو الترابط؟
استخدم asyncio للعمل المرتبط بالإدخال/الإخراج مع العديد من الاتصالات المتزامنة (مكالمات الشبكة، استعلامات قاعدة البيانات، إدخال/إخراج الملفات). استخدم الترابط لحظر مكتبات الجهات الخارجية التي لا تدعم المزامنة. استخدم المعالجة المتعددة للعمل المرتبط بوحدة المعالجة المركزية نظرًا لأن asyncio لا يتجاوز GIL.
ما الفرق بين asyncio.gather وTaskGroup؟
يقوم asyncio.gather() بجمع النتائج من عدة coroutines ولكن لديه سلوك إلغاء غير متناسق على الأخطاء. مجموعة المهام (Python 3.11+) عبارة عن مجموعة بدائية متزامنة منظمة تقوم تلقائيًا بإلغاء المهام الشقيقة عندما يفشل أحدها وتثير مجموعة الاستثناءات، مما يجعل معالجة الأخطاء أكثر قابلية للتنبؤ بها.
لماذا يتجمد برنامج المزامنة الخاص بي؟
السبب الأكثر شيوعًا هو استدعاء وظيفة الحظر المتزامنة (مثل time.sleep أو طلب الحظر.get أو عمل وحدة المعالجة المركزية المكثف) داخل coroutine. يؤدي ذلك إلى حظر سلسلة حلقات الحدث الفردية وإيقاف كل مهمة أخرى. استخدم asyncio.sleep، أو عملاء HTTP غير المتزامنين، أو run_in_executor لحظر المكالمات.
هل يمكنني مزج المزامنة مع التعليمات البرمجية المتزامنة؟
نعم، عبر asyncio.to_thread() (Python 3.9+) أوloop.run_in_executor() لإلغاء تحميل مكالمات الحظر إلى تجمع مؤشرات الترابط دون حظر حلقة الأحداث. يمكنك أيضًا استدعاء asyncio.run() من نقاط الإدخال المتزامنة للانتقال إلى التعليمات البرمجية غير المتزامنة.
هل أحتاج إلى المزامنة لبرنامج نصي صغير؟
عادة لا. إذا كنت تقوم بإجراء استدعاء واحد أو اثنين من استدعاءات واجهة برمجة التطبيقات (API) التسلسلية، فإن التعليمات البرمجية المتزامنة تكون أبسط وأسهل في التصحيح. يمكنك الوصول إلى عدم المزامنة عندما يكون لديك العديد من عمليات الإدخال/الإخراج المتزامنة حيث يكون وقت الانتظار المتداخل مهمًا بالفعل.
القراءة ذات الصلة على TechPulse
يكافئ asyncio النموذج العقلي الواضح أكثر من بناء الجملة المحفوظ: فهم حلقة الحدث، ومعرفة متى يتم حظر شيء ما، والاعتماد على أساسيات التزامن المنظمة مثل TaskGroup لأي شيء يتجاوز البرنامج النصي السريع. قم بوضع إشارة مرجعية على هذا الدليل وارجع إلى مثال أداة الجلب في المرة القادمة التي تحتاج فيها إلى إنشاء عميل متزامن بشكل صحيح في المرة الأولى.
فريق التحرير في TechPulse
🔗 Share this article
✍️ Leave a Comment