1. Spark Streaming

  • 실시간 데이터 스트림 처리를 위한 Spark API
  • Kafka, Kinesis, Flume, TCP 소켓 등의 다양한 소스에서 발생하는 데이터 처리 가능
  • Join, Map, Reduce, Window와 같은 고급 함수 사용 가능
    • streaming 데이터일지라도 내부에서 data frame으로 변환하여 처리하기 때문
    • semi-real time!

스크린샷 2024-02-05 오후 8 55 09

동작 방식

  • 데이터를 마이크로 배치로 처리
  • 계속해서 위의 과정을 반복 (loop) : trigger 방식
  • 이렇게 읽은 데이터를 앞서 읽은 데이터에 merge
    • append, complete (full refresh 느낌), update (upsert 느낌)
  • 배치마다 데이터 위치 관리 (시작과 끝)
    • spark에서 관리해줌
  • Fault Tolerance와 데이터 재처리 관리 (실패 시)

내부 동작

  • 실시간 입력 데이터 스트림을 배치로 나눈 다음
  • Spark Engine에서 처리하여 최종 결과 스트림을 일괄적으로 생성
    • DStream, Structured Streaming

스크린샷 2024-02-05 오후 9 00 28


DStream Structured Streaming
RDD 기반 스트리밍 처리 DataFrame 기반 스트리밍 처리
Spark SQL 엔진의 최적화 기능 사용 불가 Catalyst 기반 최적화 혜택을 가져감
이벤트 발생 시간 기반 처리 불가 이벤트 발생 시간 기반 처리 가능
개발이 중단된 상태 (RDD 기반 모두에 적용됨) 계속해서 기능이 추가되고 있음

Source & Sink

  • Source: 입력
  • Sink: 출력


  • 외부 시스템 (소스)에서 스트리밍 데이터를 수집하고
  • 처리된 데이터를 외부 시스템 (싱크)으로 출력하는 것을 용이하게 하는 구성 요소

스크린샷 2024-02-05 오후 9 05 04

Source

  • Kafka, Kinesis, Flume, TPI/IP 소켓, HDFS, File 등을 Spark Structured Streaming에서 처리할 수 있도록 해줌
    • 결국 Spark DataFrame으로 변환
    • ex) Kafka에서 Spark Structured Streaming으로 데이터를 수집하려는 경우, Kafka Source를 사용하여 Kafka 클러스터에서 하나 이상의 토픽에서 데이터를 가져와 DataFrame으로 변환 가능
  • Spark DataFrame과 비교하면 readStream을 사용하는 점이 다름


lines_df = spark.readStream \
        .format('socket') \
        .option('host', 'localhost') \
        .option('port', '9999') \
        .load()

Sink

  • Spark Structured Streaming에서 처리된 데이터를 외부 시스템이나 스토리지로 출력 가능하게 해줌
  • Sink는 변환되거나 집계된 데이터가 어떻게 쓰이거나 소비되는지를 정의
    • Source와 마찬가지로 Kafka, HDFS, S3, Cassandra, JDBC DB 등과 같은 다양한 대상에 대해 사용 가능
    • ex) Kafka Sink를 사용하여 Spark Structured Streaming에서 처리된 데이터를 Kafka Topic으로 쓰는 것이 가능
  • OutputMode: 현재 Micro Batch의 결과가 Sink에 어떻게 쓰일지 결정
    • append
    • update: upsert 느낌
    • complete: full refresh 느낌


word_count_query = counts_df.writeStream \
    .format('console') \
    .outputMode('complete') \
    .option('checkpointLocation', 'chk-point-dir') \
    .start()

전체 구조

스크린샷 2024-02-05 오후 9 39 35

Micro Batch Trigger Option

  • Unspecified : default. 현재 micro batch가 끝나면 바로 다음 batch 시작
  • Time Interval : 고정된 시간마다 micro batch 시작
    • 현재 batch가 지정된 시간을 넘어서 끝나면 끝나자마자 다음 batch가 시작됨
    • 읽을 데이터가 없는 경우 시작되지 않음
  • One Time : Available-Now. 지금 있는 데이터를 모두 처리하고 중단
  • Continuous : 새로운 저지연 연속 처리 모드에서 실행. 아직 베타/실험 버전

예제

# default: unspecified
df.writeStream \
    .format('console') \
    .start()

# 2-second micro batch interval
df.writeStream \
    .format('console') \
    .trigger(processingTime='2 seconds') \
    .start()

# available-now 
df.writeStream \
    .format('console') \
    .trigger(availableNow=True) \
    .start()

# continuous trigger with 1-second checkpointing interval
df.writeStream \
    .format('console') \
    .trigger(continous='1 second') \
    .start()

2. Spark 환경 설정

Local Standalone Spark

스크린샷 2024-02-05 오후 9 52 46

  • Spark Cluster Manager로 local[n] 지정
    • .master('local[*]')
    • master는 클러스터 매니저를 지정하는데 사용
  • 주로 개발이나 간단한 테스트 용도
  • 하나의 JVM에서 모든 프로세스를 실행
    • 하나의 Driver와 하나의 Executor가 실행됨
    • 하나 이상의 thread가 Executor 안에서 실행됨
  • Executor 안에 생성되는 thread 수
    • local : 1개
    • local[]* : 컴퓨터 CPU 수만큼

설치

  • JAVA
    • JDK8/11 필요
    • JAVA_HOME 환경변수 등록
  • Spark 다운로드

