
Dagster Orchestration Guide

Data-Aware Orchestration for Modern Data Platforms


Overview

Dagster is a data-aware orchestrator for machine learning, analytics, and ETL. Where Airflow is task-oriented, Dagster is data-oriented: it understands the data assets your pipelines produce and can model complex dependencies between them.
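
As a minimal, illustrative sketch of what data-oriented means in practice (the orders assets and column names below are made up, not part of this guide's pipeline), dependencies are declared by naming upstream assets as function parameters rather than by wiring tasks together:

from dagster import Definitions, asset, materialize
import pandas as pd

@asset
def orders() -> pd.DataFrame:
    # Illustrative source asset; a real pipeline would query an API or a table
    return pd.DataFrame({"order_id": [1, 2], "amount": [10.0, 20.0]})

@asset
def order_totals(orders: pd.DataFrame) -> pd.DataFrame:
    # The parameter name "orders" declares a dependency on the orders asset
    return orders.groupby("order_id", as_index=False)["amount"].sum()

defs = Definitions(assets=[orders, order_totals])

# Materialize both assets locally, e.g. from a test or script
result = materialize([orders, order_totals])
assert result.success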


Dagster Architecture

Key Differences from Airflow:

  • Data-oriented: Model data assets, not tasks
  • Type-safe: Use Python type hints
  • Testing: Test ops independently
  • Software-defined: Everything is code

Dagster Ops and Jobs

Basic Ops

from dagster import op, job
import pandas as pd

@op
def extract_sales() -> pd.DataFrame:
    """Extract sales data from source."""
    import requests
    response = requests.get('https://api.example.com/sales')
    return pd.DataFrame(response.json())

@op
def transform_sales(sales_df: pd.DataFrame) -> pd.DataFrame:
    """Transform sales data."""
    sales_df['sale_date'] = pd.to_datetime(sales_df['sale_date'])
    sales_df = sales_df[sales_df['amount'] > 0]
    return sales_df

@op
def load_to_warehouse(sales_df: pd.DataFrame) -> None:
    """Load sales data to warehouse."""
    from sqlalchemy import create_engine
    engine = create_engine('postgresql://user:pass@warehouse/db')
    sales_df.to_sql('sales', engine, if_exists='append', index=False)

# Define job (DAG equivalent)
@job
def daily_sales_etl():
    """Daily sales ETL job."""
    sales_data = extract_sales()
    transformed_data = transform_sales(sales_data)
    load_to_warehouse(transformed_data)
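
The job can be executed locally without a running Dagster deployment; a quick sketch (the file name in the CLI invocation is an assumption for this example):

# Run the job with the in-process executor, e.g. from a script or test
result = daily_sales_etl.execute_in_process()
assert result.success

# Or from the CLI, assuming the job is defined in my_dag.py:
#   dagster job execute -f my_dag.py -j daily_sales_etl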

Ops with Configuration

from dagster import op, job, Config
from pydantic import Field
import pandas as pd

class ExtractSalesConfig(Config):
    """Configuration for the extract_sales op."""
    api_url: str = Field(description="API endpoint URL")
    timeout: int = Field(default=30, description="Request timeout in seconds")

@op
def extract_sales(config: ExtractSalesConfig) -> pd.DataFrame:
    """Extract sales data with configuration."""
    import requests
    response = requests.get(config.api_url, timeout=config.timeout)
    return pd.DataFrame(response.json())

@job
def configured_etl():
    """ETL job with configuration."""
    sales_data = extract_sales()
    transformed_data = transform_sales(sales_data)
    load_to_warehouse(transformed_data)
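
Config values are supplied at launch time as run config; a sketch using in-process execution and Dagster's RunConfig helper (the URL is illustrative):

from dagster import RunConfig

result = configured_etl.execute_in_process(
    run_config=RunConfig(
        ops={"extract_sales": ExtractSalesConfig(api_url="https://api.example.com/sales")}
    )
)
assert result.success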

Dagster Assets

Data Assets

from dagster import asset, AssetExecutionContext
import pandas as pd

@asset
def raw_sales(context: AssetExecutionContext) -> pd.DataFrame:
    """
    Raw sales data asset.
    Automatically tracked by Dagster.
    """
    import requests
    response = requests.get('https://api.example.com/sales')
    df = pd.DataFrame(response.json())
    # Attach metadata to the materialization
    context.add_output_metadata({
        'rows': len(df),
        'columns': list(df.columns),
        'preview': df.head().to_dict('records'),
    })
    return df

@asset
def cleaned_sales(raw_sales: pd.DataFrame) -> pd.DataFrame:
    """
    Cleaned sales data asset.
    Depends on raw_sales (inferred from the parameter name).
    """
    df = raw_sales.copy()
    df['sale_date'] = pd.to_datetime(df['sale_date'])
    df = df[df['amount'] > 0]
    return df

@asset
def sales_by_customer(cleaned_sales: pd.DataFrame) -> pd.DataFrame:
    """
    Sales aggregated by customer.
    Depends on cleaned_sales (inferred from the parameter name).
    """
    return cleaned_sales.groupby('customer_id').agg({
        'amount': 'sum',
        'sale_id': 'count',
    }).rename(columns={'sale_id': 'sales_count'})

@asset
def top_customers(sales_by_customer: pd.DataFrame) -> pd.DataFrame:
    """Top 10 customers by revenue."""
    return sales_by_customer.nlargest(10, 'amount')
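
To materialize the whole asset graph locally (for example in a test), the materialize helper can be used; a sketch:

from dagster import materialize

result = materialize([raw_sales, cleaned_sales, sales_by_customer, top_customers])
assert result.success

# Pull a materialized value out of the result for inspection
top = result.output_for_node("top_customers")
print(top.head())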

Asset Groups

from dagster import define_asset_job, AssetSelection

# Assets are grouped by passing group_name to @asset, e.g. @asset(group_name="sales").
# A group (or an explicit selection of assets) can then be targeted by a job:
sales_assets = AssetSelection.assets(
    raw_sales,
    cleaned_sales,
    sales_by_customer,
    top_customers,
)

# Define a job that materializes the selection
sales_job = define_asset_job(
    name="sales_job",
    selection=sales_assets,
    # run config for configurable assets can be supplied via the config argument
)
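
The asset job becomes runnable once it is included in a Definitions object; a sketch that also resolves and executes it in-process:

from dagster import Definitions

defs = Definitions(
    assets=[raw_sales, cleaned_sales, sales_by_customer, top_customers],
    jobs=[sales_job],
)

# Resolve the asset job and run it locally, e.g. in a test
result = defs.get_job_def("sales_job").execute_in_process()
assert result.success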

Dagster IO Managers

Custom IO Manager

from dagster import ConfigurableIOManager, InputContext, OutputContext

class ParquetIOManager(ConfigurableIOManager):
    """IO manager for reading/writing Spark DataFrames as Parquet."""

    base_path: str

    def _path(self, context) -> str:
        return f"{self.base_path}/{context.asset_key.path[-1]}/"

    def handle_output(self, context: OutputContext, obj):
        """Write output to Parquet."""
        path = self._path(context)
        obj.write.parquet(path)
        # Log where the data landed
        context.log.info(f"Wrote {path}")

    def load_input(self, context: InputContext):
        """Read input from Parquet."""
        from pyspark.sql import SparkSession
        spark = SparkSession.builder.getOrCreate()
        return spark.read.parquet(self._path(context))

# Use the IO manager as the default for these assets
from dagster import Definitions

defs = Definitions(
    assets=[raw_sales, cleaned_sales, sales_by_customer],
    resources={
        "io_manager": ParquetIOManager(base_path="s3://bucket/delta"),
    },
)
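
Assets themselves stay free of I/O code: they just return a DataFrame and the configured IO manager persists it. A sketch with an illustrative Spark asset (the asset name and data are made up):

from dagster import asset
from pyspark.sql import SparkSession, DataFrame as SparkDataFrame

@asset
def sales_events() -> SparkDataFrame:
    # The returned DataFrame is written to Parquet by the "io_manager"
    # resource above, not by the asset itself
    spark = SparkSession.builder.getOrCreate()
    return spark.createDataFrame([(1, 100.0), (2, 250.0)], ["sale_id", "amount"])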

Type-Based IO Manager

from dagster import io_manager, IOManager, Definitions
import pandas as pd
from pyspark.sql import DataFrame as SparkDataFrame

@io_manager
def pandas_io_manager():
    """IO manager for pandas DataFrames."""
    class PandasIOManager(IOManager):
        def handle_output(self, context, obj: pd.DataFrame):
            obj.to_parquet(f"s3://bucket/{context.asset_key.path[-1]}.parquet")

        def load_input(self, context):
            return pd.read_parquet(f"s3://bucket/{context.asset_key.path[-1]}.parquet")

    return PandasIOManager()

@io_manager
def spark_io_manager():
    """IO manager for Spark DataFrames."""
    class SparkIOManager(IOManager):
        def handle_output(self, context, obj: SparkDataFrame):
            obj.write.parquet(f"s3://bucket/{context.asset_key.path[-1]}.parquet")

        def load_input(self, context):
            from pyspark.sql import SparkSession
            spark = SparkSession.builder.getOrCreate()
            return spark.read.parquet(f"s3://bucket/{context.asset_key.path[-1]}.parquet")

    return SparkIOManager()

# Register both IO managers; assets pick one via io_manager_key
defs = Definitions(
    assets=[pandas_asset, spark_asset],
    resources={
        "io_manager": spark_io_manager,          # default (Spark DataFrames)
        "pandas_io_manager": pandas_io_manager,  # for pandas DataFrames
    },
)
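
Individual assets opt into a non-default IO manager through the io_manager_key argument; a sketch (the asset is illustrative):

from dagster import asset
import pandas as pd

@asset(io_manager_key="pandas_io_manager")
def pandas_asset() -> pd.DataFrame:
    # Persisted by pandas_io_manager rather than the default "io_manager"
    return pd.DataFrame({"x": [1, 2, 3]})

Assets without an io_manager_key fall back to the resource bound to the "io_manager" key.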

Dagster Schedules and Sensors

Schedules

from dagster import schedule

@schedule(
    cron_schedule="0 0 * * *",  # Daily at midnight
    job=daily_sales_etl,
)
def daily_sales_schedule(context):
    """
    Schedule for daily sales ETL.
    Runs every day at midnight (UTC unless a timezone is set).
    """
    partition_date = context.scheduled_execution_time.strftime("%Y-%m-%d")
    return {
        'ops': {
            'extract_sales': {
                'config': {
                    'api_url': f'https://api.example.com/sales?date={partition_date}',
                }
            }
        }
    }

# Hourly schedule
@schedule(
    cron_schedule="0 * * * *",  # Every hour, on the hour
    job=some_job,  # any JobDefinition
)
def hourly_schedule(context):
    """Run every hour."""
    return {}
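
Because a schedule's evaluation function is plain Python, the run config it produces can be inspected by invoking it directly with a test context; a sketch:

from datetime import datetime
from dagster import build_schedule_context

context = build_schedule_context(scheduled_execution_time=datetime(2025, 1, 27))
run_config = daily_sales_schedule(context)
assert '2025-01-27' in run_config['ops']['extract_sales']['config']['api_url']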

Sensors

from dagster import sensor, asset_sensor, AssetKey, RunRequest

@sensor(job=daily_sales_etl)
def s3_sensor(context):
    """
    Sensor that triggers a run when a new file arrives in S3.
    """
    import boto3
    s3 = boto3.client('s3')
    # Check for new files
    response = s3.list_objects_v2(
        Bucket='my-bucket',
        Prefix='incoming/'
    )
    last_seen = context.cursor  # cursor is stored as a string (ISO timestamp)
    for obj in response.get('Contents', []):
        modified = obj['LastModified'].isoformat()
        # Skip files that were already processed
        if last_seen and modified <= last_seen:
            continue
        # Yield run request
        yield RunRequest(
            run_key=f"s3_{obj['Key']}",
            run_config={
                'ops': {
                    'extract_sales': {
                        'config': {
                            's3_key': obj['Key'],
                        }
                    }
                }
            },
        )
        # Advance the cursor so the same file is not reprocessed
        context.update_cursor(modified)

@asset_sensor(asset_key=AssetKey("upstream_asset"), job=some_job, minimum_interval_seconds=30)
def upstream_asset_sensor(context, asset_event):
    """
    Sensor that triggers a job whenever the upstream asset is materialized.
    """
    yield RunRequest(run_key=context.cursor)
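
Like jobs, schedules and sensors only become active once they are included in the code location's Definitions and switched on; a registration sketch using the definitions above:

from dagster import Definitions

defs = Definitions(
    jobs=[daily_sales_etl],
    schedules=[daily_sales_schedule],
    sensors=[s3_sensor],
)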

Dagster Testing

Testing Ops

from dagster import build_op_context
import pandas as pd
import pytest

def test_transform_sales():
    """Test the transform_sales op by direct invocation."""
    from my_dag import transform_sales
    # Create test input
    test_df = pd.DataFrame({
        'sale_id': [1, 2, 3],
        'customer_id': [1, 2, 3],
        'sale_date': ['2025-01-27', '2025-01-27', '2025-01-27'],
        'amount': [100, -50, 200],  # One negative amount
    })
    # Invoke the op directly; an op that takes a context argument would be
    # passed build_op_context() as its first argument instead
    result = transform_sales(test_df)
    # Assert result
    assert len(result) == 2  # Negative amount removed
    assert result['amount'].min() > 0  # All positive
    assert pd.api.types.is_datetime64_any_dtype(result['sale_date'])  # Date converted

def test_extract_sales(monkeypatch):
    """Test the extract_sales op with a mocked API."""
    from my_dag import extract_sales
    from unittest.mock import Mock
    # Mock API response
    mock_response = Mock()
    mock_response.json.return_value = [
        {'sale_id': 1, 'amount': 100},
        {'sale_id': 2, 'amount': 200},
    ]
    monkeypatch.setattr('requests.get', lambda *args, **kwargs: mock_response)
    # Invoke the op directly
    result = extract_sales()
    # Assert result
    assert len(result) == 2
    assert result['amount'].sum() == 300

Testing Jobs

from dagster import materialize_to_memory
from my_dag import daily_sales_etl

def test_daily_sales_etl():
    """Test the entire ETL job in-process."""
    # Execute the job with the in-process executor
    result = daily_sales_etl.execute_in_process()
    # Assert success
    assert result.success

def test_assets():
    """Test asset materialization."""
    from my_dag import raw_sales, cleaned_sales
    # Materialize to memory (no external storage)
    result = materialize_to_memory([raw_sales, cleaned_sales])
    # Assert success
    assert result.success
    assert result.output_for_node('cleaned_sales') is not None
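
Sensors can be evaluated the same way by direct invocation with a test context; a sketch (in a real test the boto3 client would be mocked so the S3 listing is deterministic):

from dagster import build_sensor_context
from my_dag import s3_sensor

def test_s3_sensor():
    """Directly invoke the sensor and inspect the run requests it yields."""
    context = build_sensor_context(cursor=None)
    run_requests = list(s3_sensor(context))
    for request in run_requests:
        assert request.run_key.startswith("s3_")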

Dagster Best Practices

DO

# 1. Use assets for data modeling
@asset
def my_data():
    return pd.DataFrame(...)

# 2. Use type hints for ops
@op
def my_op(df: pd.DataFrame) -> pd.DataFrame:
    return df

# 3. Use IO managers for I/O
@io_manager
def my_io_manager():
    return MyIOManager()

# 4. Test ops independently
def test_my_op():
    result = my_op(test_input)
    assert result.equals(expected)

# 5. Use sensors for event-driven triggers
@sensor(job=my_job)
def my_sensor(context):
    yield RunRequest(...)

DON’T

# 1. Don't ignore data dependencies
# Bad: No dependencies between ops
@op
def op1():
    return "data"

@op
def op2():
    return "more data"

# Good: Explicit dependencies
@op
def op1():
    return "data"

@op
def op2(data):
    return data + " more"

# 2. Don't hardcode values
#    Use configuration instead
# 3. Don't skip testing
#    Test both ops and jobs
# 4. Don't use Dagster like Airflow
#    Model data assets, not tasks
# 5. Don't bypass IO managers
#    Use IO managers for data I/O

Dagster vs. Airflow

Feature        | Dagster                      | Airflow
Paradigm       | Data-oriented                | Task-oriented
Type Safety    | Python type hints            | Dynamic
Testing        | Test ops independently       | Hard to test
Data Tracking  | Assets (automatic)           | Manual (XCom)
Configuration  | Pydantic models              | JSON/YAML
Maturity       | Newer                        | Mature
Best For       | ML, analytics, complex data  | ETL, batch processing

Key Takeaways

  1. Data-oriented: Model data assets, not tasks
  2. Type-safe: Python type hints for ops
  3. Testing: Test ops independently
  4. Assets: Automatic data lineage
  5. IO Managers: Type-based I/O handling
  6. Sensors: Event-driven triggers
  7. Software-defined: Everything is code
  8. Use When: ML, analytics, complex data dependencies
