Deployment Strategies
Blue-Green, Canary, and Rollback for Data Systems
Overview
Deployment strategies for data systems ensure safe, reliable rollout of changes while minimizing risk and downtime. Unlike stateless application deployments, data deployments must also handle schema changes, data validation, and rollback of both code and data.
Deployment Strategy Comparison
Strategy Overview
Strategy Selection:
| Strategy | Downtime | Rollback | Complexity | Use Case |
|---|---|---|---|---|
| Blue-Green | Zero | Instant | High | Critical systems, major changes |
| Canary | Zero | Gradual | Medium | Gradual validation, A/B testing |
| Rolling | Zero | Gradual | Low | Simple updates, no state |
| Shadow | Zero | N/A | Medium | Testing in production |
Blue-Green Deployment
Architecture
Implementation
# Blue-green deployment for data pipelines
import time
from typing import Callable, Dict


class BlueGreenDeployment:
    """Blue-green deployment for data systems.

    The new release is deployed to the green environment and validated
    there; traffic is switched to green only after validation passes. Any
    failure along the way switches traffic back to blue.
    """

    def __init__(
        self,
        blue_environment: str,
        green_environment: str,
        validator: Callable
    ):
        """Store the environment names and the validation callback.

        Args:
            blue_environment: Name of the currently live environment.
            green_environment: Name of the environment receiving the release.
            validator: Called with an environment name; must return a dict
                containing at least a boolean ``'passed'`` entry.
        """
        self.blue = blue_environment
        self.green = green_environment
        self.validator = validator

    def deploy(
        self,
        deploy_function: Callable,
        validate_timeout: int = 3600,  # 1 hour
        retry_interval: int = 60  # 1 minute between validation attempts
    ) -> Dict:
        """Execute a blue-green deployment.

        Args:
            deploy_function: Called with the green environment name to
                perform the actual deployment.
            validate_timeout: Seconds during which a failing validation is
                retried. The validator always runs at least once, even when
                this is zero or negative.
            retry_interval: Seconds to wait between validation attempts.
                The wait is shortened so it never runs past the deadline.

        Returns:
            A result dict. ``'deployment_status'`` is ``'success'`` or
            ``'failed'``; on failure ``'rollback'`` is ``True`` and
            ``'error'`` holds the exception message.
        """
        results = {
            'deployment_status': 'in_progress',
            'current_environment': self.blue,
            'validation_passed': False
        }

        try:
            # 1. Deploy to green environment
            print(f"Deploying to {self.green} environment...")
            deploy_function(self.green)
            results['green_deployment'] = 'success'

            # 2. Validate green environment
            print(f"Validating {self.green} environment...")
            # A monotonic clock is immune to wall-clock adjustments.
            deadline = time.monotonic() + validate_timeout

            while True:
                # Indexing (not .get) on purpose: a malformed validator
                # result aborts immediately instead of being retried.
                validation_result = self.validator(self.green)

                if validation_result['passed']:
                    results['validation_passed'] = True
                    results['validation_results'] = validation_result
                    break

                print(f"Validation failed: {validation_result}")

                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    break
                # Never sleep past the deadline (or for a negative time).
                time.sleep(max(0, min(retry_interval, remaining)))

            if not results['validation_passed']:
                raise Exception("Validation timeout or failed")

            # 3. Switch traffic to green
            print(f"Switching traffic from {self.blue} to {self.green}...")
            self._switch_traffic(self.green)
            results['traffic_switched'] = True

            # 4. Monitor green environment
            print(f"Monitoring {self.green} environment...")
            self._monitor(self.green)

            # 5. Deprecate blue environment
            print(f"Deprecating {self.blue} environment...")
            self._deprecate(self.blue)
            results['blue_deprecated'] = True

            results['deployment_status'] = 'success'
            results['current_environment'] = self.green

        except Exception as e:
            print(f"Deployment failed: {e}")

            # Rollback to blue
            print(f"Rolling back to {self.blue} environment...")
            self._switch_traffic(self.blue)

            results['deployment_status'] = 'failed'
            results['rollback'] = True
            results['error'] = str(e)

        return results

    def _switch_traffic(self, environment: str):
        """Switch traffic to ``environment`` (placeholder: only logs)."""
        # Update DNS, load balancer, or configuration
        # Example: Update Airflow variable
        # Example: Update database connection string
        # Example: Update API gateway routing
        print(f"Switched traffic to {environment}")

    def _monitor(self, environment: str, duration: int = 3600):
        """Monitor ``environment`` for issues (placeholder: only logs)."""
        # Monitor metrics:
        # - Error rate
        # - Latency
        # - Throughput
        # - Data quality
        print(f"Monitoring {environment} for {duration} seconds...")

    def _deprecate(self, environment: str):
        """Deprecate the old environment (placeholder: only logs)."""
        # Archive or delete old resources
        # Keep for rollback window (e.g., 7 days)
        print(f"Deprecated {environment} environment")
