
Case Study - Healthcare Real-Time Analytics

HIPAA-Compliant Real-Time Data Platform


Business Context

Company: Regional hospital network (10 hospitals, 500K patients)

Challenge: Build a real-time analytics platform for patient monitoring while maintaining strict HIPAA compliance and data privacy.

Scale: 1TB/day, 10M events/hour peak


Requirements


Architecture Design


Technology Selection

Component Selection Rationale

| Component | Technology | HIPAA Rationale |
| --- | --- | --- |
| Streaming | Kafka (encrypted) | End-to-end encryption, ACLs |
| Processing | Flink | In-memory processing, no disk spill |
| De-identification | Presidio (Microsoft) | Open-source PII detection |
| Storage | S3 + SSE-KMS | Customer-managed keys |
| Data Lake | Delta Lake | ACID, time travel for audits |
| Warehouse | Snowflake | HITRUST certified, PHI support |
| Orchestration | Airflow (on-prem) | Data stays in VPC |
| Secrets | AWS Secrets Manager | Automatic rotation, audit logs |
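
For illustration, services publishing PHI events to the encrypted Kafka cluster would connect over TLS with client certificates. A minimal sketch using kafka-python; the certificate paths are assumptions, while the broker address and topic match the Flink source configuration later in this page:

# Sketch: Kafka producer over TLS with client authentication (certificate paths are assumptions)
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['kafka.internal:9093'],
    security_protocol='SSL',
    ssl_cafile='/etc/kafka/secrets/ca.pem',
    ssl_certfile='/etc/kafka/secrets/client.pem',
    ssl_keyfile='/etc/kafka/secrets/client.key',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Publish one vitals reading to the encrypted topic
producer.send('patient-vitals-encrypted', {'patient_id': 12345, 'heart_rate': 88})
producer.flush()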

HIPAA Compliance Implementation

Data De-identification

# PII de-identification pipeline
from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

analyzer = AnalyzerEngine()
anonymizer = AnonymizerEngine()

# Initialize Spark with Delta Lake support
spark = SparkSession.builder \
    .appName("HIPAA_Deidentification") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

def de_identify_text(text: str) -> str:
    """Detect and anonymize PII in a free-text field."""
    if not text:
        return text

    # Analyze for PII entities (Presidio entity names)
    results = analyzer.analyze(
        text=text,
        entities=["PERSON", "EMAIL_ADDRESS", "PHONE_NUMBER", "US_SSN", "MEDICAL_LICENSE"],
        language='en'
    )

    # Replace detected entities with placeholders
    anonymized = anonymizer.anonymize(
        text=text,
        analyzer_results=results
    )
    return anonymized.text

# Register UDF
de_identify_udf = udf(de_identify_text, StringType())

# Process patient records
def process_patient_notes(input_path: str, output_path: str):
    """De-identify free-text patient notes stored in Delta."""
    # Read raw data
    raw_notes = spark.read.format("delta").load(input_path)

    # De-identify each free-text column, then drop the raw columns
    de_identified = raw_notes \
        .withColumn("notes_anonymized", de_identify_udf("notes_raw")) \
        .withColumn("doctor_notes_anonymized", de_identify_udf("doctor_notes")) \
        .drop("notes_raw", "doctor_notes")

    # Write de-identified data
    de_identified.write \
        .format("delta") \
        .mode("overwrite") \
        .option("overwriteSchema", "true") \
        .save(output_path)

    print(f"De-identified data written to {output_path}")

Encryption at Rest

# S3 server-side encryption with customer-managed KMS keys
import boto3

s3_client = boto3.client('s3')

def upload_encrypted_data(
    bucket: str,
    key: str,
    data: bytes,
    kms_key_id: str
):
    """Upload data with SSE-KMS server-side encryption."""
    s3_client.put_object(
        Bucket=bucket,
        Key=key,
        Body=data,
        ServerSideEncryption='aws:kms',
        SSEKMSKeyId=kms_key_id,
        Metadata={
            'classification': 'phi',
            'encrypted': 'true'
        }
    )
    print(f"Encrypted data uploaded to s3://{bucket}/{key}")

def download_encrypted_data(bucket: str, key: str) -> bytes:
    """Download encrypted data (S3 decrypts transparently via KMS)."""
    response = s3_client.get_object(
        Bucket=bucket,
        Key=key
    )
    return response['Body'].read()

# Example usage (patient_data is a bytes payload produced upstream)
upload_encrypted_data(
    bucket='hospital-data-phi',
    key='patient_records/2025/01/27/records.parquet',
    data=patient_data,
    kms_key_id='alias/healthcare-phi-key'
)
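
As a sanity check, a small helper (not part of the pipeline above, added here as a sketch) can confirm that an uploaded object really landed with SSE-KMS; `head_object` returns the server-side-encryption settings:

# Sketch: verify an object is SSE-KMS encrypted (hypothetical helper)
def assert_kms_encrypted(bucket: str, key: str) -> None:
    """Raise if the object is not encrypted with SSE-KMS."""
    head = s3_client.head_object(Bucket=bucket, Key=key)
    if head.get('ServerSideEncryption') != 'aws:kms':
        raise RuntimeError(f"s3://{bucket}/{key} is not SSE-KMS encrypted")

assert_kms_encrypted('hospital-data-phi', 'patient_records/2025/01/27/records.parquet')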

Audit Logging

# Comprehensive audit logging
import json
import logging
from datetime import datetime
from functools import wraps

logger = logging.getLogger('hipaa_audit')

def audit_log(access_type: str):
    """Decorator that writes an audit record for every call."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Record the access attempt
            log_entry = {
                'timestamp': datetime.utcnow().isoformat(),
                'access_type': access_type,
                'function': func.__name__,
                'user': kwargs.get('user_id', 'system'),
                'ip_address': kwargs.get('ip_address'),
                # Fall back to patient_id when no explicit resource_id is passed
                'resource': kwargs.get('resource_id', kwargs.get('patient_id')),
                'result': None
            }
            try:
                result = func(*args, **kwargs)
                log_entry['result'] = 'success'
                return result
            except Exception as e:
                log_entry['result'] = f'error: {str(e)}'
                log_entry['error_details'] = str(e)
                raise
            finally:
                # Write to the immutable audit log
                logger.info(json.dumps(log_entry))
                # Persist to S3 alongside CloudTrail
                write_audit_log_to_s3(log_entry)
        return wrapper
    return decorator

@audit_log('phi_read')
def read_patient_record(
    patient_id: str,
    user_id: str,
    ip_address: str
) -> dict:
    """Read a patient record with audit logging."""
    # is_authorized, read_from_snowflake, and UnauthorizedAccess are defined elsewhere in the platform
    if not is_authorized(user_id, patient_id):
        raise UnauthorizedAccess(f"User {user_id} not authorized for patient {patient_id}")

    # Read from encrypted storage
    record = read_from_snowflake(patient_id)
    return record
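
The helper `write_audit_log_to_s3` is referenced above but not shown. A minimal sketch, assuming an audit bucket with S3 Object Lock enabled for immutability; the bucket name, key layout, and KMS alias are assumptions:

# Hypothetical helper; bucket name, key layout, and KMS alias are assumptions
import json
import uuid
import boto3

audit_s3 = boto3.client('s3')
AUDIT_BUCKET = 'hospital-audit-logs'  # assumed bucket with Object Lock enabled

def write_audit_log_to_s3(log_entry: dict) -> None:
    """Persist one audit record as a JSON object under a date-partitioned key."""
    key = f"audit/{log_entry['timestamp'][:10]}/{uuid.uuid4()}.json"
    audit_s3.put_object(
        Bucket=AUDIT_BUCKET,
        Key=key,
        Body=json.dumps(log_entry).encode('utf-8'),
        ServerSideEncryption='aws:kms',
        SSEKMSKeyId='alias/healthcare-audit-key'  # assumed KMS key alias
    )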

Real-Time Analytics Pipeline

Stream Processing

# Flink real-time patient monitoring
from pyflink.common import Row, Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import MapFunction
from pyflink.table import StreamTableEnvironment

class PatientVitalsAlert(MapFunction):
    """Return a list of alerts for one vitals reading (empty if all vitals are normal)."""

    def map(self, row):
        alerts = []

        # Check thresholds against the reading's fields
        if row.heart_rate > 120 or row.heart_rate < 50:
            alerts.append({
                'type': 'HEART_RATE',
                'severity': 'HIGH' if row.heart_rate > 130 or row.heart_rate < 45 else 'MEDIUM',
                'value': row.heart_rate,
                'patient_id': row.patient_id,
                'timestamp': row.timestamp
            })
        if row.blood_pressure > 180 or row.blood_pressure < 90:
            alerts.append({
                'type': 'BLOOD_PRESSURE',
                'severity': 'HIGH' if row.blood_pressure > 200 or row.blood_pressure < 80 else 'MEDIUM',
                'value': row.blood_pressure,
                'patient_id': row.patient_id,
                'timestamp': row.timestamp
            })
        if row.oxygen_saturation < 90:
            alerts.append({
                'type': 'OXYGEN_SATURATION',
                'severity': 'CRITICAL',
                'value': row.oxygen_saturation,
                'patient_id': row.patient_id,
                'timestamp': row.timestamp
            })
        return alerts

# Create Flink environment
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(10)
t_env = StreamTableEnvironment.create(env)

# Create source table (encrypted Kafka topic); `timestamp` is a reserved keyword, so it is escaped
t_env.execute_sql("""
    CREATE TABLE patient_vitals (
        patient_id BIGINT,
        heart_rate INT,
        blood_pressure INT,
        oxygen_saturation INT,
        `timestamp` TIMESTAMP(3),
        WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'patient-vitals-encrypted',
        'properties.bootstrap.servers' = 'kafka.internal:9093',
        'properties.group.id' = 'vitals-monitoring',
        'format' = 'json',
        'properties.security.protocol' = 'SSL',
        'properties.ssl.truststore.location' = '/etc/kafka/secrets/truststore.jks',
        'properties.ssl.keystore.location' = '/etc/kafka/secrets/keystore.jks'
    )
""")

# Process vitals: convert the source table to a DataStream for the custom alert logic
vitals_stream = t_env.to_data_stream(t_env.from_path('patient_vitals'))

alert_type_info = Types.ROW_NAMED(
    ['patient_id', 'alert_type', 'severity', 'value', 'timestamp'],
    [Types.LONG(), Types.STRING(), Types.STRING(), Types.INT(), Types.SQL_TIMESTAMP()]
)

# Emit one alert record per threshold violation, then convert dicts to typed Rows
alert_stream = vitals_stream \
    .map(PatientVitalsAlert()) \
    .flat_map(lambda alerts: alerts) \
    .map(
        lambda a: Row(a['patient_id'], a['type'], a['severity'], a['value'], a['timestamp']),
        output_type=alert_type_info
    )

# Create sink table (alerts); the plain Kafka connector is append-only, so no primary key is declared
t_env.execute_sql("""
    CREATE TABLE patient_alerts (
        patient_id BIGINT,
        alert_type STRING,
        severity STRING,
        `value` INT,
        `timestamp` TIMESTAMP(3)
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'patient-alerts',
        'properties.bootstrap.servers' = 'kafka.internal:9093',
        'format' = 'json'
    )
""")

# Write alerts back through the Table API sink
t_env.from_data_stream(alert_stream).execute_insert('patient_alerts')

Cost Optimization

Storage Costs

| Tier | Data | Size | Monthly Cost | Optimization |
| --- | --- | --- | --- | --- |
| Hot | Active patient data | 100 TB | $2,300 | S3 Standard |
| Warm | 6-24 months | 200 TB | $2,000 | S3 Standard-IA |
| Cold | 2-7 years | 500 TB | $3,000 | S3 Glacier |
| Archive | 7+ years | 1 PB | $1,500 | Glacier Deep Archive |
| Total | | | $8,800/month | |

Optimization:

  • Lifecycle policies: 30 days → IA, 1 year → Glacier (see the sketch after this list)
  • Compression: ZSTD for all data (~40% savings)
  • Deduplication: Delta Lake dedupe for EHR data
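
A minimal sketch of the lifecycle rule above, using boto3 and the PHI bucket name from the encryption example; the rule ID and prefix are assumptions:

# Sketch: 30 days -> Standard-IA, 365 days -> Glacier (rule ID and prefix are assumptions)
import boto3

s3 = boto3.client('s3')
s3.put_bucket_lifecycle_configuration(
    Bucket='hospital-data-phi',
    LifecycleConfiguration={
        'Rules': [{
            'ID': 'phi-tiering',
            'Status': 'Enabled',
            'Filter': {'Prefix': 'patient_records/'},
            'Transitions': [
                {'Days': 30, 'StorageClass': 'STANDARD_IA'},
                {'Days': 365, 'StorageClass': 'GLACIER'}
            ]
        }]
    }
)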

Compute Costs

| Component | Compute | Cost | Optimization |
| --- | --- | --- | --- |
| Flink Cluster | 20 nodes | $8,640/month | Spot instances |
| Spark Batch | 50 nodes | $21,600/month | Spot instances |
| Snowflake | Medium warehouse | $18,000/month | Scheduled suspension |
| Total | | $48,240/month | |

Optimization:

  • Spot instances for Flink/Spark (60% savings)
  • Snowflake auto-suspend (5 min idle; see the sketch after this list)
  • Scheduled operations (batch during off-peak)
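
As an illustration, the auto-suspend setting can be applied from Python via the Snowflake connector; the account, user, and warehouse names are assumptions:

# Sketch: enable 5-minute auto-suspend on the analytics warehouse (names are assumptions)
import snowflake.connector

conn = snowflake.connector.connect(
    account='hospital_account',      # assumed account identifier
    user='platform_admin',           # assumed service user
    authenticator='externalbrowser'
)
conn.cursor().execute("ALTER WAREHOUSE analytics_wh SET AUTO_SUSPEND = 300")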

Total Monthly Cost

| Category | Cost | Annual |
| --- | --- | --- |
| Storage | $8,800 | $105,600 |
| Compute | $48,240 | $578,880 |
| Network | $3,000 | $36,000 |
| Monitoring | $2,000 | $24,000 |
| Total | $62,040/month | $744,480/year |

Failure Modes

Mode 1: Encryption Key Failure

Mitigation:

  • Multi-region KMS keys
  • Automatic key rotation
  • Store encrypted backups
  • Incident response plan

Mode 2: PII Leakage

Mitigation:

  • Automated PII scanning on output (see the sketch after this list)
  • Pre-production PII testing
  • Regular security audits
  • Data classification tools
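
A minimal sketch of the output-scanning check, reusing the Spark session and Presidio analyzer from the de-identification pipeline; the sampling fraction, column name, and output path are assumptions:

# Sketch: re-scan a sample of de-identified output and fail loudly if any PII is found
def scan_output_for_pii(output_path: str, column: str = "notes_anonymized", fraction: float = 0.01):
    """Sample the de-identified Delta output and re-run PII detection on it."""
    sample = spark.read.format("delta").load(output_path) \
        .sample(fraction=fraction) \
        .select(column) \
        .collect()
    for row in sample:
        findings = analyzer.analyze(text=row[column] or "", language='en')
        if findings:
            raise ValueError(f"PII detected in de-identified output: {findings}")

scan_output_for_pii("s3://hospital-data-deidentified/silver/patient_notes/")  # assumed path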

Mode 3: Real-Time Alert Failure

Mitigation:

  • Redundant alert systems
  • PagerDuty integration
  • Alert queueing during failures
  • Regular alert system testing

SLA/SLO Definitions

slas:
  real_time_monitoring:
    latency:
      p50: "< 2 seconds"
      p95: "< 5 seconds"
      p99: "< 10 seconds"
    availability:
      uptime: "99.95%"
      maintenance_window: "Sunday 2-4 AM"
    data_freshness:
      patient_vitals: "< 5 seconds"
      dashboards: "< 1 minute"
      batch_reports: "< 4 hours"
  compliance:
    encryption: "100% of PHI"
    audit_logging: "100% of access"
    retention: "7 years minimum"
    breach_notification: "< 60 days"
  data_quality:
    completeness: "> 99.9%"
    pii_detection: "100%"
    de_identification: "> 99%"
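
A lightweight way to evaluate the latency SLO, assuming end-to-end latencies (in seconds) are already collected by the monitoring stack; the function and thresholds below simply mirror the YAML above:

# Sketch: compare measured end-to-end latencies against the real-time monitoring SLO
import statistics

SLO_SECONDS = {'p50': 2.0, 'p95': 5.0, 'p99': 10.0}

def check_latency_slo(latencies_s: list[float]) -> dict:
    """Return pass/fail per percentile for a window of latency samples."""
    qs = statistics.quantiles(latencies_s, n=100)
    measured = {'p50': qs[49], 'p95': qs[94], 'p99': qs[98]}
    return {p: measured[p] <= SLO_SECONDS[p] for p in SLO_SECONDS}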

Migration Strategy

Phase 1: Assessment (2 months)

Phase 2: Implementation (6 months)


Key Takeaways

  1. HIPAA first: Compliance drives all architecture decisions
  2. Encryption everywhere: At rest and in transit
  3. De-identification: Detect and remove PII before analytics
  4. Audit everything: Immutable logs of all access
  5. Least privilege: RBAC for all data access
  6. Disaster recovery: Multi-region backup for PHI
  7. Regular audits: Quarterly security assessments
  8. Cost optimization: Spot instances, lifecycle policies

Back to Module 8