
Kubernetes Orchestration for Data

Cloud-Native Orchestration with Kubernetes


Overview

Running orchestrators on Kubernetes enables cloud-native execution of data workloads. Instead of hosting tools such as Airflow, Dagster, or Prefect on static VMs, you deploy them to Kubernetes, where each task runs in its own pod and the cluster provides scalable, resilient execution.


Kubernetes Operators for Orchestration

Apache Airflow Operator

airflow-operator-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow-operator
  namespace: airflow
spec:
  replicas: 1
  selector:
    matchLabels:
      app: airflow-operator
  template:
    metadata:
      labels:
        app: airflow-operator
    spec:
      serviceAccountName: airflow-operator
      containers:
        - name: airflow-operator
          image: apache/airflow:2.7.0-python3.10
          command: ["airflow", "scheduler"]
          env:
            - name: AIRFLOW__CORE__DAGS_FOLDER
              value: "/opt/airflow/dags"
            - name: AIRFLOW__CORE__EXECUTOR
              value: "KubernetesExecutor"
            - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: connection-string
            - name: AIRFLOW__KUBERNETES__NAMESPACE
              value: "airflow"
            - name: AIRFLOW__KUBERNETES__WORKER_SERVICE_ACCOUNT_NAME
              value: "airflow-worker"
            - name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY
              value: "apache/airflow"
            - name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG
              value: "2.7.0-python3.10"
            - name: AIRFLOW__KUBERNETES__POD_TEMPLATE_FILE
              value: "/opt/airflow/pod_templates/pod_template.yaml"
          volumeMounts:
            - name: dags
              mountPath: /opt/airflow/dags
            - name: logs
              mountPath: /opt/airflow/logs
            - name: pod-templates
              mountPath: /opt/airflow/pod_templates
      volumes:
        - name: dags
          persistentVolumeClaim:
            claimName: airflow-dags
        - name: logs
          persistentVolumeClaim:
            claimName: airflow-logs
        - name: pod-templates
          configMap:
            name: airflow-pod-templates
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: airflow-operator
  namespace: airflow
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: airflow-operator
  namespace: airflow
rules:
  - apiGroups: [""]
    resources: ["pods", "pods/log", "pods/exec"]
    verbs: ["get", "list", "watch", "create", "update", "delete"]
  - apiGroups: [""]
    resources: ["secrets", "configmaps"]
    verbs: ["get", "list", "watch", "create", "update", "delete"]
  - apiGroups: ["batch"]
    resources: ["jobs"]
    verbs: ["get", "list", "watch", "create", "update", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: airflow-operator
  namespace: airflow
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: airflow-operator
subjects:
  - kind: ServiceAccount
    name: airflow-operator
    namespace: airflow
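
The Deployment mounts airflow-dags and airflow-logs PersistentVolumeClaims that are not defined here. A minimal sketch, assuming a ReadWriteMany-capable storage class so the scheduler and the worker pods below can share the same volumes; the class name and sizes are placeholders:

airflow-storage.yaml (hypothetical filename)
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: airflow-dags
  namespace: airflow
spec:
  accessModes:
    - ReadWriteMany                  # scheduler and worker pods mount the same claim
  storageClassName: standard-rwx     # placeholder: any RWX-capable class
  resources:
    requests:
      storage: 10Gi                  # placeholder size
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: airflow-logs
  namespace: airflow
spec:
  accessModes:
    - ReadWriteMany
  storageClassName: standard-rwx
  resources:
    requests:
      storage: 20Gi                  # placeholder size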

Airflow Pod Template

airflow-pod-template.yaml
apiVersion: v1
kind: Pod
metadata:
  labels:
    app: airflow-worker
spec:
  serviceAccountName: airflow-worker
  restartPolicy: Never
  containers:
    - name: base
      image: apache/airflow:2.7.0-python3.10
      env:
        - name: AIRFLOW__CORE__EXECUTOR
          value: "KubernetesExecutor"
        - name: AIRFLOW__CORE__DAGS_FOLDER
          value: "/opt/airflow/dags"
      resources:
        requests:
          cpu: "500m"
          memory: "512Mi"
        limits:
          cpu: "1000m"
          memory: "2Gi"
      volumeMounts:
        - name: dags
          mountPath: /opt/airflow/dags
        - name: logs
          mountPath: /opt/airflow/logs
  volumes:
    - name: dags
      persistentVolumeClaim:
        claimName: airflow-dags
    - name: logs
      persistentVolumeClaim:
        claimName: airflow-logs
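
Both the scheduler configuration and the pod template reference an airflow-worker service account, which simply needs to exist in the namespace; task pods normally talk to the Airflow metadata database rather than the Kubernetes API, so no extra RBAC is assumed in this sketch:

airflow-worker-serviceaccount.yaml (hypothetical filename)
apiVersion: v1
kind: ServiceAccount
metadata:
  name: airflow-worker
  namespace: airflow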

Dagster Kubernetes Assets

dagster_k8s_assets.py
from dagster import Definitions, asset
from dagster_k8s import k8s_job_executor


@asset
def my_asset() -> str:
    """Asset whose steps run in Kubernetes Jobs."""
    return "Hello from Kubernetes!"


# Configure the Kubernetes executor: every step of a run is launched as its
# own Kubernetes Job in the "dagster" namespace.
k8s_executor = k8s_job_executor.configured(
    {
        "job_namespace": "dagster",
        "job_image": "my-company/dagster:latest",
        "image_pull_policy": "Always",
        "service_account_name": "dagster-worker",
        "env_vars": ["DATABASE_URL=postgresql://..."],
    }
)

# Attach the executor to the code location. The run launcher itself
# (dagster_k8s.K8sRunLauncher) is configured on the Dagster instance in
# dagster.yaml, not passed to Definitions.
defs = Definitions(
    assets=[my_asset],
    executor=k8s_executor,
)
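
The K8sRunLauncher mentioned above is enabled on the Dagster instance rather than in code. A minimal dagster.yaml sketch, assuming names that mirror the Python above; depending on your dagster-k8s version, additional fields (such as the instance ConfigMap) may be required:

dagster.yaml (instance configuration, sketch)
run_launcher:
  module: dagster_k8s
  class: K8sRunLauncher
  config:
    job_namespace: dagster
    job_image: my-company/dagster:latest
    image_pull_policy: Always
    service_account_name: dagster-worker
    env_vars:
      - DATABASE_URL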

Prefect Kubernetes Infrastructure

prefect_k8s_infrastructure.py
from prefect import flow
from prefect.deployments import Deployment
from prefect.infrastructure.kubernetes import KubernetesJob

# Kubernetes infrastructure block (Prefect 2.x): each flow run executes as a
# Kubernetes Job in the "prefect" namespace.
k8s_job = KubernetesJob(
    namespace="prefect",
    image="my-company/prefect-flow:latest",
    image_pull_policy="Always",
    service_account_name="prefect-worker",
    labels={
        "app": "prefect-flow",
        "environment": "production",
    },
    env={
        "ENVIRONMENT": "production",
        "DATABASE_URL": "postgresql://...",
    },
    # Annotations and resource requests/limits are not direct fields of
    # KubernetesJob; they are applied as JSON patches to the generated
    # Job manifest via `customizations`.
    customizations=[
        {
            "op": "add",
            "path": "/spec/template/metadata",
            "value": {"annotations": {"sidecar.istio.io/inject": "false"}},
        },
        {
            "op": "add",
            "path": "/spec/template/spec/containers/0/resources",
            "value": {
                "requests": {"cpu": "500m", "memory": "512Mi"},
                "limits": {"cpu": "1000m", "memory": "2Gi"},
            },
        },
    ],
)


@flow(name="K8s Flow")
def my_flow():
    """Flow running on Kubernetes."""
    print("Running on Kubernetes!")


# Infrastructure is attached to a deployment, not to the @flow decorator.
deployment = Deployment.build_from_flow(
    flow=my_flow,
    name="k8s-flow",
    infrastructure=k8s_job,
)

if __name__ == "__main__":
    deployment.apply()

Kubernetes CronJobs

Native Scheduling

kubernetes-cronjob.yaml
apiVersion: batch/v1
kind: CronJob
metadata:
  name: daily-etl-job
  namespace: data-platform
spec:
  schedule: "0 0 * * *"           # Daily at midnight UTC
  successfulJobsHistoryLimit: 3
  failedJobsHistoryLimit: 3
  concurrencyPolicy: "Forbid"     # Don't run if previous job still running
  jobTemplate:
    spec:
      template:
        spec:
          serviceAccountName: etl-job
          restartPolicy: OnFailure
          containers:
            - name: etl
              image: my-company/etl-job:latest
              env:
                - name: ENVIRONMENT
                  value: "production"
                - name: DATABASE_URL
                  valueFrom:
                    secretKeyRef:
                      name: database-secrets
                      key: connection-string
              resources:
                requests:
                  cpu: "500m"
                  memory: "512Mi"
                limits:
                  cpu: "1000m"
                  memory: "2Gi"
              volumeMounts:
                - name: data
                  mountPath: /app/data
          volumes:
            - name: data
              persistentVolumeClaim:
                claimName: etl-data
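
The CronJob pulls its connection string from a database-secrets Secret that is not shown here; a minimal sketch with a placeholder value (in practice, populate it from a secret manager rather than committing it):

database-secrets.yaml (hypothetical filename)
apiVersion: v1
kind: Secret
metadata:
  name: database-secrets
  namespace: data-platform
type: Opaque
stringData:
  connection-string: "postgresql://user:password@db-host:5432/etl"   # placeholder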

Kubernetes Resources

Resource Management

resource-quota.yaml
apiVersion: v1
kind: ResourceQuota
metadata:
  name: data-platform-quota
  namespace: data-platform
spec:
  hard:
    requests.cpu: "10"
    requests.memory: "20Gi"
    limits.cpu: "20"
    limits.memory: "40Gi"
    persistentvolumeclaims: "10"
    requests.storage: "100Gi"
---
apiVersion: v1
kind: LimitRange
metadata:
  name: data-platform-limits
  namespace: data-platform
spec:
  limits:
    - max:
        cpu: "2"
        memory: "4Gi"
      min:
        cpu: "100m"
        memory: "128Mi"
      default:
        cpu: "500m"
        memory: "512Mi"
      defaultRequest:
        cpu: "250m"
        memory: "256Mi"
      type: Container

Kubernetes Monitoring

Prometheus Metrics

prometheus-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: prometheus-config
  namespace: data-platform
data:
  prometheus.yml: |
    global:
      scrape_interval: 15s
      evaluation_interval: 15s
    scrape_configs:
      - job_name: 'airflow'
        kubernetes_sd_configs:
          - role: pod
            namespaces:
              names:
                - airflow
        relabel_configs:
          - source_labels: [__meta_kubernetes_pod_label_app]
            action: keep
            regex: airflow.*
      - job_name: 'dagster'
        kubernetes_sd_configs:
          - role: pod
            namespaces:
              names:
                - dagster
        relabel_configs:
          - source_labels: [__meta_kubernetes_pod_label_app]
            action: keep
            regex: dagster.*
      - job_name: 'prefect'
        kubernetes_sd_configs:
          - role: pod
            namespaces:
              names:
                - prefect
        relabel_configs:
          - source_labels: [__meta_kubernetes_pod_label_app]
            action: keep
            regex: prefect.*

Kubernetes Best Practices

DO

# 1. Use resource limits
resources:
  requests:
    cpu: "500m"
    memory: "512Mi"
  limits:
    cpu: "1000m"
    memory: "2Gi"

# 2. Use service accounts
serviceAccountName: airflow-worker

# 3. Use secrets for credentials
env:
  - name: DATABASE_URL
    valueFrom:
      secretKeyRef:
        name: database-secrets
        key: connection-string

# 4. Use persistent volumes for data
volumes:
  - name: data
    persistentVolumeClaim:
      claimName: etl-data

# 5. Use resource quotas
#    Apply a ResourceQuota per namespace (see the manifest above)

DON’T

# 1. Don't ignore resource limits
#    Always set requests and limits

# 2. Don't use the latest image tag
#    Pin specific versions
image: my-app:v1.0.0   # Good
image: my-app:latest   # Bad

# 3. Don't hardcode credentials
#    Use Kubernetes Secrets

# 4. Don't ignore pod disruption budgets
#    Set a PDB for availability (see the sketch below)

# 5. Don't run as root
#    Use security contexts (see the sketch below)
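
A minimal sketch of points 4 and 5, assuming the airflow-operator Deployment shown earlier runs more than one replica (for the PDB) and the default UID 50000 of the apache/airflow images (for the security context):

# 4. Set a PodDisruptionBudget for availability
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
  name: airflow-operator
  namespace: airflow
spec:
  minAvailable: 1            # assumes the Deployment runs more than one replica
  selector:
    matchLabels:
      app: airflow-operator

# 5. Run as a non-root user (pod-spec fragment)
securityContext:
  runAsNonRoot: true
  runAsUser: 50000           # default user of the apache/airflow images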

Kubernetes vs. VMs

Feature              | Kubernetes                   | VMs
---------------------|------------------------------|---------------------------
Scaling              | Auto-scaling                 | Manual
Resilience           | Self-healing                 | Manual recovery
Resource Utilization | High                         | Low
Complexity           | High                         | Low
Cost                 | Lower (efficient)            | Higher (over-provisioned)
Best For             | Cloud-native, microservices  | Traditional workloads

Key Takeaways

  1. Operators: Airflow, Dagster, Prefect K8s operators
  2. Pod Templates: Customize task execution
  3. CronJobs: Native Kubernetes scheduling
  4. Resources: Set limits and requests
  5. Secrets: Use K8s secrets for credentials
  6. Monitoring: Prometheus metrics
  7. Scaling: Auto-scaling based on load (see the HPA sketch below)
  8. Use When: Cloud-native, scalable orchestration needed
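
On the scaling point: task pods created by the KubernetesExecutor (or per-run Jobs in Dagster and Prefect) scale with the number of running tasks, while long-running components can be scaled with a HorizontalPodAutoscaler. A minimal sketch targeting a hypothetical airflow-webserver Deployment that is not defined on this page:

hpa-example.yaml (hypothetical filename)
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: airflow-webserver
  namespace: airflow
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: airflow-webserver   # hypothetical Deployment
  minReplicas: 2
  maxReplicas: 5
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70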

Back to Module 3