Skip to content

Caching Strategies

Result Caching for Performance


Overview

Caching stores query results or intermediate data in fast storage to avoid recomputation, reducing query latency and compute costs.


Cache Architecture

Cache Levels

Cache Levels:

  • Query result cache: Full query results
  • Materialized views: Pre-computed aggregations
  • File cache: Hot data files in memory
  • Page cache: Database pages in memory
  • Application cache: Custom caching layer

Caching Strategies

Result Caching

# Query result caching with Redis
import redis
import json
import hashlib
from typing import Any, Dict
import psycopg2
from datetime import timedelta
class QueryCache:
    """Cache SQL query results in Redis, keyed by a hash of (query, params).

    Results are JSON-serialized before storage, so on a cache hit row
    tuples come back as lists — a hit is equal in content, but not in
    type, to a fresh ``fetchall()``.
    """

    def __init__(self, redis_host: str = "localhost", redis_port: int = 6379):
        # decode_responses=True makes client.get() return str, ready for json.loads.
        self.client = redis.Redis(
            host=redis_host, port=redis_port, decode_responses=True
        )

    def _generate_cache_key(self, query: str, params: tuple = None) -> str:
        """Return a deterministic SHA-256 key derived from query text and params."""
        cache_input = f"{query}:{params}"
        return hashlib.sha256(cache_input.encode()).hexdigest()

    def get_cached_result(self, query: str, params: tuple = None) -> Any:
        """Return the cached result for (query, params), or None on a miss."""
        cached_result = self.client.get(self._generate_cache_key(query, params))
        if cached_result is not None:
            return json.loads(cached_result)
        return None

    def cache_result(
        self,
        query: str,
        result: Any,
        params: tuple = None,
        ttl: int = 3600,
    ):
        """Store ``result`` under the (query, params) key with a TTL in seconds."""
        self.client.setex(
            self._generate_cache_key(query, params),
            ttl,
            json.dumps(result),
        )

    def execute_with_cache(
        self,
        query: str,
        params: tuple = None,
        ttl: int = 3600,
    ) -> Any:
        """Execute ``query`` against Postgres, serving from / populating the cache.

        Raises whatever psycopg2 raises on connection or execution failure;
        the connection and cursor are always closed (the original leaked both).
        """
        # Try cache first.
        cached = self.get_cached_result(query, params)
        if cached is not None:
            print("Cache hit!")
            return cached
        print("Cache miss, executing query...")
        conn = psycopg2.connect("dbname=analytics user=postgres")
        try:
            # psycopg2 cursors support the context-manager protocol;
            # the connection itself must be closed explicitly.
            with conn.cursor() as cursor:
                if params:
                    cursor.execute(query, params)
                else:
                    cursor.execute(query)
                result = cursor.fetchall()
        finally:
            conn.close()
        # Populate the cache for subsequent callers.
        self.cache_result(query, result, params, ttl)
        return result
# --- Example usage ---
cache = QueryCache()

# First call: key is absent from Redis, so the query hits Postgres
# and the result is cached for one hour.
result1 = cache.execute_with_cache(
    "SELECT * FROM customers WHERE country = %s",
    params=("US",),
    ttl=3600,  # 1 hour
)

# Second call with identical query + params: served straight from Redis.
result2 = cache.execute_with_cache(
    "SELECT * FROM customers WHERE country = %s",
    params=("US",),
)

Materialized Views

