Databricks Guide
Lakehouse Platform for Data Engineering
Overview
Databricks is a unified data analytics platform that combines data warehouses and data lakes into a “Lakehouse” architecture. It provides a managed Apache Spark service, collaborative notebooks, Delta Lake for ACID transactions, and MLflow for machine learning.
Databricks Architecture
Lakehouse Architecture
Key Innovations:
- Delta Lake: ACID transactions on data lake
- Unity Catalog: Unified governance
- Photon Engine: Optimized query engine
- Serverless: No cluster management
- ML Integration: Native MLflow support
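A minimal sketch of the lakehouse idea, assuming an active Spark session and an illustrative events table stored as Delta on object storage: the same table backs both DataFrame pipelines and SQL analytics.

# One Delta table on object storage serves both engineering and analytics workloads
events = spark.createDataFrame(
    [(1, "click", "2025-01-27"), (2, "purchase", "2025-01-27")],
    ["event_id", "event_type", "event_date"],
)

# Data engineering: write with ACID guarantees
events.write.format("delta").mode("append").save("s3://bucket/delta/events/")

# Analytics: query the same table with SQL
spark.sql("""
    SELECT event_type, COUNT(*) AS event_count
    FROM delta.`s3://bucket/delta/events/`
    GROUP BY event_type
""").show()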
Databricks Compute
Cluster Modes
Cluster Configuration:
# Serverless compute (recommended)
serverless_compute = "serverless"  # No configuration needed
# Classic cluster configuration
cluster_config = {
    "cluster_name": "data-engineering-cluster",
    "spark_version": "14.3.x-scala2.12",
    "node_type_id": "i3.xlarge",           # 4 vCPUs, 30.5 GB RAM, 950 GB SSD
    "num_workers": 4,                      # 4 worker nodes
    "autoscale": {
        "min_workers": 2,
        "max_workers": 8
    },
    "enable_elastic_disk": True,           # Auto-scale storage
    "enable_spot_instances": True,         # Use spot instances (60-80% cheaper)
    "spot_bid_max_price": 100,             # Max price as % of on-demand
    "runtime_engine": "PHOTON",            # Photon for faster queries
    "data_security_mode": "SINGLE_USER"    # or "ISOLATION" for shared clusters
}

Cluster Policies
# Create a cluster policy for the team
policy = {
    "name": "data-engineering-policy",
    "spark_version": {
        "type": "fixed",
        "value": "14.3.x-scala2.12"
    },
    "node_type_id": {
        "type": "allowlist",
        "value": ["i3.xlarge", "i3.2xlarge", "i3.4xlarge"]
    },
    "autoscale": {
        "min_workers": 2,
        "max_workers": 10
    },
    "enable_spot_instances": True,
    "runtime_engine": "PHOTON"
}
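If policies are managed as code, a workspace admin can register rules like these programmatically. A minimal sketch using the Databricks SDK for Python, assuming the databricks-sdk package is installed and workspace authentication is configured; the rule set shown mirrors the example above and is illustrative, not exhaustive.

import json

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()  # Reads host/token from the environment or ~/.databrickscfg

# Policy rules are a JSON document of per-attribute constraints
policy_definition = {
    "spark_version": {"type": "fixed", "value": "14.3.x-scala2.12"},
    "node_type_id": {"type": "allowlist", "values": ["i3.xlarge", "i3.2xlarge", "i3.4xlarge"]},
    "runtime_engine": {"type": "fixed", "value": "PHOTON"},
}

created = w.cluster_policies.create(
    name="data-engineering-policy",
    definition=json.dumps(policy_definition),
)
print(created.policy_id)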
# Benefits:
# - Standardize cluster configurations
# - Control costs
# - Ensure best practices

Delta Lake
ACID Transactions
from delta.tables import DeltaTable
from pyspark.sql.functions import col, current_timestamp
# Create a Delta table
spark.sql("""
CREATE TABLE sales (
    sale_id BIGINT,
    customer_id BIGINT,
    product_id BIGINT,
    sale_date DATE,
    amount DECIMAL(18,2),
    updated_at TIMESTAMP
)
USING DELTA
LOCATION 's3://bucket/delta/sales/'
""")
# Upsert (MERGE)
DeltaTable.forPath(spark, "s3://bucket/delta/sales/").alias("target") \
    .merge(
        new_sales.alias("source"),
        "target.sale_id = source.sale_id"
    ) \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()
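Delta tables also support in-place deletes and updates through the same DeltaTable API. A small sketch; the conditions and values are hypothetical data-quality rules, not part of the original example.

# In-place DELETE and UPDATE on the same Delta table
sales_table = DeltaTable.forPath(spark, "s3://bucket/delta/sales/")

# Remove obviously bad records (hypothetical rule)
sales_table.delete("amount < 0")

# Correct a column value for specific rows (hypothetical fix)
sales_table.update(
    condition="customer_id = 42 AND amount IS NULL",
    set={"amount": "0"}
)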
# Time travel by version number
spark.read \
    .format("delta") \
    .option("versionAsOf", 12345) \
    .load("s3://bucket/delta/sales/") \
    .show()
# Time travel by timestamp
spark.read \
    .format("delta") \
    .option("timestampAsOf", "2025-01-27 10:00:00") \
    .load("s3://bucket/delta/sales/") \
    .show()
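To find the versions and timestamps available for time travel, inspect the table history; a short sketch using the DeltaTable API and the equivalent SQL.

# Inspect the table history to find versions/timestamps for time travel
history_df = DeltaTable.forPath(spark, "s3://bucket/delta/sales/").history(20)  # last 20 commits
history_df.select("version", "timestamp", "operation").show(truncate=False)

# Equivalent SQL
spark.sql("DESCRIBE HISTORY delta.`s3://bucket/delta/sales/`").show()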
# Benefits:
# - ACID transactions (CRUD operations)
# - Time travel (query historical versions)
# - Schema enforcement (prevent bad data)
# - Upsert support (MERGE)

Schema Evolution
# Schema enforcement (default): prevents data with a wrong schema from being written
sales_df.write \
    .format("delta") \
    .mode("append") \
    .option("mergeSchema", "false") \
    .save("s3://bucket/delta/sales/")
# Schema evolution (allow new columns)
sales_df.withColumn("new_column", col("amount") * 1.1) \
    .write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .option("mergeSchema", "true") \
    .save("s3://bucket/delta/sales/")
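After the write, reading the table back confirms that the evolved schema now includes the new column; a quick check.

# Verify that the evolved schema includes new_column
spark.read.format("delta").load("s3://bucket/delta/sales/").printSchema()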
# Benefits:
# - Prevent bad data (schema enforcement)
# - Evolve the schema over time (schema evolution)
# - No data corruption

Databricks SQL
Data Warehouse Queries
-- Databricks SQL: SQL endpoint for BI tools
-- Fast queries on Delta Lake tables
-- Query Delta Lake
SELECT
    customer_id,
    COUNT(*) AS sales_count,
    SUM(amount) AS total_amount,
    AVG(amount) AS avg_amount
FROM sales
WHERE sale_date >= '2025-01-01'
GROUP BY customer_id
ORDER BY total_amount DESC;
-- Create a view for BI tools
CREATE VIEW customer_daily_sales AS
SELECT
    customer_id,
    sale_date,
    COUNT(*) AS sales_count,
    SUM(amount) AS total_amount
FROM sales
GROUP BY customer_id, sale_date;
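Applications can run the same queries against a SQL warehouse with the Databricks SQL Connector for Python. A minimal sketch, assuming the databricks-sql-connector package is installed; the hostname, HTTP path, and token are placeholders, not real credentials.

# Query a Databricks SQL warehouse from Python (placeholders, not real credentials)
from databricks import sql

with sql.connect(
    server_hostname="<workspace-host>.cloud.databricks.com",
    http_path="/sql/1.0/warehouses/<warehouse-id>",
    access_token="<personal-access-token>",
) as connection:
    with connection.cursor() as cursor:
        cursor.execute("SELECT customer_id, total_amount FROM customer_daily_sales LIMIT 10")
        for row in cursor.fetchall():
            print(row)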
-- Benefits:
-- - SQL interface for data analysts
-- - Integration with BI tools (Tableau, Power BI)
-- - Fast queries (Photon engine)
-- - Lakehouse: data lake + warehouse in one

Unity Catalog
Unified Governance
# Unity Catalog: unified governance for all data
# Benefits:
# - Fine-grained access control
# - Data lineage
# - Auditing
# - Multi-cloud support
# Create a catalog
spark.sql("CREATE CATALOG sales_catalog")
# Create a schema
spark.sql("CREATE SCHEMA sales_catalog.analytics")
# Grant privileges
spark.sql("GRANT USE CATALOG ON CATALOG sales_catalog TO `sales_team`")
spark.sql("GRANT CREATE TABLE ON SCHEMA sales_catalog.analytics TO `sales_team`")
spark.sql("GRANT SELECT ON TABLE sales_catalog.analytics.sales TO `sales_analysts`")
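With the catalog and schema in place, tables are addressed by the three-level catalog.schema.table name; a short sketch, with an illustrative table definition.

# Create and query a governed table using the three-level namespace
spark.sql("""
CREATE TABLE IF NOT EXISTS sales_catalog.analytics.sales (
    sale_id BIGINT,
    customer_id BIGINT,
    amount DECIMAL(18,2),
    sale_date DATE
)
""")

spark.sql("""
SELECT customer_id, SUM(amount) AS total_amount
FROM sales_catalog.analytics.sales
GROUP BY customer_id
""").show()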
# Benefits:
# - Centralized governance
# - Fine-grained permissions
# - Auditing and compliance

Databricks Workflows
Orchestration
from pyspark.sql.functions import col, current_timestamp

# Daily ETL pipeline, packaged as a function so it can run as a notebook or
# Python task in a Databricks Workflow (formerly Jobs)
def daily_sales_etl():
    # Task 1: Extract from the source
    raw_sales = spark.read \
        .format("csv") \
        .option("header", "true") \
        .load("s3://bucket/source/sales/")

    # Task 2: Transform
    transformed = raw_sales \
        .withColumn("processed_at", current_timestamp()) \
        .filter(col("amount") > 0)

    # Task 3: Load into Delta Lake
    transformed.write \
        .format("delta") \
        .mode("overwrite") \
        .partitionBy("sale_date") \
        .save("s3://bucket/delta/sales/")

    # Task 4: Refresh the downstream view
    # (assumes customer_daily_sales is defined as a materialized view)
    spark.sql("REFRESH MATERIALIZED VIEW customer_daily_sales")

daily_sales_etl()
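The pipeline above can be registered as a scheduled Databricks Workflow (Job). A minimal sketch using the Databricks SDK for Python, assuming the databricks-sdk package is installed and authentication is configured; the notebook path and cluster ID are illustrative placeholders.

from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs

w = WorkspaceClient()

job = w.jobs.create(
    name="daily_sales_etl",
    tasks=[
        jobs.Task(
            task_key="run_etl",
            notebook_task=jobs.NotebookTask(notebook_path="/Repos/team/etl/daily_sales_etl"),
            existing_cluster_id="1234-567890-abc123",  # or use job clusters / serverless
        )
    ],
    schedule=jobs.CronSchedule(
        quartz_cron_expression="0 0 0 * * ?",  # daily at 12:00 AM
        timezone_id="UTC",
    ),
)
print(job.job_id)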
# The workflow is scheduled to run daily at 12:00 AM UTC

Databricks MLflow
ML Experiment Tracking
import mlflow
import mlflow.spark
from pyspark.ml.classification import RandomForestClassifier

# Train and log an ML model
with mlflow.start_run():
    # Train the model
    model = RandomForestClassifier() \
        .setLabelCol("churned") \
        .setFeaturesCol("features") \
        .fit(training_data)

    # Log parameters
    mlflow.log_param("max_depth", 10)
    mlflow.log_param("num_trees", 100)

    # Log metrics
    mlflow.log_metric("accuracy", 0.95)
    mlflow.log_metric("precision", 0.93)

    # Log the model
    mlflow.spark.log_model(model, "model")
# Load the model for inference
loaded_model = mlflow.spark.load_model("runs:/<run_id>/model")
predictions = loaded_model.transform(test_data)
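To promote a trained model beyond a single run, it can be registered in the MLflow Model Registry; a short sketch, with an illustrative model name and the same run ID placeholder.

# Register the logged model in the MLflow Model Registry
registered = mlflow.register_model("runs:/<run_id>/model", "churn_model")
print(registered.name, registered.version)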
# Benefits:
# - Experiment tracking
# - Model versioning
# - Model deployment
# - Integration with Databricks

Databricks Cost Optimization
Cluster Optimization
# 1. Use serverless compute (recommended)
# No cluster management, auto-scaling, pay only for what you use
# 2. Use spot instances (60-80% cheaper)
cluster_config = {
    "enable_spot_instances": True,
    "spot_bid_max_price": 80,        # 80% of the on-demand price
    "spot_instance_recovery": True   # Auto-recover on spot termination
}
# 3. Use auto-termination
cluster_config = {
    "autotermination_minutes": 30    # Auto-terminate after 30 minutes idle
}
# 4. Use Photon for faster queries (lower cost per query)
cluster_config = {
    "runtime_engine": "PHOTON"       # 2-4x faster queries
}
# 5. Use Delta Lake data skipping
# Partition and cluster Delta tables for faster queries
spark.sql("""
OPTIMIZE sales
ZORDER BY (customer_id, sale_date)
""")

Storage Optimization
# 1. Use Delta Lake OPTIMIZE to compact small files
spark.sql("OPTIMIZE sales")

# 2. Use Z-ORDER for data skipping
spark.sql("OPTIMIZE sales ZORDER BY (customer_id, sale_date)")

# 3. Vacuum old versions
spark.sql("VACUUM sales RETAIN 168 HOURS")  # Keep 7 days of history

# 4. Use column pruning: select only the columns you need
# 5. Use partitioning for partition pruning
sales_df.write \
    .format("delta") \
    .partitionBy("sale_date") \
    .save("s3://bucket/delta/sales/")

Databricks Monitoring
Cluster Metrics
# Get cluster status with the Databricks SDK for Python (databricks-sdk);
# assumes the package is installed and workspace authentication is configured
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

# Get the cluster's current state
cluster_id = "1234-567890-abc123"
cluster = w.clusters.get(cluster_id=cluster_id)
print(cluster.state)  # e.g. RUNNING, TERMINATED

# Hardware metrics (CPU, memory, disk, network) are shown on the cluster's
# Metrics tab in the workspace UI
# Key metrics:
# - CPU utilization
# - Memory usage
# - Disk I/O
# - Network I/O
# - GPU utilization (if GPU cluster)

Query Metrics
-- Query history for SQL endpoints
SELECT
    user_id,
    statement_text,
    start_time,
    end_time,
    duration_ms,
    rows_produced,
    bytes_scanned
FROM system.query.history
WHERE start_time >= '2025-01-27'
ORDER BY duration_ms DESC
LIMIT 100;

Databricks vs. Alternatives
| Feature | Databricks | Snowflake | BigQuery |
|---|---|---|---|
| Architecture | Lakehouse | Warehouse | Warehouse |
| Storage | S3/ADLS/GCS | Managed | Managed |
| Format | Delta (Parquet) | Proprietary | Capacitor |
| Compute | Spark | Proprietary | Dremel |
| ML Support | Native (MLflow) | Limited | BigQuery ML |
| Best For | ELT, ML, lakehouse | Data sharing | Serverless, cost |
Best Practices
DO
# 1. Use Delta Lake for all tables
df.write.format("delta").save("s3://bucket/delta/table/")

# 2. Use partitioning and Z-ORDER
df.write.format("delta").partitionBy("date").save("s3://bucket/delta/table/")
spark.sql("OPTIMIZE table ZORDER BY (customer_id)")

# 3. Use Photon for SQL workloads
cluster_config = {"runtime_engine": "PHOTON"}

# 4. Use serverless compute
compute = "serverless"

# 5. Use Unity Catalog for governance
spark.sql("GRANT SELECT ON TABLE sales TO `analysts`")

DON’T
# 1. Don't use CSV/JSON for production
#    Use Delta Lake instead

# 2. Don't ignore OPTIMIZE and VACUUM
#    Run them regularly for performance

# 3. Don't use large clusters for small workloads
#    Use auto-scaling

# 4. Don't forget to auto-terminate clusters
#    Auto-terminate idle clusters

# 5. Don't use SELECT * for large tables
#    Select only needed columns

Key Takeaways
- Lakehouse: Data lake + warehouse in one platform
- Delta Lake: ACID transactions, time travel, schema enforcement
- Unity Catalog: Unified governance for all data
- Serverless: No cluster management, auto-scaling
- Photon: 2-4x faster queries
- MLflow: Native ML experiment tracking
- Workflows: Orchestration for ETL pipelines
- Cost: Use spot instances, serverless, optimize Delta tables