
Vectorization

CPU-Optimized Query Execution


Overview

Vectorization (also known as batch processing) executes each query operator on many rows at a time instead of one row at a time. This lets modern CPUs apply SIMD (Single Instruction, Multiple Data) instructions to whole batches of values, dramatically improving query performance.


Vectorization Architecture

Scalar vs. Vectorized

A scalar engine calls each operator once per row; a vectorized engine calls it once per batch of rows, amortizing per-call overhead and keeping column values in CPU caches and SIMD registers.

Vectorization Benefits:

  • CPU utilization: 80-100% vs. 10-20% for scalar
  • Cache efficiency: Better memory access patterns
  • Performance: 2-10x faster for supported operations
  • Modern CPUs: Leverages SIMD instructions (AVX, AVX-512); see the check after this list
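
The SIMD instruction sets available on the host CPU (SSE, AVX, AVX-512) can be checked directly. A minimal sketch, assuming a Linux system where /proc/cpuinfo is available (on other platforms a library such as py-cpuinfo can be used instead):

# List the SIMD instruction sets the CPU advertises (Linux only)
def simd_flags():
    with open("/proc/cpuinfo") as f:
        for line in f:
            if line.startswith("flags"):
                flags = line.split(":")[1].split()
                return sorted(flag for flag in flags if flag.startswith(("sse", "avx")))
    return []

print(simd_flags())  # e.g. ['avx', 'avx2', 'avx512f', 'sse', 'sse2', ...]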

Vectorized Operations

Supported Operations

Vectorized execution typically covers scans, filters, projections, arithmetic and comparison expressions, and hash-based aggregations over the data types below.

Supported Data Types (see the schema sketch after this list):

  • Integers: TINYINT, SMALLINT, INT, BIGINT
  • Floating-point: FLOAT, DOUBLE
  • Decimals: DECIMAL (with fixed precision)
  • Dates/timestamps: DATE, TIMESTAMP
  • Strings: VARCHAR (with limitations)
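
For reference, here is a minimal sketch of a schema built only from the types above, using PySpark type objects (the sales table and its columns are hypothetical); nested and complex types typically force a fallback to row-at-a-time reading:

# Schema restricted to vectorization-friendly types (hypothetical table)
from pyspark.sql.types import (
    StructType, StructField, IntegerType, LongType,
    DoubleType, DecimalType, DateType, TimestampType, StringType
)

sales_schema = StructType([
    StructField("order_id", LongType()),         # BIGINT
    StructField("quantity", IntegerType()),      # INT
    StructField("unit_price", DoubleType()),     # DOUBLE
    StructField("amount", DecimalType(10, 2)),   # DECIMAL with fixed precision
    StructField("order_date", DateType()),       # DATE
    StructField("created_at", TimestampType()),  # TIMESTAMP
    StructField("status", StringType()),         # VARCHAR (with limitations)
])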

Vectorization Configuration

Spark Vectorization

# Enable vectorization in Spark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Vectorization") \
    .config("spark.sql.codegen.wholeStage", "true") \
    .config("spark.sql.parquet.enableVectorizedReader", "true") \
    .config("spark.sql.orc.enableVectorizedReader", "true") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.skewJoin.enabled", "true") \
    .getOrCreate()

# Configuration parameters:
# - codegen.wholeStage: enable whole-stage code generation
#     Combines multiple operators into a single generated function
#     Default: true; recommended: always enabled
# - parquet.enableVectorizedReader / orc.enableVectorizedReader: columnar batch reads
#     Default: true; recommended: always enabled for Parquet/ORC data
# - adaptive.enabled: enable adaptive query execution
#     Dynamically re-optimizes plans, including join strategies
#     Default: true since Spark 3.2; recommended: always enabled
# - adaptive.skewJoin.enabled: handle skew in joins
#     Default: true; recommended for skewed data
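
One way to confirm that whole-stage code generation is actually applied is to inspect the physical plan: operators fused into generated code are prefixed with an asterisk and a codegen stage id in explain() output. A minimal sketch (the Parquet path is hypothetical):

# Read a columnar file and inspect the physical plan (path is hypothetical)
df = spark.read.parquet("/data/sales_parquet")
result = df.filter(df.amount > 100).groupBy("customer_id").count()

# Operators fused by whole-stage codegen appear with a leading '*',
# e.g. "*(1) Filter ...", in the plan printed by explain()
result.explain()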

Pandas Vectorization

# Vectorized operations with Pandas
import pandas as pd
import numpy as np

# Vectorized operations (fast)
df = pd.DataFrame({
    'quantity': [10, 20, 30, 40, 50],
    'unit_price': [100.0, 50.0, 33.33, 25.0, 20.0]
})

# Vectorized multiplication
df['total'] = df['quantity'] * df['unit_price']

# Vectorized filtering
high_value = df[df['total'] > 500]

# Vectorized aggregation
total_revenue = df['total'].sum()
avg_quantity = df['quantity'].mean()

# Avoid row-by-row operations (slow)
# Bad:
# for i, row in df.iterrows():
#     df.loc[i, 'total'] = row['quantity'] * row['unit_price']
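
Conditional logic can stay vectorized as well: np.where evaluates a condition over the whole column at once instead of running an if/else per row. A short sketch continuing the DataFrame above:

# Vectorized conditional labelling (no per-row if/else)
df['tier'] = np.where(df['total'] > 500, 'high', 'standard')

# Vectorized numeric bucketing
df['discount'] = np.where(df['quantity'] >= 40, 0.10, 0.0)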

NumPy Vectorization

# NumPy vectorized operations
import numpy as np
# Create arrays
prices = np.array([100.0, 50.0, 33.33, 25.0, 20.0])
quantities = np.array([10, 20, 30, 40, 50])
# Vectorized multiplication (SIMD)
totals = prices * quantities
# Vectorized filtering
high_value_mask = totals > 500
high_values = totals[high_value_mask]
# Vectorized aggregation
total_revenue = np.sum(totals)
avg_quantity = np.mean(quantities)
std_prices = np.std(prices)
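
Data type choice matters for SIMD throughput: smaller element types mean more values per register and cache line. A hedged sketch comparing memory footprints (the arrays are illustrative):

# float64 vs float32: half the bytes per element, so roughly twice as many
# values fit in each SIMD register and cache line
prices64 = np.random.rand(1_000_000)       # float64 by default
prices32 = prices64.astype(np.float32)

print(prices64.nbytes)  # 8000000 bytes
print(prices32.nbytes)  # 4000000 bytes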

Vectorization Optimization

Columnar Storage

-- Columnar storage enables vectorization
-- Parquet (columnar) - vectorized
CREATE TABLE sales_parquet (
    order_id BIGINT,
    customer_id BIGINT,
    order_date DATE,
    amount DECIMAL(10,2)
) STORED AS PARQUET;

-- ORC (columnar) - vectorized
CREATE TABLE sales_orc (
    order_id BIGINT,
    customer_id BIGINT,
    order_date DATE,
    amount DECIMAL(10,2)
) STORED AS ORC;

-- Row-based (not vectorized) - avoid for analytics
CREATE TABLE sales_text (
    order_id BIGINT,
    customer_id BIGINT,
    order_date DATE,
    amount DECIMAL(10,2)
) STORED AS TEXTFILE;
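
In Spark, the practical step is usually converting row-oriented source files to Parquet so that later scans can use the vectorized columnar reader (spark.sql.parquet.enableVectorizedReader, true by default). A minimal sketch with hypothetical paths and columns:

# Convert a row-oriented CSV source to columnar Parquet (paths are hypothetical)
raw = spark.read.csv("/data/sales_raw.csv", header=True, inferSchema=True)
raw.write.mode("overwrite").parquet("/data/sales_parquet")

# Subsequent scans of the Parquet copy go through the vectorized reader
sales = spark.read.parquet("/data/sales_parquet")
sales.groupBy("customer_id").sum("amount").show()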

Code Generation

# Whole-stage code generation
spark.conf.set("spark.sql.codegen.wholeStage", "true")

# Whole-stage code generation:
#   Combines multiple operators into a single generated function
#   Reduces virtual function calls
#   Improves CPU cache efficiency
# Example:
#   Filter -> Project -> Aggregate collapses into one generated function

# Enable adaptive query execution
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
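
Spark 3.x can also print the code that whole-stage codegen produces, which is a direct way to see which operators were fused. A sketch (the Parquet path is hypothetical):

# Dump the generated code for a query (Spark 3.0+)
df = spark.read.parquet("/data/sales_parquet")
query = df.filter(df.amount > 100).groupBy("customer_id").count()

# "codegen" mode prints each WholeStageCodegen subtree with its generated source
query.explain(mode="codegen")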

Vectorization Best Practices

DO

# 1. Use vectorized operations
#    Built-in functions are vectorized
# 2. Use columnar formats
#    Parquet, ORC for analytics
# 3. Enable whole-stage codegen
#    Combines operations for efficiency
# 4. Avoid row-by-row operations
#    iterrows() is slow
# 5. Use appropriate data types
#    Avoid excessive precision

DON’T

# 1. Don't use UDFs excessively
#    Plain Python UDFs break vectorization (see the sketch after this list)
# 2. Don't use row-based formats
#    Avoid text/JSON for analytics
# 3. Don't use Python loops
#    Vectorized alternatives are available
# 4. Don't ignore data types
#    Precision impacts vectorization
# 5. Don't disable codegen
#    It is critical for performance
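
As noted in the first DON'T item, a plain Python UDF forces rows to be serialized out of the JVM and evaluated one at a time in Python, while the equivalent built-in column expression stays on the vectorized/codegen path. A minimal sketch (the table and tax rate are hypothetical):

from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

sales = spark.createDataFrame([(1, 100.0), (2, 250.0)], ["order_id", "amount"])

# Slow: plain Python UDF breaks vectorization and serializes every row
add_tax_udf = udf(lambda amount: amount * 1.08, DoubleType())
taxed_slow = sales.withColumn("taxed", add_tax_udf("amount"))

# Fast: same logic as a built-in column expression
taxed_fast = sales.withColumn("taxed", F.col("amount") * 1.08)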

Vectorized vs. Scalar Execution

Performance Comparison

# Vectorization performance comparison
import numpy as np
import pandas as pd
import time

# Create large dataset
df = pd.DataFrame({
    'col_a': np.random.rand(1_000_000),
    'col_b': np.random.rand(1_000_000),
    'col_c': np.random.rand(1_000_000)
})

# Scalar (slow): row-by-row iteration
start = time.time()
scalar_result = []
for i, row in df.iterrows():
    scalar_result.append(row['col_a'] * row['col_b'])
scalar_time = time.time() - start

# Vectorized (fast): single column-wise operation
start = time.time()
vectorized_result = df['col_a'] * df['col_b']
vectorized_time = time.time() - start

print(f"Scalar time: {scalar_time:.3f}s")
print(f"Vectorized time: {vectorized_time:.3f}s")
print(f"Speedup: {scalar_time / vectorized_time:.1f}x")

# Illustrative results (exact numbers vary by hardware):
# Scalar time: 15.000s
# Vectorized time: 0.002s
# Speedup: 7500x

Key Takeaways

  1. Vectorization: Process multiple rows simultaneously
  2. SIMD instructions: Modern CPU optimization (AVX, AVX-512)
  3. CPU utilization: 80-100% vs. 10-20% for scalar
  4. Performance: 2-10x faster for supported operations
  5. Columnar formats: Parquet, ORC enable vectorization
  6. Whole-stage codegen: Combines operations for efficiency
  7. Avoid UDFs: Python UDFs break vectorization
  8. Use when: All analytics workloads and performance-critical queries

Back to Module 7