# Example usage
def deploy_pipelines(environment: str):
    """Print which pipeline version is deployed to ``environment``.

    The "green" environment receives the new (v2) pipelines; every other
    environment receives the current (v1) pipelines.
    """
    targets_new_release = environment == "green"
    message = (
        "Deploying v2 pipelines..."
        if targets_new_release
        else "Deploying v1 pipelines..."
    )
    print(message)
def validate_environment(environment: str) -> Dict:
    """Return a mock validation report for ``environment``.

    Only "green" is reported as passing; any other environment yields a
    failing report with an explanatory ``'error'`` entry.
    """
    # A real implementation would run data quality checks, data diffing
    # and smoke tests here; this is a mock.
    if environment != "green":
        return {'passed': False, 'error': 'Validation not run'}

    check_names = ('row_count', 'data_quality', 'latency')
    return {
        'passed': True,
        'checks': {check: 'passed' for check in check_names},
    }
# Execute deployment: "blue" is live, "green" receives the new release,
# and validate_environment decides whether green may take traffic.
deployment = BlueGreenDeployment("blue", "green", validate_environment)

results = deployment.deploy(deploy_function=deploy_pipelines)
print(f"Deployment results: {results}")

# dbt Blue-Green Deployment
# Blue-green deployment for dbt
from typing import Dict


class DbtBlueGreen:
    """Blue-green deployment for dbt.

    Models are built into the green schema, validated, and only then is the
    active schema switched from blue to green. Any failure switches back to
    the blue schema.
    """

    def __init__(
        self,
        blue_schema: str,
        green_schema: str
    ):
        """Store the names of the live (blue) and candidate (green) schemas."""
        self.blue_schema = blue_schema
        self.green_schema = green_schema

    def deploy(
        self,
        dbt_project_dir: str,
        models: list = None
    ) -> Dict:
        """Deploy dbt models to the green schema.

        Args:
            dbt_project_dir: Directory used as both the dbt project dir and
                the profiles dir.
            models: Optional list of dbt selectors; when given, only their
                union is built. ``None`` or an empty list builds everything.

        Returns:
            A result dict whose ``'status'`` is ``'success'`` or
            ``'failed'``; on failure ``'error'`` holds the exception message.
        """
        results = {
            'status': 'in_progress',
            'green_schema': self.green_schema
        }

        try:
            # 1. Run dbt in green schema
            print(f"Deploying to {self.green_schema} schema...")
            self._run_dbt(self._build_dbt_args(dbt_project_dir, models))
            results['green_deployment'] = 'success'

            # 2. Validate green schema
            validation = self._validate_green()
            results['validation'] = validation

            if not validation['passed']:
                raise Exception("Green schema validation failed")

            # 3. Switch to green (update schema variable)
            print(f"Switching from {self.blue_schema} to {self.green_schema}...")
            self._switch_schema()

            results['status'] = 'success'

        except Exception as e:
            print(f"Deployment failed: {e}")

            # Rollback
            self._rollback()

            results['status'] = 'failed'
            results['error'] = str(e)

        return results

    def _build_dbt_args(self, dbt_project_dir: str, models: list = None) -> list:
        """Build the ``dbt run`` argument list targeting the green schema."""
        dbt_args = [
            'run',
            '--project-dir', dbt_project_dir,
            '--profiles-dir', dbt_project_dir,
            '--vars', f"{{target_schema: {self.green_schema}}}"
        ]

        if models:
            # Space-separated selectors are a union in dbt; a comma would
            # request the intersection, which for distinct models is empty.
            dbt_args.extend(['--select', ' '.join(models)])

        return dbt_args

    def _run_dbt(self, dbt_args: list) -> None:
        """Invoke dbt with ``dbt_args`` and raise if the run did not succeed."""
        # Imported here so the class stays importable without dbt installed.
        # dbtRunner is dbt-core's programmatic entry point (dbt-core 1.5+).
        from dbt.cli.main import dbtRunner

        outcome = dbtRunner().invoke(dbt_args)
        # invoke() reports failure on the result object, so check it.
        if not outcome.success:
            if outcome.exception is not None:
                raise RuntimeError(
                    f"dbt run failed: {outcome.exception}"
                ) from outcome.exception
            raise RuntimeError("dbt run reported failures")

    def _validate_green(self) -> Dict:
        """Validate the green schema (placeholder: always reports success)."""
        # Run data tests
        # Run data quality checks
        # Compare with blue schema
        return {
            'passed': True,
            'tests': {
                'row_count': 'passed',
                'null_check': 'passed',
                'data_diff': 'passed'
            }
        }

    def _switch_schema(self, schema: str = None):
        """Switch the active schema (placeholder: only logs).

        Args:
            schema: Schema to make active; defaults to the green schema.
        """
        target = self.green_schema if schema is None else schema

        # Update dbt variable
        # Update downstream applications
        print(f"Switched to {target}")

    def _rollback(self):
        """Roll back to the blue schema."""
        print(f"Rolling back to {self.blue_schema}")
        # Pass blue explicitly: the default target of _switch_schema is green.
        self._switch_schema(self.blue_schema)