3. Streaming WordCount 프로그램

  • Spark에서 제공해주는 예제 프로그램
    • TCP 소켓에서 수신 대기 중인 데이터 서버로부터 수신한 텍스트 데이터의 단어 수를 세는 프로그램
  • Netcat을 데이터 Producer로 사용 (터미널 1)
  • Spark Structured Streaming으로 만든 Consumer 실행 (터미널 2)

Data Producer

  • Netcat을 사용하여 텍스트 스트림 생성
  • Netcat을 포트번호 9999번에 데이터를 보내도록 실행
  • 여기서 입력한 텍스트는 모두 TCP 9999번으로 보내짐

스크린샷 2024-02-05 오후 10 01 05

Data Consumer

  • SPARK_HOME이 설정되어 있어야 함
from pyspark.sql import SparkSession
from pyspark.sql.functions import *


if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("Streaming Word Count") \
        .master("local[3]") \
        .config("spark.streaming.stopGracefullyOnShutdown", "true") \
        .config("spark.sql.shuffle.partitions", 3) \
        .getOrCreate()

    # READ
    lines_df = spark.readStream \
        .format("socket") \
        .option("host", "localhost") \
        .option("port", "9999") \
        .load()

    # TRANSFORM
    words_df = lines_df.select(expr("explode(split(value,' ')) as word")) 
    counts_df = words_df.groupBy("word").count()

    # SINK
    word_count_query = counts_df.writeStream \
        .format("console") \
        .outputMode("complete") \
        .option("checkpointLocation", "chk-point-dir") \
        .start()

    print("Listening to localhost:9999")
    word_count_query.awaitTermination()


  • "explode(split(value,' ')) as word"
    • split: 공백을 기준으로 단어들이 리스트 형태가 됨
    • explode: 리스트에서 각각의 값이 별개의 row가 됨
  • .option("checkpointLocation", "chk-point-dir")
    • 체크포인트: Fault Tolerance와 Exactly Once를 가능하게 하는 Spark Structured Streaming의 메커니즘
    • 스트리밍 쿼리의 메타데이터와 상태 정보를 HDFS 또는 S3와 같은 안정적인 스토리지 시스템에 일정한 간격으로 저장


스크린샷 2024-02-05 오후 10 07 58

format

  • parquet, json, csv, avro, …
  • kafka
  • memory
  • console
  • socket

outputMode

outputMode 설명
Complete 데이터 배치가 처리될 때마다 업데이트된 전체 결과 테이블이 sink에 기록
전체 테이블 출력을 처리할 수 있는 sink에 적합
예를 들어, 각 데이터 배치가 새 파일로 기록되는 파일 sink에 쓰는 경우
Append 마지막 trigger 이후 결과 테이블에 추가된 새 행만 sink에 기록
기존 데이터를 수정하지 않고 새 데이터를 추가하는 것을 지원하는 sink에 적합
예를 들어, 각 데이터 배치가 기존 파일에 추가되는 파일 sink에 쓰는 경우
Update 마지막 trigger 이후 결과 테이블의 변경된 행만 sink에 기록 (UPSERT)
고유 식별자를 기반으로 기존 데이터를 업데이트하는 것을 지원하는 sink에 적합
예를 들어, 기본 키를 기반으로 변경된 행이 업데이트되는 데이터베이스 테이블 sink에 쓰는 경우

4. Kafka Topic을 input source로

스크린샷 2024-02-06 오전 8 29 31

주제

  • fake_people topic을 기준으로 가장 많은 title 10개를 계산하는 코드
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, expr
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType, IntegerType, ArrayType


if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("File Streaming Demo") \
        .master("local[3]") \
        .config("spark.streaming.stopGracefullyOnShutdown", "true") \
        .getOrCreate()

    # fake_peopel topic의 한 message
    schema = StructType([
        StructField("id", StringType()),
        StructField("name", StringType()),
        StructField("title", StringType())
    ])

    kafka_df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "fake_people") \
        .option("startingOffsets", "earliest") \
        .load()
    kafka_df.printSchema()
    """
    |-- key: binary (nullable = true)
    |-- value: binary (nullable = true)
    |-- topic: string (nullable = true)
    |-- partition: integer (nullable = true)
    |-- offset: long (nullable = true)
    |-- timestamp: timestamp (nullable = true)
    |-- timestampType: integer (nullable = true)
    """
    value_df = kafka_df.select(from_json(col("value").cast("string"), schema).alias("value"))
    value_df.createOrReplaceTempView("fake_people")
    value_df.printSchema()
    count_df = spark.sql("SELECT value.title, COUNT(1) count FROM fake_people GROUP BY 1 ORDER BY 2 DESC LIMIT 10")

    count_writer_query = count_df.writeStream \
        .format("console") \
        .outputMode("complete") \
        .option("checkpointLocation", "chk-point-dir-json") \
        .start()

    print("Listening to Kafka")
    count_writer_query.awaitTermination()


  • 그냥 spark-submit으로는 실행이 안되고, 환경 설정이 필요!

Kafka Structured Streaming 실행 준비

  • spark.jars.packages 설정해주어야 함. 3가지 옵션 존재
    • spark-defaults.conf 파일 수정
    • SparkSession 생성시 config로 지정
    • spark-submit 실행시 --packages 옵션 사용 : pyspark에 가장 적합!
  • spark: 3.3.1, scala: 2.12 버전 확인

packages 옵션으로 지정할 값 찾기

https://mvnrepository.com/search?q=kafka+structured+streaming

스크린샷 2024-02-06 오전 8 48 23

  • spark, scala 버전에 맞는 프로그램 설치


스크린샷 2024-02-06 오전 8 50 44

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1 kafka_source_streaming.py
  • groupId:artifactId:version 으로 활용


  • 실행 결과

스크린샷 2024-02-06 오전 10 44 35