티스토리 뷰

개발

[ETL] Apache Spark - RDD vs DataFrame

0hyeon의 2025. 2. 15. 10:07
반응형
  • RDD는 저수준 API라서 모든 연산을 직접 관리해야 하고, Catalyst & Tungsten 최적화를 받지 못해서 느림.
  • DataFrame은 스키마 기반의 테이블 형태라 SQL처럼 다룰 수 있고, Catalyst & Tungsten 최적화 덕분에 성능이 훨씬 좋음.

1. DataFrame과 DataSet의 차이 (Spark 2.0+)

  • Spark 2.0부터 DataFrame은 Dataset[Row]로 구현됨
  • 즉, DataFrame = Dataset[Row] 이므로 DataFrame과 Dataset은 사실상 같은 개념
  • 다만, DataFrame은 Row 객체를 저장하고, Dataset은 Type-safe하게 사용할 수 있는 장점이 있음.

언제 DataFrame vs Dataset을 사용할까?

  • DataFrame (Dataset[Row]):
    • Python, R에서는 Dataset이 없기 때문에 DataFrame만 사용.
    • 일반적인 분석 및 SQL과 결합할 때 사용.
    • 동적 데이터 처리에 유리.
  • Dataset[T] (타입이 지정된 Dataset):
    • Scala, Java에서 강한 타입 체크(Type-Safety)가 필요할 때.
    • case class를 사용하여 구조화된 데이터를 처리할 때.
    • .map() 같은 연산에서 런타임 오류를 줄이고 싶을 때.

즉, Spark 2.x 이상에서는 DataFrame을 써도 되고, 타입 안정성이 필요하면 Dataset을 사용하면 됨.


2. RDD vs DataFrame 코드 비교

간단한 예제를 보면 차이를 이해하기 쉽다.

❌ RDD 방식 (구식, 비효율적)

val rdd = spark.sparkContext.parallelize(Seq(("Alice", 25), ("Bob", 30))) val rddMapped = rdd.map { case (name, age) => (name.toUpperCase, age + 10) }
  • 직접 데이터를 변환해야 함.
  • 최적화 불가능, 메모리 사용 비효율적.

✅ DataFrame 방식 (최적화된 방식)

import spark.implicits._ val df = Seq(("Alice", 25), ("Bob", 30)).toDF("name", "age") val dfTransformed = df.withColumn("new_age", $"age" + 10)
  • 더 읽기 쉽고 간결함.
  • Catalyst & Tungsten 최적화로 성능 향상.

4. 결론

  • RDD는 낮은 수준의 분산 데이터 구조 → 비효율적, 성능 최적화 어려움.
  • DataFrame은 스키마 기반, SQL 최적화 가능 → 더 빠르고 효율적.
  • Dataset은 DataFrame과 같은데, 타입 안정성(type-safe) 추가.
  • Spark 2.0+부터는 DataFrame = Dataset[Row]라서 Python에서는 사실상 Dataset이 필요 없음.
  • Scala/Java에서만 Dataset이 의미가 있음 (타입 안정성 필요할 때).

🚀 최신 Spark에서는 가능하면 RDD 대신 DataFrame이나 Dataset을 쓰는 것이 성능 면에서 훨씬 유리함.

 

결론: RDD는 거의 쓸 일이 없으며, DataFrame이 기본 선택이고, Scala/Java에서는 타입 안정성이 필요하면 Dataset을 고려.


🔥 Catalyst Optimizer란?

Catalyst Optimizer는 Spark의 SQL 및 DataFrame 연산을 최적화하는 쿼리 엔진이다.

작동 방식

  1. Abstract Syntax Tree (AST) 변환 → SQL & DataFrame을 논리 계획(Logical Plan)으로 변환.
  2. 논리적 최적화 → 컬럼 프루닝, 필터 푸시다운, 중복 제거 등 수행.
  3. 물리적 계획 최적화 → 실행 비용이 가장 낮은 플랜 선택.
  4. 코드 생성 (Code Generation) → 최종적으로 Java Bytecode 변환.

Catalyst Optimizer가 성능이 좋은 이유

  • 쿼리 계획을 자동으로 최적화하여 불필요한 연산을 제거.
  • 필터 푸시다운을 적용하여 최소한의 데이터만 읽음.
  • Project, Filter, Join 등의 연산을 효율적으로 재배열.

Tungsten Optimizer란?

Tungsten Optimizer는 메모리 및 CPU 성능을 최적화하는 Spark 엔진이다.

핵심 요소

  1. 바이트코드 생성 (Whole Stage Code Generation)
    • Spark의 DataFrame/Dataset 연산을 직접 Java 바이트코드로 변환하여 실행 속도를 극대화.
  2. 오프 힙 메모리 (Off-Heap Memory Management)
    • JVM의 가비지 컬렉션을 피하고, C++ Native 수준의 메모리 관리 수행.
  3. 컬럼 기반 메모리 저장 (Columnar Memory Format)
    • Pandas 같은 컬럼 단위 메모리 레이아웃을 사용하여 연산 속도 향상.
  4. 캐시 최적화 (Cache-aware Computation)
    • CPU L1/L2 캐시를 최적화하여 연산 속도를 극대화.

