Post

[spark] 배치 프로세싱

[spark] 배치 프로세싱

RDD

  • 스파크의 기본 추상화 객체

주요 구성 요소

  • 의존성 정보
    • 어떤 입력을 필요로 하고, 현재 RDD가 어떻게 만들어지는지

    • 새로운 결과를 만들어야 하는 경우, 스파크는 이 의존성 정보를 참고하고 연산을 재반복해 RDD를 재생성할 수 있음

  • 파티션
    • 스파크에서 작업을 실행기들에 분산하여 파티션 별로 병렬 연산할 수 있도록 함
  • **연산 함수: ****Partition => Iterator[T]**
    • RDD에 저장되는 데이터를 반복자 형태로 변환

RDD API의 문제점

  • Spark는 RDD API 기반의 연산, 표현식을 검사하지 못해 최적화할 방법이 없음
    • Join, filter, group by 등 여러 연산을 하더라도 spark에서는 람다 표현식으로만 보임

    • 특히 Pyspark의 경우, 연산 함수 Iterator[T] 데이터 타입을 제대로 인식하지 못함. 단지 파이썬 기본 객체로만 인식 (python이 java/scala와는 달리 타입이 없는 언어이기 때문)

  • Spark는 어떠한 데이터 압축 테크닉도 적용하지 못함
    • 제네릭 형태로 표현한 타입 T에 대한 정보를 전혀 얻을 수 없음

    • 그 타입의 객체 안에서 어떤 타입의 컬럼에 접근한다고 해도, spark는 알 수 없음

    • 결국 byte 뭉치로 직렬화해서 사용할 수 밖에 없음

→ Spark가 연산 순서를 재정렬해 개발자가 작성한 쿼리 대비 효과적인 질의 계획으로 바꿀 수 없음

SparkSQL

  • 구조화된 데이터를 처리하기 위한 스파크 모듈

  • DataFrame, Dataset이라 불리는 추상화를 제공하고, 분산 SQL 쿼리 엔진의 역할도 수행

  • 위에서 언급한 RDD의 문제점들을 해결할 수 있음

역할

  • SQL같은 질의 수행

  • spark 컴포넌트 통합

  • DataFrame, Dataset이 여러 프로그래밍 언어로 정형화 데이터 관련 작업을 단순화할 수 있도록 추상화해줌

  • 정형화된 파일 포맷(JSON, CSV, txt, avro, parqueet, orc 등)에서 스키마와 정형화 데이터를 읽고 쓰며, 데이터를 임시 테이블로 변환
    • RDD만 사용할 경우 → txt / sequence file 등 기본적인 형태의 파일만 제공됨
  • 빠른 데이터 탐색을 위해 대화형 Spark SQL Shell 제공

  • 표준 데이터베이스 JDBC/ODBC 커넥터를 통해 외부 도구와 연결할 수 있는 중간 역할 제공
    • Tableau / Snowflake / Power BI / Databricks 등 여러 애플리케이션과 쉽게 연결
  • 최종 실행을 위해 최적화된 질의 계획과 JVM을 위한 최적화된 코드 생성

장점

  1. 성능
    • RDD와 달리 연산, 표현식, 데이터 타입 정보를 모두 알 수 있음 → 연산 순서를 재정렬해 더 효과적인 질의 계획을 변경 가능
  • 카탈리스트 옵티마이저
    • 연산 쿼리를 받아 실행 계획으로 변환. 크게 아래 4단계의 변환 과정을 거쳐 RDD 생성
      • 분석

      • 논리적 최적화

      • 물리 계획 수립

      • 코드 생성

  1. 표현성
    • RDD에서 사용하는 람다 표현식보다 DataFrame API 메서드는 무엇을 하고자 하는지 명확하게 보임
  2. 일관성
    • python/java/scala 등 여러 언어로 작성한 스파크 코드의 형태가 거의 비슷함

Dataframe API

  • pandas의 dataframe의 영향을 많이 받음

  • 컬럼과 스키마를 가진 분산 인메모리 테이블처럼 동작

스키마

  • Dataframe을 위해 컬럼 이름과 데이터 타입을 정의한 것

  • 외부 데이터 소스에서 구조화된 데이터를 읽어올 때 사용

  • 읽을 때 스키마를 가져오는 방식과 달리, 미리 스키마를 정의하는 것은 여러 장점이 존재

    • 스파크가 데이터 타입을 추측해야 하는 책임을 덜어줌

    • 스파크가 스키마를 확정하기 위해서 파일의 많은 부분을 읽어들이려고 별도의 job을 만드는 것을 방지

    • 데이터가 스키마에 맞지 않는 경우를 조기에 발견할 수 있음

스키마 정의 방법

  • 프로그래밍 스타일
1
2
3
schema = StructType([StructField("author", StringType(), False),
										 StructField("title", StringType(), False),
										 StructField("pages", StringType(), False), ])
  • DDL
1
schema = "author STRING, title STRING, pages INT"

