Great Expectations
Data Testing Framework
Overview
Great Expectations (GE) is an open-source data testing framework that lets data teams validate data quality, document expectations as code, and catch bad data before it propagates downstream. This document covers GE fundamentals and implementation patterns for production data platforms.
Core Concepts
Expectations
Expectation: A verifiable assertion about data (e.g., “column X should not contain nulls”).
Validation: The act of checking if data meets expectations.
Quick Start
Installation
```bash
pip install great_expectations
```

Basic Usage

The snippets below use the GX 1.x fluent API. An ephemeral context is enough for experimentation; `gx.get_context(mode="file")` initializes a persistent project directory (the 0.x `great_expectations init` CLI was removed in 1.0).
```python
import great_expectations as gx
import pandas as pd

# Create a Data Context
context = gx.get_context()

# Connect to data
data_source = context.data_sources.add_pandas("my_datasource")
data_asset = data_source.add_dataframe_asset(name="my_asset")
batch_definition = data_asset.add_batch_definition_whole_dataframe("my_batch")

# Create an Expectation Suite
suite = context.suites.add(gx.ExpectationSuite(name="my_suite"))

# Define expectations
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="user_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="age", min_value=0, max_value=120
    )
)

# Validate data
my_dataframe = pd.DataFrame({"user_id": [1, 2, 3], "age": [25, 34, 52]})
batch = batch_definition.get_batch(batch_parameters={"dataframe": my_dataframe})
validation_result = batch.validate(suite)
print(validation_result.success)
```
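The returned validation result can also be inspected programmatically. A minimal sketch of pulling out failed expectations (attribute and key names follow the GX 1.x result objects; treat them as assumptions on other versions):

```python
# Continues the Quick Start: drill into per-expectation results
if not validation_result.success:
    for r in validation_result.results:
        if not r.success:
            # Each entry pairs the expectation config with summary statistics
            print(r.expectation_config.type, r.result.get("unexpected_count"))
```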
Expectation Types

The snippets below reuse the `batch` from the Quick Start and validate one expectation at a time with the class-based `gx.expectations` API.

Table Expectations
```python
import great_expectations as gx

# Expect the table row count to fall within a range
batch.validate(
    gx.expectations.ExpectTableRowCountToBeBetween(min_value=1000, max_value=10000)
)

# Expect the table to have a specific number of columns
batch.validate(gx.expectations.ExpectTableColumnCountToEqual(value=5))

# Expect the table columns to match an ordered list
batch.validate(
    gx.expectations.ExpectTableColumnsToMatchOrderedList(
        column_list=["id", "name", "email", "created_at", "status"]
    )
)
```

Column Expectations
```python
# Null checks
batch.validate(gx.expectations.ExpectColumnValuesToNotBeNull(column="user_id"))

# Type checks (type_ is backend-specific, e.g. "float64" for pandas)
batch.validate(
    gx.expectations.ExpectColumnValuesToBeOfType(column="amount", type_="DECIMAL")
)

# Value range checks
batch.validate(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="age", min_value=0, max_value=120
    )
)

# Set membership checks
batch.validate(
    gx.expectations.ExpectColumnValuesToBeInSet(
        column="status", value_set=["active", "inactive", "pending"]
    )
)

# Pattern checks
batch.validate(
    gx.expectations.ExpectColumnValuesToMatchRegex(
        column="email", regex=r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
    )
)

# Uniqueness checks
batch.validate(gx.expectations.ExpectColumnValuesToBeUnique(column="user_id"))

# Length checks
batch.validate(
    gx.expectations.ExpectColumnValueLengthsToBeBetween(
        column="username", min_value=3, max_value=20
    )
)
```

Aggregate Expectations
```python
# Sum checks: expected total 100,000, allowing 5% variance
batch.validate(
    gx.expectations.ExpectColumnSumToBeBetween(
        column="amount", min_value=95_000, max_value=105_000
    )
)

# Mean checks: expected mean 100, allowing 10% variance
batch.validate(
    gx.expectations.ExpectColumnMeanToBeBetween(
        column="price", min_value=90, max_value=110
    )
)

# Standard deviation checks
batch.validate(
    gx.expectations.ExpectColumnStdevToBeBetween(
        column="measurement", min_value=0, max_value=10
    )
)
```

Multicolumn Expectations
```python
# Pairwise equality
batch.validate(
    gx.expectations.ExpectColumnPairValuesToBeEqual(
        column_A="billing_address", column_B="shipping_address"
    )
)

# Pairwise inequality
batch.validate(
    gx.expectations.ExpectColumnPairValuesAToBeGreaterThanB(
        column_A="end_date", column_B="start_date"
    )
)

# Pairwise membership: allowed (column_A, column_B) combinations
batch.validate(
    gx.expectations.ExpectColumnPairValuesToBeInSet(
        column_A="country_code",
        column_B="expected_country_code",
        value_pairs_set=[("US", "US"), ("CA", "CA"), ("UK", "UK")],
    )
)
```

Expectation Suites
Creating Suites
```python
import great_expectations as gx

context = gx.get_context()

# Create a comprehensive suite for the customers table
suite = context.suites.add(gx.ExpectationSuite(name="customers_suite"))

# Basic expectations
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="customer_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeUnique(column="customer_id")
)

# Data quality expectations
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="email")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToMatchRegex(
        column="email",
        regex=r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$",
    )
)

# Business logic expectations
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeInSet(
        column="status",
        value_set=["active", "inactive", "pending", "blocked"],
    )
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="registration_year", min_value=2000, max_value=2025
    )
)
```

Suite Organization
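Suites work best when organized deliberately: one suite per table or data asset, with critical (pipeline-blocking) expectations kept separate from advisory ones. A minimal sketch of that convention (the suite names here are illustrative, not a GE requirement):

```python
# Critical suite: violations should block the pipeline
critical = context.suites.add(gx.ExpectationSuite(name="customers_critical"))
critical.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="customer_id")
)

# Advisory suite: violations alert but do not block
advisory = context.suites.add(gx.ExpectationSuite(name="customers_advisory"))
advisory.add_expectation(
    gx.expectations.ExpectColumnValueLengthsToBeBetween(
        column="username", min_value=3, max_value=20
    )
)
```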
Validation Operators
Creating Operators
Validation operators are a legacy (pre-1.0) GE concept: they bundle a validation run with a list of post-validation actions, such as storing results, updating Data Docs, and sending notifications. In GX 1.x the same role is played by Checkpoint actions (see Checkpoints below). In legacy projects the operator was configured in `great_expectations.yml`:

```yaml
validation_operators:
  action_list_operator:
    class_name: ActionListValidationOperator
    action_list:
      - name: store_validation_result
        action:
          class_name: StoreValidationResultAction
      - name: store_evaluation_params
        action:
          class_name: StoreEvaluationParametersAction
      - name: update_data_docs
        action:
          class_name: UpdateDataDocsAction
```

Validation in Production
```python
# Validate after ETL (context and batch_definition come from the setup above)
def validate_etl_output(data_df, table_name):
    """Validate ETL output data using Great Expectations."""
    batch = batch_definition.get_batch(batch_parameters={"dataframe": data_df})
    result = batch.validate(context.suites.get(f"{table_name}_suite"))

    if not result.success:
        # Handle validation failure: alert, then stop the pipeline
        # (send_alert is a placeholder for your alerting integration)
        send_alert(
            subject=f"Data validation failed for {table_name}",
            details=result,
        )
        raise ValueError(f"Data validation failed for {table_name}")

    return result
```

Data Docs
Generating Documentation
```python
# Build Data Docs (static HTML auto-generated from expectations and results)
context.build_data_docs()
```

Data Docs include:
- Expectation definitions
- Validation results history
- Data profiles
- Sample data

Data Docs as Data Dictionary
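Because Data Docs enumerate every column, its expectations, and the validation history, the generated site doubles as a living data dictionary that analysts and stakeholders can browse without reading code. To open it locally:

```python
# Open the generated Data Docs site in a browser
context.open_data_docs()
```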
Checkpoints
Creating Checkpoints
```python
# Create a checkpoint (a reusable validation configuration);
# batch_definition and suite come from the Quick Start above
validation_definition = context.validation_definitions.add(
    gx.ValidationDefinition(
        name="my_validation",
        data=batch_definition,
        suite=suite,
    )
)

checkpoint = context.checkpoints.add(
    gx.Checkpoint(
        name="my_checkpoint",
        validation_definitions=[validation_definition],
    )
)

# Run the checkpoint
checkpoint_result = checkpoint.run()
```

Checkpoints in CI/CD
```yaml
name: Great Expectations Validation

on:
  push:
    branches: [main]
  pull_request:

jobs:
  validate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: pip install great_expectations pandas

      - name: Run Great Expectations
        run: python run_checkpoint.py  # see the runner sketch below

      - name: Upload validation results
        uses: actions/upload-artifact@v4
        with:
          name: validation-results
          path: gx/uncommitted/validations/  # results dir for a file-backed GX project
```
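The workflow above assumes a small runner script in the repository; a minimal sketch (`run_checkpoint.py` is a name chosen for this example, not a GE convention):

```python
# run_checkpoint.py -- load the project context and run the checkpoint
import sys

import great_expectations as gx

context = gx.get_context()  # picks up the file-backed project (gx/ directory)
checkpoint = context.checkpoints.get("my_checkpoint")
result = checkpoint.run()

# Non-zero exit code fails the CI job when validation fails
sys.exit(0 if result.success else 1)
```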
Integration Patterns

dbt Integration
dbt has no Python API that runs a single model and returns a DataFrame, so the usual pattern is: build the model with dbt, then read the materialized table back from the warehouse and validate it with GE. A sketch (the SQLAlchemy `engine` is an assumed connection to your warehouse):

```python
import subprocess

import pandas as pd
import great_expectations as gx

# 1. Build the model with dbt (assumes a configured dbt project on PATH)
subprocess.run(["dbt", "run", "--select", "my_model"], check=True)

# 2. Read the materialized model back from the warehouse
df = pd.read_sql_table("my_model", con=engine)

# 3. Validate with GE (batch_definition and suites as in the Quick Start)
batch = batch_definition.get_batch(batch_parameters={"dataframe": df})
validation_result = batch.validate(context.suites.get("dbt_model_suite"))
```

Airflow Integration
The community Airflow provider wraps checkpoint runs in an operator; the parameters below follow its checkpoint-based interface:

```python
from datetime import datetime

from airflow import DAG
from great_expectations_provider.operators.great_expectations import (
    GreatExpectationsOperator,
)

with DAG("ge_validation", start_date=datetime(2025, 1, 1), schedule=None) as dag:
    validate_task = GreatExpectationsOperator(
        task_id="validate_data",
        data_context_root_dir="/path/to/gx",  # GX project directory
        checkpoint_name="my_checkpoint",
    )
```

Performance Optimization
Sampling for Large Datasets
```python
# Validate a 10% sample instead of the full table
sample_df = my_large_dataframe.sample(frac=0.1)
batch = batch_definition.get_batch(batch_parameters={"dataframe": sample_df})
validation_result = batch.validate(suite)
```

Note that row-count, sum, and uniqueness expectations behave differently on a sample, so keep those checks on the full table.

Efficient Expectations
```python
# Bad: COMPLETE returns every unexpected value (expensive on large tables)
batch.validate(
    gx.expectations.ExpectColumnValuesToBeUnique(column="user_id"),
    result_format="COMPLETE",
)

# Good: SUMMARY returns only counts and a small sample of unexpected values
batch.validate(
    gx.expectations.ExpectColumnValuesToBeUnique(column="user_id"),
    result_format="SUMMARY",
)
```

Cost Considerations
Validation Cost
| Scenario | Cost | Mitigation |
|---|---|---|
| Full validation | High (compute) | Use sampling |
| Complex expectations | High | Simplify or use sampling |
| Every run | High | Validate only on changes |
Optimization Strategies
```python
# 1. Validate only on changes
if data_has_changed():
    validate_data()

# 2. Use sampling for large tables
sampled_df = large_df.sample(frac=0.01)  # 1% sample

# 3. Run critical expectations only
suite = filter_critical_expectations(full_suite)

# 4. Batch validation
validate_multiple_tables(tables, batch_size=10)

# (data_has_changed, filter_critical_expectations, and
# validate_multiple_tables are placeholders for your own helpers)
```
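The helpers above are placeholders. For illustration, one inexpensive way to implement `data_has_changed` for a pandas DataFrame is content hashing (the function name and approach are assumptions for this sketch, not part of GE):

```python
import hashlib

import pandas as pd

def data_has_changed(df: pd.DataFrame, last_digest: str) -> bool:
    """Detect change by hashing the DataFrame's contents."""
    digest = hashlib.sha256(
        pd.util.hash_pandas_object(df, index=True).values.tobytes()
    ).hexdigest()
    return digest != last_digest
```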
Senior Level Considerations

Anti-Patterns
Anti-Pattern 1: Too many expectations
```python
# Bad: 1000+ expectations (slow, hard to maintain)
suite.add_expectation(...)  # × 1000

# Good: 20-30 critical expectations
suite.add_expectation(...)  # × 25
```

Anti-Pattern 2: No remediation actions
# Bad: Validation fails, no actionif not result["success"]: pass
# Good: Alert and remediateif not result["success"]: send_alert(result) quarantine_bad_data()Anti-Pattern 3: Only testing in dev
```python
# Bad: only validate in development
if ENV == "dev":
    validate_data()

# Good: validate in production too
validate_data()  # always validate
```

Key Takeaways
- Expectations: Verifiable assertions about data quality
- Suites: Organize expectations by criticality
- Validation: Run checks on data batches
- Data Docs: Auto-generated documentation
- CI/CD: Integrate validation into pipelines
- Sampling: Optimize for large datasets
- Critical expectations: Focus on what matters most
- Remediation: Define actions on validation failure