Dagster Orchestration Guide
Data-Aware Orchestration for Modern Data Platforms
Overview
Dagster is a data-aware orchestrator for machine learning, analytics, and ETL. Unlike Airflow, which is task-oriented, Dagster is data-oriented: it understands the data your pipelines produce and can model complex dependencies between those data assets.
Dagster Architecture
Key Differences from Airflow:
- Data-oriented: Model data assets, not tasks (see the sketch below this list)
- Type-safe: Use Python type hints
- Testing: Test ops independently
- Software-defined: Everything is code
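
To make the first difference concrete, here is a minimal sketch (illustrative names only) of the same step expressed as a task-style op versus a software-defined asset:

```python
from dagster import asset, op
import pandas as pd


# Task-oriented: an op describes a step to run
@op
def build_orders_table() -> pd.DataFrame:
    return pd.DataFrame({"order_id": [1, 2]})


# Data-oriented: an asset describes the table itself; Dagster tracks when
# and how it was last materialized
@asset
def orders_table() -> pd.DataFrame:
    return pd.DataFrame({"order_id": [1, 2]})
```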
Dagster Ops and Jobs
Basic Ops
```python
from dagster import op, job
import pandas as pd


@op
def extract_sales() -> pd.DataFrame:
    """Extract sales data from the source API."""
    import requests

    response = requests.get('https://api.example.com/sales')
    return pd.DataFrame(response.json())


@op
def transform_sales(sales_df: pd.DataFrame) -> pd.DataFrame:
    """Transform sales data."""
    sales_df['sale_date'] = pd.to_datetime(sales_df['sale_date'])
    sales_df = sales_df[sales_df['amount'] > 0]
    return sales_df


@op
def load_to_warehouse(sales_df: pd.DataFrame) -> None:
    """Load sales data into the warehouse."""
    from sqlalchemy import create_engine

    engine = create_engine('postgresql://user:pass@warehouse/db')
    sales_df.to_sql('sales', engine, if_exists='append', index=False)


# Define the job (the DAG equivalent)
@job
def daily_sales_etl():
    """Daily sales ETL job."""
    sales_data = extract_sales()
    transformed_data = transform_sales(sales_data)
    load_to_warehouse(transformed_data)
```
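
For quick local runs (a script, a notebook, or a debugger session), the job defined above can be executed in process with `execute_in_process`; a minimal sketch:

```python
if __name__ == "__main__":
    # Runs all three ops in the current process with the default config
    result = daily_sales_etl.execute_in_process()
    assert result.success
```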
Ops with Configuration

```python
from dagster import op, job, Config
from pydantic import Field


class ExtractSalesConfig(Config):
    """Configuration for the extract_sales op."""

    api_url: str = Field(description="API endpoint URL")
    timeout: int = Field(default=30, description="Request timeout in seconds")


@op
def extract_sales(config: ExtractSalesConfig) -> pd.DataFrame:
    """Extract sales data using run configuration."""
    import requests

    response = requests.get(config.api_url, timeout=config.timeout)
    return pd.DataFrame(response.json())


@job
def configured_etl():
    """ETL job with configurable extraction."""
    sales_data = extract_sales()
    transformed_data = transform_sales(sales_data)
    load_to_warehouse(transformed_data)
```
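
Run configuration is supplied when the job is launched (from the UI, a schedule, or code). A sketch of launching `configured_etl` in process, assuming the op-level key matches the op name `extract_sales`:

```python
result = configured_etl.execute_in_process(
    run_config={
        "ops": {
            "extract_sales": {
                "config": {
                    "api_url": "https://api.example.com/sales",
                    "timeout": 10,
                }
            }
        }
    }
)
assert result.success
```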
Dagster Assets
Data Assets
```python
from dagster import asset, AssetExecutionContext
import pandas as pd


@asset
def raw_sales(context: AssetExecutionContext) -> pd.DataFrame:
    """Raw sales data asset, automatically tracked by Dagster."""
    import requests

    response = requests.get('https://api.example.com/sales')
    df = pd.DataFrame(response.json())

    # Attach metadata to the materialization
    context.add_output_metadata({
        'rows': len(df),
        'columns': list(df.columns),
        'preview': df.head().to_dict('records'),
    })

    return df


@asset
def cleaned_sales(raw_sales: pd.DataFrame) -> pd.DataFrame:
    """Cleaned sales data asset. The raw_sales argument declares the upstream dependency."""
    df = raw_sales.copy()
    df['sale_date'] = pd.to_datetime(df['sale_date'])
    df = df[df['amount'] > 0]
    return df


@asset
def sales_by_customer(cleaned_sales: pd.DataFrame) -> pd.DataFrame:
    """Sales aggregated by customer. Depends on cleaned_sales."""
    return cleaned_sales.groupby('customer_id').agg({
        'amount': 'sum',
        'sale_id': 'count',
    }).rename(columns={'sale_id': 'sales_count'})


@asset
def top_customers(sales_by_customer: pd.DataFrame) -> pd.DataFrame:
    """Top 10 customers by revenue."""
    return sales_by_customer.nlargest(10, 'amount')
```
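
The asset graph can be materialized ad hoc from Python with `dagster.materialize`, which resolves dependencies in order; a minimal sketch:

```python
from dagster import materialize

if __name__ == "__main__":
    # Materializes raw_sales first, then each downstream asset
    result = materialize([raw_sales, cleaned_sales, sales_by_customer, top_customers])
    assert result.success
```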
Asset Groups

```python
from dagster import AssetSelection, define_asset_job

# Assets can be grouped by passing group_name="sales_assets" to @asset;
# here the same assets are selected explicitly by definition.
sales_assets = AssetSelection.assets(
    raw_sales,
    cleaned_sales,
    sales_by_customer,
    top_customers,
)

# Define a job that materializes the selection
sales_job = define_asset_job(
    name="sales_job",
    selection=sales_assets,
    # Run config can be supplied via `config=...` for assets that declare a Config schema,
    # e.g. an api_url for raw_sales.
)
```
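
For the asset job to show up in the Dagster UI (for example under `dagster dev`), the assets and the job are registered on a `Definitions` object; a minimal sketch:

```python
from dagster import Definitions

defs = Definitions(
    assets=[raw_sales, cleaned_sales, sales_by_customer, top_customers],
    jobs=[sales_job],
)
```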
Dagster IO Managers
Custom IO Manager

```python
from dagster import ConfigurableIOManager, InputContext, OutputContext
from pyspark.sql import DataFrame as SparkDataFrame
from pyspark.sql import SparkSession


class ParquetIOManager(ConfigurableIOManager):
    """IO manager for reading/writing Spark DataFrames as Parquet."""

    base_path: str

    def _path(self, context) -> str:
        return f"{self.base_path}/{'/'.join(context.asset_key.path)}/"

    def handle_output(self, context: OutputContext, obj: SparkDataFrame) -> None:
        """Write the output DataFrame to Parquet."""
        path = self._path(context)
        obj.write.parquet(path, mode="overwrite")

        # Log where the data landed
        context.log.info(f"Wrote {path}")

    def load_input(self, context: InputContext) -> SparkDataFrame:
        """Read the upstream asset back from Parquet."""
        spark = SparkSession.builder.getOrCreate()
        return spark.read.parquet(self._path(context))


# Use the IO manager
from dagster import Definitions

defs = Definitions(
    assets=[raw_sales, cleaned_sales, sales_by_customer],
    resources={
        "io_manager": ParquetIOManager(base_path="s3://bucket/delta"),
    },
)
```
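
The base path usually differs between environments. One option is to read it from an environment variable with Dagster's `EnvVar`; a sketch, assuming a `PARQUET_BASE_PATH` variable is set in each deployment:

```python
from dagster import Definitions, EnvVar

defs = Definitions(
    assets=[raw_sales, cleaned_sales, sales_by_customer],
    resources={
        # Resolved from the PARQUET_BASE_PATH environment variable at launch time
        "io_manager": ParquetIOManager(base_path=EnvVar("PARQUET_BASE_PATH")),
    },
)
```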
Type-Based IO Manager

```python
from dagster import io_manager, IOManager, Definitions
import pandas as pd
from pyspark.sql import DataFrame as SparkDataFrame


@io_manager
def pandas_io_manager():
    """IO manager for Pandas DataFrames."""

    class PandasIOManager(IOManager):
        def handle_output(self, context, obj: pd.DataFrame):
            obj.to_parquet(f"s3://bucket/{'/'.join(context.asset_key.path)}.parquet")

        def load_input(self, context) -> pd.DataFrame:
            return pd.read_parquet(f"s3://bucket/{'/'.join(context.asset_key.path)}.parquet")

    return PandasIOManager()


@io_manager
def spark_io_manager():
    """IO manager for Spark DataFrames."""

    class SparkIOManager(IOManager):
        def handle_output(self, context, obj: SparkDataFrame):
            obj.write.parquet(f"s3://bucket/{'/'.join(context.asset_key.path)}.parquet")

        def load_input(self, context) -> SparkDataFrame:
            from pyspark.sql import SparkSession

            spark = SparkSession.builder.getOrCreate()
            return spark.read.parquet(f"s3://bucket/{'/'.join(context.asset_key.path)}.parquet")

    return SparkIOManager()


# Each asset picks its IO manager by resource key (io_manager_key); "io_manager" is the default key
defs = Definitions(
    assets=[pandas_asset, spark_asset],  # assets defined elsewhere
    resources={
        "io_manager": spark_io_manager,          # default: Spark DataFrames
        "pandas_io_manager": pandas_io_manager,  # opt-in for Pandas DataFrames
    },
)
```
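
The link between an asset and an IO manager is explicit: each asset names the resource key it wants via `io_manager_key`, falling back to the default `io_manager` key otherwise. A minimal sketch using the keys registered above:

```python
from dagster import asset
import pandas as pd


@asset(io_manager_key="pandas_io_manager")
def pandas_asset() -> pd.DataFrame:
    """Stored and loaded via the Pandas IO manager."""
    return pd.DataFrame({"x": [1, 2, 3]})
```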
Dagster Schedules and Sensors
Schedules

```python
from dagster import schedule


@schedule(
    cron_schedule="0 0 * * *",  # daily at midnight UTC
    job=daily_sales_etl,
)
def daily_sales_schedule(context):
    """Schedule for the daily sales ETL, run every day at midnight UTC."""
    date_str = context.scheduled_execution_time.strftime("%Y-%m-%d")
    return {
        'ops': {
            'extract_sales': {
                'config': {
                    'api_url': f'https://api.example.com/sales?date={date_str}',
                }
            }
        }
    }


# Hourly schedule
@schedule(
    cron_schedule="@hourly",
    job=some_job,  # any job defined elsewhere
)
def hourly_schedule(context):
    """Run every hour with the job's default config."""
    return {}
```
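
When no per-run configuration needs to be computed, `ScheduleDefinition` is a lighter-weight alternative to the decorator; a minimal sketch:

```python
from dagster import ScheduleDefinition

# Hourly schedule with the job's default config
hourly_sales_schedule = ScheduleDefinition(
    job=daily_sales_etl,
    cron_schedule="0 * * * *",
)
```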
Sensors

```python
from dagster import sensor, asset_sensor, RunRequest, AssetKey


@sensor(job=daily_sales_etl)
def s3_sensor(context):
    """Trigger the job when a new file arrives in S3."""
    import boto3

    s3 = boto3.client('s3')

    # Check for new files
    response = s3.list_objects_v2(
        Bucket='my-bucket',
        Prefix='incoming/',
    )

    last_cursor = context.cursor  # the cursor is stored as a string (or None)
    max_seen = last_cursor

    for obj in response.get('Contents', []):
        modified = obj['LastModified'].isoformat()

        # Skip objects that were already processed
        if last_cursor is not None and modified <= last_cursor:
            continue

        yield RunRequest(
            run_key=f"s3_{obj['Key']}",
            run_config={
                'ops': {
                    'extract_sales': {
                        'config': {
                            's3_key': obj['Key'],
                        }
                    }
                }
            },
        )

        if max_seen is None or modified > max_seen:
            max_seen = modified

    # Advance the cursor past everything seen in this evaluation
    if max_seen is not None:
        context.update_cursor(max_seen)


@asset_sensor(asset_key=AssetKey("upstream_asset"), job=some_job, minimum_interval_seconds=30)
def upstream_asset_sensor(context, asset_event):
    """Trigger a downstream job whenever upstream_asset is materialized."""
    yield RunRequest(run_key=context.cursor)
```
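
Schedules and sensors only take effect once they are registered with the code location (and the dagster-daemon is running); a minimal sketch of wiring everything into `Definitions`:

```python
from dagster import Definitions

defs = Definitions(
    assets=[raw_sales, cleaned_sales, sales_by_customer, top_customers],
    jobs=[daily_sales_etl],  # some_job, used by the asset sensor above, would be registered here too
    schedules=[daily_sales_schedule],
    sensors=[s3_sensor, upstream_asset_sensor],
)
```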
Dagster Testing
Testing Ops

```python
from dagster import build_op_context  # only needed when an op requires a context or resources
import pandas as pd


def test_transform_sales():
    """Test the transform_sales op as a plain function."""
    from my_dag import transform_sales

    # Create test input
    test_df = pd.DataFrame({
        'sale_id': [1, 2, 3],
        'customer_id': [1, 2, 3],
        'sale_date': ['2025-01-27', '2025-01-27', '2025-01-27'],
        'amount': [100, -50, 200],  # one negative amount
    })

    # Ops that take no context or resources can be invoked directly
    result = transform_sales(test_df)

    # Assert result
    assert len(result) == 2  # negative amount removed
    assert result['amount'].min() > 0  # all positive
    assert pd.api.types.is_datetime64_any_dtype(result['sale_date'])  # date converted


def test_extract_sales(monkeypatch):
    """Test the extract_sales op with a mocked API."""
    from my_dag import extract_sales
    from unittest.mock import Mock

    # Mock the API response
    mock_response = Mock()
    mock_response.json.return_value = [
        {'sale_id': 1, 'amount': 100},
        {'sale_id': 2, 'amount': 200},
    ]
    monkeypatch.setattr('requests.get', lambda *args, **kwargs: mock_response)

    # Execute the op
    result = extract_sales()

    # Assert result
    assert len(result) == 2
    assert result['amount'].sum() == 300
```
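
Simple assets (no context or resources) can be unit tested the same way, by invoking them with in-memory inputs; a small sketch for the aggregation asset defined earlier:

```python
import pandas as pd


def test_sales_by_customer():
    """Invoke the asset directly with an in-memory upstream DataFrame."""
    from my_dag import sales_by_customer

    cleaned = pd.DataFrame({
        'customer_id': [1, 1, 2],
        'sale_id': [10, 11, 12],
        'amount': [100.0, 50.0, 25.0],
    })

    result = sales_by_customer(cleaned)

    assert result.loc[1, 'amount'] == 150.0
    assert result.loc[2, 'sales_count'] == 1
```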
Testing Jobs

```python
from dagster import materialize_to_memory
from my_dag import daily_sales_etl


def test_daily_sales_etl():
    """Execute part of the ETL job in process."""
    # Run only the extract and transform steps
    result = daily_sales_etl.execute_in_process(
        op_selection=['extract_sales', 'transform_sales'],
    )

    # Assert success
    assert result.success


def test_assets():
    """Test asset materialization in memory (no external storage)."""
    from my_dag import raw_sales, cleaned_sales

    # Materialize to memory instead of the configured IO manager
    result = materialize_to_memory([raw_sales, cleaned_sales])

    # Assert success and inspect the materialized value
    assert result.success
    assert result.output_for_node('cleaned_sales') is not None
```
Dagster Best Practices
DO

```python
# 1. Use assets for data modeling
@asset
def my_data():
    return pd.DataFrame(...)


# 2. Use type hints on ops
@op
def my_op(df: pd.DataFrame) -> pd.DataFrame:
    return df


# 3. Use IO managers for I/O
@io_manager
def my_io_manager():
    return MyIOManager()


# 4. Test ops independently
def test_my_op():
    result = my_op(test_input)
    assert result.equals(expected)


# 5. Use sensors for event-driven triggers
@sensor(job=my_job)
def my_sensor(context):
    yield RunRequest(...)
```
DON'T

```python
# 1. Don't ignore data dependencies
# Bad: no dependency between ops
@op
def op1():
    return "data"


@op
def op2():
    return "more data"


# Good: explicit dependency (op2 consumes op1's output)
@op
def op1():
    return "data"


@op
def op2(data):
    return data + " more"


# 2. Don't hardcode values: use configuration instead

# 3. Don't skip testing: test ops, assets, and jobs

# 4. Don't use Dagster like Airflow: model data assets, not tasks

# 5. Don't bypass IO managers: let them handle data I/O
```

Dagster vs. Airflow
| Feature | Dagster | Airflow |
|---|---|---|
| Paradigm | Data-oriented | Task-oriented |
| Type Safety | Python type hints | Dynamic |
| Testing | Ops and assets test as plain functions | Harder to unit test |
| Data Tracking | Assets (automatic) | Manual (XCom) |
| Configuration | Pydantic models | JSON/YAML |
| Maturity | Newer | Mature |
| Best For | ML, analytics, complex data | ETL, batch processing |
Key Takeaways
- Data-oriented: Model data assets, not tasks
- Type-safe: Python type hints for ops
- Testing: Test ops independently
- Assets: Automatic data lineage
- IO Managers: Pluggable I/O handling, selected per asset
- Sensors: Event-driven triggers
- Software-defined: Everything is code
- Use When: ML, analytics, complex data dependencies