Kubernetes Orchestration for Data
Cloud-Native Orchestration with Kubernetes
Overview
Kubernetes enables cloud-native orchestration of data workloads. Instead of running orchestration tools (Airflow, Dagster, Prefect) on static VMs, you deploy them onto Kubernetes, where each task executes in its own pod and the cluster handles scheduling, scaling, and recovery.
Kubernetes Operators for Orchestration
Apache Airflow Operator
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow-operator
  namespace: airflow
spec:
  replicas: 1
  selector:
    matchLabels:
      app: airflow-operator
  template:
    metadata:
      labels:
        app: airflow-operator
    spec:
      serviceAccountName: airflow-operator
      containers:
        - name: airflow-operator
          image: apache/airflow:2.7.0-python3.10
          command: ["airflow", "scheduler"]
          env:
            - name: AIRFLOW__CORE__DAGS_FOLDER
              value: "/opt/airflow/dags"
            - name: AIRFLOW__CORE__EXECUTOR
              value: "KubernetesExecutor"
            - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: connection-string
            - name: AIRFLOW__KUBERNETES__NAMESPACE
              value: "airflow"
            - name: AIRFLOW__KUBERNETES__WORKER_SERVICE_ACCOUNT_NAME
              value: "airflow-worker"
            - name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY
              value: "apache/airflow"
            - name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG
              value: "2.7.0-python3.10"
            - name: AIRFLOW__KUBERNETES__POD_TEMPLATE_FILE
              value: "/opt/airflow/pod_templates/pod_template.yaml"
          volumeMounts:
            - name: dags
              mountPath: /opt/airflow/dags
            - name: logs
              mountPath: /opt/airflow/logs
            - name: pod-templates
              mountPath: /opt/airflow/pod_templates
      volumes:
        - name: dags
          persistentVolumeClaim:
            claimName: airflow-dags
        - name: logs
          persistentVolumeClaim:
            claimName: airflow-logs
        - name: pod-templates
          configMap:
            name: airflow-pod-templates
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: airflow-operator
  namespace: airflow
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: airflow-operator
  namespace: airflow
rules:
  - apiGroups: [""]
    resources: ["pods", "pods/log", "pods/exec"]
    verbs: ["get", "list", "watch", "create", "update", "delete"]
  - apiGroups: [""]
    resources: ["secrets", "configmaps"]
    verbs: ["get", "list", "watch", "create", "update", "delete"]
  - apiGroups: ["batch"]
    resources: ["jobs"]
    verbs: ["get", "list", "watch", "create", "update", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: airflow-operator
  namespace: airflow
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: airflow-operator
subjects:
  - kind: ServiceAccount
    name: airflow-operator
    namespace: airflow
```
Airflow Pod Template
```yaml
apiVersion: v1
kind: Pod
metadata:
  labels:
    app: airflow-worker
spec:
  serviceAccountName: airflow-worker
  restartPolicy: Never
  containers:
    - name: base
      image: apache/airflow:2.7.0-python3.10
      env:
        - name: AIRFLOW__CORE__EXECUTOR
          value: "KubernetesExecutor"
        - name: AIRFLOW__CORE__DAGS_FOLDER
          value: "/opt/airflow/dags"
      resources:
        requests:
          cpu: "500m"
          memory: "512Mi"
        limits:
          cpu: "1000m"
          memory: "2Gi"
      volumeMounts:
        - name: dags
          mountPath: /opt/airflow/dags
        - name: logs
          mountPath: /opt/airflow/logs
  volumes:
    - name: dags
      persistentVolumeClaim:
        claimName: airflow-dags
    - name: logs
      persistentVolumeClaim:
        claimName: airflow-logs
```
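With the KubernetesExecutor, every task runs in a worker pod built from this template, and individual tasks can override it at runtime. A minimal sketch, assuming the setup above and the `kubernetes` Python client available in the Airflow image; the DAG and task names are illustrative:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from kubernetes.client import models as k8s


def extract():
    print("Extracting data inside a KubernetesExecutor worker pod")


with DAG(
    dag_id="etl_on_k8s",                 # illustrative DAG name
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    extract_task = PythonOperator(
        task_id="extract",
        python_callable=extract,
        # Override the pod template for this task only; the container must be
        # named "base" to match the template's main container.
        executor_config={
            "pod_override": k8s.V1Pod(
                spec=k8s.V1PodSpec(
                    containers=[
                        k8s.V1Container(
                            name="base",
                            resources=k8s.V1ResourceRequirements(
                                requests={"cpu": "1", "memory": "1Gi"},
                                limits={"cpu": "2", "memory": "4Gi"},
                            ),
                        )
                    ]
                )
            )
        },
    )
```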
Dagster Kubernetes Assets
```python
from dagster import asset, Definitions
from dagster_k8s import k8s_job_executor


@asset
def my_asset() -> str:
    """Asset running on Kubernetes."""
    return "Hello from Kubernetes!"


# Run each step in its own Kubernetes pod. The K8sRunLauncher that launches
# the run itself is configured at the instance level (dagster.yaml or the
# Dagster Helm chart values), not passed to Definitions; per-step pod
# settings go on the executor.
k8s_executor = k8s_job_executor.configured(
    {
        "job_image": "my-company/dagster:latest",
        "job_namespace": "dagster",
        "service_account_name": "dagster-worker",
        "image_pull_policy": "Always",
        "env_vars": ["DATABASE_URL=postgresql://..."],
    }
)

defs = Definitions(
    assets=[my_asset],
    executor=k8s_executor,
)
```
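Individual assets can also request more resources than the executor default via the `dagster-k8s/config` tag honored by dagster-k8s; a sketch with illustrative values:

```python
from dagster import asset


@asset(
    op_tags={
        "dagster-k8s/config": {
            "container_config": {
                "resources": {
                    "requests": {"cpu": "500m", "memory": "1Gi"},
                    "limits": {"cpu": "2", "memory": "4Gi"},
                }
            }
        }
    }
)
def heavy_asset() -> str:
    """Asset that needs more CPU/memory than the executor default."""
    return "done"
```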
Prefect Kubernetes Infrastructure
```python
from prefect import flow
from prefect.infrastructure.kubernetes import KubernetesJob

# Kubernetes infrastructure block (Prefect 2.x).
k8s_job = KubernetesJob(
    namespace="prefect",
    image="my-company/prefect-flow:latest",
    image_pull_policy="Always",
    service_account_name="prefect-worker",
    labels={
        "app": "prefect-flow",
        "environment": "production",
    },
    env={
        "ENVIRONMENT": "production",
        "DATABASE_URL": "postgresql://...",
    },
    # Resource requests/limits are applied as JSON-patch customizations to
    # the generated Job manifest; extra pod annotations (e.g. disabling
    # Istio sidecar injection) can be added the same way.
    customizations=[
        {
            "op": "add",
            "path": "/spec/template/spec/containers/0/resources",
            "value": {
                "requests": {"cpu": "500m", "memory": "512Mi"},
                "limits": {"cpu": "1000m", "memory": "2Gi"},
            },
        },
    ],
)


# The infrastructure block is attached when the flow is deployed, not on the
# @flow decorator (see the deployment sketch below).
@flow(name="K8s Flow")
def my_flow():
    """Flow running on Kubernetes."""
    print("Running on Kubernetes!")
```
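The flow and the KubernetesJob block are tied together through a deployment. A minimal sketch, assuming Prefect 2.x and an agent polling a work queue; the deployment and queue names are illustrative:

```python
from prefect.deployments import Deployment

# Register the flow with the KubernetesJob infrastructure defined above.
deployment = Deployment.build_from_flow(
    flow=my_flow,
    name="k8s-flow-production",      # illustrative deployment name
    infrastructure=k8s_job,
    work_queue_name="kubernetes",    # an agent must poll this queue
)

if __name__ == "__main__":
    deployment.apply()               # create/update the deployment in the Prefect API
```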
Kubernetes CronJobs
Native Scheduling
```yaml
apiVersion: batch/v1
kind: CronJob
metadata:
  name: daily-etl-job
  namespace: data-platform
spec:
  schedule: "0 0 * * *"        # Daily at midnight UTC
  successfulJobsHistoryLimit: 3
  failedJobsHistoryLimit: 3
  concurrencyPolicy: "Forbid"  # Don't run if previous job is still running
  jobTemplate:
    spec:
      template:
        spec:
          serviceAccountName: etl-job
          restartPolicy: OnFailure
          containers:
            - name: etl
              image: my-company/etl-job:latest
              env:
                - name: ENVIRONMENT
                  value: "production"
                - name: DATABASE_URL
                  valueFrom:
                    secretKeyRef:
                      name: database-secrets
                      key: connection-string
              resources:
                requests:
                  cpu: "500m"
                  memory: "512Mi"
                limits:
                  cpu: "1000m"
                  memory: "2Gi"
              volumeMounts:
                - name: data
                  mountPath: /app/data
          volumes:
            - name: data
              persistentVolumeClaim:
                claimName: etl-data
```
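The container in this CronJob only needs to read the injected configuration and write to the mounted volume. A hypothetical entrypoint for the `my-company/etl-job` image might look like this; the file path and logic are illustrative:

```python
# etl.py - hypothetical entrypoint for the my-company/etl-job image above.
import os


def main() -> None:
    # Injected by the CronJob spec: a plain env var and a Secret-backed env var.
    environment = os.environ["ENVIRONMENT"]
    database_url = os.environ["DATABASE_URL"]

    # /app/data is the persistent volume mounted in the manifest.
    output_path = "/app/data/daily_extract.csv"

    print(f"Running ETL in {environment}, writing to {output_path}")
    # ... extract from database_url, transform, load ...


if __name__ == "__main__":
    main()
```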
Kubernetes Resources
Resource Management
```yaml
apiVersion: v1
kind: ResourceQuota
metadata:
  name: data-platform-quota
  namespace: data-platform
spec:
  hard:
    requests.cpu: "10"
    requests.memory: "20Gi"
    limits.cpu: "20"
    limits.memory: "40Gi"
    persistentvolumeclaims: "10"
    requests.storage: "100Gi"
---
apiVersion: v1
kind: LimitRange
metadata:
  name: data-platform-limits
  namespace: data-platform
spec:
  limits:
    - max:
        cpu: "2"
        memory: "4Gi"
      min:
        cpu: "100m"
        memory: "128Mi"
      default:
        cpu: "500m"
        memory: "512Mi"
      defaultRequest:
        cpu: "250m"
        memory: "256Mi"
      type: Container
```
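Pipelines can also be made quota-aware by checking usage before launching heavy work. A rough sketch, assuming the official `kubernetes` Python client and in-cluster credentials; the namespace and quota name match the manifests above:

```python
from kubernetes import client, config


def cpu_request_usage(namespace: str = "data-platform",
                      quota_name: str = "data-platform-quota") -> str:
    """Report how much of the CPU request quota is currently used."""
    config.load_incluster_config()  # use the pod's service account
    quota = client.CoreV1Api().read_namespaced_resource_quota(quota_name, namespace)

    hard = quota.status.hard["requests.cpu"]
    used = quota.status.used["requests.cpu"]
    return f"requests.cpu: {used} used of {hard}"


if __name__ == "__main__":
    print(cpu_request_usage())
```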
Kubernetes Monitoring
Prometheus Metrics
```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: prometheus-config
  namespace: data-platform
data:
  prometheus.yml: |
    global:
      scrape_interval: 15s
      evaluation_interval: 15s

    scrape_configs:
      - job_name: 'airflow'
        kubernetes_sd_configs:
          - role: pod
            namespaces:
              names:
                - airflow
        relabel_configs:
          - source_labels: [__meta_kubernetes_pod_label_app]
            action: keep
            regex: airflow.*

      - job_name: 'dagster'
        kubernetes_sd_configs:
          - role: pod
            namespaces:
              names:
                - dagster
        relabel_configs:
          - source_labels: [__meta_kubernetes_pod_label_app]
            action: keep
            regex: dagster.*

      - job_name: 'prefect'
        kubernetes_sd_configs:
          - role: pod
            namespaces:
              names:
                - prefect
        relabel_configs:
          - source_labels: [__meta_kubernetes_pod_label_app]
            action: keep
            regex: prefect.*
```
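For these scrape jobs to return more than the orchestrators' built-in metrics, task code can expose its own counters. A minimal sketch, assuming the `prometheus_client` package and that the worker pod exposes port 8000 to Prometheus; the metric names are illustrative:

```python
import time

from prometheus_client import Counter, Histogram, start_http_server

# Illustrative pipeline metrics.
ROWS_PROCESSED = Counter("etl_rows_processed_total", "Rows processed by the ETL job")
TASK_DURATION = Histogram("etl_task_duration_seconds", "ETL task duration in seconds")


def run_task() -> None:
    with TASK_DURATION.time():   # observe how long the task takes
        time.sleep(1)            # stand-in for real work
        ROWS_PROCESSED.inc(1000)


if __name__ == "__main__":
    start_http_server(8000)      # serve /metrics for Prometheus to scrape
    while True:
        run_task()
        time.sleep(60)
```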
Kubernetes Best Practices
DO
```yaml
# 1. Use resource limits
resources:
  requests:
    cpu: "500m"
    memory: "512Mi"
  limits:
    cpu: "1000m"
    memory: "2Gi"

# 2. Use service accounts
serviceAccountName: airflow-worker

# 3. Use secrets for credentials
env:
  - name: DATABASE_URL
    valueFrom:
      secretKeyRef:
        name: database-secrets
        key: connection-string

# 4. Use persistent volumes for data
volumes:
  - name: data
    persistentVolumeClaim:
      claimName: etl-data

# 5. Use resource quotas
#    Define a ResourceQuota for each namespace
```
DON'T
```yaml
# 1. Don't ignore resource limits
#    Always set requests and limits

# 2. Don't use the latest image tag -- use specific versions
image: my-app:v1.0.0   # Good
# image: my-app:latest # Bad

# 3. Don't hardcode credentials
#    Use secrets

# 4. Don't ignore pod disruption budgets
#    Set a PDB for availability

# 5. Don't run as root
#    Use security contexts
```
Kubernetes vs. VMs
| Feature | Kubernetes | VMs |
|---|---|---|
| Scaling | Auto-scaling | Manual |
| Resilience | Self-healing | Manual recovery |
| Resource Utilization | High | Low |
| Complexity | High | Low |
| Cost | Lower (efficient) | Higher (over-provisioned) |
| Best For | Cloud-native, microservices | Traditional workloads |
Key Takeaways
- Operators: Airflow, Dagster, Prefect K8s operators
- Pod Templates: Customize task execution
- CronJobs: Native Kubernetes scheduling
- Resources: Set limits and requests
- Secrets: Use K8s secrets for credentials
- Monitoring: Prometheus metrics
- Scaling: Auto-scaling based on load
- Use When: Cloud-native, scalable orchestration needed
Back to Module 3