Feast Guide
Feature Store for Machine Learning
Overview
Feast (Feature Store) is an open-source feature store for machine learning. It provides a unified interface for defining, storing, and serving features to ML models, enabling feature reuse, consistency between training and serving, and versioning.
Feast Architecture
Feature Store Architecture
Key Components:
- Feature Views: Feature definitions and schema
- Data Sources: Batch (data warehouse), streaming (Kafka), request (inference time)
- Offline Store: Parquet files for training data generation
- Online Store: Redis/DynamoDB for low-latency inference
- Feature Registry: Version control and metadata
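To make these components concrete, here is a minimal sketch of how they fit together at runtime. It assumes the driver_stats_fv feature view defined later in this guide has already been registered with `feast apply` and materialized into the online store; both calls are covered in detail below.
# Minimal end-to-end flow (sketch; assumes driver_stats_fv is registered and materialized)
import pandas as pd
from feast import FeatureStore

store = FeatureStore(repo_path="feature_repo")  # reads feature_store.yaml + registry

# 1. Training: point-in-time correct join from the offline store
entity_df = pd.DataFrame({
    "driver_id": [1, 2],
    "event_timestamp": [pd.Timestamp("2025-01-27 10:00:00")] * 2,
})
training_df = store.get_historical_features(
    features=["driver_stats_fv:conv_rate"],
    entity_df=entity_df,
).to_df()

# 2. Serving: latest values from the online store
online = store.get_online_features(
    features=["driver_stats_fv:conv_rate"],
    entity_rows=[{"driver_id": 1}],
).to_dict()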
Feast Installation
Installation
# Install Feast
pip install feast
# Install with specific dependencies
pip install 'feast[redis]'      # Redis online store
pip install 'feast[dynamodb]'   # DynamoDB online store
pip install 'feast[gcp]'        # GCP dependencies
pip install 'feast[aws]'        # AWS dependencies
# Verify installation
feast version
Initialize Feature Store
# Initialize feature store
feast init my_feature_repo
cd my_feature_repo
# Directory structure:
# my_feature_repo/
# ├── feature_repo/
# │   ├── __init__.py
# │   └── example.py          # Feature definitions
# ├── data/                   # Offline store (Parquet)
# └── feature_store.yaml      # Configuration
Configuration
project: my_feature_repo
registry:
  path: data/registry.db
provider: local
online_store:
  type: redis
  connection_string: "localhost:6379"
offline_store:
  type: file
Feast Operations
Feature Definition
from datetime import timedelta

from feast import (
    Entity,
    FeatureView,
    Field,
    FileSource,
    RequestSource,
)
from feast.types import Float32, Int64, String
# 1. Define entity (primary key)
driver = Entity(
    name="driver_id",
    join_keys=["driver_id"],
    description="Driver ID",
)
# 2. Define batch feature view (file / data warehouse source)
driver_stats_fv = FeatureView(
    name="driver_stats_fv",
    entities=[driver],
    ttl=timedelta(days=1),
    schema=[
        Field(name="conv_rate", dtype=Float32),
        Field(name="avg_daily_trips", dtype=Int64),
    ],
    source=FileSource(
        path="data/driver_stats.parquet",
        timestamp_field="created_timestamp",
    ),
)
# 3. Define streaming source (Kafka)
from feast import KafkaSource
driver_stats_stream = FeatureView(
    name="driver_stats_stream",
    entities=[driver],
    ttl=timedelta(hours=1),
    schema=[
        Field(name="conv_rate", dtype=Float32),
        Field(name="avg_daily_trips", dtype=Int64),
    ],
    # Exact KafkaSource arguments (e.g. message_format) vary by Feast version
    source=KafkaSource(
        name="driver_stats_kafka",
        bootstrap_servers="localhost:9092",
        topic="driver_stats",
        timestamp_field="event_timestamp",
        batch_source=FileSource(
            path="data/driver_stats.parquet",
            timestamp_field="event_timestamp",
        ),
    ),
)
# 4. Define request source (inference-time inputs)
request_features = RequestSource(
    name="request_features",
    schema=[
        Field(name="trip_distance", dtype=Float32),
        Field(name="trip_duration", dtype=Float32),
    ],
)
# 5. Define on-demand feature view (request-time transformation)
import pandas as pd
from feast import on_demand_feature_view
driver_age_input = RequestSource(
    name="driver_age_input",
    schema=[
        Field(name="val_to_add", dtype=Int64),
    ],
)
# The on-demand feature view is defined with the @on_demand_feature_view
# decorator; the decorated function receives the joined inputs as a pandas
# DataFrame and returns the derived feature(s).
@on_demand_feature_view(
    sources=[driver_stats_fv, driver_age_input],
    schema=[
        Field(name="conv_rate_plus_val", dtype=Float32),
    ],
)
def driver_age_fv(inputs: pd.DataFrame) -> pd.DataFrame:
    """Add the request-time value to the conversion rate."""
    out = pd.DataFrame()
    out["conv_rate_plus_val"] = inputs["conv_rate"] + inputs["val_to_add"]
    return out
Feature Registration
# Register features
cd feature_repo
feast apply
# Output:
# Registered entity driver_id
# Registered feature view driver_stats_fv
# Registered feature view driver_stats_stream
# Registered on-demand feature view driver_age_fv
Data Ingestion
# Ingest data into feature store
from feast import FeatureStore
import pandas as pd
# Initialize feature store
store = FeatureStore(repo_path="feature_repo")
# 1. Ingest batch data
driver_stats_df = pd.DataFrame({
    'driver_id': [1, 2, 3, 4, 5],
    'conv_rate': [0.7, 0.6, 0.8, 0.5, 0.9],
    'avg_daily_trips': [100, 150, 200, 80, 250],
    'created_timestamp': pd.Timestamp('2025-01-27'),
})
# Write the rows directly to the online store for this feature view
store.write_to_online_store("driver_stats_fv", driver_stats_df)
# 2. Ingest streaming data
from kafka import KafkaProducer
import json
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
)
# Push to Kafka
event = {
    'driver_id': 1,
    'conv_rate': 0.75,
    'avg_daily_trips': 110,
    'event_timestamp': int(pd.Timestamp('2025-01-27').timestamp()),
}
producer.send('driver_stats', value=event)
producer.flush()
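Note that the producer above only publishes the event to Kafka; a stream processor (or Feast's push API) still has to write it into the feature store. Below is a minimal alternative sketch using the push API; it assumes a hypothetical PushSource named "driver_stats_push" (not defined above) has been registered with `feast apply`.
# Sketch: push a streaming row directly into Feast via a PushSource
import pandas as pd
from feast import FeatureStore

store = FeatureStore(repo_path="feature_repo")

event_df = pd.DataFrame({
    'driver_id': [1],
    'conv_rate': [0.75],
    'avg_daily_trips': [110],
    'created_timestamp': [pd.Timestamp('2025-01-27')],
})

# Writes the row to the configured store(s) for the push source
store.push("driver_stats_push", event_df)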
# 3. Ingest from the batch source
# Feast reads batch sources directly for historical retrieval; run
# `feast materialize` to load them into the online store for serving.
Feast Feature Retrieval
Online Retrieval (Inference)
# Retrieve features for online inference
from feast import FeatureStore
store = FeatureStore(repo_path="feature_repo")
# 1. Get latest features (online store)
feature_vector = store.get_online_features(
    features=[
        "driver_stats_fv:conv_rate",
        "driver_stats_fv:avg_daily_trips",
    ],
    entity_rows=[
        {"driver_id": 1},
        {"driver_id": 2},
        {"driver_id": 3},
    ],
)
# Output:
#    driver_id  conv_rate  avg_daily_trips
# 0          1       0.70              100
# 1          2       0.60              150
# 2          3       0.80              200
# Convert to DataFrame
df = feature_vector.to_df()
print(df)
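In a serving path, these online lookups typically feed straight into a model. A minimal sketch follows; the `model` object is assumed to be a scikit-learn regressor trained on the same two features (see the training section below).
# Sketch: use online features for a single prediction
def predict_for_driver(store, model, driver_id: int) -> float:
    # Fetch the latest feature values for this driver from the online store
    row = store.get_online_features(
        features=[
            "driver_stats_fv:conv_rate",
            "driver_stats_fv:avg_daily_trips",
        ],
        entity_rows=[{"driver_id": driver_id}],
    ).to_df()
    X = row[["conv_rate", "avg_daily_trips"]]
    return float(model.predict(X)[0])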
# 2. Get historical features (offline store)
from datetime import datetime
import pandas as pd
entity_df = pd.DataFrame({
    'driver_id': [1, 2, 3],
    'event_timestamp': [
        datetime(2025, 1, 27, 12, 0),
        datetime(2025, 1, 27, 12, 0),
        datetime(2025, 1, 27, 12, 0),
    ],
})
training_data = store.get_historical_features(
    features=[
        "driver_stats_fv:conv_rate",
        "driver_stats_fv:avg_daily_trips",
    ],
    entity_df=entity_df,
).to_df()
print(training_data)
Historical Retrieval (Training)
# Retrieve historical features for training
from feast import FeatureStore
import pandas as pd
store = FeatureStore(repo_path="feature_repo")
# 1. Define entity DataFrame
entity_df = pd.DataFrame({
    'driver_id': [1, 2, 3, 4, 5],
    'event_timestamp': [pd.Timestamp('2025-01-27 10:00:00')] * 5,  # same timestamp for all rows
})
# 2. Get historical features (point-in-time correct)
training_df = store.get_historical_features(
    features=[
        "driver_stats_fv:conv_rate",
        "driver_stats_fv:avg_daily_trips",
    ],
    entity_df=entity_df,
).to_df()
# 3. Save training data
training_df.to_csv('data/training_data.csv', index=False)
# 4. Use in training
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
# Prepare features and target
X = training_df[['conv_rate', 'avg_daily_trips']]
y = training_df['target']  # Assumes a target column has been joined in
# Train/test split
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)
# Train model
model = RandomForestRegressor()
model.fit(X_train, y_train)
# Evaluate
score = model.score(X_test, y_test)
print(f"R² score: {score:.4f}")
Feast Advanced Features
Feature Transformations
# On-demand feature transformations
import pandas as pd
from feast import Field, RequestSource, on_demand_feature_view
from feast.types import Float32
# 1. Define request source
val_to_add_request = RequestSource(
    name="val_to_add",
    schema=[
        Field(name="val_to_add", dtype=Float32),
    ],
)
# 2. Define on-demand feature view with a pandas transformation
@on_demand_feature_view(
    sources=[driver_stats_fv, val_to_add_request],
    schema=[
        Field(name="conv_rate_plus_val", dtype=Float32),
    ],
)
def conv_rate_plus_odfv(inputs: pd.DataFrame) -> pd.DataFrame:
    """Add the request-time value to conv_rate."""
    out = pd.DataFrame()
    out["conv_rate_plus_val"] = inputs["conv_rate"] + inputs["val_to_add"]
    return out
# 3. Use on-demand feature
feature_vector = store.get_online_features(
    features=[
        "conv_rate_plus_odfv:conv_rate_plus_val",
    ],
    entity_rows=[
        {
            "driver_id": 1,
            "val_to_add": 0.1,
        },
    ],
)
Feature Services
# Feature services for serving
from feast import FeatureService
# Define feature service (a named group of features)
driver_activity_v1 = FeatureService(
    name="driver_activity_v1",
    features=[
        driver_stats_fv[["conv_rate", "avg_daily_trips"]],
    ],
)
driver_activity_v2 = FeatureService(
    name="driver_activity_v2",
    features=[
        driver_stats_fv[["conv_rate", "avg_daily_trips"]],
        conv_rate_plus_odfv[["conv_rate_plus_val"]],
    ],
)
# Use feature service
feature_vector = store.get_online_features(
    features=driver_activity_v1,  # a FeatureService can be passed in place of a feature list
    entity_rows=[
        {"driver_id": 1},
        {"driver_id": 2},
    ],
)
Feast Performance
Optimization Strategies
# Performance optimization
# 1. Use an appropriate TTL
driver_stats_fv = FeatureView(
    name="driver_stats_fv",
    entities=[driver],
    ttl=timedelta(hours=1),  # Shorter TTL keeps features fresh and bounds point-in-time joins
    # ...
)
# 2. Use batch retrieval
feature_vectors = store.get_online_features(
    features=[
        "driver_stats_fv:conv_rate",
        "driver_stats_fv:avg_daily_trips",
    ],
    entity_rows=[
        {"driver_id": i} for i in range(1, 1001)  # Batch of 1,000 entity rows
    ],
)
# 3. Use the online store for low-latency serving
# Redis: typically single-digit millisecond lookups
# DynamoDB: typically tens of milliseconds
# 4. Use materialization
# Pre-compute features for faster retrieval
feast materialize-incremental 2025-01-27T00:00:00
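The same incremental materialization can also be triggered from Python, which is convenient in scheduled jobs; a short sketch using the SDK:
# Sketch: incremental materialization from Python (equivalent to the CLI above)
from datetime import datetime
from feast import FeatureStore

store = FeatureStore(repo_path="feature_repo")
# Loads feature values from the batch sources into the online store,
# picking up where the previous materialization run left off
store.materialize_incremental(end_date=datetime.utcnow())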
# 5. Use feature services
# Group features for optimized retrieval
Feast Cost Optimization
Cost Strategies
# Cost optimization
# 1. Consider Redis for the online store
# Redis: self-hosted (infrastructure cost only)
# DynamoDB: managed, billed per read/write request (pricing varies by region)
# 2. Use appropriate TTL
# Shorter TTL = less storage = lower cost
# 3. Use a file-based offline store
# Parquet files on S3/GCS: object-storage cost only
# BigQuery: queries billed per TB scanned
# 4. Tear down unused infrastructure
feast teardown  # Removes ALL deployed feature store infrastructure - use with care
# 5. Use incremental materialization
# Only materialize new data
feast materialize-incremental 2025-01-27T00:00:00
Feast Monitoring
Metrics
# Monitor feature store
from feast import FeatureStore
store = FeatureStore(repo_path="feature_repo")
# 1. Get feature statistics
stats = store.get_online_features(
    features=["driver_stats_fv:conv_rate"],
    entity_rows=[{"driver_id": i} for i in range(1, 1000)],
).to_df()

print(stats.describe())
# 2. Monitor ingestion latency
import time
start = time.time()
store.write_to_online_store("driver_stats_fv", driver_stats_df)
ingestion_latency = time.time() - start
print(f"Ingestion latency: {ingestion_latency:.2f}s")
# 3. Monitor retrieval latency
start = time.time()
feature_vector = store.get_online_features(
    features=["driver_stats_fv:conv_rate"],
    entity_rows=[{"driver_id": i} for i in range(1, 1000)],
)
retrieval_latency = time.time() - start
print(f"Retrieval latency: {retrieval_latency:.2f}s")Feast Best Practices
DO
# 1. Use version control
# Git for feature definitions

# 2. Use meaningful names
# driver_stats_fv (not fv1)

# 3. Use appropriate TTL
# Balance freshness and cost

# 4. Use feature services
# Group features for serving

# 5. Monitor latency
# Essential for inference
DON'T
# 1. Don't ignore schema
# Define all fields

# 2. Don't skip TTL
# Essential for data freshness

# 3. Don't forget entity keys
# Primary keys are required

# 4. Don't ignore monitoring
# Essential for operations

# 5. Don't hardcode feature values
# Serve them from the feature store
Feast vs. Alternatives
| Feature | Feast | Hopsworks | Tecton | Vertex AI |
|---|---|---|---|---|
| Open Source | Yes | Yes (limited) | No | No |
| Online Store | Redis, DynamoDB | Redis, MySQL | Redis, DynamoDB | Redis |
| Offline Store | Parquet, BigQuery | Hive, BigQuery | Snowflake, BigQuery | BigQuery |
| Streaming | Kafka, Kinesis | Kafka | Kafka | Pub/Sub |
| Pricing | Free (self-hosted) | Free tier | Custom | Usage-based |
| Best For | Open-source, custom | Enterprise, feature registry | Enterprise, managed | GCP, Vertex AI |
Key Takeaways
- Open source: Free, self-hosted feature store
- Feature views: Define features with schema
- Data sources: Batch, streaming, request
- Online store: Low-latency inference (Redis)
- Offline store: Historical training data (Parquet)
- Feature registry: Version control and metadata
- Transformations: On-demand feature transformations
- Use when: you want an open-source, self-hosted feature store as part of your MLOps stack
Back to Module 5