Skip to content

Deployment Strategies

Blue-Green, Canary, and Rollback for Data Systems


Overview

Deployment strategies for data systems ensure safe, reliable rollout of changes while minimizing risk and downtime. Unlike software deployments, data deployments must handle schema changes, data validation, and rollback of both code and data.


Deployment Strategy Comparison

Strategy Overview

Strategy Selection:

| Strategy | Downtime | Rollback | Complexity | Use Case |
|---|---|---|---|---|
| Blue-Green | Zero | Instant | High | Critical systems, major changes |
| Canary | Zero | Gradual | Medium | Gradual validation, A/B testing |
| Rolling | Zero | Gradual | Low | Simple updates, no state |
| Shadow | Zero | N/A | Medium | Testing in production |

Blue-Green Deployment

Architecture

Implementation

# Blue-green deployment for data pipelines
from typing import Dict, Callable
import time
class BlueGreenDeployment:
    """Blue-green deployment for data systems.

    The new version is deployed to the green environment, validated with the
    supplied ``validator`` callable, and only then is traffic switched away
    from blue. Any exception raised along the way triggers a switch back to
    blue and is reported in the returned results dict instead of propagating.
    """

    def __init__(
        self,
        blue_environment: str,
        green_environment: str,
        validator: Callable
    ):
        """Store the environment names and the validation callable.

        Args:
            blue_environment: Name of the currently live environment.
            green_environment: Name of the environment receiving the release.
            validator: Callable invoked as ``validator(environment)``; it must
                return a dict containing a truthy/falsy ``'passed'`` entry.
        """
        self.blue = blue_environment
        self.green = green_environment
        self.validator = validator

    def deploy(
        self,
        deploy_function: Callable,
        validate_timeout: int = 3600,  # 1 hour
        retry_interval: int = 60  # 1 minute between validation attempts
    ) -> Dict:
        """Execute blue-green deployment.

        Args:
            deploy_function: Callable invoked as ``deploy_function(green)``.
            validate_timeout: Maximum number of seconds to keep retrying
                validation. Validation always runs at least once, even when
                this is zero or negative.
            retry_interval: Seconds to wait between failed validation
                attempts (never sleeps past the timeout).

        Returns:
            A results dict. ``'deployment_status'`` is ``'success'`` or
            ``'failed'``; on failure ``'rollback'`` and ``'error'`` are set.
        """
        results = {
            'deployment_status': 'in_progress',
            'current_environment': self.blue,
            'validation_passed': False
        }
        try:
            # 1. Deploy to green environment
            print(f"Deploying to {self.green} environment...")
            deploy_function(self.green)
            results['green_deployment'] = 'success'

            # 2. Validate green environment
            print(f"Validating {self.green} environment...")
            validation_result = self._validate_with_retry(
                validate_timeout,
                retry_interval
            )
            if validation_result is None:
                raise Exception("Validation timeout or failed")
            results['validation_passed'] = True
            results['validation_results'] = validation_result

            # 3. Switch traffic to green
            print(f"Switching traffic from {self.blue} to {self.green}...")
            self._switch_traffic(self.green)
            results['traffic_switched'] = True

            # 4. Monitor green environment
            print(f"Monitoring {self.green} environment...")
            self._monitor(self.green)

            # 5. Deprecate blue environment
            print(f"Deprecating {self.blue} environment...")
            self._deprecate(self.blue)
            results['blue_deprecated'] = True
            results['deployment_status'] = 'success'
            results['current_environment'] = self.green
        except Exception as e:
            # Broad catch is deliberate: any failure must route traffic back
            # to blue and be reported through the results dict.
            print(f"Deployment failed: {e}")
            # Rollback to blue
            print(f"Rolling back to {self.blue} environment...")
            self._switch_traffic(self.blue)
            results['deployment_status'] = 'failed'
            results['rollback'] = True
            results['error'] = str(e)
        return results

    def _validate_with_retry(self, timeout: float, retry_interval: float):
        """Validate green until it passes or ``timeout`` seconds elapse.

        Returns the passing validation dict, or ``None`` if no attempt passed
        before the deadline.
        """
        # Monotonic clock: a wall-clock adjustment must not shorten or
        # extend the validation window.
        deadline = time.monotonic() + timeout
        while True:
            validation_result = self.validator(self.green)
            if validation_result['passed']:
                return validation_result
            print(f"Validation failed: {validation_result}")
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return None
            # Never sleep past the deadline (or for a negative duration).
            time.sleep(max(0, min(retry_interval, remaining)))

    def _switch_traffic(self, environment: str):
        """Switch traffic to environment (placeholder: only prints)."""
        # Update DNS, load balancer, or configuration
        # Example: Update Airflow variable
        # Example: Update database connection string
        # Example: Update API gateway routing
        print(f"Switched traffic to {environment}")

    def _monitor(self, environment: str, duration: int = 3600):
        """Monitor environment for issues (placeholder: only prints)."""
        # Monitor metrics:
        # - Error rate
        # - Latency
        # - Throughput
        # - Data quality
        print(f"Monitoring {environment} for {duration} seconds...")

    def _deprecate(self, environment: str):
        """Deprecate old environment (placeholder: only prints)."""
        # Archive or delete old resources
        # Keep for rollback window (e.g., 7 days)
        print(f"Deprecated {environment} environment")
