Data Lineage Tracking
End-to-End Data Flow Visibility
Overview
Data lineage tracking provides visibility into how data flows from source systems to consumption. It enables impact analysis, root cause analysis, and compliance by documenting the complete data journey.
Lineage Architecture
Lineage Types:
- Technical: Table-to-table, column-to-column dependencies
- Business: Business terms, data ownership
- Operational: Job runs, data quality results, performance metrics (a record combining all three types is sketched after this list)
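To make the three types concrete, here is a minimal, illustrative sketch of a lineage edge record that carries technical, business, and operational fields together. The LineageEdge class and its field names are hypothetical, not part of any particular tool.

# Hypothetical lineage edge record combining technical, business,
# and operational lineage metadata (illustrative only).
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional

@dataclass
class LineageEdge:
    # Technical lineage: table-to-table (and optionally column-to-column) dependency
    upstream_table: str            # e.g. "raw.customers"
    downstream_table: str          # e.g. "analytics.customers"
    column_mapping: dict = field(default_factory=dict)  # {"customer_id": "customer_id"}

    # Business lineage: business term and ownership
    business_term: Optional[str] = None   # e.g. "Customer"
    owner: Optional[str] = None           # e.g. "data-engineering@my-company.com"

    # Operational lineage: which job run produced the edge and how it performed
    job_name: Optional[str] = None        # e.g. "daily_sales_etl"
    run_id: Optional[str] = None
    run_completed_at: Optional[datetime] = None
    rows_written: Optional[int] = None

edge = LineageEdge(
    upstream_table="raw.customers",
    downstream_table="analytics.customers",
    owner="data-engineering@my-company.com",
    job_name="daily_sales_etl",
)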
OpenLineage
OpenLineage Architecture
OpenLineage Components:
- Client Libraries: Emit lineage events from Python, SQL, Spark, Airflow, dbt (a minimal Python client example follows this list)
- Marquez: Backend API for lineage events
- OpenMetadata/DataHub: Frontend for lineage visualization
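As a minimal sketch of what a client library emits, the snippet below sends a single COMPLETE run event to Marquez using the openlineage-python client. The job and dataset names are illustrative, and the client API may differ slightly between openlineage-python versions.

# Emit a single OpenLineage run event from Python (illustrative names).
import uuid
from datetime import datetime, timezone

from openlineage.client import OpenLineageClient
from openlineage.client.run import Dataset, Job, Run, RunEvent, RunState

client = OpenLineageClient(url="http://marquez:5000")

event = RunEvent(
    eventType=RunState.COMPLETE,
    eventTime=datetime.now(timezone.utc).isoformat(),
    run=Run(runId=str(uuid.uuid4())),
    job=Job(namespace="default", name="daily_sales_etl.transform"),
    inputs=[Dataset(namespace="default", name="raw.sales")],
    outputs=[Dataset(namespace="default", name="curated.sales")],
    producer="https://github.com/my-org/lineage-examples",  # identifies the emitter
)
client.emit(event)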
OpenLineage Integration
Airflow Integration
# Airflow DAG with OpenLineage
# With the apache-airflow-providers-openlineage package installed, lineage is
# captured automatically by the provider's listener; no special operators are
# required. The collector endpoint is configured outside the DAG (see below).
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

default_args = {
    "owner": "data-engineering",
    "start_date": datetime(2025, 1, 1),
}

with DAG(
    'daily_sales_etl',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False,
    tags=['etl', 'sales'],
) as dag:

    # Each task's runs, inputs, and outputs are emitted as OpenLineage events
    extract_sales = PythonOperator(
        task_id='extract_sales',
        python_callable=lambda: print("Extracting sales..."),
    )

    transform_data = PythonOperator(
        task_id='transform_data',
        python_callable=lambda: print("Transforming data..."),
    )

    load_data = PythonOperator(
        task_id='load_data',
        python_callable=lambda: print("Loading data..."),
    )
    extract_sales >> transform_data >> load_data
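The collector endpoint itself is usually supplied through environment variables read by the OpenLineage client that the provider uses. A minimal sketch, shown in Python for consistency with the other examples; in a real deployment these would normally be exported in the scheduler and worker environment, and exact option names vary by provider version.

# Point the OpenLineage client used by the Airflow provider at the collector.
import os

os.environ["OPENLINEAGE_URL"] = "http://marquez:5000"   # where lineage events are sent
os.environ["OPENLINEAGE_NAMESPACE"] = "default"         # namespace recorded on each job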
Spark Integration
# Spark job with OpenLineage
from pyspark.sql import SparkSession

# Create a Spark session with the OpenLineage listener enabled. The listener is
# a JVM class shipped in the openlineage-spark JAR, so it is configured through
# Spark conf (with the JAR on the classpath, e.g. via
# spark-submit --packages io.openlineage:openlineage-spark_2.12:<version>)
# rather than imported in Python. Newer releases configure the endpoint with
# spark.openlineage.transport.* instead of spark.openlineage.host.
spark = SparkSession.builder \
    .appName("ETL Job") \
    .config("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener") \
    .config("spark.openlineage.host", "http://marquez:5000") \
    .config("spark.openlineage.namespace", "default") \
    .getOrCreate()

# Run the ETL job
df = spark.read.parquet("s3://bucket/raw/sales/")
df_transformed = df.filter(df.amount > 0)
df_transformed.write.parquet("s3://bucket/curated/sales/")
# Lineage is automatically captured:
# - Read: s3://bucket/raw/sales/
# - Transform: filter on amount
# - Write: s3://bucket/curated/sales/
dbt Integration
# dbt_project.yml
name: 'my_data_platform'
version: '1.0.0'
config-version: 2

macro-paths: ["macros"]

# OpenLineage integration for dbt is provided by the openlineage-dbt package,
# which wraps the dbt CLI with a dbt-ol command and reads OPENLINEAGE_URL /
# OPENLINEAGE_NAMESPACE from the environment. Running dbt through dbt-ol
# emits lineage events for:
# - Models (tables, views)
# - Tests
# - Seeds
# - Snapshots
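As a sketch of how this is typically wired into a scheduled job, the snippet below runs dbt through the dbt-ol wrapper with the collector endpoint supplied via environment variables; the project path and namespace are illustrative.

# Run dbt through the OpenLineage wrapper so model/test runs emit lineage events.
import os
import subprocess

env = {
    **os.environ,
    "OPENLINEAGE_URL": "http://marquez:5000",   # lineage collector endpoint
    "OPENLINEAGE_NAMESPACE": "default",
}

# dbt-ol forwards all arguments to dbt and emits OpenLineage events per node.
subprocess.run(
    ["dbt-ol", "run", "--project-dir", "/opt/dbt/my_data_platform"],
    env=env,
    check=True,
)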
Marquez Backend
Marquez Configuration
marquez:
  namespace: default
  openlineage:
    transport:
      type: http
      url: http://openlineage-event:5000/api/v1/lineage

storage:
  type: postgres
  host: postgres
  port: 5432
  database: marquez
  username: marquez
  password: ${MARQUEZ_PASSWORD}

admin:
  port: 5000
  host: 0.0.0.0

ui:
  port: 3000
  host: 0.0.0.0
Marquez API
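A quick way to confirm the backend is reachable before querying lineage is to list its namespaces over the REST API. A minimal sketch, assuming Marquez is exposed at the same host used throughout this section.

# List Marquez namespaces to verify the API is reachable.
import requests

response = requests.get(
    "http://marquez:5000/api/v1/namespaces",
    headers={"Accept": "application/json"},
    timeout=10,
)
response.raise_for_status()

for namespace in response.json().get("namespaces", []):
    print(namespace["name"])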
# Query lineage from Marquez
import requests
# Get lineage for a specific dataset. Marquez exposes lineage at
# /api/v1/lineage, keyed by a node ID of the form dataset:<namespace>:<name>.
response = requests.get(
    'http://marquez:5000/api/v1/lineage',
    params={'nodeId': 'dataset:default:analytics.customers'},
    headers={'Accept': 'application/json'},
)
lineage_data = response.json()
# Lineage includes:
# - Upstream tables (sources)
# - Downstream tables (consumers)
# - Transformations (SQL, Spark jobs, Airflow tasks)
# Example response (simplified):
{
  "tableName": "analytics.customers",
  "inputs": ["raw.customers", "stg_customers"],
  "outputs": ["mart.customer_360", "reports.daily_customers"],
  "transformations": [
    {
      "type": "sql",
      "sql": "SELECT * FROM stg_customers WHERE customer_id IS NOT NULL",
      "location": "models/customers.sql"
    },
    {
      "type": "spark",
      "app": "ETL Job",
      "location": "etl/jobs/customers.py"
    }
  ]
}
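Given a response in this simplified shape, impact analysis is just a walk over the inputs and outputs. A short illustrative sketch:

# Walk the simplified lineage payload to summarize upstream and downstream tables.
def summarize_lineage(lineage_data: dict) -> None:
    table = lineage_data.get("tableName", "<unknown>")
    print(f"Lineage for {table}")
    for upstream in lineage_data.get("inputs", []):
        print(f"  upstream:   {upstream}")
    for downstream in lineage_data.get("outputs", []):
        print(f"  downstream: {downstream}")
    for step in lineage_data.get("transformations", []):
        print(f"  transform:  {step['type']} ({step.get('location', 'n/a')})")

summarize_lineage(lineage_data)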
DataHub Lineage
DataHub Configuration
# DataHub deployment
datahub:
  gms:
    port: 8080
    host: 0.0.0.0
  graphql:
    port: 9001
    host: 0.0.0.0

frontend:
  port: 9002
  host: 0.0.0.0

mae:
  mysql:
    host: mysql
    port: 3306
    database: datahub
    username: datahub
    password: ${DATAHUB_PASSWORD}

pe:
  postgres:
    host: postgres
    port: 5432
    database: datahub
    username: datahub
    password: ${DATAHUB_PASSWORD}
Ingesting Lineage
# Ingest lineage into DataHub
# The DataHub Python SDK (acryl-datahub) emits lineage as metadata change
# events through a REST emitter; the helpers below are part of
# datahub.emitter.mce_builder.
import datahub.emitter.mce_builder as builder
from datahub.emitter.rest_emitter import DatahubRestEmitter

emitter = DatahubRestEmitter(gms_server="http://datahub-gms:8080")

# Dataset URNs identify each table: urn:li:dataset:(platform, name, environment)
raw_customers = builder.make_dataset_urn("postgres", "raw.customers", "PROD")
analytics_customers = builder.make_dataset_urn("postgres", "analytics.customers", "PROD")
customer_360 = builder.make_dataset_urn("postgres", "mart.customer_360", "PROD")

# Upstream lineage: raw.customers feeds analytics.customers
# (e.g. SELECT * FROM raw.customers WHERE customer_id IS NOT NULL)
emitter.emit_mce(
    builder.make_lineage_mce(
        upstream_urns=[raw_customers],
        downstream_urn=analytics_customers,
    )
)

# Downstream lineage: analytics.customers feeds mart.customer_360
emitter.emit_mce(
    builder.make_lineage_mce(
        upstream_urns=[analytics_customers],
        downstream_urn=customer_360,
    )
)

# Ownership, documentation, and other aspects can be attached to the same URNs
# with additional metadata change events.
Lineage Visualization
Lineage Graph
Lineage Query:
- Upstream: What feeds this table? (sources, transformations)
- Downstream: What does this table feed? (marts, reports)
- Impact analysis: If this table changes, what breaks? (a graph-walk sketch follows this list)
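Conceptually, all three queries are walks over a directed graph of table-to-table edges. The sketch below answers the downstream question over a small in-memory edge list; the edges are illustrative, and in practice they would come from the lineage backend.

# Downstream impact analysis as a breadth-first walk over lineage edges.
from collections import defaultdict, deque

# (upstream, downstream) edges; illustrative only
edges = [
    ("raw.customers", "staging.stg_customers"),
    ("staging.stg_customers", "analytics.customers"),
    ("analytics.customers", "mart.customer_360"),
    ("analytics.customers", "reports.customer_summary"),
    ("mart.customer_360", "ml.customer_churn_model"),
]

downstream = defaultdict(list)
for up, down in edges:
    downstream[up].append(down)

def impacted_tables(table: str) -> dict:
    """Return every table reachable downstream of `table`, with its distance."""
    seen = {}
    queue = deque([(table, 0)])
    while queue:
        node, dist = queue.popleft()
        for child in downstream[node]:
            if child not in seen:
                seen[child] = dist + 1
                queue.append((child, dist + 1))
    return seen

print(impacted_tables("analytics.customers"))
# {'mart.customer_360': 1, 'reports.customer_summary': 1, 'ml.customer_churn_model': 2}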
Impact Analysis
Upstream Impact
-- Query: What feeds analytics.customers?
-- Assumes a lineage_edges table with columns
-- (upstream_schema, upstream_table, downstream_schema, downstream_table).
WITH RECURSIVE lineage AS (
    -- Base case: immediate upstream of the target table
    SELECT
        upstream_schema AS schema_name,
        upstream_table  AS table_name,
        1 AS distance
    FROM lineage_edges
    WHERE downstream_schema = 'analytics'
      AND downstream_table = 'customers'

    UNION ALL

    -- Recursive case: upstream of upstream
    SELECT
        e.upstream_schema,
        e.upstream_table,
        l.distance + 1
    FROM lineage_edges e
    JOIN lineage l
      ON e.downstream_schema = l.schema_name
     AND e.downstream_table = l.table_name
)
SELECT
    schema_name,
    table_name,
    MAX(distance) AS degrees_of_separation
FROM lineage
GROUP BY schema_name, table_name
ORDER BY degrees_of_separation;

-- Result:
-- schema_name | table_name    | degrees_of_separation
-- staging     | stg_customers | 1
-- raw         | customers_raw | 2
-- staging     | stg_orders    | 2
Downstream Impact
-- Query: What does analytics.customers feed?
-- Uses the same lineage_edges table as the upstream query.
WITH RECURSIVE lineage AS (
    -- Base case: immediate downstream of the target table
    SELECT
        downstream_schema AS schema_name,
        downstream_table  AS table_name,
        1 AS distance
    FROM lineage_edges
    WHERE upstream_schema = 'analytics'
      AND upstream_table = 'customers'

    UNION ALL

    -- Recursive case: downstream of downstream
    SELECT
        e.downstream_schema,
        e.downstream_table,
        l.distance + 1
    FROM lineage_edges e
    JOIN lineage l
      ON e.upstream_schema = l.schema_name
     AND e.upstream_table = l.table_name
)
SELECT
    schema_name,
    table_name,
    MAX(distance) AS degrees_of_separation
FROM lineage
GROUP BY schema_name, table_name
ORDER BY degrees_of_separation;

-- Result:
-- schema_name | table_name           | degrees_of_separation
-- marts       | customer_360         | 1
-- reports     | customer_summary     | 1
-- marts       | sales_summary        | 2
-- ml          | customer_churn_model | 2
Lineage Best Practices
DO
# 1. Emit lineage events for all data transformations
#    Automatic: OpenLineage listeners
#    Manual: Emit events using SDK

# 2. Use unique identifiers
#    URN format: urn:li:dataset:namespace:table

# 3. Document business lineage
#    Connect technical tables to business terms

# 4. Track metadata changes
#    Schema changes, new tables, deprecated tables

# 5. Visualize lineage
#    Use DataHub UI for lineage graphs
DON’T
# 1. Don't ignore lineage
#    Essential for impact analysis

# 2. Don't document lineage manually
#    Automatic lineage is more accurate

# 3. Don't skip upstream dependencies
#    Track all data sources

# 4. Don't forget business lineage
#    Connect to business terms

# 5. Don't ignore deprecated tables
#    Mark deprecated, don't delete lineage
Key Takeaways
- Lineage tracking: End-to-end data flow visibility
- OpenLineage: Open standard for lineage events
- Marquez: Backend for lineage events
- DataHub: Frontend for lineage visualization
- Impact analysis: Upstream/downstream dependencies
- Root cause analysis: Trace errors to source
- Compliance: Regulatory requirements (GDPR, CCPA)
- Use When: Data governance, impact analysis, root cause analysis
Back to Module 4