-- Materialized views: cache expensive aggregations as stored, queryable tables.
-- Create a per-day sales rollup.
CREATE MATERIALIZED VIEW mv_daily_sales AS
SELECT
order_date,
COUNT(*) AS total_orders,
SUM(amount) AS total_amount,
AVG(amount) AS avg_amount
FROM sales
GROUP BY order_date;
-- Recompute the view's contents (blocks readers while it runs).
REFRESH MATERIALIZED VIEW mv_daily_sales;
-- Query the materialized view (fast: reads precomputed rows, no aggregation).
SELECT * FROM mv_daily_sales
WHERE order_date = '2025-01-27';
-- Per-customer summary view (PostgreSQL).
-- NOTE(review): PostgreSQL has no true incremental materialized-view refresh;
-- REFRESH ... CONCURRENTLY below still recomputes the full result — it only
-- avoids blocking concurrent readers while the new contents are swapped in.
CREATE MATERIALIZED VIEW mv_customer_summary AS
SELECT
customer_id,
COUNT(*) AS order_count,
SUM(amount) AS total_spent
FROM sales
GROUP BY customer_id;
-- A unique index is required before REFRESH ... CONCURRENTLY can be used.
CREATE UNIQUE INDEX mv_customer_summary_idx
ON mv_customer_summary (customer_id);
-- Non-blocking (but full, not incremental) refresh; needs the unique index above.
REFRESH MATERIALIZED VIEW CONCURRENTLY mv_customer_summary;

Cache Invalidation

Invalidation Strategies

Write-Through Caching

# Write-through caching
class WriteThroughCache:
"""Write-through caching pattern"""
def __init__(self, cache_client, db_client):
self.cache = cache_client
self.db = db_client
def update_data(self, table: str, key: str, data: dict):
"""Update data with write-through cache"""
# Update database first
self.db.execute(
f"UPDATE {table} SET data = %s WHERE key = %s",
(json.dumps(data), key)
)
# Update cache immediately
cache_key = f"{table}:{key}"
self.cache.setex(
cache_key,
3600, # 1 hour TTL
json.dumps(data)
)
def get_data(self, table: str, key: str) -> dict:
"""Get data with cache-aside pattern"""
cache_key = f"{table}:{key}"
# Try cache first
cached = self.cache.get(cache_key)
if cached:
return json.loads(cached)
# Cache miss, get from database
result = self.db.execute(
f"SELECT * FROM {table} WHERE key = %s",
(key,)
).fetchone()
# Populate cache
self.cache.setex(
cache_key,
3600,
json.dumps(result)
)
return result

Cache Technologies

Technology Comparison

| Technology             | Type          | Latency | Cost     | Best For        |
|------------------------|---------------|---------|----------|-----------------|
| Redis                  | In-memory     | < 1ms   | Paid     | Fast lookups    |
| Memcached              | In-memory     | < 1ms   | Paid     | Simple caching  |
| ElastiCache            | Managed Redis | < 1ms   | Paid     | AWS integration |
| BigQuery BI Engine     | Managed       | Seconds | Included | BI queries      |
| Snowflake Result Cache | Managed       | Seconds | Included | Query results   |
| Databricks Cache       | In-memory     | < 10ms  | Included | Spark workloads |

Cache Best Practices

DO

# 1. Cache frequently accessed data
# High read-to-write ratio
# 2. Set appropriate TTL
# Balance freshness and performance
# 3. Use write-through for consistency
# Immediate cache updates
# 4. Monitor cache hit rate
# Aim for > 80% hit rate
# 5. Use cache warming
# Pre-populate cache on startup

DON’T

# 1. Don't cache rapidly changing data
# Cache invalidation is expensive
# 2. Don't set TTL too long
# Stale data issues
# 3. Don't cache large results
# Memory intensive
# 4. Don't ignore cache size limits
# Eviction affects performance
# 5. Don't forget cache invalidation
# Essential for consistency

Key Takeaways

  1. Result caching: Store query results for fast retrieval
  2. Materialized views: Pre-compute expensive aggregations
  3. Write-through: Immediate cache updates for consistency
  4. Redis/Memcached: Fast in-memory caching
  5. TTL: Set appropriate expiration times
  6. Hit rate: Monitor and optimize cache effectiveness
  7. Invalidation: Time-based, event-based, or manual
  8. Use When: Frequent queries, expensive computations, BI workloads

Back to Module 7