1. Spark 데이터 처리

Spark 데이터 시스템 아키텍처

  • 자체 파일 시스템을 갖고 있지 않음
    • 기존의 분산 파일 시스템 사용
    • HDFS, AWS S3, Azure Blob, CGP Cloud Storage … : 내부 데이터
    • 외부 데이터: RDBMS, NoSQL 등
      • 주기적인 ETL로 내부 데이터로 불러옴 (Airflow 이용)
      • 필요할 때 바로 Spark로 처리 (Spark Streaming / 배치)
  • [파일 시스템] - [Resource Manager] - [Spark] -> 외부 데이터 / 내부 데이터로 저장

스크린샷 2024-01-16 오후 11 05 12

Partition

데이터 병렬 처리가 가능하려면

  • 데이터가 먼저 분산되어야 함
    • 하둡 맵의 데이터 처리 단위는 디스크에 있는 데이터 블록 (128MB)
      • hdfs-site.xml에 있는 dfs.block.size가 결정
    • Spark에서는 이를 Partition이라고 부름. 기본 크기도 128MB
      • spark.sql.files.maxPartitionBytes: HDFS 등에 있는 파일을 읽어올 때만 적용됨. 보통 HDFS 블록 사이즈와 맞춤
  • 다음으로 나눠진 데이터를 각각 따로 동시 처리
    • 맵리듀스에서 N개의 데이터 블록으로 구성된 파일 처리시 N개의 Map 태스크 실행
    • Spark에서는 파티션 단위로 메모리로 로드되어 Executor가 배정됨

파티셔닝 예시

  • 적절한 파티션 수 = Executor 수 x Executor 당 CPU 수 (이론적)

스크린샷 2024-01-17 오전 12 44 27

Spark 데이터 처리 흐름

  • 데이터프레임은 작은 파티션들로 구성됨
    • 데이터프레임은 한 번 만들어지면 수정 불가! (Immutable)
  • 입력 데이터프레임을 원하는 결과 도출까지 다른 데이터프레임으로 계속 변환
    • sort, group by, filter, map, join, …


  • 입력 데이터 -> 데이터프레임으로 변환: partition의 집합 -> operation -> 출력 데이터

  • 파티션 간에 데이터 이동 없이 계속 변환이 가능할까?

    • 불가능!

셔플링

  • 파티션 간에 데이터 이동이 필요한 경우 발생
    • 명시적 파티션을 새롭게 하는 경우 (파티션 수를 줄이기, 파티션 구성 변경 등)
    • 시스템에 의해 이루어지는 셔플링 (적용하는 오퍼레이션의 특징에 따라)
      • 그룹핑 등의 aggregation이나 sorting
  • 셔플링이 발생할 때 네트워크를 타고 데이터가 이동하게 됨
    • 결과로 만들어지는 파티션 개수는 spark.sql.shuffle.partitions 가 결정
      • default 값은 200이며, 이는 최대 파티션 수
    • 오퍼레이션에 따라 파티션 수가 결정됨
      • random, hashing partition(group by), range partition(key의 분포에 맞게 - sorting) 등
    • 이때 Data Skew 발생 가능!

Data Skewness

  • Data partitioning은 데이터 처리에 병렬성을 주지만, 단점도 존재
    • 데이터가 균등하게 분포하지 않는 경우 발생 가능
    • 셔플링을 최소화하는 것이 중요하고, 파티션 최적화를 하는 것이 중요

2. Spark 데이터 구조: RDD, DF, Dataset

Spark 데이터 구조

  • Immutable Distributed Data
  • RDD가 조금 더 low level
  • DataFrame과 Dataset은 하나의 API로 통합됨
  • 모두 파티션으로 나뉘어 Spark에서 처리됨
  • Catalyst Optimizer
    • 실제 쿼리를 physical한 RDD operation으로 바꿀 때 operation 별로 비용을 계산하고, 그 비용을 바탕으로 가장 경제적인 execution plan을 결정

스크린샷 2024-01-17 오전 1 11 15

RDD

  • low level 데이터로, 클러스터 내 서버에 분산된 데이터를 지칭
  • 레코드별로 존재하지만 스키마가 존재하지 않음
    • 구조화, 비구조화 데이터 모두 지원
  • python -> parallelize 함수로 RDD로 변환 -> schema (format) 지정해주어야 DataFrame으로 사용 가능
    • 반대는 collect 함수로 파이썬 데이터로 변환 가능

DataFrame과 Dataset

  • RDD 위에 만들어지는 RDD와는 달리 필드 정보를 갖고 있음 (테이블)
  • DataFrame
    • RDB 테이블처럼 컬럼으로 나누어 저장
    • 다양한 데이터소스 지원: HDFS, Hive, 외부 데이터베이스(JDBC로 연결), RDD 등
    • Scala, Java, Python과 같은 언어에서 지원
  • Dataset은 타입 정보가 존재하며, 컴파일 언어에서 사용 가능
    • 컴파일 언어 - Scala/Java 에서 사용 가능
  • PySpark에서는 DataFrame 사용

데이터 구조

가장 기본은 RDD - 위에 Spark SQL Engine - high level API로 DataFrame / Dataset

스크린샷 2024-01-17 오전 1 18 18

