Case Study - Healthcare Real-Time Analytics
HIPAA-Compliant Real-Time Data Platform
Business Context
Company: Regional hospital network (10 hospitals, 500K patients)
Challenge: Build a real-time analytics platform for patient monitoring while maintaining strict HIPAA compliance and data privacy.
Scale: 1TB/day, 10M events/hour peak
Requirements
Architecture Design
Technology Selection
Component Selection Rationale
| Component | Technology | HIPAA Rationale |
|---|---|---|
| Streaming | Kafka (encrypted) | End-to-end encryption, ACLs (connection sketch below) |
| Processing | Flink | In-memory processing, no disk spill |
| De-identification | Presidio (Microsoft) | Open source PII detection |
| Storage | S3 + SSE-KMS | Customer-managed keys |
| Data Lake | Delta Lake | ACID, time travel for audits |
| Warehouse | Snowflake | HITRUST certified, PHI support |
| Orchestration | Airflow (on-prem) | Data stays in VPC |
| Secrets | AWS Secrets Manager | Automatic rotation, audit logs |
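As a concrete illustration of the streaming row, a producer publishing vitals to the encrypted cluster might be configured roughly as follows. This is a sketch using the confluent-kafka client; the certificate paths are assumptions, the broker address and topic reuse values from the Flink job later in this case study, and topic-level ACLs are administered on the brokers themselves.

```python
from confluent_kafka import Producer

# TLS-encrypted connection to the internal brokers; certificate paths are illustrative
producer = Producer({
    'bootstrap.servers': 'kafka.internal:9093',
    'security.protocol': 'SSL',
    'ssl.ca.location': '/etc/kafka/secrets/ca.pem',
    'ssl.certificate.location': '/etc/kafka/secrets/client-cert.pem',
    'ssl.key.location': '/etc/kafka/secrets/client-key.pem',
})

# Vitals land on an encrypted topic; free text is still de-identified downstream
producer.produce('patient-vitals-encrypted', value=b'{"patient_id": 123, "heart_rate": 72}')
producer.flush()
```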
HIPAA Compliance Implementation
Data De-identification
```python
# PII de-identification pipeline
from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

analyzer = AnalyzerEngine()
anonymizer = AnonymizerEngine()

# Initialize Spark with Delta Lake support
spark = SparkSession.builder \
    .appName("HIPAA_Deidentification") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()


def de_identify_text(text: str) -> str:
    """Detect and anonymize PII in text"""
    if not text:
        return text

    # Analyze for PII (Presidio built-in entity names)
    results = analyzer.analyze(
        text=text,
        entities=["PERSON", "EMAIL_ADDRESS", "PHONE_NUMBER", "US_SSN", "MEDICAL_LICENSE"],
        language='en'
    )

    # Anonymize detected entities
    anonymized = anonymizer.anonymize(
        text=text,
        analyzer_results=results
    )

    return anonymized.text


# Register UDF
de_identify_udf = udf(de_identify_text, StringType())


# Process patient records
def process_patient_notes(input_path: str, output_path: str):
    """De-identify patient notes"""

    # Read raw data
    raw_notes = spark.read.format("delta") \
        .load(input_path)

    # De-identify each free-text column, then drop the raw versions
    de_identified = raw_notes \
        .withColumn("notes_anonymized", de_identify_udf("notes_raw")) \
        .withColumn("doctor_notes_anonymized", de_identify_udf("doctor_notes")) \
        .drop("notes_raw", "doctor_notes")

    # Write de-identified data
    de_identified.write \
        .format("delta") \
        .mode("overwrite") \
        .option("overwriteSchema", "true") \
        .save(output_path)

    print(f"De-identified data written to {output_path}")
```
Encryption at Rest

```python
# S3 encryption with KMS
import boto3

s3_client = boto3.client('s3')
kms_client = boto3.client('kms')


def upload_encrypted_data(
    bucket: str,
    key: str,
    data: bytes,
    kms_key_id: str
):
    """Upload data with KMS encryption"""

    # Upload with server-side encryption (SSE-KMS)
    s3_client.put_object(
        Bucket=bucket,
        Key=key,
        Body=data,
        ServerSideEncryption='aws:kms',
        SSEKMSKeyId=kms_key_id,
        Metadata={
            'classification': 'phi',
            'encrypted': 'true'
        }
    )

    print(f"Encrypted data uploaded to s3://{bucket}/{key}")


def download_encrypted_data(bucket: str, key: str) -> bytes:
    """Download encrypted data (KMS handles decryption)"""

    response = s3_client.get_object(
        Bucket=bucket,
        Key=key
    )

    return response['Body'].read()


# Example usage
upload_encrypted_data(
    bucket='hospital-data-phi',
    key='patient_records/2025/01/27/records.parquet',
    data=patient_data,  # bytes of the Parquet file to upload
    kms_key_id='alias/healthcare-phi-key'
)
```
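To keep SSE-KMS from depending on every writer remembering the `ServerSideEncryption` argument, the bucket itself can reject unencrypted puts. A minimal sketch, reusing the `hospital-data-phi` bucket from the example above; the statement ID and policy scope are illustrative.

```python
import json

import boto3

s3_client = boto3.client('s3')

# Deny any PutObject request that does not ask for SSE-KMS encryption
deny_unencrypted_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Sid": "DenyUnencryptedPuts",
        "Effect": "Deny",
        "Principal": "*",
        "Action": "s3:PutObject",
        "Resource": "arn:aws:s3:::hospital-data-phi/*",
        "Condition": {
            "StringNotEquals": {"s3:x-amz-server-side-encryption": "aws:kms"}
        }
    }]
}

s3_client.put_bucket_policy(
    Bucket='hospital-data-phi',
    Policy=json.dumps(deny_unencrypted_policy)
)
```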
Audit Logging

```python
# Comprehensive audit logging
import json
import logging
from datetime import datetime
from functools import wraps

logger = logging.getLogger('hipaa_audit')


def audit_log(access_type: str):
    """Decorator for audit logging"""

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Log access attempt
            log_entry = {
                'timestamp': datetime.utcnow().isoformat(),
                'access_type': access_type,
                'function': func.__name__,
                'user': kwargs.get('user_id', 'system'),
                'ip_address': kwargs.get('ip_address'),
                'resource': kwargs.get('resource_id'),
                'result': None
            }

            try:
                result = func(*args, **kwargs)
                log_entry['result'] = 'success'
                return result
            except Exception as e:
                log_entry['result'] = f'error: {str(e)}'
                log_entry['error_details'] = str(e)
                raise
            finally:
                # Write to immutable audit log
                logger.info(json.dumps(log_entry))

                # Store in CloudTrail/S3
                write_audit_log_to_s3(log_entry)

        return wrapper
    return decorator


@audit_log('phi_read')
def read_patient_record(
    patient_id: str,
    user_id: str,
    ip_address: str
) -> dict:
    """Read patient record with audit logging"""

    # Check authorization
    if not is_authorized(user_id, patient_id):
        raise UnauthorizedAccess(f"User {user_id} not authorized for patient {patient_id}")

    # Read from encrypted storage
    record = read_from_snowflake(patient_id)

    return record
```
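The `write_audit_log_to_s3` helper referenced by the decorator is not shown above; a minimal sketch, assuming a dedicated audit bucket (the name `hospital-audit-logs` is hypothetical) configured with S3 Object Lock so records cannot be altered after the fact.

```python
import json
from datetime import datetime, timezone
from uuid import uuid4

import boto3

s3_client = boto3.client('s3')

AUDIT_BUCKET = 'hospital-audit-logs'  # hypothetical bucket with Object Lock (WORM) enabled


def write_audit_log_to_s3(log_entry: dict) -> None:
    """Persist one audit record as an immutable, date-partitioned S3 object."""
    now = datetime.now(timezone.utc)
    # Date partitions keep audit queries cheap in Athena or Snowflake external tables
    key = f"audit/year={now:%Y}/month={now:%m}/day={now:%d}/{uuid4()}.json"

    s3_client.put_object(
        Bucket=AUDIT_BUCKET,
        Key=key,
        Body=json.dumps(log_entry).encode('utf-8'),
        ServerSideEncryption='aws:kms',
        ContentType='application/json'
    )
```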
Real-Time Analytics Pipeline
Stream Processing
```python
# Flink real-time patient monitoring
from pyflink.common import Row, Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import FlatMapFunction
from pyflink.table import StreamTableEnvironment


class PatientVitalsAlert(FlatMapFunction):
    """Emit one alert row per abnormal vital sign."""

    def flat_map(self, value):
        patient_id = value.patient_id
        heart_rate = value.heart_rate
        blood_pressure = value.blood_pressure
        oxygen = value.oxygen_saturation
        ts = value.timestamp

        # Check thresholds
        if heart_rate > 120 or heart_rate < 50:
            severity = 'HIGH' if heart_rate > 130 or heart_rate < 45 else 'MEDIUM'
            yield Row(patient_id, 'HEART_RATE', severity, heart_rate, ts)

        if blood_pressure > 180 or blood_pressure < 90:
            severity = 'HIGH' if blood_pressure > 200 or blood_pressure < 80 else 'MEDIUM'
            yield Row(patient_id, 'BLOOD_PRESSURE', severity, blood_pressure, ts)

        if oxygen < 90:
            yield Row(patient_id, 'OXYGEN_SATURATION', 'CRITICAL', oxygen, ts)


# Create Flink environment
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(10)

t_env = StreamTableEnvironment.create(env)

# Create source table (Kafka over TLS; `timestamp` is a reserved word and must be quoted)
t_env.execute_sql("""
    CREATE TABLE patient_vitals (
        patient_id BIGINT,
        heart_rate INT,
        blood_pressure INT,
        oxygen_saturation INT,
        `timestamp` TIMESTAMP(3),
        WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'patient-vitals-encrypted',
        'properties.bootstrap.servers' = 'kafka.internal:9093',
        'properties.group.id' = 'vitals-monitoring',
        'format' = 'json',
        'properties.security.protocol' = 'SSL',
        'properties.ssl.truststore.location' = '/etc/kafka/secrets/truststore.jks',
        'properties.ssl.keystore.location' = '/etc/kafka/secrets/keystore.jks'
    )
""")

# Process vitals: convert the source table to a DataStream and emit alert rows
vitals_stream = t_env.to_data_stream(t_env.from_path('patient_vitals'))

alert_type_info = Types.ROW_NAMED(
    ['patient_id', 'alert_type', 'severity', 'value', 'timestamp'],
    [Types.LONG(), Types.STRING(), Types.STRING(), Types.INT(), Types.SQL_TIMESTAMP()]
)
alert_stream = vitals_stream.flat_map(PatientVitalsAlert(), output_type=alert_type_info)

# Create sink table (alerts); the plain Kafka connector is append-only, so no primary key
t_env.execute_sql("""
    CREATE TABLE patient_alerts (
        patient_id BIGINT,
        alert_type STRING,
        severity STRING,
        `value` INT,
        `timestamp` TIMESTAMP(3)
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'patient-alerts',
        'properties.bootstrap.servers' = 'kafka.internal:9093',
        'format' = 'json'
    )
""")

# Write alerts
t_env.from_data_stream(alert_stream).execute_insert('patient_alerts')
```

Cost Optimization
Storage Costs
| Tier | Data | Size | Monthly Cost | Optimization |
|---|---|---|---|---|
| Hot | Active patient data | 100TB | $2,300 | S3 Standard |
| Warm | 6-24 months | 200TB | $2,000 | S3 Standard-IA |
| Cold | 2-7 years | 500TB | $3,000 | S3 Glacier |
| Archive | 7+ years | 1PB | $1,500 | Glacier Deep Archive |
| Total | | 1.8PB | $8,800 | |
Optimization:
- Lifecycle policies: 30 days → IA, 1 year → Glacier (see the sketch after this list)
- Compression: ZSTD for all data (~40% savings)
- Deduplication: Delta Lake dedupe for EHR data
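The lifecycle policy from the first bullet can be applied with a few lines of boto3. A sketch, reusing the `hospital-data-phi` bucket and `patient_records/` prefix from earlier; the exact prefixes and cut-over days would follow the retention schedule in the table above.

```python
import boto3

s3_client = boto3.client('s3')

# Tier PHI objects down as they age; the day thresholds mirror the storage table above
s3_client.put_bucket_lifecycle_configuration(
    Bucket='hospital-data-phi',
    LifecycleConfiguration={
        'Rules': [{
            'ID': 'phi-tiering',
            'Status': 'Enabled',
            'Filter': {'Prefix': 'patient_records/'},
            'Transitions': [
                {'Days': 30, 'StorageClass': 'STANDARD_IA'},
                {'Days': 365, 'StorageClass': 'GLACIER'},
                {'Days': 2555, 'StorageClass': 'DEEP_ARCHIVE'},  # roughly 7 years
            ],
        }]
    }
)
```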
Compute Costs
| Component | Compute | Cost | Optimization |
|---|---|---|---|
| Flink Cluster | 20 nodes | $8,640/month | Spot instances |
| Spark Batch | 50 nodes | $21,600/month | Spot instances |
| Snowflake | Medium warehouse | $18,000/month | Scheduled suspension |
| Total | | $48,240/month | |
Optimization:
- Spot instances for Flink/Spark (60% savings)
- Snowflake auto-suspend (5 min idle; sketched after this list)
- Scheduled operations (batch during off-peak)
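The auto-suspend setting is a one-time change per warehouse. A sketch using the Snowflake Python connector; the account, role, and warehouse name (`ANALYTICS_WH`) are assumptions, and the credentials would come from Secrets Manager rather than being hard-coded.

```python
import snowflake.connector

# Connection details are illustrative; in practice they are fetched from AWS Secrets Manager
conn = snowflake.connector.connect(
    account='hospital_account',
    user='platform_admin',
    password='...',  # placeholder, never hard-code credentials
    role='SYSADMIN',
)

# Suspend after 5 minutes of idle time and resume automatically on the next query
conn.cursor().execute(
    "ALTER WAREHOUSE ANALYTICS_WH SET AUTO_SUSPEND = 300 AUTO_RESUME = TRUE"
)
conn.close()
```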
Total Monthly Cost
| Category | Monthly Cost | Annual Cost |
|---|---|---|
| Storage | $8,800 | $105,600 |
| Compute | $48,240 | $578,880 |
| Network | $3,000 | $36,000 |
| Monitoring | $2,000 | $24,000 |
| Total | $62,040 | $744,480 |
Failure Modes
Mode 1: Encryption Key Failure
Mitigation:
- Multi-region KMS keys
- Automatic key rotation (see the sketch after this list)
- Store encrypted backups
- Incident response plan
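For the rotation item, AWS KMS rotates customer-managed keys automatically once the feature is enabled. A sketch with boto3, assuming the `alias/healthcare-phi-key` alias from the encryption section resolves to a customer-managed key.

```python
import boto3

kms_client = boto3.client('kms')

# Resolve the PHI key alias to its underlying key ID
key_id = kms_client.describe_key(KeyId='alias/healthcare-phi-key')['KeyMetadata']['KeyId']

# Enable yearly automatic rotation and confirm it took effect
kms_client.enable_key_rotation(KeyId=key_id)
status = kms_client.get_key_rotation_status(KeyId=key_id)
print(f"Automatic rotation enabled: {status['KeyRotationEnabled']}")
```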
Mode 2: PII Leakage
Mitigation:
- Automated PII scanning on output (sketched after this list)
- Pre-production PII testing
- Regular security audits
- Data classification tools
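The output scanning in the first bullet can reuse the same Presidio analyzer that performs de-identification: sample the published columns and fail the release if anything is still detected. A minimal sketch, assuming the de-identified Delta output from the earlier pipeline; the path, column, and sample size are illustrative.

```python
from presidio_analyzer import AnalyzerEngine
from pyspark.sql import SparkSession

analyzer = AnalyzerEngine()
spark = SparkSession.builder.getOrCreate()


def assert_no_residual_pii(path: str, column: str, sample_rows: int = 1000) -> None:
    """Sample a de-identified column and raise if Presidio still finds PII."""
    rows = spark.read.format("delta").load(path) \
        .select(column).dropna().limit(sample_rows).collect()

    leaked = sum(1 for row in rows if analyzer.analyze(text=row[column], language='en'))

    if leaked:
        # Report only counts, never the offending text, to avoid echoing PHI into logs
        raise ValueError(f"PII detected in {leaked}/{len(rows)} sampled rows of {path}.{column}")


# Hypothetical output of the de-identification job
assert_no_residual_pii("s3://hospital-data-deid/ehr/patient_notes", "notes_anonymized")
```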
Mode 3: Real-Time Alert Failure
Mitigation:
- Redundant alert systems
- PagerDuty integration (see the sketch after this list)
- Alert queueing during failures
- Regular alert system testing
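For the PagerDuty item, critical alerts consumed from the `patient-alerts` topic can be forwarded through the Events API v2. A sketch; the Secrets Manager secret name is hypothetical, and only the internal patient ID (no other PHI) is placed in the page.

```python
import boto3
import requests

PAGERDUTY_EVENTS_URL = "https://events.pagerduty.com/v2/enqueue"


def page_on_critical_alert(alert: dict) -> None:
    """Forward a CRITICAL patient alert to the on-call clinician via PagerDuty."""
    # Routing key is stored in Secrets Manager (secret name is hypothetical)
    secret = boto3.client('secretsmanager').get_secret_value(
        SecretId='pagerduty/vitals-routing-key'
    )
    routing_key = secret['SecretString']

    response = requests.post(
        PAGERDUTY_EVENTS_URL,
        json={
            "routing_key": routing_key,
            "event_action": "trigger",
            "payload": {
                # Keep PHI out of the page: internal ID and alert type only
                "summary": f"{alert['alert_type']} alert for patient {alert['patient_id']}",
                "source": "patient-alerts",
                "severity": "critical",
            },
        },
        timeout=5,
    )
    response.raise_for_status()
```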
SLA/SLO Definitions
```yaml
slas:
  real_time_monitoring:
    latency:
      p50: "< 2 seconds"
      p95: "< 5 seconds"
      p99: "< 10 seconds"

  availability:
    uptime: "99.95%"
    maintenance_window: "Sunday 2-4 AM"

  data_freshness:
    patient_vitals: "< 5 seconds"
    dashboards: "< 1 minute"
    batch_reports: "< 4 hours"

  compliance:
    encryption: "100% of PHI"
    audit_logging: "100% of access"
    retention: "7 years minimum"
    breach_notification: "< 60 days"

  data_quality:
    completeness: "> 99.9%"
    pii_detection: "100%"
    de_identification: "> 99%"
```

Migration Strategy
Phase 1: Assessment (2 months)
Phase 2: Implementation (6 months)
Key Takeaways
- HIPAA first: Compliance drives all architecture decisions
- Encryption everywhere: At rest and in transit
- De-identification: Detect and remove PII before analytics
- Audit everything: Immutable logs of all access
- Least privilege: RBAC for all data access
- Disaster recovery: Multi-region backup for PHI
- Regular audits: Quarterly security assessments
- Cost optimization: Spot instances, lifecycle policies
Back to Module 8