Skip to content

Great Expectations

Data Testing Framework


Overview

Great Expectations (GE) is an open-source data testing framework that enables data teams to validate data quality, document data expectations, and prevent data issues. This document covers GE fundamentals and implementation patterns for production data platforms.


Core Concepts

Expectations

Expectation: A verifiable assertion about data (e.g., “column X should not contain nulls”).

Validation: The act of checking if data meets expectations.


Quick Start

Installation

Terminal window
# Install the Great Expectations package from PyPI
pip install great_expectations
# Initialize project
# (scaffolds the great_expectations/ config directory in the current repo)
great_expectations init

Basic Usage

import great_expectations as gx

# Create a Data Context — the entry point to a GX project.
context = gx.get_context()

# Connect to in-memory pandas data (GX 1.x "fluent" datasource API).
data_source = context.data_sources.add_pandas("my_datasource")
data_asset = data_source.add_dataframe_asset(name="my_asset")
batch_definition = data_asset.add_batch_definition_whole_dataframe("my_batch")

# Create an Expectation Suite.
# NOTE: in GX 1.x the accessor is context.suites, not context.expectation_suites.
suite = context.suites.add(gx.ExpectationSuite(name="my_suite"))

# Define expectations
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="user_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="age",
        min_value=0,
        max_value=120,
    )
)

# Validate data.
# The legacy run_validation_operator / RuntimeBatchRequest APIs were removed
# before GX 1.0 and cannot be mixed with the fluent API used above; a
# ValidationDefinition ties the batch to the suite instead.
validation_definition = context.validation_definitions.add(
    gx.ValidationDefinition(
        name="my_validation",
        data=batch_definition,
        suite=suite,
    )
)
validation_result = validation_definition.run(
    batch_parameters={"dataframe": my_dataframe}
)

Expectation Types

Table Expectations

import great_expectations as gx

# Expect the table row count to fall within a range.
# NOTE: expect_table_row_count_to_equal takes a single `value`; a
# min/max range requires the *_to_be_between variant.
table.expect_table_row_count_to_be_between(min_value=1000, max_value=10000)
# Expect the table to have exactly 5 columns
table.expect_table_column_count_to_equal(value=5)
# Expect the columns to appear in exactly this order
table.expect_table_columns_to_match_ordered_list(
    column_list=["id", "name", "email", "created_at", "status"]
)

Column Expectations

# Null checks
# NOTE: the method is snake_case throughout — expect_column_values_to_notBeNull
# does not exist.
column.expect_column_values_to_not_be_null(column="user_id")
# Type checks
column.expect_column_values_to_be_of_type(column="amount", type_="DECIMAL")
# Value range checks
column.expect_column_values_to_be_between(
    column="age",
    min_value=0,
    max_value=120,
)
# Set membership checks
column.expect_column_values_to_be_in_set(
    column="status",
    value_set=["active", "inactive", "pending"],
)
# Pattern checks
# NOTE: the expectation is ..._to_match_regex (no "_pattern" suffix).
column.expect_column_values_to_match_regex(
    column="email",
    regex=r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$",
)
# Uniqueness checks
column.expect_column_values_to_be_unique(column="user_id")
# Length checks
column.expect_column_value_lengths_to_be_between(
    column="username",
    min_value=3,
    max_value=20,
)

Aggregate Expectations

# Aggregate expectations in GX are expressed as *_to_be_between ranges;
# expect_column_values_to_sum_to / ..._to_have_mean and a `tolerance`
# keyword do not exist.

# Sum checks: expected total 100000 with 5% allowed variance
table.expect_column_sum_to_be_between(
    column="amount",
    min_value=95000,   # 100000 * (1 - 0.05)
    max_value=105000,  # 100000 * (1 + 0.05)
)
# Mean checks: expected mean 100 with 10% allowed variance
column.expect_column_mean_to_be_between(
    column="price",
    min_value=90,   # 100 * (1 - 0.1)
    max_value=110,  # 100 * (1 + 0.1)
)
# Standard deviation checks
column.expect_column_stdev_to_be_between(
    column="measurement",
    min_value=0,
    max_value=10,
)

Multicolumn Expectations

# Pairwise equality
table.expect_column_pair_values_to_be_equal(
    column_A="billing_address",
    column_B="shipping_address",
)
# Pairwise inequality
table.expect_column_pair_values_A_to_be_greater_than_B(
    column_A="end_date",
    column_B="start_date",
)
# Pairwise membership
# NOTE: this expectation takes `value_pairs_set` — a set of allowed
# (column_A, column_B) tuples — not a flat `value_set` of single codes.
table.expect_column_pair_values_to_be_in_set(
    column_A="country_code",
    column_B="expected_country_code",
    value_pairs_set=[("US", "US"), ("CA", "CA"), ("UK", "UK")],
)

Expectation Suites

Creating Suites

import great_expectations as gx

