Caching Strategies
Result Caching for Performance
Overview
Caching stores query results or intermediate data in fast storage to avoid recomputation, reducing query latency and compute costs.
Cache Architecture
Cache Levels
- Query result cache: Full query results
- Materialized views: Pre-computed aggregations
- File cache: Hot data files in memory
- Page cache: Database pages in memory
- Application cache: Custom caching layer
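These levels form a hierarchy: each request checks the fastest layer first and falls through to slower ones. The sketch below makes the layering concrete with a hypothetical two-level lookup combining an in-process application cache with a shared Redis cache; all names are illustrative assumptions, not a specific product API.

```python
# Two-level cache lookup sketch: an in-process dict (application cache)
# in front of a shared Redis instance (result cache).
import json
import redis

local_cache: dict = {}                        # L1: per-process memory
shared = redis.Redis(decode_responses=True)   # L2: shared across processes

def lookup(key: str):
    """Check the fastest layer first, falling back to the slower one."""
    if key in local_cache:
        return local_cache[key]
    value = shared.get(key)
    if value is not None:
        local_cache[key] = json.loads(value)  # promote to the faster layer
        return local_cache[key]
    return None  # both layers missed: compute or query the source
```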
Caching Strategies
Result Caching
```python
# Query result caching with Redis
import hashlib
import json
from typing import Any, Optional

import psycopg2
import redis


class QueryCache:
    """Query result caching with Redis."""

    def __init__(self, redis_host: str = "localhost", redis_port: int = 6379):
        self.client = redis.Redis(
            host=redis_host, port=redis_port, decode_responses=True
        )

    def _generate_cache_key(self, query: str, params: tuple = None) -> str:
        """Generate a cache key from the query text and parameters."""
        cache_input = f"{query}:{params}"
        return hashlib.sha256(cache_input.encode()).hexdigest()

    def get_cached_result(self, query: str, params: tuple = None) -> Optional[Any]:
        """Return the cached result, or None on a cache miss."""
        cache_key = self._generate_cache_key(query, params)
        cached_result = self.client.get(cache_key)
        if cached_result:
            return json.loads(cached_result)
        return None

    def cache_result(self, query: str, result: Any, params: tuple = None,
                     ttl: int = 3600):
        """Cache a query result for `ttl` seconds.

        Note: the result must be JSON-serializable; dates and Decimals
        need a custom encoder.
        """
        cache_key = self._generate_cache_key(query, params)
        self.client.setex(cache_key, ttl, json.dumps(result))

    def execute_with_cache(self, query: str, params: tuple = None,
                           ttl: int = 3600) -> Any:
        """Execute a query, serving from the cache when possible."""
        # Try the cache first
        cached = self.get_cached_result(query, params)
        if cached is not None:
            print("Cache hit!")
            return cached

        print("Cache miss, executing query...")
        conn = psycopg2.connect("dbname=analytics user=postgres")
        try:
            cursor = conn.cursor()
            cursor.execute(query, params)
            result = cursor.fetchall()
        finally:
            conn.close()

        # Cache the result for subsequent calls
        self.cache_result(query, result, params, ttl)
        return result


# Example usage
cache = QueryCache()

# First execution: cache miss
result1 = cache.execute_with_cache(
    "SELECT * FROM customers WHERE country = %s",
    params=("US",),
    ttl=3600,  # 1 hour
)

# Second execution: cache hit
result2 = cache.execute_with_cache(
    "SELECT * FROM customers WHERE country = %s",
    params=("US",),
)
```

Materialized Views
```sql
-- Materialized views for caching

-- Create materialized view
CREATE MATERIALIZED VIEW mv_daily_sales AS
SELECT
    order_date,
    COUNT(*) AS total_orders,
    SUM(amount) AS total_amount,
    AVG(amount) AS avg_amount
FROM sales
GROUP BY order_date;

-- Refresh materialized view
REFRESH MATERIALIZED VIEW mv_daily_sales;

-- Query materialized view (fast)
SELECT * FROM mv_daily_sales
WHERE order_date = '2025-01-27';

-- Create a materialized view that supports non-blocking refresh (PostgreSQL)
CREATE MATERIALIZED VIEW mv_customer_summary AS
SELECT
    customer_id,
    COUNT(*) AS order_count,
    SUM(amount) AS total_spent
FROM sales
GROUP BY customer_id;

-- A unique index is required for REFRESH ... CONCURRENTLY
CREATE UNIQUE INDEX mv_customer_summary_idx
ON mv_customer_summary (customer_id);

-- Non-blocking refresh: readers are not locked out, but the view is still
-- fully recomputed (PostgreSQL has no native incremental refresh)
REFRESH MATERIALIZED VIEW CONCURRENTLY mv_customer_summary;
```

Cache Invalidation
Invalidation Strategies
- Time-based (TTL): entries expire automatically after a set interval
- Event-based: writes to the underlying data trigger deletion of dependent entries
- Manual: operators purge entries explicitly after known changes
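The sketch below illustrates event-based invalidation with Redis. The tagging scheme (a Redis set per tag that records dependent cache keys) and all names are illustrative assumptions, not a standard API.

```python
# Event-based invalidation sketch: record which cache keys depend on a
# "tag" (e.g., a table name), then delete them all when that data changes.
import redis

client = redis.Redis(decode_responses=True)

def cache_with_tag(key: str, value: str, tag: str, ttl: int = 3600):
    """Store a value and register its key under a tag for later invalidation."""
    client.setex(key, ttl, value)
    client.sadd(f"tag:{tag}", key)  # track dependency on the tag

def invalidate_tag(tag: str):
    """Delete every cached key registered under a tag."""
    keys = client.smembers(f"tag:{tag}")
    if keys:
        client.delete(*keys)
    client.delete(f"tag:{tag}")

# Example: a write to the customers table invalidates dependent entries
cache_with_tag("customers:US", '{"count": 42}', tag="customers")
invalidate_tag("customers")
```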
Write-Through Caching
```python
# Write-through caching
import json


class WriteThroughCache:
    """Write-through caching pattern."""

    def __init__(self, cache_client, db_client):
        self.cache = cache_client
        self.db = db_client

    def update_data(self, table: str, key: str, data: dict):
        """Update data with a write-through cache."""
        # Update the database first. Table names cannot be parameterized,
        # so validate `table` against an allowlist in production code.
        self.db.execute(
            f"UPDATE {table} SET data = %s WHERE key = %s",
            (json.dumps(data), key),
        )

        # Update the cache immediately so reads never see stale data
        cache_key = f"{table}:{key}"
        self.cache.setex(
            cache_key,
            3600,  # 1 hour TTL
            json.dumps(data),
        )

    def get_data(self, table: str, key: str) -> dict:
        """Get data with the cache-aside pattern."""
        cache_key = f"{table}:{key}"

        # Try the cache first
        cached = self.cache.get(cache_key)
        if cached:
            return json.loads(cached)

        # Cache miss: read from the database
        result = self.db.execute(
            f"SELECT * FROM {table} WHERE key = %s",
            (key,),
        ).fetchone()

        # Populate the cache for subsequent reads
        self.cache.setex(cache_key, 3600, json.dumps(result))
        return result
```

Cache Technologies
Technology Comparison
| Technology | Type | Latency | Cost | Best For |
|---|---|---|---|---|
| Redis | In-memory | < 1ms | Open source | Fast lookups |
| Memcached | In-memory | < 1ms | Open source | Simple caching |
| ElastiCache | Managed Redis | < 1ms | Paid | AWS integration |
| BigQuery BI Engine | Managed in-memory | Sub-second | Paid capacity | BI queries |
| Snowflake Result Cache | Managed | Milliseconds | Included | Repeated query results |
| Databricks Cache | In-memory | < 10ms | Included | Spark workloads |
Cache Best Practices
DO
1. Cache frequently accessed data: high read-to-write ratios benefit most
2. Set an appropriate TTL: balance freshness against performance
3. Use write-through for consistency: the cache is updated at write time
4. Monitor the cache hit rate: aim for > 80% (a monitoring sketch follows this list)
5. Warm the cache: pre-populate hot entries on startup
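A minimal hit-rate check is sketched below using redis-py's INFO command. Note that keyspace_hits and keyspace_misses are server-wide counters, so per-application rates need application-level counting.

```python
# Hit-rate monitoring sketch: Redis exposes server-wide hit/miss counters
# in the "stats" section of INFO.
import redis

client = redis.Redis()
stats = client.info("stats")
hits = stats["keyspace_hits"]
misses = stats["keyspace_misses"]
total = hits + misses
hit_rate = hits / total if total else 0.0
print(f"Cache hit rate: {hit_rate:.1%}")  # practice target: > 80%
```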
DON’T
1. Don't cache rapidly changing data: invalidation costs outweigh the savings
2. Don't set TTLs too long: stale data causes correctness issues
3. Don't cache very large results: they are memory intensive
4. Don't ignore cache size limits: eviction churn hurts performance
5. Don't forget cache invalidation: it is essential for consistency

Key Takeaways
- Result caching: Store query results for fast retrieval
- Materialized views: Pre-compute expensive aggregations
- Write-through: Immediate cache updates for consistency
- Redis/Memcached: Fast in-memory caching
- TTL: Set appropriate expiration times
- Hit rate: Monitor and optimize cache effectiveness
- Invalidation: Time-based, event-based, or manual
- Use When: Frequent queries, expensive computations, BI workloads