# Example usage
def deploy_pipelines(environment: str):
    """Print which pipeline version is deployed to ``environment``.

    The "green" environment gets the new (v2) pipelines; every other
    environment gets the current (v1) pipelines.
    """
    is_new_release = environment == "green"
    message = (
        "Deploying v2 pipelines..."
        if is_new_release
        else "Deploying v1 pipelines..."
    )
    print(message)
def validate_environment(environment: str) -> Dict:
    """Return a mock validation report for ``environment``.

    Only "green" is reported as passing; any other environment yields a
    failed report with a 'Validation not run' error.
    """
    # Real checks (data quality, data diffing, smoke tests) would go here;
    # this example returns canned results.
    if environment != "green":
        return {
            'passed': False,
            'error': 'Validation not run'
        }

    check_names = ('row_count', 'data_quality', 'latency')
    return {
        'passed': True,
        'checks': {check: 'passed' for check in check_names}
    }
# Execute deployment
# Wire the example validator into a blue -> green deployment and run it
# with the example deploy function.
deployment = BlueGreenDeployment("blue", "green", validate_environment)
results = deployment.deploy(deploy_function=deploy_pipelines)
print(f"Deployment results: {results}")

dbt Blue-Green Deployment

# Blue-green deployment for dbt
import dbt.main as dbt
from typing import Dict
class DbtBlueGreen:
    """Blue-green deployment for dbt.

    Models are built into the green schema, validated, and only then is the
    active schema switched from blue to green. Any failure switches back to
    the blue schema and is reported in the returned results dict.
    """

    def __init__(
        self,
        blue_schema: str,
        green_schema: str
    ):
        """Store the live (blue) and candidate (green) schema names."""
        self.blue_schema = blue_schema
        self.green_schema = green_schema

    def deploy(
        self,
        dbt_project_dir: str,
        models: list = None
    ) -> Dict:
        """Deploy dbt models to green schema.

        Args:
            dbt_project_dir: Directory passed to dbt as both
                ``--project-dir`` and ``--profiles-dir``.
            models: Optional list of model selectors; when omitted or empty
                no ``--select`` is passed to dbt.

        Returns:
            A results dict whose ``'status'`` is ``'success'`` or
            ``'failed'``; on failure ``'error'`` holds the message.
        """
        results = {
            'status': 'in_progress',
            'green_schema': self.green_schema
        }
        try:
            # 1. Run dbt in green schema
            print(f"Deploying to {self.green_schema} schema...")
            self._run_dbt(self._build_dbt_args(dbt_project_dir, models))
            results['green_deployment'] = 'success'

            # 2. Validate green schema
            validation = self._validate_green()
            results['validation'] = validation
            if not validation['passed']:
                raise Exception("Green schema validation failed")

            # 3. Switch to green (update schema variable)
            print(f"Switching from {self.blue_schema} to {self.green_schema}...")
            self._switch_schema()
            results['status'] = 'success'
        except Exception as e:
            # Broad catch is deliberate: every failure must trigger rollback
            # and be surfaced through the results dict.
            print(f"Deployment failed: {e}")
            # Rollback
            self._rollback()
            results['status'] = 'failed'
            results['error'] = str(e)
        return results

    def _build_dbt_args(self, dbt_project_dir: str, models: list = None) -> list:
        """Build the argument list for a ``dbt run`` targeting green."""
        dbt_args = [
            'run',
            '--project-dir', dbt_project_dir,
            '--profiles-dir', dbt_project_dir,
            '--vars', f"{{target_schema: {self.green_schema}}}"
        ]
        if models:
            # Space-delimited selectors are a union in dbt's selection
            # syntax; comma-delimited would be an intersection and would
            # typically match nothing for distinct model names.
            dbt_args.extend(['--select', ' '.join(models)])
        return dbt_args

    def _run_dbt(self, dbt_args: list) -> None:
        """Invoke dbt programmatically and raise if the run did not succeed."""
        # Function-scope import: dbtRunner lives in dbt.cli.main (dbt-core
        # 1.5+), which the file-level import does not provide.
        from dbt.cli.main import dbtRunner

        run_result = dbtRunner().invoke(dbt_args)
        # invoke() reports failure through the result object rather than by
        # raising, so it has to be checked explicitly.
        if not run_result.success:
            reason = run_result.exception or "one or more nodes failed"
            raise RuntimeError(f"dbt run failed: {reason}")

    def _validate_green(self) -> Dict:
        """Validate green schema (placeholder: returns canned results)."""
        # Run data tests
        # Run data quality checks
        # Compare with blue schema
        return {
            'passed': True,
            'tests': {
                'row_count': 'passed',
                'null_check': 'passed',
                'data_diff': 'passed'
            }
        }

    def _switch_schema(self, schema: str = None):
        """Switch the active schema (placeholder: only prints).

        Args:
            schema: Schema to switch to; defaults to the green schema.
        """
        target_schema = self.green_schema if schema is None else schema
        # Update dbt variable
        # Update downstream applications
        print(f"Switched to {target_schema}")

    def _rollback(self):
        """Rollback to blue schema."""
        print(f"Rolling back to {self.blue_schema}")
        # Must name blue explicitly; the default target is green.
        self._switch_schema(self.blue_schema)

