티스토리 뷰
1. 스트림 프로세싱 (stream processing, Dstream)
- batch processing은 고정된 (큰)dataset에 한번 연산을 하는거였다면, stream processing은 끝없이 들어오는 데이터의 흐름을 연속적, 준실시간으로 처리하는것.
- IOT센서, 웹앱 상호작용, 신용카드 트랜잭션에 사용
- 실시간 대시보드, 온라인머신러닝등 다양한 분야에서 사용
- batch procssing과 같이 사용하되, 서로의 약점을 보완하고, 강점을 취하는 방식으로도 사용 (lamda 아키텍쳐)
2. stream processing 방법
- 크게 두가지 방법론이 존재
- 레코드 단위 처리 모델 (record-at-a-time processing model)
- 마이크로배치 스트림 처리 모델 (micro batch stream processing model)
2-1 레코드 단위 처리모델 (record-at-a-time processing model)
- 각 노드는 지속적으로 한개의 한번의 레코드를 받게됨
- 그 레코드를 처리하여 생성된 다음 레코드는 그래프상의 다음 노드로 보냄.
- 장점: 응답시간(latency)가 매우 짧음 (ms단위로도 가능)
- 단점: 높은처리량(throughtput)을 달성하기 어렵고, 특정노드에 장애가 발생시 복구가 여려움
2-2 마이크로배치 스트림 처리모델 (micro batch stream processing model)
- spark streaming 에서 기본적으로 사용하는 방식
- stream processing을 아주 작은 batch processing을 처리하는 방식으로 생각하는것
- 장점: 높은처리량
- 단점: 느린반응속도 (ms 단위로 처리하기는 어려움 몇 초 단위로는 가능)
- 채택배경: 대부분의 데이터 파이프라인 에서는 ms단위의 작업이 필요하지 않고, 이 단계에서 빠른 반응속도를 갖춘다 하더라도, 다른 곳에서 지연일 발생할 확률이 높음
3. spark streaming 종류
- spark 프레임워크에서는 batch, streaming을 다루는 코드가 비슷함.
- spark streaming에서는 두가지 api를 제공함
- Dstream
- Batch의 RDD api기반으로 작성됨.
- RDD api와 마찬가지로, 개발자들이 작성한 코드의 동일한 순서로 연산을 수행 ( optimizer 에의한 최적화 발생 X )
- Event time window 지원 부족 (Processing time window만 지원)
- Event time? -> 실제로 스트리밍 이벤트가 실제 생성된시간
- Processing time? -> (만들어진 이벤트가 실제로 spark streaming에 들어온시간)
- Spark Structured Streaming
- Dstream의 단점들을 극복하고, Streaming processing 작업이 Batch processing 코드 작성만큼 쉬워야하다는 기본원칙을 갖고 설계
- Dstream
4 . Spark Structured Streaming
- 데이터의 stream을 무한하게 연속적으로 추가되는 데이터의 테이블 개념으로 간주
- 매번 결과 테이블이 갱신될때마다 아래의 세가지 기능을 제공
- 1. append모드 - 지난 트리거이후로 결과테이블에 새로추가된 행만 외부 저장소에 기록
- 2. update모드 - 지난 트리거 이후에 결과 테이블에 갱신된 행들만 외부저장소에 기록
- 3. complete모드 - 갱신된 전체테이블을 외부 저장소에 기록
5 . 코드구현
5-1. DStream
DStream은 Spark 1.x에서 도입된 스트리밍 API로, 배치 간격(micro-batch)을 기반으로 스트리밍 데이터를 처리.
DStream은 RDD의 연속으로 볼 수 있으며, 상태 유지와 같은 기능은 있지만 워터마크(늦게 도착하는 데이터 처리를 위해 설계된 기능)와 같은 고급 기능은 제공하지 않음
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# SparkContext 생성
sc = SparkContext("local[2]", "DStreamExample")
ssc = StreamingContext(sc, 5) # 5초 배치 간격
# Socket으로부터 데이터 수신
lines = ssc.socketTextStream("localhost", 9999)
# Word count 수행
words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
# 결과 출력
word_counts.pprint()
# 스트리밍 시작
ssc.start()
ssc.awaitTermination()
특징
- 데이터를 배치 간격으로 처리(예: 5초).
- RDD 연산 기반.
- 워터마크 기능 없음. 따라서 늦게 도착하는 데이터 처리를 직접 관리해야 함.
5-1. Structured Streaming
Structured Streaming은 Spark 2.x 이후 도입된 API로, 데이터프레임(DataFrame)과 데이터셋(DataSet)을 기반으로 한 스트리밍 처리를 지원하는데, 이벤트 시간 기반 처리, 워터마크, 상태 저장(stateful operations) 등을 제공하며, 더 직관적이고 강력함
예제 코드: Structured Streaming + 워터마크
from pyspark.sql import SparkSession
from pyspark.sql.functions import window
# SparkSession 생성
spark = SparkSession.builder.appName("StructuredStreamingExample").getOrCreate()
# 스트리밍 데이터 읽기 (소켓)
lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()
# 워터마크를 사용한 이벤트 시간 기반 처리
words = lines.selectExpr("explode(split(value, ' ')) as word")
word_counts = words.groupBy(window("timestamp", "10 minutes", "5 minutes"), "word") \
.count() \
.withWatermark("timestamp", "10 minutes") # 워터마크 설정
# 결과를 콘솔에 출력
query = word_counts.writeStream \
.outputMode("append") \
.format("console") \
.start()
query.awaitTermination()
특징
- 워터마크 지원: withWatermark("timestamp", "10 minutes")로 늦게 도착하는 데이터의 허용 시간을 설정.
- 데이터프레임과 SQL 기반 API를 사용하여 직관적이고 유연.
- 이벤트 시간(Event Time) 처리 가능.
- 여러 출력 모드(append, complete, update) 지원.
- 상태 저장 연산(Stateful Operations) 지원.
주요 차이점 요약
특징DStreamStructured Streaming
API 기반 | RDD | DataFrame/DataSet |
처리 단위 | Micro-batch | Micro-batch (Continuous 모드도 가능) |
워터마크 지원 | 없음 | 있음 (withWatermark) |
이벤트 시간 처리 | 제한적 | 가능 |
출력 모드 | 기본 출력만 지원 | Append, Complete, Update |
성능 최적화 | 제한적 | Catalyst Optimizer를 통한 최적화 |
코드 복잡성 | 비교적 복잡 | 직관적, SQL 사용 가능 |
결론
- DStream은 더 저수준의 RDD 기반 접근 방식을 제공하므로 복잡한 처리 로직을 직접 관리해야 합니다.
- Structured Streaming은 고수준 API로, 워터마크 및 이벤트 시간 기반 처리 등 강력한 기능을 제공합니다.
따라서, 새로운 프로젝트에서는 가능한 한 Structured Streaming을 사용하는 것이 권장됩니다.
6. DStream과 Structured Streaming 의 공존
6-1. 역사적 이유: DStream의 등장과 한계
- **DStream (Discretized Stream)**은 Spark 1.x 시절의 스트리밍 API로, 마이크로배치 기반의 스트리밍 처리를 위해 설계되었습니다.
- 당시에는 RDD(Resilient Distributed Dataset)를 기반으로 설계된 스트리밍 모델이 일반적이었고, Spark Streaming이 이런 흐름에 맞춰 개발되었습니다.
- 초기 Spark 사용자는 RDD를 이미 사용하고 있었기 때문에, RDD의 개념을 확장한 DStream은 익숙하고 자연스러운 선택이었습니다.
- 그러나 DStream은 다음과 같은 한계를 가졌습니다:
- 성능 제한: Catalyst Optimizer나 Tungsten 엔진을 활용할 수 없어 최적화에 제약이 있었습니다.
- 유연성 부족: 이벤트 시간(event-time) 처리와 같은 고급 기능을 지원하지 않았습니다.
- 코드 복잡성: 상태 관리나 복잡한 연산 로직을 구현할 때 더 많은 수작업이 필요했습니다.
DStream은 여전히 많은 애플리케이션에서 사용되었기 때문에 backward compatibility(하위 호환성)를 유지하며, 이를 대체할 구조적인 접근 방식이 필요했습니다.
6-2. Structured Streaming의 등장과 필요성
- Spark 2.x부터는 고급 데이터 분석을 위해 SQL과 DataFrame API가 발전했습니다. 이러한 고수준 API를 스트리밍에 적용하기 위해 Structured Streaming이 설계되었습니다.
- Structured Streaming은 다음과 같은 이점을 제공했습니다:
- 통합 API: 배치(batch)와 스트리밍 작업을 동일한 DataFrame/DataSet API로 처리할 수 있음.
- 성능 최적화: Catalyst Optimizer와 Tungsten 엔진을 활용하여 쿼리를 최적화.
- 유연한 시간 처리: 이벤트 시간(event-time)과 워터마크 처리(watermarking)를 지원.
- 직관적인 코드 작성: SQL과 유사한 접근 방식으로 코드 작성이 더 쉬움.
- 연속 처리 옵션: 기존 마이크로 배치 기반 처리 외에 Continuous Processing을 통해 지연(latency)을 줄임.
Structured Streaming은 기존의 DStream 한계를 해결하며, Spark의 데이터 처리 생태계를 확장하는 중요한 역할을 했습니다.
6-3. DStream과 Structured Streaming을 모두 유지하는 이유
- Backward Compatibility (하위 호환성):
- DStream은 초기 Spark 스트리밍 애플리케이션에서 널리 사용되었습니다. 이를 제거하면 기존의 많은 시스템이 중단될 위험이 있습니다.
- 대규모 시스템에서 완전히 새로운 API로 전환하는 것은 비용이 크기 때문에, Spark는 DStream을 계속 지원합니다.
- 유연성 제공:
- DStream은 RDD 기반으로 더 낮은 수준의 제어가 가능하므로, 특정 상황(예: 복잡한 상태 관리, 사용자 정의 처리 등)에서 여전히 유용할 수 있습니다.
- Structured Streaming은 높은 수준의 API를 제공하지만, DStream은 더 세부적인 제어가 필요한 애플리케이션에 적합합니다.
- 마이그레이션 지원:
- DStream에서 Structured Streaming으로의 전환을 점진적으로 할 수 있도록, 두 API를 동시에 제공합니다.
- 많은 프로젝트에서 기존 DStream 기반 코드를 서서히 Structured Streaming으로 전환 중입니다.
- 다양한 사용자 요구사항:
- 일부 사용자는 단순히 RDD에 익숙하며, RDD 기반의 처리 방식을 선호합니다.
- 반면, 최신 사용자는 Structured Streaming의 고급 기능과 SQL 기반 처리에 더 관심이 있습니다.
6-4. 장기적인 방향
- Spark 커뮤니티는 Structured Streaming을 주요 스트리밍 API로 강조하고 있습니다.
- 새로운 기능과 최적화는 Structured Streaming에서만 제공됩니다.
- DStream은 더 이상 주요 업데이트가 없으며, deprecated되지는 않았지만 "legacy" API로 간주됩니다.
- 대부분의 새로운 프로젝트는 Structured Streaming으로 구현되며, DStream은 기존 시스템 유지보수를 위해 제공됩니다.
DStream은 Spark 초기의 설계 철학과 유저 기반을 반영한 API로, 기존 시스템과의 호환성을 위해 유지함. 반면,
Structured Streaming은 Spark 생태계의 현재와 미래를 위한 API로, 더 많은 기능과 유연성을 제공
새로운 프로젝트를 시작하거나 고성능 요구사항이 있다면, Structured Streaming을 사용하는 것이 권장
7. 코드사용 가이드
1.SparkSession생성
2.StructType생성
3.SparkSession.readStream (읽기)
4.SparkSession.writeStream (쓰기)
7-1. SparkSession 생성
Structured Streaming의 모든 작업은 SparkSession을 통해 시작됩니다.
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("StructuredStreamingGuide") \
.getOrCreate()
유의할 점:
- 스트리밍 작업은 클러스터 모드에서 실행되므로, 로컬 모드(local[*])는 테스트 용도로만 사용하세요.
- getOrCreate() 메서드는 동일한 애플리케이션 내에서 중복 생성 방지에 유용합니다.
7-2. StructType 생성
읽을 데이터의 스키마를 명시적으로 정의해야 합니다. 이는 데이터가 JSON, CSV와 같은 형식일 때 특히 중요합니다.
from pyspark.sql.types import StructType, StringType, TimestampType
schema = StructType() \
.add("value", StringType()) \
.add("timestamp", TimestampType())
유의할 점:
- 데이터 형식(JSON, CSV 등)에 따라 스키마를 정의하지 않으면 Spark가 자동으로 추론하지만, 이는 비효율적일 수 있습니다.
- 스키마는 데이터의 안정성과 성능을 보장합니다.
7-3. SparkSession.readStream: 데이터 읽기
readStream을 사용해 다양한 소스에서 스트리밍 데이터를 읽습니다. 주요 지원 형식과 옵션은 다음과 같습니다.
지원되는 입력 형식 (format):
1.socket:
- 네트워크 소켓에서 데이터를 읽습니다.
- 주로 테스트 용도로 사용.
df = spark.readStream \
.format("socket") \
.option("host", "localhost") \
.option("port", 9999) \
.load()
2.file:
- 디렉토리에서 새로 추가되는 파일을 모니터링하며 데이터를 읽음.
df = spark.readStream \
.format("csv") \
.schema(schema) \
.option("path", "/path/to/dir") \
.option("maxFilesPerTrigger", 1) \
.load()
3.kafka:
- Kafka 토픽에서 데이터를 읽습니다.
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "topic1") \
.load()
공통 옵션:
옵션설명path | 읽을 데이터의 경로. 파일 소스에서 필수. |
host / port | 소켓 소스에서 사용하는 옵션. |
kafka.bootstrap.servers | Kafka 클러스터 서버 주소. |
subscribe | Kafka 토픽 이름. |
startingOffsets | Kafka 소스에서 읽기 시작할 위치 (latest, earliest). |
maxFilesPerTrigger | 파일 소스에서 한 번에 읽을 파일 수를 제한. |
7-4. SparkSession.writeStream: 데이터 쓰기
writeStream은 읽은 데이터를 다양한 형식으로 출력할 때 사용됩니다.
지원되는 출력 형식 (format):
1.console:
- 데이터를 콘솔에 출력. 주로 디버깅 용도로 사용.
query = df.writeStream \
.format("console") \
.outputMode("append") \
.start()
- 데이터를 디렉토리에 저장. 각 배치마다 파일이 생성
query = df.writeStream \
.format("csv") \
.option("path", "/output/path") \
.option("checkpointLocation", "/checkpoint/path") \
.start()
3.kafka:
- Kafka 토픽으로 데이터를 출력.
query = df.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("topic", "outputTopic") \
.start()
공통 옵션:
옵션설명path | 데이터를 저장할 경로. |
checkpointLocation | 상태 정보를 저장할 체크포인트 경로. 필수. |
kafka.bootstrap.servers | Kafka 서버 주소. |
topic | Kafka 토픽 이름. |
truncate | 콘솔 출력에서 결과를 생략하지 않을지 설정 (기본값: true). |
outputMode 옵션:
- append: 새 데이터만 출력.
- update: 기존 데이터 업데이트.
- complete: 전체 집계 결과 출력.
7-5. 스트리밍 쿼리 관리
1.start:
- 스트리밍 작업을 시작합니다.
query = df.writeStream \
.format("console") \
.start()
2.awaitTermination:
- 쿼리가 중단될 때까지 대기합니다.
query.awaitTermination()
3.쿼리 상태 확인:
query.status # 현재 상태
query.lastProgress # 최근 진행 상태
query.stop() # 쿼리 중단
7-6 유의할 점: 스트리밍에서 GroupBy와 제한 사항
1.그룹화는 가능하나, 상태 저장과 워터마크 필요:
- 단순한 groupBy는 스트리밍 환경에서 사용할 수 없습니다.
- 윈도우를 추가하거나 워터마크를 설정해야 상태 저장(stateful) 집계가 가능
2.상태 저장 연산(Stateful Operations):
- groupBy와 window 연산은 Spark가 메모리에 상태를 저장하므로 반드시 체크포인트를 설정하여야
3.상태 크기 관리:
- 상태 크기는 워터마크를 통해 관리해야 합니다. 그렇지 않으면 메모리 부족 문제가 발생할 수 있습니다.
query = df \
.withWatermark("timestamp", "10 minutes") \
.groupBy("key") \
.count() \
.writeStream \
.option("checkpointLocation", "/checkpoint/path") \
.start()
8. Dstream vs Structured stream 차이점
- 코드에 SparkSession이 있다면 Structured Streaming.
- 코드에 StreamingContext가 있다면 DStream.
- RDD 연산(flatMap, reduceByKey)이 많다면 DStream.
- SQL 스타일 연산(groupBy, select)이 많다면 Structured Streaming.
- 워터마크(withWatermark)가 있다면 Structured Streaming.
- 출력이 pprint ,saveAsTextFiles 면 Dstream , writeStream.format 이면 Structured Streaming
참조링크
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
'개발' 카테고리의 다른 글
[ETL] Apache Spark - RDD vs DataFrame (0) | 2025.02.15 |
---|---|
[ETL] Apache Spark - window (2) (0) | 2025.01.21 |
🍜 Python asyncio - 비동기 asyncio.gather / asyncio.create_task 응용편 (0) | 2024.11.07 |
🍜 Python asyncio - 웨이터와 주방으로 이해하기 (1) | 2024.11.06 |
[Vercel] Erorr: Fix the Upstream Dependency conflict (1) | 2024.07.18 |
- Total
- Today
- Yesterday
- 윈도우pscale설치
- SSR
- Python
- datalabeling
- 함수형프로그래밍
- nextj이미지저장
- next.js
- create_task
- 비동기
- k8s
- kubectl
- window
- supervised
- 우테코
- nodejs
- planetscale배포
- Tailwind
- asyncio.gather
- CloudFlare
- helm
- pscale
- 위즈윅에디터
- 타입스크립트
- asyncio
- 42서울
- ADT
- semi-supervised
- un-supervised
- 대수자료구조
- iris
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | ||||
4 | 5 | 6 | 7 | 8 | 9 | 10 |
11 | 12 | 13 | 14 | 15 | 16 | 17 |
18 | 19 | 20 | 21 | 22 | 23 | 24 |
25 | 26 | 27 | 28 | 29 | 30 | 31 |