Vector Embeddings at Scale
Production Embedding Generation and Management
Overview
Working with vector embeddings at scale means generating, storing, and managing millions to billions of vectors for production AI/ML systems. This guide covers embedding generation, scaling strategies, and cost optimization.
Embedding Architecture
Scalable Embedding Pipeline
Key Components:
- Data Processing: Chunking, cleaning, normalization
- Embedding Generation: OpenAI, Cohere, open-source models
- Batch Processing: Parallel, distributed generation
- Caching: Redis, Memcached for repeated embeddings
- Monitoring: Quality, cost, latency metrics
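A minimal sketch of how these components might be wired together is shown below. The `EmbeddingPipeline` class and its collaborators are illustrative placeholders, not part of any library; concrete `EmbeddingModel` and `EmbeddingCache` implementations appear in the sections that follow.

```python
# Illustrative pipeline wiring; the names here are placeholders, and concrete
# EmbeddingModel and EmbeddingCache implementations appear in later sections.
from typing import List


class EmbeddingPipeline:
    def __init__(self, model, cache, chunk_size: int = 1000):
        self.model = model          # embedding generation (OpenAI, Cohere, self-hosted)
        self.cache = cache          # Redis/Memcached lookup for repeated texts
        self.chunk_size = chunk_size

    def _prepare(self, text: str) -> List[str]:
        # Data processing: cleaning, normalization, chunking
        cleaned = " ".join(text.split())
        return [cleaned[i:i + self.chunk_size]
                for i in range(0, len(cleaned), self.chunk_size)]

    def run(self, documents: List[str]) -> List[List[float]]:
        chunks = [c for doc in documents for c in self._prepare(doc)]

        # Cache lookup first; only embed what is missing
        cached = self.cache.get_batch(chunks)
        missing = [c for c, emb in zip(chunks, cached) if emb is None]
        if missing:
            # Batch generation (latency/cost metrics would be recorded here)
            new_embeddings = self.model.embed(missing)
            self.cache.set_batch(missing, new_embeddings)

        return self.cache.get_batch(chunks)
```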
Embedding Models
Model Comparison
| Model | Dimensions | Cost | Speed | Quality | Use Case |
|---|---|---|---|---|---|
| OpenAI ada-002 | 1536 | $0.10/1M tokens | Medium | Excellent | General purpose |
| OpenAI text-embedding-3-small | 1536 | $0.02/1M tokens | Fast | Very good | Cost-optimized |
| OpenAI text-embedding-3-large | 3072 | $0.13/1M tokens | Medium | Best | Highest quality |
| Cohere embed-v3 | 1024 | $0.10/1M tokens | Fast | Very good | Multilingual |
| Sentence-Transformers | 384-768 | Free (self-hosted) | Medium | Good | Open source |
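As a rough worked example of the pricing above: a 1M-document corpus averaging 500 tokens per document (500M tokens total) costs about $10 to embed with text-embedding-3-small, $50 with ada-002 or Cohere embed-v3, and $65 with text-embedding-3-large, before any caching or deduplication savings.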
Model Selection
```python
# Embedding model selection
from typing import List


class EmbeddingModel:
    """Base embedding model"""

    def embed(self, texts: List[str]) -> List[List[float]]:
        raise NotImplementedError


class OpenAIEmbedding(EmbeddingModel):
    """OpenAI embedding model"""

    def __init__(self, model: str = "text-embedding-3-small"):
        from openai import OpenAI
        self.client = OpenAI()
        self.model = model
        self.dimensions = 1536 if model == "text-embedding-3-small" else 3072

    def embed(self, texts: List[str]) -> List[List[float]]:
        """Generate embeddings"""
        response = self.client.embeddings.create(
            model=self.model,
            input=texts
        )
        return [item.embedding for item in response.data]


class CohereEmbedding(EmbeddingModel):
    """Cohere embedding model"""

    def __init__(self, model: str = "embed-english-v3.0"):
        import cohere
        self.client = cohere.Client(api_key="your-api-key")
        self.model = model
        self.dimensions = 1024

    def embed(self, texts: List[str]) -> List[List[float]]:
        """Generate embeddings"""
        response = self.client.embed(
            texts=texts,
            model=self.model,
            input_type="search_document"
        )
        return response.embeddings


class SentenceTransformerEmbedding(EmbeddingModel):
    """Sentence-transformers embedding model"""

    def __init__(self, model: str = "all-MiniLM-L6-v2"):
        from sentence_transformers import SentenceTransformer
        self.model = SentenceTransformer(model)
        self.dimensions = self.model.get_sentence_embedding_dimension()

    def embed(self, texts: List[str]) -> List[List[float]]:
        """Generate embeddings"""
        embeddings = self.model.encode(
            texts,
            convert_to_numpy=True,
            show_progress_bar=False
        )
        return embeddings.tolist()


# Model selection strategy
def select_embedding_model(
    budget: str,            # "low", "medium", "high"
    quality: str,           # "good", "better", "best"
    language: str = "en"
) -> EmbeddingModel:
    """Select appropriate embedding model"""
    if budget == "low":
        # Open-source
        return SentenceTransformerEmbedding("all-MiniLM-L6-v2")
    elif budget == "medium":
        # Cohere or OpenAI small
        if language == "en":
            return OpenAIEmbedding("text-embedding-3-small")
        else:
            return CohereEmbedding("embed-multilingual-v3.0")
    else:  # high budget
        # OpenAI large for best quality
        return OpenAIEmbedding("text-embedding-3-large")
```

Embedding Generation
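Before scaling out, a single call against the interface above looks like this. This is a minimal illustration; the printed dimensionality assumes the text-embedding-3-small default.

```python
# Minimal usage of the selection helper and embedding interface defined above
model = select_embedding_model(budget="medium", quality="better", language="en")
vectors = model.embed([
    "Vector databases index embeddings for similarity search.",
    "Embeddings map text into a shared vector space.",
])
print(len(vectors), len(vectors[0]))  # 2 texts, 1536 dimensions each
```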
Batch Processing
```python
# Batch embedding generation
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import List


class BatchEmbedder:
    """Batch embedding generation"""

    def __init__(
        self,
        model: EmbeddingModel,
        batch_size: int = 100,
        max_workers: int = 10
    ):
        self.model = model
        self.batch_size = batch_size
        self.max_workers = max_workers

    def embed_batch(
        self,
        texts: List[str],
        show_progress: bool = True
    ) -> List[List[float]]:
        """Embed texts in batches"""
        from tqdm import tqdm

        embeddings = []

        # Split into batches
        batches = [
            texts[i:i + self.batch_size]
            for i in range(0, len(texts), self.batch_size)
        ]

        # Process batches in parallel threads
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = [
                executor.submit(self.model.embed, batch)
                for batch in batches
            ]

            iterator = tqdm(futures, desc="Embedding") if show_progress else futures

            for future in iterator:
                embeddings.extend(future.result())

        return embeddings

    async def embed_batch_async(
        self,
        texts: List[str]
    ) -> List[List[float]]:
        """Embed texts asynchronously"""
        embeddings = []

        # Split into batches
        batches = [
            texts[i:i + self.batch_size]
            for i in range(0, len(texts), self.batch_size)
        ]

        # Process batches concurrently
        tasks = [self._embed_batch_async(batch) for batch in batches]
        results = await asyncio.gather(*tasks)

        for result in results:
            embeddings.extend(result)

        return embeddings

    async def _embed_batch_async(self, batch: List[str]) -> List[List[float]]:
        """Embed single batch asynchronously"""
        # Run in thread pool to avoid blocking the event loop
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(None, self.model.embed, batch)


# Example usage
model = OpenAIEmbedding("text-embedding-3-small")
embedder = BatchEmbedder(model, batch_size=100, max_workers=10)

# Generate 1M embeddings
texts = [f"Document {i}" for i in range(1_000_000)]
embeddings = embedder.embed_batch(texts)
```

Distributed Processing
```python
# Distributed embedding generation with Spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, FloatType


class SparkEmbedder:
    """Distributed embedding generation with Spark"""

    def __init__(self, model: EmbeddingModel):
        self.model = model
        self.spark = SparkSession.builder \
            .appName("EmbeddingGeneration") \
            .getOrCreate()

        # Broadcast model to all workers
        # (the model must be picklable; self-hosted models such as
        # sentence-transformers fit best here, as API clients may not serialize)
        self.model_broadcast = self.spark.sparkContext.broadcast(self.model)

    def embed_dataframe(
        self,
        df,
        text_column: str = "text"
    ):
        """Embed text column in dataframe"""
        model_broadcast = self.model_broadcast

        # Define a row-level UDF: each row's text becomes one embedding
        def embed_one(text):
            model = model_broadcast.value
            return model.embed([text])[0]

        # Register UDF
        embed_udf_spark = udf(embed_one, ArrayType(FloatType()))

        # Apply UDF
        df_with_embeddings = df.withColumn(
            "embedding",
            embed_udf_spark(text_column)
        )

        return df_with_embeddings

    def embed_parquet(
        self,
        input_path: str,
        output_path: str,
        text_column: str = "text"
    ):
        """Embed parquet files"""
        # Read parquet
        df = self.spark.read.parquet(input_path)

        # Embed
        df_with_embeddings = self.embed_dataframe(df, text_column)

        # Write parquet
        df_with_embeddings.write.parquet(output_path)

        return df_with_embeddings
```

Embedding Caching
Cache Strategy
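The caching strategy is content-addressed: each text is hashed (SHA-256) to form a stable cache key, the cache is checked before any embedding call is made, and newly generated vectors are written back with a TTL so stale entries eventually expire.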
Redis Cache
```python
# Embedding cache with Redis
import hashlib
import json
from typing import List, Optional

import redis


class EmbeddingCache:
    """Cache embeddings in Redis"""

    def __init__(
        self,
        redis_host: str = "localhost",
        redis_port: int = 6379,
        ttl: int = 60 * 60 * 24 * 7  # 7 days
    ):
        self.client = redis.Redis(
            host=redis_host,
            port=redis_port,
            decode_responses=True
        )
        self.ttl = ttl

    def _get_cache_key(self, text: str) -> str:
        """Generate cache key for text"""
        # Hash text to get cache key
        text_hash = hashlib.sha256(text.encode()).hexdigest()
        return f"embedding:{text_hash}"

    def get(self, text: str) -> Optional[List[float]]:
        """Get cached embedding"""
        key = self._get_cache_key(text)
        cached = self.client.get(key)

        if cached:
            return json.loads(cached)

        return None

    def set(self, text: str, embedding: List[float]):
        """Cache embedding"""
        key = self._get_cache_key(text)
        self.client.setex(
            key,
            self.ttl,
            json.dumps(embedding)
        )

    def get_batch(self, texts: List[str]) -> List[Optional[List[float]]]:
        """Get cached embeddings for batch"""
        return [self.get(text) for text in texts]

    def set_batch(self, texts: List[str], embeddings: List[List[float]]):
        """Cache embeddings for batch"""
        for text, embedding in zip(texts, embeddings):
            self.set(text, embedding)
```

Cached Embedding Service
```python
# Embedding service with caching
class CachedEmbeddingService:
    """Embedding service with caching"""

    def __init__(
        self,
        model: EmbeddingModel,
        cache: EmbeddingCache
    ):
        self.model = model
        self.cache = cache

    def embed(self, texts: List[str]) -> List[List[float]]:
        """Embed texts with caching"""
        # Check cache
        cached = self.cache.get_batch(texts)

        # Separate cached and uncached
        uncached_indices = [
            i for i, emb in enumerate(cached)
            if emb is None
        ]
        uncached_texts = [texts[i] for i in uncached_indices]

        # Generate embeddings for uncached
        if uncached_texts:
            new_embeddings = self.model.embed(uncached_texts)

            # Cache new embeddings
            self.cache.set_batch(uncached_texts, new_embeddings)

            # Combine cached and new
            embeddings = list(cached)
            for idx, emb in zip(uncached_indices, new_embeddings):
                embeddings[idx] = emb
        else:
            embeddings = cached

        return embeddings


# Example usage
model = OpenAIEmbedding("text-embedding-3-small")
cache = EmbeddingCache(redis_host="localhost")
service = CachedEmbeddingService(model, cache)

# First call: cache miss, generates embeddings
embeddings1 = service.embed(["Hello world", "Machine learning"])

# Second call: cache hit, returns cached embeddings
embeddings2 = service.embed(["Hello world", "Machine learning"])
```

Embedding Quality
Quality Metrics
```python
# Embedding quality evaluation
from typing import Dict, List, Tuple

import numpy as np


class EmbeddingQuality:
    """Evaluate embedding quality"""

    def __init__(self, model: EmbeddingModel):
        self.model = model

    def evaluate_similarity(
        self,
        positive_pairs: List[Tuple[str, str]],
        negative_pairs: List[Tuple[str, str]]
    ) -> Dict:
        """Evaluate similarity quality"""
        # Embed all texts
        all_texts = []
        for a, b in positive_pairs + negative_pairs:
            all_texts.extend([a, b])

        embeddings = self.model.embed(all_texts)

        # Calculate cosine similarities
        positive_similarities = []
        negative_similarities = []

        idx = 0
        for _ in positive_pairs:
            emb_a = np.array(embeddings[idx])
            emb_b = np.array(embeddings[idx + 1])
            sim = np.dot(emb_a, emb_b) / (
                np.linalg.norm(emb_a) * np.linalg.norm(emb_b)
            )
            positive_similarities.append(sim)
            idx += 2

        for _ in negative_pairs:
            emb_a = np.array(embeddings[idx])
            emb_b = np.array(embeddings[idx + 1])
            sim = np.dot(emb_a, emb_b) / (
                np.linalg.norm(emb_a) * np.linalg.norm(emb_b)
            )
            negative_similarities.append(sim)
            idx += 2

        # Calculate metrics
        avg_positive = np.mean(positive_similarities)
        avg_negative = np.mean(negative_similarities)
        separation = avg_positive - avg_negative

        return {
            'avg_positive_similarity': avg_positive,
            'avg_negative_similarity': avg_negative,
            'separation': separation,
            'higher_is_better': True
        }

    def evaluate_retrieval(
        self,
        corpus: List[str],
        queries: List[Tuple[str, str]],  # (query, relevant_doc)
        top_k: int = 10
    ) -> Dict:
        """Evaluate retrieval quality"""
        # Embed corpus
        corpus_embeddings = self.model.embed(corpus)

        results = []

        for query, relevant_doc in queries:
            query_embedding = self.model.embed([query])[0]

            # Calculate similarities against the corpus
            similarities = []
            for doc_emb in corpus_embeddings:
                sim = np.dot(query_embedding, doc_emb) / (
                    np.linalg.norm(query_embedding) * np.linalg.norm(doc_emb)
                )
                similarities.append(sim)

            # Get top K
            top_k_indices = np.argsort(similarities)[-top_k:][::-1]

            # Check if relevant doc is in top K
            relevant_idx = corpus.index(relevant_doc)
            hit = relevant_idx in top_k_indices

            results.append({
                'hit': hit,
                'rank': top_k_indices.tolist().index(relevant_idx) + 1 if hit else None
            })

        # Calculate metrics
        hit_rate = sum(1 for r in results if r['hit']) / len(results)
        mrr = np.mean([
            1 / r['rank'] if r['hit'] else 0
            for r in results
        ])

        return {
            'hit_rate@k': hit_rate,
            'mrr': mrr,
            'higher_is_better': True
        }
```

Embedding Cost Optimization
Cost Calculation
```python
# Embedding cost calculation
from typing import Dict, List

import tiktoken


class EmbeddingCost:
    """Calculate embedding costs"""

    # OpenAI pricing (as of 2025)
    PRICING = {
        "text-embedding-3-small": 0.02 / 1_000_000,   # $0.02 per 1M tokens
        "text-embedding-3-large": 0.13 / 1_000_000,   # $0.13 per 1M tokens
        "text-embedding-ada-002": 0.10 / 1_000_000,   # $0.10 per 1M tokens
    }

    def __init__(self, model: str = "text-embedding-3-small"):
        self.model = model
        try:
            self.encoder = tiktoken.encoding_for_model(model)
        except KeyError:
            # Older tiktoken releases may not map this model name
            self.encoder = tiktoken.get_encoding("cl100k_base")

    def estimate_cost(self, texts: List[str]) -> Dict:
        """Estimate cost for embedding texts"""
        # Count tokens
        total_tokens = sum(len(self.encoder.encode(text)) for text in texts)

        # Calculate cost
        cost_per_token = self.PRICING[self.model]
        total_cost = total_tokens * cost_per_token

        return {
            'total_tokens': total_tokens,
            'total_texts': len(texts),
            'avg_tokens_per_text': total_tokens / len(texts),
            'cost_per_token': cost_per_token,
            'total_cost_usd': total_cost
        }

    def optimize_cost(
        self,
        texts: List[str],
        max_tokens: int = 8191
    ) -> List[str]:
        """Optimize texts to reduce cost"""
        optimized = []

        for text in texts:
            tokens = self.encoder.encode(text)

            if len(tokens) > max_tokens:
                # Truncate to max tokens
                truncated_tokens = tokens[:max_tokens]
                truncated_text = self.encoder.decode(truncated_tokens)
                optimized.append(truncated_text)
            else:
                optimized.append(text)

        return optimized
```

Cost Optimization Strategies
```python
# Cost optimization strategies
class EmbeddingOptimizer:
    """Optimize embedding generation for cost"""

    def __init__(self, model: EmbeddingModel):
        self.model = model

    def deduplicate_texts(self, texts: List[str]) -> List[str]:
        """Remove duplicate texts"""
        seen = set()
        unique = []

        for text in texts:
            # Normalize text
            normalized = text.lower().strip()

            if normalized not in seen:
                seen.add(normalized)
                unique.append(text)

        return unique

    def chunk_texts(
        self,
        texts: List[str],
        max_length: int = 1000,
        overlap: int = 100
    ) -> List[str]:
        """Chunk long texts"""
        chunks = []

        for text in texts:
            # Split into overlapping chunks
            for i in range(0, len(text), max_length - overlap):
                chunk = text[i:i + max_length]
                if chunk:
                    chunks.append(chunk)

        return chunks

    def filter_short_texts(self, texts: List[str], min_length: int = 50) -> List[str]:
        """Filter out very short texts"""
        return [text for text in texts if len(text) >= min_length]
```

Embedding Monitoring
Metrics
```python
# Embedding monitoring
import json
from datetime import datetime
from typing import Dict, List


class EmbeddingMonitor:
    """Monitor embedding generation"""

    def __init__(self, storage_path: str = "metrics/embeddings/"):
        self.storage_path = storage_path

    def log_batch(
        self,
        model: str,
        num_texts: int,
        num_tokens: int,
        latency_ms: float,
        cost_usd: float,
        cache_hit_rate: float
    ):
        """Log embedding batch"""
        log_entry = {
            'timestamp': datetime.now().isoformat(),
            'model': model,
            'num_texts': num_texts,
            'num_tokens': num_tokens,
            'latency_ms': latency_ms,
            'cost_usd': cost_usd,
            'cache_hit_rate': cache_hit_rate
        }

        # Write to file
        import os
        os.makedirs(self.storage_path, exist_ok=True)

        file_path = f"{self.storage_path}/{model}.jsonl"

        with open(file_path, 'a') as f:
            f.write(json.dumps(log_entry) + '\n')

    def get_metrics(self, model: str) -> Dict:
        """Get metrics for model"""
        import statistics

        # Read logs
        logs = []
        file_path = f"{self.storage_path}/{model}.jsonl"

        try:
            with open(file_path, 'r') as f:
                for line in f:
                    logs.append(json.loads(line))
        except FileNotFoundError:
            return {}

        if not logs:
            return {}

        # Calculate aggregate metrics
        metrics = {
            'total_texts': sum(log['num_texts'] for log in logs),
            'total_tokens': sum(log['num_tokens'] for log in logs),
            'total_cost_usd': sum(log['cost_usd'] for log in logs),
            'avg_latency_ms': statistics.mean([log['latency_ms'] for log in logs]),
            'avg_cache_hit_rate': statistics.mean([log['cache_hit_rate'] for log in logs]),
        }

        return metrics
```

Embedding Best Practices
DO
```python
# 1. Use caching
#    Redis for repeated embeddings

# 2. Batch requests
#    Reduce API calls, improve throughput

# 3. Use appropriate model
#    Balance cost and quality

# 4. Monitor costs
#    Track token usage and spending

# 5. Deduplicate texts
#    Remove duplicates before embedding
```

DON’T

```python
# 1. Don't embed duplicates
#    Deduplicate first

# 2. Don't skip batching
#    Batching is essential for performance

# 3. Don't ignore costs
#    Embedding costs add up quickly

# 4. Don't use wrong model
#    Choose based on use case

# 5. Don't skip monitoring
#    Essential for operations
```

Key Takeaways
- Model selection: Balance cost, speed, and quality
- Batch processing: Parallel, distributed generation
- Caching: Redis for repeated embeddings
- Quality evaluation: Similarity, retrieval metrics
- Cost optimization: Deduplication, chunking, filtering
- Monitoring: Token usage, cost, latency metrics
- Scaling: Distributed processing with Spark
- Use When: Large-scale embedding generation (1M+ documents)