Canary Deployment

Architecture

Implementation

# Canary deployment for data pipelines
from typing import Dict, Callable
import time
class CanaryDeployment:
    """Canary deployment for data systems.

    Traffic is shifted to the canary version in steps; after each shift the
    ``monitor`` callable is polled, and an unhealthy report (or any other
    exception) routes all traffic back to the production version.
    """

    def __init__(
        self,
        production_version: str,
        canary_version: str,
        monitor: Callable
    ):
        """Store the version names and the health-check callable.

        Args:
            production_version: Name of the version currently serving traffic.
            canary_version: Name of the version being rolled out.
            monitor: Callable invoked as ``monitor(canary_version)``; it must
                return a dict containing a truthy/falsy ``'healthy'`` entry.
        """
        self.production = production_version
        self.canary = canary_version
        self.monitor = monitor

    def deploy(
        self,
        deploy_function: Callable,
        traffic_steps: list = None,
        monitor_duration: int = 3600,  # 1 hour per step
        check_interval: int = 60  # Check every minute
    ) -> Dict:
        """Execute canary deployment.

        Args:
            deploy_function: Callable invoked as ``deploy_function(canary)``.
            traffic_steps: Fractions (0.0-1.0) of traffic sent to the canary,
                applied in order. Defaults to ``[0.1, 0.5, 1.0]``.
            monitor_duration: Seconds to monitor after each traffic shift.
                The monitor is always polled at least once per step.
            check_interval: Seconds between health checks within a step.

        Returns:
            A results dict. ``'deployment_status'`` is ``'success'`` or
            ``'failed'``; ``'current_traffic'`` reflects the routing in
            effect when the method returns, including after a rollback.
        """
        # None sentinel instead of a list literal: a mutable default would be
        # shared across calls.
        if traffic_steps is None:
            traffic_steps = [0.1, 0.5, 1.0]
        results = {
            'deployment_status': 'in_progress',
            'current_traffic': {self.production: 1.0, self.canary: 0.0}
        }
        try:
            # Reject impossible splits before touching any routing.
            for step in traffic_steps:
                if not 0 <= step <= 1:
                    raise ValueError(
                        f"Traffic step must be between 0 and 1, got {step}"
                    )

            # 1. Deploy canary version
            print(f"Deploying canary version {self.canary}...")
            deploy_function(self.canary)
            results['canary_deployed'] = True

            # 2. Gradual traffic shift
            for canary_traffic in traffic_steps:
                # :g trims float noise such as 10.000000000000002
                print(f"Shifting {canary_traffic * 100:g}% traffic to canary...")
                # Update traffic routing
                self._update_traffic(
                    production_traffic=1 - canary_traffic,
                    canary_traffic=canary_traffic
                )
                results['current_traffic'] = {
                    self.production: 1 - canary_traffic,
                    self.canary: canary_traffic
                }
                # Monitor
                print(f"Monitoring for {monitor_duration} seconds...")
                self._monitor_step(monitor_duration, check_interval)

            # 3. Full canary deployment
            print("Canary deployment successful!")
            results['deployment_status'] = 'success'
        except Exception as e:
            # Broad catch is deliberate: every failure must send all traffic
            # back to production and be surfaced through the results dict.
            print(f"Canary deployment failed: {e}")
            # Rollback to production
            print("Rolling back to production...")
            self._update_traffic(
                production_traffic=1.0,
                canary_traffic=0.0
            )
            # Keep the reported split in sync with the rollback.
            results['current_traffic'] = {
                self.production: 1.0,
                self.canary: 0.0
            }
            results['deployment_status'] = 'failed'
            results['rollback'] = True
            results['error'] = str(e)
        return results

    def _monitor_step(self, duration: float, check_interval: float) -> None:
        """Poll the monitor for ``duration`` seconds; raise if unhealthy."""
        # Monotonic clock: wall-clock adjustments must not change the window.
        deadline = time.monotonic() + duration
        while True:
            metrics = self.monitor(self.canary)
            if not metrics['healthy']:
                raise Exception(f"Canary unhealthy: {metrics}")
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return
            # Never sleep past the end of the monitoring window.
            time.sleep(max(0, min(check_interval, remaining)))

    def _update_traffic(
        self,
        production_traffic: float,
        canary_traffic: float
    ):
        """Update traffic routing (placeholder: only prints the split)."""
        # Update load balancer
        # Update routing tables
        # Update feature flags
        print(
            f"Traffic: {self.production}={production_traffic * 100:g}%, "
            f"{self.canary}={canary_traffic * 100:g}%"
        )