# Canary Deployment
Architecture
Implementation
# Canary deployment for data pipelines
import time
from typing import Callable, Dict, Sequence


class CanaryDeployment:
    """Canary deployment for data systems.

    The canary version receives traffic in increasing steps. After each
    step the canary is health-checked; an unhealthy result sends all
    traffic back to the production version.
    """

    def __init__(
        self,
        production_version: str,
        canary_version: str,
        monitor: Callable
    ):
        """Store the version names and the health-check callback.

        Args:
            production_version: Name of the version currently serving traffic.
            canary_version: Name of the version being rolled out.
            monitor: Called with the canary version name; must return a
                dict containing at least a boolean ``'healthy'`` entry.
        """
        self.production = production_version
        self.canary = canary_version
        self.monitor = monitor

    def deploy(
        self,
        deploy_function: Callable,
        traffic_steps: Sequence[float] = (0.1, 0.5, 1.0),
        monitor_duration: int = 3600,  # 1 hour per step
        check_interval: int = 60  # 1 minute between health checks
    ) -> Dict:
        """Execute a canary deployment.

        Args:
            deploy_function: Called with the canary version name to deploy it.
            traffic_steps: Fractions (0 to 1) of traffic sent to the canary,
                applied in order.
            monitor_duration: Seconds to keep health-checking after each
                step. The canary is checked at least once per step, even
                when this is zero or negative.
            check_interval: Seconds between health checks. The wait is
                shortened so it never runs past the end of the step.

        Returns:
            A result dict. ``'deployment_status'`` is ``'success'`` or
            ``'failed'`` and ``'current_traffic'`` maps each version name to
            its traffic fraction. On failure ``'rollback'`` is ``True`` and
            ``'error'`` holds the exception message.
        """
        results = {
            'deployment_status': 'in_progress',
            'current_traffic': {self.production: 1.0, self.canary: 0.0}
        }

        try:
            # Reject impossible splits up front; an out-of-range step would
            # otherwise route a negative share of traffic to one version.
            steps = list(traffic_steps)
            for step in steps:
                if not 0.0 <= step <= 1.0:
                    raise ValueError(f"Traffic step {step} is outside [0, 1]")

            # 1. Deploy canary version
            print(f"Deploying canary version {self.canary}...")
            deploy_function(self.canary)
            results['canary_deployed'] = True

            # 2. Gradual traffic shift
            for canary_traffic in steps:
                print(f"Shifting {canary_traffic * 100}% traffic to canary...")

                # Update traffic routing
                self._update_traffic(
                    production_traffic=1 - canary_traffic,
                    canary_traffic=canary_traffic
                )

                results['current_traffic'] = {
                    self.production: 1 - canary_traffic,
                    self.canary: canary_traffic
                }

                # Monitor
                print(f"Monitoring for {monitor_duration} seconds...")
                # A monotonic clock is immune to wall-clock adjustments.
                deadline = time.monotonic() + monitor_duration

                while True:
                    metrics = self.monitor(self.canary)

                    if not metrics['healthy']:
                        raise Exception(f"Canary unhealthy: {metrics}")

                    remaining = deadline - time.monotonic()
                    if remaining <= 0:
                        break
                    # Never sleep past the end of the step.
                    time.sleep(max(0, min(check_interval, remaining)))

            # 3. Full canary deployment
            print("Canary deployment successful!")
            results['deployment_status'] = 'success'

        except Exception as e:
            print(f"Canary deployment failed: {e}")

            # Rollback to production
            print("Rolling back to production...")
            self._update_traffic(
                production_traffic=1.0,
                canary_traffic=0.0
            )

            # Keep the reported split in sync with the rollback just applied.
            results['current_traffic'] = {
                self.production: 1.0,
                self.canary: 0.0
            }
            results['deployment_status'] = 'failed'
            results['rollback'] = True
            results['error'] = str(e)

        return results

    def _update_traffic(
        self,
        production_traffic: float,
        canary_traffic: float
    ):
        """Update traffic routing (placeholder: only logs the split)."""
        # Update load balancer
        # Update routing tables
        # Update feature flags
        print(f"Traffic: {self.production}={production_traffic*100}%, {self.canary}={canary_traffic*100}%")
