Data Contracts
Formal Agreements for Data Quality
Overview
Data contracts are formal agreements between data producers and data consumers that define a dataset's schema, data quality rules, and service level agreements (SLAs). They move data quality enforcement upstream, from consumers to producers, so that bad data is stopped at the source.
Core Concept
Data Contract Components
Essential Elements
    contract:
      name: "Orders Data Contract"
      version: "1.0.0"
      owner: "Orders Team"
      consumers:
        - "Analytics Team"
        - "Data Science Team"
        - "Finance Team"

    schema:
      table: "orders"
      columns:
        - name: "order_id"
          type: "BIGINT"
          nullable: false
          description: "Unique order identifier"
        - name: "customer_id"
          type: "BIGINT"
          nullable: false
          description: "Foreign key to customers table"
        - name: "order_date"
          type: "TIMESTAMP"
          nullable: false
          description: "When order was placed"
        - name: "total_amount"
          type: "DECIMAL(18,2)"
          nullable: false
          description: "Order total in USD"

    data_quality_rules:
      - name: "no_null_order_ids"
        type: "schema"
        rule: "order_id IS NOT NULL"
        severity: "critical"
      - name: "positive_amounts"
        type: "business_logic"
        rule: "total_amount >= 0"
        severity: "critical"
      - name: "recent_orders"
        type: "freshness"
        rule: "MAX(order_date) >= CURRENT_TIMESTAMP - INTERVAL '7 days'"
        severity: "warning"

    sla:
      availability: "99.9%"
      freshness: "Data available within 1 hour of order placement"
      completeness: "99% of orders within 24 hours"
      support: "orders-team@company.com"

    change_management:
      breaking_changes_require: "consumer approval"
      notification_days: 14
      deprecation_policy: "maintain old schema for 30 days"

Contract Types
Type 1: Schema Contract
Focus: Schema structure and data types.
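Because a schema contract is a structured description of columns, differences between two versions can be classified mechanically. The following is a minimal sketch rather than any library's API: `Column` and `classify_changes` are hypothetical names, and treating a newly added non-nullable column as breaking is an assumption made for this example.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class Column:
    name: str
    type: str
    nullable: bool


def classify_changes(old: list[Column], new: list[Column]) -> dict[str, list[str]]:
    """Compare two schema versions and sort the differences into breaking / non-breaking."""
    old_by_name = {c.name: c for c in old}
    new_by_name = {c.name: c for c in new}
    changes: dict[str, list[str]] = {"breaking": [], "non_breaking": []}

    for name, col in old_by_name.items():
        if name not in new_by_name:
            changes["breaking"].append(f"removed column {name}")
            continue
        updated = new_by_name[name]
        if updated.type != col.type:
            changes["breaking"].append(
                f"changed type of {name}: {col.type} -> {updated.type}"
            )
        if col.nullable and not updated.nullable:
            changes["breaking"].append(f"{name} changed from nullable to non-nullable")
        elif not col.nullable and updated.nullable:
            changes["non_breaking"].append(f"{name} relaxed to nullable")

    for name, col in new_by_name.items():
        if name not in old_by_name:
            # Assumption: a new required column is treated as breaking.
            kind = "non_breaking" if col.nullable else "breaking"
            changes[kind].append(f"added column {name}")

    return changes


v1 = [Column("email", "VARCHAR(255)", False), Column("phone", "VARCHAR(20)", True)]
v2 = [Column("email", "VARCHAR(320)", False), Column("signup_source", "VARCHAR(50)", True)]
result = classify_changes(v1, v2)
print(result)
```

A check like this could run in CI on every contract edit, so that a breaking change is caught before it reaches consumers.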
    contract:
      type: "schema"
      version: "1.0.0"

    schema_definition:
      columns:
        - name: "email"
          type: "VARCHAR(255)"
          nullable: false
          format: "email"
        - name: "phone"
          type: "VARCHAR(20)"
          nullable: true
          format: "phone"

    breaking_changes:
      - "Changing column type"
      - "Removing column"
      - "Changing nullable to non-nullable"

    non_breaking_changes:
      - "Adding column (nullable)"
      - "Renaming column (with alias)"

Type 2: Quality Contract
Focus: Data quality rules and thresholds.
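Each rule in a quality contract reduces to a ratio that is compared against a threshold. The sketch below shows one way to compute three such ratios in plain Python, assuming rows arrive as dictionaries. The function names and the deliberately simple email pattern are illustrative assumptions, not part of any specific tool.

```python
import re

# Simplified pattern for illustration only; real email validation is stricter.
EMAIL_PATTERN = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")


def completeness(rows, column):
    """Fraction of rows where `column` is present and not None."""
    if not rows:
        return 0.0
    return sum(1 for r in rows if r.get(column) is not None) / len(rows)


def uniqueness(rows, column):
    """Fraction of distinct values among all values of `column`."""
    values = [r.get(column) for r in rows]
    if not values:
        return 0.0
    return len(set(values)) / len(values)


def validity(rows, column, pattern):
    """Fraction of non-null values of `column` that match `pattern`."""
    values = [r[column] for r in rows if r.get(column) is not None]
    if not values:
        return 0.0
    return sum(1 for v in values if pattern.match(v)) / len(values)


rows = [
    {"user_id": 1, "email": "a@example.com"},
    {"user_id": 2, "email": "not-an-email"},
    {"user_id": 3, "email": None},
    {"user_id": 3, "email": "d@example.com"},
]

# Each entry pairs a measured ratio with the minimum ratio the contract accepts.
metrics = {
    "completeness(email)": (completeness(rows, "email"), 0.95),
    "uniqueness(user_id)": (uniqueness(rows, "user_id"), 1.0),
    "validity(email)": (validity(rows, "email", EMAIL_PATTERN), 0.98),
}
for name, (value, threshold) in metrics.items():
    status = "PASS" if value >= threshold else "FAIL"
    print(f"{name}: {value:.2f} (threshold {threshold}) {status}")
```

Accuracy is left out because it requires a trusted reference dataset to compare against.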
    contract:
      type: "quality"
      version: "1.0.0"

    quality_rules:
      - name: "completeness"
        threshold: 0.95  # 95% complete
        columns: ["email", "phone"]
      - name: "uniqueness"
        threshold: 1.0  # 100% unique
        columns: ["user_id"]
      - name: "validity"
        threshold: 0.98  # 98% valid
        rules:
          - "email matches regex"
          - "phone matches regex"
      - name: "accuracy"
        threshold: 0.99  # 99% accurate
        reference: "production_database"

Type 3: Service Level Contract
Focus: Performance and availability SLAs.
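Two SLA figures used in this document, p95 freshness under one hour and 99.9% monthly availability, are simple to compute once delivery delays and downtime are recorded. The sketch below uses a nearest-rank percentile. The function names, the sample delays, and the 50 minutes of downtime are assumptions made for illustration.

```python
import math


def percentile(values, pct):
    """Nearest-rank percentile: the smallest value with at least pct% of the data at or below it."""
    if not values:
        raise ValueError("percentile() needs at least one value")
    ordered = sorted(values)
    rank = math.ceil(pct * len(ordered) / 100)
    return ordered[max(rank, 1) - 1]


def availability(total_minutes, downtime_minutes):
    """Fraction of the measurement window during which the data was available."""
    return 1 - downtime_minutes / total_minutes


# Minutes between each event and its arrival in the dataset (sample data).
delays_minutes = [5, 12, 8, 45, 30, 7, 9, 15, 22, 70,
                  11, 6, 14, 19, 25, 10, 13, 17, 28, 33]
p95 = percentile(delays_minutes, 95)
print(f"p95 freshness: {p95} min, target met: {p95 < 60}")

# A 30-day month has 43,200 minutes, so a 99.9% target allows 43.2 minutes of downtime.
minutes_in_month = 30 * 24 * 60
error_budget = minutes_in_month * (1 - 0.999)
monthly = availability(minutes_in_month, downtime_minutes=50)
print(f"availability: {monthly:.4%}, budget: {error_budget:.1f} min, target met: {monthly >= 0.999}")
```

In this sample one delivery took 70 minutes, yet the p95 target is still met. This is why the choice of measurement (p95 versus maximum) belongs in the contract itself.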
    contract:
      type: "sla"
      version: "1.0.0"

    service_levels:
      availability:
        target: "99.9%"
        measurement: "monthly"
        penalty: "credit if < 99.5%"
      freshness:
        target: "< 1 hour from event"
        measurement: "p95"
        penalty: "alert if > 2 hours"
      throughput:
        target: "10,000 events/second"
        measurement: "sustained"
      completeness:
        target: "99% of events"
        measurement: "daily"

Contract Lifecycle
Implementation Patterns
Pattern 1: Schema Registry (Confluent)
    from datacontract.client import import_datacontract
    from datacontract.model import DataContract

    # Define the contract: schema fields plus quality assertions.
    contract = DataContract(
        dataset="orders",
        schema={
            "fields": [
                {"name": "order_id", "type": "bigint", "required": True},
                {"name": "customer_id", "type": "bigint", "required": True},
                {"name": "total_amount", "type": "decimal(18,2)", "required": True},
            ]
        },
        quality={
            "rules": [
                {"type": "schema", "assertion": "order_id IS NOT NULL"},
                {"type": "business", "assertion": "total_amount >= 0"},
            ]
        },
    )

    # Register the contract's Avro schema under the topic's value subject.
    # `registry` is assumed to be an already-configured schema registry client.
    registry.register_schema(
        subject="orders-value",
        schema=contract.to_avro_schema(),
    )

Pattern 2: dbt Tests as Contracts
    version: 2

    models:
      - name: stg_orders
        description: "Orders data with contract enforcement"
        config:
          contract:
            enforced: true  # dbt model contracts are switched on with `enforced`

        columns:
          - name: order_id
            data_type: bigint  # an enforced contract needs a data_type per column
            description: "Unique order identifier"
            tests:
              - unique
              - not_null
              # At column level, dbt_utils prepends the column name to the expression.
              - dbt_utils.expression_is_true:
                  expression: "> 0"
          - name: total_amount
            data_type: decimal(18,2)
            description: "Order total"
            tests:
              - not_null
              - dbt_utils.expression_is_true:
                  expression: ">= 0"

        # Model-level freshness check: the newest order_date must be within the last hour.
        tests:
          - dbt_utils.recency:
              datepart: hour
              field: order_date
              interval: 1

Pattern 3: OpenAPI for Data
    openapi: "3.0.0"
    info:
      title: "Orders Data Contract"
      version: "1.0.0"
      description: "Data contract for orders dataset"

    paths:
      /orders:
        get:
          summary: "Get orders data"
          responses:
            "200":
              description: "Successful response"
              content:
                application/json:
                  schema:
                    type: "array"
                    items:
                      type: "object"
                      required: ["order_id", "customer_id", "total_amount"]
                      properties:
                        order_id:
                          type: "integer"
                        customer_id:
                          type: "integer"
                        total_amount:
                          type: "number"
                          minimum: 0

Enforcement Strategies
Preventive Enforcement
Implementation:
- Validate at write time
- Reject non-compliant data
- Alert producer team
- Prevent bad data from entering platform
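The preventive steps above can be sketched as a write path that validates first and writes only if every record passes. This is a minimal illustration under stated assumptions: `ContractViolation`, `check_order`, and `write_batch` are hypothetical names, a Python list stands in for the target table, and rejecting the whole batch (rather than individual records) is one possible policy.

```python
class ContractViolation(Exception):
    """Raised when a batch fails the contract and must not be written."""


def check_order(record):
    """Return a list of violations for one order record (empty means compliant)."""
    violations = []
    if record.get("order_id") is None:
        violations.append("order_id is null")
    amount = record.get("total_amount")
    if amount is None or amount < 0:
        violations.append("total_amount is null or negative")
    return violations


def write_batch(records, sink, alert):
    """Validate every record before writing; reject the whole batch on any violation."""
    failures = {}
    for index, record in enumerate(records):
        violations = check_order(record)
        if violations:
            failures[index] = violations
    if failures:
        # Tell the producer team, then refuse the write so nothing bad lands.
        alert(f"rejected batch: {len(failures)} of {len(records)} records violate the contract")
        raise ContractViolation(failures)
    sink.extend(records)


table = []   # stands in for the target table
alerts = []  # stands in for a paging or chat integration

write_batch([{"order_id": 1, "total_amount": 19.99}], table, alerts.append)

try:
    write_batch([{"order_id": None, "total_amount": -5}], table, alerts.append)
except ContractViolation as exc:
    print("rejected:", exc.args[0])
```

Rejecting the entire batch keeps the table consistent but makes one bad record block many good ones. Per-record rejection trades that strictness for throughput.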
Detective Enforcement
Implementation:
- Validate in staging environment
- Monitor compliance metrics
- Quarantine non-compliant data
- Alert on violations
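Detective enforcement lets data land first and sorts it afterwards. The following sketch, again with hypothetical function names and an assumed alert threshold, splits staged records into clean and quarantined sets and raises an alert when the compliance rate drops too low.

```python
def is_compliant(record):
    """Apply the contract's critical rules to a single staged record."""
    amount = record.get("total_amount")
    return record.get("order_id") is not None and amount is not None and amount >= 0


def partition(records):
    """Split already-landed records into (clean, quarantined)."""
    clean, quarantined = [], []
    for record in records:
        (clean if is_compliant(record) else quarantined).append(record)
    return clean, quarantined


def compliance_rate(clean, quarantined):
    """Share of records that passed; an empty batch counts as fully compliant."""
    total = len(clean) + len(quarantined)
    return len(clean) / total if total else 1.0


ALERT_THRESHOLD = 0.99  # assumed value; choose per dataset

staged = [
    {"order_id": 1, "total_amount": 10.0},
    {"order_id": 2, "total_amount": -3.0},
    {"order_id": None, "total_amount": 5.0},
    {"order_id": 4, "total_amount": 7.5},
]
clean, quarantined = partition(staged)
rate = compliance_rate(clean, quarantined)
if rate < ALERT_THRESHOLD:
    print(f"ALERT: compliance {rate:.0%} below {ALERT_THRESHOLD:.0%}")
```

Clean records continue downstream while quarantined ones wait for the producer to repair or discard them, so a violation delays only the affected rows.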
Change Management
Breaking Changes
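A breaking change is as much a scheduling problem as a technical one. With the 14-day consumer notice and the 30-day period for maintaining the old schema used in this document, the key dates follow directly from the announcement date. The sketch below uses Python's standard `datetime` module. The function name and the example date are assumptions.

```python
from datetime import date, timedelta

NOTIFICATION_DAYS = 14  # consumer notice before the new version may be deployed
GRACE_PERIOD_DAYS = 30  # how long the old version stays available after deployment


def breaking_change_timeline(announced_on):
    """Derive the earliest deploy date and the old version's retirement date."""
    earliest_deploy = announced_on + timedelta(days=NOTIFICATION_DAYS)
    old_version_retired = earliest_deploy + timedelta(days=GRACE_PERIOD_DAYS)
    return {
        "announced": announced_on,
        "earliest_deploy": earliest_deploy,
        "old_version_retired": old_version_retired,
    }


timeline = breaking_change_timeline(date(2024, 3, 1))
for step, day in timeline.items():
    print(f"{step}: {day.isoformat()}")
```

Under these settings a breaking change takes at least 44 days from announcement to retirement of the old version.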
    # Steps run in the order listed.
    breaking_change_process:
      - "Producers propose change"
      - "Create contract version N+1"
      - "Notify consumers (14 days advance)"
      - "Consumer review and approval"
      - "Deploy new contract version"
      - "Grace period (30 days) for old version"
      - "Deprecate old version"

    example_breaking_changes:
      - "Changing column type (INT → BIGINT)"
      - "Removing column"
      - "Changing nullable to NOT NULL"
      - "Changing domain values (ENUM)"

Non-Breaking Changes
    non_breaking_changes:
      - "Adding nullable column"
      - "Relaxing constraints"
      - "Adding domain value"
      - "Renaming column (with alias)"

    process:
      - "Producers implement"
      - "Update contract version"
      - "Notify consumers (best effort)"
      - "Deploy"

Contract Testing
Unit Tests
    import pytest
    from datacontract.client import import_datacontract

    # `check_freshness` is assumed to be a project-specific helper defined elsewhere.
    # It must not be named `test_*`, or pytest would try to collect it as a test.


    def test_orders_contract():
        """Test orders data against contract."""
        contract = import_datacontract("data_contracts/orders.yml")

        result = contract.test("s3://bucket/orders/")

        assert result.passed(), f"Contract failed: {result.failures}"


    def test_orders_freshness():
        """Test orders freshness SLA."""
        contract = import_datacontract("data_contracts/orders.yml")

        freshness = check_freshness(contract, max_age_hours=1)

        assert freshness.passed, f"Freshness failed: {freshness.details}"

Integration Tests
    import pytest

    # ProducerSystem and ContractValidator are assumed to be project-specific
    # classes imported from the codebase under test.


    @pytest.fixture
    def producer_system():
        """Spin up test producer system."""
        return ProducerSystem(test=True)


    @pytest.fixture
    def contract_validator():
        """Create contract validator."""
        return ContractValidator("data_contracts/orders.yml")


    def test_end_to_end_contract(producer_system, contract_validator):
        """Test full data flow against contract."""
        # Generate test data
        test_data = producer_system.generate_test_data(rows=1000)

        # Validate against contract
        result = contract_validator.validate(test_data)

        assert result.passed, f"Contract validation failed: {result.failures}"
        assert result.completeness >= 0.99, "Completeness below 99%"

Cost Considerations
Contract Enforcement Costs
| Strategy | Compute Cost | Storage Cost | Ops Cost | Best For |
|---|---|---|---|---|
| Preventive | Medium | None | Low | Critical systems |
| Detective | Low | Low (quarantine) | Medium | Most systems |
| None | Low | None | High (downstream) | Small orgs |
ROI of Contracts
Senior Level Considerations
Anti-Patterns
Anti-Pattern 1: Contracts without enforcement
    # Bad: Contract defined but not enforced
    contract:
      name: "orders"
      # No validation integration

    # Good: Contract enforced in pipeline
    contract:
      name: "orders"
      enforcement: "strict"
      validation_action: "reject"

Anti-Pattern 2: Too rigid contracts
    # Bad: 100% required on everything
    completeness: 1.0  # Unrealistic

    # Good: Practical thresholds
    completeness: 0.95  # 95% complete
    critical_fields: 0.99  # 99% for critical fields

Anti-Pattern 3: No versioning
    # Bad: Single version
    contract: "orders"

    # Good: Versioned contracts
    contract:
      name: "orders"
      current_version: "v2.0"
      supported_versions: ["v1.0", "v2.0"]

Key Takeaways
- Prevention over detection: Enforce contracts at source
- Three types: Schema, quality, SLA contracts
- Versioning: Essential for breaking changes
- Automation: Integrate validation into pipelines
- Monitoring: Track compliance metrics
- Communication: Notify consumers of changes
- Enforcement: Reject non-compliant data for critical systems
- Cost: 3-10x ROI from reduced downstream issues