Prefect Orchestration Guide
Modern Workflow Orchestration for Data Engineering
Overview
Prefect is a modern workflow orchestration tool that brings reliability to data workflows. Unlike traditional orchestrators, Prefect embraces failure as a normal part of data workflows and provides powerful primitives for handling it.
Prefect Architecture
Key Features:
- Code-first: Define workflows in Python
- Resilient: Built-in retries, caching, and timeouts
- Flexible: Local, Cloud, or hybrid execution
- Observability: Real-time monitoring and logging
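The observability bullet is easiest to see with the run logger: messages logged inside a flow or task show up in the Prefect UI and CLI output. A minimal sketch, assuming a Prefect 2-style API (the flow name is illustrative):

from prefect import flow, get_run_logger

@flow(name="Logging Example")
def logging_example():
    """Send a message to the Prefect run logger."""
    logger = get_run_logger()
    logger.info("Flow run started")

if __name__ == "__main__":
    logging_example()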
Prefect Flows
Basic Flow
from prefect import flow, task
import pandas as pd
@task
def extract_sales() -> pd.DataFrame:
    """Extract sales data from the source API."""
    import requests
    response = requests.get('https://api.example.com/sales')
    return pd.DataFrame(response.json())
@task
def transform_sales(sales_df: pd.DataFrame) -> pd.DataFrame:
    """Transform sales data."""
    sales_df['sale_date'] = pd.to_datetime(sales_df['sale_date'])
    sales_df = sales_df[sales_df['amount'] > 0]
    return sales_df
@task
def load_to_warehouse(sales_df: pd.DataFrame) -> None:
    """Load sales data to the warehouse."""
    from sqlalchemy import create_engine
    engine = create_engine('postgresql://user:pass@warehouse/db')
    sales_df.to_sql('sales', engine, if_exists='append', index=False)
@flow(name="Daily Sales ETL")def daily_sales_etl(): """ Daily sales ETL flow. """ sales_data = extract_sales() transformed_data = transform_sales(sales_data) load_to_warehouse(transformed_data)
# Run the flow
if __name__ == "__main__":
    daily_sales_etl()

Task Features
from prefect import flow, task
import pandas as pd
@task( name="Extract Data", description="Extract data from API", tags=["extract", "api"], retries=3, # Retry on failure retry_delay_seconds=60, # Wait 60s between retries timeout_seconds=300, # Fail after 5 minutes cache_key_fn=lambda args, kwargs: "extract-data", # Cache results)def extract_data() -> dict: """Extract data with retry and caching.""" import requests response = requests.get('https://api.example.com/data', timeout=30) response.raise_for_status() return response.json()
@task(
    log_prints=True,  # Send print() output to the Prefect logger
)
def transform_data(data: dict) -> pd.DataFrame:
    """Transform data with logging."""
    print(f"Transforming {len(data)} records")
    df = pd.DataFrame(data)
    return df
@flow(name="ETL Pipeline")def etl_pipeline(): """ETL pipeline with resilient tasks.""" data = extract_data() df = transform_data(data) print(f"Processed {len(df)} records")Prefect Deployments
Deployment Configuration
from datetime import timedelta

from prefect import flow
from prefect.deployments import Deployment
from prefect.orion.schemas.schedules import IntervalSchedule
@flow(name="Scheduled ETL")def scheduled_etl(): """Scheduled ETL flow.""" pass
# Create a deployment from the flow
deployment = Deployment.build_from_flow(
    flow=scheduled_etl,
    name="Hourly ETL",
    schedule=IntervalSchedule(interval=timedelta(hours=1)),
    tags=["etl", "production"],
    parameters={
        "environment": "production",
    },
)
# Apply the deployment so the Prefect API knows about it
deployment.apply()
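An interval schedule is only one option; the same schedules module also exposes a cron-based schedule. A hedged sketch (the cron expression and timezone are illustrative):

from prefect.orion.schemas.schedules import CronSchedule

# Alternative: run at the top of every hour, UTC
hourly_cron = CronSchedule(cron="0 * * * *", timezone="UTC")
# e.g. Deployment.build_from_flow(..., schedule=hourly_cron)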
Docker Deployment

from prefect import flow
from prefect.deployments import Deployment
from prefect.infrastructure import DockerContainer
@flow(name="Containerized Flow")def containerized_flow(): """Flow running in Docker container.""" pass
# Docker deployment: run the flow inside a container
deployment = Deployment.build_from_flow(
    flow=containerized_flow,
    name="Docker Deployment",
    infrastructure=DockerContainer(
        image="my-company/data-flow:latest",
        image_pull_policy="ALWAYS",  # Always pull the image
        env={
            "DATABASE_URL": "postgresql://...",
        },
        volumes=["./data:/app/data"],
    ),
)

Kubernetes Deployment
from prefect import flow
from prefect.deployments import Deployment
from prefect.infrastructure.kubernetes import KubernetesJob
@flow(name="K8s Flow")def k8s_flow(): """Flow running on Kubernetes.""" pass
# Kubernetes deployment: run the flow as a Kubernetes Job
deployment = Deployment.build_from_flow(
    flow=k8s_flow,
    name="Kubernetes Deployment",
    infrastructure=KubernetesJob(
        namespace="data-platform",
        image="my-company/data-flow:latest",
        image_pull_policy="Always",
        service_account_name="prefect-worker",
        labels={
            "app": "data-flow",
            "environment": "production",
        },
        env={
            "ENVIRONMENT": "production",
        },
        # Resource requests/limits and pod annotations are applied as
        # JSON-patch customizations to the generated job manifest
        customizations=[
            {
                "op": "add",
                "path": "/spec/template/spec/containers/0/resources",
                "value": {
                    "requests": {"cpu": "500m", "memory": "512Mi"},
                    "limits": {"cpu": "1000m", "memory": "2Gi"},
                },
            },
            {
                "op": "add",
                "path": "/spec/template/metadata",
                "value": {
                    "annotations": {"sidecar.istio.io/inject": "false"},
                },
            },
        ],
    ),
)

Prefect State Handling
State Management
from prefect import flow, task
import pandas as pd
@task
def risky_operation() -> pd.DataFrame:
    """Task that might fail."""
    import requests
    try:
        response = requests.get('https://api.example.com/data', timeout=30)
        response.raise_for_status()
        return pd.DataFrame(response.json())
    except Exception as e:
        # Raising an exception marks this task run as failed
        raise RuntimeError(f"Failed to fetch data: {e}") from e
@task
def fallback_operation() -> pd.DataFrame:
    """Fallback: read previously cached data."""
    return pd.read_parquet('s3://bucket/cached_data.parquet')
@flow(name="Resilient ETL")def resilient_etl(): """ Flow with state handling. """ try: data = risky_operation() except: # Fallback to cached data data = fallback_operation()
print(f"Using {len(data)} records") return dataConditional Flows
Conditional Flows

from prefect import flow, task
from prefect.tasks import task_input_hash
@task(cache_key_fn=task_input_hash)
def should_process() -> bool:
    """Check whether the flow should run."""
    import requests
    response = requests.get('https://api.example.com/check')
    return response.json()['should_process']
@flow(name="Conditional Flow")def conditional_flow(): """ Flow with conditional execution. """ if should_process().result(): print("Processing...") # Run main flow else: print("Skipping...") # Skip processingPrefect Blocks
Using Blocks
from prefect import flow, task
from prefect.blocks.system import JSON
# Create and save a reusable configuration block
data_lake_config = JSON(
    value={
        "s3_bucket": "my-bucket",
        "prefix": "data/",
        "region": "us-east-1",
    }
)
data_lake_config.save("data-lake-config")
# Use the block in a flow
@task
def read_data_lake():
    """Read from the data lake using the saved block."""
    config = JSON.load("data-lake-config")
    import boto3
    s3 = boto3.client('s3', region_name=config.value['region'])
    response = s3.list_objects_v2(
        Bucket=config.value['s3_bucket'],
        Prefix=config.value['prefix'],
    )
    return response
@flow(name="Block Example")def block_example(): """Flow using blocks.""" read_data_lake()Custom Blocks
Custom Blocks

from prefect import flow, task
from prefect.blocks.core import Block
from pydantic import Field
from typing import Any
class DatabaseBlock(Block):
    """Custom block for database connections."""

    _block_type_name = "database"
    _logo_url = "https://example.com/logo.png"
    connection_string: str = Field(..., description="Database connection string")
    table_name: str = Field(..., description="Table name")
    def query(self, sql: str) -> Any:
        """Execute a query and return all rows."""
        from sqlalchemy import create_engine, text
        engine = create_engine(self.connection_string)
        with engine.connect() as conn:
            return conn.execute(text(sql)).fetchall()
    def insert(self, data: Any) -> None:
        """Insert rows into the configured table."""
        from sqlalchemy import create_engine
        import pandas as pd
        engine = create_engine(self.connection_string)
        pd.DataFrame(data).to_sql(self.table_name, engine, if_exists='append', index=False)
# Create and save the custom block
db_block = DatabaseBlock(
    connection_string="postgresql://user:pass@warehouse/db",
    table_name="sales",
)
db_block.save("warehouse-db")
@task
def load_to_warehouse(data: Any):
    """Load data to the warehouse using the block."""
    db = DatabaseBlock.load("warehouse-db")
    db.insert(data)
@flow(name="Custom Block Example")def custom_block_example(): """Flow using custom block.""" load_to_warehouse([{"id": 1, "value": 100}])Prefect Testing
Testing Flows
from prefect import flow, task
from prefect.testing.utilities import prefect_test_harness
import pytest
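The prefect_test_harness import is typically wired into a session-scoped pytest fixture so tests run against a temporary local Prefect backend instead of a real server. A minimal sketch:

@pytest.fixture(autouse=True, scope="session")
def prefect_test_fixture():
    """Run the whole test session against a temporary Prefect database."""
    with prefect_test_harness():
        yield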
@task
def add(x: int, y: int) -> int:
    """Add two numbers."""
    return x + y
@flow(name="Test Flow")def test_flow(x: int, y: int) -> int: """Simple test flow.""" return add(x, y)
# Test the flow's return value
def test_addition_flow():
    """Calling the flow directly returns its result."""
    result = addition_flow(1, 2)
    assert result == 3
# Test the flow's final state
def test_addition_flow_state():
    """Ask for the final state explicitly."""
    state = addition_flow(1, 2, return_state=True)
    assert state.is_completed()
    assert state.result() == 3
# Test the task's logic in isolation
def test_add_task():
    """Call the underlying function with .fn() so no flow run is needed."""
    assert add.fn(1, 2) == 3

Prefect Best Practices
DO
# 1. Use type hints for flows and tasks
@flow
def my_flow(x: int) -> int:
    return x + 1

# 2. Use task features (retries, caching, timeouts)
@task(retries=3, cache_key_fn=...)
def my_task():
    pass

# 3. Use blocks for configuration
config = JSON.load("my-config")

# 4. Test flows before deploying
def test_my_flow():
    result = my_flow(1)
    assert result == 2

# 5. Use deployments for production
deployment = Deployment.build_from_flow(flow=my_flow, name="my-deployment")

DON’T
# 1. Don't ignore state
#    Check state.is_completed() when you request states explicitly

# 2. Don't ignore error handling
#    Use try/except or state handlers

# 3. Don't hardcode configuration
#    Use blocks or environment variables

# 4. Don't skip testing
#    Test flows before deploying

# 5. Don't ignore logging
#    Use log_prints=True or the run logger

Prefect vs. Airflow vs. Dagster
| Feature | Prefect | Airflow | Dagster |
|---|---|---|---|
| Paradigm | Flow/task-oriented (dynamic) | DAG-oriented (static) | Asset/data-oriented |
| Type Safety | Python type hints | Dynamic | Python type hints |
| Testing | Easy | Hard | Easy |
| Deployment | Docker/K8s native | Requires setup | Docker/K8s native |
| State Handling | Built-in | Manual | Built-in |
| Maturity | Modern | Mature | Modern |
| Best For | Modern Python, cloud-native | Traditional ETL | ML, analytics |
Key Takeaways
- Code-first: Define workflows in Python
- Resilient: Built-in retries, caching, and timeouts
- Flexible: Local, Cloud, or hybrid execution
- Deployments: Docker, Kubernetes, serverless
- State Handling: Built-in state management
- Blocks: Reusable configuration
- Testing: Easy to test flows
- Use When: Modern Python, cloud-native, resilience needed
Back to Module 3