لقد نضجت بنية الخدمات الصغيرة بشكل ملحوظ في عام 2026. بعد سنوات من “جحيم الخدمات الصغيرة” حيث أنشأت الفرق عددًا كبيرًا جدًا من الخدمات الصغيرة جدًا، تقاربت الصناعة على أنماط توازن بين النمطية والبساطة التشغيلية. يغطي هذا الدليل متى يجب استخدام الخدمات الصغيرة، وكيفية تصميمها، وكيفية التعامل مع الأجزاء الصلبة.
📋 Table of Contents
Monolith vs Microservices vs Modular Monolith
| يقترب | متى تستخدم | المقايضات |
|---|---|---|
| متراصة | بدء التشغيل، أقل من 10 مهندسين، مجال غير معروف | سهل البناء/التصحيح، ومن الصعب توسيع نطاقه بشكل مستقل |
| متراصة وحدات | فريق متنامٍ وسياقات محددة واضحة ونشر فردي | أفضل ما في العالمين – وحدات نمطية ولكنها بسيطة |
| الخدمات المصغرة | مؤسسة كبيرة، وتوسيع نطاق الفريق المستقل، وتكنولوجيا متعددة اللغات | التوسع/النشر المستقل، النظام الموزع المعقد |
إجماع 2026: ابدأ بوحدة متراصة معيارية، واستخرج الخدمات عندما يكون لديك سبب واضح (النطاق، النشر المستقل، استقلالية الفريق).
السياقات المحدودة وتصميم الخدمة
تتوافق الخدمات الصغيرة الجيدة مع السياقات المحددة للتصميم المستند إلى المجال (DDD):
E-commerce domain bounded contexts:
Order Service (owns orders, order items)
- Create order
- Update order status
- Get order history
Product Service (owns products, inventory)
- List products
- Update inventory
- Product search
User Service (owns users, auth)
- Register, login
- Profile management
- Authentication tokens
Payment Service (owns payment processing)
- Process payment
- Handle refunds
- Payment status
Notification Service (owns notifications)
- Email, SMS, push
- Triggered by events from other services
Each service:
- Has its own database (database-per-service pattern)
- Is owned by one team
- Can be deployed independently
- Communicates via API or events (not shared DB)
أنماط الاتصال
متزامن (REST/gRPC)
# REST API call between services
import httpx
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10))
async def get_user(user_id: int) -> dict:
async with httpx.AsyncClient(base_url="http://user-service:8001") as client:
r = await client.get(f"/api/users/{user_id}", timeout=5.0)
r.raise_for_status()
return r.json()
# Use circuit breaker for resilience
from circuitbreaker import circuit
@circuit(failure_threshold=5, recovery_timeout=30)
async def call_payment_service(order_id: int, amount: float):
async with httpx.AsyncClient(timeout=10) as client:
return await client.post("http://payment-service/payments",
json={"order_id": order_id, "amount": amount})
غير متزامن (مدفوع بالحدث مع كافكا)
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
import json
# Produce event when order is created
async def publish_order_created(order: dict):
producer = AIOKafkaProducer(bootstrap_servers="kafka:9092")
await producer.start()
try:
event = {
"event_type": "order.created",
"order_id": order["id"],
"user_id": order["user_id"],
"total": order["total"],
"timestamp": datetime.utcnow().isoformat()
}
await producer.send("orders", json.dumps(event).encode())
finally:
await producer.stop()
# Notification service consumes the event
async def consume_order_events():
consumer = AIOKafkaConsumer(
"orders",
bootstrap_servers="kafka:9092",
group_id="notification-service",
auto_offset_reset="earliest"
)
await consumer.start()
async for msg in consumer:
event = json.loads(msg.value)
if event["event_type"] == "order.created":
await send_confirmation_email(event["user_id"], event["order_id"])
نمط بوابة API
# Kong / nginx / custom gateway routes
# All clients talk to one entry point:
# POST /api/orders -> Order Service :8001
# GET /api/products -> Product Service :8002
# POST /api/auth -> User Service :8003
# Kong route configuration
routes:
- name: orders
paths: ["/api/orders"]
service: order-service
plugins:
- name: jwt # validate JWT
- name: rate-limiting # 100/min per user
- name: request-transformer
- name: products
paths: ["/api/products"]
service: product-service
plugins:
- name: response-ratelimiting
التتبع الموزع
# OpenTelemetry — trace requests across services
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.trace.export import BatchSpanProcessor
# Setup (once at startup)
provider = TracerProvider()
jaeger_exporter = JaegerExporter(agent_host_name="jaeger", agent_port=6831)
provider.add_span_processor(BatchSpanProcessor(jaeger_exporter))
trace.set_tracer_provider(provider)
tracer = trace.get_tracer(__name__)
# Instrument FastAPI
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
FastAPIInstrumentor.instrument_app(app) # auto-instruments all routes
HTTPXClientInstrumentor().instrument() # auto-instruments httpx calls
# Manual span for business logic
async def process_order(order_id: int):
with tracer.start_as_current_span("process_order") as span:
span.set_attribute("order.id", order_id)
order = await get_order(order_id)
span.set_attribute("order.total", order["total"])
await charge_payment(order)
await send_confirmation(order)
خدمة الصحة والمرونة
from fastapi import FastAPI
from datetime import datetime
app = FastAPI()
@app.get("/health")
async def health():
return {
"status": "healthy",
"timestamp": datetime.utcnow().isoformat(),
"version": "1.2.3",
"dependencies": {
"database": await check_db(),
"redis": await check_redis(),
}
}
@app.get("/ready")
async def readiness():
# Check if service can handle traffic
if not db_pool or not redis_client:
raise HTTPException(503, "Service not ready")
return {"status": "ready"}
# Kubernetes probes:
# livenessProbe: /health (if fails: restart container)
# readinessProbe: /ready (if fails: remove from load balancer)
عندما لا تستخدم الخدمات المصغرة
- فريق صغير (أقل من 5 مهندسين)– النفقات العامة التشغيلية تقتل السرعة
- المنتج المبكر– حدود المجال ليست واضحة بعد
- التطبيق CRUD بسيط– لا يوجد مبرر للتحجيم
- زمن وصول الشبكة أمر بالغ الأهمية— تضيف المكالمات بين الخدمات زمن الوصول
- لا يوجد نضج DevOps– تحتاج إلى CI/CD والمراقبة والتتبع أولاً
تعمل الخدمات الصغيرة في 2026 بشكل أفضل عندما: تكون الفرق كبيرة بما يكفي لامتلاك الخدمات بشكل مستقل، وتكون السياقات المحددة واضحة، ولديك بنية DevOps الأساسية للتعامل مع الأنظمة الموزعة. ابدأ بوحدة متراصة معيارية جيدة التنظيم واستخرج الخدمات فقط عندما تصل إلى نقاط الألم الواضحة.
🔗 Share this article
✍️ Leave a Comment