# Example usage
def deploy_version(version: str):
    """Announce the deployment of ``version`` on stdout."""
    announcement = f"Deploying version {version}..."
    print(announcement)
def monitor_canary(version: str) -> Dict:
    """Return a mock health report for the canary ``version``.

    The report is canned: always healthy, with fixed metric values. The
    ``version`` argument is accepted but not used.
    """
    # A real monitor would inspect error rate, latency, throughput and
    # data quality; this example hard-codes the readings.
    canned_metrics = {
        'error_rate': 0.01,
        'latency_p50': 100,
        'latency_p99': 500
    }
    report = {'healthy': True}
    report['metrics'] = canned_metrics
    return report
# Execute canary deployment
# Roll v2 out over v1 in three steps (10%, 50%, 100%) using the example
# deploy function and health monitor.
canary = CanaryDeployment("v1", "v2", monitor_canary)
results = canary.deploy(deploy_version, [0.1, 0.5, 1.0])
print(f"Canary deployment results: {results}")

Shadow Deployment

Architecture

Implementation

# Shadow deployment for data pipelines
from typing import Dict, Callable
class ShadowDeployment:
    """Shadow deployment for data systems.

    Each batch of test traffic is run through both the production and the
    shadow pipeline, and the two sets of outputs are then compared.
    """

    def __init__(
        self,
        production_version: str,
        shadow_version: str
    ):
        """Remember the production and shadow version names."""
        self.production = production_version
        self.shadow = shadow_version

    def deploy(
        self,
        deploy_function: Callable,
        traffic_source: Callable
    ) -> Dict:
        """Run the shadow deployment and report the outcome.

        Args:
            deploy_function: Called with the shadow version name.
            traffic_source: Called with no arguments; must return an
                iterable of batches.

        Returns:
            A dict whose 'deployment_status' is 'success' or 'failed'. It
            carries 'comparison' when the comparison ran, and 'error' when
            an exception was caught.
        """
        outcome = {
            'deployment_status': 'in_progress',
            'shadow_version': self.shadow
        }
        try:
            # Step 1: bring up the shadow version.
            print(f"Deploying shadow version {self.shadow}...")
            deploy_function(self.shadow)
            outcome['shadow_deployed'] = True

            # Step 2: feed every batch to production first, then to shadow.
            print("Routing test traffic to both versions...")
            prod_outputs, shadow_outputs = [], []
            for payload in traffic_source():
                prod_outputs.append(self._run_production(payload))
                shadow_outputs.append(self._run_shadow(payload))

            # Step 3: compare the two result sets.
            print("Comparing results...")
            verdict = self._compare_results(prod_outputs, shadow_outputs)
            outcome['comparison'] = verdict

            accepted = bool(verdict['within_threshold'])
            print(
                "Shadow validation passed!"
                if accepted
                else "Shadow validation failed!"
            )
            outcome['deployment_status'] = 'success' if accepted else 'failed'
        except Exception as exc:
            print(f"Shadow deployment failed: {exc}")
            outcome['deployment_status'] = 'failed'
            outcome['error'] = str(exc)
        return outcome

    def _run_production(self, batch: Dict) -> Dict:
        """Placeholder production run; returns a fixed output dict."""
        # A real implementation would process the batch here.
        return {'result': 'production_output'}

    def _run_shadow(self, batch: Dict) -> Dict:
        """Placeholder shadow run; returns a fixed output dict."""
        # A real implementation would process the batch here.
        return {'result': 'shadow_output'}

    def _compare_results(
        self,
        production_results: list,
        shadow_results: list
    ) -> Dict:
        """Placeholder comparison; always reports no differences."""
        # A real implementation would diff the outputs against a threshold.
        no_differences = []
        return {
            'within_threshold': True,
            'differences': no_differences
        }

