
Case Study - AdTech Clickstream Processing

Petabyte-Scale Event Processing Pipeline


Business Context

Company: AdTech platform (real-time bidding and analytics)

Challenge: Process billions of clickstream events per day for real-time bidding, fraud detection, and analytics.

Scale: 1PB/day, 10M events/second peak, 100B+ events/day
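
These headline figures imply most of the sizing decisions in this case study. A quick back-of-envelope check, using only the numbers above:

# Back-of-envelope capacity check from the stated scale figures
bytes_per_day = 1e15          # 1 PB/day
events_per_day = 100e9        # 100B+ events/day
peak_events_per_sec = 10e6    # 10M events/second

avg_events_per_sec = events_per_day / 86_400             # ~1.16M events/second
avg_bytes_per_sec = bytes_per_day / 86_400               # ~11.6 GB/second sustained
avg_event_size = bytes_per_day / events_per_day          # ~10 KB per raw event
peak_to_avg = peak_events_per_sec / avg_events_per_sec   # ~8.6x burst headroom needed

print(f"{avg_events_per_sec:,.0f} events/s avg, {avg_bytes_per_sec / 1e9:.1f} GB/s avg, "
      f"{avg_event_size / 1e3:.0f} KB/event, {peak_to_avg:.1f}x peak-to-average")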


Requirements

  • Sustain 10M events/second at peak with end-to-end pipeline latency under 1 second
  • Exactly-once processing semantics (no dropped or double-counted events)
  • Bid decisions returned in under 100ms at 99.95% availability
  • Analytics freshness under 1 minute; interactive query latency under 5 seconds
  • Data completeness above 99.9%, with 7-day replay capability from Kafka

Architecture Design

Events are published to a multi-region Kafka event bus, processed in parallel by Spark Structured Streaming (bidding and enrichment) and Flink (fraud detection), and persisted to an S3 + Delta Lake lakehouse in bronze/silver/gold layers. ClickHouse serves real-time analytics, Snowflake serves warehouse workloads, and Airflow orchestrates batch jobs and table maintenance.

Technology Selection

Component Selection Rationale

Component           | Technology                 | Rationale
--------------------|----------------------------|-----------------------------------
Event Bus           | Kafka (multi-region)       | Proven at PB scale
Stream Processing   | Spark Structured Streaming | Mature, exactly-once semantics
Real-Time Analytics | Flink                      | Low latency, state management
Data Lake           | S3 + Delta Lake            | ACID, time travel
OLAP                | ClickHouse                 | Fast ingest, compression
Warehouse           | Snowflake                  | Separation of compute and storage
Orchestration       | Airflow                    | Fault tolerance

Event Ingestion

Kafka Cluster Configuration

# Kafka cluster configuration (Terraform)
resource "aws_msk_cluster" "clickstream_kafka" {
  cluster_name           = "clickstream-kafka"
  kafka_version          = "3.5.0"
  number_of_broker_nodes = 30

  broker_node_group_info {
    instance_type   = "kafka.m5.4xlarge"
    client_subnets  = module.vpc.private_subnets
    security_groups = [aws_security_group.kafka.id]
    az_distribution = "DEFAULT"

    storage_info {
      ebs_storage_info {
        volume_size = 1000 # 1 TB per broker
        provisioned_throughput {
          enabled           = true
          volume_throughput = 250 # MiB/s
        }
      }
    }
  }

  configuration_info {
    arn      = aws_msk_configuration.clickstream.arn
    revision = aws_msk_configuration.clickstream.latest_revision
  }

  # Encryption at rest and in transit
  encryption_info {
    encryption_at_rest_kms_key_arn = aws_kms_key.kafka.arn
    encryption_in_transit {
      client_broker = "TLS_PLAINTEXT"
      in_cluster    = true
    }
  }

  # Monitoring
  enhanced_monitoring = "PER_BROKER"
  open_monitoring {
    prometheus {
      jmx_exporter {
        enabled_in_broker = true
      }
    }
  }

  logging_info {
    broker_logs {
      cloudwatch_logs {
        enabled   = true
        log_group = aws_cloudwatch_log_group.kafka.name
      }
    }
  }

  tags = {
    Environment = "production"
    Service     = "clickstream"
    CostCenter  = "adtech"
  }
}

# Kafka broker configuration
resource "aws_msk_configuration" "clickstream" {
  kafka_versions = ["3.5.0"]
  name           = "clickstream-config"

  server_properties = <<PROPERTIES
# Topic defaults
default.replication.factor=3
num.partitions=300
min.insync.replicas=2
# Retention
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
# Performance
num.network.threads=16
num.io.threads=16
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
# Compression
compression.type=zstd
# Producer and consumer tuning (batching, linger, acks, fetch sizes) is applied
# client-side; see the producer configuration below.
PROPERTIES
}
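
Rather than relying on the broker-level num.partitions default above, the clickstream-events topic can be created explicitly with matching settings. A minimal sketch using the confluent-kafka AdminClient; the per-topic overrides are assumptions chosen to mirror the broker configuration, not values taken from this deployment:

# Explicit topic creation sketch -- partition/replication values mirror the broker
# defaults above; the per-topic overrides are illustrative assumptions.
from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({"bootstrap.servers": "kafka-broker1:9092"})

topic = NewTopic(
    "clickstream-events",
    num_partitions=300,      # matches num.partitions in the MSK configuration
    replication_factor=3,    # matches default.replication.factor
    config={
        "min.insync.replicas": "2",
        "retention.ms": str(7 * 24 * 3600 * 1000),  # 7 days, matching log.retention.hours=168
        "compression.type": "zstd",
    },
)

# create_topics() is asynchronous; wait on the returned futures
for name, future in admin.create_topics([topic]).items():
    try:
        future.result()
        print(f"Created topic {name}")
    except Exception as exc:
        print(f"Topic {name} not created: {exc}")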

High-Throughput Producer

# High-throughput Kafka producer
import json
import threading
from queue import Queue

from confluent_kafka import Producer


class ClickstreamProducer:
    """High-throughput clickstream producer with idempotent delivery."""

    def __init__(self, bootstrap_servers: str):
        self.producer = Producer({
            'bootstrap.servers': bootstrap_servers,
            'client.id': 'clickstream-producer',
            'queue.buffering.max.messages': 1000000,
            'queue.buffering.max.kbytes': 4000000,
            'queue.buffering.max.ms': 100,
            'batch.num.messages': 10000,
            'compression.codec': 'zstd',
            'acks': 'all',
            'enable.idempotence': True,
            'max.in.flight.requests.per.connection': 5,
            'linger.ms': 10,
            'batch.size': 32768,
            'retries': 3,
            'delivery.timeout.ms': 120000,
        })
        self.queue = Queue(maxsize=100000)

    def delivery_report(self, err, msg):
        """Delivery report callback."""
        if err is not None:
            print(f'Delivery failed: {err}')
        # Successful deliveries are intentionally silent at this volume.

    def send_event(self, event: dict):
        """Send a single clickstream event, keyed by user_id."""
        value = json.dumps(event).encode('utf-8')
        try:
            self.producer.produce(
                topic='clickstream-events',
                key=event['user_id'].encode('utf-8'),
                value=value,
                timestamp=int(event['timestamp'] * 1000),  # epoch millis
                on_delivery=self.delivery_report,
            )
            # Serve delivery callbacks without blocking
            self.producer.poll(0)
        except BufferError:
            # Local queue is full: apply backpressure, then retry
            print('Queue full, waiting...')
            self.producer.flush()
            self.send_event(event)

    def flush(self):
        """Flush pending messages."""
        self.producer.flush()


# Batch producer for efficiency
class BatchClickstreamProducer:
    """Batch producer that trades durability (acks=1) for throughput."""

    def __init__(self, bootstrap_servers: str, batch_size: int = 10000):
        self.producer = Producer({
            'bootstrap.servers': bootstrap_servers,
            'compression.codec': 'zstd',
            'batch.num.messages': batch_size,
            'linger.ms': 100,
            'acks': 1,  # Faster, accepts some data loss on broker failure
        })
        self.batch = []
        self.batch_size = batch_size
        self.lock = threading.Lock()

    def add_event(self, event: dict):
        """Add an event to the in-memory batch; flush when full."""
        with self.lock:
            self.batch.append(event)
            if len(self.batch) >= self.batch_size:
                self.flush_batch()

    def flush_batch(self):
        """Flush the current batch to Kafka."""
        for event in self.batch:
            value = json.dumps(event).encode('utf-8')
            self.producer.produce(
                topic='clickstream-events',
                value=value,
                callback=self.delivery_report,
            )
        self.batch = []
        self.producer.poll(0)

    def delivery_report(self, err, msg):
        """Silent delivery report."""
        pass


# Usage
producer = BatchClickstreamProducer('kafka-broker1:9092,kafka-broker2:9092')

# clickstream_events is any iterable of event dicts
for event in clickstream_events:
    producer.add_event(event)

# Flush remaining events and wait for delivery
producer.flush_batch()
producer.producer.flush()

Stream Processing

Real-Time Bidding

# Spark Structured Streaming for real-time bidding
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Initialize Spark
spark = SparkSession.builder \
    .appName("RealTimeBidding") \
    .config("spark.sql.streaming.checkpointLocation", "s3://checkpoints/bidding") \
    .config("spark.sql.shuffle.partitions", "400") \
    .getOrCreate()

# Define the clickstream event schema
clickstream_schema = StructType([
    StructField("event_id", StringType(), False),
    StructField("user_id", StringType(), False),
    StructField("session_id", StringType(), False),
    StructField("ad_id", StringType()),
    StructField("publisher_id", StringType(), False),
    StructField("timestamp", TimestampType(), False),
    StructField("url", StringType()),
    StructField("user_agent", StringType()),
    StructField("ip", StringType()),
    StructField("device_type", StringType()),
    StructField("country", StringType()),
])

# Read raw events from Kafka
clickstream_raw = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka-broker1:9092,kafka-broker2:9092") \
    .option("subscribe", "clickstream-events") \
    .option("startingOffsets", "latest") \
    .option("failOnDataLoss", "false") \
    .load()

# Parse the JSON payload
clickstream = clickstream_raw \
    .select(from_json(col("value").cast("string"), clickstream_schema).alias("data")) \
    .select("data.*")

# Watermark for late-arriving data
clickstream_with_watermark = clickstream \
    .withWatermark("timestamp", "30 seconds")

# Join with user profiles (static/batch)
user_profiles = spark.read.format("parquet") \
    .load("s3://data/user_profiles/")

enriched = clickstream_with_watermark \
    .join(user_profiles, "user_id", "left")

# Join with ad inventory (stream-stream)
ad_inventory_schema = StructType([
    StructField("ad_id", StringType(), False),
    StructField("publisher_id", StringType(), False),
    StructField("bid_floor", DoubleType(), False),
    StructField("category", StringType()),
    StructField("timestamp", TimestampType(), False),
])

ad_inventory = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka-broker1:9092") \
    .option("subscribe", "ad-inventory") \
    .load() \
    .select(from_json(col("value").cast("string"), ad_inventory_schema).alias("data")) \
    .select("data.*") \
    .withWatermark("timestamp", "30 seconds")

# Stream-stream join (drop the clickstream's ad_id so the inventory's ad_id is unambiguous)
bidding_opportunities = enriched \
    .drop("ad_id") \
    .join(
        ad_inventory,
        ["publisher_id", "timestamp"],
        "inner"
    )

# Apply bidding logic
@udf(returnType=DoubleType())
def calculate_bid(
    user_value_score: float,
    ad_relevance_score: float,
    bid_floor: float
) -> float:
    """Calculate the bid price, never below the publisher's floor."""
    base_bid = user_value_score * ad_relevance_score * 10
    return max(base_bid, bid_floor)

bidding_decisions = bidding_opportunities \
    .withColumn("bid_amount", calculate_bid(
        col("user_value_score"),
        col("ad_relevance_score"),
        col("bid_floor")
    )) \
    .withColumn("decision_timestamp", current_timestamp())

# Write bid decisions back to Kafka
query = bidding_decisions \
    .select(
        to_json(struct(
            col("event_id"),
            col("user_id"),
            col("ad_id"),
            col("bid_amount"),
            col("decision_timestamp")
        )).alias("value")
    ) \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka-broker1:9092") \
    .option("topic", "bid-decisions") \
    .option("checkpointLocation", "s3://checkpoints/bidding") \
    .outputMode("append") \
    .trigger(processingTime='100 milliseconds') \
    .start()

query.awaitTermination()

Real-Time Analytics Aggregation

# Real-time analytics aggregation
from pyspark.sql.functions import *

# Real-time metrics (assumes the enriched stream carries event_type and bid_amount)
real_time_metrics = clickstream \
    .withWatermark("timestamp", "5 minutes") \
    .groupBy(
        window("timestamp", "1 minute", "30 seconds"),
        "publisher_id",
        "ad_id",
        "country",
        "device_type"
    ) \
    .agg(
        count("*").alias("impressions"),
        count(when(col("event_type") == "click", True)).alias("clicks"),
        count(when(col("event_type") == "conversion", True)).alias("conversions"),
        avg("bid_amount").alias("avg_bid"),
        sum("bid_amount").alias("total_spend")
    ) \
    .withColumn("ctr", col("clicks") / col("impressions")) \
    .withColumn("cvr", col("conversions") / col("clicks"))

# Write to ClickHouse (real-time analytics)
def write_to_clickhouse(batch_df, batch_id):
    """Write one micro-batch to ClickHouse over JDBC."""
    jdbc_url = "jdbc:clickhouse://clickhouse-cluster:8123/analytics"
    properties = {
        "driver": "com.clickhouse.jdbc.ClickHouseDriver",
        "user": "clickstream_user",
        "password": "password"  # in production, pull from a secrets manager
    }
    # Append the micro-batch to the target table
    batch_df.write \
        .mode("append") \
        .jdbc(jdbc_url, "real_time_metrics", properties=properties)

# Write stream
query = real_time_metrics \
    .writeStream \
    .foreachBatch(write_to_clickhouse) \
    .outputMode("update") \
    .trigger(processingTime='10 seconds') \
    .option("checkpointLocation", "s3://checkpoints/analytics") \
    .start()
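
The real_time_metrics target table is not defined in this section. A plausible ClickHouse definition is sketched below using the clickhouse-driver native client; the engine, ordering key, partitioning, and TTL are assumptions rather than settings from this deployment:

# Sketch of the ClickHouse target table -- engine, ordering key, and TTL are assumptions
from clickhouse_driver import Client

client = Client("clickhouse-cluster")  # native protocol (port 9000), not the JDBC/HTTP port

client.execute("""
CREATE TABLE IF NOT EXISTS analytics.real_time_metrics
(
    window_start  DateTime,
    window_end    DateTime,
    publisher_id  String,
    ad_id         String,
    country       LowCardinality(String),
    device_type   LowCardinality(String),
    impressions   UInt64,
    clicks        UInt64,
    conversions   UInt64,
    avg_bid       Float64,
    total_spend   Float64,
    ctr           Float64,
    cvr           Float64
)
ENGINE = MergeTree()
PARTITION BY toDate(window_start)
ORDER BY (publisher_id, ad_id, window_start)
TTL window_start + INTERVAL 30 DAY
""")

Note that the Spark aggregation above produces a window struct; the micro-batch would need window.start / window.end flattened into window_start / window_end columns before the JDBC write.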

Data Lake Optimization

Delta Lake Configuration

# Delta Lake for the data lakehouse
from pyspark.sql import SparkSession
from delta import *

# Configure Spark with Delta extensions
builder = SparkSession.builder \
    .appName("ClickstreamDataLake") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.databricks.delta.autoOptimize.optimizeWrite", "true") \
    .config("spark.databricks.delta.autoOptimize.autoCompact", "true") \
    .config("spark.sql.shuffle.partitions", "400") \
    .config("spark.sql.files.maxPartitionBytes", "134217728")  # 128MB

spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Write raw events (bronze)
def write_bronze(df, batch_id):
    """Write raw events to the bronze layer."""
    df.write.format("delta") \
        .mode("append") \
        .partitionBy("date", "hour") \
        .save("s3://lakehouse/bronze/clickstream")

# Write processed events (silver)
def write_silver(df, batch_id):
    """Write processed events to the silver layer."""
    df.write.format("delta") \
        .mode("overwrite") \
        .partitionBy("date", "publisher_id") \
        .option("overwriteSchema", "true") \
        .option("delta.autoOptimize.optimizeWrite", "true") \
        .save("s3://lakehouse/silver/clickstream_enriched")

# Write aggregated metrics (gold)
def write_gold(df, batch_id):
    """Write aggregated metrics to the gold layer."""
    df.write.format("delta") \
        .mode("overwrite") \
        .partitionBy("date") \
        .option("delta.autoOptimize.optimizeWrite", "true") \
        .save("s3://lakehouse/gold/clickstream_metrics")

# Scheduled optimization
def optimize_delta_tables():
    """Compact, Z-order, and vacuum the Delta Lake tables."""
    from delta.tables import DeltaTable

    # Bronze table: compaction only
    bronze_table = DeltaTable.forPath(spark, "s3://lakehouse/bronze/clickstream")
    bronze_table.optimize().executeCompaction()

    # Silver table with Z-Ordering on the common query predicates
    silver_table = DeltaTable.forPath(spark, "s3://lakehouse/silver/clickstream_enriched")
    silver_table.optimize().executeZOrderBy("user_id", "timestamp")

    # Gold table with Z-Ordering
    gold_table = DeltaTable.forPath(spark, "s3://lakehouse/gold/clickstream_metrics")
    gold_table.optimize().executeZOrderBy("publisher_id", "date")

    # Vacuum old files (7-day retention)
    bronze_table.vacuum(168)
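
The optimize_delta_tables routine above is a natural fit for the Airflow orchestration listed in the component table. A minimal DAG sketch, assuming Airflow 2.x and a hypothetical module that exposes the function; the schedule and module path are assumptions:

# Nightly Delta maintenance DAG -- a sketch; module path and schedule are assumptions
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

# Hypothetical import; in this case study the function is defined inline above
from lakehouse.maintenance import optimize_delta_tables

with DAG(
    dag_id="delta_lake_maintenance",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",  # run compaction/Z-ordering during off-peak hours
    catchup=False,
    tags=["clickstream", "maintenance"],
) as dag:
    optimize = PythonOperator(
        task_id="optimize_delta_tables",
        python_callable=optimize_delta_tables,
    )

In practice the maintenance job would likely be submitted to the Spark cluster (for example via a spark-submit-style operator) rather than run in-process, but the scheduling shape is the same.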

Cost Optimization

Storage Costs

Tier             | Data          | Size  | Cost           | Storage class
-----------------|---------------|-------|----------------|----------------------
Hot (7 days)     | Raw events    | 7 PB  | $161,000/month | S3 Standard
Warm (30 days)   | Processed     | 10 PB | $60,000/month  | S3 Standard-IA
Cold (90 days)   | Aggregated    | 5 PB  | $9,000/month   | S3 Glacier
Archive (1 year) | Training data | 15 PB | $15,000/month  | Glacier Deep Archive
Total            |               | 37 PB | $245,000/month |

Optimization:

  • Lifecycle: 7 days → S3 Standard-IA, 90 days → Glacier (see the sketch after this list)
  • Compression: ZSTD level 19 (~50% savings)
  • Partitioning: date + hour for partition pruning
  • Delta Lake: auto-compaction
  • Z-Ordering: user_id, timestamp (~10x query improvement)
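
A minimal sketch of the lifecycle rule referenced above, using boto3 (the bucket name and prefix are placeholders). One practical wrinkle: S3 only allows lifecycle transitions to Standard-IA once objects are at least 30 days old, so the first transition below is set to 30 days rather than 7:

# S3 lifecycle rule sketch -- bucket name and prefix are placeholders
import boto3

s3 = boto3.client("s3")

s3.put_bucket_lifecycle_configuration(
    Bucket="lakehouse",  # placeholder bucket name
    LifecycleConfiguration={
        "Rules": [
            {
                "ID": "clickstream-tiering",
                "Filter": {"Prefix": "bronze/clickstream/"},
                "Status": "Enabled",
                "Transitions": [
                    # S3 requires objects to be >= 30 days old before Standard-IA
                    {"Days": 30, "StorageClass": "STANDARD_IA"},
                    {"Days": 90, "StorageClass": "GLACIER"},
                    {"Days": 365, "StorageClass": "DEEP_ARCHIVE"},
                ],
            }
        ]
    },
)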

Compute Costs

Component       | Compute              | Cost           | Optimization
----------------|----------------------|----------------|------------------
Kafka Brokers   | 30 nodes (spot)      | $43,200/month  | 70% spot savings
Spark Streaming | 500 nodes (spot)     | $288,000/month | 70% spot savings
Flink Fraud     | 100 nodes (spot)     | $57,600/month  | 70% spot savings
ClickHouse      | 50 nodes (on-demand) | $21,600/month  | Right-sized
Total           |                      | $410,400/month |

Optimization:

  • Spot instances for stateless workloads
  • Auto-scaling based on throughput and consumer lag (see the sketch after this list)
  • Scheduled operations (batch jobs during off-peak hours)
  • Right-sized clusters (memory- vs CPU-optimized instance families)
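
A minimal sketch of lag-based scaling logic, assuming hypothetical helpers that read total consumer-group lag and resize the Spark streaming fleet; all thresholds are illustrative, not values from this deployment:

# Lag-based autoscaling sketch -- the lag source and resize hook are assumed to
# exist elsewhere; thresholds are illustrative assumptions.

def scale_streaming_cluster(current_nodes: int,
                            lag_events: int,
                            target_lag: int = 50_000_000,
                            min_nodes: int = 100,
                            max_nodes: int = 500) -> int:
    """Return the desired node count based on total Kafka consumer lag."""
    if lag_events > 2 * target_lag:
        desired = int(current_nodes * 1.5)   # falling behind: scale out aggressively
    elif lag_events < target_lag // 4:
        desired = int(current_nodes * 0.8)   # comfortably ahead: scale in gradually
    else:
        desired = current_nodes              # within the target band: hold steady
    return max(min_nodes, min(max_nodes, desired))


# Example: the 500-node ceiling matches the Spark Streaming fleet in the cost table
print(scale_streaming_cluster(current_nodes=300, lag_events=180_000_000))  # -> 450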

Total Monthly Cost

Category              | Cost           | Optimization
----------------------|----------------|---------------------------
Storage               | $245,000/month | Lifecycle + compression
Compute               | $410,400/month | Spot + auto-scaling
Network               | $50,000/month  | Colocation, data transfer
Total                 | $705,400/month |
Annual                | $8,464,800     |
Cost per 1,000 events | ~$0.00024      | ≈ $0.24 per million events
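
The per-event figure follows directly from the totals above; a quick arithmetic check using only numbers from the tables in this section:

# Cost-per-event check using the figures from the tables above
monthly_cost_usd = 245_000 + 410_400 + 50_000   # storage + compute + network
events_per_month = 100e9 * 30                   # ~100B events/day

cost_per_event = monthly_cost_usd / events_per_month
print(f"Total: ${monthly_cost_usd:,}/month")               # $705,400/month
print(f"Per event: ${cost_per_event:.8f}")                 # ~$0.00000024
print(f"Per 1,000 events: ${cost_per_event * 1000:.5f}")   # ~$0.00024
print(f"Annual: ${monthly_cost_usd * 12:,}")               # $8,464,800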

Failure Modes

Mode 1: Kafka Cluster Failure

Mitigation:

  • Multi-region Kafka clusters
  • Replication factor = 3
  • Client-side buffering
  • Automated failover

Mode 2: Spark Streaming Failure

Mitigation:

  • Kafka retention: 7 days
  • Checkpointing to S3
  • Exactly-once semantics
  • Horizontal autoscaling

Mode 3: Cost Spike

Mitigation:

  • Budget alerts (daily)
  • Anomaly detection on daily spend (see the sketch after this list)
  • Automated cluster scaling
  • Storage lifecycle reviews
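
A minimal sketch of the daily spend anomaly check using the AWS Cost Explorer API; the spike threshold and the alerting hook are assumptions:

# Daily spend anomaly check sketch -- threshold and alerting hook are assumptions
from datetime import date, timedelta

import boto3

ce = boto3.client("ce")  # AWS Cost Explorer

def daily_costs(days: int = 14) -> list[float]:
    """Fetch the last `days` of daily unblended cost."""
    end = date.today()
    start = end - timedelta(days=days)
    resp = ce.get_cost_and_usage(
        TimePeriod={"Start": start.isoformat(), "End": end.isoformat()},
        Granularity="DAILY",
        Metrics=["UnblendedCost"],
    )
    return [float(r["Total"]["UnblendedCost"]["Amount"]) for r in resp["ResultsByTime"]]

def check_cost_spike(threshold: float = 1.3) -> bool:
    """Flag a spike if the latest day exceeds the trailing average by 30%."""
    costs = daily_costs()
    baseline = sum(costs[:-1]) / max(len(costs) - 1, 1)
    spiked = costs[-1] > threshold * baseline
    if spiked:
        print(f"Cost spike: ${costs[-1]:,.0f} vs baseline ${baseline:,.0f}")
        # hook: page on-call / open an incident (left out of this sketch)
    return spiked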

SLA/SLO Definitions

slas:
  data_pipeline:
    throughput: "10M events/second"
    latency: "< 1 second"
    completeness: "> 99.9%"
    correctness: "Exactly-once"
  real_time_bidding:
    latency: "< 100ms"
    availability: "99.95%"
  analytics:
    freshness: "< 1 minute"
    query_latency: "< 5 seconds"
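
A minimal sketch of how these SLOs could be checked against observed metrics; the observed values are placeholders for whatever the monitoring stack (Prometheus, per the Kafka configuration above) actually reports:

# SLO compliance check sketch -- observed metrics are placeholder values
SLOS = {
    "pipeline_latency_p99_s": 1.0,     # "< 1 second"
    "bidding_latency_p99_ms": 100.0,   # "< 100ms"
    "completeness_ratio": 0.999,       # "> 99.9%"
    "analytics_freshness_s": 60.0,     # "< 1 minute"
}

def evaluate_slos(observed: dict) -> dict:
    """Return a pass/fail map for each SLO."""
    return {
        "pipeline_latency": observed["pipeline_latency_p99_s"] < SLOS["pipeline_latency_p99_s"],
        "bidding_latency": observed["bidding_latency_p99_ms"] < SLOS["bidding_latency_p99_ms"],
        "completeness": observed["completeness_ratio"] > SLOS["completeness_ratio"],
        "analytics_freshness": observed["analytics_freshness_s"] < SLOS["analytics_freshness_s"],
    }

# Example with placeholder observations
print(evaluate_slos({
    "pipeline_latency_p99_s": 0.8,
    "bidding_latency_p99_ms": 85.0,
    "completeness_ratio": 0.9995,
    "analytics_freshness_s": 45.0,
}))  # all True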

Key Takeaways

  1. Petabyte scale: 1PB/day requires careful optimization
  2. Spot instances: 70% compute savings
  3. Compression: ZSTD-19 for 50% storage savings
  4. Delta Lake: Auto-compaction + Z-Ordering
  5. Kafka tuning: Partitioning, batching, compression
  6. Multi-region: Global data collection
  7. Monitoring: Comprehensive metrics and alerts
  8. Cost per event: ≈ $0.24 per million events, comfortably under the $0.001 target

Back to Module 8