
Databricks Guide

Lakehouse Platform for Data Engineering


Overview

Databricks is a unified data analytics platform that combines data warehouses and data lakes into a “Lakehouse” architecture. It provides a managed Apache Spark service, collaborative notebooks, Delta Lake for ACID transactions, and MLflow for machine learning.


Databricks Architecture

Lakehouse Architecture

Key Innovations:

  • Delta Lake: ACID transactions on data lake
  • Unity Catalog: Unified governance
  • Photon Engine: Optimized query engine
  • Serverless: No cluster management
  • ML Integration: Native MLflow support

Databricks Compute

Cluster Modes

Cluster Configuration:

# Serverless compute (recommended): nothing to configure or manage
serverless_compute = "serverless"

# Classic cluster configuration
cluster_config = {
    "cluster_name": "data-engineering-cluster",
    "spark_version": "14.3.x-scala2.12",
    "node_type_id": "i3.xlarge",         # 4 vCPUs, 30.5 GB RAM, 950 GB NVMe SSD
    "num_workers": 4,                    # 4 worker nodes (ignored when autoscale is set)
    "autoscale": {
        "min_workers": 2,
        "max_workers": 8
    },
    "enable_elastic_disk": True,         # Auto-scale local storage
    "enable_spot_instances": True,       # Use spot instances (60-80% cheaper)
    "spot_bid_max_price": 100,           # Max price as % of on-demand
    "runtime_engine": "PHOTON",          # Photon engine for faster queries
    "data_security_mode": "SINGLE_USER"  # or "USER_ISOLATION" for shared clusters
}
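
The dictionary above mirrors the Clusters API payload; below is a minimal sketch of creating a comparable cluster with the Databricks Python SDK (databricks-sdk). It assumes the SDK is installed, that credentials come from DATABRICKS_HOST / DATABRICKS_TOKEN environment variables, and that the SDK's compute field names (AutoScale, RuntimeEngine, autotermination_minutes) apply to your workspace.

from databricks.sdk import WorkspaceClient
from databricks.sdk.service import compute

w = WorkspaceClient()  # picks up DATABRICKS_HOST / DATABRICKS_TOKEN from the environment
cluster = w.clusters.create_and_wait(
    cluster_name="data-engineering-cluster",
    spark_version="14.3.x-scala2.12",
    node_type_id="i3.xlarge",
    autoscale=compute.AutoScale(min_workers=2, max_workers=8),
    autotermination_minutes=30,                   # stop idle clusters automatically
    runtime_engine=compute.RuntimeEngine.PHOTON,  # enable Photon
)
print(cluster.cluster_id, cluster.state)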

Cluster Policies

# Cluster policy for the data engineering team: constrains what users can
# configure when they create clusters (attribute -> constraint).
# The policy name ("data-engineering-policy") is supplied when the policy
# is registered (see the sketch after the benefits list below).
policy_definition = {
    "spark_version": {
        "type": "fixed",
        "value": "14.3.x-scala2.12"
    },
    "node_type_id": {
        "type": "allowlist",
        "values": ["i3.xlarge", "i3.2xlarge", "i3.4xlarge"]
    },
    "autoscale.min_workers": {"type": "range", "minValue": 2},
    "autoscale.max_workers": {"type": "range", "maxValue": 10},
    "aws_attributes.availability": {"type": "fixed", "value": "SPOT_WITH_FALLBACK"},
    "runtime_engine": {"type": "fixed", "value": "PHOTON"}
}
# Benefits:
# - Standardize cluster configurations
# - Control costs
# - Ensure best practices
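
A minimal sketch of registering the policy through the Cluster Policies REST API, assuming the policy_definition dict above, a workspace URL in DATABRICKS_HOST, and a personal access token in DATABRICKS_TOKEN (both illustrative); the "definition" field must be a JSON string.

import json
import os
import requests

resp = requests.post(
    f"{os.environ['DATABRICKS_HOST']}/api/2.0/policies/clusters/create",
    headers={"Authorization": f"Bearer {os.environ['DATABRICKS_TOKEN']}"},
    json={
        "name": "data-engineering-policy",
        "definition": json.dumps(policy_definition),  # policy definition as a JSON string
    },
)
resp.raise_for_status()
print(resp.json()["policy_id"])  # ID of the newly created policy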

Delta Lake

ACID Transactions

from delta.tables import DeltaTable
from pyspark.sql.functions import col, current_timestamp

# Create Delta table
spark.sql("""
    CREATE TABLE sales (
        sale_id BIGINT,
        customer_id BIGINT,
        product_id BIGINT,
        sale_date DATE,
        amount DECIMAL(18,2),
        updated_at TIMESTAMP
    )
    USING DELTA
    LOCATION 's3://bucket/delta/sales/'
""")

# Upsert (MERGE): new_sales is a DataFrame of incoming and updated rows
DeltaTable.forPath(spark, "s3://bucket/delta/sales/").alias("target") \
    .merge(
        new_sales.alias("source"),
        "target.sale_id = source.sale_id"
    ) \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()

# Time travel by version number
spark.read \
    .format("delta") \
    .option("versionAsOf", 12345) \
    .load("s3://bucket/delta/sales/") \
    .show()

# Time travel by timestamp
spark.read \
    .format("delta") \
    .option("timestampAsOf", "2025-01-27 10:00:00") \
    .load("s3://bucket/delta/sales/") \
    .show()
# Benefits:
# - ACID transactions (CRUD operations)
# - Time Travel (query historical versions)
# - Schema enforcement (prevent bad data)
# - Upsert support (MERGE)
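
Time travel also enables rollback: if a bad write lands, the table can be restored to an earlier state with Delta's RESTORE command. A brief sketch (the version number and timestamp are illustrative):

# Roll the path-based table back to an earlier version
spark.sql("RESTORE TABLE delta.`s3://bucket/delta/sales/` TO VERSION AS OF 12344")
# Or restore a catalog table to a point in time
spark.sql("RESTORE TABLE sales TO TIMESTAMP AS OF '2025-01-26 00:00:00'")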

Schema Evolution

# Schema enforcement (default)
# Prevents data with wrong schema from being written
sales_df.write \
    .format("delta") \
    .mode("append") \
    .option("mergeSchema", "false") \
    .save("s3://bucket/delta/sales/")

# Schema evolution: allow new columns with mergeSchema (append), or replace
# the schema entirely with overwriteSchema (overwrite)
sales_df.withColumn("new_column", col("amount") * 1.1) \
    .write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .save("s3://bucket/delta/sales/")
# Benefits:
# - Prevent bad data (schema enforcement)
# - Evolve schema over time (schema evolution)
# - No data corruption
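
Schema changes can also be made explicitly rather than inferred from a write. A brief sketch using Delta's ALTER TABLE support (the new column name is illustrative):

# Add a new nullable column without rewriting existing data
spark.sql("ALTER TABLE sales ADD COLUMNS (discount_pct DECIMAL(5,2))")
# Document an existing column
spark.sql("ALTER TABLE sales ALTER COLUMN amount COMMENT 'Gross sale amount'")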

Databricks SQL

Data Warehouse Queries

-- Databricks SQL: SQL warehouses (formerly SQL endpoints) for BI tools
-- Fast queries on Delta Lake tables

-- Query Delta Lake
SELECT
    customer_id,
    COUNT(*) AS sales_count,
    SUM(amount) AS total_amount,
    AVG(amount) AS avg_amount
FROM sales
WHERE sale_date >= '2025-01-01'
GROUP BY customer_id
ORDER BY total_amount DESC;

-- Create a materialized view for BI tools (precomputed, refreshed on demand or on a schedule)
CREATE MATERIALIZED VIEW customer_daily_sales AS
SELECT
    customer_id,
    sale_date,
    COUNT(*) AS sales_count,
    SUM(amount) AS total_amount
FROM sales
GROUP BY customer_id, sale_date;

-- Benefits:
-- - SQL interface for data analysts
-- - Integration with BI tools (Tableau, Power BI)
-- - Fast queries (Photon engine)
-- - Lakehouse: data lake + warehouse in one
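
BI tools connect to a SQL warehouse over JDBC/ODBC; from Python, the databricks-sql-connector package offers a comparable path. A minimal sketch, assuming a warehouse's server hostname, HTTP path, and a personal access token (the placeholders below are illustrative):

from databricks import sql  # pip install databricks-sql-connector

with sql.connect(
    server_hostname="<workspace-hostname>",
    http_path="<sql-warehouse-http-path>",
    access_token="<personal-access-token>",
) as conn:
    with conn.cursor() as cursor:
        cursor.execute(
            "SELECT customer_id, SUM(amount) AS total_amount "
            "FROM sales GROUP BY customer_id ORDER BY total_amount DESC LIMIT 10"
        )
        for row in cursor.fetchall():
            print(row)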

Unity Catalog

Unified Governance

# Unity Catalog: Unified governance for all data
# Benefits:
# - Fine-grained access control
# - Data lineage
# - Auditing
# - Multi-cloud support
# Create catalog
spark.sql("CREATE CATALOG sales_catalog")
# Create schema
spark.sql("CREATE SCHEMA sales_catalog.analytics")
# Grant privileges (Unity Catalog SQL; principals are users, groups, or service principals)
spark.sql("GRANT USE CATALOG ON CATALOG sales_catalog TO `sales_team`")
spark.sql("GRANT USE SCHEMA, CREATE TABLE ON SCHEMA sales_catalog.analytics TO `sales_team`")
spark.sql("GRANT SELECT ON TABLE sales_catalog.analytics.sales TO `sales_analysts`")
# Benefits:
# - Centralized governance
# - Fine-grained permissions
# - Auditing and compliance
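
Lineage and audit events surface as Unity Catalog system tables (these must be enabled for the account, and exact column names can vary by release, so treat this as a hedged sketch):

# Upstream/downstream lineage recorded for a table
spark.sql("""
    SELECT *
    FROM system.access.table_lineage
    WHERE target_table_full_name = 'sales_catalog.analytics.sales'
    ORDER BY event_time DESC
    LIMIT 20
""").show()

# Audit log of governance and data-access events
spark.sql("""
    SELECT *
    FROM system.access.audit
    WHERE service_name = 'unityCatalog'
    ORDER BY event_time DESC
    LIMIT 20
""").show()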

Databricks Workflows

Orchestration

# Databricks Workflows (formerly Jobs) orchestrates notebooks, scripts, SQL,
# and pipelines. Below is the ETL logic a daily job runs; the job itself is
# defined in the Workflows UI or API (see the sketch after this block).
from pyspark.sql.functions import col, current_timestamp

def daily_sales_etl():
    # Task 1: Extract from source
    raw_sales = spark.read \
        .format("csv") \
        .option("header", "true") \
        .load("s3://bucket/source/sales/")
    # Task 2: Transform
    clean_sales = raw_sales \
        .withColumn("processed_at", current_timestamp()) \
        .filter(col("amount") > 0)
    # Task 3: Load to Delta Lake
    clean_sales.write \
        .format("delta") \
        .mode("overwrite") \
        .partitionBy("sale_date") \
        .save("s3://bucket/delta/sales/")
    # Task 4: Refresh the downstream materialized view
    spark.sql("REFRESH MATERIALIZED VIEW customer_daily_sales")

daily_sales_etl()

# Schedule: the job runs daily at 12:00 AM UTC
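
A minimal sketch of defining that schedule with the Databricks Python SDK (databricks-sdk); the notebook path, cluster ID, and job name are illustrative placeholders, and the same job can be created from the Workflows UI.

from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs

w = WorkspaceClient()  # reads DATABRICKS_HOST / DATABRICKS_TOKEN from the environment
job = w.jobs.create(
    name="daily_sales_etl",
    tasks=[
        jobs.Task(
            task_key="run_etl",
            notebook_task=jobs.NotebookTask(notebook_path="/Repos/team/daily_sales_etl"),
            existing_cluster_id="1234-567890-abc123",  # illustrative cluster ID
        )
    ],
    schedule=jobs.CronSchedule(
        quartz_cron_expression="0 0 0 * * ?",  # daily at 00:00
        timezone_id="UTC",
    ),
)
print(job.job_id)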

Databricks MLflow

ML Experiment Tracking

import mlflow
import mlflow.spark
from pyspark.ml.classification import RandomForestClassifier

# Log an ML training run and its model
with mlflow.start_run():
    # Train model (training_data has a "features" vector and a "churned" label)
    rf = RandomForestClassifier(labelCol="churned", featuresCol="features",
                                maxDepth=10, numTrees=100)
    model = rf.fit(training_data)
    # Log parameters
    mlflow.log_param("max_depth", 10)
    mlflow.log_param("num_trees", 100)
    # Log metrics
    mlflow.log_metric("accuracy", 0.95)
    mlflow.log_metric("precision", 0.93)
    # Log model
    mlflow.spark.log_model(model, "model")

# Load model for inference
loaded_model = mlflow.spark.load_model("runs:/<run_id>/model")
predictions = loaded_model.transform(test_data)
# Benefits:
# - Experiment tracking
# - Model versioning
# - Model deployment
# - Integration with Databricks
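
Model versioning and deployment build on the MLflow Model Registry. A minimal sketch, assuming the run above; the registry name churn_model is illustrative (with Unity Catalog, registered models use three-level names such as catalog.schema.model).

import mlflow
import mlflow.spark

# Register the logged model under a registry name (name is illustrative)
result = mlflow.register_model("runs:/<run_id>/model", "churn_model")

# Load a specific registered version for batch inference
model = mlflow.spark.load_model(f"models:/churn_model/{result.version}")
predictions = model.transform(test_data)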

Databricks Cost Optimization

Cluster Optimization

# 1. Use serverless clusters (recommended)
# No cluster management, auto-scaling, pay for what you use
# 2. Use spot instances (60-80% cheaper)
cluster_config = {
    "enable_spot_instances": True,
    "spot_bid_max_price": 80,        # Max 80% of the on-demand price
    "spot_instance_recovery": True   # Auto-recover on spot termination
}

# 3. Use auto-termination
cluster_config = {
    "autotermination_minutes": 30    # Auto-terminate after 30 minutes idle
}

# 4. Use Photon for faster queries (lower cost per query)
cluster_config = {
    "runtime_engine": "PHOTON"       # 2-4x faster queries
}
# 5. Use Delta Lake for data skipping
# Partition and cluster Delta tables for faster queries
spark.sql("""
OPTIMIZE sales
ZORDER BY (customer_id, sale_date)
""")

Storage Optimization

# 1. Use Delta Lake OPTIMIZE
spark.sql("OPTIMIZE sales") # Compact small files
# 2. Use Z-ORDER for data skipping
spark.sql("OPTIMIZE sales ZORDER BY (customer_id, sale_date)")
# 3. Vacuum old versions
spark.sql("VACUUM sales RETAIN 168 HOURS") # Keep 7 days
# 4. Use column pruning
# Select only needed columns
# 5. Use partitioning
sales_df.write \
    .format("delta") \
    .partitionBy("sale_date") \
    .save("s3://bucket/delta/sales/")  # Enables partition pruning on sale_date

Databricks Monitoring

Cluster Metrics

# Get cluster status with the Databricks Python SDK (databricks-sdk);
# the legacy databricks-cli package exposes a similar clusters API
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()  # reads DATABRICKS_HOST / DATABRICKS_TOKEN from the environment
cluster_id = "1234-567890-abc123"
cluster = w.clusters.get(cluster_id=cluster_id)
print(cluster.cluster_name, cluster.state)

# Hardware metrics are shown on the cluster's Metrics tab in the workspace UI
# Key metrics:
# - CPU utilization
# - Memory usage
# - Disk I/O
# - Network I/O
# - GPU utilization (if GPU cluster)

Query Metrics

-- Query history for SQL warehouses (formerly SQL endpoints)
SELECT
    user_id,
    statement_text,
    start_time,
    end_time,
    duration_ms,
    rows_produced,
    bytes_scanned
FROM system.query.history
WHERE start_time >= '2025-01-27'
ORDER BY duration_ms DESC
LIMIT 100;

Databricks vs. Alternatives

Feature      | Databricks         | Snowflake    | BigQuery
Architecture | Lakehouse          | Warehouse    | Warehouse
Storage      | S3/ADLS/GCS        | Managed      | Managed
Format       | Delta (Parquet)    | Proprietary  | Capacitor
Compute      | Spark              | Proprietary  | Dremel
ML Support   | Native (MLflow)    | Limited      | BigQuery ML
Best For     | ELT, ML, lakehouse | Data sharing | Serverless, cost

Best Practices

DO

# 1. Use Delta Lake for all tables
df.write.format("delta").save("s3://bucket/delta/table/")
# 2. Use partitioning and Z-ORDER
df.write.format("delta").partitionBy("date").save("s3://bucket/delta/table/")
spark.sql("OPTIMIZE table ZORDER BY (customer_id)")
# 3. Use Photon for SQL workloads
cluster_config = {"runtime_engine": "PHOTON"}
# 4. Use serverless clusters
compute = "serverless"
# 5. Use Unity Catalog for governance
spark.sql("GRANT SELECT ON TABLE sales TO GROUP analysts")

DON’T

# 1. Don't use CSV/JSON for production
# Use Delta Lake instead
# 2. Don't ignore OPTIMIZE and VACUUM
# Run regularly for performance
# 3. Don't use large clusters for small workloads
# Use auto-scaling
# 4. Don't forget to auto-terminate clusters
# Auto-terminate idle clusters
# 5. Don't use SELECT * for large tables
# Select only needed columns
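
For the last point above, a brief sketch of explicit column selection, which lets Delta/Parquet read only the referenced columns from storage:

# Read only the needed columns instead of SELECT *
needed_cols = spark.read.format("delta") \
    .load("s3://bucket/delta/sales/") \
    .select("customer_id", "sale_date", "amount")

# Equivalent SQL
spark.sql("""
    SELECT customer_id, sale_date, amount
    FROM sales
    WHERE sale_date >= '2025-01-01'
""")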

Key Takeaways

  1. Lakehouse: Data lake + warehouse in one platform
  2. Delta Lake: ACID transactions, time travel, schema enforcement
  3. Unity Catalog: Unified governance for all data
  4. Serverless: No cluster management, auto-scaling
  5. Photon: 2-4x faster queries
  6. MLflow: Native ML experiment tracking
  7. Workflows: Orchestration for ETL pipelines
  8. Cost: Use spot instances, serverless, optimize Delta tables
