
Airflow Modern Guide

Orchestration for Modern Data Platforms


Overview

Apache Airflow is the most widely used open-source workflow orchestration platform for data engineering. Modern Airflow (2.x) adds the TaskFlow API, task groups, dynamic task mapping (2.3+), and standalone provider packages for cleaner DAG authoring.


Airflow 2.0 Architecture


Modern DAG Authoring

TaskFlow API (Airflow 2.0+)

from airflow.decorators import dag, task
from datetime import datetime, timedelta
import pandas as pd


@dag(
    dag_id='daily_sales_etl',
    schedule_interval='@daily',
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=['etl', 'sales'],
    default_args={
        'owner': 'data-engineering',
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
    },
)
def daily_sales_etl():
    """
    Daily sales ETL pipeline using the TaskFlow API.
    Extracts from the source system, transforms, and loads to the warehouse.
    """

    @task
    def extract_sales() -> pd.DataFrame:
        """Extract sales data from the source system."""
        import requests
        response = requests.get('https://api.example.com/sales')
        response.raise_for_status()
        return pd.DataFrame(response.json())

    @task
    def transform_sales(sales_df: pd.DataFrame) -> pd.DataFrame:
        """Clean and transform sales data."""
        sales_df['sale_date'] = pd.to_datetime(sales_df['sale_date'])
        sales_df = sales_df[sales_df['amount'] > 0]
        return sales_df

    @task
    def load_to_warehouse(sales_df: pd.DataFrame) -> None:
        """Load sales data to the warehouse."""
        from airflow.providers.postgres.hooks.postgres import PostgresHook
        hook = PostgresHook(postgres_conn_id='warehouse')
        sales_df.to_sql(
            'sales',
            hook.get_sqlalchemy_engine(),
            if_exists='append',
            index=False,
        )

    # TaskFlow infers dependencies from these return-value hand-offs.
    # Note: passing DataFrames through XCom requires a custom XCom backend
    # (or enable_xcom_pickling); the default JSON serializer will reject them.
    sales_data = extract_sales()
    transformed_data = transform_sales(sales_data)
    load_to_warehouse(transformed_data)


# Instantiate the DAG
daily_sales_etl_dag = daily_sales_etl()
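
For quick local runs, Airflow 2.5+ can execute a DAG in-process without a scheduler or workers; a minimal sketch you could append to the same DAG file:

# Local debugging sketch (Airflow 2.5+): runs the whole DAG in-process.
if __name__ == '__main__':
    daily_sales_etl_dag.test()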

Task Groups

from airflow.decorators import dag, task_group
from airflow.operators.python import PythonOperator
from datetime import datetime


@dag(
    dag_id='etl_pipeline_with_groups',
    schedule_interval='@daily',
    start_date=datetime(2025, 1, 1),
    catchup=False,
)
def etl_pipeline():
    """
    ETL pipeline with task groups for better organization.
    """

    @task_group(group_id='extract')
    def extract():
        """Extract data from multiple sources."""
        from airflow.providers.http.operators.http import SimpleHttpOperator
        from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

        extract_api = SimpleHttpOperator(
            task_id='extract_from_api',
            http_conn_id='api_source',
            endpoint='/sales',
            method='GET',
        )
        extract_db = SQLExecuteQueryOperator(
            task_id='extract_from_db',
            conn_id='source_db',
            sql="SELECT * FROM customers WHERE updated_at >= '{{ ds }}'",
        )

    @task_group(group_id='transform')
    def transform():
        """Transform data."""
        transform_data = PythonOperator(
            task_id='apply_transformations',
            python_callable=lambda: print('Transforming data...'),
        )
        validate_data = PythonOperator(
            task_id='validate_data',
            python_callable=lambda: print('Validating data...'),
        )
        transform_data >> validate_data

    @task_group(group_id='load')
    def load():
        """Load data to targets."""
        from airflow.providers.postgres.operators.postgres import PostgresOperator

        load_warehouse = PostgresOperator(
            task_id='load_to_warehouse',
            postgres_conn_id='warehouse',
            sql='INSERT INTO sales SELECT * FROM staging_sales',
        )
        load_datamart = PostgresOperator(
            task_id='load_to_datamart',
            postgres_conn_id='warehouse',
            sql='REFRESH MATERIALIZED VIEW mv_customer_sales',
        )
        load_warehouse >> load_datamart

    # Group-to-group dependencies
    extract() >> transform() >> load()


etl_pipeline_dag = etl_pipeline()
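
Task-group functions can also take parameters and be instantiated several times in one DAG; Airflow suffixes repeated group IDs automatically. A minimal sketch, assuming hypothetical source names:

from airflow.decorators import dag, task, task_group
from datetime import datetime


@dag(dag_id='reusable_group_example', schedule_interval=None,
     start_date=datetime(2025, 1, 1), catchup=False)
def reusable_group_example():

    @task_group
    def ingest_source(source: str):
        """One pull/stage pair per source."""

        @task
        def pull(src: str) -> str:
            return f'pulled {src}'

        @task
        def stage(payload: str) -> None:
            print(payload)

        stage(pull(source))

    # Repeated calls create groups ingest_source, ingest_source__1, ...
    for source in ['orders', 'customers']:
        ingest_source(source)


reusable_group_example()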

Dynamic Task Mapping

from airflow.decorators import dag, task
from datetime import datetime


@dag(
    dag_id='dynamic_file_processing',
    schedule_interval='@hourly',
    start_date=datetime(2025, 1, 1),
    catchup=False,
)
def dynamic_file_processing():
    """
    Process files dynamically using task mapping.
    """

    @task
    def list_files(ds_nodash=None) -> list[str]:
        """List files to process for the current logical date."""
        from airflow.providers.amazon.aws.hooks.s3 import S3Hook
        # Jinja templates are not rendered inside TaskFlow function bodies,
        # so the logical date comes from the injected context argument.
        s3 = S3Hook(aws_conn_id='aws_default')
        files = s3.list_keys(
            bucket_name='my-bucket',
            prefix=f'incoming/{ds_nodash}/',
        )
        return files or []

    @task
    def process_file(file_path: str) -> str:
        """Process a single file."""
        print(f"Processing {file_path}")
        return f"Processed {file_path}"

    @task
    def aggregate_results(results: list[str]) -> None:
        """Aggregate processing results."""
        print(f"Processed {len(results)} files")

    # One mapped process_file task instance per listed file
    files = list_files()
    processed_files = process_file.expand(file_path=files)
    aggregate_results(processed_files)


dynamic_file_processing_dag = dynamic_file_processing()
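
Mapping is not limited to TaskFlow functions: classic operators can be mapped with .partial() / .expand(), with constant arguments in partial() and the mapped argument in expand(). A minimal sketch with illustrative commands:

from airflow.decorators import dag
from airflow.operators.bash import BashOperator
from datetime import datetime


@dag(dag_id='mapped_bash_example', schedule_interval=None,
     start_date=datetime(2025, 1, 1), catchup=False)
def mapped_bash_example():
    # One BashOperator task instance per command in the list
    BashOperator.partial(task_id='copy_file').expand(
        bash_command=[f'echo copying {name}' for name in ['a.csv', 'b.csv']],
    )


mapped_bash_example()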

Airflow Configuration

Best Practices

# airflow.cfg
[core]
# Executor: KubernetesExecutor, CeleryExecutor, or LocalExecutor
executor = KubernetesExecutor
# DAGs directory
dags_folder = /opt/airflow/dags
# Plugins directory
plugins_folder = /opt/airflow/plugins
# Parallelism
parallelism = 32
max_active_tasks_per_dag = 16
max_active_runs_per_dag = 4
# Don't load example DAGs
load_examples = False

[logging]
# Log level
logging_level = INFO
# Base log folder
base_log_folder = /var/log/airflow
# Colored log format used by the console handler
colored_log_format = [%%(blue)s%%(asctime)s%%(reset)s] {%%(blue)s%%(filename)s:%%(lineno)d%%(reset)s} %%(log_color)s%%(levelname)s%%(reset)s - %%(log_color)s%%(message)s%%(reset)s

[webserver]
# Webserver port
web_server_port = 8080
# Don't expose configuration in the UI
expose_config = False

[database]
# SQLAlchemy connection to the metadata database
sql_alchemy_conn = postgresql://airflow:password@postgres:5432/airflow
# Pool size
sql_alchemy_pool_size = 5
# Max overflow
sql_alchemy_max_overflow = 10

[kubernetes_executor]
# Kubernetes namespace for worker pods
namespace = airflow
# Worker container image
worker_container_repository = apache/airflow
worker_container_tag = 2.7.0-python3.10
# Worker pod template (service account, resource requests/limits, etc.)
pod_template_file = /opt/airflow/pod_templates/worker.yaml
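
With the KubernetesExecutor, per-pod resources and the service account are normally declared in the pod template file, or overridden per task via executor_config. A sketch reusing the request/limit values above (the task itself is illustrative):

from kubernetes.client import models as k8s
from airflow.decorators import task

# Per-task pod override for the KubernetesExecutor; the container must be
# named "base" so Airflow merges it with the worker pod template.
resource_override = {
    'pod_override': k8s.V1Pod(
        spec=k8s.V1PodSpec(
            containers=[
                k8s.V1Container(
                    name='base',
                    resources=k8s.V1ResourceRequirements(
                        requests={'cpu': '500m', 'memory': '512Mi'},
                        limits={'cpu': '1000m', 'memory': '2Gi'},
                    ),
                )
            ]
        )
    )
}


@task(executor_config=resource_override)
def heavy_transform():
    print('running with extra resources')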

Airflow Providers

Provider Packages

# Amazon Provider (AWS)
pip install apache-airflow-providers-amazon
# Google Provider (GCP)
pip install apache-airflow-providers-google
# Microsoft Azure Provider
pip install apache-airflow-providers-microsoft-azure
# PostgreSQL Provider
pip install apache-airflow-providers-postgres
# Snowflake Provider
pip install apache-airflow-providers-snowflake
# Databricks Provider
pip install apache-airflow-providers-databricks
# HTTP Provider
pip install apache-airflow-providers-http

Using Providers

from airflow import DAG
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.slack.operators.slack import SlackAPIPostOperator
from datetime import datetime

with DAG(
    's3_to_redshift_etl',
    schedule_interval='@daily',
    start_date=datetime(2025, 1, 1),
    catchup=False,
) as dag:
    # Copy S3 objects under the prefix into Redshift
    s3_to_redshift = S3ToRedshiftOperator(
        task_id='s3_to_redshift',
        s3_bucket='my-bucket',
        s3_key='sales/{{ ds_nodash }}/',
        schema='public',
        table='sales',
        copy_options=[
            "CSV",
            "IGNOREHEADER 1",
            "DELIMITER ','",
        ],
        redshift_conn_id='redshift_default',
        aws_conn_id='aws_default',
    )

    # Vacuum and analyze the target table
    vacuum_analyze = PostgresOperator(
        task_id='vacuum_analyze',
        postgres_conn_id='redshift_default',
        autocommit=True,  # VACUUM cannot run inside a transaction
        sql='''
            VACUUM FULL public.sales;
            ANALYZE public.sales;
        ''',
    )

    # Send Slack notification
    slack_success = SlackAPIPostOperator(
        task_id='slack_success',
        slack_conn_id='slack_default',
        text='✅ S3 to Redshift ETL completed successfully!',
        channel='#data-engineering',
    )

    s3_to_redshift >> vacuum_analyze >> slack_success

Airflow Monitoring

Metrics

from airflow import DAG
from airflow.providers.apache.kafka.hooks.produce import KafkaProducerHook
from datetime import datetime


def send_metrics(context):
    """Send DAG-run metrics to Kafka."""
    import json

    dag_run = context['dag_run']
    metrics = {
        'dag_id': context['dag'].dag_id,
        'run_id': dag_run.run_id,
        'logical_date': str(dag_run.logical_date),
        'start_date': str(dag_run.start_date),
        'duration': (dag_run.end_date - dag_run.start_date).total_seconds()
        if dag_run.end_date else None,
        'status': dag_run.state,
    }
    # The Kafka provider hook exposes a confluent_kafka producer
    producer = KafkaProducerHook(kafka_config_id='kafka_default').get_producer()
    producer.produce('airflow_metrics', value=json.dumps(metrics).encode('utf-8'))
    producer.flush()


with DAG(
    'etl_with_metrics',
    schedule_interval='@daily',
    start_date=datetime(2025, 1, 1),
    catchup=False,
    on_success_callback=send_metrics,
    on_failure_callback=send_metrics,
) as dag:
    # DAG tasks...
    pass
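
Callbacks can also be attached per task via default_args; a minimal sketch that only logs, with no external services assumed:

def notify_failure(context):
    """Log which task failed and for which data interval."""
    ti = context['task_instance']
    print(f"Task {ti.dag_id}.{ti.task_id} failed for {context['ds']}")


default_args = {
    'on_failure_callback': notify_failure,
    'retries': 1,
}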

Airflow Best Practices

DO

# 1. Use the TaskFlow API (Airflow 2.0+)
from airflow.decorators import dag, task

@dag(...)
def my_dag():
    @task
    def my_task():
        return "data"
    my_task()

# 2. Use task groups for organization
@task_group(group_id='extract')
def extract():
    # Multiple extraction tasks
    pass

# 3. Use XCom sparingly (prefer TaskFlow return values)
@task
def task1():
    return "data"

@task
def task2(data):
    print(data)

task2(task1())  # passing the return value also sets the dependency

# 4. Use connections for secrets
# Store credentials in Airflow connections, not in DAGs
# (see the connection lookup sketch below)

# 5. Use an explicit, predictable schedule
schedule_interval = '@daily'
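
A minimal sketch of looking up credentials from an Airflow connection at runtime instead of hardcoding them (reusing the 'warehouse' connection ID assumed earlier):

from airflow.hooks.base import BaseHook

# Resolved at task runtime from the metadata DB or a secrets backend
conn = BaseHook.get_connection('warehouse')
print(conn.host, conn.login)  # never log conn.password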

DON’T

# 1. Don't put top-level Python code in DAG files
# Bad: top-level code runs on every scheduler parse
import time
time.sleep(10)  # Don't do this!

# 2. Don't forget catchup=False
# Bad: backfills every run since the start date
dag = DAG('my_dag', start_date=datetime(2020, 1, 1))
# Good: only process new runs
dag = DAG('my_dag', start_date=datetime(2025, 1, 1), catchup=False)

# 3. Don't push large data through XCom
# XCom size limits depend on the metadata DB (roughly 64 KB on MySQL, ~1 GB on Postgres)

# 4. Don't hardcode values
# Bad: hardcoded value
'table_name': 'my_table'
# Good: use Variables or macros
'table_name': '{{ var.value.MY_TABLE }}'

# 5. Don't ignore data dependencies
# Use sensors to wait for data availability (see the sensor sketch below)
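
A minimal sensor sketch for the last point, reusing the bucket and connection IDs from earlier examples (the file name is illustrative):

from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

# Wait (poke every 5 minutes, up to 1 hour) for the day's export to land
wait_for_sales_file = S3KeySensor(
    task_id='wait_for_sales_file',
    bucket_name='my-bucket',
    bucket_key='incoming/{{ ds_nodash }}/sales.csv',
    aws_conn_id='aws_default',
    poke_interval=300,
    timeout=3600,
)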

Key Takeaways

  1. TaskFlow API: Modern, type-hinted DAG authoring with implicit XCom
  2. Task Groups: Organize complex DAGs into reusable sections
  3. Dynamic Task Mapping: Fan tasks out over data known only at runtime
  4. Providers: Separately installed packages with operators, hooks, and sensors for external services
  5. Monitoring: Track DAG run status and duration via callbacks and metrics
  6. Best Practices: Use the TaskFlow API, avoid top-level code in DAG files
  7. CI/CD: Test DAGs before deploying (see the import test sketch below)
  8. Scaling: KubernetesExecutor for cloud-native scaling
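
A common pytest-style DAG validation sketch for the CI/CD point (the test function name is an assumption):

from airflow.models import DagBag


def test_dags_import_without_errors():
    """Fail CI if any DAG file raises on import."""
    dag_bag = DagBag(include_examples=False)
    assert not dag_bag.import_errors, f"DAG import failures: {dag_bag.import_errors}"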
