Materialized Views
Pre-Computed Results for Performance
Overview
Materialized views store pre-computed query results that are refreshed periodically, providing fast query performance for expensive aggregations and joins.
Materialized View Architecture
View vs. Materialized View
Materialized View Benefits:
- Fast queries: Pre-computed results
- Reduced compute: No aggregation on query
- Incremental refresh: Only new data processed
- Cost savings: Less compute = lower cost
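These benefits come from computing the result once and serving it many times, at the cost of freshness. A minimal sketch of the semantics using SQLite, which has no native materialized views, so the "materialized" side is simulated with CREATE TABLE ... AS SELECT (all table names are illustrative):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE sales (order_date TEXT, amount REAL);
    INSERT INTO sales VALUES ('2025-01-01', 100), ('2025-01-01', 50);

    -- A plain view: re-runs the aggregation on every query
    CREATE VIEW v_daily_revenue AS
        SELECT order_date, SUM(amount) AS total_revenue
        FROM sales GROUP BY order_date;

    -- A simulated materialized view: a stored snapshot of the same result
    CREATE TABLE mv_daily_revenue AS
        SELECT order_date, SUM(amount) AS total_revenue
        FROM sales GROUP BY order_date;
""")

# New rows appear in the view immediately, but not in the snapshot
conn.execute("INSERT INTO sales VALUES ('2025-01-01', 25)")
view_total = conn.execute("SELECT total_revenue FROM v_daily_revenue").fetchone()[0]
mv_total = conn.execute("SELECT total_revenue FROM mv_daily_revenue").fetchone()[0]
print(view_total)  # 175.0 -- recomputed from the base table
print(mv_total)    # 150.0 -- stale until refreshed
```

The view always reflects the base table; the snapshot stays at its last refreshed state until it is rebuilt, which is exactly the staleness-for-speed trade-off materialized views make.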
Materialized View Patterns
Aggregation Pattern
```sql
-- Materialized view for daily revenue aggregations
CREATE MATERIALIZED VIEW mv_daily_revenue AS
SELECT
    order_date,
    COUNT(DISTINCT order_id) AS total_orders,
    COUNT(DISTINCT customer_id) AS unique_customers,
    SUM(amount) AS total_revenue,
    AVG(amount) AS avg_order_value
FROM sales
GROUP BY order_date;

-- Create a unique index (required for REFRESH ... CONCURRENTLY)
CREATE UNIQUE INDEX mv_daily_revenue_idx
ON mv_daily_revenue (order_date);

-- Refresh the materialized view
REFRESH MATERIALIZED VIEW mv_daily_revenue;

-- Query the materialized view (fast: no aggregation at query time)
SELECT * FROM mv_daily_revenue
WHERE order_date >= '2025-01-01';
```

Querying the pre-computed result is typically orders of magnitude faster than re-aggregating the base table.

Join Pattern
```sql
-- Materialized view for an expensive join
CREATE MATERIALIZED VIEW mv_customer_orders AS
SELECT
    c.customer_id,
    c.customer_name,
    c.customer_email,
    COUNT(o.order_id) AS total_orders,
    SUM(o.amount) AS total_spent,
    MAX(o.order_date) AS last_order_date
FROM customers c
LEFT JOIN orders o ON c.customer_id = o.customer_id
GROUP BY c.customer_id, c.customer_name, c.customer_email;

-- Refresh the materialized view
REFRESH MATERIALIZED VIEW mv_customer_orders;

-- Query the materialized view
SELECT * FROM mv_customer_orders
WHERE total_spent > 1000;
```

Incremental Refresh
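The core of incremental refresh is a watermark: only rows newer than the last refresh are re-aggregated and upserted into the summary table. A minimal pure-Python/SQLite sketch of the idea, with illustrative table and column names:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE inventory (product_id INTEGER, quantity INTEGER, updated_at TEXT);
    CREATE TABLE mv_inventory_summary (product_id INTEGER PRIMARY KEY,
                                       total_quantity INTEGER);
""")

def refresh_incremental(conn, last_watermark: str) -> str:
    """Re-aggregate only products touched since last_watermark, upsert them."""
    rows = conn.execute("""
        SELECT product_id, SUM(quantity)
        FROM inventory
        WHERE product_id IN (
            SELECT DISTINCT product_id FROM inventory WHERE updated_at > ?
        )
        GROUP BY product_id
    """, (last_watermark,)).fetchall()
    conn.executemany(
        "INSERT OR REPLACE INTO mv_inventory_summary VALUES (?, ?)", rows)
    # Advance the watermark to the newest timestamp seen
    return conn.execute("SELECT MAX(updated_at) FROM inventory").fetchone()[0]

conn.executemany("INSERT INTO inventory VALUES (?, ?, ?)",
                 [(1, 10, "2025-01-01"), (1, 5, "2025-01-02"), (2, 7, "2025-01-02")])
wm = refresh_incremental(conn, "1970-01-01")           # full first refresh
conn.execute("INSERT INTO inventory VALUES (1, 3, '2025-01-03')")
wm = refresh_incremental(conn, wm)                     # incremental refresh
totals = dict(conn.execute("SELECT product_id, total_quantity FROM mv_inventory_summary"))
print(totals)  # {1: 18, 2: 7}
```

On the second refresh only product 1 is recomputed; untouched summary rows are left alone, which is what keeps incremental refresh cheap compared to a full rebuild.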
Incremental Materialized Views
```sql
-- Materialized view (PostgreSQL)
CREATE MATERIALIZED VIEW mv_inventory_summary AS
SELECT
    product_id,
    SUM(quantity) AS total_quantity,
    SUM(quantity * unit_cost) AS total_cost,
    AVG(unit_cost) AS avg_cost
FROM inventory
GROUP BY product_id;

-- A unique index is required for concurrent refresh
CREATE UNIQUE INDEX mv_inventory_summary_idx
ON mv_inventory_summary (product_id);

-- Refresh concurrently (queries can keep reading during the refresh)
REFRESH MATERIALIZED VIEW CONCURRENTLY mv_inventory_summary;

-- Schedule the refresh, e.g. with cron ("0 * * * *" = every hour):
-- 0 * * * * psql -d analytics -c "REFRESH MATERIALIZED VIEW CONCURRENTLY mv_inventory_summary;"
```

Note that stock PostgreSQL recomputes the full result on every REFRESH; CONCURRENTLY makes the refresh non-blocking, not incremental. For true incremental refresh, use an engine that supports it or a merge-based pattern.

Spark Incremental Refresh
```python
# Incremental materialized view refresh with Delta Lake
from delta.tables import DeltaTable
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("MaterializedViews")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .getOrCreate()
)

def refresh_incremental_mv(base_table: str, mv_table: str,
                           watermark_column: str) -> None:
    """Refresh a materialized-view table incrementally via MERGE."""
    # Use the MV table's last commit timestamp as the watermark
    mv = DeltaTable.forPath(spark, mv_table)
    last_refresh = mv.history(1).select("timestamp").first()[0]

    # Read only rows newer than the watermark from the base table
    new_data = (
        spark.read.format("delta").load(base_table)
        .filter(f"{watermark_column} > to_timestamp('{last_refresh}')")
    )
    new_data.createOrReplaceTempView("new_data")

    # Upsert the new rows into the materialized-view table
    spark.sql(f"""
        MERGE INTO delta.`{mv_table}` AS target
        USING new_data AS source
        ON target.product_id = source.product_id
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)

# Example usage
refresh_incremental_mv(
    base_table="s3://bucket/sales/",
    mv_table="s3://bucket/mv_daily_revenue/",
    watermark_column="order_timestamp",
)
```

Materialized View Strategies
Refresh Strategies
Refresh Scheduling
```python
# Scheduled materialized view refresh (Airflow)
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.slack.operators.slack import SlackAPIPostOperator

default_args = {
    'owner': 'data-engineering',
    'start_date': datetime(2025, 1, 1),
    'depends_on_past': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'refresh_materialized_views',
    default_args=default_args,
    schedule_interval='0 * * * *',  # Every hour
    catchup=False,
    tags=['performance', 'materialized-views'],
)

# Refresh hourly materialized views
refresh_daily_revenue = PostgresOperator(
    task_id='refresh_daily_revenue',
    sql="REFRESH MATERIALIZED VIEW CONCURRENTLY mv_daily_revenue;",
    postgres_conn_id='analytics_db',
    dag=dag,
)

refresh_customer_orders = PostgresOperator(
    task_id='refresh_customer_orders',
    sql="REFRESH MATERIALIZED VIEW CONCURRENTLY mv_customer_orders;",
    postgres_conn_id='analytics_db',
    dag=dag,
)

# Notify on failure
alert = SlackAPIPostOperator(
    task_id='alert_on_failure',
    slack_conn_id='slack_default',
    channel='#data-engineering',
    text='Materialized view refresh failed!',
    trigger_rule='one_failed',  # Alert if any upstream refresh fails
    dag=dag,
)

[refresh_daily_revenue, refresh_customer_orders] >> alert
```

Materialized View Best Practices
DO

1. Use for expensive aggregations (SUM, COUNT, AVG with GROUP BY)
2. Use for expensive joins (large joins, denormalization)
3. Refresh during off-peak hours to minimize impact on queries
4. Use incremental refresh when possible to reduce refresh time
5. Create indexes on MVs for faster refresh and queries

DON'T

1. Don't materialize everything; reserve MVs for expensive queries
2. Don't refresh too frequently; it wastes compute
3. Don't forget to refresh; data becomes stale
4. Don't use MVs for frequently changing data; the refresh overhead is too high
5. Don't create MVs on top of other MVs; chained refreshes are complex and error-prone

Key Takeaways
- Pre-computed results: Store expensive aggregations
- Fast queries: often orders of magnitude faster than computing from base tables
- Incremental refresh: Only process new data
- Scheduling: Refresh based on data velocity
- Indexes: Create indexes on MVs for performance
- Concurrency: Use CONCURRENTLY for non-blocking refresh
- Monitoring: Track refresh time and query performance
- Use When: Expensive aggregations, BI dashboards, frequently accessed data
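For the monitoring takeaway, a freshness check can be as simple as comparing the last refresh time against an SLA. A hedged sketch (the SLA and timestamps are illustrative; in PostgreSQL the last refresh time would come from your own refresh log, since the system catalogs do not record it):

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

def is_stale(last_refresh: datetime, max_age: timedelta,
             now: Optional[datetime] = None) -> bool:
    """Return True when the materialized view has exceeded its freshness SLA."""
    now = now or datetime.now(timezone.utc)
    return now - last_refresh > max_age

# Example: hourly-refresh MV with a 1-hour freshness SLA
now = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
fresh = is_stale(now - timedelta(minutes=30), timedelta(hours=1), now=now)
stale = is_stale(now - timedelta(hours=2), timedelta(hours=1), now=now)
print(fresh, stale)  # False True
```

Wiring a check like this into the refresh DAG (alert when `is_stale` returns True) catches the "forgot to refresh" failure mode before dashboards serve stale numbers.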
Back to Module 7