Apache Kafka Guide
Distributed Event Streaming Platform
Overview
Apache Kafka is a distributed event streaming platform capable of handling trillions of events per day. It is the de facto standard for event streaming, proven at extreme scale by companies like LinkedIn, Uber, Netflix, and thousands of others.
Core Architecture
Key Concepts
| Concept | Description |
|---|---|
| Producer | Application that sends events to Kafka |
| Consumer | Application that reads events from Kafka |
| Topic | Logical channel for events |
| Partition | Unit of parallelism in a topic |
| Broker | Kafka server |
| Consumer Group | Group of consumers that coordinate consumption |
| Offset | Position of a record within a partition; consumers track their read position with it |
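To make these concepts concrete, here is a minimal sketch (assuming kafka-python, a local broker at localhost:9092, and a hypothetical `events` topic) showing how topics, partitions, and offsets appear in client code:

```python
from kafka import KafkaConsumer, TopicPartition

# A topic is split into partitions; each record in a partition has a monotonically increasing offset
consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'])

tp = TopicPartition('events', 0)   # partition 0 of the 'events' topic
consumer.assign([tp])              # read this partition directly (no consumer group coordination)
consumer.seek_to_beginning(tp)     # start from the earliest retained offset

records = consumer.poll(timeout_ms=1000)
for record in records.get(tp, []):
    # offset = the record's position within the partition
    print(f"partition={record.partition} offset={record.offset} value={record.value}")
```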
Producer Patterns
Producer Configuration
```python
from kafka import KafkaProducer
import json

# Configure producer for reliability
producer = KafkaProducer(
    bootstrap_servers=['kafka1:9092', 'kafka2:9092', 'kafka3:9092'],
    # Delivery semantics
    acks='all',                    # Wait for all in-sync replicas
    # Compression
    compression_type='snappy',
    # Batching
    linger_ms=10,                  # Wait up to 10ms to fill a batch
    batch_size=32768,              # 32KB batches
    # Reliability
    enable_idempotence=True,
    retries=3,
    max_in_flight_requests_per_connection=5
)

# Send a message
def send_event(topic: str, key: str, value: dict):
    producer.send(
        topic,
        key=key.encode('utf-8'),
        value=json.dumps(value).encode('utf-8')
    )

# Flush on shutdown
producer.flush()
```
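send() is asynchronous and returns a future, so delivery failures surface through callbacks or by blocking on the result. A minimal sketch, assuming kafka-python and the `events` topic; the `delivery_success`/`delivery_failure` callback names are illustrative:

```python
# Asynchronous delivery handling: attach callbacks to the future returned by send()
def delivery_success(record_metadata):
    print(f"Delivered to {record_metadata.topic}[{record_metadata.partition}] at offset {record_metadata.offset}")

def delivery_failure(exc):
    print(f"Delivery failed: {exc}")

future = producer.send('events', value=b'{"status": "ok"}')
future.add_callback(delivery_success)
future.add_errback(delivery_failure)

# Synchronous alternative: block until the broker acknowledges (raises on error)
metadata = producer.send('events', value=b'{"status": "ok"}').get(timeout=10)
```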
Exactly-Once Semantics

```python
# Enable idempotence for exactly-once writes per partition
producer = KafkaProducer(
    bootstrap_servers=['kafka1:9092'],
    enable_idempotence=True,
    acks='all'
)

# With idempotence, retries are safe: if the producer fails after a send
# but before the acknowledgment arrives, the retry will not create duplicates.
```
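Idempotence covers retries within a single producer session; exactly-once across a consume-process-produce pipeline additionally needs transactions. A hedged sketch using the confluent-kafka client (a different library than the kafka-python examples above; the `order-processor-1` transactional id is illustrative):

```python
from confluent_kafka import Producer

# Transactional producer: writes become visible atomically to read_committed consumers
producer = Producer({
    'bootstrap.servers': 'kafka1:9092',
    'transactional.id': 'order-processor-1',  # illustrative; must be stable per producer instance
    'enable.idempotence': True,
    'acks': 'all',
})

producer.init_transactions()

producer.begin_transaction()
try:
    producer.produce('events', key='order-42', value='{"status": "processed"}')
    producer.commit_transaction()   # all-or-nothing visibility
except Exception:
    producer.abort_transaction()    # aborted writes are never seen by read_committed consumers
```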
Consumer Patterns

Consumer Group Configuration
```python
from kafka import KafkaConsumer
import json

# Configure consumer
consumer = KafkaConsumer(
    bootstrap_servers=['kafka1:9092', 'kafka2:9092', 'kafka3:9092'],
    group_id='my_consumer_group',
    auto_offset_reset='earliest',   # or 'latest'
    enable_auto_commit=True,
    auto_commit_interval_ms=1000,
    # Performance tuning
    fetch_min_bytes=1024,
    fetch_max_wait_ms=500,
    max_poll_records=500
)

# Subscribe to topic
consumer.subscribe(['my_topic'])

# Consume messages; offsets are committed automatically every auto_commit_interval_ms
for message in consumer:
    event = json.loads(message.value.decode('utf-8'))
    process_event(event)
```
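When consumers join or leave the group, partitions are reassigned; a rebalance listener lets you commit offsets or flush state before partitions move. A minimal sketch, assuming kafka-python's ConsumerRebalanceListener interface:

```python
from kafka import KafkaConsumer, ConsumerRebalanceListener

class LoggingRebalanceListener(ConsumerRebalanceListener):
    """Illustrative listener: observe partition movement during a group rebalance."""

    def on_partitions_revoked(self, revoked):
        # Called before partitions are taken away; a good place to commit offsets
        print(f"Partitions revoked: {revoked}")

    def on_partitions_assigned(self, assigned):
        # Called after new partitions are assigned to this consumer
        print(f"Partitions assigned: {assigned}")

consumer = KafkaConsumer(
    bootstrap_servers=['kafka1:9092'],
    group_id='my_consumer_group'
)
consumer.subscribe(['my_topic'], listener=LoggingRebalanceListener())
```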
Manual Offset Control

```python
# Disable auto-commit for manual offset control
consumer = KafkaConsumer(
    bootstrap_servers=['kafka1:9092'],
    group_id='manual_control_group',
    enable_auto_commit=False,      # Commit offsets manually
    auto_offset_reset='earliest'
)

# Process and commit manually
for message in consumer:
    event = json.loads(message.value.decode('utf-8'))
    if process_event(event):
        # Commit only if processing succeeded
        consumer.commit()
```
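Committing after every record adds a round trip per message. A common variant is to poll a batch, process it, and commit once per batch (at-least-once delivery). A minimal sketch, assuming the manually committed consumer above and an illustrative `process_event` function:

```python
import json

# Batch processing with one commit per poll loop
while True:
    # poll() returns {TopicPartition: [records]}, or an empty dict on timeout
    batch = consumer.poll(timeout_ms=1000, max_records=500)
    if not batch:
        continue

    for tp, records in batch.items():
        for record in records:
            process_event(json.loads(record.value.decode('utf-8')))

    # One synchronous commit covering everything consumed in this batch
    consumer.commit()
```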
Kafka Configuration

Broker Configuration
```properties
# Cluster identification
broker.id=1
listeners=PLAINTEXT://kafka1:9092
advertised.listeners=PLAINTEXT://kafka1:9092
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181

# Log configuration
log.dirs=/var/kafka/data
num.partitions=3
default.replication.factor=3

# Performance
num.network.threads=8
num.io.threads=16
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400

# Replication
replica.fetch.max.bytes=10485760
replica.fetch.wait.max.ms=500
```

Topic Configuration
```bash
# Create topic with specific configuration
kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --topic events \
  --partitions 10 \
  --replication-factor 3 \
  --config max.message.bytes=10485760 \
  --config retention.ms=604800000   # 7 days

# Alter topic configuration
kafka-configs.sh \
  --bootstrap-server localhost:9092 \
  --entity-type topics \
  --entity-name events \
  --alter \
  --add-config retention.ms=2592000000   # 30 days
```
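To confirm that the partition count, replication factor, and config overrides were applied, the topic can be described (assuming the same `events` topic and a broker at localhost:9092):

```bash
# Show partitions, replication factor, and in-sync replicas
kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --describe \
  --topic events

# Show non-default configuration overrides (e.g. retention.ms, max.message.bytes)
kafka-configs.sh \
  --bootstrap-server localhost:9092 \
  --entity-type topics \
  --entity-name events \
  --describe
```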
Performance Tuning

Producer Tuning
```python
# High-throughput producer
producer = KafkaProducer(
    bootstrap_servers=['kafka1:9092'],
    # Batching
    linger_ms=20,                   # Wait longer to form larger batches
    batch_size=65536,               # 64KB batches
    # Compression
    compression_type='lz4',         # Often better throughput than snappy
    # Buffering
    buffer_memory=67108864,         # 64MB buffer
    # Acknowledgment
    acks=1,                         # Leader only (faster, less durable)
    max_in_flight_requests_per_connection=5
)
```

Consumer Tuning
```python
# High-throughput consumer
consumer = KafkaConsumer(
    bootstrap_servers=['kafka1:9092'],
    group_id='high_throughput_group',
    # Fetch optimization
    fetch_min_bytes=102400,         # 100KB minimum fetch
    fetch_max_bytes=10485760,       # 10MB maximum fetch
    fetch_max_wait_ms=100,
    # Session timeouts
    session_timeout_ms=30000,
    heartbeat_interval_ms=3000,
    # Records per poll
    max_poll_records=1000
)
```

Monitoring
Critical Metrics
```yaml
kafka_metrics:
  broker_metrics:
    - name: "Under-replicated partitions"
      metric: kafka.server.ReplicaManager.UnderReplicatedPartitions
      alert: "If > 0"
    - name: "Offline partitions count"
      metric: kafka.controller.OfflinePartitionsCount
      alert: "If > 0"
    - name: "Request handler idle percent"
      metric: kafka.server.KafkaRequestHandlerPool.RequestHandlerAvgIdlePercent
      alert: "If < 30% (request handlers saturated)"
  producer_metrics:
    - name: "Producer retry rate"
      metric: producer.record-retry-rate
      alert: "If > 0.05"
    - name: "Request latency"
      metric: producer.request-latency-avg
      alert: "If > 100ms"
  consumer_metrics:
    - name: "Consumer lag"
      metric: consumer.records-lag-max
      alert: "If > 100000"
    - name: "Fetch latency"
      metric: consumer.fetch-latency-avg
      alert: "If > 1000ms"
```
Cost Optimization

Cost Optimization Strategies
```yaml
kafka_cost_optimization:
  storage:
    - "Use appropriate retention (don't keep data forever)"
    - "Use log compaction where only the latest value per key matters"
    - "Delete test data promptly"
  compute:
    - "Use spot instances for stateless consumers"
    - "Right-size the cluster (avoid over-provisioning)"
  network:
    - "Colocate producers and consumers with brokers (avoid cross-zone traffic)"
    - "Use compression (snappy, lz4, zstd)"
    - "Batch messages to reduce network calls"
```

Security
SSL/SASL Configuration
```python
# Secure producer: SASL authentication over TLS
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['kafka1:9093'],   # TLS listener port
    security_protocol='SASL_SSL',        # TLS encryption + SASL authentication
    ssl_cafile='/path/to/ca.crt',
    ssl_certfile='/path/to/client.crt',
    ssl_keyfile='/path/to/client.key',
    sasl_mechanism='PLAIN',
    sasl_plain_username='producer_user',
    sasl_plain_password='producer_pass'
)
```
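The consumer side mirrors this configuration; a minimal sketch assuming the same certificates and a hypothetical `consumer_user` principal:

```python
from kafka import KafkaConsumer

# Secure consumer: same TLS + SASL settings as the producer above
consumer = KafkaConsumer(
    'events',
    bootstrap_servers=['kafka1:9093'],
    group_id='secure_consumer_group',
    security_protocol='SASL_SSL',
    ssl_cafile='/path/to/ca.crt',
    ssl_certfile='/path/to/client.crt',
    ssl_keyfile='/path/to/client.key',
    sasl_mechanism='PLAIN',
    sasl_plain_username='consumer_user',
    sasl_plain_password='consumer_pass'
)
```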
ACL (Access Control Lists)

```bash
# Add ACL allowing the producer principal to write to the topic
kafka-acls.sh \
  --bootstrap-server localhost:9092 \
  --add \
  --allow-principal User:producer \
  --operation Write \
  --topic events

# Add ACL allowing the consumer principal to read from the topic
# (consumers also need Read on their consumer group)
kafka-acls.sh \
  --bootstrap-server localhost:9092 \
  --add \
  --allow-principal User:consumer \
  --operation Read \
  --topic events
```
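To verify which principals have access, the ACLs attached to a resource can be listed (assuming the same topic and broker address):

```bash
# List all ACLs currently attached to the events topic
kafka-acls.sh \
  --bootstrap-server localhost:9092 \
  --list \
  --topic events
```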
Key Takeaways

- Proven at scale: Handles trillions of events per day
- Exactly-once: Enable with enable_idempotence
- Performance tuning: Batch size, linger, compression
- Replication: Factor of 3 for durability
- Monitoring: Track lag, under-replicated partitions
- Security: SSL/SASL for production
- Cost: Spot instances for consumers, colocate components
- Consumer groups: Enable parallel processing
Next: Apache Pulsar Guide