Hopsworks Guide
Enterprise Feature Store Platform
Overview
Hopsworks is an enterprise-grade feature store platform that provides data governance, version control, and feature sharing across ML teams. It integrates with major data platforms and is available both as open source and as a managed offering.
Hopsworks Architecture
Platform Architecture
Key Components (a minimal client sketch follows this list):
- Feature Store API: Python, Java, Scala clients
- HopsFS: Distributed file system for storage
- Feature Registry: Schema, metadata, lineage
- Online Serving: MySQL/Redis for low-latency inference
- Offline Storage: HDFS/S3/GCS for historical data
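The components above map onto client calls used throughout this guide. A minimal sketch, assuming the hopsworks Python package and the placeholder API key and project name used in later sections:
import hopsworks

# Feature Store API: the Python client is the entry point
project = hopsworks.login(api_key="your-api-key", project_name="my_project")
fs = project.get_feature_store()

# Feature Registry: look up a registered feature group and inspect its metadata
fg = fs.get_feature_group(name="driver_stats", version=1)
print(fg.schema)

# Offline storage backs fg.read(); feature groups created with online_enabled=True
# are also written to the online store for low-latency serving.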
Hopsworks Installation
Hopsworks Cloud
# Hopsworks Cloud (managed service)
# Sign up: https://hopsworks.ai/
# Create a cluster
# - Region: us-east-1, eu-west-1, ap-southeast-1
# - Instance type: m5.large, m5.xlarge
# - Storage: 100GB, 1TB, 10TB
# Connect to the cluster
import hopsworks
# Connect to Hopsworks cloud
project = hopsworks.login(
    api_key="your-api-key",
    project_name="my_project",
)
Self-Hosted (Docker)
# Self-hosted Hopsworks with Docker
# Clone the repository
git clone https://github.com/logicalclocks/hopsworks.git
cd hopsworks
# Start Hopsworks
docker-compose up -d
# Access the Hopsworks UI
# http://localhost:8080
# Default credentials:
# Username: admin
# Password: admin
Self-Hosted (Kubernetes)
# Kubernetes deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: hopsworks
spec:
  replicas: 3
  selector:
    matchLabels:
      app: hopsworks
  template:
    metadata:
      labels:
        app: hopsworks
    spec:
      containers:
        - name: hopsworks
          image: logicalclocks/hopsworks:latest
          ports:
            - containerPort: 8080
          env:
            - name: MYSQL_ROOT_PASSWORD
              value: "hopsworks"
          resources:
            requests:
              memory: "4Gi"
              cpu: "2"
            limits:
              memory: "8Gi"
              cpu: "4"
Hopsworks Operations
Connection
# Connect to Hopsworks
import hopsworks
# 1. Connect with an API key
project = hopsworks.login(
    api_key="your-api-key",
    project_name="my_project",
)
# 2. Get the feature store
fs = project.get_feature_store()
# 3. Get a feature store handle
from hsfs.feature_store import FeatureStore

fs = FeatureStore(
    name="feature_store_name",
    project_id=1,
)
Feature Group Creation
# Create feature group
from hsfs.feature_group import FeatureGroup
from hsfs.engine.spark import SparkEngine
# 1. Define the feature group schema
driver_stats_fg = fs.create_feature_group(
    name="driver_stats",
    version=1,
    description="Driver statistics",
    primary_key=["driver_id"],
    event_time="event_timestamp",
    online_enabled=True,  # Enable online serving
    statistics_config={"enabled": True, "histograms": True, "correlations": True},
)
# 2. Insert data (from a Spark DataFrame)
driver_stats_df = spark.createDataFrame([
    (1, 0.7, 100, "2025-01-27 10:00:00"),
    (2, 0.6, 150, "2025-01-27 10:00:00"),
    (3, 0.8, 200, "2025-01-27 10:00:00"),
], ["driver_id", "conv_rate", "avg_daily_trips", "event_timestamp"])
driver_stats_fg.insert(driver_stats_df)
# 3. Insert data (from a Pandas DataFrame)
import pandas as pd
driver_stats_df = pd.DataFrame({
    'driver_id': [1, 2, 3, 4, 5],
    'conv_rate': [0.7, 0.6, 0.8, 0.5, 0.9],
    'avg_daily_trips': [100, 150, 200, 80, 250],
    'event_timestamp': pd.to_datetime('2025-01-27 10:00:00'),
})
driver_stats_fg.insert(driver_stats_df)
# 4. Enable online serving
driver_stats_fg.save(version=1, online_enabled=True)
# 5. Get the feature group
driver_stats_fg = fs.get_feature_group(
    name="driver_stats",
    version=1,
)
Feature Group Metadata
# Feature group metadata
# 1. Get the schema
schema = driver_stats_fg.schema
print(schema)
# 2. Get statistics
stats = driver_stats_fg.get_statistics()
print(stats)
# 3. Get feature group info
print(f"Name: {driver_stats_fg.name}")
print(f"Version: {driver_stats_fg.version}")
print(f"Description: {driver_stats_fg.description}")
print(f"Primary key: {driver_stats_fg.primary_key}")
print(f"Event time: {driver_stats_fg.event_time}")
print(f"Online enabled: {driver_stats_fg.online_enabled}")
# 4. Get feature group tags
tags = driver_stats_fg.get_tags()
print(tags)
# 5. Add feature group tags
driver_stats_fg.add_tag("domain", "mobility")
driver_stats_fg.add_tag("team", "data_science")
Hopsworks Feature Retrieval
Online Retrieval (Inference)
# Retrieve features for online inference
# 1. Get features from a single feature group
feature_vector = fs.get_feature_vector({
    "driver_stats": ["conv_rate", "avg_daily_trips"],
}, entity_value="1")
print(feature_vector)
# 2. Get features from multiple feature groups
feature_vector = fs.get_feature_vector({
    "driver_stats": ["conv_rate", "avg_daily_trips"],
    "driver_demographics": ["age", "gender"],
}, entity_value="1")
print(feature_vector)
# 3. Get features for multiple entities
feature_vectors = fs.get_feature_vectors(
    feature_group_name="driver_stats",
    feature_names=["conv_rate", "avg_daily_trips"],
    entity_values=["1", "2", "3"],
)
print(feature_vectors)
# 4. Batch retrieval (for an inference server)
feature_df = fs.get_feature_group("driver_stats", 1).read()
# Filter by entity
filtered_df = feature_df.filter(feature_df.driver_id.isin([1, 2, 3]))
# Convert to Pandas
filtered_df_pd = filtered_df.toPandas()
Historical Retrieval (Training)
# Retrieve historical features for training
from hsfs.training_dataset import TrainingDataset
# 1. Create a training dataset
td = fs.create_training_dataset(
    name="driver_training_dataset",
    version=1,
    description="Training dataset for driver prediction model",
    label=["target"],
    features=["driver_stats:conv_rate", "driver_stats:avg_daily_trips"],
)
# 2. Insert training data
from pyspark.sql import functions as F
# Get historical data
driver_stats_fg = fs.get_feature_group("driver_stats", 1)
historical_df = driver_stats_fg.read()
# Add a label (example)
historical_df_with_label = historical_df.withColumn(
    "target", F.when(F.col("conv_rate") > 0.7, 1).otherwise(0)
)
# Save the training dataset
td.save(historical_df_with_label)
# 3. Get the training dataset
td = fs.get_training_dataset("driver_training_dataset", 1)
training_df = td.read()
# 4. Use in training
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
# Convert to Pandas
training_pd = training_df.toPandas()
# Prepare features and target
X = training_pd[["conv_rate", "avg_daily_trips"]]
y = training_pd["target"]
# Train/test split
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)
# Train the model
model = RandomForestClassifier()
model.fit(X_train, y_train)
# Evaluate
score = model.score(X_test, y_test)
print(f"Accuracy: {score:.4f}")
# 5. Save the model to Hopsworks
import joblib
model_path = "Models/driver_model.pkl"
joblib.dump(model, model_path)
# Upload to Hopsworks
project = hopsworks.login(api_key="your-api-key")
model_registry = project.get_model_registry()
model = model_registry.python.create_model(
    name="driver_model",
    version=1,
    description="Driver prediction model",
)
model.save(model_path)
Hopsworks Advanced Features
Feature Transformations
# On-demand feature transformations
from hsfs.transformation_function import TransformationFunction
# 1. Define a transformation function
class ConversionRateTransformation(TransformationFunction):
    """Transform the conversion rate to log scale."""
    def transform(self, df):
        from pyspark.sql import functions as F
        return df.withColumn("conv_rate_log", F.log(F.col("conv_rate") + 1))
# 2. Register the transformation
transformation = fs.create_transformation_function(
    name="conv_rate_log",
    transformation_func=ConversionRateTransformation(),
)
# 3. Apply the transformation
transformed_df = transformation.transform(driver_stats_df)
# 4. Create a feature group with the transformed features
driver_stats_transformed_fg = fs.create_feature_group(
    name="driver_stats_transformed",
    version=1,
    primary_key=["driver_id"],
    event_time="event_timestamp",
    online_enabled=True,
)
driver_stats_transformed_fg.insert(transformed_df)
Feature Store Integration
# Integrate with external data sources
# 1. Read from Kafka
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("Hopsworks-Kafka-Integration") \
    .getOrCreate()
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "driver_stats") \
    .load()
# Parse JSON
from pyspark.sql import functions as F
# Define the message schema (column types assumed from the sample data above)
from pyspark.sql.types import StructType, StructField, LongType, DoubleType, StringType

schema = StructType([
    StructField("driver_id", LongType()),
    StructField("conv_rate", DoubleType()),
    StructField("avg_daily_trips", LongType()),
    StructField("event_timestamp", StringType()),
])

parsed_df = kafka_df \
    .select(F.from_json(F.col("value").cast("string"), schema).alias("data")) \
    .select("data.*")
# Write to the feature group (streaming)
query = parsed_df \
    .writeStream \
    .foreachBatch(lambda batch_df, batch_id: driver_stats_fg.insert(batch_df)) \
    .start()
# 2. Read from a data warehouse
# Snowflake
snowflake_df = spark.read \
    .format("snowflake") \
    .option("sfUrl", "account.snowflakecomputing.com") \
    .option("sfUser", "username") \
    .option("sfPassword", "password") \
    .option("sfDatabase", "database") \
    .option("sfSchema", "schema") \
    .option("dbtable", "table") \
    .load()
# Insert into the feature group
driver_stats_fg.insert(snowflake_df)
Hopsworks Performance
Optimization Strategies
# Performance optimization
# 1. Use online serving for low-latency lookups
# MySQL: < 50ms
# Redis: < 10ms
# 2. Use batch retrieval
feature_vectors = fs.get_feature_vectors(
    feature_group_name="driver_stats",
    feature_names=["conv_rate", "avg_daily_trips"],
    entity_values=["1", "2", "3"],  # Batch of 3
)
# 3. Use caching
# Enable feature group caching
driver_stats_fg.enable_caching()
# 4. Use partitioning
# Partition by date for faster retrieval
driver_stats_fg = fs.create_feature_group(
    name="driver_stats_partitioned",
    version=1,
    primary_key=["driver_id"],
    event_time="event_timestamp",
    partition_by=["event_date"],
    online_enabled=True,
)
# 5. Use incremental updates
# Only insert new data
driver_stats_fg.insert(new_data_df, write_options={"start_index": "old_data_count"})
Hopsworks Cost Optimization
Cost Strategies
# Cost optimization
# 1. Use appropriate storage
# HDFS: free (self-hosted)
# S3: ~$0.023 per GB-month
# GCS: ~$0.020 per GB-month
# 2. Use online serving wisely
# Enable only for frequently accessed features
driver_stats_fg = fs.create_feature_group(
    name="driver_stats",
    version=1,
    primary_key=["driver_id"],
    event_time="event_timestamp",
    online_enabled=True,  # Only enable if needed
)
# 3. Use compression
# Enable compression for offline storage
driver_stats_fg.save(compression=True)
# 4. Clean up old data
# Delete old feature groups
fs.delete_feature_group("old_feature_group", version=1)
# 5. Use tiered storage
# Hot data: online store (MySQL/Redis)
# Warm data: offline store (HDFS/S3)
# Cold data: archive (Glacier/Azure Archive)
Hopsworks Monitoring
Metrics
# Monitor feature store
# 1. Get feature group statistics
stats = driver_stats_fg.get_statistics()
print(stats)
# 2. Get a feature distribution
distribution = driver_stats_fg.get_feature_distribution("conv_rate")
print(distribution)
# 3. Get feature correlations
correlation = driver_stats_fg.get_correlation_matrix()
print(correlation)
# 4. Monitor online serving latency
import time
start = time.time()
feature_vector = fs.get_feature_vector({
    "driver_stats": ["conv_rate"],
}, entity_value="1")
latency = time.time() - start
print(f"Online serving latency: {latency * 1000:.2f}ms")
# 5. Monitor ingestion throughput
start = time.time()
driver_stats_fg.insert(driver_stats_df)
ingestion_time = time.time() - start
print(f"Ingestion time: {ingestion_time:.2f}s")
Hopsworks Security
Access Control
# Access control
# 1. Get feature group permissions
permissions = driver_stats_fg.get_permissions()
print(permissions)
# 2. Grant access
driver_stats_fg.grant_access(
    user="data_scientist",
    permission="read",
)
# 3. Revoke access
driver_stats_fg.revoke_access(
    user="data_scientist",
    permission="write",
)
# 4. Encryption
# Hopsworks encrypts data at rest and in transit automatically
Hopsworks Best Practices
DO
# 1. Use version control
# Version feature groups and training datasets
# 2. Use meaningful names
# driver_stats (not fg1)
# 3. Use statistics
# Enable statistics for all feature groups
# 4. Use online serving wisely
# Only enable for frequently accessed features
# 5. Monitor data quality
# Check for drift and anomalies (see the sketch after the DON’T list)
DON’T
# 1. Don't ignore schema
# Define all features with types
# 2. Don't skip versioning
# Essential for reproducibility
# 3. Don't forget event time
# Required for point-in-time correctness
# 4. Don't ignore monitoring
# Essential for operations
# 5. Don't hardcode feature values
# Use the feature store instead
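The DO list recommends monitoring data quality but shows no code for it. A minimal drift-check sketch, assuming the driver_stats_fg feature group and the Pandas batch driver_stats_df from earlier sections, with an arbitrary 25% threshold on the shift in column means:
# Compare a new batch against a reference snapshot read from the feature group
reference_df = driver_stats_fg.read().toPandas()  # historical snapshot (Spark -> Pandas)
new_df = driver_stats_df                          # latest Pandas batch

for col in ["conv_rate", "avg_daily_trips"]:
    ref_mean = reference_df[col].mean()
    new_mean = new_df[col].mean()
    drift = abs(new_mean - ref_mean) / (abs(ref_mean) + 1e-9)
    if drift > 0.25:  # illustrative threshold, not a Hopsworks default
        print(f"Possible drift in {col}: reference={ref_mean:.3f}, new={new_mean:.3f}")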
Hopsworks vs. Alternatives
| Feature | Hopsworks | Feast | Tecton | Vertex AI |
|---|---|---|---|---|
| Open Source | Yes (limited) | Yes | No | No |
| Managed | Yes | No (self-hosted) | Yes | Yes |
| Online Store | MySQL, Redis | Redis, DynamoDB | Redis, DynamoDB | Redis |
| Offline Store | HDFS, S3, GCS | Parquet, BigQuery | Snowflake, BigQuery | BigQuery |
| Feature Registry | Yes | Yes | Yes | Yes |
| Pricing | Usage-based | Free (self-hosted) | Custom | Usage-based |
| Best For | Enterprise, governance | Open-source, custom | Enterprise, managed | GCP, Vertex AI |
Key Takeaways
- Enterprise platform: Managed feature store with governance
- Feature groups: Version-controlled feature definitions
- Online serving: Low-latency inference (MySQL/Redis)
- Offline storage: Historical training data (HDFS/S3)
- Feature registry: Schema, metadata, lineage
- Transformations: On-demand feature transformations
- Integration: Kafka, data warehouse, Spark
- Use When: Enterprise, feature governance, managed service (see the end-to-end sketch below)
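Tying the takeaways together, a condensed end-to-end sketch using only the calls shown earlier (API key, project name, and data values are placeholders):
import hopsworks
import pandas as pd

# Connect and get the feature store
project = hopsworks.login(api_key="your-api-key", project_name="my_project")
fs = project.get_feature_store()

# Define a versioned feature group with an event time and online serving enabled
fg = fs.create_feature_group(
    name="driver_stats",
    version=1,
    primary_key=["driver_id"],
    event_time="event_timestamp",
    online_enabled=True,
)

# Ingest a small batch and read it back for training
fg.insert(pd.DataFrame({
    "driver_id": [1, 2],
    "conv_rate": [0.7, 0.6],
    "avg_daily_trips": [100, 150],
    "event_timestamp": pd.to_datetime("2025-01-27 10:00:00"),
}))
training_df = fs.get_feature_group("driver_stats", version=1).read()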
Back to Module 5