🌐 Detecting your location…
📢 Advertisement — Configure AdSense in Appearance → Customize → AdSense Settings

Apache Kafka Complete Guide 2026: Producers, Consumers and Event Streaming

⏱️5 min read  ·  1,034 words

Apache Kafka is the backbone of real-time data infrastructure at Netflix, LinkedIn, Uber, and thousands of companies in 2026. Kafka’s publish-subscribe model, persistent log storage, and horizontal scalability make it the standard for event streaming, microservices communication, and data pipelines. This guide covers everything from first producer to production Kafka architecture.

Kafka Core Concepts

Kafka Architecture:

Producers → Brokers (Kafka cluster) → Consumers
                ↓
          Topic (logical channel)
                ↓
         Partitions (physical)
                ↓
    Replicas (redundancy across brokers)

Key concepts:
- Topic: Named stream of records
- Partition: Ordered, immutable sequence within a topic
- Offset: Position of a record within a partition
- Consumer Group: Multiple consumers sharing partition reads
- Broker: Server in Kafka cluster
- ZooKeeper/KRaft: Cluster coordination (KRaft is now default)

Retention: records kept for 7 days by default (configurable)
Throughput: millions of messages/second per broker

Docker Compose Setup

# compose.yaml — Kafka with KRaft (no ZooKeeper)
services:
  kafka:
    image: apache/kafka:3.7.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_LOG_DIRS: /var/lib/kafka/data
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qg

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
    depends_on:
      - kafka

Python Producer and Consumer

pip install confluent-kafka

# producer.py
from confluent_kafka import Producer
import json, time

producer = Producer({
    "bootstrap.servers": "localhost:9092",
    "acks": "all",                    # wait for all replicas to acknowledge
    "retries": 3,                     # retry on failure
    "retry.backoff.ms": 300,
    "enable.idempotence": True,       # exactly-once semantics
    "compression.type": "snappy",     # compress messages
})

def delivery_report(err, msg):
    if err:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")

# Produce messages
for i in range(100):
    event = {
        "id": i,
        "type": "user.signup",
        "user_id": 1000 + i,
        "email": f"user{i}@example.com",
        "timestamp": time.time(),
    }
    producer.produce(
        topic="user-events",
        key=str(event["user_id"]),       # partition by user_id
        value=json.dumps(event).encode(),
        callback=delivery_report,
    )
    producer.poll(0)  # trigger delivery callbacks

producer.flush()  # wait for all messages to be delivered
print("All messages produced")

# consumer.py
from confluent_kafka import Consumer, KafkaError
import json

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "user-event-processor",     # consumer group
    "auto.offset.reset": "earliest",        # start from beginning if no committed offset
    "enable.auto.commit": False,            # manual commit for exactly-once
    "max.poll.interval.ms": 300000,         # 5 min max processing time per batch
})

consumer.subscribe(["user-events"])

try:
    while True:
        msg = consumer.poll(timeout=1.0)

        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            print(f"Error: {msg.error()}")
            break

        # Process message
        event = json.loads(msg.value().decode())
        print(f"Processing: {event['type']} for user {event['user_id']}")

        try:
            process_event(event)
            consumer.commit(msg)  # commit AFTER successful processing
        except Exception as e:
            print(f"Processing failed: {e}")
            # Don't commit — message will be reprocessed

except KeyboardInterrupt:
    pass
finally:
    consumer.close()

Topic Configuration

# Create topic with 3 partitions, replication factor 3
kafka-topics.sh --create   --bootstrap-server localhost:9092   --topic user-events   --partitions 3   --replication-factor 3   --config retention.ms=604800000 \  # 7 days
  --config retention.bytes=1073741824  # 1GB per partition

# List topics
kafka-topics.sh --list --bootstrap-server localhost:9092

# Describe topic
kafka-topics.sh --describe --topic user-events --bootstrap-server localhost:9092

# Consume from CLI (debugging)
kafka-console-consumer.sh   --bootstrap-server localhost:9092   --topic user-events   --from-beginning

Stream Processing with Kafka Streams

# Using faust (Kafka Streams for Python)
import faust

app = faust.App("myapp", broker="kafka://localhost:9092")

# Define topics
orders_topic = app.topic("orders", value_type=dict)
processed_topic = app.topic("processed-orders", value_type=dict)
revenue_topic = app.topic("revenue-by-category", value_type=dict)

# Simple stream processor
@app.agent(orders_topic)
async def process_orders(orders):
    async for order in orders:
        # Enrich the order
        order["processed_at"] = time.time()
        order["status"] = "processed"
        await processed_topic.send(value=order)

# Aggregation (windowed)
order_revenue_table = app.Table(
    "order-revenue",
    default=float,
    partitions=1,
).tumbling(60.0)  # 60-second tumbling window

@app.agent(orders_topic)
async def aggregate_revenue(orders):
    async for order in orders.group_by(lambda o: o["category"]):
        # Accumulate revenue per category per 60-second window
        order_revenue_table[order["category"]] += order["total"]
        await revenue_topic.send(
            key=order["category"],
            value={
                "category": order["category"],
                "revenue": order_revenue_table[order["category"]].current()
            }
        )

if __name__ == "__main__":
    app.main()

Kafka Patterns for Microservices

Event-Driven Microservices with Kafka:

1. Event Sourcing
   - All state changes as immutable events in Kafka
   - Replay events to rebuild state
   - Audit log for free

2. CQRS (Command Query Responsibility Segregation)
   - Commands go through Kafka
   - Consumers build read-optimized projections

3. Saga Pattern (distributed transactions)
   - OrderService publishes OrderCreated
   - PaymentService consumes, publishes PaymentCompleted or PaymentFailed
   - OrderService consumes result, updates order status
   - No distributed transactions needed

4. Outbox Pattern (avoiding dual-write)
   - Write to DB + outbox table in same transaction
   - Separate process reads outbox, publishes to Kafka
   - Kafka Connect CDC can automate this

Production Kafka Tips

  • Partitioning: Partition count = max parallelism. More partitions = more throughput but more overhead. Rule: 10-100x expected consumer parallelism
  • Retention: Use log compaction for stateful data (keep latest value per key)
  • Consumer groups: Each microservice gets its own group — independent progress
  • Schema Registry: Use Confluent Schema Registry + Avro/Protobuf for schema evolution
  • Monitoring: Consumer lag is the key metric — alert when lag grows
  • At-least-once vs exactly-once: Exactly-once requires idempotent consumers + transactions

Kafka in 2026 is the industry standard for event streaming and microservices communication at scale. Start with the confluent-kafka Python library, use consumer groups for parallel processing, commit offsets manually for reliability, and monitor consumer lag as your primary health metric.

✍️ Leave a Comment

Your email address will not be published. Required fields are marked *

🌐 Read in:🇬🇧 English🇩🇪 Deutsch🇧🇷 Português🇸🇦 العربية🇮🇳 हिन्दी🇧🇩 বাংলা