Explain

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
== Parsed Logical Plan ==
'Sort ['hour ASC NULLS FIRST, 'minute ASC NULLS FIRST], true
+- Aggregate [hour#47, minute#48], [hour#47, minute#48, collect_set(ip#0, 0, 0) AS ip_list#49, count(ip#0) AS ip_count#50L]
   +- Project [ip#0, timestamp#33, method#2, endpoint#3, status_code#4, latency#5, latency_seconds#32, hour#47, minute(date_trunc(minute, timestamp#33, Some(Asia/Seoul)), Some(Asia/Seoul)) AS minute#48]
      +- Project [ip#0, timestamp#33, method#2, endpoint#3, status_code#4, latency#5, latency_seconds#32, hour(date_trunc(hour, timestamp#33, Some(Asia/Seoul)), Some(Asia/Seoul)) AS hour#47]
         +- Project [ip#0, to_timestamp(timestamp#1, None, TimestampType, Some(Asia/Seoul), true) AS timestamp#33, method#2, endpoint#3, status_code#4, latency#5, latency_seconds#32]
            +- Project [ip#0, timestamp#1, method#2, endpoint#3, status_code#4, latency#5, (cast(latency#5 as double) / cast(1000 as double)) AS latency_seconds#32]
               +- Relation [ip#0,timestamp#1,method#2,endpoint#3,status_code#4,latency#5] csv

== Analyzed Logical Plan ==
hour: int, minute: int, ip_list: array<string>, ip_count: bigint
Sort [hour#47 ASC NULLS FIRST, minute#48 ASC NULLS FIRST], true
+- Aggregate [hour#47, minute#48], [hour#47, minute#48, collect_set(ip#0, 0, 0) AS ip_list#49, count(ip#0) AS ip_count#50L]
   +- Project [ip#0, timestamp#33, method#2, endpoint#3, status_code#4, latency#5, latency_seconds#32, hour#47, minute(date_trunc(minute, timestamp#33, Some(Asia/Seoul)), Some(Asia/Seoul)) AS minute#48]
      +- Project [ip#0, timestamp#33, method#2, endpoint#3, status_code#4, latency#5, latency_seconds#32, hour(date_trunc(hour, timestamp#33, Some(Asia/Seoul)), Some(Asia/Seoul)) AS hour#47]
         +- Project [ip#0, to_timestamp(timestamp#1, None, TimestampType, Some(Asia/Seoul), true) AS timestamp#33, method#2, endpoint#3, status_code#4, latency#5, latency_seconds#32]
            +- Project [ip#0, timestamp#1, method#2, endpoint#3, status_code#4, latency#5, (cast(latency#5 as double) / cast(1000 as double)) AS latency_seconds#32]
               +- Relation [ip#0,timestamp#1,method#2,endpoint#3,status_code#4,latency#5] csv

== Optimized Logical Plan ==
Sort [hour#47 ASC NULLS FIRST, minute#48 ASC NULLS FIRST], true
+- Aggregate [hour#47, minute#48], [hour#47, minute#48, collect_set(ip#0, 0, 0) AS ip_list#49, count(ip#0) AS ip_count#50L]
   +- Project [ip#0, hour(date_trunc(hour, timestamp#33, Some(Asia/Seoul)), Some(Asia/Seoul)) AS hour#47, minute(date_trunc(minute, timestamp#33, Some(Asia/Seoul)), Some(Asia/Seoul)) AS minute#48]
      +- Project [ip#0, cast(timestamp#1 as timestamp) AS timestamp#33]
         +- Relation [ip#0,timestamp#1,method#2,endpoint#3,status_code#4,latency#5] csv

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [hour#47 ASC NULLS FIRST, minute#48 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(hour#47 ASC NULLS FIRST, minute#48 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=36]
      +- ObjectHashAggregate(keys=[hour#47, minute#48], functions=[collect_set(ip#0, 0, 0), count(ip#0)], output=[hour#47, minute#48, ip_list#49, ip_count#50L])
         +- Exchange hashpartitioning(hour#47, minute#48, 200), ENSURE_REQUIREMENTS, [plan_id=33]
            +- ObjectHashAggregate(keys=[hour#47, minute#48], functions=[partial_collect_set(ip#0, 0, 0), partial_count(ip#0)], output=[hour#47, minute#48, buf#64, count#65L])
               +- Project [ip#0, hour(date_trunc(hour, timestamp#33, Some(Asia/Seoul)), Some(Asia/Seoul)) AS hour#47, minute(date_trunc(minute, timestamp#33, Some(Asia/Seoul)), Some(Asia/Seoul)) AS minute#48]
                  +- Project [ip#0, cast(timestamp#1 as timestamp) AS timestamp#33]
                     +- FileScan csv [ip#0,timestamp#1] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/bokyung/spark-streaming-study/02_batch/data/log.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<ip:string,timestamp:string>

Dataset API

  • Spark 2.0에서 Datafram + Dataset API를 하나로 통합함

  • Dataset은 정적 타입 API와 동적 타입 API의 두 특성을 모두 가짐

  • Dataset은 Java, Scala에서만 사용 가능 (타입 안전을 보장하는 언어)

    • Python, R은 타입 안전을 보장하지 않는 언어이기 때문에 사용 불가능

image

This post is licensed under CC BY 4.0 by the author.