1. Spark 파일 포맷

  • 데이터는 디스크에 파일로 저장됨
    • 일에 맞게 최적화 필요!
Unstructured Semi-structured Structured
Text JSON</br>XML</br>CSV PARQUET</br>AVRO</br>ORC</br>SequenceFile

Spark의 주요 파일 타입

특징 CSV JSON PARQUET AVRO
컬럼 스토리지 X X O X
압축 가능 O O O O
Splittable O O O O
Human readable O O X X
Nested structure support X O O O
Schema evolution X X O O
  • 컬럼 스토리지 X -> 행 별로 저장
  • Splittable: HDFS 데이터 블록이 Spark에서 로딩될 때 partition으로 바로 올라갈 수 있는가
    • CSV, JSON은 압축되면 Splittable 하지 않음
  • Nested structure support: 서브필드가 가능한가
  • Schema evolution: 어느 시점에 새로운 컬럼이 생겨도 문제없이 사용 가능한가

Parquet

스크린샷 2024-01-27 오후 8 53 09

  • 마지막이 Parquet가 사용하는 방식 : Hybrid Storage
    • 하나의 데이터 블록은 하나의 Row Group으로 구성됨
    • Row Group 안에서는 column-wise storage

avro, parquet

df = spark.read \
    .format("csv") \
    .load("appl_stock.csv")

df2 = df.repartition(4)
print("Num Partitions after: " + str(df2.rdd.getNumPartitions()))
df2.groupBy(spark_partition_id()).count().show()

df3 = df2.coalesce(2)
print("Num Partitions after: " + str(df3.rdd.getNumPartitions()))
df3.groupBy(spark_partition_id()).count().show()
  • repartition()
    • 파티션 수를 늘리거나 줄일 수 있음
    • 셔플을 기반으로 동작 수행
    • 보통 파티션 수를 늘려야 하는 경우에만 사용
  • coalesce()
    • 파티션 수를 줄이는 것만 가능
    • 강제로 셔플을 수행하라는 옵션을 지정하지 않는 한 셔플을 사용하지 않음
    • 상대적으로 성능이 좋음


df.write \
    .format("avro") \
    .mode("overwrite") \
    .option("path", "dataOutput/avro/") \
    .save()

df2.write \
    .format("parquet") \
    .mode("overwrite") \
    .option("path", "dataOutput/parquet/") \
    .save()

Schema Evolution

스크린샷 2024-01-27 오후 9 03 39

  • 3개의 파일을 동시에 dataframe으로 로딩할 때 parquet 형식 -> 존재하지 않는 컬럼을 갖는 파일의 경우 필드의 값을 NULL로 채워줌
df = spark.read. \
    option("mergeSchema", True). \
    parquet("*.parquet")

2. Execution Plan (Spark 내부 동작)

  • spark은 결국 개발자가 만든 코드를 변환해서 실행

  • 예제

spark.read.option("header", True).csv("test.csv") \
    .where("gender <> 'F'") \
    .select("name", "gender") \
    .groupby("gender") \
    .count() \
    .show()
  • where -> select -> groupby -> count -> show
    • 해당 job은 2개의 stage로 구성됨!
    • [where, select], [groupby, count]


  • 로딩이 된 순간 데이터프레임은 하나 혹은 그 이상의 파티션으로 구성
  • filtering(where), selecting은 그 파티션 내에서 해결 가능한 연산
    • executor에서 돌아가는 태스크들이 독립적으로 작업 수행
    • 셔플링 필요 X
  • groupby
    • groupby 키에 맞게 같은 값을 갖는 레코드들이 같은 파티션으로 재정렬되어야 함
    • 셔플링 발생
  • count는 해당 파티션 안에서 병렬적으로 수행 가능
  • show : Action이라고 부름
    • 앞에 있던 데이터프레임 작업들을 실제로 수행시키는 역할!
    • spark이 lazy execution을 하기 때문
    • Action이라고 부르는 작업들이 수행될 때 앞에 있는 연산들이 수행됨!

Transformations, Actions

  • Actions
    • write, read, collect, show 등
    • job을 실행시킴 (실제 코드 실행됨)
      • 하나의 job은 다수의 transformation으로 구성
      • narrow / wide로 나누어 stage로 재구성
      • 즉, 하나의 job은 하나 혹은 그 이상의 stage로 구성됨!
    • Lazy Execution
      • 더 많은 오퍼레이션을 볼 수 있기 때문에 더 최적화를 잘할 수 있음
      • 그래서 Spark에서 SQL이 더 선호됨
  • Transformations
    • narrow transformation
      • 셔플링 없이 파티션 내에서 병렬적으로 작업
      • select, filter, map 등
    • wide transformation
      • 셔플링이 필요한 작업
      • groupby, reduceby, partitionby, repartition, coalesce 등

