[DEV] 14주차. Hadoop과 Spark (4)
1. Spark 파일 포맷
- 데이터는 디스크에 파일로 저장됨
- 일에 맞게 최적화 필요!
Unstructured | Semi-structured | Structured |
---|---|---|
Text | JSON</br>XML</br>CSV | PARQUET</br>AVRO</br>ORC</br>SequenceFile |
Spark의 주요 파일 타입
특징 | CSV | JSON | PARQUET | AVRO |
---|---|---|---|---|
컬럼 스토리지 | X | X | O | X |
압축 가능 | O | O | O | O |
Splittable | O | O | O | O |
Human readable | O | O | X | X |
Nested structure support | X | O | O | O |
Schema evolution | X | X | O | O |
- 컬럼 스토리지 X -> 행 별로 저장
- Splittable: HDFS 데이터 블록이 Spark에서 로딩될 때 partition으로 바로 올라갈 수 있는가
- CSV, JSON은 압축되면 Splittable 하지 않음
- Nested structure support: 서브필드가 가능한가
- Schema evolution: 어느 시점에 새로운 컬럼이 생겨도 문제없이 사용 가능한가
Parquet
- 마지막이 Parquet가 사용하는 방식 : Hybrid Storage
- 하나의 데이터 블록은 하나의 Row Group으로 구성됨
- Row Group 안에서는 column-wise storage
avro, parquet
df = spark.read \
.format("csv") \
.load("appl_stock.csv")
df2 = df.repartition(4)
print("Num Partitions after: " + str(df2.rdd.getNumPartitions()))
df2.groupBy(spark_partition_id()).count().show()
df3 = df2.coalesce(2)
print("Num Partitions after: " + str(df3.rdd.getNumPartitions()))
df3.groupBy(spark_partition_id()).count().show()
repartition()
- 파티션 수를 늘리거나 줄일 수 있음
- 셔플을 기반으로 동작 수행
- 보통 파티션 수를 늘려야 하는 경우에만 사용
coalesce()
- 파티션 수를 줄이는 것만 가능
- 강제로 셔플을 수행하라는 옵션을 지정하지 않는 한 셔플을 사용하지 않음
- 상대적으로 성능이 좋음
df.write \
.format("avro") \
.mode("overwrite") \
.option("path", "dataOutput/avro/") \
.save()
df2.write \
.format("parquet") \
.mode("overwrite") \
.option("path", "dataOutput/parquet/") \
.save()
Schema Evolution
- 3개의 파일을 동시에 dataframe으로 로딩할 때 parquet 형식 -> 존재하지 않는 컬럼을 갖는 파일의 경우 필드의 값을 NULL로 채워줌
df = spark.read. \
option("mergeSchema", True). \
parquet("*.parquet")
2. Execution Plan (Spark 내부 동작)
-
spark은 결국 개발자가 만든 코드를 변환해서 실행
-
예제
spark.read.option("header", True).csv("test.csv") \
.where("gender <> 'F'") \
.select("name", "gender") \
.groupby("gender") \
.count() \
.show()
- where -> select -> groupby -> count -> show
- 해당 job은 2개의 stage로 구성됨!
- [where, select], [groupby, count]
- 로딩이 된 순간 데이터프레임은 하나 혹은 그 이상의 파티션으로 구성
- filtering(where), selecting은 그 파티션 내에서 해결 가능한 연산
- executor에서 돌아가는 태스크들이 독립적으로 작업 수행
- 셔플링 필요 X
- groupby
- groupby 키에 맞게 같은 값을 갖는 레코드들이 같은 파티션으로 재정렬되어야 함
- 셔플링 발생
- count는 해당 파티션 안에서 병렬적으로 수행 가능
- show : Action이라고 부름
- 앞에 있던 데이터프레임 작업들을 실제로 수행시키는 역할!
- spark이 lazy execution을 하기 때문
- Action이라고 부르는 작업들이 수행될 때 앞에 있는 연산들이 수행됨!
Transformations, Actions
- Actions
- write, read, collect, show 등
- job을 실행시킴 (실제 코드 실행됨)
- 하나의 job은 다수의 transformation으로 구성
- narrow / wide로 나누어 stage로 재구성
- 즉, 하나의 job은 하나 혹은 그 이상의 stage로 구성됨!
- Lazy Execution
- 더 많은 오퍼레이션을 볼 수 있기 때문에 더 최적화를 잘할 수 있음
- 그래서 Spark에서 SQL이 더 선호됨
- Transformations
- narrow transformation
- 셔플링 없이 파티션 내에서 병렬적으로 작업
- select, filter, map 등
- wide transformation
- 셔플링이 필요한 작업
- groupby, reduceby, partitionby, repartition, coalesce 등
- narrow transformation
Jobs, Stages, Tasks
- Action -> Job -> 1 + Stages -> 1 + Tasks
- Action
- Job을 한 개 만들어내고, 코드가 실제로 실행됨
- Job
- 하나 혹은 그 이상의 Stage로 구성됨
- Stage는 Shuffling이 발생하는 경우 새로 생김
- Stage
- DAG의 형태로 구성된 Task들 존재
- 여기 Task들은 병렬 실행이 가능
- Task
- 가장 작은 실행 유닛으로 Executor에 의해 실행됨
transformations, actions 시각화
spark.read.option("header", True).csv("test.csv") \
.where("gender <> 'F'") \
.select("name", "gender") \
.groupby("gender") \
.count() \
.show()
header=True
- 데이터를 읽을 때 바로 실행되어야 함
- job이 하나 생성됨
- 레코드 하나 읽어서 header 확인
- 두번째 job은
show
에 의해 trigger -
groupby
에 의해 stage 하나 추가됨 .option('inferSchema', True)
가 추가되면 job이 하나 더 생성됨- column type 확인
- Execution Plan
- 내가 코드를 spark이 어떻게 실제 코드로 바꿔 실행해주는지 확인
- 더 최적화할 point가 있는지, bottle neck이 있는지 확인
실습 1. WordCount
spark = SparkSession \
.builder \
.master('local[3]') \
.appName('SparkSchemaDemo') \
.config('spark.sql.adaptive.enabled', False) \
.config('spark.sql.shuffle.partitions', 3) \
.getOrCreate()
df = spark.read.text('shakespeare.txt')
df_count = df.select(explode(split(df.value, " ")).alias('word')).groupBy('word').count()
df_count.show() # 이 시점에 앞의 코드들이 실행됨
- txt 파일 -> 기본으로
value
라는 컬럼이 하나 주어짐 spark.read.text('shakespeare.txt')
: txt 파일은 read할 때 job이 생기지 않음 (action이 아님)explode
: 각 단어가 별개의 레코드가 됨
- 위 코드는 한 개의 job, 두 개의 stage
- 만약
show
가 없다면 job이 생기지 않음! (Action이 없기 때문)- 의미없는 일읋 하고 있는 것
- lazy execution의 장점 중 하나 : 의미없는 코드는 실행하지 않음
-
stage는 groupby를 기준으로 나뉘어짐
-
stage level
-
query level
실습 2. Join
spark = SparkSession \
.builder \
.master('local[3]') \
.appName('SparkSchemaDemo') \
.config('spark.sql.adaptive.enabled', False) \
.config('spark.sql.shuffle.partitions', 3) \
.getOrCreate()
df_large = spark.read.json('large_data/')
df_small = spark.read.json('small_data/')
join_expr = df_large.id == df_small.id
join_df = df_large.join(df_small, join_expr, 'inner')
join_df.show()
- 3개의 job이 생성됨 (read, read, show)
- shuffle hashing join이 실행될 것
- but, df_small이 매우 작다면 overhead 될 것
- -> broadcasting join 사용
-
stage level
-
query level
Broadcast join
from pyspark.sql.functions import broadcast
spark = SparkSession \
.builder \
.master('local[3]') \
.appName('SparkSchemaDemo') \
.config('spark.sql.adaptive.enabled', False) \
.config('spark.sql.shuffle.partitions', 3) \
.getOrCreate()
df_large = spark.read.json('large_data/')
df_small = spark.read.json('small_data/')
join_expr = df_large.id == df_small.id
join_df = df_large.join(broadcast(df_small), join_expr, 'inner')
join_df.show()
- broadcast
웹 UI로 보기
- wordcount 예제
spark-submit --master 'local[3]' wordcount.py
<localhost:4040>
3. Bucketing, Partitioning
- 입력 데이터가 얼마나 최적화되어있느냐에 따라 처리 시간을 단축하고 리소스를 덜 사용할 수 있다!
- 둘 다 Hive 메타스토어의 사용 필요 :
saveAsTable
- 반복처리에 최적화된 방법으로 데이터 저장
- Bucketing
- Shuffling을 최대한 줄이는 것이 목적
- 먼저 Aggregation이나 Window 함수나 Join에서 많이 사용되는 컬럼이 있는지 확인
- 있다면 데이터를 이 특정 컬럼들을 기준으로 테이블로 저장
- 이때 버킷의 수도 지정
- File System Partitioning
- File System에 저장되는 데이터를 특정 컬럼 혹은 컬럼들의 집합으로 나누어 저장하는 것
- 특정 키 중심의 filtering 등을 많이 이용하는 경우
- 원래 Hive에서 많이 사용
- 데이터의 특정 컬럼들을 기준으로 폴더 구조를 만들어 데이터 저장 최적화
- 위의 컬럼들을 Partition Key라고 부름
Bucketing
- DataFrame을 특정 ID를 기준으로 나누어 테이블로 저장
- 다음부터는 이를 로딩하여 사용함으로써 반복 처리시 시간 단축
- DataFrameWriter의
bucketBy
함수 사용- 인자: bucket의 수, 기준이 되는 컬럼 지정
- DataFrameWriter의
- 데이터의 특성을 잘 알고있는 경우 사용 가능
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL 저장하기") \
.config("spark.sql.autoBroadcastJoinThreshold", -1) \
.config("spark.sql.adaptive.enabled", False) \
.enableHiveSupport() \
.getOrCreate()
# Redshift와 연결해서 DataFrame으로 로딩하기
url = "jdbc:redshift://learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com:5439/dev?user=guest&password=Guest1234"
df_user_session_channel = spark.read \
.format("jdbc") \
.option("driver", "com.amazon.redshift.jdbc42.Driver") \
.option("url", url) \
.option("dbtable", "raw_data.user_session_channel") \
.load()
df_session_timestamp = spark.read \
.format("jdbc") \
.option("driver", "com.amazon.redshift.jdbc42.Driver") \
.option("url", url) \
.option("dbtable", "raw_data.session_timestamp") \
.load()
join_expr = df_user_session_channel.sessionid == df_session_timestamp.sessionid
df_join = df_user_session_channel.join(df_session_timestamp, join_expr, "inner")
df_join.show(10)
spark.sql("DROP TABLE IF EXISTS bk_usc")
spark.sql("DROP TABLE IF EXISTS bk_st")
df_user_session_channel.write.mode("overwrite").bucketBy(3, "sessionid").saveAsTable("bk_usc")
df_session_timestamp.write.mode("overwrite").bucketBy(3, "sessionid").saveAsTable("bk_st")
df_bk_usc = spark.read.table("bk_usc")
df_bk_st = spark.read.table("bk_st")
join_expr2 = df_bk_usc.sessionid == df_bk_st.sessionid
df_join2 = df_bk_usc.join(df_bk_st, join_expr2, "inner")
df_join2.show(10)
input("Waiting ...")
File System Partitioning
- 데이터를 Partition Key 기반 폴더 (‘Partition’) 구조로 물리적으로 나누어 저장
- DataFrame이 아닌 Hive에서 사용하는 Partitioning을 말하는 것
- 예시와 이점
- 굉장히 큰 로그 파일을 데이터 생성 시간 기반으로 데이터 읽기 연산을 많이 하는 경우
- 데이터 자체를 연도-월-일의 폴더 구조로 저장
- 보통 위의 구조로 이미 저장되는 경우가 많음
- 이를 통해 데이터 읽기 과정을 최적화 (Scanning 과정이 줄어들거나 없어짐)
- 데이터 관리도 쉬워짐 (Retention Policy 적용시)
- 굉장히 큰 로그 파일을 데이터 생성 시간 기반으로 데이터 읽기 연산을 많이 하는 경우
- 저장할 땐 DataFrameWriter의
partitionBy
사용- Partition Key를 잘못 선택하면 엄청나게 많은 파일들이 생성됨!
- Partition Key는 Cardinality가 가능한 낮은 것을 사용해야 함
- Cardinality: 가능한 값의 경우의 수
df = spark.read.csv("appl_stock.csv", header=True, inferSchema=True)
df = df.withColumn("year", year(df.Date)) \
.withColumn("month", month(df.Date))
# write
spark.sql("DROP TABLE IF EXISTS appl_stock")
df.write.partitionBy("year", "month").saveAsTable("appl_stock")
# read
df = spark.read.table("appl_stock").where("year = 2016 and month = 12")
spark.sql("SELECT * FROM appl_stock WHERE year = 2016 and month = 12").show(10)