DataFrames vs RDDs
API Comparison and Selection Guide
Overview
This document compares DataFrames/Datasets (structured API) with RDDs (Resilient Distributed Datasets) in Apache Spark. Understanding when to use each is critical for writing efficient, maintainable Spark applications.
Quick Comparison
| Feature | DataFrames/Datasets | RDDs | Recommendation |
|---|---|---|---|
| Performance | Optimized by Catalyst | Manual optimization | DataFrame |
| Ease of Use | High-level, SQL-like | Low-level, API-heavy | DataFrame |
| Type Safety | Compile-time via Datasets (Scala/Java); untyped in Python | Compile-time in Scala/Java; untyped in Python | Dataset |
| Ecosystem | Broad support (Spark ML, Structured Streaming, GraphFrames) | Limited (GraphX, legacy RDD-based MLlib) | DataFrame |
| Debugging | Inspectable, optimized query plans | Lineage only; opaque lambdas | DataFrame |
| Flexibility | High for structured data | Very high | RDD for complex cases |
Architecture Comparison
Key Difference: DataFrames benefit from the Catalyst optimizer, which automatically optimizes queries. RDDs require manual optimization.
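To make this concrete, here is a minimal sketch of the same aggregation written against both APIs (the paths, column positions, and column names are illustrative assumptions): the DataFrame version states what is wanted and lets Catalyst push the filter and prune columns at the Parquet scan, while the RDD version must spell out parsing, filtering, and aggregation by hand.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("CatalystVsManual").getOrCreate()
sc = spark.sparkContext

# DataFrame: declare *what* you want; Catalyst decides how to execute it
# (the filter and column pruning are pushed into the Parquet scan).
df_totals = (
    spark.read.parquet("s3://bucket/sales/")          # hypothetical path
    .filter(col("amount") > 100)
    .groupBy("category")
    .sum("amount")
)

# RDD: spell out *how* to execute it; nothing is optimized for you.
rdd_totals = (
    sc.textFile("s3://bucket/sales_csv/")             # hypothetical path
    .map(lambda line: line.split(","))                # manual parsing
    .filter(lambda fields: float(fields[2]) > 100)    # manual filter after a full read
    .map(lambda fields: (fields[1], float(fields[2])))
    .reduceByKey(lambda a, b: a + b)
)
```

Calling `df_totals.explain(True)` typically shows the filter and selected columns pushed into the scan; the RDD version exposes only its lineage.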
When to Use DataFrames
Use DataFrames for 95% of Workloads
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as _sum

spark = SparkSession.builder.appName("DataFrameExample").getOrCreate()

# Read data
df = spark.read.parquet("s3://bucket/data/")

# Transformations (lazy - Catalyst optimized)
result = df.filter(col("amount") > 100) \
    .groupBy("category") \
    .agg(_sum("amount").alias("total"))

# Write result
result.write.parquet("s3://bucket/output/")
```

Benefits:
- Catalyst Optimization: Predicate pushdown, column pruning, join reordering
- SQL Support: Can use SQL directly
- Type Safety: In Scala/Java Datasets
- Performance: 10-100x faster than RDDs for structured data
- Better Monitoring: Spark UI shows optimized plans
When to Use RDDs
Use RDDs for These Specific Cases
```python
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

# Use Case 1: Unstructured data
rdd = sc.textFile("s3://bucket/unstructured/")

# Use Case 2: Custom partitioning logic (partitionBy requires a key-value RDD)
pairs = sc.parallelize(range(100), 10).map(lambda x: (x, x))
pairs = pairs.partitionBy(10, lambda key: key % 10)  # Custom partition function

# Use Case 3: Low-level transformations
def complex_logic(record):
    # Complex procedural logic (record.items and process() are placeholders)
    result = []
    for item in record.items:
        result.append(process(item))
    return result

rdd = rdd.map(complex_logic)

# Use Case 4: Legacy code integration
# When integrating with existing Python code that expects RDDs
```

Specific RDD Use Cases:
- Unstructured data: Log parsing, text processing (a sketch follows this list)
- Custom partitioners: Very specific data distribution requirements
- Complex procedural logic: Hard to express in DataFrame operations
- Legacy integration: Working with old Spark code
- Python UDF performance: When you need fine-grained control
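As a sketch of the log-parsing case (the bucket path and log format below are assumptions, not references to a real dataset), an RDD pipeline can apply arbitrary Python parsing to raw text lines before any structure exists:

```python
import re
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

# Assumed log format: "2025-01-01T12:00:00 ERROR some message text"
LOG_PATTERN = re.compile(r"^(\S+)\s+(\w+)\s+(.*)$")

def parse_log_line(line):
    # Return (level, message) tuples; silently drop lines that do not match
    match = LOG_PATTERN.match(line)
    if match:
        timestamp, level, message = match.groups()
        return [(level, message)]
    return []

logs = sc.textFile("s3://bucket/raw-logs/")           # hypothetical path
errors_by_level = (
    logs.flatMap(parse_log_line)                      # parse and discard malformed lines
        .filter(lambda kv: kv[0] in ("ERROR", "WARN"))
        .map(lambda kv: (kv[0], 1))
        .reduceByKey(lambda a, b: a + b)              # count per log level
)
```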
Performance Comparison
Benchmark Results
| Operation | DataFrame (baseline) | RDD | DataFrame speedup |
|---|---|---|---|
| Simple filter | 1.0x | 1.5x slower | 1.5x |
| GroupBy aggregation | 1.0x | 3x slower | 3x |
| Join | 1.0x | 5x slower | 5x |
| Complex query | 1.0x | 10x slower | 10x |
Why: Catalyst optimizer automatically chooses optimal execution plans.
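These figures vary with data layout, cluster size, and Spark version, so it is worth reproducing the comparison on your own data. Below is a self-contained sketch using synthetic data from spark.range (no external paths required) that times the same aggregation through both APIs.

```python
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("MicroBenchmark").getOrCreate()

# Synthetic dataset: 10 million rows with a low-cardinality key
df = spark.range(10_000_000).select(
    (col("id") % 100).alias("category"),
    (col("id") % 1000).alias("amount"),
)

# DataFrame aggregation (Catalyst-optimized execution)
start = time.time()
df.groupBy("category").sum("amount").count()
print(f"DataFrame: {time.time() - start:.2f}s")

# Equivalent RDD aggregation; the timing deliberately includes converting
# rows to Python objects, which is part of the cost of the RDD path
rdd = df.rdd.map(lambda row: (row["category"], row["amount"]))
start = time.time()
rdd.reduceByKey(lambda a, b: a + b).count()
print(f"RDD: {time.time() - start:.2f}s")
```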
DataFrame API Deep Dive
DataFrame Operations
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, lit

spark = SparkSession.builder.appName("DataFrameOps").getOrCreate()

# Create DataFrame
df = spark.createDataFrame([
    (1, "Alice", 100),
    (2, "Bob", 200),
    (3, "Charlie", 150)
], ["id", "name", "score"])

# Selection
df.select("id", "name").show()

# Filtering
df.filter(col("score") > 100).show()

# Column operations
df.withColumn("pass", when(col("score") >= 150, lit("Pass")).otherwise(lit("Fail"))).show()

# Aggregation
df.groupBy().agg({"score": "avg"}).show()

# Joins
other_df = spark.createDataFrame([(1, "Engineering"), (2, "Sales")], ["id", "dept"])
df.join(other_df, "id", "inner").show()
```

RDD API Examples
RDD Transformations
```python
# Map
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd_map = rdd.map(lambda x: x * 2)

# Filter
rdd_filter = rdd.filter(lambda x: x % 2 == 0)

# FlatMap
rdd_flatmap = rdd.flatMap(lambda x: range(x))

# ReduceByKey
rdd_pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
rdd_reduce = rdd_pairs.reduceByKey(lambda a, b: a + b)

# Join
rdd1 = sc.parallelize([(1, "a"), (2, "b")])
rdd2 = sc.parallelize([(1, "x"), (2, "y")])
rdd_join = rdd1.join(rdd2)

# Distinct
rdd_distinct = rdd.distinct()

# Sample
rdd_sample = rdd.sample(False, 0.1)  # 10% sample, without replacement
```

Migration Path
From RDD to DataFrame
```python
# RDD code (legacy); parse_line is a custom parsing function
rdd = sc.textFile("s3://bucket/data/")
rdd_filtered = rdd.filter(lambda line: "error" not in line)
rdd_mapped = rdd_filtered.map(lambda line: parse_line(line))
rdd_count = rdd_mapped.count()

# Equivalent DataFrame code (modern)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("Migration").getOrCreate()

df = spark.read.text("s3://bucket/data/")
df_filtered = df.filter(~col("value").contains("error"))
# parse_line_udf is a UDF wrapping the same parse_line logic
df_parsed = df_filtered.select(parse_line_udf("value").alias("parsed"))
count = df_parsed.count()
```

Performance Tuning
DataFrame Optimization
```python
# Cache for reuse
df_cached = df.filter(col("date") >= "2025-01-01").cache()
df_cached.count()  # Materialize cache
df_cached.groupBy("category").count().show()  # Uses cache
df_cached.agg({"amount": "sum"}).show()  # Uses cache

# Broadcast join for small tables
from pyspark.sql.functions import broadcast
large_df.join(broadcast(small_df), "key")

# Predicate pushdown (filter at source)
df = spark.read.parquet("s3://bucket/data/") \
    .filter(col("date") >= "2025-01-01")  # Pushed to storage

# Column pruning (read only needed columns)
df = spark.read.parquet("s3://bucket/data/") \
    .select("id", "name", "amount")  # Only read 3 columns

# Adaptive query execution
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
```

RDD Optimization
```python
from pyspark import StorageLevel

# Persist at the appropriate level
rdd = rdd.persist(StorageLevel.MEMORY_AND_DISK)

# Reduce shuffling: reduceByKey aggregates map-side before the shuffle
rdd_pairs = rdd.map(lambda x: (x[0], x[1])).reduceByKey(lambda a, b: a + b)

# Optimize shuffle partitions
rdd = rdd.repartition(200)  # Increase or decrease based on data size

# Use partitionBy (key-value RDDs only) to control data distribution
rdd_pairs = rdd_pairs.partitionBy(100)  # 100 partitions
```

Integration Patterns
DataFrame and RDD Interop
```python
# Convert DataFrame to RDD
df = spark.read.parquet("s3://bucket/data/")
rdd = df.rdd  # RDD of Row objects

# Convert RDD to DataFrame
rdd = sc.parallelize([(1, "Alice"), (2, "Bob")])
df = spark.createDataFrame(rdd, ["id", "name"])

# Apply row-level RDD logic to a DataFrame
def to_upper(rows):
    # mapPartitions receives an iterator of Row objects per partition
    return (row.name.upper() for row in rows)

upper_names = df.rdd.mapPartitions(to_upper)
```

Senior Level Considerations
Why RDDs Are Legacy
RDD Limitations:
- No Catalyst query optimization (manual tuning required)
- Poor performance for structured data
- Weak typing in Python
- Limited integration with newer libraries (Spark ML, Structured Streaming)
- Harder to debug and optimize
DataFrame Advantages
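The flip side of the limitations above: because a DataFrame carries a logical plan, Spark can show you exactly what it intends to execute, while an RDD only exposes its lineage. A quick way to see this (the path and columns are illustrative assumptions):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("PlanInspection").getOrCreate()

# DataFrame: Catalyst exposes parsed, analyzed, optimized, and physical plans
df = spark.read.parquet("s3://bucket/data/")      # hypothetical path
query = df.filter(col("amount") > 100).select("category", "amount")
query.explain(True)                               # full plan, including pushed filters

# RDD: only the lineage of transformations is visible; no optimizer runs
rdd = query.rdd.map(lambda row: (row["category"], row["amount"]))
print(rdd.toDebugString().decode())               # physical lineage, not a query plan
```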
Best Practices
DO
```python
# 1. Use DataFrames for structured data
df = spark.read.parquet("s3://bucket/data/")
result = df.groupBy("category").agg({"amount": "sum"})

# 2. Use SQL for complex queries
df.createOrReplaceTempView("data")
result = spark.sql("""
    SELECT category, SUM(amount) AS total
    FROM data
    WHERE date >= '2025-01-01'
    GROUP BY category
""")
```

```scala
// 3. Use the Dataset API in Scala for type safety
case class User(id: Long, name: String)
val users = spark.read.parquet("users").as[User]
```

```python
# 4. Cache reused DataFrames
df_cached = df.filter(col("region") == "US").cache()

# 5. Monitor query plans
df.explain(True)  # Show parsed, analyzed, optimized, and physical plans
```

DON’T
```python
# 1. Don't use RDDs for simple operations (slow, verbose)
rdd = df.rdd.map(lambda row: row.amount * 2)        # Bad
df.withColumn("amount_doubled", col("amount") * 2)  # Good

# 2. Don't collect large DataFrames (OOM)
df.collect()  # Bad for large data
df.show(10)   # Good for sampling

# 3. Don't ignore the Catalyst optimizer
# Let Spark optimize your queries

# 4. Don't use RDDs for structured data
# DataFrames are 10-100x faster

# 5. Don't use RDDs unless absolutely necessary
# Only for unstructured data or complex procedural logic
```

Performance Benchmarks
Real-World Comparison
Scenario: 1TB dataset, simple aggregation
```python
import time

# DataFrame approach
start = time.time()
df = spark.read.parquet("s3://bucket/1tb/")
result = df.groupBy("category").agg({"amount": "sum"})
result.count()  # Action
df_time = time.time() - start

# RDD approach (parse is a placeholder for a record-parsing function)
start = time.time()
rdd = sc.textFile("s3://bucket/1tb/")
result = rdd.map(parse) \
    .map(lambda x: (x.category, x.amount)) \
    .reduceByKey(lambda a, b: a + b)
result.count()  # Action
rdd_time = time.time() - start

print(f"DataFrame: {df_time:.2f}s, RDD: {rdd_time:.2f}s")
# DataFrame: 45s, RDD: 420s (9x slower)
```

Code Examples
DataFrame Best Practice
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, lit, broadcast
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DecimalType, TimestampType

spark = SparkSession.builder.appName("BestPractice").getOrCreate()

# 1. Read with an explicit schema (faster, safer)
schema = StructType([
    StructField("id", IntegerType(), nullable=False),
    StructField("name", StringType(), nullable=True),
    StructField("amount", DecimalType(18, 2), nullable=False),
    StructField("event_time", TimestampType(), nullable=False)
])

df = spark.read.schema(schema).parquet("s3://bucket/data/")

# 2. Filter early (predicate pushdown)
df = df.filter(col("event_time") >= "2025-01-01")

# 3. Select only needed columns (column pruning)
df = df.select("id", "name", "amount")

# 4. Broadcast join for a small table (small_dim_table is an illustrative dimension DataFrame)
df = df.join(broadcast(small_dim_table), "key")

# 5. Cache if reused multiple times
df_cached = df.cache()
df_cached.count()  # Materialize
result1 = df_cached.groupBy("name").agg({"amount": "sum"})
result2 = df_cached.agg({"amount": "avg"})

# 6. Write with an optimal file size
result1.coalesce(10).write.parquet("s3://bucket/output/")
```

Key Takeaways
- DataFrames by default: Use DataFrames for 95% of workloads
- Catalyst optimizer: Automatic 10-100x performance improvement
- SQL support: Can use SQL directly on DataFrames
- Type safety: Datasets in Scala/Java provide compile-time safety
- RDDs for special cases: Unstructured data, custom partitioning, complex logic
- Migration path: Can convert between RDD and DataFrame as needed
- Performance: DataFrames significantly faster for structured data
- Ecosystem: DataFrames required for Spark ML, Structured Streaming, and GraphFrames
Back to Module 2