Data Diffing

Data Comparison and Validation Strategies


Overview

Data diffing compares datasets across environments, time periods, or transformations to identify differences, validate data quality, and ensure consistency. It’s essential for CI/CD pipelines, data migration, and production validation.


Data Diffing Strategies

Strategy Comparison

Strategy Selection:

| Strategy   | Accuracy      | Speed     | Cost     | Use Case                            |
|------------|---------------|-----------|----------|-------------------------------------|
| Row-by-row | 100%          | Slow      | High     | Small datasets, critical validation |
| Aggregate  | Statistical   | Fast      | Low      | Large datasets, trend validation    |
| Schema     | Metadata      | Very fast | Very low | Schema changes, migration           |
| Sample     | Probabilistic | Fast      | Low      | Quick validation, large datasets    |
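
The right strategy depends mostly on data volume and how critical the table is. As a rough illustration, the table's guidance can be encoded in a small helper; the row-count threshold below is an arbitrary assumption, not a fixed rule.

# Illustrative strategy picker; the one-million-row threshold is an assumption
def pick_strategy(row_count: int, critical: bool, schema_change: bool) -> str:
    if schema_change:
        return "schema"       # metadata-only, the cheapest check
    if critical and row_count < 1_000_000:
        return "row-by-row"   # exact, but slow on big tables
    if row_count >= 1_000_000:
        return "aggregate"    # statistical comparison scales well
    return "sample"           # quick probabilistic spot check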

Row-by-Row Diffing

Exact Match Comparison

# Row-by-row diffing with datafold
import datafold
from typing import Dict, List

class DatafoldDiff:
    """Row-by-row data diffing"""

    def __init__(self, api_key: str):
        self.client = datafold.Client(api_key)

    def compare_datasets(
        self,
        dataset_a: str,  # "project:schema.table"
        dataset_b: str,
        primary_keys: List[str]
    ) -> Dict:
        """Compare two datasets row-by-row"""
        # Create diff
        diff = self.client.create_diff(
            dataset_a=dataset_a,
            dataset_b=dataset_b,
            primary_keys=primary_keys
        )
        # Summarize results
        results = {
            'rows_added': diff.rows_added,
            'rows_removed': diff.rows_removed,
            'rows_changed': diff.rows_changed,
            'rows_unchanged': diff.rows_unchanged,
            'change_percentage': (
                (diff.rows_changed + diff.rows_added + diff.rows_removed)
                / diff.total_rows * 100
            )
        }
        return results

    def get_column_differences(
        self,
        dataset_a: str,
        dataset_b: str,
        primary_keys: List[str]
    ) -> Dict:
        """Get column-level differences"""
        diff = self.client.create_diff(
            dataset_a=dataset_a,
            dataset_b=dataset_b,
            primary_keys=primary_keys
        )
        column_diffs = {}
        for column_diff in diff.column_differences:
            column_diffs[column_diff.column_name] = {
                'changed_rows': column_diff.changed_rows,
                'change_percentage': column_diff.change_percentage
            }
        return column_diffs

# Example usage
diff = DatafoldDiff(api_key="your-api-key")

# Compare staging vs. production
results = diff.compare_datasets(
    dataset_a="staging:analytics.customers",
    dataset_b="production:analytics.customers",
    primary_keys=["customer_id"]
)
print(f"Rows added: {results['rows_added']}")
print(f"Rows removed: {results['rows_removed']}")
print(f"Rows changed: {results['rows_changed']}")
print(f"Change percentage: {results['change_percentage']:.2f}%")

SQL-Based Diffing

-- SQL-based row-by-row diffing
-- 1. Find rows only in table A (staging)
WITH only_in_a AS (
    SELECT
        customer_id,
        customer_name,
        customer_email
    FROM staging.customers a
    WHERE NOT EXISTS (
        SELECT 1
        FROM production.customers b
        WHERE b.customer_id = a.customer_id
    )
),
-- 2. Find rows only in table B (production)
only_in_b AS (
    SELECT
        customer_id,
        customer_name,
        customer_email
    FROM production.customers b
    WHERE NOT EXISTS (
        SELECT 1
        FROM staging.customers a
        WHERE a.customer_id = b.customer_id
    )
),
-- 3. Find rows with differences
-- (IS DISTINCT FROM is NULL-safe, unlike !=; supported by Postgres and standard SQL)
differences AS (
    SELECT
        a.customer_id,
        a.customer_name AS a_name,
        b.customer_name AS b_name,
        a.customer_email AS a_email,
        b.customer_email AS b_email
    FROM staging.customers a
    JOIN production.customers b ON a.customer_id = b.customer_id
    WHERE
        a.customer_name IS DISTINCT FROM b.customer_name
        OR a.customer_email IS DISTINCT FROM b.customer_email
)
-- Summary
SELECT
    'Rows only in staging' AS category,
    COUNT(*) AS row_count
FROM only_in_a
UNION ALL
SELECT
    'Rows only in production' AS category,
    COUNT(*) AS row_count
FROM only_in_b
UNION ALL
SELECT
    'Rows with differences' AS category,
    COUNT(*) AS row_count
FROM differences;
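
For wide tables, enumerating every column in the WHERE clause gets tedious; a common shortcut is to hash each row and compare fingerprints. A minimal pandas sketch, assuming unique primary keys and matching column names and dtypes on both sides:

# Hash-based row diffing sketch (assumes unique keys and matching columns/dtypes)
import pandas as pd

def hashed_diff(df_a: pd.DataFrame, df_b: pd.DataFrame, key: str) -> pd.Index:
    """Return primary keys whose non-key columns differ."""
    cols = sorted((set(df_a.columns) & set(df_b.columns)) - {key})
    a = df_a.set_index(key)[cols]
    b = df_b.set_index(key)[cols]
    common = a.index.intersection(b.index)
    # One uint64 fingerprint per row; much cheaper than column-by-column compares
    hash_a = pd.util.hash_pandas_object(a.loc[common], index=False)
    hash_b = pd.util.hash_pandas_object(b.loc[common], index=False)
    return common[hash_a.values != hash_b.values]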

Aggregate Diffing

Statistical Comparison

# Aggregate diffing with statistics
import pandas as pd
from scipy import stats
from typing import Dict, List

class AggregateDiff:
    """Statistical comparison of datasets"""

    def compare_numeric_columns(
        self,
        df_a: pd.DataFrame,
        df_b: pd.DataFrame,
        columns: List[str]
    ) -> Dict:
        """Compare numeric columns statistically"""
        results = {}
        for column in columns:
            series_a = df_a[column].dropna()
            series_b = df_b[column].dropna()
            # Basic statistics
            stats_a = {
                'mean': series_a.mean(),
                'median': series_a.median(),
                'std': series_a.std(),
                'min': series_a.min(),
                'max': series_a.max(),
                'count': len(series_a)
            }
            stats_b = {
                'mean': series_b.mean(),
                'median': series_b.median(),
                'std': series_b.std(),
                'min': series_b.min(),
                'max': series_b.max(),
                'count': len(series_b)
            }
            # Kolmogorov-Smirnov test (distribution similarity)
            ks_statistic, ks_pvalue = stats.ks_2samp(series_a, series_b)
            # Mean difference percentage (guard against a zero baseline mean)
            if stats_a['mean'] != 0:
                mean_diff_pct = abs(stats_a['mean'] - stats_b['mean']) / abs(stats_a['mean']) * 100
            else:
                mean_diff_pct = float('inf')
            results[column] = {
                'stats_a': stats_a,
                'stats_b': stats_b,
                'ks_statistic': ks_statistic,
                'ks_pvalue': ks_pvalue,
                'mean_diff_pct': mean_diff_pct,
                'distribution_similar': ks_pvalue > 0.05,  # fail to reject at the 5% level
                'mean_similar': mean_diff_pct < 5  # within 5%
            }
        return results

    def compare_categorical_columns(
        self,
        df_a: pd.DataFrame,
        df_b: pd.DataFrame,
        columns: List[str]
    ) -> Dict:
        """Compare categorical columns"""
        results = {}
        for column in columns:
            # Relative frequencies
            counts_a = df_a[column].value_counts(normalize=True)
            counts_b = df_b[column].value_counts(normalize=True)
            # All unique values across both datasets
            all_values = set(counts_a.index) | set(counts_b.index)
            # Compare distributions value by value
            distribution_diff = {}
            for value in all_values:
                pct_a = counts_a.get(value, 0)
                pct_b = counts_b.get(value, 0)
                distribution_diff[value] = {
                    'pct_a': pct_a,
                    'pct_b': pct_b,
                    'diff': abs(pct_a - pct_b)
                }
            results[column] = {
                'unique_values_a': len(counts_a),
                'unique_values_b': len(counts_b),
                'distribution_diff': distribution_diff
            }
        return results

    def compare_row_counts(
        self,
        df_a: pd.DataFrame,
        df_b: pd.DataFrame
    ) -> Dict:
        """Compare row counts"""
        count_a = len(df_a)
        count_b = len(df_b)
        diff = count_b - count_a
        diff_pct = abs(diff) / count_a * 100 if count_a else float('inf')
        return {
            'count_a': count_a,
            'count_b': count_b,
            'diff': diff,
            'diff_pct': diff_pct,
            'within_threshold': diff_pct < 5  # within 5%
        }

# Example usage (assumes existing database connections staging_conn and production_conn)
df_staging = pd.read_sql("SELECT * FROM staging.customers", con=staging_conn)
df_production = pd.read_sql("SELECT * FROM production.customers", con=production_conn)

diff = AggregateDiff()

# Row count comparison
row_count_diff = diff.compare_row_counts(df_staging, df_production)
print(f"Row count diff: {row_count_diff}")

# Numeric column comparison
numeric_diff = diff.compare_numeric_columns(
    df_staging,
    df_production,
    columns=['age', 'income', 'score']
)
print(f"Numeric diff: {numeric_diff}")

# Categorical column comparison
categorical_diff = diff.compare_categorical_columns(
    df_staging,
    df_production,
    columns=['segment', 'region']
)
print(f"Categorical diff: {categorical_diff}")

Great Expectations Diffing

# Great Expectations statistical diffing
# NOTE: the Great Expectations API changes between releases; treat the calls
# below as a sketch and verify them against the version you have installed.
import great_expectations as gx
from typing import Dict

class GreatExpectationsDiff:
    """Statistical diffing with Great Expectations"""

    def __init__(self, context_path: str = "./great_expectations"):
        self.context = gx.get_context(context_root_dir=context_path)

    def compare_datasets(
        self,
        datasource_a: str,
        datasource_b: str,
        table_name: str
    ) -> Dict:
        """Compare two datasets statistically"""
        # Get data assets
        asset_a = self.context.get_datasource(datasource_a).get_asset(table_name)
        asset_b = self.context.get_datasource(datasource_b).get_asset(table_name)
        # Create expectation suites
        suite_a = self.context.get_or_create_expectation_suite("dataset_a_expectations")
        suite_b = self.context.get_or_create_expectation_suite("dataset_b_expectations")
        # Create validators
        validator_a = self.context.get_validator(
            batch_request=asset_a.build_batch_request(),
            expectation_suite=suite_a
        )
        validator_b = self.context.get_validator(
            batch_request=asset_b.build_batch_request(),
            expectation_suite=suite_b
        )
        # Run validations
        result_a = validator_a.validate()
        result_b = validator_b.validate()
        # Compare results
        results = {
            'dataset_a_stats': self._get_dataset_stats(result_a),
            'dataset_b_stats': self._get_dataset_stats(result_b),
            'differences': self._compare_results(result_a, result_b)
        }
        return results

    def _get_dataset_stats(self, validation_result) -> Dict:
        """Extract dataset statistics"""
        return {
            'row_count': validation_result.statistics['row_count'],
            'columns': list(validation_result.statistics['table_columns'].keys())
        }

    def _compare_results(self, result_a, result_b) -> Dict:
        """Compare validation results"""
        differences = {
            'row_count_diff': abs(
                result_a.statistics['row_count'] - result_b.statistics['row_count']
            ),
            'column_diffs': []
        }
        # Compare column sets
        columns_a = set(result_a.statistics['table_columns'].keys())
        columns_b = set(result_b.statistics['table_columns'].keys())
        differences['columns_only_in_a'] = columns_a - columns_b
        differences['columns_only_in_b'] = columns_b - columns_a
        differences['common_columns'] = columns_a & columns_b
        return differences
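
For symmetry with the other examples, a hypothetical invocation; the datasource names are placeholders that would need to exist in your GX project:

# Hypothetical usage; datasource and table names are placeholders
ge_diff = GreatExpectationsDiff(context_path="./great_expectations")
results = ge_diff.compare_datasets(
    datasource_a="staging_warehouse",
    datasource_b="production_warehouse",
    table_name="customers"
)
print(results['differences'])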

Schema Diffing

Schema Comparison

# Schema diffing with SQLAlchemy
from sqlalchemy import create_engine, inspect
from typing import Dict, Optional

class SchemaDiff:
    """Compare database schemas"""

    def __init__(self, connection_string_a: str, connection_string_b: str):
        self.engine_a = create_engine(connection_string_a)
        self.engine_b = create_engine(connection_string_b)

    def compare_schemas(self, schema_name: Optional[str] = None) -> Dict:
        """Compare schemas between two databases"""
        inspector_a = inspect(self.engine_a)
        inspector_b = inspect(self.engine_b)
        # Get table names
        tables_a = set(inspector_a.get_table_names(schema=schema_name))
        tables_b = set(inspector_b.get_table_names(schema=schema_name))
        differences = {
            'tables_only_in_a': tables_a - tables_b,
            'tables_only_in_b': tables_b - tables_a,
            'common_tables': tables_a & tables_b,
            'table_differences': {}
        }
        # Compare common tables
        for table in differences['common_tables']:
            table_diff = self._compare_tables(
                inspector_a,
                inspector_b,
                table,
                schema_name
            )
            if table_diff:
                differences['table_differences'][table] = table_diff
        return differences

    def _compare_tables(
        self,
        inspector_a,
        inspector_b,
        table_name: str,
        schema_name: Optional[str] = None
    ) -> Optional[Dict]:
        """Compare table schemas"""
        # Get columns
        columns_a = inspector_a.get_columns(table_name, schema=schema_name)
        columns_b = inspector_b.get_columns(table_name, schema=schema_name)
        # Index columns by name
        col_names_a = {col['name']: col for col in columns_a}
        col_names_b = {col['name']: col for col in columns_b}
        differences = {
            'columns_only_in_a': set(col_names_a) - set(col_names_b),
            'columns_only_in_b': set(col_names_b) - set(col_names_a),
            'column_type_changes': [],
            'column_nullable_changes': []
        }
        # Compare common columns
        for col_name in set(col_names_a) & set(col_names_b):
            col_a = col_names_a[col_name]
            col_b = col_names_b[col_name]
            # Type changes (compare rendered type names; SQLAlchemy type objects
            # don't compare equal even when they describe the same type)
            if str(col_a['type']) != str(col_b['type']):
                differences['column_type_changes'].append({
                    'column': col_name,
                    'type_a': str(col_a['type']),
                    'type_b': str(col_b['type'])
                })
            # Nullable changes
            if col_a.get('nullable') != col_b.get('nullable'):
                differences['column_nullable_changes'].append({
                    'column': col_name,
                    'nullable_a': col_a.get('nullable'),
                    'nullable_b': col_b.get('nullable')
                })
        return differences if any(differences.values()) else None

# Example usage
diff = SchemaDiff(
    connection_string_a="postgresql://user:pass@staging-host/db",
    connection_string_b="postgresql://user:pass@production-host/db"
)

# Compare schemas
schema_diff = diff.compare_schemas(schema_name="analytics")
print(f"Tables only in staging: {schema_diff['tables_only_in_a']}")
print(f"Tables only in production: {schema_diff['tables_only_in_b']}")
print(f"Table differences: {schema_diff['table_differences']}")

Sample Diffing

Random Sample Validation

# Sample-based diffing
import pandas as pd
from typing import Dict

class SampleDiff:
    """Fast diffing using random samples"""

    def __init__(self, sample_size: int = 1000):
        self.sample_size = sample_size

    def compare_samples(
        self,
        df_a: pd.DataFrame,
        df_b: pd.DataFrame,
        primary_key: str
    ) -> Dict:
        """Compare random samples from datasets"""
        # Sample primary keys once, then pull the same keys from both datasets.
        # (Sampling each dataset independently would rarely select overlapping
        # rows, making the comparison meaningless.)
        all_keys = pd.Series(sorted(set(df_a[primary_key]) | set(df_b[primary_key])))
        sampled_keys = all_keys.sample(n=min(self.sample_size, len(all_keys)))
        sample_a = df_a[df_a[primary_key].isin(sampled_keys)]
        sample_b = df_b[df_b[primary_key].isin(sampled_keys)]
        # Merge on primary key
        merged = sample_a.merge(
            sample_b,
            on=primary_key,
            how='outer',
            suffixes=('_a', '_b'),
            indicator=True
        )
        # Analyze differences
        results = {
            'sample_size_a': len(sample_a),
            'sample_size_b': len(sample_b),
            'rows_only_in_a': len(merged[merged['_merge'] == 'left_only']),
            'rows_only_in_b': len(merged[merged['_merge'] == 'right_only']),
            'rows_in_both': len(merged[merged['_merge'] == 'both']),
        }
        # Compare columns for common rows
        common_rows = merged[merged['_merge'] == 'both']
        column_diffs = []
        for col in df_a.columns:
            if col == primary_key:
                continue
            if col + '_a' in common_rows.columns and col + '_b' in common_rows.columns:
                # Count differences (note: NaN != NaN, so shared NULLs count as
                # differences here; see the NULL handling note below)
                diff_mask = common_rows[col + '_a'] != common_rows[col + '_b']
                diff_count = int(diff_mask.sum())
                if diff_count > 0:
                    column_diffs.append({
                        'column': col,
                        'diff_count': diff_count,
                        'diff_percentage': diff_count / len(common_rows) * 100
                    })
        results['column_differences'] = column_diffs
        return results

# Example usage
diff = SampleDiff(sample_size=1000)
results = diff.compare_samples(
    df_staging,
    df_production,
    primary_key='customer_id'
)
print(f"Sample results: {results}")

Data Diffing Tools

Tool Comparison

| Tool               | Type       | Language    | Cost               | Best For               |
|--------------------|------------|-------------|--------------------|------------------------|
| datafold           | Row-by-row | Python, Web | Paid               | Production validation  |
| daff               | Row-by-row | Python      | Free               | Small datasets         |
| sql-diff           | Row-by-row | SQL         | Free               | Database comparison    |
| Great Expectations | Aggregate  | Python      | Free (open source) | Statistical validation |
| Soda               | Aggregate  | Python, SQL | Free tier          | Data quality checks    |
| dbt                | Schema     | SQL, Jinja  | Free (open source) | Transformation testing |

Data Diffing Best Practices

DO

# 1. Use appropriate strategy
# Row-by-row for small data, aggregate for large
# 2. Validate schema first
# Catch schema changes early
# 3. Use samples for quick validation
# Faster for large datasets
# 4. Automate diffing in CI/CD
# Run on every deployment
# 5. Track diff history
# Monitor trends over time

DON’T

# 1. Don't diff entire production
# Use samples for large datasets
# 2. Don't ignore statistical significance
# Small differences might be noise
# 3. Don't skip schema diffing
# Schema changes are critical
# 4. Don't forget NULL handling
# Naive != comparisons miss or miscount NULLs (see the sketch below)
# 5. Don't ignore time zones
# Timestamp comparisons need care
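
On NULL handling specifically: in pandas, NaN != NaN evaluates to True, so a naive comparison flags rows where both sides are missing. A NULL-safe element-wise comparison sketch:

# NULL-safe comparison: values differ only if unequal and not both missing
import pandas as pd

def null_safe_diff_mask(a: pd.Series, b: pd.Series) -> pd.Series:
    """True where a and b genuinely differ; NaN vs. NaN is treated as equal."""
    both_null = a.isna() & b.isna()
    return (a != b) & ~both_null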

Key Takeaways

  1. Row-by-row: Exact match comparison for critical validation
  2. Aggregate: Statistical comparison for large datasets
  3. Schema: Metadata comparison for migration
  4. Sample: Fast validation using random samples
  5. Tools: datafold, Great Expectations, Soda
  6. CI/CD: Automate diffing in deployment pipelines
  7. Thresholds: Define acceptable difference thresholds
  8. Use When: CI/CD, data migration, production validation