Jobs, Stages, Tasks

  • Action -> Job -> 1 + Stages -> 1 + Tasks


  • Action
    • Job을 한 개 만들어내고, 코드가 실제로 실행됨
  • Job
    • 하나 혹은 그 이상의 Stage로 구성됨
    • Stage는 Shuffling이 발생하는 경우 새로 생김
  • Stage
    • DAG의 형태로 구성된 Task들 존재
    • 여기 Task들은 병렬 실행이 가능
  • Task
    • 가장 작은 실행 유닛으로 Executor에 의해 실행됨

transformations, actions 시각화

spark.read.option("header", True).csv("test.csv") \
    .where("gender <> 'F'") \
    .select("name", "gender") \
    .groupby("gender") \
    .count() \
    .show()

스크린샷 2024-01-27 오후 9 52 33

  • header=True
    • 데이터를 읽을 때 바로 실행되어야 함
    • job이 하나 생성됨
    • 레코드 하나 읽어서 header 확인
  • 두번째 job은 show에 의해 trigger
  • groupby에 의해 stage 하나 추가됨

  • .option('inferSchema', True)가 추가되면 job이 하나 더 생성됨
    • column type 확인


  • Execution Plan
    • 내가 코드를 spark이 어떻게 실제 코드로 바꿔 실행해주는지 확인
    • 더 최적화할 point가 있는지, bottle neck이 있는지 확인

실습 1. WordCount

spark = SparkSession \
    .builder \
    .master('local[3]') \
    .appName('SparkSchemaDemo') \
    .config('spark.sql.adaptive.enabled', False) \
    .config('spark.sql.shuffle.partitions', 3) \
    .getOrCreate()

df = spark.read.text('shakespeare.txt')
df_count = df.select(explode(split(df.value, " ")).alias('word')).groupBy('word').count()

df_count.show()  # 이 시점에 앞의 코드들이 실행됨
  • txt 파일 -> 기본으로 value라는 컬럼이 하나 주어짐
  • spark.read.text('shakespeare.txt') : txt 파일은 read할 때 job이 생기지 않음 (action이 아님)
  • explode : 각 단어가 별개의 레코드가 됨


  • 위 코드는 한 개의 job, 두 개의 stage
  • 만약 show가 없다면 job이 생기지 않음! (Action이 없기 때문)
    • 의미없는 일읋 하고 있는 것
    • lazy execution의 장점 중 하나 : 의미없는 코드는 실행하지 않음


  • stage는 groupby를 기준으로 나뉘어짐

  • stage level 스크린샷 2024-01-27 오후 10 04 17

  • query level 스크린샷 2024-01-27 오후 10 04 42

실습 2. Join

spark = SparkSession \
    .builder \
    .master('local[3]') \
    .appName('SparkSchemaDemo') \
    .config('spark.sql.adaptive.enabled', False) \
    .config('spark.sql.shuffle.partitions', 3) \
    .getOrCreate()

df_large = spark.read.json('large_data/')
df_small = spark.read.json('small_data/')

join_expr = df_large.id == df_small.id
join_df = df_large.join(df_small, join_expr, 'inner')

join_df.show()
  • 3개의 job이 생성됨 (read, read, show)
  • shuffle hashing join이 실행될 것
    • but, df_small이 매우 작다면 overhead 될 것
    • -> broadcasting join 사용


  • stage level 스크린샷 2024-01-27 오후 10 08 23

  • query level 스크린샷 2024-01-27 오후 10 09 21

Broadcast join

from pyspark.sql.functions import broadcast

spark = SparkSession \
    .builder \
    .master('local[3]') \
    .appName('SparkSchemaDemo') \
    .config('spark.sql.adaptive.enabled', False) \
    .config('spark.sql.shuffle.partitions', 3) \
    .getOrCreate()

df_large = spark.read.json('large_data/')
df_small = spark.read.json('small_data/')

join_expr = df_large.id == df_small.id
join_df = df_large.join(broadcast(df_small), join_expr, 'inner')

join_df.show()

스크린샷 2024-01-27 오후 10 12 21

  • broadcast 스크린샷 2024-01-27 오후 10 12 54

웹 UI로 보기

  • wordcount 예제
spark-submit --master 'local[3]' wordcount.py

<localhost:4040>

스크린샷 2024-01-28 오전 1 52 31

스크린샷 2024-01-28 오전 1 53 19

스크린샷 2024-01-28 오전 1 58 17

스크린샷 2024-01-28 오전 1 53 36

