
Vector Embeddings at Scale

Production Embedding Generation and Management


Overview

Running vector embeddings at scale means generating, storing, and managing millions to billions of vectors for production AI/ML systems. This guide covers embedding generation, batch and distributed processing, caching, quality evaluation, cost optimization, and monitoring.


Embedding Architecture

Scalable Embedding Pipeline

Key Components:

  • Data Processing: Chunking, cleaning, normalization
  • Embedding Generation: OpenAI, Cohere, open-source models
  • Batch Processing: Parallel, distributed generation
  • Caching: Redis, Memcached for repeated embeddings
  • Monitoring: Quality, cost, latency metrics

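The sections below implement each of these pieces. As a rough end-to-end sketch of how they compose (the function names here are hypothetical placeholders, not a standard API), a request path looks like this:

# Minimal pipeline sketch (hypothetical names): clean/chunk the input,
# check a cache, embed only the misses in batches, then log metrics.
from typing import Callable, Dict, List, Optional

def run_pipeline(
    docs: List[str],
    chunk: Callable[[str], List[str]],
    cache_get: Callable[[str], Optional[List[float]]],
    cache_set: Callable[[str, List[float]], None],
    embed_batch: Callable[[List[str]], List[List[float]]],
    log_metrics: Callable[[Dict], None],
) -> Dict[str, List[float]]:
    """Return a mapping of chunk text -> embedding."""
    chunks = [c for doc in docs for c in chunk(doc)]   # data processing
    result: Dict[str, List[float]] = {}
    misses = []
    for c in chunks:                                   # caching layer
        hit = cache_get(c)
        if hit is not None:
            result[c] = hit
        else:
            misses.append(c)
    if misses:
        embeddings = embed_batch(misses)               # batch generation
        for c, emb in zip(misses, embeddings):
            cache_set(c, emb)
            result[c] = emb
    log_metrics({"chunks": len(chunks), "cache_hits": len(chunks) - len(misses)})
    return result
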
Embedding Models

Model Comparison

Model                         | Dimensions | Cost               | Speed  | Quality   | Use Case
------------------------------|------------|--------------------|--------|-----------|-----------------
OpenAI ada-002                | 1536       | $0.10/1M tokens    | Medium | Excellent | General purpose
OpenAI text-embedding-3-small | 1536       | $0.02/1M tokens    | Fast   | Very good | Cost-optimized
OpenAI text-embedding-3-large | 3072       | $0.13/1M tokens    | Medium | Best      | Highest quality
Cohere embed-v3               | 1024       | $0.10/1M tokens    | Fast   | Very good | Multilingual
Sentence-Transformers         | 384-768    | Free (self-hosted) | Medium | Good      | Open source

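Dimensions drive storage cost as well as API cost. As a back-of-the-envelope check, assuming float32 vectors and roughly 500 tokens per document (an assumption, not a rule), one million documents with text-embedding-3-small works out to:

# Back-of-the-envelope sizing for 1M documents with text-embedding-3-small
# (assumes ~500 tokens per document and float32 storage; adjust for your data).
num_docs = 1_000_000
dims = 1536
avg_tokens = 500
price_per_million_tokens = 0.02              # USD, text-embedding-3-small

api_cost = num_docs * avg_tokens / 1_000_000 * price_per_million_tokens
storage_gb = num_docs * dims * 4 / 1024**3   # 4 bytes per float32

print(f"API cost:    ~${api_cost:,.2f}")     # ~$10.00
print(f"Raw storage: ~{storage_gb:.1f} GB")  # ~5.7 GB before index overhead
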
Model Selection

# Embedding model selection
from typing import List
import numpy as np


class EmbeddingModel:
    """Base embedding model"""

    def embed(self, texts: List[str]) -> List[List[float]]:
        raise NotImplementedError


class OpenAIEmbedding(EmbeddingModel):
    """OpenAI embedding model"""

    def __init__(self, model: str = "text-embedding-3-small"):
        from openai import OpenAI
        self.client = OpenAI()
        self.model = model
        # ada-002 and 3-small are 1536-dimensional; 3-large is 3072
        self.dimensions = 3072 if model == "text-embedding-3-large" else 1536

    def embed(self, texts: List[str]) -> List[List[float]]:
        """Generate embeddings"""
        response = self.client.embeddings.create(
            model=self.model,
            input=texts
        )
        return [item.embedding for item in response.data]


class CohereEmbedding(EmbeddingModel):
    """Cohere embedding model"""

    def __init__(self, model: str = "embed-english-v3.0"):
        import cohere
        self.client = cohere.Client(api_key="your-api-key")
        self.model = model
        self.dimensions = 1024

    def embed(self, texts: List[str]) -> List[List[float]]:
        """Generate embeddings"""
        response = self.client.embed(
            texts=texts,
            model=self.model,
            input_type="search_document"
        )
        return response.embeddings


class SentenceTransformerEmbedding(EmbeddingModel):
    """Sentence-transformers embedding model"""

    def __init__(self, model: str = "all-MiniLM-L6-v2"):
        from sentence_transformers import SentenceTransformer
        self.model = SentenceTransformer(model)
        self.dimensions = self.model.get_sentence_embedding_dimension()

    def embed(self, texts: List[str]) -> List[List[float]]:
        """Generate embeddings"""
        embeddings = self.model.encode(
            texts,
            convert_to_numpy=True,
            show_progress_bar=False
        )
        return embeddings.tolist()


# Model selection strategy
def select_embedding_model(
    budget: str,          # "low", "medium", "high"
    quality: str,         # "good", "better", "best"
    language: str = "en"
) -> EmbeddingModel:
    """Select appropriate embedding model"""
    if budget == "low":
        # Open-source
        return SentenceTransformerEmbedding("all-MiniLM-L6-v2")
    elif budget == "medium":
        # Cohere or OpenAI small
        if language == "en":
            return OpenAIEmbedding("text-embedding-3-small")
        else:
            return CohereEmbedding("embed-multilingual-v3.0")
    else:  # high budget
        # OpenAI large for best quality
        return OpenAIEmbedding("text-embedding-3-large")

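A quick usage sketch for the selection helper above (the arguments shown are illustrative):

# Example: pick a model for a medium budget, English-only corpus
model = select_embedding_model(budget="medium", quality="better", language="en")
print(type(model).__name__, model.dimensions)   # OpenAIEmbedding 1536
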
Embedding Generation

Batch Processing

# Batch embedding generation
from typing import List, Iterator
import asyncio
from concurrent.futures import ThreadPoolExecutor


class BatchEmbedder:
    """Batch embedding generation"""

    def __init__(
        self,
        model: EmbeddingModel,
        batch_size: int = 100,
        max_workers: int = 10
    ):
        self.model = model
        self.batch_size = batch_size
        self.max_workers = max_workers

    def embed_batch(
        self,
        texts: List[str],
        show_progress: bool = True
    ) -> List[List[float]]:
        """Embed texts in batches"""
        from tqdm import tqdm

        embeddings = []
        # Split into batches
        batches = [
            texts[i:i + self.batch_size]
            for i in range(0, len(texts), self.batch_size)
        ]
        # Process batches in parallel threads, preserving submission order
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = [
                executor.submit(self.model.embed, batch)
                for batch in batches
            ]
            iterator = tqdm(futures, desc="Embedding") if show_progress else futures
            for future in iterator:
                embeddings.extend(future.result())
        return embeddings

    async def embed_batch_async(
        self,
        texts: List[str]
    ) -> List[List[float]]:
        """Embed texts asynchronously"""
        embeddings = []
        # Split into batches
        batches = [
            texts[i:i + self.batch_size]
            for i in range(0, len(texts), self.batch_size)
        ]
        # Process batches concurrently
        tasks = [self._embed_batch_async(batch) for batch in batches]
        results = await asyncio.gather(*tasks)
        for result in results:
            embeddings.extend(result)
        return embeddings

    async def _embed_batch_async(self, batch: List[str]) -> List[List[float]]:
        """Embed single batch asynchronously"""
        # Run in thread pool to avoid blocking the event loop
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self.model.embed, batch)


# Example usage
model = OpenAIEmbedding("text-embedding-3-small")
embedder = BatchEmbedder(model, batch_size=100, max_workers=10)

# Generate 1M embeddings
texts = [f"Document {i}" for i in range(1_000_000)]
embeddings = embedder.embed_batch(texts)

Distributed Processing

# Distributed embedding generation with Spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, FloatType


class SparkEmbedder:
    """Distributed embedding generation with Spark"""

    def __init__(self, model: EmbeddingModel):
        self.model = model
        self.spark = SparkSession.builder \
            .appName("EmbeddingGeneration") \
            .getOrCreate()
        # Broadcast model to all workers
        self.model_broadcast = self.spark.sparkContext.broadcast(self.model)

    def embed_dataframe(
        self,
        df,
        text_column: str = "text"
    ):
        """Embed text column in dataframe"""
        # Capture the broadcast variable locally so the UDF closure does not
        # pull in self (and the non-serializable SparkSession)
        model_broadcast = self.model_broadcast

        # Define UDF: embeds one row's text at a time
        def embed_text(text):
            model = model_broadcast.value
            return model.embed([text])[0]

        # Register UDF
        embed_udf = udf(embed_text, ArrayType(FloatType()))

        # Apply UDF
        df_with_embeddings = df.withColumn(
            "embedding",
            embed_udf(text_column)
        )
        return df_with_embeddings

    def embed_parquet(
        self,
        input_path: str,
        output_path: str,
        text_column: str = "text"
    ):
        """Embed parquet files"""
        # Read parquet
        df = self.spark.read.parquet(input_path)
        # Embed
        df_with_embeddings = self.embed_dataframe(df, text_column)
        # Write parquet
        df_with_embeddings.write.parquet(output_path)
        return df_with_embeddings

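A minimal usage sketch, assuming a self-hosted sentence-transformers model (API-client models are harder to broadcast cleanly) and illustrative S3 paths:

# Example: embed a parquet dataset with a self-hosted model
# (paths are hypothetical; assumes the classes defined above are importable)
model = SentenceTransformerEmbedding("all-MiniLM-L6-v2")
embedder = SparkEmbedder(model)
embedder.embed_parquet(
    input_path="s3://my-bucket/docs/",       # hypothetical input location
    output_path="s3://my-bucket/docs_emb/",  # hypothetical output location
    text_column="text"
)
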
Embedding Caching

Cache Strategy

Redis Cache

# Embedding cache with Redis
import redis
import json
import hashlib
from typing import List, Optional


class EmbeddingCache:
    """Cache embeddings in Redis"""

    def __init__(
        self,
        redis_host: str = "localhost",
        redis_port: int = 6379,
        ttl: int = 60 * 60 * 24 * 7  # 7 days
    ):
        self.client = redis.Redis(
            host=redis_host,
            port=redis_port,
            decode_responses=True
        )
        self.ttl = ttl

    def _get_cache_key(self, text: str) -> str:
        """Generate cache key for text"""
        # Hash text to get cache key
        text_hash = hashlib.sha256(text.encode()).hexdigest()
        return f"embedding:{text_hash}"

    def get(self, text: str) -> Optional[List[float]]:
        """Get cached embedding"""
        key = self._get_cache_key(text)
        cached = self.client.get(key)
        if cached:
            return json.loads(cached)
        return None

    def set(self, text: str, embedding: List[float]):
        """Cache embedding"""
        key = self._get_cache_key(text)
        self.client.setex(
            key,
            self.ttl,
            json.dumps(embedding)
        )

    def get_batch(self, texts: List[str]) -> List[Optional[List[float]]]:
        """Get cached embeddings for batch"""
        return [self.get(text) for text in texts]

    def set_batch(self, texts: List[str], embeddings: List[List[float]]):
        """Cache embeddings for batch"""
        for text, embedding in zip(texts, embeddings):
            self.set(text, embedding)

Cached Embedding Service

# Embedding service with caching
class CachedEmbeddingService:
    """Embedding service with caching"""

    def __init__(
        self,
        model: EmbeddingModel,
        cache: EmbeddingCache
    ):
        self.model = model
        self.cache = cache

    def embed(self, texts: List[str]) -> List[List[float]]:
        """Embed texts with caching"""
        # Check cache
        cached = self.cache.get_batch(texts)
        # Separate cached and uncached
        uncached_indices = [
            i for i, emb in enumerate(cached)
            if emb is None
        ]
        uncached_texts = [texts[i] for i in uncached_indices]
        # Generate embeddings for uncached
        if uncached_texts:
            new_embeddings = self.model.embed(uncached_texts)
            # Cache new embeddings
            self.cache.set_batch(uncached_texts, new_embeddings)
            # Combine cached and new
            embeddings = list(cached)
            for idx, emb in zip(uncached_indices, new_embeddings):
                embeddings[idx] = emb
        else:
            embeddings = cached
        return embeddings


# Example usage
model = OpenAIEmbedding("text-embedding-3-small")
cache = EmbeddingCache(redis_host="localhost")
service = CachedEmbeddingService(model, cache)

# First call: cache miss, generates embeddings
embeddings1 = service.embed(["Hello world", "Machine learning"])

# Second call: cache hit, returns cached embeddings
embeddings2 = service.embed(["Hello world", "Machine learning"])

Embedding Quality

Quality Metrics

# Embedding quality evaluation
from typing import List, Dict, Tuple
import numpy as np


class EmbeddingQuality:
    """Evaluate embedding quality"""

    def __init__(self, model: EmbeddingModel):
        self.model = model

    def evaluate_similarity(
        self,
        positive_pairs: List[Tuple[str, str]],
        negative_pairs: List[Tuple[str, str]]
    ) -> Dict:
        """Evaluate similarity quality"""
        # Embed all texts
        all_texts = []
        for a, b in positive_pairs + negative_pairs:
            all_texts.extend([a, b])
        embeddings = self.model.embed(all_texts)

        # Calculate cosine similarity for each pair
        positive_similarities = []
        negative_similarities = []
        idx = 0
        for _ in positive_pairs:
            emb_a = np.array(embeddings[idx])
            emb_b = np.array(embeddings[idx + 1])
            sim = np.dot(emb_a, emb_b) / (
                np.linalg.norm(emb_a) * np.linalg.norm(emb_b)
            )
            positive_similarities.append(sim)
            idx += 2
        for _ in negative_pairs:
            emb_a = np.array(embeddings[idx])
            emb_b = np.array(embeddings[idx + 1])
            sim = np.dot(emb_a, emb_b) / (
                np.linalg.norm(emb_a) * np.linalg.norm(emb_b)
            )
            negative_similarities.append(sim)
            idx += 2

        # Calculate metrics
        avg_positive = np.mean(positive_similarities)
        avg_negative = np.mean(negative_similarities)
        separation = avg_positive - avg_negative
        return {
            'avg_positive_similarity': avg_positive,
            'avg_negative_similarity': avg_negative,
            'separation': separation,
            'higher_is_better': True
        }

    def evaluate_retrieval(
        self,
        corpus: List[str],
        queries: List[Tuple[str, str]],  # (query, relevant_doc)
        top_k: int = 10
    ) -> Dict:
        """Evaluate retrieval quality"""
        # Embed corpus once
        corpus_embeddings = self.model.embed(corpus)
        results = []
        for query, relevant_doc in queries:
            query_embedding = self.model.embed([query])[0]
            # Calculate cosine similarity against every document
            similarities = []
            for doc_emb in corpus_embeddings:
                sim = np.dot(query_embedding, doc_emb) / (
                    np.linalg.norm(query_embedding) * np.linalg.norm(doc_emb)
                )
                similarities.append(sim)
            # Get top K documents
            top_k_indices = np.argsort(similarities)[-top_k:][::-1]
            # Check if the relevant doc is in the top K
            relevant_idx = corpus.index(relevant_doc)
            hit = relevant_idx in top_k_indices
            results.append({
                'hit': hit,
                'rank': top_k_indices.tolist().index(relevant_idx) + 1 if hit else None
            })

        # Calculate hit rate and mean reciprocal rank
        hit_rate = sum(1 for r in results if r['hit']) / len(results)
        mrr = np.mean([
            1 / r['rank'] if r['hit'] else 0
            for r in results
        ])
        return {
            'hit_rate@k': hit_rate,
            'mrr': mrr,
            'higher_is_better': True
        }

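A small usage sketch with hand-made pairs (illustrative, not a benchmark): a healthy model should score the positive pair well above the negative one, giving a positive separation.

# Example: sanity-check a model on a handful of hand-labelled pairs
quality = EmbeddingQuality(OpenAIEmbedding("text-embedding-3-small"))
report = quality.evaluate_similarity(
    positive_pairs=[("How do I reset my password?", "Steps to change your account password")],
    negative_pairs=[("How do I reset my password?", "Quarterly revenue grew 12%")]
)
print(report)   # expect avg_positive_similarity > avg_negative_similarity, i.e. separation > 0
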
Embedding Cost Optimization

Cost Calculation

# Embedding cost calculation
from typing import Dict, List

import tiktoken


class EmbeddingCost:
    """Calculate embedding costs"""

    # OpenAI pricing (as of 2025), USD per token
    PRICING = {
        "text-embedding-3-small": 0.02 / 1_000_000,  # $0.02 per 1M tokens
        "text-embedding-3-large": 0.13 / 1_000_000,  # $0.13 per 1M tokens
        "text-embedding-ada-002": 0.10 / 1_000_000,  # $0.10 per 1M tokens
    }

    def __init__(self, model: str = "text-embedding-3-small"):
        self.model = model
        self.encoder = tiktoken.encoding_for_model(model)

    def estimate_cost(self, texts: List[str]) -> Dict:
        """Estimate cost for embedding texts"""
        # Count tokens
        total_tokens = sum(len(self.encoder.encode(text)) for text in texts)
        # Calculate cost
        cost_per_token = self.PRICING[self.model]
        total_cost = total_tokens * cost_per_token
        return {
            'total_tokens': total_tokens,
            'total_texts': len(texts),
            'avg_tokens_per_text': total_tokens / len(texts),
            'cost_per_token': cost_per_token,
            'total_cost_usd': total_cost
        }

    def optimize_cost(
        self,
        texts: List[str],
        max_tokens: int = 8191
    ) -> List[str]:
        """Truncate over-long texts to reduce cost"""
        optimized = []
        for text in texts:
            tokens = self.encoder.encode(text)
            if len(tokens) > max_tokens:
                # Truncate to max tokens
                truncated_tokens = tokens[:max_tokens]
                truncated_text = self.encoder.decode(truncated_tokens)
                optimized.append(truncated_text)
            else:
                optimized.append(text)
        return optimized

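A usage sketch for estimating spend before committing to a large job (the texts are illustrative):

# Example: estimate cost before embedding a large corpus
cost = EmbeddingCost("text-embedding-3-small")
texts = ["Vector databases store embeddings for similarity search."] * 10_000
estimate = cost.estimate_cost(cost.optimize_cost(texts))
print(estimate['total_tokens'], f"${estimate['total_cost_usd']:.4f}")
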
Cost Optimization Strategies

# Cost optimization strategies
from typing import List


class EmbeddingOptimizer:
    """Optimize embedding generation for cost"""

    def __init__(self, model: EmbeddingModel):
        self.model = model

    def deduplicate_texts(self, texts: List[str]) -> List[str]:
        """Remove duplicate texts"""
        seen = set()
        unique = []
        for text in texts:
            # Normalize text before comparing
            normalized = text.lower().strip()
            if normalized not in seen:
                seen.add(normalized)
                unique.append(text)
        return unique

    def chunk_texts(
        self,
        texts: List[str],
        max_length: int = 1000,
        overlap: int = 100
    ) -> List[str]:
        """Chunk long texts into overlapping windows"""
        chunks = []
        for text in texts:
            # Split into overlapping character windows
            for i in range(0, len(text), max_length - overlap):
                chunk = text[i:i + max_length]
                if chunk:
                    chunks.append(chunk)
        return chunks

    def filter_short_texts(self, texts: List[str], min_length: int = 50) -> List[str]:
        """Filter out very short texts"""
        return [text for text in texts if len(text) >= min_length]

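A usage sketch chaining the optimizations in a reasonable (not mandatory) order; load_raw_documents is a hypothetical loader for your corpus:

# Example: deduplicate, filter, then chunk before embedding
optimizer = EmbeddingOptimizer(model=OpenAIEmbedding("text-embedding-3-small"))
texts = load_raw_documents()          # hypothetical loader for your corpus
texts = optimizer.deduplicate_texts(texts)
texts = optimizer.filter_short_texts(texts, min_length=50)
chunks = optimizer.chunk_texts(texts, max_length=1000, overlap=100)
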
Embedding Monitoring

Metrics

# Embedding monitoring
from typing import Dict, List
from datetime import datetime
import json
import os
import statistics


class EmbeddingMonitor:
    """Monitor embedding generation"""

    def __init__(self, storage_path: str = "metrics/embeddings/"):
        self.storage_path = storage_path

    def log_batch(
        self,
        model: str,
        num_texts: int,
        num_tokens: int,
        latency_ms: float,
        cost_usd: float,
        cache_hit_rate: float
    ):
        """Log embedding batch"""
        log_entry = {
            'timestamp': datetime.now().isoformat(),
            'model': model,
            'num_texts': num_texts,
            'num_tokens': num_tokens,
            'latency_ms': latency_ms,
            'cost_usd': cost_usd,
            'cache_hit_rate': cache_hit_rate
        }
        # Append to a JSONL file per model
        os.makedirs(self.storage_path, exist_ok=True)
        file_path = f"{self.storage_path}/{model}.jsonl"
        with open(file_path, 'a') as f:
            f.write(json.dumps(log_entry) + '\n')

    def get_metrics(self, model: str) -> Dict:
        """Get aggregated metrics for a model"""
        # Read logs
        logs = []
        file_path = f"{self.storage_path}/{model}.jsonl"
        try:
            with open(file_path, 'r') as f:
                for line in f:
                    logs.append(json.loads(line))
        except FileNotFoundError:
            return {}
        if not logs:
            return {}
        # Aggregate metrics
        metrics = {
            'total_texts': sum(log['num_texts'] for log in logs),
            'total_tokens': sum(log['num_tokens'] for log in logs),
            'total_cost_usd': sum(log['cost_usd'] for log in logs),
            'avg_latency_ms': statistics.mean([log['latency_ms'] for log in logs]),
            'avg_cache_hit_rate': statistics.mean([log['cache_hit_rate'] for log in logs]),
        }
        return metrics

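A usage sketch with illustrative numbers:

# Example: log one batch and read back aggregates
monitor = EmbeddingMonitor(storage_path="metrics/embeddings/")
monitor.log_batch(
    model="text-embedding-3-small",
    num_texts=1_000,
    num_tokens=250_000,
    latency_ms=1_850.0,
    cost_usd=0.005,          # 250K tokens at $0.02/1M
    cache_hit_rate=0.42
)
print(monitor.get_metrics("text-embedding-3-small"))
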
Embedding Best Practices

DO

  1. Use caching: Redis for repeated embeddings
  2. Batch requests: Fewer API calls, better throughput
  3. Use an appropriate model: Balance cost and quality
  4. Monitor costs: Track token usage and spending
  5. Deduplicate texts: Remove duplicates before embedding

DON’T

  1. Don't embed duplicates: Deduplicate first
  2. Don't skip batching: Batching is essential for throughput
  3. Don't ignore costs: Embedding costs add up quickly
  4. Don't use the wrong model: Choose based on use case
  5. Don't skip monitoring: It's essential for operations

Key Takeaways

  1. Model selection: Balance cost, speed, and quality
  2. Batch processing: Parallel, distributed generation
  3. Caching: Redis for repeated embeddings
  4. Quality evaluation: Similarity, retrieval metrics
  5. Cost optimization: Deduplication, chunking, filtering
  6. Monitoring: Token usage, cost, latency metrics
  7. Scaling: Distributed processing with Spark
  8. Use When: Large-scale embedding generation (1M+ documents)
