Case Study - AdTech Clickstream Processing
Petabyte-Scale Event Processing Pipeline
Business Context
Company: AdTech platform (real-time bidding and analytics)
Challenge: Process billions of clickstream events per day for real-time bidding, fraud detection, and analytics.
Scale: 1PB/day, 10M events/second peak, 100B+ events/day
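As a rough sanity check on these figures (a sketch only; the 300-partition count comes from the Kafka topic configuration later in this section):

```python
# Back-of-envelope sizing from the stated scale figures (illustrative only)
EVENTS_PER_DAY = 100e9        # 100B+ events/day
BYTES_PER_DAY = 1e15          # ~1 PB/day of raw event data
PEAK_EVENTS_PER_SEC = 10e6    # 10M events/second at peak
PARTITIONS = 300              # matches num.partitions in the Kafka config below

avg_events_per_sec = EVENTS_PER_DAY / 86_400                   # ~1.16M events/s on average
avg_event_bytes = BYTES_PER_DAY / EVENTS_PER_DAY               # ~10 KB per raw event
avg_ingress_gb_per_sec = BYTES_PER_DAY / 86_400 / 1e9          # ~11.6 GB/s before compression
peak_events_per_partition = PEAK_EVENTS_PER_SEC / PARTITIONS   # ~33K events/s per partition

print(f"avg rate:        {avg_events_per_sec / 1e6:.2f}M events/s")
print(f"avg event size:  {avg_event_bytes / 1e3:.1f} KB")
print(f"avg ingress:     {avg_ingress_gb_per_sec:.1f} GB/s (pre-compression)")
print(f"peak/partition:  {peak_events_per_partition / 1e3:.0f}K events/s")
```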
Requirements
Architecture Design
Technology Selection
Component Selection Rationale
| Component | Technology | Rationale |
|---|---|---|
| Event Bus | Kafka (multi-region) | Proven at PB scale |
| Stream Processing | Spark Structured Streaming | Mature, exactly-once |
| Fraud Detection | Flink | Low latency, state management |
| Data Lake | S3 + Delta Lake | ACID, time travel |
| OLAP | ClickHouse | Fast ingest, compression |
| Warehouse | Snowflake | Separation of compute/storage |
| Orchestration | Airflow | Fault tolerance |
Event Ingestion
Kafka Cluster Configuration
```terraform
# Kafka cluster configuration (Terraform)
resource "aws_msk_cluster" "clickstream_kafka" {
  cluster_name           = "clickstream-kafka"
  kafka_version          = "3.5.0"
  number_of_broker_nodes = 30

  broker_node_group_info {
    instance_type   = "kafka.m5.4xlarge"
    client_subnets  = module.vpc.private_subnets
    security_groups = [aws_security_group.kafka.id]

    storage_info {
      ebs_storage_info {
        volume_size = 1000 # 1 TB per broker
        provisioned_throughput {
          enabled           = true
          volume_throughput = 250
        }
      }
    }

    # Spread brokers evenly across availability zones
    broker_az_distribution = "DEFAULT"
  }

  configuration_info {
    arn      = aws_msk_configuration.clickstream.arn
    revision = aws_msk_configuration.clickstream.latest_revision
  }

  # Encryption at rest and in transit
  encryption_info {
    encryption_at_rest_kms_key_arn = aws_kms_key.kafka.arn
    encryption_in_transit {
      client_broker = "TLS"
      in_cluster    = true
    }
  }

  # Monitoring
  enhanced_monitoring = "PER_BROKER"
  open_monitoring {
    prometheus {
      jmx_exporter {
        enabled_in_broker = true
      }
    }
  }

  logging_info {
    broker_logs {
      cloudwatch_logs {
        enabled   = true
        log_group = aws_cloudwatch_log_group.kafka.name
      }
    }
  }

  tags = {
    Environment = "production"
    Service     = "clickstream"
    CostCenter  = "adtech"
  }
}

# Kafka broker configuration
resource "aws_msk_configuration" "clickstream" {
  kafka_versions = ["3.5.0"]
  name           = "clickstream-config"

  server_properties = <<PROPERTIES
# Topic configuration
default.replication.factor=3
num.partitions=300
min.insync.replicas=2

# Retention
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000

# Performance
num.network.threads=16
num.io.threads=16
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

# Compression
compression.type=zstd

# Producer/consumer tuning (batch.size=32768, linger.ms=10, acks=all,
# fetch.min.bytes=1024, fetch.max.wait.ms=500) is client-side configuration
# and is set in the producer/consumer code below, not in server.properties.
PROPERTIES
}
```

High-Throughput Producer
```python
# High-throughput Kafka producer
import json
import threading
from queue import Queue

from confluent_kafka import Producer


class ClickstreamProducer:
    """High-throughput clickstream producer"""

    def __init__(self, bootstrap_servers: str):
        self.producer = Producer({
            'bootstrap.servers': bootstrap_servers,
            'client.id': 'clickstream-producer',
            'queue.buffering.max.messages': 1000000,
            'queue.buffering.max.kbytes': 4000000,
            'batch.num.messages': 10000,
            'compression.codec': 'zstd',
            'acks': 'all',
            'enable.idempotence': True,
            'max.in.flight.requests.per.connection': 5,
            'linger.ms': 10,        # alias of queue.buffering.max.ms
            'batch.size': 32768,
            'retries': 3,
            'delivery.timeout.ms': 120000,
        })
        # Optional staging queue for application threads
        self.queue = Queue(maxsize=100000)

    def delivery_report(self, err, msg):
        """Delivery report callback"""
        if err is not None:
            print(f'Delivery failed: {err}')

    def send_event(self, event: dict):
        """Send a single clickstream event"""
        try:
            # Serialize to JSON
            value = json.dumps(event).encode('utf-8')

            # Produce to Kafka
            self.producer.produce(
                topic='clickstream-events',
                key=event['user_id'].encode('utf-8'),
                value=value,
                timestamp=int(event['timestamp'] * 1000),
                on_delivery=self.delivery_report
            )

            # Serve delivery callbacks without blocking
            self.producer.poll(0)

        except BufferError:
            # Local queue is full: apply backpressure, then retry
            print('Queue full, waiting...')
            self.producer.flush()
            self.send_event(event)

    def flush(self):
        """Flush pending messages"""
        self.producer.flush()


# Batch producer for efficiency
class BatchClickstreamProducer:
    """Batch producer for higher throughput"""

    def __init__(self, bootstrap_servers: str, batch_size: int = 10000):
        self.producer = Producer({
            'bootstrap.servers': bootstrap_servers,
            'compression.codec': 'zstd',
            'batch.num.messages': batch_size,
            'linger.ms': 100,
            'acks': 1,  # Faster, accepts some data loss
        })
        self.batch = []
        self.batch_size = batch_size
        self.lock = threading.Lock()

    def add_event(self, event: dict):
        """Add an event to the in-memory batch"""
        with self.lock:
            self.batch.append(event)
            if len(self.batch) >= self.batch_size:
                self.flush_batch()

    def flush_batch(self):
        """Flush the batch to Kafka"""
        for event in self.batch:
            value = json.dumps(event).encode('utf-8')
            self.producer.produce(
                topic='clickstream-events',
                value=value,
                callback=self.delivery_report
            )
        self.batch = []
        self.producer.poll(0)

    def delivery_report(self, err, msg):
        """Silent delivery report"""
        pass


# Usage
producer = BatchClickstreamProducer('kafka-broker1:9092,kafka-broker2:9092')

# Send events in a loop (clickstream_events is an iterable of event dicts)
for event in clickstream_events:
    producer.add_event(event)

# Flush remaining events
producer.flush_batch()
producer.producer.flush()
```

Stream Processing
Real-Time Bidding
```python
# Spark Structured Streaming for real-time bidding
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Initialize Spark
spark = SparkSession.builder \
    .appName("RealTimeBidding") \
    .config("spark.sql.streaming.checkpointLocation", "s3://checkpoints/bidding") \
    .config("spark.sql.shuffle.partitions", "400") \
    .getOrCreate()

# Define schema
clickstream_schema = StructType([
    StructField("event_id", StringType(), False),
    StructField("user_id", StringType(), False),
    StructField("session_id", StringType(), False),
    StructField("ad_id", StringType()),
    StructField("publisher_id", StringType(), False),
    StructField("timestamp", TimestampType(), False),
    StructField("event_type", StringType()),   # used by the analytics aggregation below
    StructField("bid_amount", DoubleType()),   # used by the analytics aggregation below
    StructField("url", StringType()),
    StructField("user_agent", StringType()),
    StructField("ip", StringType()),
    StructField("device_type", StringType()),
    StructField("country", StringType()),
])

# Read from Kafka
clickstream_raw = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka-broker1:9092,kafka-broker2:9092") \
    .option("subscribe", "clickstream-events") \
    .option("startingOffsets", "latest") \
    .option("failOnDataLoss", "false") \
    .load()

# Parse JSON
clickstream = clickstream_raw \
    .select(from_json(col("value").cast("string"), clickstream_schema).alias("data")) \
    .select("data.*")

# Watermark for late data
clickstream_with_watermark = clickstream \
    .withWatermark("timestamp", "30 seconds")

# Join with user profiles (static/batch); provides user_value_score and ad_relevance_score
user_profiles = spark.read.format("parquet") \
    .load("s3://data/user_profiles/")

enriched = clickstream_with_watermark \
    .join(user_profiles, "user_id", "left")

# Join with ad inventory (stream-stream)
ad_inventory_schema = StructType([
    StructField("ad_id", StringType(), False),
    StructField("publisher_id", StringType(), False),
    StructField("bid_floor", DoubleType(), False),
    StructField("category", StringType()),
    StructField("timestamp", TimestampType(), False),
])

ad_inventory = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka-broker1:9092") \
    .option("subscribe", "ad-inventory") \
    .load() \
    .select(from_json(col("value").cast("string"), ad_inventory_schema).alias("data")) \
    .select("data.*") \
    .withWatermark("timestamp", "30 seconds")

# Stream-stream join on publisher with an event-time bound
# (the clickstream's own ad_id is dropped; the ad comes from inventory)
bidding_opportunities = enriched.drop("ad_id").alias("c").join(
    ad_inventory.alias("a"),
    expr("""
        c.publisher_id = a.publisher_id AND
        a.timestamp BETWEEN c.timestamp - interval 30 seconds
                        AND c.timestamp + interval 30 seconds
    """),
    "inner"
)


# Apply bidding logic
@udf(returnType=DoubleType())
def calculate_bid(
    user_value_score: float,
    ad_relevance_score: float,
    bid_floor: float
) -> float:
    """Calculate bid price"""
    base_bid = user_value_score * ad_relevance_score * 10
    return max(base_bid, bid_floor)


bidding_decisions = bidding_opportunities \
    .withColumn("bid_amount", calculate_bid(
        col("user_value_score"),
        col("ad_relevance_score"),
        col("bid_floor")
    )) \
    .withColumn("decision_timestamp", current_timestamp())

# Write bid decisions back to Kafka
query = bidding_decisions \
    .select(
        to_json(struct(
            col("event_id"),
            col("user_id"),
            col("ad_id"),
            col("bid_amount"),
            col("decision_timestamp")
        )).alias("value")
    ) \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka-broker1:9092") \
    .option("topic", "bid-decisions") \
    .option("checkpointLocation", "s3://checkpoints/bidding") \
    .outputMode("append") \
    .trigger(processingTime='100 milliseconds') \
    .start()

query.awaitTermination()
```

Real-Time Analytics Aggregation
```python
# Real-time analytics aggregation
from pyspark.sql.functions import *

# Real-time metrics over sliding 1-minute windows
real_time_metrics = clickstream \
    .withWatermark("timestamp", "5 minutes") \
    .groupBy(
        window("timestamp", "1 minute", "30 seconds"),
        "publisher_id",
        "ad_id",
        "country",
        "device_type"
    ) \
    .agg(
        count("*").alias("impressions"),
        count(when(col("event_type") == "click", True)).alias("clicks"),
        count(when(col("event_type") == "conversion", True)).alias("conversions"),
        avg("bid_amount").alias("avg_bid"),
        sum("bid_amount").alias("total_spend")
    ) \
    .withColumn("ctr", col("clicks") / col("impressions")) \
    .withColumn("cvr", col("conversions") / col("clicks"))


# Write to ClickHouse (real-time analytics)
def write_to_clickhouse(batch_df, batch_id):
    """Write a micro-batch to ClickHouse over JDBC"""

    # Flatten the window struct; JDBC cannot write struct columns
    flat_df = batch_df \
        .withColumn("window_start", col("window.start")) \
        .withColumn("window_end", col("window.end")) \
        .drop("window")

    # JDBC connection
    jdbc_url = "jdbc:clickhouse://clickhouse-cluster:8123/analytics"

    properties = {
        "driver": "com.clickhouse.jdbc.ClickHouseDriver",
        "user": "clickstream_user",
        "password": "password"  # read from a secrets manager in production
    }

    # Write batch
    flat_df.write \
        .mode("append") \
        .jdbc(jdbc_url, "real_time_metrics", properties=properties)


# Write stream
query = real_time_metrics \
    .writeStream \
    .foreachBatch(write_to_clickhouse) \
    .outputMode("update") \
    .trigger(processingTime='10 seconds') \
    .option("checkpointLocation", "s3://checkpoints/analytics") \
    .start()
```

Data Lake Optimization
Delta Lake Configuration
```python
# Delta Lake for the data lakehouse
from delta import configure_spark_with_delta_pip
from delta.tables import DeltaTable
from pyspark.sql import SparkSession

# Configure Delta
builder = SparkSession.builder \
    .appName("ClickstreamDataLake") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.databricks.delta.autoOptimize.optimizeWrite", "true") \
    .config("spark.databricks.delta.autoOptimize.autoCompact", "true") \
    .config("spark.sql.shuffle.partitions", "400") \
    .config("spark.sql.files.maxPartitionBytes", "134217728")  # 128 MB

spark = configure_spark_with_delta_pip(builder).getOrCreate()


# Write raw events (bronze)
def write_bronze(df, batch_id):
    """Write raw events to the bronze layer"""
    df.write.format("delta") \
        .mode("append") \
        .partitionBy("date", "hour") \
        .save("s3://lakehouse/bronze/clickstream")


# Write processed events (silver)
def write_silver(df, batch_id):
    """Write processed events to the silver layer"""
    df.write.format("delta") \
        .mode("overwrite") \
        .partitionBy("date", "publisher_id") \
        .option("overwriteSchema", "true") \
        .option("delta.autoOptimize.optimizeWrite", "true") \
        .save("s3://lakehouse/silver/clickstream_enriched")


# Write aggregated metrics (gold)
def write_gold(df, batch_id):
    """Write aggregated metrics to the gold layer"""
    df.write.format("delta") \
        .mode("overwrite") \
        .partitionBy("date") \
        .option("delta.autoOptimize.optimizeWrite", "true") \
        .save("s3://lakehouse/gold/clickstream_metrics")


# Scheduled optimization
def optimize_delta_tables():
    """Compact, Z-Order, and vacuum the Delta tables"""

    # Bronze table: compaction only
    bronze_table = DeltaTable.forPath(spark, "s3://lakehouse/bronze/clickstream")
    bronze_table.optimize().executeCompaction()

    # Silver table with Z-Ordering
    silver_table = DeltaTable.forPath(spark, "s3://lakehouse/silver/clickstream_enriched")
    silver_table.optimize().executeZOrderBy("user_id", "timestamp")

    # Gold table with Z-Ordering
    gold_table = DeltaTable.forPath(spark, "s3://lakehouse/gold/clickstream_metrics")
    gold_table.optimize().executeZOrderBy("publisher_id", "date")

    # Vacuum old files (7-day retention)
    bronze_table.vacuum(168)
```

Cost Optimization
Storage Costs
| Tier | Data | Size | Cost | Optimization |
|---|---|---|---|---|
| Hot (7 days) | Raw events | 7PB | $161,000/month | S3 Standard |
| Warm (30 days) | Processed | 10PB | $60,000/month | S3 Standard-IA |
| Cold (90 days) | Aggregated | 5PB | $9,000/month | S3 Glacier |
| Archive (1 year) | Training data | 15PB | $15,000/month | Glacier Deep Archive |
| Total | | 37PB | $245,000/month | |
Optimization:
- Lifecycle: 7 days → IA, 90 days → Glacier (see the policy sketch after this list)
- Compression: ZSTD-19 (~50% savings)
- Partitioning: Date + hour for pruning
- Delta Lake: Auto-compaction
- Z-Ordering: user_id, timestamp (10x query improvement)
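The lifecycle tiering above can be expressed as an S3 lifecycle policy. The sketch below uses boto3 against a hypothetical `lakehouse` bucket and illustrative prefixes; note that S3 requires objects to age at least 30 days before transitioning to Standard-IA, so the 7-day intent is approximated with 30 days.

```python
# Sketch: S3 lifecycle rules approximating the tiering table above.
# Bucket name and prefix are hypothetical; adjust to your layout.
import boto3

s3 = boto3.client("s3")

s3.put_bucket_lifecycle_configuration(
    Bucket="lakehouse",
    LifecycleConfiguration={
        "Rules": [
            {
                "ID": "clickstream-tiering",
                "Filter": {"Prefix": "bronze/clickstream/"},
                "Status": "Enabled",
                "Transitions": [
                    # S3 enforces a 30-day minimum before Standard-IA
                    {"Days": 30, "StorageClass": "STANDARD_IA"},
                    {"Days": 90, "StorageClass": "GLACIER"},
                    {"Days": 365, "StorageClass": "DEEP_ARCHIVE"},
                ],
            }
        ]
    },
)
```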
Compute Costs
| Component | Compute | Cost | Optimization |
|---|---|---|---|
| Kafka Brokers | 30 nodes (spot) | $43,200/month | 70% spot savings |
| Spark Streaming | 500 nodes (spot) | $288,000/month | 70% spot savings |
| Flink Fraud | 100 nodes (spot) | $57,600/month | 70% spot savings |
| ClickHouse | 50 nodes (on-demand) | $21,600/month | Right-sized |
| Total | | $410,400/month | |
Optimization:
- Spot instances for stateless workloads
- Auto-scaling based on throughput (see the sizing sketch after this list)
- Scheduled operations (batch during off-peak)
- Right-sized clusters (memory vs CPU optimized)
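A minimal sketch of throughput-based autoscaling for the streaming tier: derive a target executor count from the observed event rate. The per-executor capacity and scaling bounds are assumed figures, not measurements from this system.

```python
# Sketch: derive a target Spark executor count from observed Kafka throughput.
# EVENTS_PER_EXECUTOR_PER_SEC is an assumed capacity figure; measure your own.
EVENTS_PER_EXECUTOR_PER_SEC = 25_000
MIN_EXECUTORS = 50
MAX_EXECUTORS = 500


def target_executors(observed_events_per_sec: float, headroom: float = 1.3) -> int:
    """Size the streaming cluster for the current throughput plus headroom."""
    needed = (observed_events_per_sec * headroom) / EVENTS_PER_EXECUTOR_PER_SEC
    return max(MIN_EXECUTORS, min(MAX_EXECUTORS, int(needed) + 1))


# Example: average vs. peak load
print(target_executors(1_200_000))   # ~average rate -> modest cluster
print(target_executors(10_000_000))  # peak rate -> capped at MAX_EXECUTORS
```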
Total Monthly Cost
| Category | Cost | Optimization |
|---|---|---|
| Storage | $245,000 | Lifecycle + compression |
| Compute | $410,400 | Spot + auto-scaling |
| Network | $50,000 | Colocation, data transfer |
| Total | $705,400/month | |
| Annual | $8,464,800 | |
| Cost per 1,000 events | $0.00023 | |
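These per-event economics follow directly from the volume figures (assuming a 30-day month):

```python
# Cross-check of the cost table (assumes a 30-day month)
monthly_cost = 245_000 + 410_400 + 50_000        # storage + compute + network
events_per_month = 100e9 * 30                    # 100B events/day

print(monthly_cost)                              # 705,400
print(monthly_cost * 12)                         # 8,464,800 per year
print(monthly_cost / (events_per_month / 1e3))   # ~$0.00024 per 1,000 events
```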
Failure Modes
Mode 1: Kafka Cluster Failure
Mitigation:
- Multi-region Kafka clusters
- Replication factor = 3
- Client-side buffering
- Automated failover (a client-side sketch follows this list)
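A minimal sketch of the client-side buffering and failover behavior, assuming hypothetical per-region broker endpoints; production failover is normally driven by DNS or service discovery rather than hard-coded hosts.

```python
# Sketch: client-side buffering with a manual fallback to a standby region.
from confluent_kafka import Producer

PRIMARY = "kafka-us-east.internal:9092"   # placeholder endpoints
STANDBY = "kafka-us-west.internal:9092"


def make_producer(bootstrap_servers: str) -> Producer:
    return Producer({
        "bootstrap.servers": bootstrap_servers,
        "queue.buffering.max.messages": 1_000_000,  # large local buffer
        "compression.codec": "zstd",
        "acks": "all",
        "retries": 10,
    })


def send_with_failover(producer: Producer, event_bytes: bytes) -> Producer:
    """Produce one event; switch to the standby cluster if the local queue
    cannot be drained against the primary."""
    try:
        producer.produce("clickstream-events", value=event_bytes)
        producer.poll(0)
        return producer
    except BufferError:
        # Local buffer full: give the primary a few seconds to drain...
        if producer.flush(5) > 0:
            # ...and if messages are still stuck, fail over to the standby
            # region (events still buffered in the old producer may be lost).
            producer = make_producer(STANDBY)
            producer.produce("clickstream-events", value=event_bytes)
        return producer
```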
Mode 2: Spark Streaming Failure
Mitigation:
- Kafka retention: 7 days
- Checkpointing to S3
- Exactly-once semantics
- Horizontal autoscaling
Mode 3: Cost Spike
Mitigation:
- Budget alerts (daily; see the sketch after this list)
- Anomaly detection on costs
- Automated cluster scaling
- Storage lifecycle reviews
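A sketch of a daily budget check using the Cost Explorer API; the threshold and SNS topic ARN are placeholders.

```python
# Sketch: daily spend check against a fixed threshold.
import datetime

import boto3

DAILY_BUDGET_USD = 25_000  # roughly $705K/month / 30, plus headroom
ALERT_TOPIC_ARN = "arn:aws:sns:us-east-1:123456789012:cost-alerts"  # placeholder


def check_yesterdays_spend() -> None:
    ce = boto3.client("ce")
    end = datetime.date.today()
    start = end - datetime.timedelta(days=1)

    resp = ce.get_cost_and_usage(
        TimePeriod={"Start": start.isoformat(), "End": end.isoformat()},
        Granularity="DAILY",
        Metrics=["UnblendedCost"],
    )
    spend = float(resp["ResultsByTime"][0]["Total"]["UnblendedCost"]["Amount"])

    if spend > DAILY_BUDGET_USD:
        boto3.client("sns").publish(
            TopicArn=ALERT_TOPIC_ARN,
            Subject="Clickstream pipeline cost alert",
            Message=f"Spend yesterday was ${spend:,.0f} (budget ${DAILY_BUDGET_USD:,.0f})",
        )
```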
SLA/SLO Definitions
```yaml
slas:
  data_pipeline:
    throughput: "10M events/second"
    latency: "< 1 second"
    completeness: "> 99.9%"
    correctness: "Exactly-once"

  real_time_bidding:
    latency: "< 100ms"
    availability: "99.95%"

  analytics:
    freshness: "< 1 minute"
    query_latency: "< 5 seconds"
```
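One way to verify the analytics freshness SLO is to compare the newest aggregated window in ClickHouse against the current time. This sketch uses the ClickHouse HTTP interface; the host, database, table, and `window_end` column are assumptions based on the pipeline above.

```python
# Sketch: check the "< 1 minute freshness" SLO against ClickHouse.
import requests

CLICKHOUSE_URL = "http://clickhouse-cluster:8123"  # assumed host
FRESHNESS_SLO_SECONDS = 60


def analytics_freshness_seconds() -> float:
    """Seconds between now and the newest row in real_time_metrics."""
    query = "SELECT dateDiff('second', max(window_end), now()) FROM analytics.real_time_metrics"
    resp = requests.get(CLICKHOUSE_URL, params={"query": query}, timeout=5)
    resp.raise_for_status()
    return float(resp.text.strip())


lag = analytics_freshness_seconds()
print(f"freshness lag: {lag:.1f}s (SLO: {FRESHNESS_SLO_SECONDS}s)")
```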
Key Takeaways
- Petabyte scale: 1PB/day requires careful optimization
- Spot instances: 70% compute savings
- Compression: ZSTD-19 for 50% storage savings
- Delta Lake: Auto-compaction + Z-Ordering
- Kafka tuning: Partitioning, batching, compression
- Multi-region: Global data collection
- Monitoring: Comprehensive metrics and alerts
- Cost per 1,000 events: target < $0.001 (achieved: ~$0.00023)
Back to Module 8