# Example usage
def deploy_version(version: str):
    """Announce that ``version`` is being deployed (stand-in for a real deploy)."""
    announcement = f"Deploying version {version}..."
    print(announcement)
def monitor_canary(version: str) -> Dict:
    """Return a mock health report for the canary ``version``.

    The report is always healthy and carries fixed sample metrics; the
    ``version`` argument is not consulted.
    """
    # A real implementation would check error rate, latency, throughput
    # and data quality; these values are mock data.
    sample_metrics = {
        'error_rate': 0.01,
        'latency_p50': 100,
        'latency_p99': 500,
    }
    return {'healthy': True, 'metrics': sample_metrics}
# Execute canary deployment: shift traffic from v1 to v2 in three steps
# (10%, 50%, 100%), health-checking the canary with monitor_canary.
canary = CanaryDeployment("v1", "v2", monitor_canary)

results = canary.deploy(deploy_version, traffic_steps=[0.1, 0.5, 1.0])

print(f"Canary deployment results: {results}")

# Shadow Deployment
Architecture
Implementation
# Shadow deployment for data pipelines
from typing import Dict, Callable
class ShadowDeployment:
    """Shadow deployment for data systems.

    Every batch from a traffic source is run through both the production
    and the shadow pipeline, and the two sets of outputs are compared.
    """

    def __init__(self, production_version: str, shadow_version: str):
        """Remember the names of the production and shadow versions."""
        self.production, self.shadow = production_version, shadow_version

    def deploy(
        self,
        deploy_function: Callable,
        traffic_source: Callable
    ) -> Dict:
        """Execute a shadow deployment.

        Args:
            deploy_function: Called with the shadow version name to deploy it.
            traffic_source: Zero-argument callable returning an iterable of
                batches.

        Returns:
            A result dict whose ``'deployment_status'`` is ``'success'`` or
            ``'failed'``. ``'comparison'`` holds the comparison report, and
            ``'error'`` holds the exception message if one was raised.
        """
        results = dict(deployment_status='in_progress', shadow_version=self.shadow)

        try:
            # Step 1: bring up the shadow version.
            print(f"Deploying shadow version {self.shadow}...")
            deploy_function(self.shadow)
            results['shadow_deployed'] = True

            # Step 2: feed each batch to production first, then to shadow.
            print("Routing test traffic to both versions...")
            paired_outputs = [
                (self._run_production(batch), self._run_shadow(batch))
                for batch in traffic_source()
            ]
            production_results = [prod for prod, _ in paired_outputs]
            shadow_results = [shadow for _, shadow in paired_outputs]

            # Step 3: compare the two result sets.
            print("Comparing results...")
            comparison = self._compare_results(production_results, shadow_results)
            results['comparison'] = comparison

            if not comparison['within_threshold']:
                print("Shadow validation failed!")
                results['deployment_status'] = 'failed'
            else:
                print("Shadow validation passed!")
                results['deployment_status'] = 'success'

        except Exception as e:
            print(f"Shadow deployment failed: {e}")
            results['deployment_status'] = 'failed'
            results['error'] = str(e)

        return results

    def _run_production(self, batch: Dict) -> Dict:
        """Run ``batch`` through production (placeholder: fixed output)."""
        return {'result': 'production_output'}

    def _run_shadow(self, batch: Dict) -> Dict:
        """Run ``batch`` through shadow (placeholder: fixed output)."""
        return {'result': 'shadow_output'}

    def _compare_results(
        self,
        production_results: list,
        shadow_results: list
    ) -> Dict:
        """Compare production and shadow results.

        Placeholder: the inputs are not inspected and the report always
        says the results are within threshold with no differences.
        """
        # A real implementation would compare outputs, collect differences
        # and validate them against a threshold.
        report = {'within_threshold': True, 'differences': []}
        return report


