Apache Kafka হল Netflix, LinkedIn, Uber এবং 2026 সালে হাজার হাজার কোম্পানির রিয়েল-টাইম ডেটা পরিকাঠামোর মেরুদণ্ড। Kafka-এর প্রকাশ-সাবস্ক্রাইব মডেল, ক্রমাগত লগ স্টোরেজ এবং অনুভূমিক স্কেলেবিলিটি এটিকে ইভেন্ট স্ট্রিমিং, মাইক্রোসার্ভিসেস কমিউনিকেশন এবং ডেটা পাইপলাইনের জন্য আদর্শ করে তুলেছে। এই নির্দেশিকাটি প্রথম প্রযোজক থেকে শুরু করে কাফকা আর্কিটেকচার পর্যন্ত সবকিছুই কভার করে।
📋 Table of Contents
কাফকা মূল ধারণা
Kafka Architecture:
Producers → Brokers (Kafka cluster) → Consumers
↓
Topic (logical channel)
↓
Partitions (physical)
↓
Replicas (redundancy across brokers)
Key concepts:
- Topic: Named stream of records
- Partition: Ordered, immutable sequence within a topic
- Offset: Position of a record within a partition
- Consumer Group: Multiple consumers sharing partition reads
- Broker: Server in Kafka cluster
- ZooKeeper/KRaft: Cluster coordination (KRaft is now default)
Retention: records kept for 7 days by default (configurable)
Throughput: millions of messages/second per broker
ডকার কম্পোজ সেটআপ
# compose.yaml — Kafka with KRaft (no ZooKeeper)
services:
kafka:
image: apache/kafka:3.7.0
ports:
- "9092:9092"
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_LOG_DIRS: /var/lib/kafka/data
CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qg
kafka-ui:
image: provectuslabs/kafka-ui:latest
ports:
- "8080:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
depends_on:
- kafka
পাইথন প্রযোজক এবং ভোক্তা
pip install confluent-kafka
# producer.py
from confluent_kafka import Producer
import json, time
producer = Producer({
"bootstrap.servers": "localhost:9092",
"acks": "all", # wait for all replicas to acknowledge
"retries": 3, # retry on failure
"retry.backoff.ms": 300,
"enable.idempotence": True, # exactly-once semantics
"compression.type": "snappy", # compress messages
})
def delivery_report(err, msg):
if err:
print(f"Delivery failed: {err}")
else:
print(f"Delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")
# Produce messages
for i in range(100):
event = {
"id": i,
"type": "user.signup",
"user_id": 1000 + i,
"email": f"user{i}@example.com",
"timestamp": time.time(),
}
producer.produce(
topic="user-events",
key=str(event["user_id"]), # partition by user_id
value=json.dumps(event).encode(),
callback=delivery_report,
)
producer.poll(0) # trigger delivery callbacks
producer.flush() # wait for all messages to be delivered
print("All messages produced")
# consumer.py
from confluent_kafka import Consumer, KafkaError
import json
consumer = Consumer({
"bootstrap.servers": "localhost:9092",
"group.id": "user-event-processor", # consumer group
"auto.offset.reset": "earliest", # start from beginning if no committed offset
"enable.auto.commit": False, # manual commit for exactly-once
"max.poll.interval.ms": 300000, # 5 min max processing time per batch
})
consumer.subscribe(["user-events"])
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
print(f"Error: {msg.error()}")
break
# Process message
event = json.loads(msg.value().decode())
print(f"Processing: {event['type']} for user {event['user_id']}")
try:
process_event(event)
consumer.commit(msg) # commit AFTER successful processing
except Exception as e:
print(f"Processing failed: {e}")
# Don't commit — message will be reprocessed
except KeyboardInterrupt:
pass
finally:
consumer.close()
বিষয় কনফিগারেশন
# Create topic with 3 partitions, replication factor 3
kafka-topics.sh --create --bootstrap-server localhost:9092 --topic user-events --partitions 3 --replication-factor 3 --config retention.ms=604800000 \ # 7 days
--config retention.bytes=1073741824 # 1GB per partition
# List topics
kafka-topics.sh --list --bootstrap-server localhost:9092
# Describe topic
kafka-topics.sh --describe --topic user-events --bootstrap-server localhost:9092
# Consume from CLI (debugging)
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic user-events --from-beginning
কাফকা স্ট্রিমগুলির সাথে স্ট্রিম প্রক্রিয়াকরণ
# Using faust (Kafka Streams for Python)
import faust
app = faust.App("myapp", broker="kafka://localhost:9092")
# Define topics
orders_topic = app.topic("orders", value_type=dict)
processed_topic = app.topic("processed-orders", value_type=dict)
revenue_topic = app.topic("revenue-by-category", value_type=dict)
# Simple stream processor
@app.agent(orders_topic)
async def process_orders(orders):
async for order in orders:
# Enrich the order
order["processed_at"] = time.time()
order["status"] = "processed"
await processed_topic.send(value=order)
# Aggregation (windowed)
order_revenue_table = app.Table(
"order-revenue",
default=float,
partitions=1,
).tumbling(60.0) # 60-second tumbling window
@app.agent(orders_topic)
async def aggregate_revenue(orders):
async for order in orders.group_by(lambda o: o["category"]):
# Accumulate revenue per category per 60-second window
order_revenue_table[order["category"]] += order["total"]
await revenue_topic.send(
key=order["category"],
value={
"category": order["category"],
"revenue": order_revenue_table[order["category"]].current()
}
)
if __name__ == "__main__":
app.main()
মাইক্রোসার্ভিসের জন্য কাফকা প্যাটার্নস
Event-Driven Microservices with Kafka:
1. Event Sourcing
- All state changes as immutable events in Kafka
- Replay events to rebuild state
- Audit log for free
2. CQRS (Command Query Responsibility Segregation)
- Commands go through Kafka
- Consumers build read-optimized projections
3. Saga Pattern (distributed transactions)
- OrderService publishes OrderCreated
- PaymentService consumes, publishes PaymentCompleted or PaymentFailed
- OrderService consumes result, updates order status
- No distributed transactions needed
4. Outbox Pattern (avoiding dual-write)
- Write to DB + outbox table in same transaction
- Separate process reads outbox, publishes to Kafka
- Kafka Connect CDC can automate this
উৎপাদন কাফকা টিপস
- বিভাজন: বিভাজন সংখ্যা = সর্বোচ্চ সমান্তরালতা। আরো পার্টিশন = আরো থ্রুপুট কিন্তু আরো ওভারহেড। নিয়ম: 10-100x প্রত্যাশিত ভোক্তা সমান্তরালতা
- ধরে রাখা: স্টেটফুল ডেটার জন্য লগ কমপ্যাকশন ব্যবহার করুন (কি প্রতি সর্বশেষ মান রাখুন)
- ভোক্তা গোষ্ঠী: প্রতিটি মাইক্রোসার্ভিস তার নিজস্ব গ্রুপ পায় — স্বাধীন অগ্রগতি
- স্কিমা রেজিস্ট্রি: স্কিমা বিবর্তনের জন্য Confluent Schema Registry + Avro/Protobuf ব্যবহার করুন
- মনিটরিং: কনজিউমার ল্যাগ হল মূল মেট্রিক — যখন ল্যাগ বাড়ে তখন সতর্কতা
- অন্তত-একবার বনাম ঠিক-একবার: অবিকল-একবার অদম্য ভোক্তাদের + লেনদেন প্রয়োজন
2026 সালে কাফকা হল ইভেন্ট স্ট্রিমিং এবং মাইক্রোসার্ভিসেস যোগাযোগের জন্য শিল্পের মানদণ্ড। confluent-kafka Python লাইব্রেরি দিয়ে শুরু করুন, সমান্তরাল প্রক্রিয়াকরণের জন্য ভোক্তা গোষ্ঠীগুলি ব্যবহার করুন, নির্ভরযোগ্যতার জন্য ম্যানুয়ালি অফসেটগুলি কমিট করুন এবং আপনার প্রাথমিক স্বাস্থ্য মেট্রিক হিসাবে ভোক্তাদের ল্যাগ নিরীক্ষণ করুন৷
🔗 Share this article
✍️ Leave a Comment