# Create a comprehensive suite for the customers table.
# NOTE: in GX 1.x the accessor is context.suites, not context.expectation_suites.
suite = context.suites.add(
    gx.ExpectationSuite(name="customers_suite")
)

# Basic expectations: primary key must be present and unique
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="customer_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeUnique(column="customer_id")
)

# Data quality expectations: email present and well-formed
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="email")
)
suite.add_expectation(
    # NOTE: the class is ExpectColumnValuesToMatchRegex (no "Pattern" suffix).
    gx.expectations.ExpectColumnValuesToMatchRegex(
        column="email",
        regex=r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$",
    )
)

# Business logic expectations
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeInSet(
        column="status",
        value_set=["active", "inactive", "pending", "blocked"],
    )
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="registration_year",
        min_value=2000,
        max_value=2025,  # hard-coded upper bound; needs periodic maintenance
    )
)

Suite Organization


Validation Operators

Creating Operators

# NOTE(review): validation operators are a legacy (pre-0.13) GE API, removed
# in GX 1.x — confirm the installed version supports them before use.
# The class is ActionListValidationOperator (singular "Action").
from great_expectations.validation_operators import ActionListValidationOperator

# Create a validation operator that stores results, stores the suite,
# and refreshes Data Docs after each validation run.
validation_operator = ActionListValidationOperator(
    name="my_validation_operator",
    actions=[
        {
            "name": "store_validation_result",
            "action": {"class_name": "StoreValidationResultAction"},
        },
        {
            "name": "store_expectation_suite",
            "action": {"class_name": "StoreExpectationSuiteAction"},
        },
        {
            "name": "update_data_docs",
            "action": {"class_name": "UpdateDataDocsAction"},
        },
    ],
)
context.add_validation_operator(validation_operator)

Validation in Production

# Validate after ETL
def validate_etl_output(data_df, table_name):
    """Validate ETL output data using Great Expectations.

    Args:
        data_df: In-memory DataFrame produced by the ETL step.
        table_name: Logical table name; also selects the suite
            named "<table_name>_suite".

    Returns:
        The validation result object when validation succeeds.

    Raises:
        ValueError: If any expectation fails (after alerting).
    """
    # RuntimeBatchRequest has no `data` or `expectation_suite_name` kwargs:
    # in-memory data is passed via runtime_parameters["batch_data"], and a
    # data connector / data asset pair is required.
    batch_request = RuntimeBatchRequest(
        datasource_name="my_datasource",
        data_connector_name="default_runtime_data_connector",
        data_asset_name=table_name,
        runtime_parameters={"batch_data": data_df},
        batch_identifiers={"table": table_name},
    )
    result = context.run_validation_operator(
        "my_validation_operator",
        [batch_request],  # NOTE(review): legacy operators expect batches —
                          # confirm the call shape against the installed GE version
    )
    if not result["success"]:
        # Alert before raising so on-call sees the details even if the
        # exception is swallowed upstream.
        send_alert(
            subject=f"Data validation failed for {table_name}",
            details=result,
        )
        raise ValueError(f"Data validation failed for {table_name}")
    return result

Data Docs

Generating Documentation

# Build data docs (auto-generated from expectations)
# Output is static HTML — by default under uncommitted/data_docs/ in the
# project directory (confirm the site config in great_expectations.yml).
context.build_data_docs()
# Data Docs include:
# - Expectation definitions
# - Validation results history
# - Data profiles
# - Sample data

Data Docs as Data Dictionary


Checkpoints

Creating Checkpoints

# Create checkpoint (reusable validation configuration).
# NOTE: a checkpoint must tie each batch request to an expectation suite —
# a bare batch_request with no suite validates nothing when run.
checkpoint_config = {
    "name": "my_checkpoint",
    "config_version": 1.0,
    "class_name": "SimpleCheckpoint",
    "validations": [
        {
            "batch_request": {
                "datasource_name": "my_datasource",
                "data_connector_name": "my_data_connector",
                "data_asset_name": "my_table",
            },
            "expectation_suite_name": "my_suite",
        }
    ],
}
context.add_checkpoint(**checkpoint_config)

# Run checkpoint
checkpoint_result = context.run_checkpoint(
    checkpoint_name="my_checkpoint"
)

Checkpoints in CI/CD

.github/workflows/ge_validation.yml
# GitHub Actions workflow: run GE checkpoint validation on pushes to main
# and on every pull request, then archive the validation results.
name: Great Expectations Validation
on:
  push:
    branches: [main]
  pull_request:
jobs:
  validate:
    runs-on: ubuntu-latest
    steps:
      # v2 of these actions is deprecated (upload-artifact@v2 is disabled);
      # use the current major versions.
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.9'
      - name: Install dependencies
        run: pip install great_expectations pandas
      - name: Run Great Expectations
        run: great_expectations checkpoint run my_checkpoint
      - name: Upload validation results
        if: always()  # keep the results artifact even when validation fails
        uses: actions/upload-artifact@v4
        with:
          name: validation-results
          path: uncommitted/validations/

