
Complete Guide to Apache Kafka 2026: Producers, Consumers, and Event Streaming


Apache Kafka is the backbone of real-time data infrastructure at Netflix, LinkedIn, Uber, and thousands of other companies in 2026. Its publish-subscribe model, durable log storage, and horizontal scalability make it the standard choice for event streaming, microservice communication, and data pipelines. This guide covers everything from your first producer to production Kafka architecture.

Kafka Basics

Kafka Architecture:

Producers → Brokers (Kafka cluster) → Consumers
                ↓
          Topic (logical channel)
                ↓
         Partitions (physical)
                ↓
    Replicas (redundancy across brokers)

Key concepts:
- Topic: Named stream of records
- Partition: Ordered, immutable sequence within a topic
- Offset: Position of a record within a partition
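- Consumer Group: Consumers that divide a topic's partitions among themselves (one reader per partition within the group)
- Broker: Server in Kafka cluster
- KRaft: Cluster coordination via Kafka's built-in Raft quorum (replaces ZooKeeper, which was removed in Kafka 4.0)

Retention: records kept for 7 days by default (configurable)
Throughput: can reach millions of small messages/second, depending on message size, batching, and hardware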
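
To make the relationship between keys, partitions, and offsets concrete, here is a minimal in-memory sketch. It is a toy model, not how a broker is implemented: the `ToyTopic` class is hypothetical, and it hashes keys with CRC32 as a stand-in, whereas real clients use their own hash functions (the Java client uses murmur2).

```python
import zlib


class ToyTopic:
    """In-memory stand-in for a topic: a fixed number of append-only partition logs."""

    def __init__(self, name: str, num_partitions: int) -> None:
        self.name = name
        self.partitions = [[] for _ in range(num_partitions)]

    def partition_for(self, key: str) -> int:
        # Stand-in hash for illustration; real clients use their own partitioners.
        return zlib.crc32(key.encode()) % len(self.partitions)

    def append(self, key: str, value: str) -> tuple:
        """Append a record and return (partition, offset)."""
        p = self.partition_for(key)
        self.partitions[p].append((key, value))
        return p, len(self.partitions[p]) - 1


topic = ToyTopic("user-events", num_partitions=3)
first = topic.append("user-1000", "signup")
second = topic.append("user-1000", "login")
print(first, second)
```

Records with the same key land in the same partition, and the second record's offset is one higher than the first. That is why ordering is guaranteed per key (per partition) and not across the whole topic.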

Docker Compose Setup

# compose.yaml — Kafka with KRaft (no ZooKeeper)
services:
  kafka:
    image: apache/kafka:3.7.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://:9092,INTERNAL://:29092,CONTROLLER://:9093
      # Host clients connect via localhost:9092; other containers (kafka-ui) via kafka:29092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,INTERNAL://kafka:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,INTERNAL:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_LOG_DIRS: /var/lib/kafka/data
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qg

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
    depends_on:
      - kafka
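
After `docker compose up -d`, the broker takes a few seconds to start accepting connections. The helper below is a small sketch, using only the standard library, that waits for the published port to open. The function name `wait_for_port` is an assumption for illustration. An open TCP port only shows that the listener is up, not that the broker is fully ready to serve requests.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout_s: float = 30.0) -> bool:
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        try:
            # Open and immediately close a connection to probe the listener.
            with socket.create_connection((host, port), timeout=1.0):
                return True
        except OSError:
            time.sleep(0.5)  # not listening yet; retry shortly
    return False
```

Calling `wait_for_port("localhost", 9092)` before running the producer avoids a burst of connection errors while the container is still booting.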

Python Producer and Consumer

pip install confluent-kafka

# producer.py
from confluent_kafka import Producer
import json, time

producer = Producer({
    "bootstrap.servers": "localhost:9092",
    "acks": "all",                    # wait for all in-sync replicas to acknowledge
    "retries": 3,                     # retry on failure
    "retry.backoff.ms": 300,
    "enable.idempotence": True,       # no duplicates from retries (not end-to-end exactly-once by itself)
    "compression.type": "snappy",     # compress messages
})

def delivery_report(err, msg):
    if err:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")

# Produce messages
for i in range(100):
    event = {
        "id": i,
        "type": "user.signup",
        "user_id": 1000 + i,
        "email": f"user{i}@example.com",
        "timestamp": time.time(),
    }
    producer.produce(
        topic="user-events",
        key=str(event["user_id"]),       # partition by user_id
        value=json.dumps(event).encode(),
        callback=delivery_report,
    )
    producer.poll(0)  # trigger delivery callbacks

producer.flush()  # wait for all messages to be delivered
print("All messages produced")

# consumer.py
from confluent_kafka import Consumer, KafkaError
import json

def process_event(event):
    # Placeholder handler (not part of confluent-kafka); replace with real business logic
    pass

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "user-event-processor",     # consumer group
    "auto.offset.reset": "earliest",        # start from beginning if no committed offset
    "enable.auto.commit": False,            # manual commit after processing (at-least-once)
    "max.poll.interval.ms": 300000,         # 5 min max processing time per batch
})

consumer.subscribe(["user-events"])

try:
    while True:
        msg = consumer.poll(timeout=1.0)

        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            print(f"Error: {msg.error()}")
            break

        # Process message
        event = json.loads(msg.value().decode())
        print(f"Processing: {event['type']} for user {event['user_id']}")

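        try:
            process_event(event)
            # Synchronous commit AFTER successful processing
            consumer.commit(message=msg, asynchronous=False)
        except Exception as e:
            print(f"Processing failed: {e}")
            # Stop without committing so this message is redelivered on restart;
            # polling on would let a later commit skip past it
            break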

except KeyboardInterrupt:
    pass
finally:
    consumer.close()
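
Committing only after processing gives at-least-once delivery: if the consumer crashes between processing and commit, the same message arrives again. One common way to make that safe is an idempotent handler. The sketch below is illustrative only. The `IdempotentProcessor` class is hypothetical, it assumes each event carries a unique `id` (as the producer above does), and it tracks seen ids in memory, where a real service would persist them.

```python
class IdempotentProcessor:
    """Skips events whose id was already handled, so redelivery is harmless."""

    def __init__(self, handler):
        self._handler = handler
        self._seen_ids = set()  # in-memory only; a real service would persist this

    def process(self, event: dict) -> bool:
        """Run the handler once per event id; return False for duplicates."""
        event_id = event["id"]
        if event_id in self._seen_ids:
            return False
        self._handler(event)
        self._seen_ids.add(event_id)  # recorded only after the handler succeeds
        return True


signups = []
processor = IdempotentProcessor(signups.append)
event = {"id": 7, "type": "user.signup", "user_id": 1007}
processor.process(event)
processor.process(event)  # simulated redelivery after a crash before commit
print(len(signups))
```

The handler runs once even though the event was delivered twice, which is the property a consumer needs when offsets are committed after processing.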

Topic Configuration

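# Create topic with 3 partitions, replication factor 3
# (needs at least 3 brokers; use --replication-factor 1 with the single-broker Compose setup)
# retention.ms=604800000 is 7 days; retention.bytes=1073741824 is 1 GiB per partition
kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --topic user-events \
  --partitions 3 \
  --replication-factor 3 \
  --config retention.ms=604800000 \
  --config retention.bytes=1073741824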

# List topics
kafka-topics.sh --list --bootstrap-server localhost:9092

# Describe topic
kafka-topics.sh --describe --topic user-events --bootstrap-server localhost:9092

# Consume from CLI (debugging)
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic user-events --from-beginning
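
The retention values in the create command are easier to sanity-check with a little arithmetic. The helper names below are assumptions for illustration. Because `retention.bytes` applies per partition and each partition is stored once per replica, the cluster-wide figure is only an approximate upper bound: Kafka deletes whole log segments, so actual usage can overshoot it.

```python
def retention_days(retention_ms: int) -> float:
    """Convert a retention.ms value to days."""
    return retention_ms / (24 * 60 * 60 * 1000)


def max_topic_bytes(retention_bytes: int, partitions: int, replication_factor: int) -> int:
    """Approximate disk bound across the cluster for one topic."""
    return retention_bytes * partitions * replication_factor


days = retention_days(604_800_000)
total = max_topic_bytes(1_073_741_824, partitions=3, replication_factor=3)
print(days, total / 2**30)  # days of retention, GiB across the cluster
```

For the topic above this gives 7 days of retention and roughly 9 GiB of disk across the cluster.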

Stream Processing with Kafka Streams

# Using Faust, a Python stream processing library modeled on Kafka Streams
# (pip install faust-streaming, the maintained fork)
import time

import faust

app = faust.App("myapp", broker="kafka://localhost:9092")

# Define topics
orders_topic = app.topic("orders", value_type=dict)
processed_topic = app.topic("processed-orders", value_type=dict)
revenue_topic = app.topic("revenue-by-category", value_type=dict)

# Simple stream processor
@app.agent(orders_topic)
async def process_orders(orders):
    async for order in orders:
        # Enrich the order
        order["processed_at"] = time.time()
        order["status"] = "processed"
        await processed_topic.send(value=order)

# Aggregation (windowed)
order_revenue_table = app.Table(
    "order-revenue",
    default=float,
    partitions=1,
).tumbling(60.0)  # 60-second tumbling window

@app.agent(orders_topic)
async def aggregate_revenue(orders):
    # group_by with a callable needs an explicit name for the repartition topic
    async for order in orders.group_by(lambda o: o["category"], name="category"):
        # Accumulate revenue per category per 60-second window
        order_revenue_table[order["category"]] += order["total"]
        await revenue_topic.send(
            key=order["category"],
            value={
                "category": order["category"],
                "revenue": order_revenue_table[order["category"]].current()
            }
        )

if __name__ == "__main__":
    app.main()
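
The windowed aggregation above can be hard to picture, so here is a library-free sketch of what a 60-second tumbling window does. It is not Faust's implementation. The function names are hypothetical, and the `timestamp` field on each order is an assumption (the orders in the Faust example do not show one).

```python
from collections import defaultdict

WINDOW_SECONDS = 60.0


def window_start(timestamp: float, size: float = WINDOW_SECONDS) -> float:
    """Start of the tumbling window that contains the timestamp."""
    return (timestamp // size) * size


def revenue_by_window(orders):
    """Sum order totals per (window start, category)."""
    totals = defaultdict(float)
    for order in orders:
        key = (window_start(order["timestamp"]), order["category"])
        totals[key] += order["total"]
    return dict(totals)


orders = [
    {"category": "books", "total": 10.0, "timestamp": 5.0},
    {"category": "books", "total": 2.5, "timestamp": 59.0},
    {"category": "books", "total": 4.0, "timestamp": 61.0},
    {"category": "toys", "total": 8.0, "timestamp": 30.0},
]
result = revenue_by_window(orders)
print(result)
```

Tumbling windows do not overlap, so each order counts toward exactly one window. The order at 61 seconds starts a new bucket instead of adding to the first one.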

Kafka Patterns for Microservices

Event-Driven Microservices with Kafka:

1. Event Sourcing
   - All state changes as immutable events in Kafka
   - Replay events to rebuild state
   - Audit log for free

2. CQRS (Command Query Responsibility Segregation)
   - Commands go through Kafka
   - Consumers build read-optimized projections

3. Saga Pattern (distributed transactions)
   - OrderService publishes OrderCreated
   - PaymentService consumes, publishes PaymentCompleted or PaymentFailed
   - OrderService consumes result, updates order status
   - No distributed transactions needed

4. Outbox Pattern (avoiding dual-write)
   - Write to DB + outbox table in same transaction
   - Separate process reads outbox, publishes to Kafka
   - Kafka Connect CDC can automate this
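
The outbox pattern from item 4 can be sketched with SQLite from the standard library. This is a minimal illustration under stated assumptions: the table layout, the function names, and the `publish` callable (a stand-in for a real Kafka producer call) are all hypothetical.

```python
import json
import sqlite3


def create_schema(conn):
    conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, status TEXT)")
    conn.execute(
        "CREATE TABLE outbox (id INTEGER PRIMARY KEY AUTOINCREMENT, "
        "topic TEXT, payload TEXT, published INTEGER DEFAULT 0)"
    )


def create_order(conn, order_id):
    # One transaction: the order row and its event commit or roll back together.
    with conn:
        conn.execute("INSERT INTO orders (id, status) VALUES (?, 'created')", (order_id,))
        conn.execute(
            "INSERT INTO outbox (topic, payload) VALUES (?, ?)",
            ("orders", json.dumps({"type": "OrderCreated", "order_id": order_id})),
        )


def relay_outbox(conn, publish):
    """Publish unpublished rows in insertion order, then mark each as published."""
    rows = conn.execute(
        "SELECT id, topic, payload FROM outbox WHERE published = 0 ORDER BY id"
    ).fetchall()
    for row_id, topic, payload in rows:
        publish(topic, payload)  # stand-in for a Kafka producer call
        with conn:
            conn.execute("UPDATE outbox SET published = 1 WHERE id = ?", (row_id,))
    return len(rows)


conn = sqlite3.connect(":memory:")
create_schema(conn)
create_order(conn, 42)
sent = []
relay_outbox(conn, lambda topic, payload: sent.append((topic, json.loads(payload))))
print(sent)
```

If the relay crashes after publishing but before marking the row, the event is published again on the next run. The relay is therefore at-least-once, and downstream consumers should be idempotent.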

Kafka Production Tips

  • Partitioning: The partition count caps parallelism within a consumer group. More partitions mean more throughput but also more overhead. Rule of thumb: plan for 10 to 100x the expected consumer parallelism
  • Retention: Use log compaction for stateful data (it keeps the latest value per key)
  • Consumer groups: Give each microservice its own group so that each tracks its progress independently
  • Schema registry: Use Confluent Schema Registry with Avro or Protobuf for schema evolution
  • Monitoring: Consumer lag is the key metric; alert when lag keeps growing
  • At-least-once vs exactly-once: Exactly-once requires idempotent consumers plus transactions
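
Consumer lag, the metric named in the monitoring tip, is the gap between the newest offset in a partition and the offset the group has committed. The sketch below shows the arithmetic and a simple "lag keeps growing" check. The function names are hypothetical, the offsets are made-up sample values, and treating a partition with no committed offset as 0 is an assumption made for simplicity.

```python
def partition_lag(end_offsets: dict, committed_offsets: dict) -> dict:
    """Lag per partition = log end offset minus committed offset."""
    return {
        p: end - committed_offsets.get(p, 0)  # assumption: no commit counts as offset 0
        for p, end in end_offsets.items()
    }


def lag_is_growing(samples: list, min_samples: int = 3) -> bool:
    """True when total lag rose strictly across the last min_samples readings."""
    recent = samples[-min_samples:]
    return len(recent) == min_samples and all(a < b for a, b in zip(recent, recent[1:]))


lag = partition_lag({0: 120, 1: 95, 2: 300}, {0: 120, 1: 90, 2: 250})
total_lag = sum(lag.values())
print(lag, total_lag)
```

Alerting on a sustained upward trend, and not on a single high reading, avoids paging on short bursts that the consumers catch up on by themselves.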

In 2026, Kafka is the industry standard for event streaming and microservice communication at scale. Start with the confluent-kafka Python library, use consumer groups for parallel processing, commit offsets manually for reliability, and monitor consumer lag as your primary health metric.
