Airflow Modern Guide
Orchestration for Modern Data Platforms
Overview
Apache Airflow is a widely used open-source workflow orchestration platform for data engineering. Modern Airflow (2.x) adds the TaskFlow API, task groups, dynamic task mapping (2.3+), standalone provider packages, and generally improved DAG authoring.
Modern DAG Authoring
TaskFlow API (Airflow 2.0+)
from airflow.decorators import dag, task
from datetime import datetime, timedelta
import pandas as pd


@dag(
    dag_id='daily_sales_etl',
    schedule_interval='@daily',
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=['etl', 'sales'],
    default_args={
        'owner': 'data-engineering',
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
    },
)
def daily_sales_etl():
    """
    Daily sales ETL pipeline using the TaskFlow API.
    Extracts from the source system, transforms, and loads to the warehouse.
    """

    @task
    def extract_sales() -> pd.DataFrame:
        """Extract sales data from the source system."""
        import requests
        response = requests.get('https://api.example.com/sales')
        return pd.DataFrame(response.json())

    @task
    def transform_sales(sales_df: pd.DataFrame) -> pd.DataFrame:
        """Transform sales data."""
        # Clean and transform
        sales_df['sale_date'] = pd.to_datetime(sales_df['sale_date'])
        sales_df = sales_df[sales_df['amount'] > 0]
        return sales_df

    @task
    def load_to_warehouse(sales_df: pd.DataFrame) -> None:
        """Load sales data to the warehouse."""
        from airflow.providers.postgres.hooks.postgres import PostgresHook
        hook = PostgresHook(postgres_conn_id='warehouse')

        # Insert data
        sales_df.to_sql(
            'sales',
            hook.get_sqlalchemy_engine(),
            if_exists='append',
            index=False,
        )

    # Define task dependencies
    sales_data = extract_sales()
    transformed_data = transform_sales(sales_data)
    load_to_warehouse(transformed_data)


# Instantiate the DAG
daily_sales_etl_dag = daily_sales_etl()
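For quick local iteration, a DAG object can be executed in-process with dag.test() (available since Airflow 2.5). This is a minimal sketch, assuming a local metadata database and the warehouse connection referenced above are configured:

# Run the DAG in-process for local debugging (Airflow 2.5+).
# Assumes a local metadata DB and the 'warehouse' connection exist.
if __name__ == '__main__':
    daily_sales_etl_dag.test()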
Task Groups

from airflow.decorators import dag, task_group
from airflow.operators.python import PythonOperator
from datetime import datetime


@dag(
    dag_id='etl_pipeline_with_groups',
    schedule_interval='@daily',
    start_date=datetime(2025, 1, 1),
    catchup=False,
)
def etl_pipeline():
    """
    ETL pipeline with task groups for better organization.
    """

    @task_group(group_id='extract')
    def extract():
        """Extract data from multiple sources."""
        from airflow.providers.http.operators.http import SimpleHttpOperator
        from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

        extract_api = SimpleHttpOperator(
            task_id='extract_from_api',
            http_conn_id='api_source',
            endpoint='/sales',
            method='GET',
        )

        extract_db = SQLExecuteQueryOperator(
            task_id='extract_from_db',
            conn_id='source_db',
            sql="SELECT * FROM customers WHERE updated_at >= '{{ ds }}'",
        )

        return [extract_api, extract_db]

    @task_group(group_id='transform')
    def transform():
        """Transform data."""
        transform_data = PythonOperator(
            task_id='apply_transformations',
            python_callable=lambda: print('Transforming data...'),
        )

        validate_data = PythonOperator(
            task_id='validate_data',
            python_callable=lambda: print('Validating data...'),
        )

        transform_data >> validate_data

    @task_group(group_id='load')
    def load():
        """Load data to targets."""
        from airflow.providers.postgres.operators.postgres import PostgresOperator

        load_warehouse = PostgresOperator(
            task_id='load_to_warehouse',
            postgres_conn_id='warehouse',
            sql='INSERT INTO sales SELECT * FROM staging_sales',
        )

        load_datamart = PostgresOperator(
            task_id='load_to_datamart',
            postgres_conn_id='warehouse',
            sql='REFRESH MATERIALIZED VIEW mv_customer_sales',
        )

        load_warehouse >> load_datamart

    # Define task group dependencies
    extract() >> transform() >> load()


# Instantiate the DAG
etl_pipeline_dag = etl_pipeline()
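Task groups can also be declared with the TaskGroup context manager instead of the @task_group decorator; which style to use is mostly a matter of taste. A minimal sketch under that assumption (the DAG id and the EmptyOperator task IDs here are illustrative):

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime

with DAG(
    dag_id='etl_pipeline_with_tg_context_manager',  # illustrative DAG id
    schedule_interval='@daily',
    start_date=datetime(2025, 1, 1),
    catchup=False,
) as dag:
    with TaskGroup(group_id='extract') as extract_group:
        # Tasks created inside the context manager are grouped under 'extract'
        extract_api = EmptyOperator(task_id='extract_from_api')
        extract_db = EmptyOperator(task_id='extract_from_db')

    transform = EmptyOperator(task_id='transform')
    extract_group >> transform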
Dynamic Task Mapping

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from datetime import datetime


@dag(
    dag_id='dynamic_file_processing',
    schedule_interval='@hourly',
    start_date=datetime(2025, 1, 1),
    catchup=False,
)
def dynamic_file_processing():
    """
    Process files dynamically using task mapping.
    """

    @task
    def list_files() -> list[str]:
        """List files to process."""
        from airflow.providers.amazon.aws.hooks.s3 import S3Hook

        # Jinja templates are not rendered inside Python task bodies,
        # so read the logical date from the task context instead.
        ds_nodash = get_current_context()['ds_nodash']
        s3 = S3Hook(aws_conn_id='aws_default')
        files = s3.list_keys(
            bucket_name='my-bucket',
            prefix=f'incoming/{ds_nodash}/',
        )
        return files or []

    @task
    def process_file(file_path: str) -> str:
        """Process a single file."""
        print(f"Processing {file_path}")
        # Process file
        return f"Processed {file_path}"

    @task
    def aggregate_results(results: list[str]) -> None:
        """Aggregate processing results."""
        print(f"Processed {len(results)} files")

    # Dynamic task mapping: one process_file task instance per listed file
    files = list_files()
    processed_files = process_file.expand(file_path=files)
    aggregate_results(processed_files)


dynamic_file_processing_dag = dynamic_file_processing()
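Dynamic task mapping also works with classic operators: constant arguments go in .partial() and the mapped argument goes in .expand(). A minimal sketch under that assumption (the DAG id, connection ID, and SQL statements are illustrative):

from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime

with DAG(
    dag_id='mapped_classic_operator',  # illustrative DAG id
    schedule_interval='@daily',
    start_date=datetime(2025, 1, 1),
    catchup=False,
) as dag:
    # Constant arguments go in partial(); the mapped argument goes in expand(),
    # producing one task instance per SQL statement at runtime.
    load_tables = PostgresOperator.partial(
        task_id='load_table',
        postgres_conn_id='warehouse',
    ).expand(
        sql=[
            'INSERT INTO sales SELECT * FROM staging_sales',
            'INSERT INTO customers SELECT * FROM staging_customers',
        ],
    )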
Airflow Configuration
Best Practices
# airflow.cfg
[core]
# Executor: KubernetesExecutor, CeleryExecutor, or LocalExecutor
executor = KubernetesExecutor

# DAGs directory
dags_folder = /opt/airflow/dags

# Plugins directory
plugins_folder = /opt/airflow/plugins

# Parallelism
parallelism = 32
max_active_tasks_per_dag = 16
max_active_runs_per_dag = 4

# Don't load the bundled example DAGs
load_examples = False

[logging]
# Log level
logging_level = INFO

# Base log folder
base_log_folder = /var/log/airflow

# Colored console log format
colored_log_format = [%%(blue)s%%(asctime)s%%(reset)s] {%%(blue)s%%(filename)s:%%(lineno)d%%(reset)s} %%(log_color)s%%(levelname)s%%(reset)s - %%(log_color)s%%(message)s%%(reset)s

[webserver]
# Webserver port
web_server_port = 8080

# Don't expose the configuration in the UI
expose_config = False

[database]
# SQLAlchemy connection string for the metadata database
sql_alchemy_conn = postgresql://airflow:password@postgres:5432/airflow

# Pool size
sql_alchemy_pool_size = 5

# Max overflow
sql_alchemy_max_overflow = 10

[kubernetes_executor]
# Kubernetes namespace for worker pods
namespace = airflow

# Worker container image
worker_container_repository = apache/airflow
worker_container_tag = 2.7.0-python3.10

# Worker pod spec (service account, resource requests/limits) is defined
# in a pod template file rather than individual config keys
pod_template_file = /opt/airflow/pod_templates/worker_pod.yaml
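Any airflow.cfg setting can also be overridden with an environment variable of the form AIRFLOW__{SECTION}__{KEY}, which is often more convenient than baking a config file into container images. A short sketch (the values are illustrative):

# Environment-variable overrides follow the AIRFLOW__{SECTION}__{KEY} convention
export AIRFLOW__CORE__EXECUTOR=KubernetesExecutor
export AIRFLOW__CORE__LOAD_EXAMPLES=False
export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql://airflow:password@postgres:5432/airflow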
Airflow Providers
Provider Packages
# Amazon (AWS) provider
pip install apache-airflow-providers-amazon

# Google (GCP) provider
pip install apache-airflow-providers-google

# Microsoft Azure provider
pip install apache-airflow-providers-microsoft-azure

# PostgreSQL provider
pip install apache-airflow-providers-postgres

# Snowflake provider
pip install apache-airflow-providers-snowflake

# Databricks provider
pip install apache-airflow-providers-databricks

# HTTP provider
pip install apache-airflow-providers-http
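Provider releases are only tested against specific core versions, so it is safer to install them against the official Airflow constraints file that matches your Airflow and Python versions. A sketch assuming Airflow 2.7.0 on Python 3.10:

# Pin provider versions with the constraints file for your Airflow/Python combination
pip install apache-airflow-providers-amazon \
  --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.7.0/constraints-3.10.txt"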
Using Providers

from airflow import DAG
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.slack.operators.slack import SlackAPIPostOperator
from datetime import datetime

with DAG(
    's3_to_redshift_etl',
    schedule_interval='@daily',
    start_date=datetime(2025, 1, 1),
    catchup=False,
) as dag:
    # Transfer S3 to Redshift (s3_key is treated as a key prefix by COPY)
    s3_to_redshift = S3ToRedshiftOperator(
        task_id='s3_to_redshift',
        s3_bucket='my-bucket',
        s3_key='sales/{{ ds_nodash }}/',
        schema='public',
        table='sales',
        copy_options=[
            'CSV',
            "DELIMITER ','",
            'IGNOREHEADER 1',
        ],
        redshift_conn_id='redshift_default',
        aws_conn_id='aws_default',
    )

    # Vacuum and analyze (must run outside a transaction block)
    vacuum_analyze = PostgresOperator(
        task_id='vacuum_analyze',
        postgres_conn_id='redshift_default',
        autocommit=True,
        sql='''
            VACUUM FULL public.sales;
            ANALYZE public.sales;
        ''',
    )

    # Send Slack notification
    slack_success = SlackAPIPostOperator(
        task_id='slack_success',
        slack_conn_id='slack_default',
        text='✅ S3 to Redshift ETL completed successfully!',
        channel='#data-engineering',
    )

    s3_to_redshift >> vacuum_analyze >> slack_success

Airflow Monitoring
Metrics
from airflow import DAG
from datetime import datetime


def send_metrics(context):
    """Send DAG run metrics to Kafka (uses the apache-airflow-providers-apache-kafka package)."""
    import json
    from airflow.providers.apache.kafka.hooks.produce import KafkaProducerHook

    dag_run = context['dag_run']
    metrics = {
        'dag_id': dag_run.dag_id,
        'run_id': dag_run.run_id,
        'logical_date': str(dag_run.logical_date),
        'start_date': str(dag_run.start_date),
        'duration': (dag_run.end_date - dag_run.start_date).total_seconds()
        if dag_run.end_date and dag_run.start_date else None,
        'state': dag_run.state,
    }

    producer = KafkaProducerHook(kafka_config_id='kafka_default').get_producer()
    producer.produce(
        'airflow_metrics',
        value=json.dumps(metrics).encode('utf-8'),
    )
    producer.flush()


with DAG(
    'etl_with_metrics',
    schedule_interval='@daily',
    start_date=datetime(2025, 1, 1),
    catchup=False,
    on_success_callback=send_metrics,
    on_failure_callback=send_metrics,
) as dag:
    # DAG tasks...
    pass
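For basic performance monitoring, Airflow can also ship its built-in scheduler and task metrics to a StatsD endpoint, without custom callbacks. A minimal sketch of the relevant airflow.cfg settings (the host and prefix values are illustrative):

[metrics]
# Ship built-in Airflow metrics (task durations, scheduler heartbeats, etc.) to StatsD
statsd_on = True
statsd_host = statsd.monitoring.svc
statsd_port = 8125
statsd_prefix = airflow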
Airflow Best Practices
DO
# 1. Use the TaskFlow API (Airflow 2.0+)
from airflow.decorators import dag, task

@dag(...)
def my_dag():
    @task
    def my_task():
        return "data"

    my_task()

# 2. Use task groups for organization
@task_group(group_id='extract')
def extract():
    # Multiple extraction tasks
    pass

# 3. Use XCom sparingly (prefer return values)
@task
def task1():
    return "data"

@task
def task2(data):
    print(data)

task2(task1())

# 4. Use connections for secrets (see the sketch after this list)
# Store credentials in Airflow connections, not in DAGs

# 5. Use proper scheduling
schedule_interval = '@daily'  # a declarative schedule, not a tight polling loop
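Connections keep credentials in the metadata database (or a configured secrets backend) instead of in DAG code. A minimal sketch of resolving one at runtime inside a task; the connection ID warehouse is the one assumed earlier in this guide:

from airflow.decorators import task
from airflow.hooks.base import BaseHook

@task
def query_warehouse():
    # Resolve credentials at runtime from Airflow's connection store;
    # never hardcode them in the DAG file.
    conn = BaseHook.get_connection('warehouse')
    print(f"Connecting to {conn.host}:{conn.port}/{conn.schema} as {conn.login}")
    uri = conn.get_uri()  # full connection URI if a client library needs one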
DON’T

# 1. Don't put top-level Python code in DAG files
# Bad: top-level code runs every time the scheduler parses the file
import time
time.sleep(10)  # Don't do this!

# 2. Don't ignore catchup=False
# Bad: backfills every run since the start date
dag = DAG('my_dag', start_date=datetime(2020, 1, 1))

# Good: only process new runs
dag = DAG('my_dag', start_date=datetime(2025, 1, 1), catchup=False)

# 3. Don't pass large data through XCom
# XCom values live in the metadata database and have backend-specific size limits;
# pass references (file paths, S3 keys) instead of the data itself

# 4. Don't hardcode values
# Bad: hardcoded values
'table_name': 'my_table'

# Good: use Variables or macros
'table_name': '{{ var.value.MY_TABLE }}'

# 5. Don't ignore data dependencies
# Use sensors to wait for upstream data availability (see the sketch below)
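A minimal sketch of waiting for upstream data with a sensor; the DAG id, file key, and bucket here are illustrative, reusing the S3 layout from the dynamic mapping example:

from airflow import DAG
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from datetime import datetime

with DAG(
    dag_id='wait_then_process',  # illustrative DAG id
    schedule_interval='@daily',
    start_date=datetime(2025, 1, 1),
    catchup=False,
) as dag:
    # Block downstream work until the day's input file has landed in S3
    wait_for_sales_file = S3KeySensor(
        task_id='wait_for_sales_file',
        bucket_name='my-bucket',
        bucket_key='incoming/{{ ds_nodash }}/sales.csv',  # illustrative key
        aws_conn_id='aws_default',
        poke_interval=300,       # check every 5 minutes
        timeout=6 * 60 * 60,     # give up after 6 hours
    )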
Key Takeaways
- TaskFlow API: Modern, type-safe DAG authoring
- Task Groups: Organize complex DAGs
- Dynamic Mapping: Fan tasks out over data that is only known at runtime
- Providers: Prebuilt operators and hooks for most major external services
- Monitoring: Track DAG performance
- Best Practices: Use TaskFlow API, avoid top-level code
- CI/CD: Test DAGs before deploying (see the sketch below)
- Scaling: KubernetesExecutor for cloud-native scaling
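A common CI check is a DAG-integrity test that parses every DAG file and fails on import errors, cycles, or missing policy settings. A minimal pytest sketch, assuming the DAGs live under /opt/airflow/dags as configured above and that your team enforces an owner and catchup=False:

import pytest
from airflow.models import DagBag


@pytest.fixture(scope="session")
def dag_bag():
    # Parse every DAG file once; include_examples=False skips the bundled examples
    return DagBag(dag_folder="/opt/airflow/dags", include_examples=False)


def test_no_import_errors(dag_bag):
    # Any DAG that fails to parse shows up in import_errors
    assert dag_bag.import_errors == {}


def test_dag_policy(dag_bag):
    # Example policy checks; adjust to your team's conventions
    for dag_id, dag in dag_bag.dags.items():
        assert dag.default_args.get("owner"), f"{dag_id} has no owner"
        assert dag.catchup is False, f"{dag_id} should set catchup=False"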