Kafka Producers and Consumers

Build robust Kafka producers and consumers in Python, covering serialization, consumer groups, offset management, and error handling

Working with Kafka Producers

A producer is a client application that publishes events to Kafka topics. Producers are responsible for choosing which partition to send each event to, serializing data into bytes, handling delivery confirmations, and managing retries on failure. A well-configured producer is essential for reliable, high-throughput event streaming.

Kafka producers use an asynchronous, batching model internally. When you call produce(), the event is added to an internal buffer. The producer batches multiple events together and sends them to brokers in a single network request for efficiency. You control this behavior through configuration parameters such as batch.size and linger.ms (the Java client also exposes buffer.memory; librdkafka-based clients such as confluent-kafka size the buffer with queue.buffering.max.kbytes).

Producer Configuration

  • acks: Controls durability — acks=all waits for all in-sync replicas to confirm, acks=1 waits for leader only, acks=0 is fire-and-forget
  • retries: Number of retries for transient failures (effectively unlimited by default in modern clients); delivery.timeout.ms caps the total retry window.
  • batch.size: Maximum bytes to batch before sending (default 16KB). Larger batches improve throughput at the cost of latency.
  • linger.ms: How long to wait for more events before sending a batch (default 0). Setting to 5-10ms significantly improves throughput.
  • compression.type: Compress batches with gzip, snappy, lz4, or zstd to reduce network and storage usage.
  • enable.idempotence: Deduplicates retried sends so each event is written exactly once per partition (on by default in the Java client since Kafka 3.0; set explicitly in confluent-kafka).

from confluent_kafka import Producer
import json
import time

# Production-grade producer configuration
producer_config = {
    'bootstrap.servers': 'kafka1:9092,kafka2:9092,kafka3:9092',
    'acks': 'all',                    # Wait for all ISR replicas
    'enable.idempotence': True,       # Exactly-once semantics
    'retries': 2147483647,            # Retry indefinitely
    'delivery.timeout.ms': 120000,    # 2 minute total timeout
    'batch.size': 65536,              # 64KB batches
    'linger.ms': 10,                  # Wait 10ms for batching
    'compression.type': 'lz4',       # Fast compression
    'max.in.flight.requests.per.connection': 5,
}

producer = Producer(producer_config)

def delivery_callback(err, msg):
    """Called once per message to confirm delivery or report failure."""
    if err is not None:
        print(f"DELIVERY FAILED for {msg.key()}: {err}")
        # In production: send to dead-letter queue or alert
    else:
        print(f"Delivered to {msg.topic()}[{msg.partition()}] @ offset {msg.offset()}")

def produce_order_event(order: dict):
    """Produce an order event with proper serialization and error handling."""
    try:
        producer.produce(
            topic="orders",
            key=str(order["customer_id"]).encode("utf-8"),
            value=json.dumps(order).encode("utf-8"),
            callback=delivery_callback,
            headers={
                "event_type": b"order.created",
                "source": b"order-service",
                "timestamp": str(int(time.time())).encode("utf-8"),
            },
        )
        # Trigger delivery of buffered messages (non-blocking)
        producer.poll(0)
    except BufferError:
        print("Producer buffer full; waiting for in-flight deliveries...")
        producer.flush(timeout=30)
        # Retry the produce with the same key, value, and headers
        producer.produce(
            topic="orders",
            key=str(order["customer_id"]).encode("utf-8"),
            value=json.dumps(order).encode("utf-8"),
            callback=delivery_callback,
            headers={
                "event_type": b"order.created",
                "source": b"order-service",
                "timestamp": str(int(time.time())).encode("utf-8"),
            },
        )
        producer.poll(0)

# Produce multiple events
orders = [
    {"order_id": 1001, "customer_id": 501, "amount": 29.99, "product": "Widget A"},
    {"order_id": 1002, "customer_id": 502, "amount": 49.99, "product": "Widget B"},
    {"order_id": 1003, "customer_id": 501, "amount": 19.99, "product": "Widget C"},
]

for order in orders:
    produce_order_event(order)

# Flush remaining messages before exit
producer.flush(timeout=30)
print("All messages delivered")

Working with Kafka Consumers

A consumer subscribes to one or more topics and processes events. Consumers are organized into consumer groups — a group of consumers that cooperatively process a topic. Kafka assigns each partition to exactly one consumer in the group, enabling parallel processing. If a consumer fails, its partitions are automatically reassigned to other consumers in the group (rebalancing).

The most critical aspect of consumer design is offset management. The offset tracks which events have been processed. If a consumer crashes and restarts, it resumes from its last committed offset. Getting offset management wrong leads to either duplicate processing (at-least-once) or missed events (at-most-once).
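
The difference between these two guarantees comes down to where the commit happens relative to processing. Here is a minimal sketch, given a Consumer like the one configured in the next example (handle() is a hypothetical processing function):

# At-most-once: commit BEFORE processing.
# A crash after the commit but before processing silently drops the event.
msg = consumer.poll(1.0)
if msg is not None and not msg.error():
    consumer.commit(message=msg, asynchronous=False)
    handle(msg)  # hypothetical processing step

# At-least-once: commit AFTER processing.
# A crash after processing but before the commit replays the event on restart.
msg = consumer.poll(1.0)
if msg is not None and not msg.error():
    handle(msg)
    consumer.commit(message=msg, asynchronous=False)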

from confluent_kafka import Consumer, KafkaError
import json
import signal

# Production-grade consumer configuration
consumer_config = {
    'bootstrap.servers': 'kafka1:9092,kafka2:9092,kafka3:9092',
    'group.id': 'order-processor',
    'auto.offset.reset': 'earliest',   # Start from beginning if no committed offset
    'enable.auto.commit': False,        # Manual offset commits for reliability
    'max.poll.interval.ms': 300000,     # 5 minutes max processing time per poll
    'session.timeout.ms': 45000,        # 45 second heartbeat timeout
    'fetch.min.bytes': 1024,            # Wait for 1KB before returning
    'fetch.max.wait.ms': 500,           # Max wait time for fetch.min.bytes
    'max.partition.fetch.bytes': 1048576,  # 1MB per partition per fetch
}

consumer = Consumer(consumer_config)
consumer.subscribe(['orders'])

running = True

def shutdown(signum, frame):
    """Graceful shutdown on SIGINT/SIGTERM."""
    global running
    print("Shutting down consumer...")
    running = False

signal.signal(signal.SIGINT, shutdown)
signal.signal(signal.SIGTERM, shutdown)

def process_order(order: dict):
    """Process an order event with business logic."""
    print(f"Processing order {order['order_id']} "
          f"for customer {order['customer_id']} "
          f"amount: {order['amount']:.2f}")
    # Business logic: update inventory, send confirmation, etc.

print("Consumer started. Waiting for messages...")
try:
    batch = []
    BATCH_SIZE = 100

    while running:
        msg = consumer.poll(timeout=1.0)

        if msg is None:
            # No message available, commit any pending batch
            if batch:
                # Process and commit the batch
                for order in batch:
                    process_order(order)
                consumer.commit(asynchronous=False)
                print(f"Committed batch of {len(batch)} messages")
                batch = []
            continue

        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition — not an error
                continue
            print(f"Consumer error: {msg.error()}")
            continue

        # Deserialize and add to batch
        order = json.loads(msg.value().decode("utf-8"))
        batch.append(order)

        # Process batch when full
        if len(batch) >= BATCH_SIZE:
            for order in batch:
                process_order(order)
            consumer.commit(asynchronous=False)
            print(f"Committed batch of {len(batch)} messages")
            batch = []

finally:
    # Process any remaining messages in the batch
    if batch:
        for order in batch:
            process_order(order)
        consumer.commit(asynchronous=False)

    consumer.close()
    print("Consumer closed cleanly")

Consumer Groups and Rebalancing

When consumers join or leave a group, Kafka triggers a rebalance, redistributing partitions among the remaining consumers. During an eager (stop-the-world) rebalance, the whole group pauses processing until partitions are reassigned; cooperative rebalancing (covered below) limits the pause to the partitions that actually move. Understanding rebalancing is crucial for designing resilient consumer applications.
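
To observe or react to rebalances, confluent-kafka accepts on_assign and on_revoke callbacks on subscribe(). A minimal sketch reusing the consumer from above (the print statements are illustrative):

from confluent_kafka import KafkaException

def on_assign(consumer, partitions):
    print(f"Assigned {len(partitions)} partition(s): "
          f"{[(p.topic, p.partition) for p in partitions]}")

def on_revoke(consumer, partitions):
    # Commit processed offsets before the partitions move to another consumer
    print(f"Revoked {len(partitions)} partition(s)")
    try:
        consumer.commit(asynchronous=False)
    except KafkaException:
        pass  # Nothing consumed yet, so there is nothing to commit

consumer.subscribe(['orders'], on_assign=on_assign, on_revoke=on_revoke)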

Consumer Group Rules

  • One Partition per Consumer: Within a group, each partition is assigned to exactly one consumer. One consumer can handle multiple partitions.
  • Max Parallelism = Partitions: If you have 6 partitions, at most 6 consumers in the group can actively consume. Extra consumers sit idle.
  • Multiple Groups: Different consumer groups consume the same topic independently. Each group maintains its own offsets.
  • Cooperative Rebalancing: Modern Kafka supports incremental cooperative rebalancing, which only moves affected partitions instead of revoking all assignments (see the sketch below).
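
Cooperative rebalancing is opt-in for librdkafka-based clients such as confluent-kafka, via the assignment strategy in the consumer config. A minimal sketch extending the consumer_config from earlier:

# Only partitions that actually move are revoked; the rest keep consuming
cooperative_config = {
    **consumer_config,
    'partition.assignment.strategy': 'cooperative-sticky',
}
cooperative_consumer = Consumer(cooperative_config)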

Serialization with Avro and Schema Registry

In production, you should use a schema-based serialization format like Avro or Protobuf instead of plain JSON. The Confluent Schema Registry provides schema versioning, compatibility checks, and efficient binary serialization.

from confluent_kafka import SerializingProducer, DeserializingConsumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer, AvroDeserializer

# Schema Registry configuration
schema_registry = SchemaRegistryClient({
    'url': 'http://schema-registry:8081'
})

# Define Avro schema for orders
order_schema_str = """
{
    "type": "record",
    "name": "Order",
    "namespace": "com.example.orders",
    "fields": [
        {"name": "order_id", "type": "long"},
        {"name": "customer_id", "type": "long"},
        {"name": "amount", "type": "double"},
        {"name": "product", "type": "string"},
        {"name": "status", "type": {"type": "enum", "name": "Status",
            "symbols": ["PENDING", "CONFIRMED", "SHIPPED", "DELIVERED"]}},
        {"name": "created_at", "type": {"type": "long", "logicalType": "timestamp-millis"}}
    ]
}
"""

# Avro producer — schemas are automatically registered and validated
avro_producer = SerializingProducer({
    'bootstrap.servers': 'kafka:9092',
    'key.serializer': lambda k, ctx: k.encode('utf-8'),
    'value.serializer': AvroSerializer(schema_registry, order_schema_str),
})

# Produce with Avro serialization (smaller, faster, schema-validated)
avro_producer.produce(
    topic="orders-avro",
    key="customer-501",
    value={
        "order_id": 1001,
        "customer_id": 501,
        "amount": 29.99,
        "product": "Widget A",
        "status": "PENDING",
        "created_at": 1710500000000,
    }
)
avro_producer.flush()
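
The consuming side mirrors this: DeserializingConsumer (imported above) paired with an AvroDeserializer hands your code plain dicts that have already been validated against the registered schema. A minimal sketch, assuming the same registry and schema string (the group id is illustrative):

avro_consumer = DeserializingConsumer({
    'bootstrap.servers': 'kafka:9092',
    'group.id': 'order-processor-avro',
    'auto.offset.reset': 'earliest',
    'key.deserializer': lambda k, ctx: k.decode('utf-8') if k else None,
    'value.deserializer': AvroDeserializer(schema_registry, order_schema_str),
})
avro_consumer.subscribe(['orders-avro'])

msg = avro_consumer.poll(timeout=10.0)
if msg is not None and msg.error() is None:
    order = msg.value()  # Already a dict matching the Order schema
    print(f"Order {order['order_id']} status: {order['status']}")
avro_consumer.close()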

Error Handling and Dead Letter Queues

Production consumers must handle errors gracefully. Events that cannot be processed (malformed data, schema violations, business logic errors) should be sent to a dead letter queue (DLQ) rather than blocking the entire pipeline or being silently dropped.

def process_with_dlq(consumer, dlq_producer, max_retries=3):
    """Process messages with dead letter queue for failures."""
    while running:
        msg = consumer.poll(timeout=1.0)
        if msg is None or msg.error():
            continue

        retries = 0
        while retries < max_retries:
            try:
                event = json.loads(msg.value().decode())
                process_order(event)
                break  # Success
            except json.JSONDecodeError:
                # Malformed JSON — send to DLQ immediately
                dlq_producer.produce(
                    topic="orders.dlq",
                    key=msg.key(),
                    value=msg.value(),
                    headers=[
                        ("error", b"json_decode_error"),
                        ("original_topic", b"orders"),
                    ],
                )
                dlq_producer.flush()
                break
            except Exception as e:
                retries += 1
                if retries >= max_retries:
                    dlq_producer.produce(
                        topic="orders.dlq",
                        key=msg.key(),
                        value=msg.value(),
                        headers=[
                            ("error", str(e).encode()),
                            ("retries", str(retries).encode()),
                            ("original_topic", b"orders"),
                        ],
                    )
                    dlq_producer.flush()
                    print(f"Sent to DLQ after {retries} retries: {e}")
                else:
                    time.sleep(2 ** retries)  # Exponential backoff

        consumer.commit(asynchronous=False)
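
Wiring this up requires only a second producer pointed at the DLQ topic; the broker addresses below are the illustrative ones used throughout:

dlq_producer = Producer({
    'bootstrap.servers': 'kafka1:9092,kafka2:9092,kafka3:9092',
    'acks': 'all',
    'enable.idempotence': True,   # Avoid duplicate DLQ entries on retries
})
process_with_dlq(consumer, dlq_producer, max_retries=3)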

Key Takeaways

  • Configure producers with acks=all and idempotence enabled for reliable delivery
  • Use manual offset commits in consumers to prevent data loss on crashes
  • Consumer groups enable parallel processing — max parallelism equals the number of partitions
  • Use Avro or Protobuf with Schema Registry instead of JSON for production workloads
  • Implement dead letter queues for events that fail processing after retries
  • Design consumers for graceful shutdown to avoid duplicate processing during rebalances
