
Hopsworks Guide

Enterprise Feature Store Platform


Overview

Hopsworks is an enterprise-grade feature store platform that provides data governance, version control, and feature sharing across ML teams. It integrates with major data platforms and is available both as an open-source distribution and as a managed offering.


Hopsworks Architecture

Platform Architecture

Key Components (a minimal client-side sketch follows this list):

  • Feature Store API: Python, Java, Scala clients
  • HopsFS: Distributed file system for storage
  • Feature Registry: Schema, metadata, lineage
  • Online Serving: MySQL/Redis for low-latency inference
  • Offline Storage: HDFS/S3/GCS for historical data
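
These components surface directly in the Python client: hopsworks.login() authenticates against the platform, the feature store handle fronts the registry, and reads go to the offline or online store. A minimal sketch, assuming a feature group named driver_stats already exists (it is created later in this guide):

import hopsworks

project = hopsworks.login()               # authenticate against the Hopsworks platform
fs = project.get_feature_store()          # handle onto the feature registry

fg = fs.get_feature_group("driver_stats", version=1)
historical_df = fg.read()                 # read historical data from the offline store (HopsFS/S3/GCS)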

Hopsworks Installation

Hopsworks Cloud

# Hopsworks Cloud (managed service)
# Sign up: https://hopsworks.ai/
# Create a cluster:
# - Region: us-east-1, eu-west-1, ap-southeast-1
# - Instance type: m5.large, m5.xlarge
# - Storage: 100GB, 1TB, 10TB

# Connect to the cluster from Python
import hopsworks

project = hopsworks.login(
    project="my_project",
    api_key_value="your-api-key"
)

Self-Hosted (Docker)

Terminal window
# Self-hosted Hopsworks with Docker
# Clone repository
git clone https://github.com/logicalclocks/hopsworks.git
cd hopsworks
# Start Hopsworks
docker-compose up -d
# Access Hopsworks UI
# http://localhost:8080
# Default credentials:
# Username: admin
# Password: admin

Self-Hosted (Kubernetes)

# Kubernetes deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: hopsworks
spec:
  replicas: 3
  selector:
    matchLabels:
      app: hopsworks
  template:
    metadata:
      labels:
        app: hopsworks
    spec:
      containers:
        - name: hopsworks
          image: logicalclocks/hopsworks:latest
          ports:
            - containerPort: 8080
          env:
            - name: MYSQL_ROOT_PASSWORD
              value: "hopsworks"
          resources:
            requests:
              memory: "4Gi"
              cpu: "2"
            limits:
              memory: "8Gi"
              cpu: "4"

Hopsworks Operations

Connection

# Connect to Hopsworks
import hopsworks
# 1. Connect with an API key
project = hopsworks.login(
    project="my_project",
    api_key_value="your-api-key"
)
# 2. Get the project's default feature store
fs = project.get_feature_store()
# 3. Get a specific (e.g. shared) feature store by name
fs = project.get_feature_store(name="feature_store_name")

Feature Group Creation

# Create a feature group
# 1. Define the feature group (the schema is inferred from the first inserted DataFrame)
driver_stats_fg = fs.create_feature_group(
    name="driver_stats",
    version=1,
    description="Driver statistics",
    primary_key=["driver_id"],
    event_time="event_timestamp",
    online_enabled=True,  # enable online serving
    statistics_config={"enabled": True, "histograms": True, "correlations": True},
)
# 2. Insert data (from a Spark DataFrame)
driver_stats_df = spark.createDataFrame([
    (1, 0.7, 100, "2025-01-27 10:00:00"),
    (2, 0.6, 150, "2025-01-27 10:00:00"),
    (3, 0.8, 200, "2025-01-27 10:00:00"),
], ["driver_id", "conv_rate", "avg_daily_trips", "event_timestamp"])
driver_stats_fg.insert(driver_stats_df)
# 3. Insert data (from a Pandas DataFrame)
import pandas as pd
driver_stats_df = pd.DataFrame({
    'driver_id': [1, 2, 3, 4, 5],
    'conv_rate': [0.7, 0.6, 0.8, 0.5, 0.9],
    'avg_daily_trips': [100, 150, 200, 80, 250],
    'event_timestamp': pd.to_datetime('2025-01-27 10:00:00'),
})
driver_stats_fg.insert(driver_stats_df)
# 4. Online serving is controlled by online_enabled at creation time;
#    insert() writes to both the online and offline stores when it is enabled
# 5. Get an existing feature group
driver_stats_fg = fs.get_feature_group(
    name="driver_stats",
    version=1
)

Feature Group Metadata

# Feature group metadata
# 1. Get schema
schema = driver_stats_fg.schema
print(schema)
# 2. Get statistics
stats = driver_stats_fg.get_statistics()
print(stats)
# 3. Get feature group info
print(f"Name: {driver_stats_fg.name}")
print(f"Version: {driver_stats_fg.version}")
print(f"Description: {driver_stats_fg.description}")
print(f"Primary key: {driver_stats_fg.primary_key}")
print(f"Event time: {driver_stats_fg.event_time}")
print(f"Online enabled: {driver_stats_fg.online_enabled}")
# 4. Get feature group tags
tags = driver_stats_fg.get_tags()
print(tags)
# 5. Add feature group tags
driver_stats_fg.add_tag("domain", "mobility")
driver_stats_fg.add_tag("team", "data_science")

Hopsworks Feature Retrieval

Online Retrieval (Inference)

# Retrieve features for online inference
# 1. Get features for a single entity
feature_vector = fs.get_feature_vector({
    "driver_stats": ["conv_rate", "avg_daily_trips"],
}, entity_value="1")
print(feature_vector)
# 2. Get features from multiple feature groups
feature_vector = fs.get_feature_vector({
    "driver_stats": ["conv_rate", "avg_daily_trips"],
    "driver_demographics": ["age", "gender"],
}, entity_value="1")
print(feature_vector)
# 3. Get features for multiple entities
feature_vectors = fs.get_feature_vectors(
    feature_group_name="driver_stats",
    feature_names=["conv_rate", "avg_daily_trips"],
    entity_values=["1", "2", "3"]
)
print(feature_vectors)
# 4. Batch retrieval (for an inference server)
feature_df = fs.get_feature_group("driver_stats", 1).read()
# Filter by entity
filtered_df = feature_df.filter(feature_df.driver_id.isin([1, 2, 3]))
# Convert to Pandas
filtered_df_pd = filtered_df.toPandas()
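
Recent hsfs releases route online serving through feature views rather than calls on the feature store handle. A hedged sketch of that path, assuming an hsfs 3.x-style API (the view name is illustrative; check the docs of the version you run):

# Online serving via a feature view (assumed hsfs 3.x-style API)
query = driver_stats_fg.select(["conv_rate", "avg_daily_trips"])
fv = fs.create_feature_view(
    name="driver_serving_view",  # illustrative name
    version=1,
    query=query,
)
fv.init_serving()  # set up the online store connection
vector = fv.get_feature_vector(entry={"driver_id": 1})
vectors = fv.get_feature_vectors(entry=[{"driver_id": 1}, {"driver_id": 2}])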

Historical Retrieval (Training)

# Retrieve historical features for training
# 1. Create a training dataset
td = fs.create_training_dataset(
    name="driver_training_dataset",
    version=1,
    description="Training dataset for driver prediction model",
    label=["target"],
)
# 2. Build and save the training data
from pyspark.sql import functions as F
# Get historical data (feature selection happens here)
driver_stats_fg = fs.get_feature_group("driver_stats", 1)
historical_df = driver_stats_fg.select(["driver_id", "conv_rate", "avg_daily_trips"]).read()
# Add a label (example)
historical_df_with_label = historical_df.withColumn(
    "target",
    F.when(F.col("conv_rate") > 0.7, 1).otherwise(0)
)
# Save the training dataset
td.save(historical_df_with_label)
# 3. Get the training dataset
td = fs.get_training_dataset("driver_training_dataset", 1)
training_df = td.read()
# 4. Use in training
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
# Convert to Pandas
training_pd = training_df.toPandas()
# Prepare features and target
X = training_pd[["conv_rate", "avg_daily_trips"]]
y = training_pd["target"]
# Train/test split
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)
# Train model
model = RandomForestClassifier()
model.fit(X_train, y_train)
# Evaluate
score = model.score(X_test, y_test)
print(f"Accuracy: {score:.4f}")
# 5. Save the model to the Hopsworks model registry
import joblib
model_path = "driver_model.pkl"
joblib.dump(model, model_path)
# Upload to Hopsworks
project = hopsworks.login(api_key_value="your-api-key")
model_registry = project.get_model_registry()
driver_model = model_registry.python.create_model(
    name="driver_model",
    version=1,
    description="Driver prediction model"
)
driver_model.save(model_path)
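
Newer hsfs releases build training data through feature views instead of create_training_dataset. A sketch under that assumption (the labeled feature group and view name are illustrative):

# Training data via a feature view (assumed hsfs 3.x-style API)
labeled_fg = fs.get_feature_group("driver_stats_labeled", version=1)  # hypothetical FG containing a "target" column
fv = fs.create_feature_view(
    name="driver_training_view",
    version=1,
    query=labeled_fg.select_all(),
    labels=["target"],
)
# Materialize and split in one call
X_train, X_test, y_train, y_test = fv.train_test_split(test_size=0.2)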

Hopsworks Advanced Features

Feature Transformations

# On-demand feature transformations
from pyspark.sql import functions as F

# 1. Define a transformation as a plain function over a Spark DataFrame
def conv_rate_log_transform(df):
    """Transform the conversion rate to log scale."""
    return df.withColumn("conv_rate_log", F.log(F.col("conv_rate") + 1))

# 2. Apply the transformation before ingestion
# (driver_stats_df is the Spark DataFrame from the feature group section)
transformed_df = conv_rate_log_transform(driver_stats_df)

# 3. Create a feature group for the transformed features
driver_stats_transformed_fg = fs.create_feature_group(
    name="driver_stats_transformed",
    version=1,
    primary_key=["driver_id"],
    event_time="event_timestamp",
    online_enabled=True,
)
driver_stats_transformed_fg.insert(transformed_df)
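
hsfs can also register transformation functions in the feature store and attach them to feature views, so the same transformation is applied at training and serving time. A sketch, assuming the hsfs 3.x UDF-style API (function and view names are illustrative):

# Registered transformation function (assumed hsfs 3.x-style API)
import math

def conv_rate_log(value):
    # log-scale a single conversion-rate value
    return math.log(value + 1)

conv_rate_log_fn = fs.create_transformation_function(
    transformation_function=conv_rate_log,
    output_type=float,
    version=1,
)
conv_rate_log_fn.save()

# Attach to a feature view; the function is applied on read, not at ingestion
fv = fs.create_feature_view(
    name="driver_stats_view",
    version=1,
    query=driver_stats_fg.select_all(),
    transformation_functions={"conv_rate": conv_rate_log_fn},
)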

Feature Store Integration

# Integrate with external data sources
# 1. Read from Kafka
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType

spark = SparkSession.builder \
    .appName("Hopsworks-Kafka-Integration") \
    .getOrCreate()

kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "driver_stats") \
    .load()

# Schema of the JSON messages on the topic
schema = StructType([
    StructField("driver_id", IntegerType()),
    StructField("conv_rate", DoubleType()),
    StructField("avg_daily_trips", IntegerType()),
    StructField("event_timestamp", StringType()),
])

# Parse JSON
parsed_df = kafka_df \
    .select(F.from_json(F.col("value").cast("string"), schema).alias("data")) \
    .select("data.*")

# Write to the feature group (streaming)
query = parsed_df \
    .writeStream \
    .foreachBatch(lambda batch_df, batch_id: driver_stats_fg.insert(batch_df)) \
    .start()

# 2. Read from a data warehouse (Snowflake)
snowflake_df = spark.read \
    .format("snowflake") \
    .option("sfUrl", "account.snowflakecomputing.com") \
    .option("sfUser", "username") \
    .option("sfPassword", "password") \
    .option("sfDatabase", "database") \
    .option("sfSchema", "schema") \
    .option("dbtable", "table") \
    .load()

# Insert into the feature group
driver_stats_fg.insert(snowflake_df)

Hopsworks Performance

Optimization Strategies

# Performance optimization
# 1. Use online serving for low-latency reads
# MySQL: < 50ms
# Redis: < 10ms
# 2. Use batch retrieval instead of one request per entity
feature_vectors = fs.get_feature_vectors(
    feature_group_name="driver_stats",
    feature_names=["conv_rate", "avg_daily_trips"],
    entity_values=["1", "2", "3"]  # batch of 3
)
# 3. Use caching
# Enable feature group caching
driver_stats_fg.enable_caching()
# 4. Use partitioning
# Partition by date for faster retrieval
driver_stats_fg = fs.create_feature_group(
    name="driver_stats_partitioned",
    version=1,
    primary_key=["driver_id"],
    partition_key=["event_date"],
    event_time="event_timestamp",
    online_enabled=True,
)
# 5. Use incremental updates
# insert() upserts on the primary key, so only new or changed rows need to be written
driver_stats_fg.insert(new_data_df)

Hopsworks Cost Optimization

Cost Strategies

# Cost optimization
# 1. Use appropriate storage
# HDFS: no per-GB service fee (self-hosted hardware and operations costs only)
# S3: ~$0.023 per GB-month
# GCS: ~$0.020 per GB-month
# 2. Use online serving wisely
# Enable it only for frequently accessed features
driver_stats_fg = fs.create_feature_group(
    name="driver_stats",
    version=1,
    primary_key=["driver_id"],
    event_time="event_timestamp",
    online_enabled=True,  # only enable if needed
)
# 3. Use compression
# Enable compression for offline storage
driver_stats_fg.save(compression=True)
# 4. Clean up old data
# Delete feature groups (or versions) that are no longer used
old_fg = fs.get_feature_group("old_feature_group", version=1)
old_fg.delete()
# 5. Use tiered storage
# Hot data: online store (MySQL/Redis)
# Warm data: offline store (HDFS/S3)
# Cold data: archive (Glacier/Azure Archive)

Hopsworks Monitoring

Metrics

# Monitor feature store
# 1. Get feature group statistics
stats = driver_stats_fg.get_statistics()
print(stats)
# 2. Get feature distribution
distribution = driver_stats_fg.get_feature_distribution("conv_rate")
print(distribution)
# 3. Get feature correlation
correlation = driver_stats_fg.get_correlation_matrix()
print(correlation)
# 4. Monitor online serving latency
import time
start = time.time()
feature_vector = fs.get_feature_vector({
    "driver_stats": ["conv_rate"],
}, entity_value="1")
latency = time.time() - start
print(f"Online serving latency: {latency * 1000:.2f}ms")
# 5. Monitor ingestion time
start = time.time()
driver_stats_fg.insert(driver_stats_df)
elapsed = time.time() - start
print(f"Ingestion time: {elapsed:.2f}s")

Hopsworks Security

Access Control

# Access control
# 1. Get feature group permissions
permissions = driver_stats_fg.get_permissions()
print(permissions)
# 2. Grant access
driver_stats_fg.grant_access(
    user="data_scientist",
    permission="read"
)
# 3. Revoke access
driver_stats_fg.revoke_access(
    user="data_scientist",
    permission="write"
)
# 4. Enable encryption
# Hopsworks automatically encrypts data at rest and in transit

Hopsworks Best Practices

DO

# 1. Use version control
# Version feature groups and training datasets
# 2. Use meaningful names
# driver_stats (not fg1)
# 3. Use statistics
# Enable statistics for all feature groups
# 4. Use online serving wisely
# Only enable for frequently accessed features
# 5. Monitor data quality
# Check for drift and anomalies (a sketch applying these practices follows this list)
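
A short sketch pulling these practices together; the values are illustrative and mirror the create_feature_group call used earlier in this guide:

# A feature group definition that follows the practices above
fg = fs.create_feature_group(
    name="driver_stats",                 # meaningful name, not "fg1"
    version=2,                           # explicit version for reproducibility
    description="Daily driver conversion statistics",
    primary_key=["driver_id"],
    event_time="event_timestamp",        # needed for point-in-time correctness
    online_enabled=False,                # enable only for frequently accessed features
    statistics_config={"enabled": True, "histograms": True, "correlations": True},
)
# After the first insert(), attach tags for governance, e.g. fg.add_tag("team", "data_science")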

DON’T

# 1. Don't ignore schema
# Define all features with types
# 2. Don't skip versioning
# Essential for reproducibility
# 3. Don't forget event time
# Required for point-in-time correctness (see the sketch after this list)
# 4. Don't ignore monitoring
# Essential for operations
# 5. Don't hardcode values
# Use feature store
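
Point-in-time correctness deserves a concrete example: when feature groups declare an event_time, training joins should only use feature values that were already known at each label's timestamp. A sketch using the hsfs query API, assuming hsfs 3.x-style feature views (feature group and view names are illustrative):

# Point-in-time correct training data (assumed hsfs 3.x-style API)
label_fg = fs.get_feature_group("driver_trips_labeled", version=1)  # hypothetical FG with labels and timestamps
stats_fg = fs.get_feature_group("driver_stats", version=1)          # feature values with event_time

# With event_time set, the join is resolved as of each label row's event time,
# not "latest value wins"
query = label_fg.select_all().join(stats_fg.select(["conv_rate", "avg_daily_trips"]))

fv = fs.create_feature_view(
    name="driver_pit_view",
    version=1,
    query=query,
    labels=["target"],
)
features_df, labels_df = fv.training_data()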

Hopsworks vs. Alternatives

Feature          | Hopsworks              | Feast                | Tecton               | Vertex AI
Open Source      | Yes (limited)          | Yes                  | No                   | No
Managed          | Yes                    | No (self-hosted)     | Yes                  | Yes
Online Store     | MySQL, Redis           | Redis, DynamoDB      | Redis, DynamoDB      | Redis
Offline Store    | HDFS, S3, GCS          | Parquet, BigQuery    | Snowflake, BigQuery  | BigQuery
Feature Registry | Yes                    | Yes                  | Yes                  | Yes
Pricing          | Usage-based            | Free (self-hosted)   | Custom               | Usage-based
Best For         | Enterprise, governance | Open-source, custom  | Enterprise, managed  | GCP, Vertex AI

Key Takeaways

  1. Enterprise platform: Managed feature store with governance
  2. Feature groups: Version-controlled feature definitions
  3. Online serving: Low-latency inference (MySQL/Redis)
  4. Offline storage: Historical training data (HDFS/S3)
  5. Feature registry: Schema, metadata, lineage
  6. Transformations: On-demand feature transformations
  7. Integration: Kafka, data warehouse, Spark
  8. Use When: Enterprise, feature governance, managed service

Back to Module 5