
Data CI/CD Pipelines

Automated Testing and Deployment for Data Systems


Overview

Data CI/CD pipelines extend software CI/CD practices to data systems. They include automated testing, data validation, schema migration, and deployment strategies for data pipelines, transformations, and models.


Data CI/CD Architecture

Pipeline Stages

Key Stages:

  • Lint & Format: Code quality and consistency
  • Unit Tests: Test transformation logic
  • Data Tests: Validate data quality
  • Staging Deploy: Test environment validation
  • Data Validation: Compare prod vs. staging
  • Production Deploy: Blue-green or canary deployment
  • Monitoring: Track metrics and anomalies

Data CI/CD Tools

Toolchain Overview

| Stage | Tools | Use Case |
| --- | --- | --- |
| Version Control | Git, GitHub, GitLab | Source control |
| Linting | SQLFluff, Black, isort | Code formatting |
| Unit Testing | dbt test, pytest | Logic validation |
| Data Testing | Soda, Great Expectations | Data quality |
| CI/CD | GitHub Actions, Airflow, dbt Cloud | Pipeline automation |
| Diffing | datafold, daff, sql-diff | Data comparison |
| Orchestration | Airflow, dbt, Dagster | Pipeline execution |

GitHub Actions for Data

dbt CI/CD Pipeline

.github/workflows/dbt-ci-cd.yml
name: dbt CI/CD Pipeline

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main, develop]

env:
  DBT_PROFILES_DIR: ./
  DBT_ENV_SECRET: ${{ secrets.DBT_ENV_SECRET }}

jobs:
  lint:
    name: Lint and Format
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: |
          pip install dbt-core==1.5.0
          pip install dbt-postgres==1.5.0
          pip install sqlfluff
      - name: Run SQLFluff
        run: |
          sqlfluff lint models/ --fail-on-warnings
      - name: Format SQL
        run: |
          sqlfluff fix models/ --force

  unit-tests:
    name: Unit Tests
    runs-on: ubuntu-latest
    needs: lint
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - name: Install dbt
        run: |
          pip install dbt-core==1.5.0
          pip install dbt-postgres==1.5.0
      - name: Run dbt compile
        env:
          POSTGRES_TEST_HOST: ${{ secrets.POSTGRES_TEST_HOST }}
          POSTGRES_TEST_USER: ${{ secrets.POSTGRES_TEST_USER }}
          POSTGRES_TEST_PASS: ${{ secrets.POSTGRES_TEST_PASS }}
          POSTGRES_TEST_DATABASE: ${{ secrets.POSTGRES_TEST_DATABASE }}
        run: |
          dbt compile --profiles-dir ./
      - name: Run dbt parse
        run: |
          dbt parse --profiles-dir ./

  data-tests:
    name: Data Tests
    runs-on: ubuntu-latest
    needs: unit-tests
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - name: Install dbt and Soda
        run: |
          pip install dbt-core==1.5.0
          pip install dbt-postgres==1.5.0
          pip install soda-core-postgres
      - name: Run dbt seed
        env:
          POSTGRES_TEST_HOST: ${{ secrets.POSTGRES_TEST_HOST }}
          POSTGRES_TEST_USER: ${{ secrets.POSTGRES_TEST_USER }}
          POSTGRES_TEST_PASS: ${{ secrets.POSTGRES_TEST_PASS }}
          POSTGRES_TEST_DATABASE: ${{ secrets.POSTGRES_TEST_DATABASE }}
        run: |
          dbt seed --profiles-dir ./ --full-refresh
      - name: Run dbt run
        run: |
          dbt run --profiles-dir ./ --target test
      - name: Run dbt test
        run: |
          dbt test --profiles-dir ./ --target test
      - name: Run Soda checks
        env:
          SODA_API_KEY: ${{ secrets.SODA_API_KEY }}
        run: |
          soda scan -f configuration.yaml checks.yml

  deploy-staging:
    name: Deploy to Staging
    runs-on: ubuntu-latest
    needs: data-tests
    if: github.ref == 'refs/heads/develop'
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - name: Install dbt
        run: |
          pip install dbt-core==1.5.0
          pip install dbt-postgres==1.5.0
          pip install dbt-cloud==1.5.0
      - name: Run dbt build (staging)
        env:
          DBT_CLOUD_PROJECT_ID: ${{ secrets.DBT_CLOUD_PROJECT_ID_STAGING }}
          DBT_CLOUD_API_KEY: ${{ secrets.DBT_CLOUD_API_KEY }}
        run: |
          dbt cloud run --environment-id ${{ secrets.DBT_CLOUD_ENV_ID_STAGING }}
      - name: Run data validation (staging)
        run: |
          python scripts/validate_deployment.py --env staging

  deploy-production:
    name: Deploy to Production
    runs-on: ubuntu-latest
    needs: data-tests
    if: github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - name: Install dbt and datafold
        run: |
          pip install dbt-core==1.5.0
          pip install dbt-postgres==1.5.0
          pip install datafold
      - name: Create datafold diff
        env:
          DATAFOLD_API_KEY: ${{ secrets.DATAFOLD_API_KEY }}
        run: |
          datafold diff \
            --dataset production:analytics.customers \
            --dataset staging:analytics.customers \
            --output datafold_diff.json
      - name: Validate diff
        run: |
          python scripts/validate_diff.py --file datafold_diff.json
      - name: Deploy to production (blue-green)
        env:
          DBT_CLOUD_API_KEY: ${{ secrets.DBT_CLOUD_API_KEY }}
        run: |
          python scripts/blue_green_deploy.py --env production
      - name: Run monitoring
        run: |
          python scripts/monitor_deployment.py --env production
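
The production job delegates to helper scripts such as scripts/validate_diff.py, which are not shown here. A minimal sketch of what such a script could look like is below; the report field names (total_rows, rows_added, rows_removed) and the 5% threshold are assumptions for illustration, not the actual Datafold output format.

scripts/validate_diff.py (illustrative sketch)
import argparse
import json
import sys

# Threshold for accepting a staging-vs-production diff (assumed value).
MAX_ROW_DIFF_PERCENT = 5.0


def main() -> int:
    parser = argparse.ArgumentParser(description="Validate a data diff report")
    parser.add_argument("--file", required=True, help="Path to the diff JSON report")
    args = parser.parse_args()

    with open(args.file) as f:
        diff = json.load(f)

    # Assumed report fields; adapt to the real diff tool's output.
    total_rows = diff.get("total_rows", 0)
    rows_changed = diff.get("rows_added", 0) + diff.get("rows_removed", 0)
    diff_percent = (rows_changed / total_rows * 100) if total_rows else 0.0

    if diff_percent > MAX_ROW_DIFF_PERCENT:
        print(f"Diff too large: {diff_percent:.2f}% of rows changed")
        return 1  # non-zero exit code fails the CI step

    print(f"Diff within threshold: {diff_percent:.2f}% of rows changed")
    return 0


if __name__ == "__main__":
    sys.exit(main())

Returning a non-zero exit code is what makes the "Validate diff" step block the subsequent blue-green deployment.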

Data Quality Gates

Quality Gate Configuration

quality_gates.yaml
quality_gates:
  - name: critical_checks
    description: Critical data quality checks
    checks:
      - type: row_count
        table: analytics.customers
        condition: count > 0
        severity: critical
      - type: null_check
        table: analytics.orders
        column: customer_id
        condition: null_ratio < 0.01
        severity: critical
      - type: duplicate_check
        table: analytics.orders
        column: order_id
        condition: duplicate_count == 0
        severity: critical
      - type: freshness
        table: analytics.orders
        column: order_date
        condition: lag_hours < 24
        severity: critical

  - name: warning_checks
    description: Warning-level data quality checks
    checks:
      - type: range_check
        table: analytics.orders
        column: order_amount
        condition: value >= 0 and value <= 1000000
        severity: warning
      - type: distribution_check
        table: analytics.customers
        column: age
        condition: ks_test > 0.95
        severity: warning
      - type: schema_drift
        table: analytics.orders
        condition: schema_match_baseline
        severity: warning

  - name: post_deploy_checks
    description: Post-deployment validation checks
    checks:
      - type: row_count_comparison
        table: analytics.orders
        baseline: production_previous
        condition: diff_percent < 5
        severity: warning
      - type: column_comparison
        table: analytics.orders
        column: total_amount
        baseline: production_previous
        condition: mean_diff_percent < 10
        severity: warning

Quality Gate Implementation

scripts/quality_gates.py
import yaml
import great_expectations as gx
from typing import Dict, List


class QualityGateValidator:
    """Validate data against quality gates"""

    def __init__(self, gate_config_path: str):
        with open(gate_config_path) as f:
            self.config = yaml.safe_load(f)

    def validate_critical_checks(self, context: gx.DataContext) -> Dict:
        """Validate critical checks"""
        results = {
            'passed': True,
            'failed_checks': []
        }
        for gate in self.config['quality_gates']:
            if gate['name'] == 'critical_checks':
                for check in gate['checks']:
                    result = self._run_check(context, check)
                    if not result['passed']:
                        results['passed'] = False
                        results['failed_checks'].append({
                            'check': check,
                            'result': result
                        })
        return results

    def validate_warning_checks(self, context: gx.DataContext) -> Dict:
        """Validate warning checks"""
        results = {
            'passed': True,
            'warnings': []
        }
        for gate in self.config['quality_gates']:
            if gate['name'] == 'warning_checks':
                for check in gate['checks']:
                    result = self._run_check(context, check)
                    if not result['passed']:
                        results['passed'] = False
                        results['warnings'].append({
                            'check': check,
                            'result': result
                        })
        return results

    def _run_check(self, context: gx.DataContext, check: Dict) -> Dict:
        """Run individual check"""
        check_type = check['type']
        if check_type == 'row_count':
            return self._row_count_check(context, check)
        elif check_type == 'null_check':
            return self._null_check(context, check)
        elif check_type == 'duplicate_check':
            return self._duplicate_check(context, check)
        elif check_type == 'freshness':
            return self._freshness_check(context, check)
        elif check_type == 'range_check':
            return self._range_check(context, check)
        elif check_type == 'schema_drift':
            return self._schema_drift_check(context, check)

    def _row_count_check(self, context: gx.DataContext, check: Dict) -> Dict:
        """Check row count"""
        table = check['table']
        condition = check['condition']
        # Get batch
        batch = context.get_batch()
        # Run expectation
        result = batch.expect_table_row_count_to_be_between(
            min_value=1,
            max_value=100000000
        )
        return {
            'passed': result.success,
            'result': result
        }

    def _null_check(self, context: gx.DataContext, check: Dict) -> Dict:
        """Check null ratio"""
        table = check['table']
        column = check['column']
        max_null_ratio = 0.01
        batch = context.get_batch()
        result = batch.expect_column_values_to_not_be_null(
            column=column
        )
        return {
            'passed': result.success,
            'result': result
        }

    def _freshness_check(self, context: gx.DataContext, check: Dict) -> Dict:
        """Check data freshness"""
        table = check['table']
        column = check['column']
        max_lag_hours = 24
        batch = context.get_batch()
        result = batch.expect_column_values_to_be_between(
            column=column,
            min_value=-max_lag_hours * 3600,
            max_value=0
        )
        return {
            'passed': result.success,
            'result': result
        }

    def validate_post_deploy(self, context: gx.DataContext, baseline_context: gx.DataContext) -> Dict:
        """Validate post-deployment"""
        results = {
            'passed': True,
            'warnings': []
        }
        for gate in self.config['quality_gates']:
            if gate['name'] == 'post_deploy_checks':
                for check in gate['checks']:
                    result = self._run_post_deploy_check(context, baseline_context, check)
                    if not result['passed']:
                        results['passed'] = False
                        results['warnings'].append({
                            'check': check,
                            'result': result
                        })
        return results
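
For critical checks to actually block a deployment, a CI step has to fail when validate_critical_checks reports a failure. A minimal wrapper along these lines could be used; the script name scripts/run_quality_gates.py and the way the Great Expectations context is obtained are assumptions, not part of the pipeline above.

scripts/run_quality_gates.py (illustrative sketch)
import sys

import great_expectations as gx

from quality_gates import QualityGateValidator  # the class defined above


def main() -> int:
    context = gx.get_context()  # load the project's Great Expectations context
    validator = QualityGateValidator("quality_gates.yaml")

    critical = validator.validate_critical_checks(context)
    warnings = validator.validate_warning_checks(context)

    # Warnings are reported but do not block the deployment.
    for warning in warnings["warnings"]:
        print(f"WARNING: {warning['check']}")

    if not critical["passed"]:
        # Critical failures block the deployment by failing the CI step.
        for failure in critical["failed_checks"]:
            print(f"CRITICAL FAILURE: {failure['check']}")
        return 1

    return 0


if __name__ == "__main__":
    sys.exit(main())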

Airflow CI/CD

Airflow DAG Testing

tests/dags/test_example_dag.py
import pytest
from airflow.models import DagBag
from datetime import datetime, timedelta


class TestDAGs:
    """Test Airflow DAGs"""

    @pytest.fixture
    def dag_bag(self):
        """Load DAG bag"""
        return DagBag(dag_folder="/path/to/dags", include_examples=False)

    def test_dag_loaded(self, dag_bag):
        """Test DAGs are loaded without errors"""
        assert dag_bag.import_errors == {}, f"Import errors: {dag_bag.import_errors}"

    def test_dag_structure(self, dag_bag):
        """Test DAG structure"""
        dag = dag_bag.get_dag(dag_id="example_etl_dag")
        assert dag is not None, "DAG not found"
        assert len(dag.tasks) > 0, "DAG has no tasks"

    def test_dag_dependencies(self, dag_bag):
        """Test DAG dependencies"""
        dag = dag_bag.get_dag(dag_id="example_etl_dag")
        # Test task dependencies
        extract_task = dag.get_task("extract")
        transform_task = dag.get_task("transform")
        load_task = dag.get_task("load")
        assert extract_task in transform_task.upstream_list
        assert transform_task in load_task.upstream_list

    def test_dag_schedule(self, dag_bag):
        """Test DAG schedule"""
        dag = dag_bag.get_dag(dag_id="example_etl_dag")
        assert dag.schedule_interval is not None, "DAG has no schedule"
        assert dag.catchup is False, "DAG catchup is enabled"

    def test_dag_start_date(self, dag_bag):
        """Test DAG start date"""
        dag = dag_bag.get_dag(dag_id="example_etl_dag")
        assert dag.start_date <= datetime.now() - timedelta(days=1)
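
The tests above assume a DAG with id example_etl_dag and tasks named extract, transform, and load. A minimal DAG matching those assumptions might look like the sketch below; the task bodies are placeholders.

dags/example_etl_dag.py (minimal sketch)
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def _extract():
    print("extract data from the source system")


def _transform():
    print("transform the extracted data")


def _load():
    print("load the transformed data into the warehouse")


with DAG(
    dag_id="example_etl_dag",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    extract = PythonOperator(task_id="extract", python_callable=_extract)
    transform = PythonOperator(task_id="transform", python_callable=_transform)
    load = PythonOperator(task_id="load", python_callable=_load)

    # Dependencies mirror what test_dag_dependencies asserts.
    extract >> transform >> load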

Airflow CI/CD Pipeline

.github/workflows/airflow-ci-cd.yml
name: Airflow CI/CD Pipeline

on:
  push:
    branches: [main, develop]
    paths:
      - 'dags/**'
      - 'plugins/**'
      - 'tests/**'

jobs:
  lint:
    name: Lint DAGs
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: |
          pip install apache-airflow==2.7.0
          pip install flake8
          pip install black
      - name: Run flake8
        run: |
          flake8 dags/ --max-line-length=100
      - name: Run black
        run: |
          black --check dags/

  unit-tests:
    name: Unit Tests
    runs-on: ubuntu-latest
    needs: lint
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: |
          pip install apache-airflow==2.7.0
          pip install pytest
      - name: Run pytest
        run: |
          pytest tests/dags/

  deploy:
    name: Deploy DAGs
    runs-on: ubuntu-latest
    needs: unit-tests
    if: github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v3
      - name: Deploy to Airflow
        env:
          AIRFLOW_URL: ${{ secrets.AIRFLOW_URL }}
          AIRFLOW_USERNAME: ${{ secrets.AIRFLOW_USERNAME }}
          AIRFLOW_PASSWORD: ${{ secrets.AIRFLOW_PASSWORD }}
        run: |
          python scripts/deploy_dags.py
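
The deploy job calls scripts/deploy_dags.py, which is not shown in the source. One plausible sketch, assuming the DAG files are shipped separately (for example via a synced bucket or volume) and the Airflow stable REST API is reachable with basic auth, is to verify that the new DAGs imported cleanly and then unpause them. The DAG id list and the overall approach are assumptions.

scripts/deploy_dags.py (illustrative sketch)
import os
import sys

import requests

AIRFLOW_URL = os.environ["AIRFLOW_URL"]  # e.g. https://airflow.example.com
AUTH = (os.environ["AIRFLOW_USERNAME"], os.environ["AIRFLOW_PASSWORD"])

DAG_IDS = ["example_etl_dag"]  # DAGs managed by this deployment (assumption)


def main() -> int:
    # Fail the deployment if Airflow reports DAG import errors.
    resp = requests.get(f"{AIRFLOW_URL}/api/v1/importErrors", auth=AUTH, timeout=30)
    resp.raise_for_status()
    import_errors = resp.json().get("import_errors", [])
    if import_errors:
        print(f"DAG import errors detected: {import_errors}")
        return 1

    # Unpause the deployed DAGs so they start running on schedule.
    for dag_id in DAG_IDS:
        resp = requests.patch(
            f"{AIRFLOW_URL}/api/v1/dags/{dag_id}",
            json={"is_paused": False},
            auth=AUTH,
            timeout=30,
        )
        resp.raise_for_status()
        print(f"Unpaused DAG: {dag_id}")

    return 0


if __name__ == "__main__":
    sys.exit(main())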

dbt CI/CD

dbt Project Structure

dbt_project.yml
name: 'my_data_platform'
version: '1.0.0'
config-version: 2

profile: 'my_data_platform'

model-paths: ["models"]
seed-paths: ["seeds"]
test-paths: ["tests"]
analysis-paths: ["analysis"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

target-path: "target"
clean-targets:
  - "target"
  - "dbt_packages"

models:
  my_data_platform:
    staging:
      +schema: staging
      +materialized: view
    intermediate:
      +schema: intermediate
      +materialized: view
    marts:
      +schema: marts
      +materialized: table

tests:
  my_data_platform:
    +schema: tests

dbt Tests

-- tests/schema_tests/customers_not_null.sql
SELECT
    customer_id,
    customer_name,
    customer_email
FROM {{ ref('stg_customers') }}
WHERE
    customer_id IS NULL
    OR customer_name IS NULL
    OR customer_email IS NULL

Data CI/CD Best Practices

DO

# 1. Test everything
# Unit tests, data tests, integration tests
# 2. Use quality gates
# Critical checks block deployment
# 3. Deploy incrementally
# Blue-green or canary deployments (see the sketch after this list)
# 4. Monitor everything
# Metrics, alerts, observability
# 5. Automate rollback
# Automatic rollback on validation failure
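
The "deploy incrementally" and "automate rollback" practices map onto the scripts/blue_green_deploy.py step in the production job above. The source does not show that script; one possible sketch, assuming a Postgres warehouse where each release is built into an analytics_green schema and promoted by a transactional schema rename, is below. All schema names, the WAREHOUSE_DSN environment variable, and the use of psycopg2 are assumptions.

scripts/blue_green_deploy.py (illustrative sketch)
import os
import sys

import psycopg2

# Assumed convention: dbt builds the new release into GREEN_SCHEMA, consumers
# read from LIVE_SCHEMA, and the previous release is kept around for rollback.
LIVE_SCHEMA = "analytics"
GREEN_SCHEMA = "analytics_green"
PREVIOUS_SCHEMA = "analytics_previous"


def swap_schemas(conn) -> None:
    """Promote the green schema in a single transaction."""
    with conn.cursor() as cur:
        cur.execute(f"DROP SCHEMA IF EXISTS {PREVIOUS_SCHEMA} CASCADE;")
        cur.execute(f"ALTER SCHEMA {LIVE_SCHEMA} RENAME TO {PREVIOUS_SCHEMA};")
        cur.execute(f"ALTER SCHEMA {GREEN_SCHEMA} RENAME TO {LIVE_SCHEMA};")
    conn.commit()


def rollback(conn) -> None:
    """Restore the previous release if post-deployment validation fails."""
    with conn.cursor() as cur:
        cur.execute(f"ALTER SCHEMA {LIVE_SCHEMA} RENAME TO {GREEN_SCHEMA};")
        cur.execute(f"ALTER SCHEMA {PREVIOUS_SCHEMA} RENAME TO {LIVE_SCHEMA};")
    conn.commit()


def main() -> int:
    # Argument parsing for --env is omitted for brevity.
    conn = psycopg2.connect(os.environ["WAREHOUSE_DSN"])
    try:
        swap_schemas(conn)
        print(f"Promoted {GREEN_SCHEMA} to {LIVE_SCHEMA}")
        return 0
    except Exception as exc:
        # Postgres DDL is transactional: an aborted swap leaves the live schema untouched.
        conn.rollback()
        print(f"Deployment failed, no changes applied: {exc}")
        return 1
    finally:
        conn.close()


if __name__ == "__main__":
    sys.exit(main())

Because the old release is kept as analytics_previous, a failed post-deployment validation can roll back with a single schema rename instead of a full rebuild.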

DON’T

# 1. Don't skip tests
# Essential for data quality
# 2. Don't deploy without validation
# Data validation is critical
# 3. Don't ignore warnings
# Warnings can indicate issues
# 4. Don't deploy all at once
# Incremental deployments are safer
# 5. Don't forget rollback
# Always have a rollback plan

Key Takeaways

  1. Pipeline stages: Lint → Unit tests → Data tests → Staging → Validation → Production
  2. Quality gates: Critical checks block deployment
  3. Testing: Unit, data, integration tests
  4. Validation: Compare prod vs. staging, data quality checks
  5. Deployment: Blue-green or canary deployments
  6. Monitoring: Track metrics and anomalies
  7. Rollback: Automatic rollback on validation failure
  8. Use When: All data systems, pipelines, transformations

Back to Module 6