🌐 Detecting your location…
📢 Advertisement — Configure AdSense in Appearance → Customize → AdSense Settings

Apache Kafka কমপ্লিট গাইড 2026: প্রযোজক, ভোক্তা এবং ইভেন্ট স্ট্রিমিং

⏱️4 min read  ·  753 words

Apache Kafka হল Netflix, LinkedIn, Uber এবং 2026 সালে হাজার হাজার কোম্পানির রিয়েল-টাইম ডেটা পরিকাঠামোর মেরুদণ্ড। Kafka-এর প্রকাশ-সাবস্ক্রাইব মডেল, ক্রমাগত লগ স্টোরেজ এবং অনুভূমিক স্কেলেবিলিটি এটিকে ইভেন্ট স্ট্রিমিং, মাইক্রোসার্ভিসেস কমিউনিকেশন এবং ডেটা পাইপলাইনের জন্য আদর্শ করে তুলেছে। এই নির্দেশিকাটি প্রথম প্রযোজক থেকে শুরু করে কাফকা আর্কিটেকচার পর্যন্ত সবকিছুই কভার করে।

কাফকা মূল ধারণা

Kafka Architecture:

Producers → Brokers (Kafka cluster) → Consumers
                ↓
          Topic (logical channel)
                ↓
         Partitions (physical)
                ↓
    Replicas (redundancy across brokers)

Key concepts:
- Topic: Named stream of records
- Partition: Ordered, immutable sequence within a topic
- Offset: Position of a record within a partition
- Consumer Group: Multiple consumers sharing partition reads
- Broker: Server in Kafka cluster
- ZooKeeper/KRaft: Cluster coordination (KRaft is now default)

Retention: records kept for 7 days by default (configurable)
Throughput: millions of messages/second per broker

ডকার কম্পোজ সেটআপ

# compose.yaml — Kafka with KRaft (no ZooKeeper)
services:
  kafka:
    image: apache/kafka:3.7.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_LOG_DIRS: /var/lib/kafka/data
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qg

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
    depends_on:
      - kafka

পাইথন প্রযোজক এবং ভোক্তা

pip install confluent-kafka

# producer.py
from confluent_kafka import Producer
import json, time

producer = Producer({
    "bootstrap.servers": "localhost:9092",
    "acks": "all",                    # wait for all replicas to acknowledge
    "retries": 3,                     # retry on failure
    "retry.backoff.ms": 300,
    "enable.idempotence": True,       # exactly-once semantics
    "compression.type": "snappy",     # compress messages
})

def delivery_report(err, msg):
    if err:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")

# Produce messages
for i in range(100):
    event = {
        "id": i,
        "type": "user.signup",
        "user_id": 1000 + i,
        "email": f"user{i}@example.com",
        "timestamp": time.time(),
    }
    producer.produce(
        topic="user-events",
        key=str(event["user_id"]),       # partition by user_id
        value=json.dumps(event).encode(),
        callback=delivery_report,
    )
    producer.poll(0)  # trigger delivery callbacks

producer.flush()  # wait for all messages to be delivered
print("All messages produced")

# consumer.py
from confluent_kafka import Consumer, KafkaError
import json

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "user-event-processor",     # consumer group
    "auto.offset.reset": "earliest",        # start from beginning if no committed offset
    "enable.auto.commit": False,            # manual commit for exactly-once
    "max.poll.interval.ms": 300000,         # 5 min max processing time per batch
})

consumer.subscribe(["user-events"])

try:
    while True:
        msg = consumer.poll(timeout=1.0)

        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            print(f"Error: {msg.error()}")
            break

        # Process message
        event = json.loads(msg.value().decode())
        print(f"Processing: {event['type']} for user {event['user_id']}")

        try:
            process_event(event)
            consumer.commit(msg)  # commit AFTER successful processing
        except Exception as e:
            print(f"Processing failed: {e}")
            # Don't commit — message will be reprocessed

except KeyboardInterrupt:
    pass
finally:
    consumer.close()

বিষয় কনফিগারেশন

# Create topic with 3 partitions, replication factor 3
kafka-topics.sh --create   --bootstrap-server localhost:9092   --topic user-events   --partitions 3   --replication-factor 3   --config retention.ms=604800000 \  # 7 days
  --config retention.bytes=1073741824  # 1GB per partition

# List topics
kafka-topics.sh --list --bootstrap-server localhost:9092

# Describe topic
kafka-topics.sh --describe --topic user-events --bootstrap-server localhost:9092

