# Data CI/CD Pipelines

*Automated Testing and Deployment for Data Systems*

## Overview
Data CI/CD pipelines extend software CI/CD practices to data systems. They include automated testing, data validation, schema migration, and deployment strategies for data pipelines, transformations, and models.
## Data CI/CD Architecture

### Pipeline Stages

Key Stages:
- Lint & Format: Code quality and consistency
- Unit Tests: Test transformation logic
- Data Tests: Validate data quality
- Staging Deploy: Test environment validation
- Data Validation: Compare prod vs. staging
- Production Deploy: Blue-green or canary deployment
- Monitoring: Track metrics and anomalies
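These stages gate one another: a failure at any stage blocks everything downstream of it. As a minimal, CI-system-agnostic illustration (the stage names mirror the list above; the callables are stand-ins for real CI jobs), the gating logic reduces to an ordered sequence that short-circuits on the first failure:

```python
from typing import Callable, List, Tuple

# Illustrative sketch only: each stage is a callable returning True on
# success; real pipelines delegate these to CI jobs (lint, dbt test, ...).
def run_pipeline(stages: List[Tuple[str, Callable[[], bool]]]) -> bool:
    for name, stage in stages:
        print(f"running stage: {name}")
        if not stage():
            print(f"stage failed: {name}, blocking downstream stages")
            return False
    return True


if __name__ == "__main__":
    passed = run_pipeline([
        ("lint", lambda: True),
        ("unit-tests", lambda: True),
        ("data-tests", lambda: True),
        ("staging-deploy", lambda: True),
        ("data-validation", lambda: True),
        ("production-deploy", lambda: True),
    ])
    print("pipeline passed" if passed else "pipeline blocked")
```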
## Data CI/CD Tools

### Toolchain Overview
| Stage | Tools | Use Case |
|---|---|---|
| Version Control | Git, GitHub, GitLab | Source control |
| Linting | SQLFluff, Black, isort | Code formatting |
| Unit Testing | dbt test, pytest | Logic validation |
| Data Testing | Soda, Great Expectations | Data quality |
| CI/CD | GitHub Actions, Airflow, dbt Cloud | Pipeline automation |
| Diffing | datafold, daff, sql-diff | Data comparison |
| Orchestration | Airflow, dbt, Dagster | Pipeline execution |
## GitHub Actions for Data

### dbt CI/CD Pipeline

```yaml
name: dbt CI/CD Pipeline

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main, develop]

env:
  DBT_PROFILES_DIR: ./
  DBT_ENV_SECRET: ${{ secrets.DBT_ENV_SECRET }}

jobs:
  lint:
    name: Lint and Format
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          pip install dbt-core==1.5.0
          pip install dbt-postgres==1.5.0
          pip install sqlfluff

      - name: Run SQLFluff
        run: |
          sqlfluff lint models/ --fail-on-warnings

      - name: Format SQL
        run: |
          sqlfluff fix models/ --force

  unit-tests:
    name: Unit Tests
    runs-on: ubuntu-latest
    needs: lint
    steps:
      - uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Install dbt
        run: |
          pip install dbt-core==1.5.0
          pip install dbt-postgres==1.5.0

      - name: Run dbt compile
        env:
          POSTGRES_TEST_HOST: ${{ secrets.POSTGRES_TEST_HOST }}
          POSTGRES_TEST_USER: ${{ secrets.POSTGRES_TEST_USER }}
          POSTGRES_TEST_PASS: ${{ secrets.POSTGRES_TEST_PASS }}
          POSTGRES_TEST_DATABASE: ${{ secrets.POSTGRES_TEST_DATABASE }}
        run: |
          dbt compile --profiles-dir ./

      - name: Run dbt parse
        run: |
          dbt parse --profiles-dir ./

  data-tests:
    name: Data Tests
    runs-on: ubuntu-latest
    needs: unit-tests
    steps:
      - uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Install dbt and Soda
        run: |
          pip install dbt-core==1.5.0
          pip install dbt-postgres==1.5.0
          pip install soda-core-postgres

      - name: Run dbt seed
        env:
          POSTGRES_TEST_HOST: ${{ secrets.POSTGRES_TEST_HOST }}
          POSTGRES_TEST_USER: ${{ secrets.POSTGRES_TEST_USER }}
          POSTGRES_TEST_PASS: ${{ secrets.POSTGRES_TEST_PASS }}
          POSTGRES_TEST_DATABASE: ${{ secrets.POSTGRES_TEST_DATABASE }}
        run: |
          dbt seed --profiles-dir ./ --full-refresh

      - name: Run dbt run
        run: |
          dbt run --profiles-dir ./ --target test

      - name: Run dbt test
        run: |
          dbt test --profiles-dir ./ --target test

      - name: Run Soda checks
        env:
          SODA_API_KEY: ${{ secrets.SODA_API_KEY }}
        run: |
          soda scan -f configuration.yaml checks.yml

  deploy-staging:
    name: Deploy to Staging
    runs-on: ubuntu-latest
    needs: data-tests
    if: github.ref == 'refs/heads/develop'
    steps:
      - uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Install dbt
        run: |
          pip install dbt-core==1.5.0
          pip install dbt-postgres==1.5.0
          pip install dbt-cloud==1.5.0

      - name: Run dbt build (staging)
        env:
          DBT_CLOUD_PROJECT_ID: ${{ secrets.DBT_CLOUD_PROJECT_ID_STAGING }}
          DBT_CLOUD_API_KEY: ${{ secrets.DBT_CLOUD_API_KEY }}
        run: |
          dbt cloud run --environment-id ${{ secrets.DBT_CLOUD_ENV_ID_STAGING }}

      - name: Run data validation (staging)
        run: |
          python scripts/validate_deployment.py --env staging

  deploy-production:
    name: Deploy to Production
    runs-on: ubuntu-latest
    needs: data-tests
    if: github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Install dbt and datafold
        run: |
          pip install dbt-core==1.5.0
          pip install dbt-postgres==1.5.0
          pip install datafold

      - name: Create datafold diff
        env:
          DATAFOLD_API_KEY: ${{ secrets.DATAFOLD_API_KEY }}
        run: |
          datafold diff \
            --dataset production:analytics.customers \
            --dataset staging:analytics.customers \
            --output datafold_diff.json

      - name: Validate diff
        run: |
          python scripts/validate_diff.py --file datafold_diff.json

      - name: Deploy to production (blue-green)
        env:
          DBT_CLOUD_API_KEY: ${{ secrets.DBT_CLOUD_API_KEY }}
        run: |
          python scripts/blue_green_deploy.py --env production

      - name: Run monitoring
        run: |
          python scripts/monitor_deployment.py --env production
```
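The helper scripts referenced in the workflow (`scripts/validate_deployment.py`, `scripts/validate_diff.py`, and so on) are repo-specific and not shown in this module. As a hedged sketch, `validate_diff.py` might look like the following; the report fields (`total_rows`, `different_rows`) are assumptions about the diff output, not a documented Datafold schema:

```python
# scripts/validate_diff.py -- illustrative sketch only.
# Assumes the diff step wrote a JSON report containing `total_rows`
# and `different_rows`; adjust the field names to your actual output.
import argparse
import json
import sys

MAX_DIFF_RATIO = 0.01  # block the deploy if more than 1% of rows differ


def main() -> int:
    parser = argparse.ArgumentParser()
    parser.add_argument("--file", required=True)
    args = parser.parse_args()

    with open(args.file) as f:
        report = json.load(f)

    total = report.get("total_rows", 0)
    different = report.get("different_rows", 0)
    ratio = different / total if total else 1.0

    print(f"diff ratio: {ratio:.4%} ({different}/{total} rows)")
    return 0 if ratio <= MAX_DIFF_RATIO else 1


if __name__ == "__main__":
    sys.exit(main())
```

A non-zero exit code fails the CI step, which is what actually blocks the production deploy.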
## Data Quality Gates

### Quality Gate Configuration

```yaml
quality_gates:
  - name: critical_checks
    description: Critical data quality checks
    checks:
      - type: row_count
        table: analytics.customers
        condition: count > 0
        severity: critical

      - type: null_check
        table: analytics.orders
        column: customer_id
        condition: null_ratio < 0.01
        severity: critical

      - type: duplicate_check
        table: analytics.orders
        column: order_id
        condition: duplicate_count == 0
        severity: critical

      - type: freshness
        table: analytics.orders
        column: order_date
        condition: lag_hours < 24
        severity: critical

  - name: warning_checks
    description: Warning-level data quality checks
    checks:
      - type: range_check
        table: analytics.orders
        column: order_amount
        condition: value >= 0 and value <= 1000000
        severity: warning

      - type: distribution_check
        table: analytics.customers
        column: age
        condition: ks_test > 0.95
        severity: warning

      - type: schema_drift
        table: analytics.orders
        condition: schema_match_baseline
        severity: warning

  - name: post_deploy_checks
    description: Post-deployment validation checks
    checks:
      - type: row_count_comparison
        table: analytics.orders
        baseline: production_previous
        condition: diff_percent < 5
        severity: warning

      - type: column_comparison
        table: analytics.orders
        column: total_amount
        baseline: production_previous
        condition: mean_diff_percent < 10
        severity: warning
```
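The `condition` strings are declarative; the configuration says nothing about how they are evaluated. One possible approach (a sketch, not tied to any library) is to compute the named metrics first, then evaluate the expression against that restricted namespace:

```python
# Sketch: evaluate a gate condition such as "null_ratio < 0.01" against
# precomputed metrics. eval() is restricted to the metrics namespace;
# a production system might prefer a proper expression parser.
def evaluate_condition(condition: str, metrics: dict) -> bool:
    return bool(eval(condition, {"__builtins__": {}}, dict(metrics)))


print(evaluate_condition("null_ratio < 0.01", {"null_ratio": 0.002}))  # True
print(evaluate_condition("count > 0", {"count": 0}))                   # False
```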
### Quality Gate Implementation

```python
import yaml
import great_expectations as gx
from typing import Dict, List


class QualityGateValidator:
    """Validate data against quality gates."""

    def __init__(self, gate_config_path: str):
        with open(gate_config_path) as f:
            self.config = yaml.safe_load(f)

    def validate_critical_checks(self, context: gx.DataContext) -> Dict:
        """Run critical checks; any failure should block deployment."""
        results = {
            'passed': True,
            'failed_checks': []
        }

        for gate in self.config['quality_gates']:
            if gate['name'] == 'critical_checks':
                for check in gate['checks']:
                    result = self._run_check(context, check)

                    if not result['passed']:
                        results['passed'] = False
                        results['failed_checks'].append({
                            'check': check,
                            'result': result
                        })

        return results

    def validate_warning_checks(self, context: gx.DataContext) -> Dict:
        """Run warning checks; failures are reported but need not block."""
        results = {
            'passed': True,
            'warnings': []
        }

        for gate in self.config['quality_gates']:
            if gate['name'] == 'warning_checks':
                for check in gate['checks']:
                    result = self._run_check(context, check)

                    if not result['passed']:
                        results['passed'] = False
                        results['warnings'].append({
                            'check': check,
                            'result': result
                        })

        return results

    def _run_check(self, context: gx.DataContext, check: Dict) -> Dict:
        """Dispatch an individual check by type."""
        check_type = check['type']

        if check_type == 'row_count':
            return self._row_count_check(context, check)
        elif check_type == 'null_check':
            return self._null_check(context, check)
        elif check_type == 'freshness':
            return self._freshness_check(context, check)
        # duplicate_check, range_check, distribution_check and
        # schema_drift dispatch the same way; their handlers are
        # elided here for brevity.
        raise ValueError(f"Unsupported check type: {check_type}")

    def _row_count_check(self, context: gx.DataContext, check: Dict) -> Dict:
        """Check table row count."""
        table = check['table']

        # Assumes the context is configured so get_batch() returns a
        # batch for `table`; adapt this to your Great Expectations setup.
        batch = context.get_batch()

        result = batch.expect_table_row_count_to_be_between(
            min_value=1,
            max_value=100000000
        )

        return {'passed': result.success, 'result': result}

    def _null_check(self, context: gx.DataContext, check: Dict) -> Dict:
        """Check that a column's null ratio stays below a threshold."""
        column = check['column']
        max_null_ratio = 0.01  # mirrors `condition: null_ratio < 0.01`

        batch = context.get_batch()

        # `mostly` tolerates a small fraction of null values.
        result = batch.expect_column_values_to_not_be_null(
            column=column,
            mostly=1 - max_null_ratio
        )

        return {'passed': result.success, 'result': result}

    def _freshness_check(self, context: gx.DataContext, check: Dict) -> Dict:
        """Check freshness (column assumed to hold lag vs. now, in seconds)."""
        column = check['column']
        max_lag_hours = 24

        batch = context.get_batch()

        result = batch.expect_column_values_to_be_between(
            column=column,
            min_value=-max_lag_hours * 3600,
            max_value=0
        )

        return {'passed': result.success, 'result': result}

    def validate_post_deploy(
        self,
        context: gx.DataContext,
        baseline_context: gx.DataContext
    ) -> Dict:
        """Compare the new deployment against a production baseline."""
        results = {
            'passed': True,
            'warnings': []
        }

        for gate in self.config['quality_gates']:
            if gate['name'] == 'post_deploy_checks':
                for check in gate['checks']:
                    # _run_post_deploy_check (elided) compares the two
                    # contexts according to the gate's baseline condition.
                    result = self._run_post_deploy_check(
                        context, baseline_context, check
                    )

                    if not result['passed']:
                        results['passed'] = False
                        results['warnings'].append({
                            'check': check,
                            'result': result
                        })

        return results
```
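A minimal sketch of wiring this validator into a CI step, assuming a configured Great Expectations project (`gx.get_context()` is the standard entry point; the gate file path is hypothetical):

```python
# Hypothetical CI entry point; assumes QualityGateValidator (above) is
# importable and a Great Expectations project is configured.
import sys

import great_expectations as gx

validator = QualityGateValidator("quality_gates.yml")
context = gx.get_context()

critical = validator.validate_critical_checks(context)
if not critical["passed"]:
    print(f"critical checks failed: {critical['failed_checks']}")
    sys.exit(1)  # non-zero exit blocks the deployment

for warning in validator.validate_warning_checks(context)["warnings"]:
    print(f"WARNING: {warning['check']}")  # surfaced but non-blocking
```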
## Airflow CI/CD

### Airflow DAG Testing

```python
import pytest
from airflow.models import DagBag
from datetime import datetime, timedelta, timezone


class TestDAGs:
    """Test Airflow DAGs."""

    @pytest.fixture
    def dag_bag(self):
        """Load the DAG bag without example DAGs."""
        return DagBag(dag_folder="/path/to/dags", include_examples=False)

    def test_dag_loaded(self, dag_bag):
        """DAGs are loaded without import errors."""
        assert dag_bag.import_errors == {}, f"Import errors: {dag_bag.import_errors}"

    def test_dag_structure(self, dag_bag):
        """DAG exists and has tasks."""
        dag = dag_bag.get_dag(dag_id="example_etl_dag")

        assert dag is not None, "DAG not found"
        assert len(dag.tasks) > 0, "DAG has no tasks"

    def test_dag_dependencies(self, dag_bag):
        """Tasks are wired extract -> transform -> load."""
        dag = dag_bag.get_dag(dag_id="example_etl_dag")

        extract_task = dag.get_task("extract")
        transform_task = dag.get_task("transform")
        load_task = dag.get_task("load")

        assert extract_task in transform_task.upstream_list
        assert transform_task in load_task.upstream_list

    def test_dag_schedule(self, dag_bag):
        """DAG has a schedule and catchup is disabled."""
        dag = dag_bag.get_dag(dag_id="example_etl_dag")

        assert dag.schedule_interval is not None, "DAG has no schedule"
        assert dag.catchup is False, "DAG catchup is enabled"

    def test_dag_start_date(self, dag_bag):
        """Start date lies at least one day in the past."""
        dag = dag_bag.get_dag(dag_id="example_etl_dag")

        # start_date is timezone-aware in Airflow 2, so compare in UTC.
        assert dag.start_date <= datetime.now(timezone.utc) - timedelta(days=1)
```
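These tests need an importable Airflow installation but no running scheduler. One common arrangement (a sketch; the paths are placeholders) is to point Airflow at a throwaway home directory and disable example DAGs before anything imports `airflow`, for instance in a `conftest.py`:

```python
# tests/conftest.py -- illustrative; these env vars must be set before
# the first `airflow` import so DagBag loads cleanly in CI.
import os

os.environ.setdefault("AIRFLOW_HOME", "/tmp/airflow_ci")
os.environ.setdefault("AIRFLOW__CORE__LOAD_EXAMPLES", "False")
os.environ.setdefault("AIRFLOW__CORE__DAGS_FOLDER", "dags")
```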
### Airflow CI/CD Pipeline

```yaml
name: Airflow CI/CD Pipeline

on:
  push:
    branches: [main, develop]
    paths:
      - 'dags/**'
      - 'plugins/**'
      - 'tests/**'

jobs:
  lint:
    name: Lint DAGs
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          pip install apache-airflow==2.7.0
          pip install flake8
          pip install black

      - name: Run flake8
        run: |
          flake8 dags/ --max-line-length=100

      - name: Run black
        run: |
          black --check dags/

  unit-tests:
    name: Unit Tests
    runs-on: ubuntu-latest
    needs: lint
    steps:
      - uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          pip install apache-airflow==2.7.0
          pip install pytest

      - name: Run pytest
        run: |
          pytest tests/dags/

  deploy:
    name: Deploy DAGs
    runs-on: ubuntu-latest
    needs: unit-tests
    if: github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v3

      - name: Deploy to Airflow
        env:
          AIRFLOW_URL: ${{ secrets.AIRFLOW_URL }}
          AIRFLOW_USERNAME: ${{ secrets.AIRFLOW_USERNAME }}
          AIRFLOW_PASSWORD: ${{ secrets.AIRFLOW_PASSWORD }}
        run: |
          python scripts/deploy_dags.py
```
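`scripts/deploy_dags.py` is deployment-specific. A hedged sketch: sync the DAG files to wherever the scheduler reads them (assumed here to be a mounted path; swap in git-sync, S3, or GCS as appropriate), then ask Airflow's stable REST API whether any import errors appeared:

```python
# scripts/deploy_dags.py -- illustrative sketch. DAG_TARGET is an
# assumption (a volume the scheduler mounts); /api/v1/importErrors is
# Airflow 2's stable REST API, assuming basic auth is enabled.
import os
import shutil

import requests

DAG_TARGET = "/mnt/airflow/dags"


def main() -> None:
    # 1. Sync DAG files into the scheduler's dags folder.
    shutil.copytree("dags", DAG_TARGET, dirs_exist_ok=True)

    # 2. Verify that no DAG failed to import after the sync.
    resp = requests.get(
        f"{os.environ['AIRFLOW_URL']}/api/v1/importErrors",
        auth=(os.environ["AIRFLOW_USERNAME"], os.environ["AIRFLOW_PASSWORD"]),
        timeout=30,
    )
    resp.raise_for_status()
    errors = resp.json().get("import_errors", [])
    if errors:
        raise SystemExit(f"DAG import errors after deploy: {errors}")


if __name__ == "__main__":
    main()
```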
## dbt CI/CD

### dbt Project Structure

```yaml
name: 'my_data_platform'
version: '1.0.0'
config-version: 2

profile: 'my_data_platform'

model-paths: ["models"]
seed-paths: ["seeds"]
test-paths: ["tests"]
analysis-paths: ["analysis"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

target-path: "target"
clean-targets:
  - "target"
  - "dbt_packages"

models:
  my_data_platform:
    staging:
      +schema: staging
      +materialized: view
    intermediate:
      +schema: intermediate
      +materialized: view
    marts:
      +schema: marts
      +materialized: table

tests:
  my_data_platform:
    +schema: tests
```
### dbt Tests

```sql
-- tests/schema_tests/customers_not_null.sql
-- A singular dbt test: it fails if the query returns any rows.
SELECT
    customer_id,
    customer_name,
    customer_email
FROM {{ ref('stg_customers') }}
WHERE customer_id IS NULL
   OR customer_name IS NULL
   OR customer_email IS NULL
```
## Data CI/CD Best Practices

### DO

```
# 1. Test everything
#    Unit tests, data tests, integration tests

# 2. Use quality gates
#    Critical checks block deployment

# 3. Deploy incrementally
#    Blue-green or canary deployments

# 4. Monitor everything
#    Metrics, alerts, observability

# 5. Automate rollback
#    Automatic rollback on validation failure
```

### DON'T
```
# 1. Don't skip tests
#    Essential for data quality

# 2. Don't deploy without validation
#    Data validation is critical

# 3. Don't ignore warnings
#    Warnings can indicate issues

# 4. Don't deploy all at once
#    Incremental deployments are safer

# 5. Don't forget rollback
#    Always have a rollback plan
```

## Key Takeaways
- Pipeline stages: Lint → Unit tests → Data tests → Staging → Validation → Production
- Quality gates: Critical checks block deployment
- Testing: Unit, data, integration tests
- Validation: Compare prod vs. staging, data quality checks
- Deployment: Blue-green or canary deployments
- Monitoring: Track metrics and anomalies
- Rollback: Automatic rollback on validation failure
- Use When: All data systems, pipelines, transformations