# Deployment Strategies Comparison
Strategy Trade-offs
Rollback Strategies
Rollback Methods
# Rollback strategies for data systems
from typing import Dict
class DataRollback:
    """Rollback strategies for data deployments.

    Each method is a placeholder: it logs the requested rollback and
    returns a report dict with ``'status': 'success'``.
    """

    def rollback_code(self, version: str) -> Dict:
        """Roll code back to ``version`` and return the rollback report."""
        # A real implementation would revert the code changes, redeploy
        # the previous version and update routing.
        print(f"Rolling back code to {version}")

        report = {'rollback_type': 'code'}
        report['version'] = version
        report['status'] = 'success'
        return report

    def rollback_data(self, table: str, backup_location: str) -> Dict:
        """Roll ``table`` back from ``backup_location`` and return the report."""
        # A real implementation would restore from backup, update the
        # schema and validate the data.
        print(f"Rolling back data for {table} from {backup_location}")

        report = {'rollback_type': 'data'}
        report['table'] = table
        report['backup_location'] = backup_location
        report['status'] = 'success'
        return report

    def rollback_schema(self, schema: str, migration_version: str) -> Dict:
        """Roll ``schema`` back to ``migration_version`` and return the report."""
        # A real implementation would revert the migration, update the
        # schema registry and validate compatibility.
        print(f"Rolling back schema {schema} to {migration_version}")

        report = {'rollback_type': 'schema'}
        report['schema'] = schema
        report['migration_version'] = migration_version
        report['status'] = 'success'
        return report


# Deployment Best Practices
DO
# 1. Use blue-green for critical systems
# Zero downtime, instant rollback
# 2. Use canary for gradual validation
# Monitor metrics at each step
# 3. Always test in staging first
# Catch issues before production
# 4. Have automated rollback
# Rollback on validation failure
# 5. Monitor deployment closely
# Track metrics, errors, performance
DON’T
# 1. Don't deploy to production directly
# Always use staging first
# 2. Don't skip validation
# Essential for data quality
# 3. Don't ignore monitoring
# Catch issues early
# 4. Don't forget rollback plan
# Always have a rollback strategy
# 5. Don't deploy all at once
# Use gradual rollout strategies
Key Takeaways
- Blue-green: Zero downtime, instant rollback, high complexity
- Canary: Gradual rollout, metrics monitoring, medium complexity
- Shadow: Invisible testing, no user impact, medium complexity
- Rolling: Simple incremental updates, low complexity
- Rollback: Essential for all deployments
- Validation: Critical before full deployment
- Monitoring: Track metrics at each step
- Use When: Production deployments, major changes, validation
Back to Module 6