Tungsten Optimizer가 성능이 좋은 이유

  • RDD보다 메모리 효율적 (JVM GC 부담 ↓, Native 메모리 관리).
  • 컬럼 단위 연산을 수행해 벡터 연산 및 SIMD 명령어 활용 가능.
  • 불필요한 오브젝트 생성을 줄여서 GC 비용 최소화.

🚀 Catalyst + Tungsten = 고성능 Spark API

Catalyst는 연산을 논리적으로 최적화하고, Tungsten은 CPU와 메모리 성능을 최적화하기 때문에 Spark의 DataFrame/Dataset API가 RDD보다 빠름.

👉 결론: RDD보다 DataFrame/Dataset이 빠른 이유는 Catalyst와 Tungsten 최적화가 적용되기 때문!

 

🚀 Vectorized Execution (벡터화 실행)란?

**Vectorized Execution(벡터화 실행)**은 CPU의 SIMD(Single Instruction Multiple Data) 명령어를 활용하여 한 번의 연산으로 여러 개의 데이터를 동시에 처리하는 방식입니다.

즉, 한 번의 CPU 명령어로 여러 개의 데이터를 병렬로 계산할 수 있어 성능이 크게 향상됩니다.

 

Spark에서 벡터화 연산(Vectorized Execution)은 spark.createDataFrame을 사용해야만 최적화됩니다.
즉, RDD는 벡터화 연산이 적용되지 않으며, DataFrame을 사용해야만 벡터화 연산이 활성화됩니다.


🚀 왜 RDD에서는 벡터화 연산이 적용되지 않을까?

RDD는 기본적으로 JVM 객체(파이썬에서는 Python 객체) 기반의 데이터 구조를 사용하며, Spark의 Catalyst Optimizer 및 Tungsten Optimizer를 활용하지 못합니다.

RDD의 문제점

  1. 행(Row) 단위 실행 → 컬럼 단위로 데이터를 처리할 수 없음.
  2. 메모리 비효율 → 객체(오브젝트) 단위로 저장되어 JVM의 GC(Garbage Collection) 부담이 큼.
  3. 벡터화 실행 불가 → SIMD(Single Instruction Multiple Data) 활용이 불가능.
 
rdd = spark.sparkContext.parallelize([(1, 2, 3), (4, 5, 6), (7, 8, 9)])

# RDD는 행 단위 처리 → 벡터화 실행 안 됨.
rdd_sum = rdd.map(lambda row: row[0] + row[1] + row[2])
print(rdd_sum.collect())  # [6, 15, 24]

 

벡터화 실행 불가능 → 느리고 메모리 사용량이 많음.


🚀 DataFrame을 사용하면 벡터화 연산이 활성화됨

Spark의 DataFrame API는 컬럼 단위(Vectorized Execution) 연산이 가능하며, Catalyst & Tungsten 최적화가 적용됨.

 
 
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("Vectorized").getOrCreate()

df = spark.createDataFrame([(1, 2, 3), (4, 5, 6), (7, 8, 9)], ["a", "b", "c"])

# ✅ 컬럼 단위 벡터 연산
df = df.withColumn("sum", col("a") + col("b") + col("c"))
df.show()
 
+---+---+---+----+ 
| a | b | c | sum| 
+---+---+---+----+ 
| 1 | 2 | 3 | 6  | 
| 4 | 5 | 6 | 15 |
| 7 | 8 | 9 | 24 |
+---+---+---+----+

 

Catalyst & Tungsten 최적화가 자동 적용됨.
컬럼 단위 연산이므로 벡터화 실행 가능.
SIMD 활용하여 성능 향상.


🚀 Pandas UDFs(Vectorized UDFs)로 더욱 빠른 벡터 연산 가능

Spark의 **Pandas UDFs (Vectorized UDFs)**는 Apache Arrow를 활용하여 추가적인 벡터화 최적화를 제공합니다.

 

 

from pyspark.sql.functions import pandas_udf
import pandas as pd

# Pandas UDF (Vectorized Execution 적용)
@pandas_udf("int")
def vectorized_sum(a: pd.Series, b: pd.Series, c: pd.Series) -> pd.Series:
    return a + b + c  # Pandas의 벡터 연산 활용

df = df.withColumn("sum", vectorized_sum(df["a"], df["b"], df["c"]))
df.show()

 

Apache Arrow 기반 벡터 연산 → 최적화된 메모리 사용.
Pandas의 벡터 연산 활용 → Python에서도 성능 최적화 가능.


결론: RDD에서는 벡터화 연산이 안 되고, DataFrame을 사용해야 벡터화 실행이 가능

RDD는 행(Row) 단위 실행이므로 벡터화 연산 불가능.
DataFrame을 사용해야 Catalyst & Tungsten 최적화가 적용되며 벡터화 실행이 가능.
Pandas UDFs(Vectorized UDFs)를 활용하면 Arrow 기반으로 추가 최적화 가능.

 

즉, Spark에서 성능을 높이려면 반드시 spark.createDataFrame을 사용하여 DataFrame을 생성해야 하며, 가능하면 Pandas UDFs를 활용하는 것이 가장 좋음!

반응형
댓글
공지사항
반응형