# Data Diffing

Data Comparison and Validation Strategies

## Overview
Data diffing compares datasets across environments, time periods, or transformations to identify differences, validate data quality, and ensure consistency. It’s essential for CI/CD pipelines, data migration, and production validation.
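Conceptually, diffing two keyed tables reduces to an outer join on the primary key plus a comparison of the matched rows. Here is a minimal pandas sketch of that core idea, using two hypothetical versions of a customers table:

```python
# Minimal concept sketch: diff two keyed datasets with an outer join.
import pandas as pd

customers_v1 = pd.DataFrame({'id': [1, 2, 3], 'name': ['Ann', 'Bob', 'Cal']})
customers_v2 = pd.DataFrame({'id': [2, 3, 4], 'name': ['Bob', 'Carl', 'Dee']})

merged = customers_v1.merge(
    customers_v2, on='id', how='outer',
    suffixes=('_old', '_new'), indicator=True
)

removed = merged[merged['_merge'] == 'left_only']     # only in v1
added = merged[merged['_merge'] == 'right_only']      # only in v2
both = merged[merged['_merge'] == 'both']
changed = both[both['name_old'] != both['name_new']]  # matched but different

print(len(added), len(removed), len(changed))  # 1 1 1
```

Every strategy below is a variation on this core, trading exactness for speed and cost.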
## Data Diffing Strategies

### Strategy Comparison
**Strategy Selection** (a heuristic selector sketch follows the table):
| Strategy | Accuracy | Speed | Cost | Use Case |
|---|---|---|---|---|
| Row-by-row | 100% | Slow | High | Small datasets, critical validation |
| Aggregate | Statistical | Fast | Low | Large datasets, trend validation |
| Schema | Metadata | Very fast | Very low | Schema changes, migration |
| Sample | Probabilistic | Fast | Low | Quick validation, large datasets |
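The selection logic can be encoded directly. A small heuristic sketch; the row-count thresholds are illustrative, not prescriptive:

```python
# Illustrative heuristic for picking a diffing strategy.
def choose_strategy(row_count: int, critical: bool, schema_changed: bool) -> str:
    if schema_changed:
        return 'schema'          # always validate schema changes first
    if critical and row_count <= 1_000_000:
        return 'row-by-row'      # exact comparison where it matters
    if row_count > 1_000_000:
        return 'sample' if critical else 'aggregate'
    return 'aggregate'           # cheap default for trend validation


print(choose_strategy(10_000, critical=True, schema_changed=False))  # row-by-row
```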
## Row-by-Row Diffing

### Exact Match Comparison
```python
# Row-by-row diffing with datafold
import datafold
from typing import Dict, List


class DatafoldDiff:
    """Row-by-row data diffing"""

    def __init__(self, api_key: str):
        self.client = datafold.Client(api_key)

    def compare_datasets(
        self,
        dataset_a: str,  # "project:schema.table"
        dataset_b: str,
        primary_keys: List[str]
    ) -> Dict:
        """Compare two datasets row-by-row"""
        # Create diff
        diff = self.client.create_diff(
            dataset_a=dataset_a,
            dataset_b=dataset_b,
            primary_keys=primary_keys
        )

        # Get results
        results = {
            'rows_added': diff.rows_added,
            'rows_removed': diff.rows_removed,
            'rows_changed': diff.rows_changed,
            'rows_unchanged': diff.rows_unchanged,
            'change_percentage': (
                diff.rows_changed + diff.rows_added + diff.rows_removed
            ) / diff.total_rows * 100
        }

        return results

    def get_column_differences(
        self,
        dataset_a: str,
        dataset_b: str,
        primary_keys: List[str]
    ) -> Dict:
        """Get column-level differences"""
        diff = self.client.create_diff(
            dataset_a=dataset_a,
            dataset_b=dataset_b,
            primary_keys=primary_keys
        )

        column_diffs = {}

        for column_diff in diff.column_differences:
            column_diffs[column_diff.column_name] = {
                'changed_rows': column_diff.changed_rows,
                'change_percentage': column_diff.change_percentage
            }

        return column_diffs


# Example usage
diff = DatafoldDiff(api_key="your-api-key")

# Compare staging vs. production
results = diff.compare_datasets(
    dataset_a="staging:analytics.customers",
    dataset_b="production:analytics.customers",
    primary_keys=["customer_id"]
)

print(f"Rows added: {results['rows_added']}")
print(f"Rows removed: {results['rows_removed']}")
print(f"Rows changed: {results['rows_changed']}")
print(f"Change percentage: {results['change_percentage']:.2f}%")
```

### SQL-Based Diffing
```sql
-- SQL-based row-by-row diffing

-- 1. Find rows only in table A (staging)
WITH only_in_a AS (
    SELECT customer_id, customer_name, customer_email
    FROM staging.customers a
    WHERE NOT EXISTS (
        SELECT 1 FROM production.customers b
        WHERE b.customer_id = a.customer_id
    )
),

-- 2. Find rows only in table B (production)
only_in_b AS (
    SELECT customer_id, customer_name, customer_email
    FROM production.customers b
    WHERE NOT EXISTS (
        SELECT 1 FROM staging.customers a
        WHERE a.customer_id = b.customer_id
    )
),

-- 3. Find rows with differences
-- IS DISTINCT FROM is NULL-safe: plain != treats NULL comparisons
-- as unknown and silently drops those rows
differences AS (
    SELECT
        a.customer_id,
        a.customer_name AS a_name,
        b.customer_name AS b_name,
        a.customer_email AS a_email,
        b.customer_email AS b_email
    FROM staging.customers a
    JOIN production.customers b ON a.customer_id = b.customer_id
    WHERE a.customer_name IS DISTINCT FROM b.customer_name
       OR a.customer_email IS DISTINCT FROM b.customer_email
)

-- Summary
SELECT 'Rows only in staging' AS category, COUNT(*) AS row_count
FROM only_in_a

UNION ALL

SELECT 'Rows only in production' AS category, COUNT(*) AS row_count
FROM only_in_b

UNION ALL

SELECT 'Rows with differences' AS category, COUNT(*) AS row_count
FROM differences;
```

## Aggregate Diffing
### Statistical Comparison
```python
# Aggregate diffing with statistics
import pandas as pd
from scipy import stats
from typing import Dict, List


class AggregateDiff:
    """Statistical comparison of datasets"""

    def compare_numeric_columns(
        self,
        df_a: pd.DataFrame,
        df_b: pd.DataFrame,
        columns: List[str]
    ) -> Dict:
        """Compare numeric columns statistically"""
        results = {}

        for column in columns:
            series_a = df_a[column].dropna()
            series_b = df_b[column].dropna()

            # Basic statistics
            stats_a = {
                'mean': series_a.mean(),
                'median': series_a.median(),
                'std': series_a.std(),
                'min': series_a.min(),
                'max': series_a.max(),
                'count': len(series_a)
            }

            stats_b = {
                'mean': series_b.mean(),
                'median': series_b.median(),
                'std': series_b.std(),
                'min': series_b.min(),
                'max': series_b.max(),
                'count': len(series_b)
            }

            # Kolmogorov-Smirnov test (distribution similarity)
            ks_statistic, ks_pvalue = stats.ks_2samp(series_a, series_b)

            # Mean difference percentage (guard against a zero baseline mean)
            if stats_a['mean'] != 0:
                mean_diff_pct = (
                    abs(stats_a['mean'] - stats_b['mean'])
                    / abs(stats_a['mean']) * 100
                )
            else:
                mean_diff_pct = float('inf')

            results[column] = {
                'stats_a': stats_a,
                'stats_b': stats_b,
                'ks_statistic': ks_statistic,
                'ks_pvalue': ks_pvalue,
                'mean_diff_pct': mean_diff_pct,
                'distribution_similar': ks_pvalue > 0.05,  # 95% confidence
                'mean_similar': mean_diff_pct < 5  # Within 5%
            }

        return results

    def compare_categorical_columns(
        self,
        df_a: pd.DataFrame,
        df_b: pd.DataFrame,
        columns: List[str]
    ) -> Dict:
        """Compare categorical columns"""
        results = {}

        for column in columns:
            # Value counts as proportions
            counts_a = df_a[column].value_counts(normalize=True)
            counts_b = df_b[column].value_counts(normalize=True)

            # Get all unique values
            all_values = set(counts_a.index) | set(counts_b.index)

            # Compare distributions
            distribution_diff = {}
            for value in all_values:
                pct_a = counts_a.get(value, 0)
                pct_b = counts_b.get(value, 0)
                distribution_diff[value] = {
                    'pct_a': pct_a,
                    'pct_b': pct_b,
                    'diff': abs(pct_a - pct_b)
                }

            results[column] = {
                'unique_values_a': len(counts_a),
                'unique_values_b': len(counts_b),
                'distribution_diff': distribution_diff
            }

        return results

    def compare_row_counts(
        self,
        df_a: pd.DataFrame,
        df_b: pd.DataFrame
    ) -> Dict:
        """Compare row counts"""
        count_a = len(df_a)
        count_b = len(df_b)

        diff = count_b - count_a
        diff_pct = abs(diff) / count_a * 100

        return {
            'count_a': count_a,
            'count_b': count_b,
            'diff': diff,
            'diff_pct': diff_pct,
            'within_threshold': diff_pct < 5  # Within 5%
        }


# Example usage (staging_conn / production_conn are existing DB connections)
df_staging = pd.read_sql("SELECT * FROM staging.customers", con=staging_conn)
df_production = pd.read_sql("SELECT * FROM production.customers", con=production_conn)

diff = AggregateDiff()

# Row count comparison
row_count_diff = diff.compare_row_counts(df_staging, df_production)
print(f"Row count diff: {row_count_diff}")

# Numeric column comparison
numeric_diff = diff.compare_numeric_columns(
    df_staging, df_production,
    columns=['age', 'income', 'score']
)
print(f"Numeric diff: {numeric_diff}")

# Categorical column comparison
categorical_diff = diff.compare_categorical_columns(
    df_staging, df_production,
    columns=['segment', 'region']
)
print(f"Categorical diff: {categorical_diff}")
```

### Great Expectations Diffing
```python
# Great Expectations statistical diffing
#
# NOTE: Great Expectations' context/validator APIs have changed across
# versions; the calls below follow this example's conventions and may
# need adjusting for your installed version.
import great_expectations as gx
from typing import Dict


class GreatExpectationsDiff:
    """Statistical diffing with Great Expectations"""

    def __init__(self, context_root_dir: str = "./great_expectations"):
        self.context = gx.get_context(context_root_dir=context_root_dir)

    def compare_datasets(
        self,
        datasource_a: str,
        datasource_b: str,
        table_name: str
    ) -> Dict:
        """Compare two datasets statistically"""
        # Get data assets
        asset_a = self.context.get_datasource(datasource_a).get_asset(table_name)
        asset_b = self.context.get_datasource(datasource_b).get_asset(table_name)

        # Create expectation suites
        suite_a = self.context.get_or_create_expectation_suite("dataset_a_expectations")
        suite_b = self.context.get_or_create_expectation_suite("dataset_b_expectations")

        # Create validators
        validator_a = self.context.get_validator(
            batch_request=asset_a.build_batch_request(),
            expectation_suite=suite_a
        )

        validator_b = self.context.get_validator(
            batch_request=asset_b.build_batch_request(),
            expectation_suite=suite_b
        )

        # Run expectations
        result_a = validator_a.validate()
        result_b = validator_b.validate()

        # Compare results
        results = {
            'dataset_a_stats': self._get_dataset_stats(result_a),
            'dataset_b_stats': self._get_dataset_stats(result_b),
            'differences': self._compare_results(result_a, result_b)
        }

        return results

    def _get_dataset_stats(self, validation_result) -> Dict:
        """Extract dataset statistics"""
        return {
            'row_count': validation_result.statistics['row_count'],
            'columns': list(validation_result.statistics['table_columns'].keys())
        }

    def _compare_results(self, result_a, result_b) -> Dict:
        """Compare validation results"""
        differences = {
            'row_count_diff': abs(
                result_a.statistics['row_count'] -
                result_b.statistics['row_count']
            ),
            'column_diffs': []
        }

        # Compare column sets
        columns_a = set(result_a.statistics['table_columns'].keys())
        columns_b = set(result_b.statistics['table_columns'].keys())

        differences['columns_only_in_a'] = columns_a - columns_b
        differences['columns_only_in_b'] = columns_b - columns_a
        differences['common_columns'] = columns_a & columns_b

        return differences
```

## Schema Diffing
### Schema Comparison
```python
# Schema diffing with SQLAlchemy
from sqlalchemy import create_engine, inspect
from typing import Dict, Optional


class SchemaDiff:
    """Compare database schemas"""

    def __init__(self, connection_string_a: str, connection_string_b: str):
        self.engine_a = create_engine(connection_string_a)
        self.engine_b = create_engine(connection_string_b)

    def compare_schemas(self, schema_name: Optional[str] = None) -> Dict:
        """Compare schemas between two databases"""
        inspector_a = inspect(self.engine_a)
        inspector_b = inspect(self.engine_b)

        # Get table names
        tables_a = set(inspector_a.get_table_names(schema=schema_name))
        tables_b = set(inspector_b.get_table_names(schema=schema_name))

        differences = {
            'tables_only_in_a': tables_a - tables_b,
            'tables_only_in_b': tables_b - tables_a,
            'common_tables': tables_a & tables_b,
            'table_differences': {}
        }

        # Compare common tables
        for table in differences['common_tables']:
            table_diff = self._compare_tables(
                inspector_a, inspector_b, table, schema_name
            )

            if table_diff:
                differences['table_differences'][table] = table_diff

        return differences

    def _compare_tables(
        self,
        inspector_a,
        inspector_b,
        table_name: str,
        schema_name: Optional[str] = None
    ) -> Optional[Dict]:
        """Compare table schemas"""
        # Get columns
        columns_a = inspector_a.get_columns(table_name, schema=schema_name)
        columns_b = inspector_b.get_columns(table_name, schema=schema_name)

        # Index columns by name
        col_names_a = {col['name']: col for col in columns_a}
        col_names_b = {col['name']: col for col in columns_b}

        differences = {
            'columns_only_in_a': set(col_names_a) - set(col_names_b),
            'columns_only_in_b': set(col_names_b) - set(col_names_a),
            'column_type_changes': [],
            'column_nullable_changes': []
        }

        # Compare common columns
        for col_name in set(col_names_a) & set(col_names_b):
            col_a = col_names_a[col_name]
            col_b = col_names_b[col_name]

            # Type changes (compare string renderings; SQLAlchemy type
            # objects don't implement structural equality)
            if str(col_a['type']) != str(col_b['type']):
                differences['column_type_changes'].append({
                    'column': col_name,
                    'type_a': str(col_a['type']),
                    'type_b': str(col_b['type'])
                })

            # Nullable changes
            if col_a.get('nullable') != col_b.get('nullable'):
                differences['column_nullable_changes'].append({
                    'column': col_name,
                    'nullable_a': col_a.get('nullable'),
                    'nullable_b': col_b.get('nullable')
                })

        return differences if any(differences.values()) else None


# Example usage
diff = SchemaDiff(
    connection_string_a="postgresql://user:pass@staging-host/db",
    connection_string_b="postgresql://user:pass@production-host/db"
)

# Compare schemas
schema_diff = diff.compare_schemas(schema_name="analytics")

print(f"Tables only in staging: {schema_diff['tables_only_in_a']}")
print(f"Tables only in production: {schema_diff['tables_only_in_b']}")
print(f"Table differences: {schema_diff['table_differences']}")
```

## Sample Diffing
### Random Sample Validation
```python
# Sample-based diffing
import pandas as pd
from typing import Dict


class SampleDiff:
    """Fast diffing using random samples"""

    def __init__(self, sample_size: int = 1000):
        self.sample_size = sample_size

    def compare_samples(
        self,
        df_a: pd.DataFrame,
        df_b: pd.DataFrame,
        primary_key: str
    ) -> Dict:
        """Compare a random sample of primary keys across both datasets"""
        # Sample primary keys from the union of both datasets, then pull
        # the same keys from each side. (Sampling rows independently from
        # each dataset would rarely select the same keys, making the
        # row-level comparison meaningless.)
        all_keys = pd.concat(
            [df_a[primary_key], df_b[primary_key]]
        ).drop_duplicates()
        sampled_keys = all_keys.sample(n=min(self.sample_size, len(all_keys)))

        sample_a = df_a[df_a[primary_key].isin(sampled_keys)]
        sample_b = df_b[df_b[primary_key].isin(sampled_keys)]

        # Merge on primary key
        merged = sample_a.merge(
            sample_b,
            on=primary_key,
            how='outer',
            suffixes=('_a', '_b'),
            indicator=True
        )

        # Analyze differences
        results = {
            'sample_size_a': len(sample_a),
            'sample_size_b': len(sample_b),
            'rows_only_in_a': len(merged[merged['_merge'] == 'left_only']),
            'rows_only_in_b': len(merged[merged['_merge'] == 'right_only']),
            'rows_in_both': len(merged[merged['_merge'] == 'both']),
        }

        # Compare columns for common rows
        common_rows = merged[merged['_merge'] == 'both']
        column_diffs = []

        for col in df_a.columns:
            if col == primary_key:
                continue

            if col + '_a' in common_rows.columns and col + '_b' in common_rows.columns:
                # Count differences, treating NaN == NaN as equal
                # (a plain != would flag matching NULLs as diffs)
                values_a = common_rows[col + '_a']
                values_b = common_rows[col + '_b']
                diff_mask = (values_a != values_b) & ~(values_a.isna() & values_b.isna())
                diff_count = int(diff_mask.sum())

                if diff_count > 0:
                    column_diffs.append({
                        'column': col,
                        'diff_count': diff_count,
                        'diff_percentage': diff_count / len(common_rows) * 100
                    })

        results['column_differences'] = column_diffs

        return results


# Example usage
diff = SampleDiff(sample_size=1000)

results = diff.compare_samples(
    df_staging, df_production,
    primary_key='customer_id'
)

print(f"Sample results: {results}")
```

## Data Diffing Tools
### Tool Comparison
| Tool | Type | Language | Cost | Best For |
|---|---|---|---|---|
| datafold | Row-by-row | Python, Web | Paid | Production validation |
| daff | Row-by-row | Python | Free | Small datasets |
| sql-diff | Row-by-row | SQL | Free | Database comparison |
| Great Expectations | Aggregate | Python | Free (open source) | Statistical validation |
| Soda | Aggregate | Python, SQL | Free tier | Data quality checks |
| dbt | Schema | SQL, Jinja | Free (open source) | Transformation testing |
## Data Diffing Best Practices

### DO

```python
# 1. Use appropriate strategy
#    Row-by-row for small data, aggregate for large

# 2. Validate schema first
#    Catch schema changes early

# 3. Use samples for quick validation
#    Faster for large datasets

# 4. Automate diffing in CI/CD
#    Run on every deployment

# 5. Track diff history
#    Monitor trends over time
```

### DON'T
```python
# 1. Don't diff entire production
#    Use samples for large datasets

# 2. Don't ignore statistical significance
#    Small differences might be noise

# 3. Don't skip schema diffing
#    Schema changes are critical

# 4. Don't forget NULL handling
#    NULLs can cause issues (see the sketch below)

# 5. Don't ignore time zones
#    Timestamp comparisons need care (see the sketch below)
```
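For points 4 and 5, a minimal sketch of NULL-safe and timezone-aware comparison in pandas; the timestamp series here are hypothetical stand-ins for columns from two datasets:

```python
# NULL- and timezone-aware comparison (illustrative).
import pandas as pd


def null_safe_diff(a: pd.Series, b: pd.Series) -> pd.Series:
    """True where values differ; NaN/NaT on both sides counts as equal."""
    return (a != b) & ~(a.isna() & b.isna())


# Hypothetical timestamp columns: one naive, one with an explicit offset.
ts_a = pd.to_datetime(pd.Series(['2024-01-01 12:00:00', None]), utc=True)
ts_b = pd.to_datetime(pd.Series(['2024-01-01 12:00:00+00:00', None]), utc=True)

# Normalizing both sides to UTC before comparing avoids spurious diffs
# from mixed time zones; the NaT pair is treated as equal.
print(null_safe_diff(ts_a, ts_b).tolist())  # [False, False]
```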
## Key Takeaways

- Row-by-row: Exact match comparison for critical validation
- Aggregate: Statistical comparison for large datasets
- Schema: Metadata comparison for migration
- Sample: Fast validation using random samples
- Tools: datafold, Great Expectations, Soda
- CI/CD: Automate diffing in deployment pipelines
- Thresholds: Define acceptable difference thresholds (a CI gate sketch follows this list)
- Use When: CI/CD, data migration, production validation
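Tying thresholds and CI/CD together: a sketch of a gate script that fails a deployment when a diff exceeds configured limits. The threshold values and the shape of the `results` dict (matching `compare_datasets` above) are illustrative:

```python
# Illustrative CI gate: fail the build when diffs exceed thresholds.
import sys

THRESHOLDS = {
    'change_percentage': 5.0,  # max % of rows added/removed/changed
    'rows_removed': 0,         # deletions are never expected here
}


def gate(results: dict) -> int:
    """Return a process exit code: 0 if all metrics pass, 1 otherwise."""
    failures = [
        f"{metric}={results[metric]} exceeds {limit}"
        for metric, limit in THRESHOLDS.items()
        if results.get(metric, 0) > limit
    ]
    for failure in failures:
        print(f"DIFF GATE FAILED: {failure}")
    return 1 if failures else 0


if __name__ == '__main__':
    # In a pipeline this would come from a diff run, e.g.
    # results = DatafoldDiff(...).compare_datasets(...)
    results = {'change_percentage': 7.2, 'rows_removed': 0}
    sys.exit(gate(results))
```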