3. Bucketing, Partitioning

  • 입력 데이터가 얼마나 최적화되어있느냐에 따라 처리 시간을 단축하고 리소스를 덜 사용할 수 있다!


  • 둘 다 Hive 메타스토어의 사용 필요 : saveAsTable
  • 반복처리에 최적화된 방법으로 데이터 저장
  • Bucketing
    • Shuffling을 최대한 줄이는 것이 목적
    • 먼저 Aggregation이나 Window 함수나 Join에서 많이 사용되는 컬럼이 있는지 확인
    • 있다면 데이터를 이 특정 컬럼들을 기준으로 테이블로 저장
      • 이때 버킷의 수도 지정
  • File System Partitioning
    • File System에 저장되는 데이터를 특정 컬럼 혹은 컬럼들의 집합으로 나누어 저장하는 것
    • 특정 키 중심의 filtering 등을 많이 이용하는 경우
    • 원래 Hive에서 많이 사용
    • 데이터의 특정 컬럼들을 기준으로 폴더 구조를 만들어 데이터 저장 최적화
      • 위의 컬럼들을 Partition Key라고 부름

Bucketing

  • DataFrame을 특정 ID를 기준으로 나누어 테이블로 저장
  • 다음부터는 이를 로딩하여 사용함으로써 반복 처리시 시간 단축
    • DataFrameWriter의 bucketBy 함수 사용
      • 인자: bucket의 수, 기준이 되는 컬럼 지정
  • 데이터의 특성을 잘 알고있는 경우 사용 가능

image


from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL 저장하기") \
    .config("spark.sql.autoBroadcastJoinThreshold", -1) \
    .config("spark.sql.adaptive.enabled", False) \
    .enableHiveSupport() \
    .getOrCreate()

# Redshift와 연결해서 DataFrame으로 로딩하기
url = "jdbc:redshift://learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com:5439/dev?user=guest&password=Guest1234"

df_user_session_channel = spark.read \
    .format("jdbc") \
    .option("driver", "com.amazon.redshift.jdbc42.Driver") \
    .option("url", url) \
    .option("dbtable", "raw_data.user_session_channel") \
    .load()

df_session_timestamp = spark.read \
    .format("jdbc") \
    .option("driver", "com.amazon.redshift.jdbc42.Driver") \
    .option("url", url) \
    .option("dbtable", "raw_data.session_timestamp") \
    .load()

join_expr = df_user_session_channel.sessionid == df_session_timestamp.sessionid
df_join = df_user_session_channel.join(df_session_timestamp, join_expr, "inner")
df_join.show(10)

spark.sql("DROP TABLE IF EXISTS bk_usc")
spark.sql("DROP TABLE IF EXISTS bk_st")

df_user_session_channel.write.mode("overwrite").bucketBy(3, "sessionid").saveAsTable("bk_usc")
df_session_timestamp.write.mode("overwrite").bucketBy(3, "sessionid").saveAsTable("bk_st")

df_bk_usc = spark.read.table("bk_usc")
df_bk_st = spark.read.table("bk_st")

join_expr2 = df_bk_usc.sessionid == df_bk_st.sessionid
df_join2 = df_bk_usc.join(df_bk_st, join_expr2, "inner")

df_join2.show(10)

input("Waiting ...")

File System Partitioning

  • 데이터를 Partition Key 기반 폴더 (‘Partition’) 구조로 물리적으로 나누어 저장
    • DataFrame이 아닌 Hive에서 사용하는 Partitioning을 말하는 것
  • 예시와 이점
    • 굉장히 큰 로그 파일을 데이터 생성 시간 기반으로 데이터 읽기 연산을 많이 하는 경우
      • 데이터 자체를 연도-월-일의 폴더 구조로 저장
      • 보통 위의 구조로 이미 저장되는 경우가 많음
    • 이를 통해 데이터 읽기 과정을 최적화 (Scanning 과정이 줄어들거나 없어짐)
    • 데이터 관리도 쉬워짐 (Retention Policy 적용시)
  • 저장할 땐 DataFrameWriter의 partitionBy 사용
    • Partition Key를 잘못 선택하면 엄청나게 많은 파일들이 생성됨!
    • Partition Key는 Cardinality가 가능한 낮은 것을 사용해야 함
      • Cardinality: 가능한 값의 경우의 수

스크린샷 2024-01-28 오전 2 23 41


df = spark.read.csv("appl_stock.csv", header=True, inferSchema=True)

df = df.withColumn("year", year(df.Date)) \
    .withColumn("month", month(df.Date))

# write
spark.sql("DROP TABLE IF EXISTS appl_stock")
df.write.partitionBy("year", "month").saveAsTable("appl_stock")

# read
df = spark.read.table("appl_stock").where("year = 2016 and month = 12")

spark.sql("SELECT * FROM appl_stock WHERE year = 2016 and month = 12").show(10)