Case Study - E-Commerce Personalization
Real-Time Recommendations at Scale
Business Context
Company: Large e-commerce platform (50M monthly active users)
Challenge: Deliver personalized product recommendations in real-time to increase conversion rates and average order value.
Scale: 100TB/day, 1M events/second peak, 10B products
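As a back-of-envelope check on these figures, the sketch below derives the sustained ingest rate and an approximate event size; the 2x peak-to-average ratio is an assumption, not a stated number.

```python
# Back-of-envelope sizing from the stated scale figures (assumptions noted inline)
DAILY_BYTES = 100e12           # 100 TB/day of raw data
PEAK_EVENTS_PER_SEC = 1e6      # stated peak event rate

sustained_bytes_per_sec = DAILY_BYTES / 86_400                  # ~1.2 GB/s sustained ingest
avg_events_per_sec = PEAK_EVENTS_PER_SEC / 2                    # assumes ~2x peak-to-average ratio
avg_event_size = sustained_bytes_per_sec / avg_events_per_sec   # ~2.3 KB per event

print(f"{sustained_bytes_per_sec / 1e9:.1f} GB/s sustained, ~{avg_event_size / 1e3:.1f} KB per event")
```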
Requirements
Architecture Design
Technology Selection
Component Selection Rationale
| Component | Technology | Rationale |
|---|---|---|
| Event Bus | Kafka | Proven at scale, exactly-once semantics (see producer sketch after this table) |
| Stream Processing | Flink | Millisecond latency, state management |
| Feature Store | Feast | Real-time serving, offline/online sync |
| Online Store | Redis Cluster | Sub-millisecond latency |
| Offline Store | S3 + Parquet | Cost optimization, ZSTD compression |
| ML Training | Spark + XGBoost | Distributed training |
| Model Serving | TensorFlow Serving | Low latency, batching |
| Real-Time Analytics | ClickHouse | Fast ingest, excellent compression |
| Batch Inference | Spark | Process billions of products |
| A/B Testing | Custom built | Business-specific metrics |
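The "exactly-once" rationale for Kafka comes down to idempotent producers and acknowledged writes, and compression matters at 100TB/day. A minimal producer sketch using confluent-kafka, reusing the `user-events` topic and `kafka:9092` broker address from the Flink source table below; the library choice and the tuning values are illustrative assumptions, not confirmed details of this system.

```python
from confluent_kafka import Producer

# Illustrative producer settings for the user-events topic
# (starting points, not values tuned for this workload).
producer = Producer({
    "bootstrap.servers": "kafka:9092",
    "enable.idempotence": True,   # no duplicates on retry (exactly-once friendly)
    "acks": "all",                # wait for all in-sync replicas
    "compression.type": "zstd",   # bandwidth/storage savings at this volume
    "linger.ms": 5,               # small batching window for throughput
    "batch.size": 131072,
})

def publish_event(user_id: int, payload: bytes) -> None:
    """Publish one user event, keyed by user_id so a user's events stay ordered."""
    producer.produce("user-events", key=str(user_id), value=payload)

# Call producer.flush() before shutdown to drain any buffered messages.
```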
Real-Time Feature Pipeline
Feature Extraction
```python
# Flink feature extraction
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.udf import udf

import numpy as np

# Create environment
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(100)

t_env = StreamTableEnvironment.create(env)

# User behavior features
@udf(result_type=DataTypes.FLOAT())
def purchase_rate(clicks: int, purchases: int) -> float:
    """Calculate purchase rate"""
    if clicks == 0:
        return 0.0
    return purchases / clicks

@udf(result_type=DataTypes.FLOAT())
def avg_time_on_page(events: list) -> float:
    """Calculate average time on page"""
    if not events:
        return 0.0
    durations = [e.get('duration', 0) for e in events]
    return float(np.mean(durations))

# Register UDFs so they can be referenced from SQL
t_env.create_temporary_function("purchase_rate", purchase_rate)
t_env.create_temporary_function("avg_time_on_page", avg_time_on_page)

# Create source table (`timestamp` is a reserved word in Flink SQL, so it is escaped)
t_env.execute_sql("""
    CREATE TABLE user_events (
        user_id BIGINT,
        event_type STRING,
        product_id BIGINT,
        category STRING,
        price DECIMAL(10, 2),
        `timestamp` TIMESTAMP(3),
        WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '10' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'user-events',
        'properties.bootstrap.servers' = 'kafka:9092',
        'format' = 'json'
    )
""")

# Aggregate features over 5-minute tumbling windows
user_features = t_env.sql_query("""
    SELECT
        user_id,
        CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)) AS feature_timestamp,

        -- Count features
        COUNT(*) AS total_events,
        SUM(CASE WHEN event_type = 'click' THEN 1 ELSE 0 END) AS clicks,
        SUM(CASE WHEN event_type = 'purchase' THEN 1 ELSE 0 END) AS purchases,
        SUM(CASE WHEN event_type = 'add_to_cart' THEN 1 ELSE 0 END) AS add_to_carts,

        -- Ratio features
        purchase_rate(
            SUM(CASE WHEN event_type = 'click' THEN 1 ELSE 0 END),
            SUM(CASE WHEN event_type = 'purchase' THEN 1 ELSE 0 END)
        ) AS purchase_rate,

        -- Category preferences
        COUNT(DISTINCT category) AS categories_viewed,

        -- Time-based features
        MAX(`timestamp`) AS last_event_time,
        MIN(`timestamp`) AS first_event_time

    FROM user_events
    GROUP BY user_id, TUMBLE(`timestamp`, INTERVAL '5' MINUTE)
""")

# Sink to feature store
t_env.execute_sql("""
    CREATE TABLE user_features_store (
        user_id BIGINT,
        feature_timestamp TIMESTAMP(3),
        total_events BIGINT,
        clicks BIGINT,
        purchases BIGINT,
        add_to_carts BIGINT,
        purchase_rate FLOAT,
        categories_viewed BIGINT,
        last_event_time TIMESTAMP(3),
        first_event_time TIMESTAMP(3),
        PRIMARY KEY (user_id, feature_timestamp) NOT ENFORCED
    ) WITH (
        'connector' = 'redis',
        'mode' = 'upsert',
        'host' = 'redis-cluster',
        'port' = '6379'
    )
""")

user_features.execute_insert('user_features_store')
```

Feast Feature Store
```python
# Feast feature store definition
from datetime import timedelta

from feast import FeatureView, Field
from feast.types import Float32, Int64, String

from entities import user, product
from data_sources import user_events_source, product_events_source

# Note: the online store (Redis cluster) and offline store (S3 + Parquet) are
# configured in feature_store.yaml, not on the individual feature views.

# User feature view
user_features_view = FeatureView(
    name="user_features",
    entities=[user],
    ttl=timedelta(days=30),
    schema=[
        Field(name="total_events", dtype=Int64),
        Field(name="clicks", dtype=Int64),
        Field(name="purchases", dtype=Int64),
        Field(name="purchase_rate", dtype=Float32),
        Field(name="categories_viewed", dtype=Int64),
        Field(name="favorite_category", dtype=String),
        Field(name="avg_price_point", dtype=Float32),
    ],
    source=user_events_source,
)

# Product feature view
product_features_view = FeatureView(
    name="product_features",
    entities=[product],
    ttl=timedelta(days=7),
    schema=[
        Field(name="category", dtype=String),
        Field(name="price", dtype=Float32),
        Field(name="popularity_score", dtype=Float32),
        Field(name="conversion_rate", dtype=Float32),
        Field(name="avg_rating", dtype=Float32),
        Field(name="inventory_level", dtype=Int64),
    ],
    source=product_events_source,
)
```
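The Redis online store and S3 offline store referenced above live in the repository's `feature_store.yaml`, not on the views themselves. A minimal sketch, assuming an AWS deployment with the Redis cluster as the online store and Parquet files on S3 served through the file offline store; the project name, registry path, and connection string are illustrative:

```yaml
project: recommendations              # hypothetical project name
registry: s3://features/registry.db   # hypothetical registry location
provider: aws
online_store:
  type: redis
  redis_type: redis_cluster
  connection_string: "redis-cluster:6379"
offline_store:
  type: file                          # Parquet files under s3://features/
```

With this in place, `feast apply` registers the views and `feast materialize-incremental` keeps Redis in sync with the offline data, which is the offline/online sync noted in the component table.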
```python
# On-demand feature (computed at request time)
import pandas as pd

from feast import on_demand_feature_view

@on_demand_feature_view(
    sources=[user_features_view],
    schema=[Field(name="purchase_tier", dtype=String)],
)
def on_demand_features(inputs: pd.DataFrame) -> pd.DataFrame:
    """Bucket users into purchase tiers based on their purchase rate"""
    df = pd.DataFrame()
    df["purchase_tier"] = inputs["purchase_rate"].apply(
        lambda x: "high" if x > 0.1 else "medium" if x > 0.05 else "low"
    )
    return df
```

Recommendation API
FastAPI Recommendation Service
```python
# Recommendation API
import time
from typing import List, Optional

import grpc
import numpy as np
import redis
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

from feast import FeatureStore
from tensorflow_serving.apis import prediction_service_pb2_grpc

app = FastAPI(title="Recommendation API")

# Initialize connections
redis_client = redis.Redis(host='redis-cluster', port=6379, decode_responses=True)
feature_store = FeatureStore(repo_path="feature_store/")
model_channel = grpc.insecure_channel('tensorflow-serving:8500')
model_stub = prediction_service_pb2_grpc.PredictionServiceStub(model_channel)

class RecommendationRequest(BaseModel):
    user_id: int
    session_id: str
    context: Optional[dict] = None
    num_recommendations: int = 10

class RecommendationResponse(BaseModel):
    user_id: int
    recommendations: List[dict]
    model_version: str
    latency_ms: float

@app.post("/recommend", response_model=RecommendationResponse)
async def recommend(request: RecommendationRequest):
    """Generate personalized recommendations"""
    start_time = time.time()

    # 1. Fetch user features (Redis: < 5ms)
    try:
        user_features = feature_store.get_online_features(
            features=[
                "user_features:total_events",
                "user_features:clicks",
                "user_features:purchases",
                "user_features:purchase_rate",
                "user_features:favorite_category",
                "user_features:avg_price_point",
            ],
            entity_rows=[{"user_id": request.user_id}]
        ).to_dict()
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Feature fetch failed: {str(e)}")

    # 2. Get candidate products (Redis: < 10ms)
    try:
        # Get popular products in the user's favorite category
        favorite_category = user_features["favorite_category"][0]
        candidates = redis_client.zrevrange(
            f"category:{favorite_category}", 0, 999, withscores=True
        )
        # Convert to list of product IDs
        candidate_ids = [int(pid) for pid, score in candidates]
    except Exception as e:
        # Fallback to globally popular products
        candidate_ids = get_global_popular_products()

    # 3. Prepare model input
    model_input = prepare_model_input(
        user_features=user_features,
        candidate_ids=candidate_ids,
        context=request.context
    )

    # 4. Model inference (TensorFlow Serving: < 50ms)
    try:
        scores = model_inference(model_stub, model_input)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Model inference failed: {str(e)}")

    # 5. Rerank and filter (business logic)
    ranked_products = rerank_products(
        candidate_ids=candidate_ids,
        scores=scores,
        user_features=user_features,
        context=request.context
    )

    # 6. Apply diversity and business rules
    final_recommendations = apply_diversity_and_rules(
        products=ranked_products,
        num=request.num_recommendations,
        user_id=request.user_id
    )

    latency = (time.time() - start_time) * 1000

    return RecommendationResponse(
        user_id=request.user_id,
        recommendations=final_recommendations,
        model_version="v2.3.1",
        latency_ms=latency
    )

def prepare_model_input(user_features, candidate_ids, context):
    """Prepare input for model inference"""
    # This would prepare the feature matrix
    # Shape: (batch_size, num_features)
    pass

def model_inference(model_stub, model_input):
    """Call TensorFlow Serving for inference"""
    # gRPC call to TensorFlow Serving
    pass
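# A hedged sketch of one way model_inference could call TensorFlow Serving's
# gRPC Predict API. The model name "recommendation", the "serving_default"
# signature, and the "features"/"scores" tensor names are illustrative
# assumptions, not confirmed details of this system.
def model_inference_sketch(model_stub, model_input):
    """Score candidates via TensorFlow Serving (illustrative only)"""
    import tensorflow as tf
    from tensorflow_serving.apis import predict_pb2

    request = predict_pb2.PredictRequest()
    request.model_spec.name = "recommendation"             # assumed model name
    request.model_spec.signature_name = "serving_default"  # assumed signature
    request.inputs["features"].CopyFrom(
        tf.make_tensor_proto(np.asarray(model_input, dtype=np.float32))
    )
    # Keep the call within the < 50ms inference budget
    response = model_stub.Predict(request, timeout=0.05)
    return tf.make_ndarray(response.outputs["scores"])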
def rerank_products(candidate_ids, scores, user_features, context):
    """Rerank products with business logic"""
    # Combine ML scores with business rules
    # e.g., inventory availability, margin, promotions
    pass

def apply_diversity_and_rules(products, num, user_id):
    """Apply diversity and business rules"""
    # Ensure category diversity
    # Filter out already purchased items
    # Apply A/B test allocations
    pass

# Health check
@app.get("/health")
async def health():
    """Health check endpoint"""
    return {"status": "healthy"}
```

Batch Model Training
Spark + XGBoost Training
```python
# Daily model training
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    avg as _avg,
    col,
    current_date,
    current_timestamp,
    datediff,
    expr,
    lag,
)
from pyspark.sql.window import Window
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from xgboost.spark import SparkXGBClassifier
import mlflow

# Initialize Spark
spark = SparkSession.builder \
    .appName("RecommendationModelTraining") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Load training data
def load_training_data(days: int = 30):
    """Load training data from the data lake"""

    # User features
    user_features = spark.read.format("delta") \
        .load("s3://lakehouse/gold/user_features") \
        .filter(col("feature_date") >= current_date() - expr(f"INTERVAL {days} DAYS"))

    # Product features
    product_features = spark.read.format("delta") \
        .load("s3://lakehouse/gold/product_features")

    # Interactions (labels)
    interactions = spark.read.format("delta") \
        .load("s3://lakehouse/silver/interactions") \
        .filter(col("timestamp") >= current_timestamp() - expr(f"INTERVAL {days} DAYS"))

    # Join features
    training_data = interactions \
        .join(user_features, "user_id", "left") \
        .join(product_features, "product_id", "left") \
        .filter(col("label").isNotNull())  # Positive/negative examples

    return training_data

# Feature engineering
def engineer_features(df):
    """Create model features"""

    window_spec = Window.partitionBy("user_id").orderBy("timestamp")

    df = df \
        .withColumn("prev_click_count", lag("click_count").over(window_spec)) \
        .withColumn("user_click_avg", _avg("click_count").over(window_spec)) \
        .withColumn("recency_days", datediff(current_timestamp(), col("last_event_time")))

    return df

# Train model
def train_model():
    """Train recommendation model"""

    with mlflow.start_run():

        # Load data
        training_data = load_training_data(days=30)
        training_data = engineer_features(training_data)

        # Split train/test
        train_df, test_df = training_data.randomSplit([0.8, 0.2], seed=42)

        # Define features
        feature_cols = [
            "user_total_events", "user_clicks", "user_purchases",
            "user_purchase_rate", "product_popularity_score",
            "product_conversion_rate", "recency_days"
        ]

        # Assemble features vector
        assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
        train_df = assembler.transform(train_df)

        # Train XGBoost
        xgb = SparkXGBClassifier(
            max_depth=8,
            learning_rate=0.1,
            n_estimators=200,
            subsample=0.8,
            colsample_bytree=0.8,
            label_col="label",
            features_col="features"
        )
        model = xgb.fit(train_df)

        # Evaluate
        test_df = assembler.transform(test_df)
        predictions = model.transform(test_df)

        evaluator = BinaryClassificationEvaluator(
            labelCol="label",
            rawPredictionCol="rawPrediction"
        )
        auc = evaluator.evaluate(predictions)
        mlflow.log_metric("auc", auc)

        # Save model
        model_path = "s3://models/recommendation/v2.3.1"
        model.write().overwrite().save(model_path)
        # log_artifact expects a local path, so record the S3 location as a param
        mlflow.log_param("model_path", model_path)

        print(f"Model trained with AUC: {auc:.4f}")

        return model

# Schedule with Airflow
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

default_args = {
    'owner': 'ml-team',
    'start_date': datetime(2025, 1, 1),
    'depends_on_past': False,
}

dag = DAG(
    'recommendation_model_training',
    default_args=default_args,
    schedule_interval='0 2 * * *',  # Daily at 2 AM
    catchup=False
)

train_task = PythonOperator(
    task_id='train_model',
    python_callable=train_model,
    dag=dag
)
```

Cost Optimization
Storage Costs
| Tier | Data | Size | Cost | Optimization |
|---|---|---|---|---|
| Hot | User features | 50TB | $1,150/month | Redis cluster |
| Warm | Recent events | 100TB | $2,300/month | S3 Standard |
| Cold | Historical | 500TB | $3,450/month | S3 IA |
| Archive | Training data | 1PB | $1,500/month | Glacier |
| Total | | | $8,400/month | |
Optimization:
- Feature TTL: 30 days for user features
- Event lifecycle: 7 days → IA, 90 days → Glacier (see the lifecycle sketch after this list)
- Compression: ZSTD-19 (~40% savings)
- Delta Lake compaction: Weekly
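The event lifecycle rule above maps directly to an S3 lifecycle policy. A minimal sketch using boto3; the bucket name and prefix are illustrative assumptions:

```python
import boto3

s3 = boto3.client("s3")

# Transition raw events to Infrequent Access after 7 days and to Glacier after
# 90 days, matching the lifecycle bullet above (bucket/prefix are assumptions).
s3.put_bucket_lifecycle_configuration(
    Bucket="events-raw",
    LifecycleConfiguration={
        "Rules": [
            {
                "ID": "event-tiering",
                "Filter": {"Prefix": "user-events/"},
                "Status": "Enabled",
                "Transitions": [
                    {"Days": 7, "StorageClass": "STANDARD_IA"},
                    {"Days": 90, "StorageClass": "GLACIER"},
                ],
            }
        ]
    },
)
```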
Compute Costs
| Component | Compute | Cost | Optimization |
|---|---|---|---|
| Flink Cluster | 100 nodes (spot) | $43,200/month | 70% spot savings |
| Spark Training | 200 nodes (spot) | $57,600/month | 70% spot savings |
| Redis Cluster | 50 nodes (on-demand) | $21,600/month | Memory optimized |
| TensorFlow Serving | 20 nodes (auto-scale) | $8,640/month | Request-based |
| Total | | $131,040/month | |
Optimization:
- Spot instances for Flink/Spark
- Auto-scaling for API servers
- Scheduled training (off-peak hours)
- Right-sized Redis instances
Total Monthly Cost
| Category | Cost | Optimization |
|---|---|---|
| Storage | $8,400 | Lifecycle + compression |
| Compute | $131,040 | Spot + auto-scaling |
| Network | $15,000 | Colocation |
| Total | $154,440/month | |
| Before Optimization | $270,000/month | |
| Savings | 43% | |
Failure Modes
Mode 1: Feature Store Unavailable
Mitigation:
- Redis cache with 24-hour TTL
- Local feature caching
- Fallback to popular products (see the degradation sketch after the failure-mode lists)
- Multi-region feature store
Mode 2: Model Inference Failure
Mitigation:
- Multiple model versions
- Blue-green deployments
- Canary deployments
- Rule-based fallback
Mode 3: High Traffic Spike
Mitigation:
- Auto-scaling (target 70% utilization)
- Load shedding (degrade for low-priority)
- Caching (pre-compute recommendations)
- CDN for static content
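Modes 1 and 2 share the same last-resort behavior: if features or model scores cannot be obtained in time, serve popular products rather than fail the request. A minimal sketch of that wrapper, reusing the `redis_client` from the API service above; the `popular:global` key and the two helper functions are hypothetical names for illustration:

```python
def recommend_with_fallback(user_id: int, num: int = 10) -> list:
    """Personalized recommendations with graceful degradation."""
    try:
        features = fetch_user_features(user_id)       # Mode 1: feature store may be unavailable
        scores = score_candidates(user_id, features)  # Mode 2: model inference may fail
        ranked = sorted(scores.items(), key=lambda kv: -kv[1])
        return [pid for pid, _ in ranked[:num]]
    except Exception:
        # Degrade to the pre-computed global popularity ranking in Redis
        popular = redis_client.zrevrange("popular:global", 0, num - 1)
        return [int(pid) for pid in popular]
```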
SLA/SLO Definitions
```yaml
slas:
  recommendation_api:
    latency:
      p50: "< 50ms"
      p95: "< 100ms"
      p99: "< 200ms"
    availability:
      uptime: "99.9%"
      maintenance: "4 hours/month"
    accuracy:
      ndcg: "> 0.75"
      click_rate: "> 5%"
      conversion_lift: "> 10%"
  data_pipeline:
    freshness:
      user_features: "< 5 minutes"
      product_features: "< 1 hour"
      model_updates: "Daily"
    completeness:
      feature_coverage: "> 99%"
```

Key Takeaways
- Low latency: Feature store + model serving < 100ms
- Fresh features: Real-time feature updates via Flink
- Scalability: Handle 1M events/second at peak
- Personalization: Individual recommendations per user
- Cost optimization: Spot instances for 43% savings
- Fallback strategies: Graceful degradation
- A/B testing: Continuous model improvement
- Monitoring: Comprehensive metrics and alerts
Back to Module 8