Vectorization
CPU-Optimized Query Execution
Overview
Vectorization (also called batch or columnar execution) processes many rows per operation instead of one row at a time, letting the engine exploit SIMD (Single Instruction, Multiple Data) CPU instructions and dramatically improving query performance.
Vectorization Architecture
Scalar vs. Vectorized
A scalar engine pushes one row at a time through each operator, paying interpretation and function-call overhead per value; a vectorized engine passes batches of column values, so each operator runs a tight loop the CPU can execute with SIMD instructions (see the short sketch after the list below).
Vectorization Benefits:
- CPU utilization: 80-100% vs. 10-20% for scalar
- Cache efficiency: Better memory access patterns
- Performance: 2-10x faster for supported operations
- Modern CPUs: Leverages SIMD instructions (AVX, AVX-512)
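To make the contrast concrete, here is a minimal sketch (the values are illustrative) of the same computation written row-at-a-time and as a single batch expression; the array form lets NumPy run the multiply over contiguous memory with SIMD instructions:
import numpy as np

unit_prices = np.array([9.99, 4.50, 12.25, 3.10])
quantities = np.array([3, 10, 2, 7])

# Scalar: one interpreted multiply per loop iteration
totals_scalar = [unit_prices[i] * quantities[i] for i in range(len(unit_prices))]

# Vectorized: one batch expression over the whole arrays
totals_vectorized = unit_prices * quantities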
Vectorized Operations
Supported Operations
Common vectorizable operations include scans, filters, projections, arithmetic expressions, and aggregations; they apply to the data types below (see the schema sketch after this list).
Supported Data Types:
- Integers: TINYINT, SMALLINT, INT, BIGINT
- Floating-point: FLOAT, DOUBLE
- Decimals: DECIMAL (with fixed precision)
- Dates/timestamps: DATE, TIMESTAMP
- Strings: VARCHAR (with limitations)
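As an illustration, a minimal PySpark sketch (the Parquet path and column names are hypothetical, and it assumes the SparkSession created in the next section) of a schema built from vectorization-friendly types; Spark's vectorized Parquet reader decodes these primitive columns in batches:
from pyspark.sql.types import (StructType, StructField, LongType, IntegerType,
                               DoubleType, DateType, DecimalType, StringType)

schema = StructType([
    StructField("order_id", LongType()),        # BIGINT
    StructField("quantity", IntegerType()),     # INT
    StructField("unit_price", DoubleType()),    # DOUBLE
    StructField("order_date", DateType()),      # DATE
    StructField("amount", DecimalType(10, 2)),  # DECIMAL with fixed precision
    StructField("note", StringType()),          # VARCHAR-like, with limitations
])

df = spark.read.schema(schema).parquet("/data/sales_parquet")  # hypothetical path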
Vectorization Configuration
Spark Vectorization
# Enable vectorization in Spark
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("Vectorization") \
    .config("spark.sql.codegen.wholeStage", "true") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.skewJoin.enabled", "true") \
    .getOrCreate()
# Configuration parameters:
# - codegen.wholeStage: Enable whole-stage code generation
#   Combines multiple operations into a single function
#   Default: true
#   Recommended: Always enabled
# - adaptive.enabled: Enable adaptive query execution
#   Dynamically selects join strategies
#   Default: true
#   Recommended: Always enabled
# - adaptive.skewJoin.enabled: Handle skew in joins
#   Default: true
#   Recommended: For skewed data
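A small follow-up sketch (the Parquet path and column names are hypothetical) for confirming that vectorized execution is actually in play: the columnar readers have their own flags, and fused operators show up as WholeStageCodegen stages in the physical plan:
# Vectorized readers for columnar formats (on by default in recent Spark versions)
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")
spark.conf.set("spark.sql.orc.enableVectorizedReader", "true")

# Inspect the physical plan; stages marked "*(N)" / WholeStageCodegen are fused
df = spark.read.parquet("/data/sales_parquet")  # hypothetical path
df.filter(df.amount > 500).groupBy("customer_id").count().explain()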
Pandas Vectorization
# Vectorized operations with Pandas
import pandas as pd
import numpy as np

# Vectorized operations (fast)
df = pd.DataFrame({
    'quantity': [10, 20, 30, 40, 50],
    'unit_price': [100.0, 50.0, 33.33, 25.0, 20.0]
})

# Vectorized multiplication
df['total'] = df['quantity'] * df['unit_price']

# Vectorized filtering
high_value = df[df['total'] > 500]

# Vectorized aggregation
total_revenue = df['total'].sum()
avg_quantity = df['quantity'].mean()

# Avoid row-by-row operations (slow)
# Bad:
# for i, row in df.iterrows():
#     df.loc[i, 'total'] = row['quantity'] * row['unit_price']
NumPy Vectorization
# NumPy vectorized operations
import numpy as np
# Create arrays
prices = np.array([100.0, 50.0, 33.33, 25.0, 20.0])
quantities = np.array([10, 20, 30, 40, 50])

# Vectorized multiplication (SIMD)
totals = prices * quantities

# Vectorized filtering
high_value_mask = totals > 500
high_values = totals[high_value_mask]

# Vectorized aggregation
total_revenue = np.sum(totals)
avg_quantity = np.mean(quantities)
std_prices = np.std(prices)
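Whether NumPy can actually use AVX or AVX-512 depends on the build and the CPU it runs on; a quick way to check (assuming NumPy 1.24 or newer for show_runtime) is:
import numpy as np

# Lists the SIMD extensions (e.g. AVX2, AVX512F) detected at runtime; NumPy 1.24+
np.show_runtime()

# Older NumPy versions expose build information instead
np.show_config()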
Vectorization Optimization
Columnar Storage
-- Columnar storage enables vectorization
-- Parquet (columnar) - Vectorized
CREATE TABLE sales_parquet (
    order_id BIGINT,
    customer_id BIGINT,
    order_date DATE,
    amount DECIMAL(10,2)
) STORED AS PARQUET;

-- ORC (columnar) - Vectorized
CREATE TABLE sales_orc (
    order_id BIGINT,
    customer_id BIGINT,
    order_date DATE,
    amount DECIMAL(10,2)
) STORED AS ORC;

-- Row-based (not vectorized) - Avoid for analytics
CREATE TABLE sales_text (
    order_id BIGINT,
    customer_id BIGINT,
    order_date DATE,
    amount DECIMAL(10,2)
) STORED AS TEXTFILE;
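Existing row-based tables can be rewritten into a columnar format so that scans go through the vectorized reader. A minimal PySpark sketch, assuming the tables above already exist in the metastore and the SparkSession from earlier:
# Copy rows from the row-based table into the columnar table defined above
spark.sql("INSERT INTO sales_parquet SELECT * FROM sales_text")

# Equivalent DataFrame form
spark.table("sales_text").write.insertInto("sales_parquet")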
Code Generation
# Whole-stage code generation
spark.conf.set("spark.sql.codegen.wholeStage", "true")
# Whole-stage code generation:
# Combines multiple operations into a single function
# Reduces virtual function calls
# Improves CPU cache efficiency
# Example:
# Filter → Project → Aggregate becomes a single function:
# Filter-Project-Aggregate()
# Enable adaptive query execution
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
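To see whole-stage code generation at work, Spark can print the generated code for each fused stage. A short sketch, assuming Spark 3.x (where explain accepts a mode argument) and a hypothetical sales DataFrame df:
# Filter -> Project -> Aggregate collapses into one generated function
query = df.filter(df.amount > 500).groupBy("customer_id").sum("amount")

# Print the physical plan plus the generated Java code for each WholeStageCodegen stage
query.explain(mode="codegen")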
Vectorization Best Practices
DO
# 1. Use vectorized operations
#    Built-in functions are vectorized

# 2. Use columnar formats
#    Parquet, ORC for analytics

# 3. Enable whole-stage codegen
#    Combines operations for efficiency

# 4. Avoid row-by-row operations
#    iterrows() is slow

# 5. Use appropriate data types
#    Avoid excessive precision
DON'T
# 1. Don't use UDFs excessively
#    Breaks vectorization (see the sketch after this list)

# 2. Don't use row-based formats
#    Text, JSON for analytics

# 3. Don't use Python loops
#    Vectorized alternatives available

# 4. Don't ignore data types
#    Precision impacts vectorization

# 5. Don't disable codegen
#    Critical for performance
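To illustrate the first point, a minimal PySpark sketch (the DataFrame df with quantity and unit_price columns is hypothetical) contrasting a Python UDF, which forces row-by-row serialization into the Python interpreter, with a built-in column expression that stays inside the vectorized/codegen pipeline:
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

# Python UDF: each row is serialized to Python and back - breaks vectorized execution
slow_total = F.udf(lambda q, p: q * p, DoubleType())
df_slow = df.withColumn("total", slow_total("quantity", "unit_price"))

# Built-in expression: evaluated in the JVM and eligible for whole-stage codegen
df_fast = df.withColumn("total", F.col("quantity") * F.col("unit_price"))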
Vectorization vs. Non-Vectorized
Performance Comparison
# Vectorization performance comparison
import pandas as pd
import numpy as np
import time

# Create large dataset
df = pd.DataFrame({
    'col_a': np.random.rand(1_000_000),
    'col_b': np.random.rand(1_000_000),
    'col_c': np.random.rand(1_000_000)
})

# Scalar (slow)
start = time.time()
scalar_result = []
for i, row in df.iterrows():
    scalar_result.append(row['col_a'] * row['col_b'])
scalar_time = time.time() - start

# Vectorized (fast)
start = time.time()
vectorized_result = df['col_a'] * df['col_b']
vectorized_time = time.time() - start

print(f"Scalar time: {scalar_time:.3f}s")
print(f"Vectorized time: {vectorized_time:.3f}s")
print(f"Speedup: {scalar_time / vectorized_time:.1f}x")

# Typical results:
# Scalar time: 15.000s
# Vectorized time: 0.002s
# Speedup: 7500x
Key Takeaways
- Vectorization: Process multiple rows simultaneously
- SIMD instructions: Modern CPU optimization (AVX, AVX-512)
- CPU utilization: 80-100% vs. 10-20% for scalar
- Performance: 2-10x faster for supported operations
- Columnar formats: Parquet, ORC enable vectorization
- Whole-stage codegen: Combines operations for efficiency
- Avoid UDFs: Breaks vectorization
- Use When: All analytics workloads, performance optimization