Case Study - IoT Manufacturing Analytics
Edge-to-Cloud Industrial IoT Platform
Business Context
Company: Global manufacturing company (50 factories, 10K machines)
Challenge: Build real-time monitoring and predictive maintenance platform for manufacturing equipment.
Scale: 100TB/day, 1M sensors, 10K messages/second peak
Requirements
Architecture Design
Technology Selection
Component Selection Rationale
| Component | Technology | Rationale |
|---|---|---|
| Edge Runtime | AWS Greengrass | Local execution, offline support |
| Edge ML | SageMaker Neo | Optimized for edge hardware |
| IoT Hub | AWS IoT Core | Device management, security |
| Stream Processing | Kinesis Data Analytics | SQL-based, low latency |
| Data Lake | S3 + Delta Lake | Cost optimization, time travel |
| Warehouse | Redshift | Spectrum for data lake queries |
| ML Platform | SageMaker | End-to-end ML pipeline |
| Time Series DB | InfluxDB | High write throughput |
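One way to wire the IoT Core → Kinesis hand-off implied by this table is an IoT topic rule that forwards the gateways' batched sensor messages into a stream. The sketch below is illustrative only: the rule name, stream name, and IAM role ARN are assumptions, not part of the case study.

```python
# Sketch: route 'factory/sensor/data' messages from IoT Core into Kinesis.
# Rule name, stream name, and role ARN are placeholders for illustration.
import boto3

iot = boto3.client("iot")

iot.create_topic_rule(
    ruleName="factory_sensor_to_kinesis",  # hypothetical rule name
    topicRulePayload={
        "sql": "SELECT * FROM 'factory/sensor/data'",
        "awsIotSqlVersion": "2016-03-23",
        "actions": [
            {
                "kinesis": {
                    "roleArn": "arn:aws:iam::123456789012:role/IoTKinesisRole",  # placeholder
                    "streamName": "sensor-data-stream",                          # placeholder
                    "partitionKey": "${topic()}",  # spread load across shards by topic
                }
            }
        ],
    },
)
```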
Edge Processing
Greengrass Lambda Function
```python
# Greengrass Lambda for edge processing
import json
import os
import sqlite3
import time
from threading import Timer

import greengrasssdk

# Initialize Greengrass client
iot_client = greengrasssdk.client('iot-data')

# Local database used as an offline buffer
DB_PATH = '/tmp/sensor_data.db'


def init_db():
    """Initialize the local SQLite database"""
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()

    cursor.execute('''
        CREATE TABLE IF NOT EXISTS sensor_data (
            timestamp INTEGER,
            sensor_id TEXT,
            machine_id TEXT,
            factory_id TEXT,
            metric_type TEXT,
            value REAL,
            unit TEXT,
            uploaded BOOLEAN DEFAULT 0
        )
    ''')

    cursor.execute('''
        CREATE INDEX IF NOT EXISTS idx_uploaded ON sensor_data(uploaded)
    ''')

    conn.commit()
    conn.close()


def save_locally(data):
    """Save data to the local buffer"""
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()

    cursor.execute('''
        INSERT INTO sensor_data
            (timestamp, sensor_id, machine_id, factory_id, metric_type, value, unit)
        VALUES (?, ?, ?, ?, ?, ?, ?)
    ''', (
        data['timestamp'],
        data['sensor_id'],
        data['machine_id'],
        data['factory_id'],
        data['metric_type'],
        data['value'],
        data['unit']
    ))

    conn.commit()
    conn.close()


def check_anomaly(data):
    """Check for anomalies locally"""
    metric_type = data['metric_type']
    value = data['value']

    # Local thresholds (could come from an ML model)
    thresholds = {
        'temperature': {'min': 20, 'max': 100},
        'vibration': {'max': 50},
        'pressure': {'min': 10, 'max': 200}
    }

    if metric_type in thresholds:
        threshold = thresholds[metric_type]

        if 'min' in threshold and value < threshold['min']:
            return True, f"{metric_type} below minimum"

        if 'max' in threshold and value > threshold['max']:
            return True, f"{metric_type} above maximum"

    return False, None


def send_local_alert(machine_id, message):
    """Send a local alert over MQTT"""
    alert = {
        'machine_id': machine_id,
        'factory_id': os.getenv('FACTORY_ID'),
        'timestamp': int(time.time()),
        'message': message,
        'severity': 'WARNING'
    }

    # Publish to local MQTT topic
    iot_client.publish(
        topic='factory/local/alerts',
        payload=json.dumps(alert)
    )


def upload_buffered_data():
    """Upload buffered data to the cloud"""
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()

    # Get data not yet uploaded
    cursor.execute('''
        SELECT timestamp, sensor_id, machine_id, factory_id, metric_type, value, unit
        FROM sensor_data
        WHERE uploaded = 0
        LIMIT 100
    ''')

    rows = cursor.fetchall()

    if not rows:
        conn.close()
        return

    # Prepare batch
    batch = []
    for row in rows:
        batch.append({
            'timestamp': row[0],
            'sensor_id': row[1],
            'machine_id': row[2],
            'factory_id': row[3],
            'metric_type': row[4],
            'value': row[5],
            'unit': row[6]
        })

    # Upload to the cloud
    try:
        iot_client.publish(
            topic='factory/sensor/data',
            payload=json.dumps(batch)
        )

        # Mark as uploaded (rows are keyed by timestamp here)
        timestamps = [row[0] for row in rows]
        placeholders = ','.join('?' * len(timestamps))
        cursor.execute(
            f'UPDATE sensor_data SET uploaded = 1 WHERE timestamp IN ({placeholders})',
            timestamps
        )

        conn.commit()

    except Exception as e:
        print(f"Upload failed: {e}")

    conn.close()


# Lambda handler
def lambda_handler(event, context):
    """Handle incoming sensor data"""

    init_db()

    # Process each record
    for record in event:

        # Save locally
        save_locally(record)

        # Check for anomalies
        is_anomaly, message = check_anomaly(record)

        if is_anomaly:
            # Send local alert
            send_local_alert(record['machine_id'], message)

            # Send to cloud immediately
            iot_client.publish(
                topic='factory/anomalies',
                payload=json.dumps({
                    'data': record,
                    'anomaly': message,
                    'timestamp': int(time.time())
                })
            )

    # Upload buffered data
    upload_buffered_data()

    return {
        'statusCode': 200,
        'body': json.dumps({'status': 'processed'})
    }


# Schedule periodic upload
def schedule_upload():
    """Schedule periodic data upload"""
    upload_buffered_data()
    Timer(60.0, schedule_upload).start()  # Every minute


schedule_upload()
```

Edge ML Inference
```python
# Edge ML inference with a SageMaker Neo-compiled model
import json
import numpy as np
import dlr  # DLR is the open-source runtime for Neo-compiled models

# Load the Neo-compiled model from the local ML resource path
model = dlr.DLRModel('/opt/ml/model')


def predict_failure(sensor_data):
    """Predict machine failure"""

    # Prepare features
    features = prepare_features(sensor_data)

    # Run inference; the model is assumed to output
    # [failure_probability, predicted_failure_time, confidence]
    prediction = model.run(features)[0][0]

    return {
        'failure_probability': float(prediction[0]),
        'predicted_failure_time': float(prediction[1]),
        'confidence': float(prediction[2])
    }


def prepare_features(sensor_data):
    """Prepare features for the ML model"""

    # Last 100 readings, 4 metrics each
    features = []

    for reading in sensor_data[-100:]:
        features.extend([
            reading['temperature'],
            reading['vibration'],
            reading['pressure'],
            reading['rpm']
        ])

    return np.array([features], dtype=np.float32)


# Lambda handler for ML inference
def ml_inference_handler(event, context):
    """Handle ML inference requests"""

    machine_id = event['machine_id']

    # Get recent sensor data from the local cache
    # (get_cached_sensor_data, send_alert, and trigger_shutdown are
    # device-side helpers defined elsewhere)
    sensor_data = get_cached_sensor_data(machine_id)

    # Run inference
    prediction = predict_failure(sensor_data)

    # Check threshold
    if prediction['failure_probability'] > 0.8:
        # Send alert
        send_alert({
            'machine_id': machine_id,
            'prediction': prediction,
            'action': 'SCHEDULE_MAINTENANCE'
        })

    # Take local action
    if prediction['failure_probability'] > 0.95:
        trigger_shutdown(machine_id)

    return prediction
```

Cloud Processing
Kinesis Data Analytics
```sql
-- Kinesis Data Analytics SQL
CREATE OR REPLACE STREAM pump_anomalies (
    machine_id   VARCHAR(64),
    factory_id   VARCHAR(64),
    anomaly_type VARCHAR(32),
    severity     VARCHAR(16),
    value        DOUBLE,
    threshold    DOUBLE,
    anomaly_time TIMESTAMP
);

-- Detect temperature anomalies
-- (IS_VALID_TEMPERATURE / IS_VALID_VIBRATION are UDFs defined elsewhere)
CREATE OR REPLACE PUMP pump_temperature_anomalies AS
INSERT INTO pump_anomalies
SELECT STREAM
    machine_id,
    factory_id,
    'TEMPERATURE_HIGH' AS anomaly_type,
    CASE
        WHEN temperature > 120 THEN 'CRITICAL'
        WHEN temperature > 100 THEN 'WARNING'
        ELSE 'INFO'
    END AS severity,
    temperature AS value,
    100 AS threshold,
    ROWTIME AS anomaly_time
FROM sensor_data_stream
WHERE temperature > 100
  AND IS_VALID_TEMPERATURE(temperature) = 'TRUE';

-- Detect vibration anomalies
CREATE OR REPLACE PUMP pump_vibration_anomalies AS
INSERT INTO pump_anomalies
SELECT STREAM
    machine_id,
    factory_id,
    'VIBRATION_HIGH' AS anomaly_type,
    CASE
        WHEN vibration > 80 THEN 'CRITICAL'
        WHEN vibration > 60 THEN 'WARNING'
        ELSE 'INFO'
    END AS severity,
    vibration AS value,
    60 AS threshold,
    ROWTIME AS anomaly_time
FROM sensor_data_stream
WHERE vibration > 60
  AND IS_VALID_VIBRATION(vibration) = 'TRUE';

-- Detect patterns (rapid changes over a 1-minute tumbling window)
CREATE OR REPLACE PUMP pump_rapid_changes AS
INSERT INTO pump_anomalies
SELECT STREAM
    machine_id,
    factory_id,
    'RAPID_CHANGE' AS anomaly_type,
    'WARNING' AS severity,
    AVG(value) AS value,
    NULL AS threshold,
    STEP(sensor_data_stream.ROWTIME BY INTERVAL '1' MINUTE) AS anomaly_time
FROM sensor_data_stream
GROUP BY machine_id, factory_id, metric_type,
         STEP(sensor_data_stream.ROWTIME BY INTERVAL '1' MINUTE)
HAVING STDDEV(value) > 20;
```

Data Lake Processing
```python
# Batch processing with Spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

# Initialize Spark with Delta Lake support
spark = SparkSession.builder \
    .appName("IoTDataLakeProcessing") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()


# Process sensor data
def process_sensor_data():
    """Process raw sensor data from the bronze layer"""

    # Read the last 7 days of raw data
    raw_data = spark.read.format("parquet") \
        .load("s3://iot-lake/bronze/sensor_data/") \
        .filter(col("date") >= date_sub(current_date(), 7))

    # Data quality checks: drop readings outside plausible physical ranges
    raw_data = raw_data.filter(
        (col("temperature").between(0, 200)) &
        (col("vibration").between(0, 100)) &
        (col("pressure").between(0, 500))
    )

    # Feature engineering
    processed_data = raw_data \
        .withColumn("event_date", to_date(col("timestamp"))) \
        .withColumn("event_hour", hour(col("timestamp"))) \
        .withColumn("is_weekend", dayofweek(col("timestamp")).isin([1, 7])) \
        .withColumn("shift",
                    when(hour(col("timestamp")).between(6, 14), "morning")
                    .when(hour(col("timestamp")).between(14, 22), "afternoon")
                    .otherwise("night"))

    # Calculate rolling statistics over the last 100 readings per machine
    machine_window = Window.partitionBy("machine_id") \
        .orderBy("timestamp") \
        .rowsBetween(-100, 0)

    processed_data = processed_data \
        .withColumn("temp_rolling_avg", avg("temperature").over(machine_window)) \
        .withColumn("temp_rolling_std", stddev("temperature").over(machine_window)) \
        .withColumn("vib_rolling_avg", avg("vibration").over(machine_window)) \
        .withColumn("vib_rolling_std", stddev("vibration").over(machine_window))

    # Detect anomalies (3-sigma rule against the rolling statistics)
    processed_data = processed_data \
        .withColumn("temp_anomaly",
                    when(abs(col("temperature") - col("temp_rolling_avg")) > 3 * col("temp_rolling_std"), 1)
                    .otherwise(0)) \
        .withColumn("vib_anomaly",
                    when(abs(col("vibration") - col("vib_rolling_avg")) > 3 * col("vib_rolling_std"), 1)
                    .otherwise(0))

    # Write to silver layer
    processed_data.write.format("delta") \
        .mode("overwrite") \
        .partitionBy("event_date", "factory_id") \
        .option("overwriteSchema", "true") \
        .save("s3://iot-lake/silver/sensor_data_processed")

    print("Processing complete")


# Aggregate metrics
def aggregate_metrics():
    """Aggregate sensor metrics into the gold layer"""

    processed_data = spark.read.format("delta") \
        .load("s3://iot-lake/silver/sensor_data_processed")

    # Machine-level aggregates
    machine_metrics = processed_data \
        .groupBy("machine_id", "event_date") \
        .agg(
            avg("temperature").alias("avg_temp"),
            max("temperature").alias("max_temp"),
            stddev("temperature").alias("std_temp"),
            avg("vibration").alias("avg_vib"),
            max("vibration").alias("max_vib"),
            stddev("vibration").alias("std_vib"),
            sum("temp_anomaly").alias("temp_anomaly_count"),
            sum("vib_anomaly").alias("vib_anomaly_count"),
            count("*").alias("reading_count")
        )

    # Write to gold layer
    machine_metrics.write.format("delta") \
        .mode("overwrite") \
        .partitionBy("event_date") \
        .save("s3://iot-lake/gold/machine_metrics")

    # Factory-level aggregates
    factory_metrics = processed_data \
        .groupBy("factory_id", "event_date") \
        .agg(
            count("*").alias("total_readings"),
            countDistinct("machine_id").alias("active_machines"),
            sum("temp_anomaly").alias("total_temp_anomalies"),
            sum("vib_anomaly").alias("total_vib_anomalies")
        )

    factory_metrics.write.format("delta") \
        .mode("overwrite") \
        .partitionBy("event_date") \
        .save("s3://iot-lake/gold/factory_metrics")

    print("Aggregation complete")
```

Predictive Maintenance
Model Training
```python
# Predictive maintenance model training
from sklearn.model_selection import train_test_split
from sagemaker.xgboost import XGBoost

import pandas as pd


# Data preprocessing
def preprocess_data():
    """Preprocess data for training"""

    # Read processed sensor data from the silver layer
    # (reuses the Spark session from the data lake job)
    sensor_data = spark.read.format("delta") \
        .load("s3://iot-lake/silver/sensor_data_processed") \
        .toPandas()

    # Read maintenance records
    maintenance_data = pd.read_csv(
        "s3://iot-lake/maintenance_records.csv",
        parse_dates=["maintenance_date", "last_maintenance_date"]
    )

    # Merge data
    df = sensor_data.merge(maintenance_data, on=["machine_id", "date"])

    # Create labels: a maintenance event occurs within the next 24 hours
    time_to_maintenance = (df["maintenance_date"] - df["date"]).dt.total_seconds()
    df["failure_within_24h"] = (
        (time_to_maintenance >= 0) & (time_to_maintenance <= 86400)
    ).astype(int)

    # Feature engineering
    df["hours_since_last_maintenance"] = (
        df["date"] - df.groupby("machine_id")["last_maintenance_date"].transform("max")
    ).dt.total_seconds() / 3600

    # Split
    train, test = train_test_split(df, test_size=0.2, random_state=42)

    # Save
    train.to_csv("s3://iot-lake/training_data/train.csv", index=False)
    test.to_csv("s3://iot-lake/training_data/test.csv", index=False)


# XGBoost training
def train_model():
    """Train the XGBoost failure-prediction model"""

    xgboost = XGBoost(
        entry_point="train.py",
        role="arn:aws:iam::123456789012:role/SageMakerRole",
        framework_version="1.7-1",
        instance_count=10,
        instance_type="ml.m5.4xlarge",
        output_path="s3://iot-lake/models/",
        code_location="s3://iot-lake/models/code/",
        hyperparameters={
            "max_depth": 8,
            "eta": 0.1,
            "gamma": 0.1,
            "min_child_weight": 10,
            "subsample": 0.8,
            "colsample_bytree": 0.8,
            "num_round": 500
        }
    )

    xgboost.fit({
        "train": "s3://iot-lake/training_data/train.csv",
        "validation": "s3://iot-lake/training_data/test.csv"
    })

    return xgboost


# Compile for edge
def compile_for_edge(model):
    """Compile the trained model with SageMaker Neo for edge deployment"""

    # Neo compilation via the estimator; the input shape matches
    # 100 readings x 4 features, and the target family should match
    # the gateway hardware (e.g. 'jetson_nano' or 'rasp3b')
    compiled_model = model.compile_model(
        target_instance_family="ml_c5",
        input_shape={"data": [1, 400]},
        output_path="s3://iot-lake/models/edge/",
        framework="xgboost"
    )

    return compiled_model
```

Cost Optimization
Storage Costs
| Tier | Data | Size | Cost | Storage Class |
|---|---|---|---|---|
| Hot (7 days) | Raw sensor data | 700TB | $16,100/month | S3 Standard |
| Warm (30 days) | Processed data | 1PB | $20,000/month | S3 Standard-IA |
| Cold (90 days) | Aggregated | 2PB | $12,000/month | S3 Glacier |
| Archive | Historical | 5PB | $15,000/month | Glacier Deep Archive |
| Total | | | $63,100/month | |
Optimization:
- Lifecycle: 7 days → IA, 90 days → Glacier (see the sketch after this list)
- Compression: ZSTD-19 (~50% savings)
- Delta Lake: Auto-compaction
- Partitioning: Date + factory_id
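The tiering rules above map directly onto an S3 lifecycle configuration. The sketch below is illustrative: the bucket name and prefix are assumptions, and the 365-day Deep Archive transition mirrors the archive tier in the table.

```python
# Sketch: S3 lifecycle rules matching the storage tiering above.
# Bucket name and prefix are placeholders for illustration.
import boto3

s3 = boto3.client("s3")

s3.put_bucket_lifecycle_configuration(
    Bucket="iot-lake",  # placeholder bucket
    LifecycleConfiguration={
        "Rules": [
            {
                "ID": "sensor-data-tiering",
                "Filter": {"Prefix": "bronze/sensor_data/"},
                "Status": "Enabled",
                "Transitions": [
                    {"Days": 7, "StorageClass": "STANDARD_IA"},     # warm tier
                    {"Days": 90, "StorageClass": "GLACIER"},        # cold tier
                    {"Days": 365, "StorageClass": "DEEP_ARCHIVE"},  # archive tier
                ],
            }
        ]
    },
)
```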
Compute Costs
| Component | Compute | Cost | Optimization |
|---|---|---|---|
| Edge Devices | 10K devices | $10,000/month | On-site hardware |
| Greengrass | Included | $0 | Free tier |
| Kinesis | 100 shards | $43,200/month | Auto-scaling |
| Spark Batch | 50 nodes (spot) | $28,800/month | 70% spot savings |
| SageMaker | 20 instances (spot) | $8,640/month | Spot training |
| Total | | $90,640/month | |
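The SageMaker spot savings above rely on managed spot training. A minimal sketch of how the XGBoost estimator from the training section could enable it; the checkpoint path and time limits are assumptions.

```python
# Sketch: enable managed spot training on the XGBoost estimator shown earlier.
# Checkpoint location and wait/run limits are illustrative assumptions.
from sagemaker.xgboost import XGBoost

xgboost = XGBoost(
    entry_point="train.py",
    role="arn:aws:iam::123456789012:role/SageMakerRole",
    framework_version="1.7-1",
    instance_count=10,
    instance_type="ml.m5.4xlarge",
    output_path="s3://iot-lake/models/",
    use_spot_instances=True,                                  # bid for spare capacity
    max_run=3600,                                             # training time limit (seconds)
    max_wait=7200,                                            # total time incl. waiting for spot
    checkpoint_s3_uri="s3://iot-lake/models/checkpoints/",    # resume after interruption
)
```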
Total Monthly Cost
| Category | Cost | Optimization |
|---|---|---|
| Storage | $63,100 | Lifecycle + compression |
| Compute | $90,640 | Spot + edge processing |
| Network | $10,000 | Data compression |
| Total | $163,740/month | |
| Annual | $1,964,880 | |
| Cost per machine | ~$16.40/month ($163,740 / 10,000 machines) | |
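The network line assumes the edge upload batches are compressed before they leave the factory. A minimal sketch with the `zstandard` library, using level 19 as noted in the storage optimizations; the batch layout mirrors `upload_buffered_data()` and is otherwise an assumption.

```python
# Sketch: compress a batched sensor payload with Zstandard before publishing.
# Level 19 matches the ZSTD-19 setting mentioned above; the record layout
# mirrors the Greengrass buffer and is an illustrative assumption.
import json
import zstandard as zstd


def compress_batch(batch):
    """Serialize and compress a batch of sensor readings."""
    raw = json.dumps(batch).encode("utf-8")
    compressor = zstd.ZstdCompressor(level=19)
    return compressor.compress(raw)


# Example: repetitive sensor JSON typically shrinks by half or more
payload = compress_batch([
    {"timestamp": 1700000000, "sensor_id": "s-001", "machine_id": "m-042",
     "factory_id": "f-07", "metric_type": "temperature", "value": 72.4, "unit": "C"},
])
```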
Failure Modes
Mode 1: Connectivity Loss
Mitigation:
- Local SQLite buffer (7 days)
- Offline ML inference
- Periodic upload attempts with backoff (see the sketch after this list)
- Automatic failover
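A minimal sketch of what the periodic upload attempts could look like with exponential backoff during an outage. The delay bounds are assumptions, and `flush_buffer()` is a hypothetical helper that drains the local SQLite buffer and raises while the uplink is still down.

```python
# Sketch: retry buffered uploads with exponential backoff during outages.
# flush_buffer() is a hypothetical helper assumed to raise on failure;
# delay bounds are illustrative.
import time


def upload_with_backoff(max_attempts=8, base_delay=5, max_delay=600):
    """Try to flush the local buffer, backing off between failed attempts."""
    delay = base_delay
    for attempt in range(max_attempts):
        try:
            flush_buffer()                       # drain the local SQLite buffer
            return True                          # buffer flushed (or already empty)
        except Exception as exc:                 # connectivity still down
            print(f"Upload attempt {attempt + 1} failed: {exc}")
            time.sleep(delay)
            delay = min(delay * 2, max_delay)    # exponential backoff, capped
    return False
```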
Mode 2: Edge Device Failure
Mitigation:
- Redundant gateways
- Local data persistence
- Manual data export
- Field service alerts
Key Takeaways
- Edge processing: Reduce bandwidth and latency
- Offline operation: Local buffering for connectivity issues
- Edge ML: Local inference for real-time decisions
- Cost optimization: 35% savings with edge compute
- Spot instances: 70% savings for cloud training
- Compression: ZSTD for bandwidth optimization
- Local buffering: SQLite for data durability
- Predictive maintenance: 50% reduction in downtime
Back to Module 8