
Case Study - E-Commerce Personalization

Real-Time Recommendations at Scale


Business Context

Company: Large e-commerce platform (50M monthly active users)

Challenge: Deliver personalized product recommendations in real time to increase conversion rates and average order value.

Scale: 100TB/day, 1M events/second peak, 10B products


Requirements


Architecture Design


Technology Selection

Component Selection Rationale

| Component | Technology | Rationale |
|---|---|---|
| Event Bus | Kafka | Proven at scale, exactly-once |
| Stream Processing | Flink | Millisecond latency, state management |
| Feature Store | Feast | Real-time serving, offline/online sync |
| Online Store | Redis Cluster | Sub-millisecond latency |
| Offline Store | S3 + Parquet | Cost optimization, ZSTD compression |
| ML Training | Spark + XGBoost | Distributed training |
| Model Serving | TensorFlow Serving | Low latency, batching |
| Real-Time Analytics | ClickHouse | Fast ingest, excellent compression |
| Batch Inference | Spark | Process billions of products |
| A/B Testing | Custom built | Business-specific metrics |
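
The "exactly-once" rationale for Kafka depends on idempotent, transactional producers on the ingest path. Below is a minimal sketch using the confluent-kafka Python client; the topic and broker address match the Flink source table later in this case study, while the transactional id is illustrative.

# Sketch: exactly-once event production to the user-events topic (confluent-kafka).
# The transactional.id is illustrative; brokers match the Flink source below.
import json
from confluent_kafka import Producer

producer = Producer({
    'bootstrap.servers': 'kafka:9092',
    'enable.idempotence': True,           # no duplicates on retry
    'transactional.id': 'event-ingest-1'  # enables atomic, exactly-once writes
})
producer.init_transactions()

def publish_events(events):
    """Write a batch of user events atomically."""
    producer.begin_transaction()
    for event in events:
        producer.produce(
            'user-events',
            key=str(event['user_id']),
            value=json.dumps(event)
        )
    producer.commit_transaction()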

Real-Time Feature Pipeline

Feature Extraction

# Flink feature extraction: consume user events from Kafka, compute windowed
# user features, and upsert them into Redis.
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.udf import udf
import numpy as np

# Create environment
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(100)
t_env = StreamTableEnvironment.create(env)

# User behavior features
@udf(result_type=DataTypes.FLOAT())
def purchase_rate(clicks: int, purchases: int) -> float:
    """Calculate purchase rate (purchases per click)."""
    if clicks == 0:
        return 0.0
    return purchases / clicks

@udf(result_type=DataTypes.FLOAT())
def avg_time_on_page(events: list) -> float:
    """Calculate average time on page across a list of events."""
    if not events:
        return 0.0
    durations = [e.get('duration', 0) for e in events]
    return float(np.mean(durations))

# Register UDFs so they can be referenced from SQL
t_env.create_temporary_function("purchase_rate", purchase_rate)
t_env.create_temporary_function("avg_time_on_page", avg_time_on_page)

# Create source table (`timestamp` is a reserved word and must be escaped)
t_env.execute_sql("""
    CREATE TABLE user_events (
        user_id BIGINT,
        event_type STRING,
        product_id BIGINT,
        category STRING,
        price DECIMAL(10,2),
        `timestamp` TIMESTAMP(3),
        WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '10' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'user-events',
        'properties.bootstrap.servers' = 'kafka:9092',
        'format' = 'json'
    )
""")

# Aggregate features over 5-minute tumbling windows
user_features = t_env.sql_query("""
    SELECT
        user_id,
        CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)) AS feature_timestamp,
        -- Count features
        COUNT(*) AS total_events,
        SUM(CASE WHEN event_type = 'click' THEN 1 ELSE 0 END) AS clicks,
        SUM(CASE WHEN event_type = 'purchase' THEN 1 ELSE 0 END) AS purchases,
        SUM(CASE WHEN event_type = 'add_to_cart' THEN 1 ELSE 0 END) AS add_to_carts,
        -- Ratio features
        purchase_rate(SUM(CASE WHEN event_type = 'click' THEN 1 ELSE 0 END),
                      SUM(CASE WHEN event_type = 'purchase' THEN 1 ELSE 0 END)) AS purchase_rate,
        -- Category preferences
        COUNT(DISTINCT category) AS categories_viewed,
        -- Time-based features
        MAX(`timestamp`) AS last_event_time,
        MIN(`timestamp`) AS first_event_time
    FROM user_events
    GROUP BY
        user_id,
        TUMBLE(`timestamp`, INTERVAL '5' MINUTE)
""")

# Sink to the online feature store (requires a third-party Flink Redis connector)
t_env.execute_sql("""
    CREATE TABLE user_features_store (
        user_id BIGINT,
        feature_timestamp TIMESTAMP(3),
        total_events BIGINT,
        clicks BIGINT,
        purchases BIGINT,
        add_to_carts BIGINT,
        purchase_rate FLOAT,
        categories_viewed BIGINT,
        last_event_time TIMESTAMP(3),
        first_event_time TIMESTAMP(3),
        PRIMARY KEY (user_id, feature_timestamp) NOT ENFORCED
    ) WITH (
        'connector' = 'redis',
        'mode' = 'upsert',
        'host' = 'redis-cluster',
        'port' = '6379'
    )
""")

user_features.execute_insert('user_features_store')

Feast Feature Store

# Feast feature store definitions
from datetime import timedelta
import pandas as pd
from feast import FeatureView, Field
from feast.on_demand_feature_view import on_demand_feature_view
from feast.types import Float32, Int64, String
from entities import user, product
from data_sources import user_events_source, product_events_source

# Note: the online store (Redis cluster) and offline store (S3 + Parquet) are
# configured in feature_store.yaml rather than on individual feature views.

# User feature view
user_features_view = FeatureView(
    name="user_features",
    entities=[user],
    ttl=timedelta(days=30),
    schema=[
        Field(name="total_events", dtype=Int64),
        Field(name="clicks", dtype=Int64),
        Field(name="purchases", dtype=Int64),
        Field(name="purchase_rate", dtype=Float32),
        Field(name="categories_viewed", dtype=Int64),
        Field(name="favorite_category", dtype=String),
        Field(name="avg_price_point", dtype=Float32),
    ],
    source=user_events_source,
)

# Product feature view
product_features_view = FeatureView(
    name="product_features",
    entities=[product],
    ttl=timedelta(days=7),
    schema=[
        Field(name="category", dtype=String),
        Field(name="price", dtype=Float32),
        Field(name="popularity_score", dtype=Float32),
        Field(name="conversion_rate", dtype=Float32),
        Field(name="avg_rating", dtype=Float32),
        Field(name="inventory_level", dtype=Int64),
    ],
    source=product_events_source,
)

# On-demand feature view (computed at request time)
@on_demand_feature_view(
    sources=[user_features_view],
    schema=[Field(name="purchase_tier", dtype=String)],
)
def on_demand_features(inputs: pd.DataFrame) -> pd.DataFrame:
    """Bucket users into purchase tiers from their purchase rate."""
    df = pd.DataFrame()
    df["purchase_tier"] = inputs["purchase_rate"].apply(
        lambda x: "high" if x > 0.1 else "medium" if x > 0.05 else "low"
    )
    return df
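
To exercise these definitions, features are periodically materialized from the offline store into Redis and then read with a low-latency online lookup. A minimal sketch follows; the repo path and entity values are illustrative.

# Sketch: syncing the offline store into Redis and reading features online.
# Repo path and entity values are illustrative.
from datetime import datetime
from feast import FeatureStore

store = FeatureStore(repo_path="feature_store/")

# Refresh the online store from the offline store up to "now"
store.materialize_incremental(end_date=datetime.utcnow())

# Low-latency read at serving time (same call the API below relies on)
features = store.get_online_features(
    features=[
        "user_features:purchase_rate",
        "user_features:favorite_category",
        "on_demand_features:purchase_tier",
    ],
    entity_rows=[{"user_id": 12345}],
).to_dict()
print(features)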

Recommendation API

FastAPI Recommendation Service

# Recommendation API: feature lookup, candidate generation, scoring, re-ranking
import time
from typing import List, Optional

import grpc
import redis
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from feast import FeatureStore
from tensorflow_serving.apis import prediction_service_pb2_grpc

app = FastAPI(title="Recommendation API")

# Initialize connections
redis_client = redis.Redis(host='redis-cluster', port=6379, decode_responses=True)
feature_store = FeatureStore(repo_path="feature_store/")
model_channel = grpc.insecure_channel('tensorflow-serving:8500')
model_stub = prediction_service_pb2_grpc.PredictionServiceStub(model_channel)

class RecommendationRequest(BaseModel):
    user_id: int
    session_id: str
    context: Optional[dict] = None
    num_recommendations: int = 10

class RecommendationResponse(BaseModel):
    user_id: int
    recommendations: List[dict]
    model_version: str
    latency_ms: float

@app.post("/recommend", response_model=RecommendationResponse)
async def recommend(request: RecommendationRequest):
    """Generate personalized recommendations"""
    start_time = time.time()

    # 1. Fetch user features (Redis: < 5ms)
    try:
        user_features = feature_store.get_online_features(
            features=[
                "user_features:total_events",
                "user_features:clicks",
                "user_features:purchases",
                "user_features:purchase_rate",
                "user_features:favorite_category",
                "user_features:avg_price_point",
            ],
            entity_rows=[{"user_id": request.user_id}]
        ).to_dict()
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Feature fetch failed: {str(e)}")

    # 2. Get candidate products (Redis: < 10ms)
    try:
        # Get popular products in the user's favorite category
        favorite_category = user_features["favorite_category"][0]
        candidates = redis_client.zrevrange(
            f"category:{favorite_category}",
            0, 999,
            withscores=True
        )
        candidate_ids = [int(pid) for pid, score in candidates]
    except Exception:
        # Fallback to globally popular products
        candidate_ids = get_global_popular_products()

    # 3. Prepare model input
    model_input = prepare_model_input(
        user_features=user_features,
        candidate_ids=candidate_ids,
        context=request.context
    )

    # 4. Model inference (TensorFlow Serving: < 50ms)
    try:
        scores = model_inference(model_stub, model_input)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Model inference failed: {str(e)}")

    # 5. Rerank and filter (business logic)
    ranked_products = rerank_products(
        candidate_ids=candidate_ids,
        scores=scores,
        user_features=user_features,
        context=request.context
    )

    # 6. Apply diversity and business rules
    final_recommendations = apply_diversity_and_rules(
        products=ranked_products,
        num=request.num_recommendations,
        user_id=request.user_id
    )

    latency = (time.time() - start_time) * 1000
    return RecommendationResponse(
        user_id=request.user_id,
        recommendations=final_recommendations,
        model_version="v2.3.1",
        latency_ms=latency
    )

def prepare_model_input(user_features, candidate_ids, context):
    """Prepare input for model inference"""
    # Build the feature matrix, shape: (num_candidates, num_features)
    pass

def model_inference(model_stub, model_input):
    """Call TensorFlow Serving for inference"""
    # gRPC call to TensorFlow Serving (see the sketch below)
    pass

def rerank_products(candidate_ids, scores, user_features, context):
    """Rerank products with business logic"""
    # Combine ML scores with business rules,
    # e.g., inventory availability, margin, promotions
    pass

def apply_diversity_and_rules(products, num, user_id):
    """Apply diversity and business rules"""
    # Ensure category diversity
    # Filter out already purchased items
    # Apply A/B test allocations
    pass

# Health check
@app.get("/health")
async def health():
    """Health check endpoint"""
    return {"status": "healthy"}

Batch Model Training

Spark + XGBoost Training

# Daily model training
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, lag, avg as _avg, current_date, current_timestamp, datediff, expr
)
from pyspark.sql.window import Window
from xgboost.spark import SparkXGBClassifier
import mlflow

# Initialize Spark
spark = SparkSession.builder \
    .appName("RecommendationModelTraining") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Load training data
def load_training_data(days: int = 30):
    """Load training data from the data lake"""
    # User features
    user_features = spark.read.format("delta") \
        .load("s3://lakehouse/gold/user_features") \
        .filter(col("feature_date") >= current_date() - expr(f"INTERVAL {days} DAYS"))
    # Product features
    product_features = spark.read.format("delta") \
        .load("s3://lakehouse/gold/product_features")
    # Interactions (labels)
    interactions = spark.read.format("delta") \
        .load("s3://lakehouse/silver/interactions") \
        .filter(col("timestamp") >= current_timestamp() - expr(f"INTERVAL {days} DAYS"))
    # Join features
    training_data = interactions \
        .join(user_features, "user_id", "left") \
        .join(product_features, "product_id", "left") \
        .filter(col("label").isNotNull())  # Positive/negative examples
    return training_data

# Feature engineering
def engineer_features(df):
    """Create model features"""
    window_spec = Window.partitionBy("user_id").orderBy("timestamp")
    df = df \
        .withColumn("prev_click_count", lag("click_count").over(window_spec)) \
        .withColumn("user_click_avg", _avg("click_count").over(window_spec)) \
        .withColumn("recency_days", datediff(current_timestamp(), col("last_event_time")))
    return df

# Train model
def train_model():
    """Train recommendation model"""
    with mlflow.start_run():
        # Load data
        training_data = load_training_data(days=30)
        training_data = engineer_features(training_data)
        # Split train/test
        train_df, test_df = training_data.randomSplit([0.8, 0.2], seed=42)
        # Define features
        feature_cols = [
            "user_total_events", "user_clicks", "user_purchases",
            "user_purchase_rate", "product_popularity_score",
            "product_conversion_rate", "recency_days"
        ]
        # Assemble the features vector
        from pyspark.ml.feature import VectorAssembler
        assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
        train_df = assembler.transform(train_df)
        # Train XGBoost
        xgb = SparkXGBClassifier(
            max_depth=8,
            learning_rate=0.1,
            n_estimators=200,
            subsample=0.8,
            colsample_bytree=0.8,
            label_col="label",
            features_col="features"
        )
        model = xgb.fit(train_df)
        # Evaluate
        from pyspark.ml.evaluation import BinaryClassificationEvaluator
        test_df = assembler.transform(test_df)
        predictions = model.transform(test_df)
        evaluator = BinaryClassificationEvaluator(
            labelCol="label",
            rawPredictionCol="rawPrediction"
        )
        auc = evaluator.evaluate(predictions)
        mlflow.log_metric("auc", auc)
        # Save model
        model_path = "s3://models/recommendation/v2.3.1"
        model.write().overwrite().save(model_path)
        mlflow.log_param("model_path", model_path)
        print(f"Model trained with AUC: {auc:.4f}")
        return model

# Schedule with Airflow
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

default_args = {
    'owner': 'ml-team',
    'start_date': datetime(2025, 1, 1),
    'depends_on_past': False,
}
dag = DAG(
    'recommendation_model_training',
    default_args=default_args,
    schedule_interval='0 2 * * *',  # Daily at 2 AM
    catchup=False
)
train_task = PythonOperator(
    task_id='train_model',
    python_callable=train_model,
    dag=dag
)
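
The batch inference component from the technology table can reuse the saved model to score the catalog offline. Below is a hedged sketch, assuming a precomputed candidate_pairs Delta table (a hypothetical path) that carries the same feature columns used in training.

# Sketch: nightly batch scoring of candidate user/product pairs with the trained model.
# The candidate_pairs and output paths are assumptions.
from pyspark.ml.feature import VectorAssembler
from xgboost.spark import SparkXGBClassifierModel

def batch_score(spark, model_path="s3://models/recommendation/v2.3.1"):
    """Score candidate pairs in bulk and persist the results for fast lookup."""
    model = SparkXGBClassifierModel.load(model_path)
    candidates = spark.read.format("delta").load("s3://lakehouse/gold/candidate_pairs")
    feature_cols = [
        "user_total_events", "user_clicks", "user_purchases",
        "user_purchase_rate", "product_popularity_score",
        "product_conversion_rate", "recency_days"
    ]
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
    scored = model.transform(assembler.transform(candidates))
    scored.select("user_id", "product_id", "probability") \
        .write.format("delta").mode("overwrite") \
        .save("s3://lakehouse/gold/batch_recommendations")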

Cost Optimization

Storage Costs

| Tier | Data | Size | Cost | Optimization |
|---|---|---|---|---|
| Hot | User features | 50TB | $1,150/month | Redis cluster |
| Warm | Recent events | 100TB | $2,300/month | S3 Standard |
| Cold | Historical | 500TB | $3,450/month | S3 IA |
| Archive | Training data | 1PB | $1,500/month | Glacier |
| Total | | | $8,400/month | |

Optimization:

  • Feature TTL: 30 days for user features
  • Event lifecycle: 7 days → IA, 90 days → Glacier (lifecycle rule sketched below)
  • Compression: ZSTD-19 (~40% savings)
  • Delta Lake compaction: Weekly
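
The event lifecycle policy above can be expressed as an S3 bucket lifecycle rule. A minimal sketch with boto3 follows; the bucket name and prefix are illustrative assumptions.

# Sketch: S3 lifecycle rule for event data (7 days -> IA, 90 days -> Glacier).
# Bucket name and prefix are illustrative.
import boto3

s3 = boto3.client("s3")
s3.put_bucket_lifecycle_configuration(
    Bucket="lakehouse",
    LifecycleConfiguration={
        "Rules": [{
            "ID": "event-tiering",
            "Filter": {"Prefix": "silver/interactions/"},
            "Status": "Enabled",
            "Transitions": [
                {"Days": 7, "StorageClass": "STANDARD_IA"},
                {"Days": 90, "StorageClass": "GLACIER"},
            ],
        }]
    },
)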

Compute Costs

| Component | Compute | Cost | Optimization |
|---|---|---|---|
| Flink Cluster | 100 nodes (spot) | $43,200/month | 70% spot savings |
| Spark Training | 200 nodes (spot) | $57,600/month | 70% spot savings |
| Redis Cluster | 50 nodes (on-demand) | $21,600/month | Memory optimized |
| TensorFlow Serving | 20 nodes (auto-scale) | $8,640/month | Request-based |
| Total | | $131,040/month | |

Optimization:

  • Spot instances for Flink/Spark
  • Auto-scaling for API servers
  • Scheduled training (off-peak hours)
  • Right-sized Redis instances

Total Monthly Cost

| Category | Cost | Optimization |
|---|---|---|
| Storage | $8,400 | Lifecycle + compression |
| Compute | $131,040 | Spot + auto-scaling |
| Network | $15,000 | Colocation |
| Total | $154,440/month | |
| Before optimization | $270,000/month | |
| Savings | 43% | |

Failure Modes

Mode 1: Feature Store Unavailable

Mitigation:

  • Redis cache with 24-hour TTL
  • Local feature caching
  • Fallback to popular products (sketched below)
  • Multi-region feature store
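
The popular-products fallback corresponds to the get_global_popular_products helper referenced in the API above. A hedged sketch, assuming a hypothetical "popular:global" sorted set maintained by a background job and the redis_client connection from the API service:

# Sketch: fallback candidates when the feature store is unavailable.
# The "popular:global" key and FALLBACK_PRODUCT_IDS list are assumptions.
FALLBACK_PRODUCT_IDS: list = []  # refreshed periodically by a background job

def get_global_popular_products(limit: int = 1000):
    """Return globally popular product ids from a precomputed Redis sorted set."""
    try:
        members = redis_client.zrevrange("popular:global", 0, limit - 1)
        return [int(pid) for pid in members]
    except Exception:
        # Last resort: serve the small in-process list
        return FALLBACK_PRODUCT_IDS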

Mode 2: Model Inference Failure

Mitigation:

  • Multiple model versions
  • Blue-green deployments
  • Canary deployments
  • Rule-based fallback

Mode 3: High Traffic Spike

Mitigation:

  • Auto-scaling (target 70% utilization)
  • Load shedding (degrade for low-priority)
  • Caching (pre-computed recommendations; sketched below)
  • CDN for static content
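
One way to absorb a spike is to serve each user's most recent recommendations straight from Redis and only recompute when the cache misses or expires. A minimal sketch; the key format and TTL are assumptions.

# Sketch: serving pre-computed recommendations from Redis during traffic spikes.
# Key format and TTL are assumptions; redis_client comes from the API service.
import json

CACHE_TTL_SECONDS = 15 * 60  # recompute at most every 15 minutes per user

def get_cached_recommendations(user_id: int):
    """Return cached recommendations, or None on a cache miss."""
    cached = redis_client.get(f"recs:{user_id}")
    return json.loads(cached) if cached else None

def cache_recommendations(user_id: int, recommendations: list):
    """Store the latest recommendations so spikes can be absorbed from cache."""
    redis_client.setex(f"recs:{user_id}", CACHE_TTL_SECONDS, json.dumps(recommendations))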

SLA/SLO Definitions

slas:
  recommendation_api:
    latency:
      p50: "< 50ms"
      p95: "< 100ms"
      p99: "< 200ms"
    availability:
      uptime: "99.9%"
      maintenance: "4 hours/month"
    accuracy:
      ndcg: "> 0.75"
      click_rate: "> 5%"
      conversion_lift: "> 10%"
  data_pipeline:
    freshness:
      user_features: "< 5 minutes"
      product_features: "< 1 hour"
      model_updates: "Daily"
    completeness:
      feature_coverage: "> 99%"
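
Tracking the latency targets requires instrumentation in the API process itself. One possible approach is a Prometheus histogram whose quantiles are compared against the SLO by alert rules; the metric name and buckets below are illustrative assumptions.

# Sketch: latency SLO instrumentation with prometheus_client.
# Metric name and buckets are illustrative; p50/p95/p99 come from the histogram.
from prometheus_client import Histogram

REQUEST_LATENCY = Histogram(
    "recommendation_request_seconds",
    "End-to-end /recommend latency",
    buckets=[0.025, 0.05, 0.1, 0.2, 0.5, 1.0],  # aligned with the 50/100/200 ms targets
)

def observe_latency(latency_ms: float):
    """Record one request's latency; alerting compares quantiles to the SLO."""
    REQUEST_LATENCY.observe(latency_ms / 1000.0)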

Key Takeaways

  1. Low latency: Feature store + model serving < 100ms
  2. Fresh features: Real-time feature updates via Flink
  3. Scalability: Handle 1M events/second at peak
  4. Personalization: Individual recommendations per user
  5. Cost optimization: Spot instances for 43% savings
  6. Fallback strategies: Graceful degradation
  7. A/B testing: Continuous model improvement
  8. Monitoring: Comprehensive metrics and alerts