Deployment Strategies Comparison

Strategy Trade-offs


Rollback Strategies

Rollback Methods

# Rollback strategies for data systems
from typing import Dict
class DataRollback:
    """Rollback strategies for data deployments.

    Every method is a placeholder: it prints what would be rolled back and
    returns a summary dict with a 'success' status.
    """

    def __init__(self):
        """No state is needed."""

    def rollback_code(self, version: str) -> Dict:
        """Report a code rollback to ``version``."""
        # A real implementation would revert, redeploy and update routing.
        print(f"Rolling back code to {version}")
        summary = dict(rollback_type='code', version=version)
        summary['status'] = 'success'
        return summary

    def rollback_data(
        self,
        table: str,
        backup_location: str
    ) -> Dict:
        """Report a data rollback of ``table`` from ``backup_location``."""
        # A real implementation would restore the backup and validate it.
        print(f"Rolling back data for {table} from {backup_location}")
        summary = dict(
            rollback_type='data',
            table=table,
            backup_location=backup_location,
        )
        summary['status'] = 'success'
        return summary

    def rollback_schema(
        self,
        schema: str,
        migration_version: str
    ) -> Dict:
        """Report a rollback of ``schema`` to ``migration_version``."""
        # A real implementation would revert the migration and re-validate.
        print(f"Rolling back schema {schema} to {migration_version}")
        summary = dict(
            rollback_type='schema',
            schema=schema,
            migration_version=migration_version,
        )
        summary['status'] = 'success'
        return summary

Deployment Best Practices

DO

# 1. Use blue-green for critical systems
# Zero downtime, instant rollback
# 2. Use canary for gradual validation
# Monitor metrics at each step
# 3. Always test in staging first
# Catch issues before production
# 4. Have automated rollback
# Rollback on validation failure
# 5. Monitor deployment closely
# Track metrics, errors, performance

DON’T

# 1. Don't deploy to production directly
# Always use staging first
# 2. Don't skip validation
# Essential for data quality
# 3. Don't ignore monitoring
# Catch issues early
# 4. Don't forget rollback plan
# Always have a rollback strategy
# 5. Don't deploy all at once
# Use gradual rollout strategies

Key Takeaways

  1. Blue-green: Zero downtime, instant rollback, high complexity
  2. Canary: Gradual rollout, metrics monitoring, medium complexity
  3. Shadow: Invisible testing, no user impact, medium complexity
  4. Rolling: Simple incremental updates, low complexity
  5. Rollback: Essential for all deployments
  6. Validation: Critical before full deployment
  7. Monitoring: Track metrics at each step
  8. Use When: Production deployments, major changes, validation

Back to Module 6