
DataFrames vs RDDs

API Comparison and Selection Guide


Overview

This document compares DataFrames/Datasets (structured API) with RDDs (Resilient Distributed Datasets) in Apache Spark. Understanding when to use each is critical for writing efficient, maintainable Spark applications.


Quick Comparison

Feature      | DataFrames/Datasets                    | RDDs                  | Recommendation
-------------|----------------------------------------|-----------------------|----------------------
Performance  | Optimized by Catalyst                  | Manual optimization   | DataFrame
Ease of Use  | High-level, SQL-like                   | Low-level, API-heavy  | DataFrame
Type Safety  | Strong in Scala/Java                   | Weak (Python)         | DataFrame
Ecosystem    | Broad support (Spark ML, GraphX, etc.) | Limited               | DataFrame
Debugging    | Better query plans                     | Harder to optimize    | DataFrame
Flexibility  | High for structured data               | Very high             | RDD for complex cases

Architecture Comparison

Key Difference: DataFrames benefit from the Catalyst optimizer, which automatically optimizes queries. RDDs require manual optimization.
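
A quick way to see this in practice is df.explain(), which prints the plan Catalyst produces. Below is a minimal sketch, assuming hypothetical Parquet inputs: even though the filter is written after the join, the optimized plan shows it pushed down toward the scan, an optimization an RDD pipeline would never apply on its own.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("CatalystPeek").getOrCreate()

# Hypothetical inputs; any Parquet sources behave the same way
orders = spark.read.parquet("s3://bucket/orders/")
customers = spark.read.parquet("s3://bucket/customers/")

# The filter is written after the join...
joined = orders.join(customers, "customer_id").filter(col("amount") > 100)

# ...but the optimized logical plan shows Catalyst moving it below the join.
joined.explain(True)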


When to Use DataFrames

Use DataFrames for 95% of Workloads

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as _sum
spark = SparkSession.builder.appName("DataFrameExample").getOrCreate()
# Read data
df = spark.read.parquet("s3://bucket/data/")
# Transformations (lazy - Catalyst optimized)
result = df.filter(col("amount") > 100) \
    .groupBy("category") \
    .agg(_sum("amount").alias("total"))
# Write result
result.write.parquet("s3://bucket/output/")

Benefits:

  • Catalyst Optimization: Predicate pushdown, column pruning, join reordering
  • SQL Support: Can use SQL directly (see the sketch after this list)
  • Type Safety: In Scala/Java Datasets
  • Performance: 10-100x faster than RDDs for structured data
  • Better Monitoring: Spark UI shows optimized plans
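
As a brief illustration of the SQL support bullet, the df from the example above can be queried with plain SQL after registering a temporary view (the view name transactions is an assumed name):

# Register the DataFrame from the example above as a temporary view
df.createOrReplaceTempView("transactions")

totals = spark.sql("""
    SELECT category, SUM(amount) AS total
    FROM transactions
    WHERE amount > 100
    GROUP BY category
""")
totals.show()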

When to Use RDDs

Use RDDs for These Specific Cases

from pyspark import SparkContext
sc = SparkContext.getOrCreate()
# Use Case 1: Unstructured data
rdd = sc.textFile("s3://bucket/unstructured/")
# Use Case 2: Custom partitioning logic
# Use Case 2: Custom partitioning logic (requires a key-value pair RDD)
pair_rdd = sc.parallelize(range(100), 10).map(lambda x: (x % 10, x))
pair_rdd = pair_rdd.partitionBy(10, lambda key: key % 10)  # custom partition function
# Use Case 3: Low-level transformations
def complex_logic(record):
    # Complex procedural logic that is hard to express as DataFrame operations
    result = []
    for item in record.items:
        result.append(process(item))
    return result
rdd = rdd.map(complex_logic)
# Use Case 4: Legacy code integration
# When integrating with existing Python code that expects RDDs

Specific RDD Use Cases:

  1. Unstructured data: Log parsing, text processing
  2. Custom partitioners: Very specific data distribution requirements (see the sketch after this list)
  3. Complex procedural logic: Hard to express in DataFrame operations
  4. Legacy integration: Working with old Spark code
  5. Python UDF performance: When you need fine-grained control
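
For use case 2, here is a minimal sketch of a custom partitioner on a key-value RDD; the hot-key names and partition counts are hypothetical, and partitionBy applies only to pair RDDs:

import zlib
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

HOT_KEYS = {"user_42": 0, "user_99": 1}   # hypothetical skewed keys
NUM_PARTITIONS = 16

def skew_aware_partitioner(key):
    # Known hot keys get dedicated partitions; everything else is hashed
    # into the remaining ones. crc32 is stable across processes, unlike
    # Python's built-in hash() on strings.
    if key in HOT_KEYS:
        return HOT_KEYS[key]
    return 2 + (zlib.crc32(key.encode("utf-8")) % (NUM_PARTITIONS - 2))

events = sc.parallelize([("user_42", 1), ("user_7", 1), ("user_99", 1)])
partitioned = events.partitionBy(NUM_PARTITIONS, skew_aware_partitioner)

# Inspect how records landed in each partition
print(partitioned.glom().map(len).collect())

DataFrames expose no hook for this kind of key-level placement, which is why it remains an RDD use case.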

Performance Comparison

Benchmark Results

Operation           | DataFrame | RDD         | Speedup
--------------------|-----------|-------------|--------
Simple filter       | 1.0x      | 1.5x slower | 1.5x
GroupBy aggregation | 1.0x      | 3x slower   | 3x
Join optimization   | 1.0x      | 5x slower   | 5x
Complex query       | 1.0x      | 10x slower  | 10x

Why: Catalyst optimizer automatically chooses optimal execution plans.


DataFrame API Deep Dive

DataFrame Operations

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, lit
spark = SparkSession.builder.appName("DataFrameOps").getOrCreate()
# Create DataFrame
df = spark.createDataFrame([
    (1, "Alice", 100),
    (2, "Bob", 200),
    (3, "Charlie", 150)
], ["id", "name", "score"])
# Selection
df.select("id", "name").show()
# Filtering
df.filter(col("score") > 100).show()
# Column operations
df.withColumn("pass", when(col("score") >= 150, lit("Pass")).otherwise(lit("Fail"))).show()
# Aggregation
df.groupBy().agg({"score": "avg"}).show()
# Joins
other_df = spark.createDataFrame([(1, "Engineering"), (2, "Sales")], ["id", "dept"])
df.join(other_df, "id", "inner").show()

RDD API Examples

RDD Transformations

from pyspark import SparkContext
sc = SparkContext.getOrCreate()
# Map
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd_map = rdd.map(lambda x: x * 2)
# Filter
rdd_filter = rdd.filter(lambda x: x % 2 == 0)
# FlatMap
rdd_flatmap = rdd.flatMap(lambda x: range(x))
# ReduceByKey
rdd_pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
rdd_reduce = rdd_pairs.reduceByKey(lambda a, b: a + b)
# Join
rdd1 = sc.parallelize([(1, "a"), (2, "b")])
rdd2 = sc.parallelize([(1, "x"), (2, "y")])
rdd_join = rdd1.join(rdd2)
# Distinct
rdd_distinct = rdd.distinct()
# Sample
rdd_sample = rdd.sample(False, 0.1) # 10% sample

Migration Path

From RDD to DataFrame

# RDD code (legacy)
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
rdd = sc.textFile("s3://bucket/data/")
rdd_filtered = rdd.filter(lambda line: "error" not in line)
rdd_mapped = rdd_filtered.map(lambda line: parse_line(line))
rdd_count = rdd_mapped.count()
# Equivalent DataFrame code (modern)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder.appName("Migration").getOrCreate()
df = spark.read.text("s3://bucket/data/")
df_filtered = df.filter(~col("value").contains("error"))
df_parsed = df_filtered.select(parse_line_udf("value").alias("parsed"))
count = df_parsed.count()
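
The snippets above treat parse_line and parse_line_udf as placeholders. A minimal sketch of how the UDF side might be defined, assuming a hypothetical space-delimited log format:

from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, StringType

# Hypothetical schema for the parsed output
parsed_schema = StructType([
    StructField("level", StringType()),
    StructField("message", StringType()),
])

@udf(returnType=parsed_schema)
def parse_line_udf(line):
    # Assumes "LEVEL message..." lines; adjust to the real log format
    level, _, message = line.partition(" ")
    return (level, message)

Keep in mind that Python UDFs are opaque to Catalyst, so built-in functions such as split or regexp_extract are usually the better choice when they can express the parsing.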

Performance Tuning

DataFrame Optimization

# Cache for reuse
df_cached = df.filter(col("date") >= "2025-01-01").cache()
df_cached.count() # Materialize cache
df_cached.groupBy("category").count().show() # Uses cache
df_cached.agg({"amount": "sum"}).show() # Uses cache
# Broadcast join for small tables
from pyspark.sql.functions import broadcast
large_df.join(broadcast(small_df), "key")
# Predicate pushdown (filter at source)
df = spark.read.parquet("s3://bucket/data/") \
    .filter(col("date") >= "2025-01-01")  # Pushed to storage
# Column pruning (read only needed columns)
df = spark.read.parquet("s3://bucket/data/") \
    .select("id", "name", "amount")  # Only read 3 columns
# Adaptive query execution
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

RDD Optimization

from pyspark import StorageLevel
# Persist at an appropriate level
rdd = rdd.persist(StorageLevel.MEMORY_AND_DISK)
# Reduce shuffling: reduceByKey does map-side aggregation (unlike groupByKey)
rdd_pairs = rdd.map(lambda x: (x[0], x[1])).reduceByKey(lambda a, b: a + b)
# Optimize shuffle partitions
rdd = rdd.repartition(200)  # Increase or decrease based on data size
# Use partitionBy for skewed data (key-value RDDs only)
rdd_pairs = rdd_pairs.partitionBy(100)  # 100 partitions

Integration Patterns

DataFrame and RDD Interop

# Convert DataFrame to RDD
df = spark.read.parquet("s3://bucket/data/")
rdd = df.rdd  # RDD of Row objects
# Convert RDD to DataFrame
rdd = sc.parallelize([(1, "Alice"), (2, "Bob")])
df = spark.createDataFrame(rdd, ["id", "name"])
# Apply an RDD-style function to a DataFrame's rows
def upper_names(rows):
    # mapPartitions receives an iterator of Row objects per partition
    return (row.name.upper() for row in rows)
names_rdd = df.rdd.mapPartitions(upper_names)

Senior Level Considerations

Why RDDs Are Legacy

RDD Limitations:

  • No Catalyst optimization or automatic query planning (manual optimization required)
  • Poor performance for structured data
  • Weak typing in Python
  • Limited integration with Spark ML, GraphX (see the sketch after this list)
  • Harder to debug and optimize
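
To make the ecosystem limitation concrete, here is a minimal sketch of a Spark ML training step; pyspark.ml estimators accept DataFrames, not RDDs (the columns and values below are hypothetical):

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

spark = SparkSession.builder.appName("MLNeedsDataFrames").getOrCreate()

# Hypothetical training data
train_df = spark.createDataFrame(
    [(0.0, 1.0, 0.5), (1.0, 0.2, 3.1), (0.0, 0.9, 0.7)],
    ["label", "f1", "f2"],
)

# Spark ML expects a DataFrame with a single vector column of features
assembler = VectorAssembler(inputCols=["f1", "f2"], outputCol="features")
features_df = assembler.transform(train_df)

model = LogisticRegression(featuresCol="features", labelCol="label").fit(features_df)
predictions = model.transform(features_df)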

DataFrame Advantages

  • Catalyst optimization: predicate pushdown, column pruning, join reordering
  • Direct SQL support on the same data
  • Compile-time type safety via the Dataset API in Scala/Java
  • Significantly faster than RDDs for structured data (see the benchmarks above)
  • Better monitoring: the Spark UI shows the optimized query plans
  • Required for Spark ML, GraphX, and Structured Streaming (as sketched above)

Best Practices

DO

# 1. Use DataFrames for structured data
df = spark.read.parquet("s3://bucket/data/")
result = df.groupBy("category").agg({"amount": "sum"})
# 2. Use SQL for complex queries
df.createOrReplaceTempView("data")
result = spark.sql("""
SELECT category, SUM(amount) as total
FROM data
WHERE date >= '2025-01-01'
GROUP BY category
""")
# 3. Use the Dataset API in Scala for type safety (Scala, not Python):
#      case class User(id: Long, name: String)
#      val users = spark.read.parquet("users").as[User]
# 4. Cache reused DataFrames
df_cached = df.filter(col("region") == "US").cache()
# 5. Monitor query plans
df.explain(True) # Show optimized plan

DON’T

# 1. Don't use RDDs for simple operations (slow, verbose)
rdd = df.rdd.map(lambda row: row.amount * 2) # Bad
df.withColumn("amount_doubled", col("amount") * 2) # Good
# 2. Don't collect large DataFrames (OOM)
df.collect() # Bad for large data
df.show(10) # Good for sampling
# 3. Don't ignore Catalyst optimizer
# Let Spark optimize your queries
# 4. Don't use RDDs for structured data
# DataFrames are 10-100x faster
# 5. Don't use RDDs unless absolutely necessary
# Only for unstructured data or complex procedural logic

Performance Benchmarks

Real-World Comparison

Scenario: 1TB dataset, simple aggregation

import time
# DataFrame approach
start = time.time()
df = spark.read.parquet("s3://bucket/1tb/")
result = df.groupBy("category").agg({"amount": "sum"})
result.count()  # Action
df_time = time.time() - start
# RDD approach (parse is a user-defined line parser)
start = time.time()
rdd = sc.textFile("s3://bucket/1tb/")
result = rdd.map(parse) \
    .map(lambda x: (x.category, x.amount)) \
    .reduceByKey(lambda a, b: a + b)
result.count()  # Action
rdd_time = time.time() - start
print(f"DataFrame: {df_time:.2f}s, RDD: {rdd_time:.2f}s")
# Example result: DataFrame: 45s, RDD: 420s (~9x slower)

Code Examples

DataFrame Best Practice

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, lit, broadcast
spark = SparkSession.builder.appName("BestPractice").getOrCreate()
# 1. Read with schema (faster, safer)
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DecimalType, TimestampType
schema = StructType([
    StructField("id", IntegerType(), nullable=False),
    StructField("name", StringType(), nullable=True),
    StructField("amount", DecimalType(18, 2), nullable=False),
    StructField("event_time", TimestampType(), nullable=False)
])
df = spark.read.schema(schema).parquet("s3://bucket/data/")
# 2. Filter early (predicate pushdown)
df = df.filter(col("event_time") >= "2025-01-01")
# 3. Select only needed columns (column pruning)
df = df.select("id", "name", "amount")
# 4. Broadcast join with a small dimension table (small_dim_table defined elsewhere)
df = df.join(broadcast(small_dim_table), "key")
# 5. Cache if reused multiple times
df_cached = df.cache()
df_cached.count() # Materialize
result1 = df_cached.groupBy("name").agg({"amount": "sum"})
result2 = df_cached.agg({"amount": "avg"})
# 6. Write with optimal file size
result1.coalesce(10).write.parquet("s3://bucket/output/")

Key Takeaways

  1. DataFrames by default: Use DataFrames for 95% of workloads
  2. Catalyst optimizer: Automatic 10-100x performance improvement
  3. SQL support: Can use SQL directly on DataFrames
  4. Type safety: Datasets in Scala/Java provide compile-time safety
  5. RDDs for special cases: Unstructured data, custom partitioning, complex logic
  6. Migration path: Can convert between RDD and DataFrame as needed
  7. Performance: DataFrames significantly faster for structured data
  8. Ecosystem: DataFrames required for Spark ML, GraphX, Structured Streaming

Back to Module 2