⏱️4 min read · 748 words
يعد Apache Kafka العمود الفقري للبنية التحتية للبيانات في الوقت الفعلي في Netflix وLinkedIn وUber وآلاف الشركات في عام 2026. إن نموذج Kafka للنشر والاشتراك وتخزين السجل المستمر وقابلية التوسع الأفقي يجعله المعيار القياسي لتدفق الأحداث واتصالات الخدمات الصغيرة وخطوط أنابيب البيانات. يغطي هذا الدليل كل شيء بدءًا من المنتج الأول وحتى إنتاج هندسة كافكا.
📋 Table of Contents
مفاهيم كافكا الأساسية
Kafka Architecture:
Producers → Brokers (Kafka cluster) → Consumers
↓
Topic (logical channel)
↓
Partitions (physical)
↓
Replicas (redundancy across brokers)
Key concepts:
- Topic: Named stream of records
- Partition: Ordered, immutable sequence within a topic
- Offset: Position of a record within a partition
- Consumer Group: Multiple consumers sharing partition reads
- Broker: Server in Kafka cluster
- ZooKeeper/KRaft: Cluster coordination (KRaft is now default)
Retention: records kept for 7 days by default (configurable)
Throughput: millions of messages/second per broker
إعداد عامل الميناء
# compose.yaml — Kafka with KRaft (no ZooKeeper)
services:
kafka:
image: apache/kafka:3.7.0
ports:
- "9092:9092"
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_LOG_DIRS: /var/lib/kafka/data
CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qg
kafka-ui:
image: provectuslabs/kafka-ui:latest
ports:
- "8080:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
depends_on:
- kafka
منتج ومستهلك بايثون
pip install confluent-kafka
# producer.py
from confluent_kafka import Producer
import json, time
producer = Producer({
"bootstrap.servers": "localhost:9092",
"acks": "all", # wait for all replicas to acknowledge
"retries": 3, # retry on failure
"retry.backoff.ms": 300,
"enable.idempotence": True, # exactly-once semantics
"compression.type": "snappy", # compress messages
})
def delivery_report(err, msg):
if err:
print(f"Delivery failed: {err}")
else:
print(f"Delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")
# Produce messages
for i in range(100):
event = {
"id": i,
"type": "user.signup",
"user_id": 1000 + i,
"email": f"user{i}@example.com",
"timestamp": time.time(),
}
producer.produce(
topic="user-events",
key=str(event["user_id"]), # partition by user_id
value=json.dumps(event).encode(),
callback=delivery_report,
)
producer.poll(0) # trigger delivery callbacks
producer.flush() # wait for all messages to be delivered
print("All messages produced")
# consumer.py
from confluent_kafka import Consumer, KafkaError
import json
consumer = Consumer({
"bootstrap.servers": "localhost:9092",
"group.id": "user-event-processor", # consumer group
"auto.offset.reset": "earliest", # start from beginning if no committed offset
"enable.auto.commit": False, # manual commit for exactly-once
"max.poll.interval.ms": 300000, # 5 min max processing time per batch
})
consumer.subscribe(["user-events"])
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
print(f"Error: {msg.error()}")
break
# Process message
event = json.loads(msg.value().decode())
print(f"Processing: {event['type']} for user {event['user_id']}")
try:
process_event(event)
consumer.commit(msg) # commit AFTER successful processing
except Exception as e:
print(f"Processing failed: {e}")
# Don't commit — message will be reprocessed
except KeyboardInterrupt:
pass
finally:
consumer.close()
تكوين الموضوع
# Create topic with 3 partitions, replication factor 3
kafka-topics.sh --create --bootstrap-server localhost:9092 --topic user-events --partitions 3 --replication-factor 3 --config retention.ms=604800000 \ # 7 days
--config retention.bytes=1073741824 # 1GB per partition
# List topics
kafka-topics.sh --list --bootstrap-server localhost:9092
# Describe topic
kafka-topics.sh --describe --topic user-events --bootstrap-server localhost:9092
# Consume from CLI (debugging)
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic user-events --from-beginning
معالجة الدفق مع تيارات كافكا
# Using faust (Kafka Streams for Python)
import faust
app = faust.App("myapp", broker="kafka://localhost:9092")
# Define topics
orders_topic = app.topic("orders", value_type=dict)
processed_topic = app.topic("processed-orders", value_type=dict)
revenue_topic = app.topic("revenue-by-category", value_type=dict)
# Simple stream processor
@app.agent(orders_topic)
async def process_orders(orders):
async for order in orders:
# Enrich the order
order["processed_at"] = time.time()
order["status"] = "processed"
await processed_topic.send(value=order)
# Aggregation (windowed)
order_revenue_table = app.Table(
"order-revenue",
default=float,
partitions=1,
).tumbling(60.0) # 60-second tumbling window
@app.agent(orders_topic)
async def aggregate_revenue(orders):
async for order in orders.group_by(lambda o: o["category"]):
# Accumulate revenue per category per 60-second window
order_revenue_table[order["category"]] += order["total"]
await revenue_topic.send(
key=order["category"],
value={
"category": order["category"],
"revenue": order_revenue_table[order["category"]].current()
}
)
if __name__ == "__main__":
app.main()
أنماط كافكا للخدمات الصغيرة
Event-Driven Microservices with Kafka:
1. Event Sourcing
- All state changes as immutable events in Kafka
- Replay events to rebuild state
- Audit log for free
2. CQRS (Command Query Responsibility Segregation)
- Commands go through Kafka
- Consumers build read-optimized projections
3. Saga Pattern (distributed transactions)
- OrderService publishes OrderCreated
- PaymentService consumes, publishes PaymentCompleted or PaymentFailed
- OrderService consumes result, updates order status
- No distributed transactions needed
4. Outbox Pattern (avoiding dual-write)
- Write to DB + outbox table in same transaction
- Separate process reads outbox, publishes to Kafka
- Kafka Connect CDC can automate this
نصائح كافكا الإنتاج
- التقسيم: عدد الأقسام = أقصى التوازي. المزيد من الأقسام = المزيد من الإنتاجية ولكن المزيد من الحمل. القاعدة: 10-100x التوازي المتوقع للمستهلك
- حفظ: استخدم ضغط السجل للبيانات ذات الحالة (احتفظ بأحدث قيمة لكل مفتاح)
- مجموعات المستهلكين: تحصل كل خدمة صغيرة على مجموعتها الخاصة — تقدم مستقل
- سجل المخطط: استخدم Confluent Schema Registry + Avro/Protobuf لتطوير المخطط
- يراقب: تأخر المستهلك هو المقياس الرئيسي – تنبيه عندما ينمو التأخر
- مرة واحدة على الأقل مقابل مرة واحدة بالضبط: بالضبط مرة واحدة يتطلب المستهلكين العاجزين + المعاملات
كافكا في عام 2026 هو معيار الصناعة لتدفق الأحداث واتصالات الخدمات الصغيرة على نطاق واسع. ابدأ بمكتبة Python-kafka المتموجة، واستخدم مجموعات المستهلكين للمعالجة المتوازية، والتزم بالإزاحات يدويًا لضمان الموثوقية، وراقب تأخر المستهلك كمقياس صحي أساسي.
🔗 Share this article
✍️ Leave a Comment