
Data Lineage Tracking

End-to-End Data Flow Visibility


Overview

Data lineage tracking provides visibility into how data flows from source systems to consumption. It enables impact analysis, root cause analysis, and compliance by documenting the complete data journey.


Lineage Architecture

Lineage Types (a combined example follows this list):

  • Technical: Table-to-table, column-to-column dependencies
  • Business: Business terms, data ownership
  • Operational: Job runs, data quality, performance metrics
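
To make the distinction concrete, here is a purely illustrative Python record (not the schema of any particular tool) showing how one dataset's lineage can carry all three layers at once:

# Illustrative only -- not any specific lineage tool's schema.
# One dataset's lineage record carrying technical, business, and
# operational information side by side.
customer_lineage = {
    "dataset": "analytics.customers",
    # Technical lineage: table- and column-level dependencies
    "technical": {
        "upstream_tables": ["raw.customers", "staging.stg_customers"],
        "column_mapping": {
            "customer_id": ["raw.customers.id"],
            "full_name": ["raw.customers.first_name", "raw.customers.last_name"],
        },
    },
    # Business lineage: ownership and glossary terms
    "business": {
        "owner": "data-engineering@my-company.com",
        "glossary_terms": ["Customer", "PII"],
    },
    # Operational lineage: how and when the data was actually produced
    "operational": {
        "produced_by": "daily_sales_etl.transform_data",
        "last_run_at": "2025-01-01T02:00:00Z",
        "row_count": 1_204_331,
        "quality_checks_passed": True,
    },
}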

OpenLineage

OpenLineage Architecture

OpenLineage Components (a minimal client example follows this list):

  • Client Libraries: Emit lineage events from Python, SQL, Spark, Airflow, dbt
  • Marquez: Reference backend that receives and stores lineage events (with its own API and UI)
  • OpenMetadata/DataHub: Metadata catalogs that consume and visualize lineage
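
Besides the automatic integrations below, lineage events can also be emitted directly with the openlineage-python client. This is a minimal sketch; exact module paths vary between client versions, and the Marquez URL is simply the one used elsewhere on this page:

# Minimal manual OpenLineage event emission (openlineage-python client).
# Module paths may differ slightly between client versions.
from datetime import datetime, timezone
from uuid import uuid4

from openlineage.client import OpenLineageClient
from openlineage.client.run import Job, Run, RunEvent, RunState

client = OpenLineageClient(url="http://marquez:5000")  # assumed Marquez endpoint

run_id = str(uuid4())
job = Job(namespace="default", name="daily_sales_etl.manual_step")
producer = "https://example.com/my-pipeline"  # identifies the emitting code

# START event when the step begins...
client.emit(RunEvent(
    eventType=RunState.START,
    eventTime=datetime.now(timezone.utc).isoformat(),
    run=Run(runId=run_id),
    job=job,
    producer=producer,
))

# ...and a COMPLETE event when it finishes.
client.emit(RunEvent(
    eventType=RunState.COMPLETE,
    eventTime=datetime.now(timezone.utc).isoformat(),
    run=Run(runId=run_id),
    job=job,
    producer=producer,
))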

OpenLineage Integration

Airflow Integration

# Airflow DAG with OpenLineage
# The OpenLineage Airflow integration (apache-airflow-providers-openlineage, or
# the older openlineage-airflow plugin) listens to task execution and emits
# lineage events automatically -- no special operators are required. The
# transport (for example the Marquez URL) is configured outside the DAG,
# typically in airflow.cfg or via environment variables such as OPENLINEAGE_URL.
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

default_args = {
    "owner": "data-engineering",
    "start_date": datetime(2025, 1, 1),
}

with DAG(
    "daily_sales_etl",
    default_args=default_args,
    schedule_interval="@daily",
    catchup=False,
    tags=["etl", "sales"],
) as dag:

    extract_sales = PythonOperator(
        task_id="extract_sales",
        python_callable=lambda: print("Extracting sales..."),
    )

    transform_data = PythonOperator(
        task_id="transform_data",
        python_callable=lambda: print("Transforming data..."),
    )

    load_data = PythonOperator(
        task_id="load_data",
        python_callable=lambda: print("Loading data..."),
    )

    # OpenLineage emits START/COMPLETE run events for each task automatically;
    # operators with extractors also report their input/output datasets.
    extract_sales >> transform_data >> load_data
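
When a task has no automatic extractor, dataset-level lineage can still be declared by hand on the operator. A hedged sketch using Airflow's built-in lineage entities (entity fields vary slightly between Airflow versions; the table names reuse this page's examples):

# Manual lineage for a task without an automatic extractor
# (declared inside the `with DAG(...)` block above).
# The OpenLineage provider can translate inlets/outlets declared with
# airflow.lineage.entities into dataset-level lineage.
from airflow.lineage.entities import Table
from airflow.operators.python import PythonOperator

transform_data = PythonOperator(
    task_id="transform_data",
    python_callable=lambda: print("Transforming data..."),
    inlets=[Table(database="raw", cluster="warehouse", name="sales")],
    outlets=[Table(database="curated", cluster="warehouse", name="sales")],
)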

Spark Integration

# Spark job with OpenLineage
# The OpenLineage Spark integration is a JVM listener shipped in the
# io.openlineage:openlineage-spark artifact; it must be on the driver classpath
# (e.g. via spark.jars.packages or --packages). Configuration key names vary a
# little between releases, so check the docs for the version you pin.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("ETL Job")
    .config("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener")
    # Send events to Marquez over HTTP
    .config("spark.openlineage.transport.type", "http")
    .config("spark.openlineage.transport.url", "http://marquez:5000")
    .config("spark.openlineage.namespace", "default")
    .getOrCreate()
)

# Run ETL job
df = spark.read.parquet("s3://bucket/raw/sales/")
df_transformed = df.filter(df.amount > 0)
df_transformed.write.parquet("s3://bucket/curated/sales/")

# Lineage is captured automatically by the listener:
# - Read:      s3://bucket/raw/sales/
# - Transform: filter (amount > 0)
# - Write:     s3://bucket/curated/sales/

dbt Integration

dbt_project.yml

# No special lineage configuration is needed in the project file itself
name: 'my_data_platform'
version: '1.0.0'
config-version: 2

# OpenLineage for dbt works through the openlineage-dbt package, which ships a
# `dbt-ol` wrapper command (run `dbt-ol run` instead of `dbt run`). It reads
# dbt's manifest.json and run_results.json artifacts after the run and emits
# lineage events to the endpoint configured via OPENLINEAGE_URL and
# OPENLINEAGE_NAMESPACE. Lineage is emitted for:
# - Models (tables, views)
# - Tests
# - Seeds
# - Snapshots
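
dbt's own artifacts already contain model-level lineage, so a quick way to inspect it without any lineage backend is to read manifest.json directly (the path below assumes the default target/ directory after a dbt run):

# Inspect model-level lineage straight from dbt's manifest.json.
# Assumes the default artifact location (target/manifest.json).
import json

with open("target/manifest.json") as f:
    manifest = json.load(f)

for node_id, node in manifest["nodes"].items():
    if node["resource_type"] != "model":
        continue
    upstream = node.get("depends_on", {}).get("nodes", [])
    print(f"{node_id} <- {upstream}")

# Example output line (illustrative):
# model.my_data_platform.customers <- ['model.my_data_platform.stg_customers']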

Marquez Backend

Marquez Configuration

marquez.yml

# Marquez is a Dropwizard service, so its configuration follows the Dropwizard
# layout (hosts and secrets below are illustrative)
server:
  applicationConnectors:
    - type: http
      port: 5000          # OpenLineage events are POSTed to /api/v1/lineage
  adminConnectors:
    - type: http
      port: 5001

db:
  driverClass: org.postgresql.Driver
  url: jdbc:postgresql://postgres:5432/marquez
  user: marquez
  password: ${MARQUEZ_PASSWORD}

# The Marquez web UI runs as a separate service (default port 3000) against the
# same API.

Marquez API

# Query lineage from Marquez
# Marquez exposes lineage through GET /api/v1/lineage, addressed by a nodeId of
# the form dataset:<namespace>:<dataset name>.
import requests

response = requests.get(
    "http://marquez:5000/api/v1/lineage",
    params={"nodeId": "dataset:default:analytics.customers", "depth": 3},
    headers={"Accept": "application/json"},
)
response.raise_for_status()
lineage_data = response.json()

# The response is a graph of dataset and job nodes with their edges, covering:
# - Upstream datasets (sources)
# - Downstream datasets (consumers)
# - The jobs in between (SQL models, Spark jobs, Airflow tasks)

# Conceptually, the lineage of analytics.customers looks like this
# (simplified summary -- not the literal Marquez payload):
# {
#     "tableName": "analytics.customers",
#     "inputs": ["raw.customers", "staging.stg_customers"],
#     "outputs": ["mart.customer_360", "reports.daily_customers"],
#     "transformations": [
#         {"type": "sql",
#          "sql": "SELECT * FROM stg_customers WHERE customer_id IS NOT NULL",
#          "location": "models/customers.sql"},
#         {"type": "spark", "app": "ETL Job", "location": "etl/jobs/customers.py"}
#     ]
# }
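
Assuming the response shape used by recent Marquez versions (a "graph" list of nodes, each with inEdges/outEdges), the datasets feeding one node can be pulled out with a short traversal; treat the field names here as assumptions to verify against the Marquez release you run:

# Walk the Marquez lineage graph and list the datasets feeding one node.
# Field names (graph, inEdges, origin) assume the current Marquez response
# shape -- verify against your version. Uses lineage_data from the call above.
target = "dataset:default:analytics.customers"

nodes = {node["id"]: node for node in lineage_data.get("graph", [])}

# Direct in-edges of a dataset usually point at jobs...
upstream_job_ids = [
    edge.get("origin")
    for edge in nodes.get(target, {}).get("inEdges", [])
]

# ...so follow one more hop through each job to reach upstream datasets.
upstream_datasets = {
    edge.get("origin")
    for job_id in upstream_job_ids
    for edge in nodes.get(job_id, {}).get("inEdges", [])
    if str(edge.get("origin", "")).startswith("dataset:")
}

print(sorted(upstream_datasets))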

DataHub Lineage

DataHub Configuration

# DataHub deployment (simplified, illustrative snippet -- not a complete chart)
datahub:
  gms:                     # metadata service / API
    port: 8080
    host: 0.0.0.0
  frontend:                # web UI
    port: 9002
    host: 0.0.0.0
  storage:
    mysql:
      host: mysql
      port: 3306
      database: datahub
      username: datahub
      password: ${DATAHUB_PASSWORD}

# A full deployment also needs Kafka and Elasticsearch/OpenSearch for DataHub's
# metadata event streams and search index.

Ingesting Lineage

# Ingest lineage into DataHub with the acryl-datahub Python SDK
import datahub.emitter.mce_builder as builder
from datahub.emitter.rest_emitter import DatahubRestEmitter

# Emitter pointed at the DataHub metadata service (GMS)
emitter = DatahubRestEmitter(gms_server="http://datahub-gms:8080")

# Dataset URNs follow urn:li:dataset:(urn:li:dataPlatform:<platform>,<name>,<env>)
raw_customers = builder.make_dataset_urn(platform="postgres", name="raw.customers", env="PROD")
analytics_customers = builder.make_dataset_urn(platform="postgres", name="analytics.customers", env="PROD")
customer_360 = builder.make_dataset_urn(platform="postgres", name="mart.customer_360", env="PROD")

# Upstream lineage: raw.customers feeds analytics.customers
emitter.emit_mce(
    builder.make_lineage_mce(
        upstream_urns=[raw_customers],
        downstream_urn=analytics_customers,
    )
)

# Downstream lineage is just another edge: analytics.customers feeds customer_360
emitter.emit_mce(
    builder.make_lineage_mce(
        upstream_urns=[analytics_customers],
        downstream_urn=customer_360,
    )
)

# Column-level lineage and transformation SQL are attached via the
# UpstreamLineage / fine-grained lineage aspects rather than this helper.
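
To confirm the edges landed, the same SDK can read the UpstreamLineage aspect back from GMS. A sketch assuming the acryl-datahub graph client and the URNs defined above:

# Read the lineage back from DataHub to verify the emitted edges.
# Assumes the acryl-datahub SDK; URNs come from the block above.
from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig
from datahub.metadata.schema_classes import UpstreamLineageClass

graph = DataHubGraph(DatahubClientConfig(server="http://datahub-gms:8080"))

lineage = graph.get_aspect(
    entity_urn=analytics_customers,
    aspect_type=UpstreamLineageClass,
)

if lineage:
    for upstream in lineage.upstreams:
        # Each entry names an upstream dataset URN and the lineage type
        print(upstream.dataset, upstream.type)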

Lineage Visualization

Lineage Graph

Lineage Query:

  • Upstream: What feeds this table? (sources, transformations)
  • Downstream: What does this table feed? (marts, reports)
  • Impact analysis: If this table changes, what breaks? (see the traversal sketch below)
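
Whatever backend stores the graph, impact analysis reduces to a breadth-first walk over lineage edges. A self-contained sketch with a hypothetical adjacency map (table names reuse this page's examples):

# Breadth-first impact analysis over a lineage graph.
# `downstream_of` is a hypothetical adjacency map: table -> tables it feeds.
from collections import deque

downstream_of = {
    "raw.customers": ["staging.stg_customers"],
    "staging.stg_customers": ["analytics.customers"],
    "analytics.customers": ["mart.customer_360", "reports.customer_summary"],
    "mart.customer_360": ["ml.customer_churn_model"],
}

def impacted_tables(start: str) -> dict[str, int]:
    """Return every downstream table and its distance from `start`."""
    distances: dict[str, int] = {}
    queue = deque([(start, 0)])
    while queue:
        table, dist = queue.popleft()
        for child in downstream_of.get(table, []):
            if child not in distances:        # first visit = shortest distance
                distances[child] = dist + 1
                queue.append((child, dist + 1))
    return distances

print(impacted_tables("analytics.customers"))
# {'mart.customer_360': 1, 'reports.customer_summary': 1, 'ml.customer_churn_model': 2}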

Impact Analysis

Upstream Impact

-- Query: What feeds analytics.customers?
-- Assumes a lineage edge table:
--   lineage_edges(upstream_schema, upstream_table, downstream_schema, downstream_table)
WITH RECURSIVE upstream AS (
    -- Base case: immediate upstream of analytics.customers
    SELECT
        e.upstream_schema AS schema_name,
        e.upstream_table  AS table_name,
        1                 AS distance
    FROM lineage_edges e
    WHERE e.downstream_schema = 'analytics'
      AND e.downstream_table = 'customers'

    UNION ALL

    -- Recursive case: upstream of upstream
    SELECT
        e.upstream_schema,
        e.upstream_table,
        u.distance + 1
    FROM lineage_edges e
    JOIN upstream u
      ON e.downstream_schema = u.schema_name
     AND e.downstream_table = u.table_name
)
SELECT
    schema_name,
    table_name,
    MAX(distance) AS degrees_of_separation
FROM upstream
GROUP BY schema_name, table_name
ORDER BY degrees_of_separation;

-- Result:
-- schema_name | table_name     | degrees_of_separation
-- staging     | stg_customers  | 1
-- raw         | customers_raw  | 2
-- staging     | stg_orders     | 2

Downstream Impact

-- Query: What does analytics.customers feed?
-- Uses the same lineage_edges table as the upstream query above
WITH RECURSIVE downstream AS (
    -- Base case: immediate downstream of analytics.customers
    SELECT
        e.downstream_schema AS schema_name,
        e.downstream_table  AS table_name,
        1                   AS distance
    FROM lineage_edges e
    WHERE e.upstream_schema = 'analytics'
      AND e.upstream_table = 'customers'

    UNION ALL

    -- Recursive case: downstream of downstream
    SELECT
        e.downstream_schema,
        e.downstream_table,
        d.distance + 1
    FROM lineage_edges e
    JOIN downstream d
      ON e.upstream_schema = d.schema_name
     AND e.upstream_table = d.table_name
)
SELECT
    schema_name,
    table_name,
    MAX(distance) AS degrees_of_separation
FROM downstream
GROUP BY schema_name, table_name
ORDER BY degrees_of_separation;

-- Result:
-- schema_name | table_name           | degrees_of_separation
-- marts       | customer_360         | 1
-- reports     | customer_summary     | 1
-- marts       | sales_summary        | 2
-- ml          | customer_churn_model | 2

Lineage Best Practices

DO

  1. Emit lineage events for all data transformations: automatically via OpenLineage listeners, or manually via the SDK
  2. Use unique identifiers: e.g. URNs of the form urn:li:dataset:namespace:table
  3. Document business lineage: connect technical tables to business terms
  4. Track metadata changes: schema changes, new tables, deprecated tables
  5. Visualize lineage: use the DataHub UI for lineage graphs

DON’T

  1. Don't ignore lineage: it is essential for impact analysis
  2. Don't document lineage manually: automatically captured lineage is more accurate
  3. Don't skip upstream dependencies: track all data sources
  4. Don't forget business lineage: connect datasets to business terms
  5. Don't delete lineage for deprecated tables: mark them deprecated and keep the history

Key Takeaways

  1. Lineage tracking: End-to-end data flow visibility
  2. OpenLineage: Open standard for lineage events
  3. Marquez: Reference backend that collects and stores lineage events
  4. DataHub: Metadata platform for lineage visualization and discovery
  5. Impact analysis: Upstream/downstream dependencies
  6. Root cause analysis: Trace errors to source
  7. Compliance: Regulatory requirements (GDPR, CCPA)
  8. Use When: Data governance, impact analysis, root cause analysis

Back to Module 4