# Consume from CLI (debugging)
kafka-console-consumer.sh   --bootstrap-server localhost:9092   --topic user-events   --from-beginning

কাফকা স্ট্রিমগুলির সাথে স্ট্রিম প্রক্রিয়াকরণ

# Using faust (Kafka Streams for Python)
import faust

app = faust.App("myapp", broker="kafka://localhost:9092")

# Define topics
orders_topic = app.topic("orders", value_type=dict)
processed_topic = app.topic("processed-orders", value_type=dict)
revenue_topic = app.topic("revenue-by-category", value_type=dict)

# Simple stream processor
@app.agent(orders_topic)
async def process_orders(orders):
    async for order in orders:
        # Enrich the order
        order["processed_at"] = time.time()
        order["status"] = "processed"
        await processed_topic.send(value=order)

# Aggregation (windowed)
order_revenue_table = app.Table(
    "order-revenue",
    default=float,
    partitions=1,
).tumbling(60.0)  # 60-second tumbling window

@app.agent(orders_topic)
async def aggregate_revenue(orders):
    async for order in orders.group_by(lambda o: o["category"]):
        # Accumulate revenue per category per 60-second window
        order_revenue_table[order["category"]] += order["total"]
        await revenue_topic.send(
            key=order["category"],
            value={
                "category": order["category"],
                "revenue": order_revenue_table[order["category"]].current()
            }
        )

if __name__ == "__main__":
    app.main()

মাইক্রোসার্ভিসের জন্য কাফকা প্যাটার্নস

Event-Driven Microservices with Kafka:

1. Event Sourcing
   - All state changes as immutable events in Kafka
   - Replay events to rebuild state
   - Audit log for free

2. CQRS (Command Query Responsibility Segregation)
   - Commands go through Kafka
   - Consumers build read-optimized projections

3. Saga Pattern (distributed transactions)
   - OrderService publishes OrderCreated
   - PaymentService consumes, publishes PaymentCompleted or PaymentFailed
   - OrderService consumes result, updates order status
   - No distributed transactions needed

4. Outbox Pattern (avoiding dual-write)
   - Write to DB + outbox table in same transaction
   - Separate process reads outbox, publishes to Kafka
   - Kafka Connect CDC can automate this

উৎপাদন কাফকা টিপস

  • বিভাজন: বিভাজন সংখ্যা = সর্বোচ্চ সমান্তরালতা। আরো পার্টিশন = আরো থ্রুপুট কিন্তু আরো ওভারহেড। নিয়ম: 10-100x প্রত্যাশিত ভোক্তা সমান্তরালতা
  • ধরে রাখা: স্টেটফুল ডেটার জন্য লগ কমপ্যাকশন ব্যবহার করুন (কি প্রতি সর্বশেষ মান রাখুন)
  • ভোক্তা গোষ্ঠী: প্রতিটি মাইক্রোসার্ভিস তার নিজস্ব গ্রুপ পায় — স্বাধীন অগ্রগতি
  • স্কিমা রেজিস্ট্রি: স্কিমা বিবর্তনের জন্য Confluent Schema Registry + Avro/Protobuf ব্যবহার করুন
  • মনিটরিং: কনজিউমার ল্যাগ হল মূল মেট্রিক — যখন ল্যাগ বাড়ে তখন সতর্কতা
  • অন্তত-একবার বনাম ঠিক-একবার: অবিকল-একবার অদম্য ভোক্তাদের + লেনদেন প্রয়োজন

2026 সালে কাফকা হল ইভেন্ট স্ট্রিমিং এবং মাইক্রোসার্ভিসেস যোগাযোগের জন্য শিল্পের মানদণ্ড। confluent-kafka Python লাইব্রেরি দিয়ে শুরু করুন, সমান্তরাল প্রক্রিয়াকরণের জন্য ভোক্তা গোষ্ঠীগুলি ব্যবহার করুন, নির্ভরযোগ্যতার জন্য ম্যানুয়ালি অফসেটগুলি কমিট করুন এবং আপনার প্রাথমিক স্বাস্থ্য মেট্রিক হিসাবে ভোক্তাদের ল্যাগ নিরীক্ষণ করুন৷

✍️ Leave a Comment

Your email address will not be published. Required fields are marked *

🌐 Read in:🇬🇧 English🇩🇪 Deutsch🇧🇷 Português🇸🇦 العربية🇮🇳 हिन्दी🇧🇩 বাংলা