
Case Study - IoT Manufacturing Analytics

Edge-to-Cloud Industrial IoT Platform


Business Context

Company: Global manufacturing company (50 factories, 10K machines)

Challenge: Build a real-time monitoring and predictive maintenance platform for manufacturing equipment.

Scale: 100TB/day, 1M sensors, 10K messages/second peak
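
As a back-of-the-envelope check on the streaming tier, the peak message rate can be turned into a Kinesis shard estimate. The ~1 KB average message size below is an assumption, not a figure from the case study.

# Rough Kinesis sizing from the stated peak load.
# Assumption: ~1 KB average message after edge batching.
PEAK_MSGS_PER_SEC = 10_000
AVG_MSG_BYTES = 1_000            # assumed, not measured

SHARD_INGEST_BYTES = 1_000_000   # 1 MB/s per shard (Kinesis limit)
SHARD_INGEST_RECORDS = 1_000     # 1,000 records/s per shard (Kinesis limit)

shards_by_throughput = (PEAK_MSGS_PER_SEC * AVG_MSG_BYTES) / SHARD_INGEST_BYTES
shards_by_records = PEAK_MSGS_PER_SEC / SHARD_INGEST_RECORDS

print(f"Shards needed by throughput: {shards_by_throughput:.0f}")  # ~10
print(f"Shards needed by record rate: {shards_by_records:.0f}")    # ~10
# The 100 shards in the cost section presumably add headroom for growth,
# larger batched payloads, and uneven partition keys.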


Requirements


Architecture Design


Technology Selection

Component Selection Rationale

Component          | Technology              | Rationale
Edge Runtime       | AWS IoT Greengrass      | Local execution, offline support
Edge ML            | SageMaker Neo           | Optimized for edge hardware
IoT Hub            | AWS IoT Core            | Device management, security
Stream Processing  | Kinesis Data Analytics  | SQL-based, low latency
Data Lake          | S3 + Delta Lake         | Cost optimization, time travel
Warehouse          | Redshift                | Spectrum for data lake queries
ML Platform        | SageMaker               | End-to-end ML pipeline
Time Series DB     | InfluxDB                | High write throughput
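
The table implies, but does not show, the glue between the IoT hub and the stream processor. A minimal sketch of an AWS IoT topic rule that forwards the batched sensor topic into a Kinesis stream might look like the following; the rule name, stream name, and role ARN are placeholders.

# Sketch: route the 'factory/sensor/data' MQTT topic into Kinesis via an
# AWS IoT topic rule. Rule name, stream name, and role ARN are placeholders.
import boto3

iot = boto3.client("iot")

iot.create_topic_rule(
    ruleName="factory_sensor_to_kinesis",
    topicRulePayload={
        "sql": "SELECT * FROM 'factory/sensor/data'",
        "awsIotSqlVersion": "2016-03-23",
        "actions": [
            {
                "kinesis": {
                    "streamName": "factory-sensor-stream",  # placeholder
                    "partitionKey": "${machine_id}",        # spread load across shards
                    "roleArn": "arn:aws:iam::123456789012:role/IoTKinesisRole"  # placeholder
                }
            }
        ]
    }
)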

Edge Processing

Greengrass Lambda Function

# Greengrass Lambda for edge processing
import json
import os
import sqlite3
import time
from threading import Timer

import greengrasssdk

# Initialize Greengrass client
iot_client = greengrasssdk.client('iot-data')

# Local database used as an offline buffer
DB_PATH = '/tmp/sensor_data.db'
def init_db():
    """Initialize the local SQLite database."""
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS sensor_data (
            timestamp INTEGER,
            sensor_id TEXT,
            machine_id TEXT,
            factory_id TEXT,
            metric_type TEXT,
            value REAL,
            unit TEXT,
            uploaded BOOLEAN DEFAULT 0
        )
    ''')
    cursor.execute('''
        CREATE INDEX IF NOT EXISTS idx_uploaded
        ON sensor_data(uploaded)
    ''')
    conn.commit()
    conn.close()
def save_locally(data):
    """Save a reading to the local buffer."""
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    cursor.execute('''
        INSERT INTO sensor_data
        (timestamp, sensor_id, machine_id, factory_id, metric_type, value, unit)
        VALUES (?, ?, ?, ?, ?, ?, ?)
    ''', (
        data['timestamp'],
        data['sensor_id'],
        data['machine_id'],
        data['factory_id'],
        data['metric_type'],
        data['value'],
        data['unit']
    ))
    conn.commit()
    conn.close()
def check_anomaly(data):
    """Check for anomalies locally using static thresholds."""
    metric_type = data['metric_type']
    value = data['value']
    # Local thresholds (could instead come from an ML model)
    thresholds = {
        'temperature': {'min': 20, 'max': 100},
        'vibration': {'max': 50},
        'pressure': {'min': 10, 'max': 200}
    }
    if metric_type in thresholds:
        threshold = thresholds[metric_type]
        if 'min' in threshold and value < threshold['min']:
            return True, f"{metric_type} below minimum"
        if 'max' in threshold and value > threshold['max']:
            return True, f"{metric_type} above maximum"
    return False, None
def send_local_alert(machine_id, message):
    """Publish an alert to the local MQTT topic."""
    alert = {
        'machine_id': machine_id,
        'factory_id': os.getenv('FACTORY_ID'),
        'timestamp': int(time.time()),
        'message': message,
        'severity': 'WARNING'
    }
    # Publish to local MQTT topic
    iot_client.publish(
        topic='factory/local/alerts',
        payload=json.dumps(alert)
    )
def upload_buffered_data():
    """Upload buffered data to the cloud and mark it as sent."""
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    # Get rows that have not been uploaded yet
    cursor.execute('''
        SELECT rowid, timestamp, sensor_id, machine_id, factory_id,
               metric_type, value, unit
        FROM sensor_data
        WHERE uploaded = 0
        LIMIT 100
    ''')
    rows = cursor.fetchall()
    if not rows:
        conn.close()
        return
    # Prepare batch payload
    batch = []
    for row in rows:
        batch.append({
            'timestamp': row[1],
            'sensor_id': row[2],
            'machine_id': row[3],
            'factory_id': row[4],
            'metric_type': row[5],
            'value': row[6],
            'unit': row[7]
        })
    # Upload to the cloud over MQTT
    try:
        iot_client.publish(
            topic='factory/sensor/data',
            payload=json.dumps(batch)
        )
        # Mark rows as uploaded by rowid (timestamps are not unique)
        ids = [row[0] for row in rows]
        placeholders = ','.join('?' for _ in ids)
        cursor.execute(
            f'UPDATE sensor_data SET uploaded = 1 WHERE rowid IN ({placeholders})',
            ids
        )
        conn.commit()
    except Exception as e:
        print(f"Upload failed: {e}")
    conn.close()
# Lambda handler
def lambda_handler(event, context):
    """Handle incoming sensor data."""
    init_db()
    # Process each record
    for record in event:
        # Buffer locally first so nothing is lost if connectivity drops
        save_locally(record)
        # Check for anomalies
        is_anomaly, message = check_anomaly(record)
        if is_anomaly:
            # Raise a local alert on the factory network
            send_local_alert(record['machine_id'], message)
            # Send the anomaly to the cloud immediately
            iot_client.publish(
                topic='factory/anomalies',
                payload=json.dumps({
                    'data': record,
                    'anomaly': message,
                    'timestamp': int(time.time())
                })
            )
    # Attempt to flush the local buffer after each batch
    upload_buffered_data()
    return {
        'statusCode': 200,
        'body': json.dumps({'status': 'processed'})
    }
# Schedule periodic upload of the local buffer
def schedule_upload():
    """Re-run the buffered upload once per minute."""
    upload_buffered_data()
    Timer(60.0, schedule_upload).start()  # Every minute

schedule_upload()

Edge ML Inference

# Edge ML inference with a SageMaker Neo-compiled model
# (Neo-compiled models are loaded on Greengrass devices with the DLR runtime)
import json

import dlr
import numpy as np

# Load the Neo-compiled model shipped with the Greengrass deployment
model = dlr.DLRModel('/opt/ml/model', 'cpu')

def predict_failure(sensor_data):
    """Predict machine failure from recent sensor readings."""
    # Prepare features
    features = prepare_features(sensor_data)
    # Run inference; the output layout depends on how the model was exported
    prediction = model.run(features)[0][0]
    return {
        'failure_probability': float(prediction[0]),
        'predicted_failure_time': float(prediction[1]),
        'confidence': float(prediction[2])
    }

def prepare_features(sensor_data):
    """Flatten the last 100 readings into a single feature vector."""
    features = []
    for reading in sensor_data[-100:]:
        features.extend([
            reading['temperature'],
            reading['vibration'],
            reading['pressure'],
            reading['rpm']
        ])
    return np.array([features], dtype=np.float32)

# Lambda handler for ML inference
def ml_inference_handler(event, context):
    """Handle ML inference requests."""
    machine_id = event['machine_id']
    # Get recent sensor data from the local cache (helper defined elsewhere)
    sensor_data = get_cached_sensor_data(machine_id)
    # Run inference
    prediction = predict_failure(sensor_data)
    # Alert when failure looks likely
    if prediction['failure_probability'] > 0.8:
        send_alert({
            'machine_id': machine_id,
            'prediction': prediction,
            'action': 'SCHEDULE_MAINTENANCE'
        })
    # Take local action for imminent failures
    if prediction['failure_probability'] > 0.95:
        trigger_shutdown(machine_id)
    return prediction

Cloud Processing

Kinesis Data Analytics

-- Kinesis Data Analytics SQL
CREATE OR REPLACE STREAM pump_anomalies (
    machine_id VARCHAR,
    factory_id VARCHAR,
    anomaly_type VARCHAR,
    severity VARCHAR,
    value DOUBLE,
    threshold DOUBLE,
    anomaly_time TIMESTAMP
);

-- Detect temperature anomalies
CREATE OR REPLACE PUMP pump_temperature_anomalies AS
INSERT INTO pump_anomalies
SELECT STREAM
    machine_id,
    factory_id,
    'TEMPERATURE_HIGH' AS anomaly_type,
    CASE
        WHEN temperature > 120 THEN 'CRITICAL'
        WHEN temperature > 100 THEN 'WARNING'
        ELSE 'INFO'
    END AS severity,
    temperature AS value,
    100 AS threshold,
    anomaly_time
FROM sensor_data_stream
WHERE temperature > 100
  AND IS_VALID_TEMPERATURE(temperature) = 'TRUE';

-- Detect vibration anomalies
CREATE OR REPLACE PUMP pump_vibration_anomalies AS
INSERT INTO pump_anomalies
SELECT STREAM
    machine_id,
    factory_id,
    'VIBRATION_HIGH' AS anomaly_type,
    CASE
        WHEN vibration > 80 THEN 'CRITICAL'
        WHEN vibration > 60 THEN 'WARNING'
        ELSE 'INFO'
    END AS severity,
    vibration AS value,
    60 AS threshold,
    anomaly_time
FROM sensor_data_stream
WHERE vibration > 60
  AND IS_VALID_VIBRATION(vibration) = 'TRUE';
-- Detect patterns (rapid changes within a 1-minute tumbling window)
CREATE OR REPLACE PUMP pump_rapid_changes AS
INSERT INTO pump_anomalies
SELECT STREAM
    machine_id,
    factory_id,
    'RAPID_CHANGE' AS anomaly_type,
    'WARNING' AS severity,
    AVG(value) AS value,
    CAST(NULL AS DOUBLE) AS threshold,
    STEP(sensor_data_stream.ROWTIME BY INTERVAL '1' MINUTE) AS anomaly_time
FROM sensor_data_stream
GROUP BY machine_id, factory_id, metric_type,
    STEP(sensor_data_stream.ROWTIME BY INTERVAL '1' MINUTE)
HAVING STDDEV_SAMP(value) > 20;

Data Lake Processing

# Batch processing with Spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta import *

# Initialize Spark with Delta Lake support
spark = SparkSession.builder \
    .appName("IoTDataLakeProcessing") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()
# Process sensor data
def process_sensor_data():
    """Clean raw sensor data and derive features (bronze -> silver)."""
    # Read the last 7 days of raw data
    raw_data = spark.read.format("parquet") \
        .load("s3://iot-lake/bronze/sensor_data/") \
        .filter(col("date") >= date_sub(current_date(), 7))

    # Data quality checks: drop physically implausible readings
    raw_data = raw_data.filter(
        (col("temperature").between(0, 200)) &
        (col("vibration").between(0, 100)) &
        (col("pressure").between(0, 500))
    )

    # Feature engineering
    processed_data = raw_data \
        .withColumn("event_date", to_date(col("timestamp"))) \
        .withColumn("event_hour", hour(col("timestamp"))) \
        .withColumn("is_weekend", dayofweek(col("timestamp")).isin([1, 7])) \
        .withColumn("shift",
            when(hour(col("timestamp")).between(6, 14), "morning")
            .when(hour(col("timestamp")).between(14, 22), "afternoon")
            .otherwise("night"))

    # Calculate rolling statistics over the last 100 readings per machine
    from pyspark.sql.window import Window
    machine_window = Window.partitionBy("machine_id").orderBy("timestamp").rowsBetween(-100, 0)
    processed_data = processed_data \
        .withColumn("temp_rolling_avg", avg("temperature").over(machine_window)) \
        .withColumn("temp_rolling_std", stddev("temperature").over(machine_window)) \
        .withColumn("vib_rolling_avg", avg("vibration").over(machine_window)) \
        .withColumn("vib_rolling_std", stddev("vibration").over(machine_window))

    # Detect anomalies (3-sigma rule against the rolling statistics)
    processed_data = processed_data \
        .withColumn("temp_anomaly",
            when(abs(col("temperature") - col("temp_rolling_avg")) > 3 * col("temp_rolling_std"), 1).otherwise(0)) \
        .withColumn("vib_anomaly",
            when(abs(col("vibration") - col("vib_rolling_avg")) > 3 * col("vib_rolling_std"), 1).otherwise(0))

    # Write to the silver layer
    processed_data.write.format("delta") \
        .mode("overwrite") \
        .partitionBy("event_date", "factory_id") \
        .option("overwriteSchema", "true") \
        .save("s3://iot-lake/silver/sensor_data_processed")
    print("Processing complete")
# Aggregate metrics
def aggregate_metrics():
    """Aggregate sensor metrics (silver -> gold)."""
    processed_data = spark.read.format("delta") \
        .load("s3://iot-lake/silver/sensor_data_processed")

    # Machine-level daily aggregates
    machine_metrics = processed_data \
        .groupBy("machine_id", "event_date") \
        .agg(
            avg("temperature").alias("avg_temp"),
            max("temperature").alias("max_temp"),
            stddev("temperature").alias("std_temp"),
            avg("vibration").alias("avg_vib"),
            max("vibration").alias("max_vib"),
            stddev("vibration").alias("std_vib"),
            sum("temp_anomaly").alias("temp_anomaly_count"),
            sum("vib_anomaly").alias("vib_anomaly_count"),
            count("*").alias("reading_count")
        )

    # Write to the gold layer
    machine_metrics.write.format("delta") \
        .mode("overwrite") \
        .partitionBy("event_date") \
        .save("s3://iot-lake/gold/machine_metrics")

    # Factory-level daily aggregates
    factory_metrics = processed_data \
        .groupBy("factory_id", "event_date") \
        .agg(
            count("*").alias("total_readings"),
            countDistinct("machine_id").alias("active_machines"),
            sum("temp_anomaly").alias("total_temp_anomalies"),
            sum("vib_anomaly").alias("total_vib_anomalies")
        )
    factory_metrics.write.format("delta") \
        .mode("overwrite") \
        .partitionBy("event_date") \
        .save("s3://iot-lake/gold/factory_metrics")
    print("Aggregation complete")

Predictive Maintenance

Model Training

# Predictive maintenance model training
import sagemaker
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep, TrainingStep
from sagemaker.processing import ScriptProcessor
from sagemaker.estimator import Estimator

# Data preprocessing
def preprocess_data():
    """Preprocess data for training (assumes the Spark session from the data lake job)."""
    import pandas as pd
    from sklearn.model_selection import train_test_split

    # Read processed sensor features
    sensor_data = spark.read.format("delta") \
        .load("s3://iot-lake/silver/sensor_data_processed") \
        .toPandas()

    # Read maintenance records; parse dates so the duration arithmetic below works
    maintenance_data = pd.read_csv(
        "s3://iot-lake/maintenance_records.csv",
        parse_dates=["date", "maintenance_date", "last_maintenance_date"]
    )

    # Merge sensor features with maintenance history
    df = sensor_data.merge(maintenance_data, on=["machine_id", "date"])

    # Label: did the machine need maintenance within 24 hours of the reading?
    df["failure_within_24h"] = (
        (df["maintenance_date"] - df["date"]).dt.total_seconds() <= 86400
    ).astype(int)

    # Feature engineering
    df["hours_since_last_maintenance"] = (
        df["date"] - df.groupby("machine_id")["last_maintenance_date"].transform("max")
    ).dt.total_seconds() / 3600

    # Split and save
    train, test = train_test_split(df, test_size=0.2, random_state=42)
    train.to_csv("s3://iot-lake/training_data/train.csv", index=False)
    test.to_csv("s3://iot-lake/training_data/test.csv", index=False)
# XGBoost training
def train_model():
    """Train an XGBoost failure-prediction model on SageMaker."""
    from sagemaker import Session
    from sagemaker.xgboost import XGBoost

    session = Session()
    xgboost = XGBoost(
        entry_point="train.py",
        role="arn:aws:iam::123456789012:role/SageMakerRole",
        framework_version="1.7-1",
        instance_count=10,
        instance_type="ml.m5.4xlarge",
        output_path="s3://iot-lake/models/",
        code_location="s3://iot-lake/models/code/",
        hyperparameters={
            "max_depth": 8,
            "eta": 0.1,
            "gamma": 0.1,
            "min_child_weight": 10,
            "subsample": 0.8,
            "colsample_bytree": 0.8,
            "num_round": 500
        }
    )
    xgboost.fit({
        "train": "s3://iot-lake/training_data/train.csv",
        "validation": "s3://iot-lake/training_data/test.csv"
    })
    return xgboost
# Compile for edge with SageMaker Neo
def compile_for_edge(xgboost_estimator):
    """Compile the trained model for edge deployment.

    The target instance family and input shape below are illustrative; choose
    the Neo target that matches the gateway hardware actually deployed.
    """
    compiled_model = xgboost_estimator.compile_model(
        target_instance_family="ml_c5",           # example Neo target
        input_shape={"data": [1, 400]},           # 100 readings x 4 metrics
        output_path="s3://iot-lake/models/edge/",
        framework="xgboost"
    )
    return compiled_model

Cost Optimization

Storage Costs

Tier            | Data            | Size   | Cost          | Optimization
Hot (7 days)    | Raw sensor data | 700 TB | $16,100/month | S3 Standard
Warm (30 days)  | Processed data  | 1 PB   | $20,000/month | S3 Standard-IA
Cold (90 days)  | Aggregated      | 2 PB   | $12,000/month | S3 Glacier
Archive         | Historical      | 5 PB   | $15,000/month | Glacier Deep Archive
Total           |                 |        | $63,100/month |

Optimization:

  • Lifecycle: 7 days → IA, 90 days → Glacier (see the lifecycle sketch after this list)
  • Compression: ZSTD-19 (~50% savings)
  • Delta Lake: Auto-compaction
  • Partitioning: Date + factory_id
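
A sketch of those lifecycle rules as an S3 bucket configuration follows; the bucket and prefix are placeholders, and note that S3 only accepts Standard-IA transitions for objects at least 30 days old, so the IA step uses the earliest allowed age.

# Sketch of the lifecycle policy for the sensor-data bucket.
# Bucket name and prefix are placeholders.
import boto3

s3 = boto3.client("s3")

s3.put_bucket_lifecycle_configuration(
    Bucket="iot-lake",  # placeholder
    LifecycleConfiguration={
        "Rules": [
            {
                "ID": "sensor-data-tiering",
                "Status": "Enabled",
                "Filter": {"Prefix": "bronze/sensor_data/"},
                "Transitions": [
                    # S3 requires >= 30 days before Standard-IA
                    {"Days": 30, "StorageClass": "STANDARD_IA"},
                    {"Days": 90, "StorageClass": "GLACIER"},
                    {"Days": 365, "StorageClass": "DEEP_ARCHIVE"},
                ],
            }
        ]
    },
)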

Compute Costs

Component    | Compute             | Cost          | Optimization
Edge Devices | 10K devices         | $10,000/month | On-site hardware
Greengrass   | Included            | $0            | Free tier
Kinesis      | 100 shards          | $43,200/month | Auto-scaling
Spark Batch  | 50 nodes (spot)     | $28,800/month | 70% spot savings
SageMaker    | 20 instances (spot) | $8,640/month  | Spot training
Total        |                     | $90,640/month |
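
The "70% spot savings" and "Spot training" entries come down to estimator settings on the SageMaker side. A sketch with illustrative timeout and checkpoint values (not from the case study) follows.

# Illustrative managed-spot settings for the training estimator.
# max_run / max_wait / checkpoint location are examples only.
from sagemaker.xgboost import XGBoost

xgboost = XGBoost(
    entry_point="train.py",
    role="arn:aws:iam::123456789012:role/SageMakerRole",
    framework_version="1.7-1",
    instance_count=10,
    instance_type="ml.m5.4xlarge",
    output_path="s3://iot-lake/models/",
    use_spot_instances=True,    # request managed spot capacity
    max_run=3600,               # max training time (seconds)
    max_wait=7200,              # max time to wait for spot capacity + training
    checkpoint_s3_uri="s3://iot-lake/models/checkpoints/",  # resume after interruption
)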

Total Monthly Cost

Category         | Cost           | Optimization
Storage          | $63,100        | Lifecycle + compression
Compute          | $90,640        | Spot + edge processing
Network          | $10,000        | Data compression
Total            | $163,740/month |
Annual           | $1,964,880     |
Cost per machine | ~$16/month     | ($163,740 ÷ 10K machines)

Failure Modes

Mode 1: Connectivity Loss

Mitigation:

  • Local SQLite buffer (7 days; pruning sketch after this list)
  • Offline ML inference
  • Periodic upload attempts
  • Automatic failover
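
A sketch of how the 7-day cap on the local buffer could be enforced, complementing the Greengrass buffering code shown earlier; it assumes the timestamp column holds epoch seconds.

# Sketch: prune uploaded rows older than the 7-day retention window.
# DB_PATH matches the Greengrass buffering example above.
import sqlite3
import time

DB_PATH = '/tmp/sensor_data.db'
RETENTION_SECONDS = 7 * 24 * 3600

def prune_buffer():
    """Delete uploaded rows older than the retention window."""
    cutoff = int(time.time()) - RETENTION_SECONDS
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute(
            "DELETE FROM sensor_data WHERE uploaded = 1 AND timestamp < ?",
            (cutoff,),
        )
        conn.commit()
        # Reclaim the space occupied by the deleted rows
        conn.execute("VACUUM")
    finally:
        conn.close()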

Mode 2: Edge Device Failure

Mitigation:

  • Redundant gateways
  • Local data persistence
  • Manual data export
  • Field service alerts

Key Takeaways

  1. Edge processing: Reduce bandwidth and latency
  2. Offline operation: Local buffering for connectivity issues
  3. Edge ML: Local inference for real-time decisions
  4. Cost optimization: 35% savings with edge compute
  5. Spot instances: 70% savings for cloud training
  6. Compression: ZSTD for bandwidth optimization (sketch after this list)
  7. Local buffering: SQLite for data durability
  8. Predictive maintenance: 50% reduction in downtime
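
For takeaway 6, a minimal sketch of batch compression on the gateway using the zstandard package; the publisher and subscriber would have to agree on this encoding, which the earlier examples do not include.

# Sketch: compress a batch of readings before publishing, trading a little
# edge CPU for MQTT bandwidth. Level 19 matches the storage-side setting
# mentioned above; a lower level may suit weaker gateways.
import json
import zstandard as zstd

def compress_batch(batch, level=19):
    """Serialize and compress a list of sensor readings."""
    raw = json.dumps(batch).encode("utf-8")
    return zstd.ZstdCompressor(level=level).compress(raw)

def decompress_batch(payload):
    """Inverse operation on the cloud side."""
    raw = zstd.ZstdDecompressor().decompress(payload)
    return json.loads(raw)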