Spark SQL Engine

  • 작성한 DataFrame 코드나 Spark SQL 코드를 최적화하여 RDD Operation으로 -> 최종적으로 JAVA 코드로 만들어줌
  • RDD를 이용하면 이것이 의미가 없음

  • 4가지 step
    • Code Analysis
      • 어떤 테이블, 어떤 컬럼이 쓰이는지
      • typo 검사 등
    • Logical Optimization
      • 이 코드를 실행할 수 있는 여러 방안을 만들어냄
      • Catalyst Optimizer: 방안마다 비용 계산 -> 최종적으로 필요한 비용 파악
        • Stanard SQL 최적화 방식 사용
    • Physical Planning
      • 가장 비용이 적은 방안을 이용해 RDD Operation으로 코드 생성
      • 윗단에서는 SQL, DataFrame, Dataset -> RDD Operation
    • Code Generation
      • RDD Operation 코드를 Java 코드로 변경
      • ProjectTungsten(컴파일러 기술) 등으로 코드 최적화

3. Spark 프로그램 구조

Spark Session 생성

  • Spark 프로그램의 시작
    • 프로그램마다 Spark Session 하나를 만들어 Spark Cluster와 통신: Singleton 객체
    • Spark 2.0에서 처음 소개됨
  • Spark Session을 통해 Spark이 제공해주는 다양한 기능 사용
    • DataFrame, SQL, Streaming, ML API 모두 이 객체로 통신
    • config 메소드를 이용해 다양한 환경설정 가능
    • 단, RDD 관련 작업을 할 때는 SparkSession 밑의 sparkContext 객체 사용

PySpark 예제

  • spark 2.0에서 spark session이 처음으로 소개되었고, DataFrame, Dataset, Spark SQL 모두가 Spark SQL Engine 위에서 돌아가는 것으로 바뀌었기 때문에 pyspark.sql에서 import


from pyspark.sql import SparkSession

# SparkSession은 singleton
spark = SparkSession.builder\
        .master('local[*]')\
        .appName('PySpark Tutorial')\
        .getOrCreate()   # singleton

...

spark.stop()

Spark Session 환경 변수

  • 다양한 환경 설정 가능!
    • 사용하는 Resourge Manager에 따라 환경변수가 많이 달라짐
  • 몇 가지 예시
    • executor 별 메모리: spark.executor.memory (default: 1GB)
    • executor 별 CPU 수: spark.executor.cores (YARN default: 1)
    • driver 메모리: spark.driver.memory (defualt: 1GB)
    • suffle 후 partition 수: spark.sql.shuffle.patitions (default: 최대 200)

환경 변수 설정 방법

  • Spark Cluster Admin이 관리
    • 환경 변수
    • $SPARK_HOME/conf/spark_defaults.conf
  • spark-submit 명령의 커맨드라인 파라미터
  • SparkSession 만들 때 지정
    • SparkConf


  • 위 4개 방법 중 충돌시 우선순위는 아래로 갈수록 높음!

SpakrSession시 지정

from pyspark.sql import SparkSession

# SparkSession은 singleton
spark = SparkSession.builder\
        .master('local[*]')\
        .appName('PySpark Tutorial')\
        .config('spark.home.config.option1', 'some-value')\
        .config('spark.home.config.option2', 'some-value')\
        .getOrCreate()   
  • 이 시점의 Spark Configuration은 앞서 언급한 환경변수와 spark_defults.conf와 spakr-submit의 설정이 정리가 된 상태

SparkConf 객체에 설정하고 SparkSession에 지정

from pyspark.sql import SparkSession
from pyspark import SparkConf

conf = SparkConf()
conf.set('spark.app.name', 'PySpark Tutorial')
conf.set('spark.master', 'local[*]')

# SparkSession은 singleton
spark = SparkSession.builder\
        .config(config=conf)\
        .getOrCreate()   

전체적인 플로우

  • SparkSession 생성
    • 환경 설정
  • SparkSession API로 입력 데이터 로딩
  • 데이터 조작 작업 (Pandas와 아주 흡사)
    • DataFrame API나 Spark SQL 사용
    • 원하는 결과가 나올 때까지 새로운 DataFrame 생성
  • 최종 결과 저장

스크린샷 2024-01-17 오전 2 06 52

SparkSession이 지원하는 데이터 소스

  • spark.read(DataFrameReader)를 사용하여 데이터프레임으로 로드
  • spark.write(DataFrameWriter)를 사용하여 데이터프레임을 저장
  • 많이 사용되는 데이터 소스들
    • HDFS 파일
      • CSV, JSON, Parquet, ORC, Text, Avro
      • Hive 테이블
    • JDBC 관계형 데이터베이스
    • 클라우드 기반 데이터 시스템
    • 스트리밍 시스템

Local Standalone Spark

  • Spark Cluster Manager로 local[n] 지정
    • master를 local[n]으로 지정
    • master는 클러스터 매니저를 지정하는데 사용
  • 주로 개발이나 간단한 테스트 용도
  • 하나의 JVM에서 모든 프로세스 실행
    • 하나의 Driver와 하나의 Executor가 실행됨
    • 하나 이상의 thread가 Executor 안에서 실행됨
  • Executor 안에 생성되는 thread 수
    • local: 하나의 thread만 생성
    • local[*]: 컴퓨터 CPU 수만큼 생성

스크린샷 2024-01-17 오전 3 29 57