Integration Patterns

dbt Integration

# dbt model validation
# NOTE(review): this is illustrative pseudo-code — dbt-core does not expose
# `import dbt` / `dbt.run_model()`; programmatic invocation goes through
# dbt.cli.main.dbtRunner. Confirm the intended integration before use.
import great_expectations as ge
import dbt
# Get dbt model results
dbt_results = dbt.run_model("my_model")
# Validate with GE
df = dbt_results.to_dataframe()
# NOTE(review): RuntimeBatchRequest takes in-memory data via
# runtime_parameters={"batch_data": df}, not a `data` kwarg — verify.
batch_request = RuntimeBatchRequest(
datasource_name="dbt_datasource",
data=df,
expectation_suite_name="dbt_model_suite"
)
validation_result = context.run_validation_operator(
"my_validation_operator",
[batch_request]
)

Airflow Integration

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
# The operator lives in the provider's operators module, not the package root.
from great_expectations_provider.operators.great_expectations import (
    GreatExpectationsOperator,
)

# start_date must be a datetime object, not a string.
with DAG('ge_validation', start_date=datetime(2025, 1, 1)) as dag:
    validate_task = GreatExpectationsOperator(
        task_id='validate_data',
        expectation_suite_name='my_suite',
        # NOTE(review): batch_kwargs is a legacy parameter; newer provider
        # versions configure a checkpoint instead — confirm against the
        # installed provider version.
        batch_kwargs={
            'datasource_name': 'my_datasource',
            'data_connector_name': 'my_data_connector',
            'data_asset_name': 'my_table',
        },
    )

Performance Optimization

Sampling for Large Datasets

# Use sampling for validation on large datasets.
# NOTE: RuntimeBatchRequest has no `data` or `expectation_suite_name`
# kwargs — in-memory data goes through runtime_parameters["batch_data"],
# and the suite is supplied when the batch is validated.
batch_request = RuntimeBatchRequest(
    datasource_name="my_datasource",
    data_connector_name="default_runtime_data_connector",
    data_asset_name="my_table",
    runtime_parameters={
        "batch_data": my_large_dataframe.sample(frac=0.1),  # 10% sample
    },
    batch_identifiers={"table": "my_table"},
)

Efficient Expectations

# Bad: Expensive expectation
# COMPLETE returns every unexpected value in the result payload, which is
# costly to compute and store on large columns.
column.expect_column_values_to_be_unique(
column="user_id",
result_format="COMPLETE" # Returns all values
)
# Good: Use SAMPLE or SUMMARY
# SUMMARY returns aggregate counts plus a small sample of unexpected values.
column.expect_column_values_to_be_unique(
column="user_id",
result_format="SUMMARY" # Returns summary only
)

Cost Considerations

Validation Cost

| Scenario | Cost | Mitigation |
| --- | --- | --- |
| Full validation | High (compute) | Use sampling |
| Complex expectations | High | Simplify or use sampling |
| Every run | High | Validate only on changes |

Optimization Strategies

# Illustrative strategies — data_has_changed(), validate_data(),
# filter_critical_expectations(), and validate_multiple_tables() are
# placeholder helpers, not GE APIs.

# 1. Validate only on changes
if data_has_changed():
    validate_data()

# 2. Use sampling for large tables
sampled_df = large_df.sample(frac=0.01)  # 1% sample

# 3. Run critical expectations only
suite = filter_critical_expectations(full_suite)

# 4. Batch validation
validate_multiple_tables(tables, batch_size=10)

Senior Level Considerations

Anti-Patterns

Anti-Pattern 1: Too many expectations

# Bad: 1000+ expectations (slow, hard to maintain)
suite.add_expectation(...) # × 1000
# Good: 20-30 critical expectations focused on the checks that matter
suite.add_expectation(...) # × 25

Anti-Pattern 2: No remediation actions

# Bad: Validation fails, no action — failures are silently ignored
if not result["success"]:
    pass

# Good: Alert and remediate (send_alert / quarantine_bad_data are
# placeholder remediation hooks)
if not result["success"]:
    send_alert(result)
    quarantine_bad_data()

Anti-Pattern 3: Only testing in dev

# Bad: Only validate in development — production data issues go undetected
if ENV == "dev":
    validate_data()

# Good: Validate in production too
validate_data()  # Always validate

Key Takeaways

  1. Expectations: Verifiable assertions about data quality
  2. Suites: Organize expectations by criticality
  3. Validation: Run checks on data batches
  4. Data Docs: Auto-generated documentation
  5. CI/CD: Integrate validation into pipelines
  6. Sampling: Optimize for large datasets
  7. Critical expectations: Focus on what matters most
  8. Remediation: Define actions on validation failure

Back to Module 4