Prefect Orchestration Guide

Modern Workflow Orchestration for Data Engineering


Overview

Prefect is a modern workflow orchestration tool that brings reliability to data workflows. Unlike traditional orchestrators, Prefect treats failure as a normal part of data pipelines and provides first-class primitives — retries, caching, timeouts, and state handling — for dealing with it.


Prefect Architecture

Key Features:

  • Code-first: Define workflows in Python
  • Resilient: Built-in retry, caching, time limits
  • Flexible: Local, Cloud, or hybrid execution
  • Observability: Real-time monitoring and logging

Prefect Flows

Basic Flow

from prefect import flow, task
import pandas as pd


@task
def extract_sales() -> pd.DataFrame:
    """Extract sales data from source."""
    import requests

    response = requests.get('https://api.example.com/sales')
    return pd.DataFrame(response.json())


@task
def transform_sales(sales_df: pd.DataFrame) -> pd.DataFrame:
    """Transform sales data."""
    sales_df['sale_date'] = pd.to_datetime(sales_df['sale_date'])
    sales_df = sales_df[sales_df['amount'] > 0]
    return sales_df


@task
def load_to_warehouse(sales_df: pd.DataFrame) -> None:
    """Load sales data to warehouse."""
    from sqlalchemy import create_engine

    engine = create_engine('postgresql://user:pass@warehouse/db')
    sales_df.to_sql('sales', engine, if_exists='append', index=False)


@flow(name="Daily Sales ETL")
def daily_sales_etl():
    """Daily sales ETL flow."""
    sales_data = extract_sales()
    transformed_data = transform_sales(sales_data)
    load_to_warehouse(transformed_data)


# Run flow
if __name__ == "__main__":
    daily_sales_etl()
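
Tasks called directly run one after another and return their results. Prefect also supports a future-based calling style for concurrent execution; a minimal sketch reusing the tasks above:

@flow(name="Daily Sales ETL (futures)")
def daily_sales_etl_futures():
    """Variant that submits tasks to the task runner and wires futures together."""
    # .submit() returns a PrefectFuture; downstream tasks accept futures directly,
    # and Prefect resolves them into results while tracking the dependencies.
    sales_future = extract_sales.submit()
    transformed_future = transform_sales.submit(sales_future)
    load_to_warehouse.submit(transformed_future)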

Task Features

from prefect import flow, task
from datetime import timedelta
import pandas as pd


@task(
    name="Extract Data",
    description="Extract data from API",
    tags=["extract", "api"],
    retries=3,                # Retry on failure
    retry_delay_seconds=60,   # Wait 60s between retries
    timeout_seconds=300,      # Fail after 5 minutes
    cache_key_fn=lambda context, parameters: "extract-data",  # Static cache key: reuse the cached result
)
def extract_data() -> dict:
    """Extract data with retry and caching."""
    import requests

    response = requests.get('https://api.example.com/data', timeout=30)
    response.raise_for_status()
    return response.json()


@task(
    log_prints=True,  # Send print statements to the Prefect logger
)
def transform_data(data: dict) -> pd.DataFrame:
    """Transform data with logging."""
    print(f"Transforming {len(data)} records")
    df = pd.DataFrame(data)
    return df


@flow(name="ETL Pipeline")
def etl_pipeline():
    """ETL pipeline with resilient tasks."""
    data = extract_data()
    df = transform_data(data)
    print(f"Processed {len(df)} records")

Prefect Deployments

Deployment Configuration

from datetime import timedelta

from prefect import flow
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import IntervalSchedule


@flow(name="Scheduled ETL")
def scheduled_etl(environment: str = "dev"):
    """Scheduled ETL flow."""
    pass


# Create deployment
deployment = Deployment.build_from_flow(
    flow=scheduled_etl,
    name="Hourly ETL",
    schedule=IntervalSchedule(interval=timedelta(hours=1)),
    tags=["etl", "production"],
    parameters={
        "environment": "production",
    },
)

# Apply deployment
deployment.apply()
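
For calendar-aligned runs, a cron schedule can be used in place of the fixed interval; a minimal sketch reusing scheduled_etl from above (the cron expression and timezone are assumptions):

from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule

# Run at minute 0 of every hour, evaluated in UTC
hourly_cron = Deployment.build_from_flow(
    flow=scheduled_etl,
    name="Hourly ETL (cron)",
    schedule=CronSchedule(cron="0 * * * *", timezone="UTC"),
    tags=["etl", "production"],
)
hourly_cron.apply()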

Docker Deployment

from prefect import flow
from prefect.deployments import Deployment
from prefect.infrastructure import DockerContainer


@flow(name="Containerized Flow")
def containerized_flow():
    """Flow running in a Docker container."""
    pass


# Docker deployment: each flow run executes inside the specified container image
deployment = Deployment.build_from_flow(
    flow=containerized_flow,
    name="Docker Deployment",
    infrastructure=DockerContainer(
        image="my-company/data-flow:latest",
        image_pull_policy="ALWAYS",
        env={
            "DATABASE_URL": "postgresql://...",
        },
        volumes=["./data:/app/data"],
    ),
)
deployment.apply()
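
Once applied, a deployment run can also be triggered from Python instead of waiting on a schedule; a hedged sketch using run_deployment (the "flow name/deployment name" string follows the deployment defined above):

from prefect.deployments import run_deployment

# Trigger an ad-hoc run; an agent or worker watching the relevant
# work queue picks it up and launches the container.
flow_run = run_deployment(name="Containerized Flow/Docker Deployment")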

Kubernetes Deployment

from prefect import flow
from prefect.deployments import Deployment
from prefect.infrastructure.kubernetes import KubernetesJob


@flow(name="K8s Flow")
def k8s_flow():
    """Flow running on Kubernetes."""
    pass


# Kubernetes deployment: each flow run executes as a Kubernetes Job
deployment = Deployment.build_from_flow(
    flow=k8s_flow,
    name="Kubernetes Deployment",
    infrastructure=KubernetesJob(
        namespace="data-platform",
        image="my-company/data-flow:latest",
        image_pull_policy="Always",
        name="data-flow",  # used as the Job name prefix
        service_account_name="prefect-worker",
        labels={
            "app": "data-flow",
            "environment": "production",
        },
        env={
            "ENVIRONMENT": "production",
        },
        customizations=[
            # JSON 6902 patches applied to the base Job manifest:
            # pod annotations and container resource requests/limits
            {
                "op": "add",
                "path": "/spec/template/metadata/annotations",
                "value": {"sidecar.istio.io/inject": "false"},
            },
            {
                "op": "add",
                "path": "/spec/template/spec/containers/0/resources",
                "value": {
                    "requests": {"cpu": "500m", "memory": "512Mi"},
                    "limits": {"cpu": "1000m", "memory": "2Gi"},
                },
            },
        ],
    ),
)
deployment.apply()

Prefect State Handling

State Management

from prefect import flow, task
import pandas as pd


@task
def risky_operation() -> pd.DataFrame:
    """Task that might fail."""
    import requests

    try:
        response = requests.get('https://api.example.com/data')
        response.raise_for_status()
        return pd.DataFrame(response.json())
    except Exception as e:
        # Raising an exception marks the task run as Failed
        raise RuntimeError(f"Failed to fetch data: {e}") from e


@task
def fallback_operation() -> pd.DataFrame:
    """Fallback operation."""
    # Use cached data
    return pd.read_parquet('s3://bucket/cached_data.parquet')


@flow(name="Resilient ETL")
def resilient_etl():
    """Flow with state handling: fall back to cached data when the source fails."""
    try:
        data = risky_operation()
    except Exception:
        # Fallback to cached data
        data = fallback_operation()
    print(f"Using {len(data)} records")
    return data
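
Instead of catching exceptions, the task's final state can be inspected directly. A minimal sketch, assuming the same tasks as above:

from prefect import flow


@flow(name="Resilient ETL (state-based)")
def resilient_etl_state_based():
    """Branch on the task's final state rather than on a raised exception."""
    state = risky_operation(return_state=True)  # returns a State instead of raising
    if state.is_failed():
        data = fallback_operation()
    else:
        data = state.result()
    print(f"Using {len(data)} records")
    return data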

Conditional Flows

from prefect import flow, task
from prefect.tasks import task_input_hash


@task(cache_key_fn=task_input_hash)
def should_process() -> bool:
    """Check if flow should run."""
    import requests

    response = requests.get('https://api.example.com/check')
    return response.json()['should_process']


@flow(name="Conditional Flow")
def conditional_flow():
    """Flow with conditional execution."""
    # Calling a task directly returns its result, so it can drive plain Python control flow
    if should_process():
        print("Processing...")
        # Run main flow
    else:
        print("Skipping...")
        # Skip processing
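
The processing branch can also live in its own flow and be invoked as a subflow when the check passes; a minimal sketch in which main_processing is a hypothetical stand-in:

from prefect import flow


@flow(name="Main Processing")
def main_processing():
    """Hypothetical subflow holding the actual processing logic."""
    print("Running main processing...")


@flow(name="Conditional Flow with Subflow")
def conditional_flow_with_subflow():
    """Call another flow as a subflow only when the check passes."""
    if should_process():
        main_processing()  # runs as a subflow with its own state and run history
    else:
        print("Skipping...")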

Prefect Blocks

Using Blocks

from prefect import flow, task
from prefect.blocks.system import JSON

# Create block (typically done once, e.g. in a setup script or the UI)
data_lake_config = JSON(
    value={
        "s3_bucket": "my-bucket",
        "prefix": "data/",
        "region": "us-east-1",
    }
)
data_lake_config.save("data-lake-config")


# Use block in flow
@task
def read_data_lake():
    """Read from data lake using block."""
    import boto3

    config = JSON.load("data-lake-config")
    s3 = boto3.client('s3', region_name=config.value['region'])
    response = s3.list_objects_v2(
        Bucket=config.value['s3_bucket'],
        Prefix=config.value['prefix'],
    )
    return response


@flow(name="Block Example")
def block_example():
    """Flow using blocks."""
    read_data_lake()
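
Credentials belong in the built-in Secret block rather than in a JSON block or in code; a hedged sketch (block name, token, and endpoint are assumptions):

from prefect import task
from prefect.blocks.system import Secret

# Store the secret once; the value is encrypted at rest by the Prefect backend
Secret(value="super-secret-api-token").save("example-api-token")


@task
def call_protected_api():
    """Read the secret at runtime instead of hardcoding it."""
    import requests

    token = Secret.load("example-api-token").get()
    response = requests.get(
        "https://api.example.com/data",
        headers={"Authorization": f"Bearer {token}"},
        timeout=30,
    )
    response.raise_for_status()
    return response.json()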

Custom Blocks

from prefect import flow, task
from prefect.blocks.core import Block
from pydantic import Field
from typing import Any


class DatabaseBlock(Block):
    """Custom block for database connections."""

    _block_type_name = "database"
    _logo_url = "https://example.com/logo.png"

    connection_string: str = Field(..., description="Database connection string")
    table_name: str = Field(..., description="Table name")

    def query(self, sql: str) -> Any:
        """Execute query."""
        from sqlalchemy import create_engine, text

        engine = create_engine(self.connection_string)
        with engine.connect() as conn:
            return conn.execute(text(sql)).fetchall()

    def insert(self, data: Any) -> None:
        """Insert data."""
        import pandas as pd
        from sqlalchemy import create_engine

        engine = create_engine(self.connection_string)
        pd.DataFrame(data).to_sql(self.table_name, engine, if_exists='append', index=False)


# Save a configured instance of the custom block (typically done once)
db_block = DatabaseBlock(
    connection_string="postgresql://user:pass@warehouse/db",
    table_name="sales",
)
db_block.save("warehouse-db")


@task
def load_to_warehouse(data: Any):
    """Load data to warehouse using block."""
    db = DatabaseBlock.load("warehouse-db")
    db.insert(data)


@flow(name="Custom Block Example")
def custom_block_example():
    """Flow using custom block."""
    load_to_warehouse([{"id": 1, "value": 100}])

Prefect Testing

Testing Flows

from prefect import flow, task
from prefect.testing.utilities import prefect_test_harness


@task
def add(x: int, y: int) -> int:
    """Add two numbers."""
    return x + y


@flow(name="Addition Flow")
def addition_flow(x: int, y: int) -> int:
    """Simple flow under test."""
    return add(x, y)


# Test flow result
def test_addition_flow():
    """Flow calls return the final result by default."""
    with prefect_test_harness():  # run against a temporary local database
        assert addition_flow(1, 2) == 3


# Test flow state
def test_addition_flow_state():
    """Ask for the final state instead of the return value."""
    with prefect_test_harness():
        state = addition_flow(1, 2, return_state=True)
        assert state.is_completed()
        assert state.result() == 3


# Test task logic in isolation
def test_add_task():
    """Call the underlying function directly with .fn (no orchestration)."""
    assert add.fn(1, 2) == 3
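
For a whole test suite, the harness is typically applied once as a session-scoped pytest fixture rather than per test; a minimal sketch (placing it in conftest.py is an assumption):

# conftest.py
import pytest
from prefect.testing.utilities import prefect_test_harness


@pytest.fixture(autouse=True, scope="session")
def prefect_test_fixture():
    """Run every test against a temporary, isolated Prefect database."""
    with prefect_test_harness():
        yield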

Prefect Best Practices

DO

# 1. Use type hints for flows and tasks
@flow
def my_flow(x: int) -> int:
    return x + 1

# 2. Use task features (retry, cache, timeout)
@task(retries=3, cache_key_fn=...)
def my_task():
    pass

# 3. Use blocks for configuration
config = Block.load("my-config")

# 4. Test flows before deploying
def test_my_flow():
    result = my_flow(1)
    assert result == 2

# 5. Use deployments for production
deployment = Deployment.build_from_flow(flow=my_flow)

DON’T

# 1. Don't ignore state
#    Check state.is_completed()

# 2. Don't ignore error handling
#    Use try-except or state handlers

# 3. Don't hardcode configuration
#    Use blocks or environment variables

# 4. Don't skip testing
#    Test flows before deploying

# 5. Don't ignore logging
#    Use log_prints=True or a Prefect logger

Prefect vs. Airflow vs. Dagster

| Feature        | Prefect                     | Airflow         | Dagster           |
| -------------- | --------------------------- | --------------- | ----------------- |
| Paradigm       | Code-first                  | Config-first    | Data-oriented     |
| Type Safety    | Python type hints           | Dynamic         | Python type hints |
| Testing        | Easy                        | Hard            | Easy              |
| Deployment     | Docker/K8s native           | Requires setup  | Docker/K8s native |
| State Handling | Built-in                    | Manual          | Built-in          |
| Maturity       | Modern                      | Mature          | Modern            |
| Best For       | Modern Python, cloud-native | Traditional ETL | ML, analytics     |

Key Takeaways

  1. Code-first: Define workflows in Python
  2. Resilient: Built-in retry, caching, time limits
  3. Flexible: Local, Cloud, or hybrid execution
  4. Deployments: Docker, Kubernetes, serverless
  5. State Handling: Built-in state management
  6. Blocks: Reusable configuration
  7. Testing: Easy to test flows
  8. Use When: Modern Python, cloud-native, resilience needed

Back to Module 3