1. Kafka 소개

Kafka

  • 실시간 데이터를 처리하기 위해 설계된 오픈소스 분산 스트리밍 플랫폼
    • 데이터 재생이 가능한 분산 커밋 로그 (Distributed Commit Log)
  • Scalability와 Fault Tolerance를 제공하는 Publish-Subscription 메시징 시스템
    • 한 partition은 3개의 broker에 저장됨
    • Producer - Consumer
  • High Throughput Low Latency 실시간 데이터 처리에 맞게 구현됨
  • 분산 아키텍처를 따르기 때문에 Scale Out 형태로 스케일 가능
    • 서버(Broker) 추가를 통해 Scalability
  • 정해진 보유기간 (retention period) 동안 메시지 저장

기존 메시징 시스템 및 DB와 비교

  • 기존 메시징 시스템과는 달리 메시지를 보유 기간 동안 저장
    • 소비자가 오프라인 상태일 때에도 내구성과 내결함성 보장
    • 보유 기간: 한 topic이 가질 수 있는 데이터의 크기를 지정할 수도 있음
  • Kafka는 메시지 생산과 소비를 분리
    • 생산자와 소비자가 각자의 속도에 맞춰 독립적으로 작업이 가능하도록 함
    • 중간에 다른 소비자가 생기더라도 자연스럽게 추가할 수 있음
    • 시스템 안정성을 높일 수 있음
  • Kafka는 높은 처리량과 저지연 데이터 스트리밍을 제공
    • Scale-Out 아키텍처
  • 한 파티션 내에서는 메시지 순서를 보장해줌
    • 다수의 파티션에 걸쳐서는 Eventually Consistent
    • topic을 생성할 때 파티션 개수, 파티션 별로 몇 개의 복제본을 만들 것인지 지정 가능
      • Eventually Consistency vs. Strong Consistency : 소비하는 사람 관점에서 생각하여 결정하기!
      • Strong Consistency
        • 어떤 topic의 event를 produce해서 사용하는 경우 이 복제본이 모두에게 전달이 될 때까지 기다려야 함
        • 내가 write한 event를 바로 읽을 수 있음
        • 생성할 때에는 시간이 더 걸리지만, 읽는 관점에서는 항상 완전한 데이터를 읽을 수 있음
      • Eventually Consistency
        • 복제본 중 몇 개의 정보가 전달될 때까지 기다릴 것인지 정할 수 있음
        • 내가 쓰고 바로 읽는 경우 내가 쓴 event가 return이 안 될 확률이 큼
  • 사내 내부 데이터 버스로 사용되기 시작
    • 워낙 데이터 처리량이 크고 다수 소비자를 지원하기 때문에 가능

Eventual Consistency

  • 100대 서버로 구성된 분산 시스템에 레코드를 하나 쓴다면 그 레코드를 바로 읽을 수 있을까?
    • 내가 쓴 레코드가 온전히 return 될까?
    • 보통 하나의 데이터 블록은 여러 서버에 나눠 저장됨 (Replication Factor)
      • 그래서 데이터를 새로 쓰거나 수정하면 전파되는데 시간이 걸림
    • 보통 읽기 작업은 다수의 데이터 copy 중 하나를 대상으로 일어나기 때문에, 앞서 전파 시간에 따라 데이터가 있을 수도 있고 없을 수도 있음
  • Strong Consistency vs. Eventual Consistency
    • 보통 데이터를 쓸 때 복제가 완료될 때까지 기다리는 구조라면 Strong Consistency
    • 바로 return한다면 Eventual Consistency
      • 대부분 이것으로 충분

주요 기능 및 이점

  • 스트림 처리
    • 실시간 스트림 처리를 목표로 만들어진 서비스
    • ksqlDB를 통해 SQL로도 실시간 이벤트 데이터 처리 가능
  • High Throughput (높은 처리량)
    • 초당 수백 만개의 메시지 처리 가능
  • Fault Tolerance (내결함성)
    • 데이터 복제 및 분산 커밋 로그 기능 제공 -> 장애 대응 용이
  • Scalability (확장성)
    • 클러스터에 브론커를 추ㅏ가하여

2. Kafka 아키텍처

데이터 이벤트 스트림

  • Topic 이라고 부름
    • producer는 topic을 만들고, consumer는 topic에서 데이터를 읽어들이는 구조
    • topic마다 보존 정책이 있어 그에 맞춰 데이터를 저장함

Message (Event) 구조: Key, Value, Timest

  • 최대 1MB
  • Timestamp는 보통 데이터가 Topic에 추가된 시점
  • Key 자체도 복잡한 구조를 가질 수 있음
    • key가 나중에 Topic 데이터를 나눠서 저장할 때 사용됨 (Partitioning)
  • Header는 선택적 구성 요소
    • 경량 메타 데이터 정보 (key-value)

Topic과 Partition

  • 하나의 topic확장성을 위해 다수의 Partition으로 나뉘어 저장됨
  • 메시지가 어느 partition에 속하는지 결정하는 방식은 키의 유무에 따라 달라짐
    • 키가 있다면: hashing 값을 partition 수로 나눈 나머지로 결정
    • 키가 없다면: 라운드 로빈으로 결정 (비추)

Topic과 Partition과 복제본

  • 하나의 partition은 Fail-over를 위해 Replication Partition을 가짐
  • 각 partition별로 Leader와 Follower가 존재
    • 쓰기는 Leader를 통해 이루어지고, 읽기는 Leader/Follower들을 통해 이루어짐
    • partition별로 Consistency Level 설정 가능 in-sync replica - "ack"
  • partition별로 누가 Leader이고 Follower인지 관리가 중요해짐

Topic 파라미터들

  • 이름
  • partition 수
  • 복제본의 수 (원본 포함 개수)
  • Consistency Level (“acks”) : 1, 2, .., “all”
  • 데이터 보존 기한
  • 메시지 압축 방식


  • 예시 스크린샷 2024-02-02 오후 4 34 37

Broker

  • 실제 데이터를 저장하고 관리하는 서버
  • Kafka 클러스터는 기본적으로 다수의 broker로 구성됨
      • 원활한 관리와 부가 기능을 위한 다른 서비스들이 추가됨 (Zookeeper 등)
    • 한 클러스터는 최대 20만 개까지 partition 관리 가능
    • broker들이 실제로 Producer/Consumer들과 통신 수행
  • Topic의 Partition들을 실제로 관리해주는 것이 Broker
    • 한 broker는 최대 4000개의 partition 처리 가능
  • broker는 물리 서버 혹은 VM 혹은 docker container 위에서 동작
    • 해당 서버의 디스크에 partition 데이터들을 기록함
  • broker 수를 늘림으로써 클러스터 용량을 늘림 (Scale Out)

  • 20만개, 4천개 제약은 Zookeeper를 사용하는 경우!
    • 최근, Zookeeper를 대체하는 KRaft가 개발 중, 일부 서비스

스크린샷 2024-02-02 오후 4 42 42

메타정보 관리

  • Broker 리스트 관리 (Broker Membership)
    • 누가 controller인가 (Controller Election)
  • Topic 리스트 관리 (Topic Configuration)
    • Topic을 구성하는 Partition 관리
    • Partition 별 Replica 관리
  • Topic 별 ACL (Access Control Lists) 관리
  • Quota 관리

Zookeeper와 Controller

  • Controller : Broker이면서 Partition을 관리하는 책임을 지는 노드
    • Zookeeper를 사용하여 partition을 관리해왔음
  • zookeeper에 여러 문제가 있기 때문에 현재로는 두가지 모드가 존재 (복잡 & 복구 어려움)
    • Zookeeper 모드
      • 3, 5, 7대의 서버를 Zookeeper Ensemble을 구성하기 위해 사용
      • Controller가 Zookeeper를 통해 메타데이터 관리와 리더 선출 담당
      • 하나의 Controller 존재
    • KRaft 모드
      • Zookeeper를 완전히 배제하고 내부적으로 Controller가 역할을 대신 수행
      • 다수의 Controller들이 Zookeeper 역할 대신 수행
        • Controller들은 보통 Broker들이기도 함


Zookeeper

  • 분산 시스템에서 널리 사용되는 Distributed Coordination Service
    • 동기화, 구성 관리, 리더 선출 등 분산 시스템의 관리와 조율을 위한 중앙 집중 시스템 제공
  • 다양한 문제 존재
    • 지원하는 데이터 크기가 작고, 동기 모드로 동작하기 때문에 처리 속도가 느림
      • 어느 스케일 이상으로 확장성이 떨어짐
    • 환경설정 복잡
    • 기존에 Zookeeper를 사용하던 서비스들이 Zookeeper를 대체하기 시작
      • ElasticSearch가 또 다른 예
  • 일반적인 사용 사례
    • Kafka (메시지 큐)
    • HBase (분산 데이터베이스 조정)
    • Storm (분산 스트림 처리)

3. Kafka 중요 개념

  • Producer, Broker, Consumer, Controller, Consumer Group

스크린샷 2024-02-02 오후 5 03 28


  • Topics, Partitions, Segments

스크린샷 2024-02-02 오후 5 10 00

Topic

스크린샷 2024-02-02 오후 5 13 33

  • Consumer가 데이터(message)를 읽는다고 없어지지 않음
  • Consumer 별로 어느 위치의 데이터를 읽고 있는지 위치 정보를 공유함 offset
  • Fault Tolerance를 위해 이 정보는 중복 저장됨

스크린샷 2024-02-02 오후 5 15 57

Partition과 Segment

  • 하나의 partition은 다수의 segment로 구성됨
    • segment는 변경되지 않고, 추가만 가능한 로그 파일이라고 볼 수 있음 (Immutable, Append-Only)
      • Commit Log
  • 각 segment는 디스크 상에 존재하는 하나의 파일
  • segment는 최대 크기가 있어 이를 넘어가면 새로운 segment 파일을 만들어냄
    • 각 segment는 데이터 오프셋 범위를 갖게됨
    • segment 최대 크기는 1GB 혹은 일주일치의 데이터

4. Kafka 기타 기능

Kafka Connect

  • Kafka 위에 만들어진 중앙집중 데이터 허브
    • 별도의 서버들이 필요하며, Kafka Connect는 별도의 오픈소스 프로젝트
    • 데이터 버스 혹은 메세지 버스라고 볼 수 있음
      • 중앙 Kafka에 여러 시스템의 데이터를 별도의 Topic들로 저장해놓고 필요한 곳에서 읽어가는 용도
  • 두 가지 모드가 존재
    • Standalone 모드: 개발 & 테스트
    • Distributed 모드
  • 데이터 시스템들 간의 데이터를 주고받는 용도로 Kafka를 사용하는 것
    • 데이터 시스템: 데이터베이스, 파일 시스템, 키-값 저장소, 검색 인덱스 등
    • 데이터 소스와 데이터 싱크
  • Broker들 중 일부나 별개 서버들로 Kafka Connect 구성
    • 그 안에 task들을 worker들이 수행
      • task들은 producer/consumer 역할
      • Source Task, Sink Task
    • 외부 데이터(데이터 소스)를 이벤트 스트림으로 읽어오는 것 가능
    • 내부 데이터를 외부(데이터 싱크)로 보내어 Kafka를 기존 시스템과 지속적으로 통합 가능
      • ex) S3 버킷으로 쉽게 저장

스크린샷 2024-02-03 오전 10 25 38

Kafka Schema Registry

스크린샷 2024-02-03 오전 10 28 04

  • Topic에 메세지가 기록될 때 메세지의 포맷이 정말 맞는지 데이터에 대한 스키마를 관리 및 검증하는 데 사용
  • Producer와 Consumer는 Schema Registry를 사용해 스키마 변경 처리


  • serialization
    • 메시지를 구성하는 데이터를 네트워크를 타고 보내려면 데이터가 일렬로 나열되어야 함 (순서가 정해져야 함)
    • 객체의 상태를 저장하거나 전송할 수 있는 형태로 변환하는 프로세스
    • 보통 이 과정에서 데이터 압축 등을 수행, 가능하다면 보내는 데이터의 스키마 정보 추가
  • deserialization
    • serialized된 데이터를 다시 사용할 수 있는 형태로 변환하는 프로세스
    • 이 과정에서 데이터 압축을 해제하거나 스키마 정보 등이 있다면 데이터 포맷 수행


  • Schema ID (&version)를 이용해 다양한 포맷 변천 (Schema Evolution) 지원
    • 보통 AVRO를 데이터 포맷으로 사용 (Protobuf, JSON)
  • 포맷 변경 처리 방법
    • Forward Compatibility: Producer로부터 변경하고 Consumer를 점진적으로 변경
    • Backward Compatibility: PConsumer로부터 변경하고 Consumer를 점진적으로 변경 (대부분)
    • Full Compatibility: 둘 다 변경

스크린샷 2024-02-03 오전 10 44 58

REST Proxy

  • 클라이언트가 API 호출을 사용하여 Kafka를 사용 가능하게 해줌
    • 메시지를 생성 및 소비하고, topic을 관리하는 간단하고 표준화된 방법 제공
    • 메시지 serialization과 deserialization을 대신 수행해주고, Load Balancing도 수행
  • 특히 사내 네트워크 밖에서 Kafka를 접근해야 할 필요성이 있는 경우 더 유용

스크린샷 2024-02-03 오전 10 52 27

Streams와 KSQL

스크린샷 2024-02-03 오전 11 00 38

  • Streams
    • Kafka Topic을 소비하고 생성하는 실시간 스트림 처리 라이브러리
    • Spark Streaming으로 Kafka Topic을 처리하는 것은 조금 더 micro-batch에 가까움
    • Kafka Streaming으로 Kafka Topic을 처리하는 것은 조금 더 realtime에 가까움 (레코드 단위 처리)
  • KSQL
    • Kafka용 오픈소스 SQL 엔진
    • SQL을 사용해 스트리밍 데이터를 실시간으로 쿼리, 분석, 처리할 수 있는 방법 제공
    • but, ksqlDB로 대체하여 더이상 사용되지 않음
  • ksqlDB
    • Kafka Streams로 구현된 스트림 처리 데이터베이스로 KSQL 대체
      • SQL과 유사한 쿼리 언어
      • 필터링, 집계, 조인, 윈도우잉 등과 같은 SQL 작업 지원
      • 연속 쿼리 : 데이터가 실시간으로 도착할 때 지속적으로 처리하는 연속 쿼리 생성 가능
      • 지속 업데이트되는 뷰 지원 : 실시간으로 지속적으로 업데이트되는 집계 및 변환 가능

스크린샷 2024-02-03 오전 11 04 30

5. Kafka 설치

git clone https://github.com/conduktor/kafka-stack-docker-compose.git
cd kafka-stack-docker-compose
docker compose -f full-stack.yml up


  • id: admin@admin.io
  • pwd: admin


스크린샷 2024-02-03 오전 11 36 49

스크린샷 2024-02-03 오전 11 37 06

6. Kafka Python 프로그래밍

kafka-python 라이브러리 설치

pip3 install kafka-python

간단한 producer 생성

from time import sleep
from json import dumps
from kafka import KafkaProducer

producer = KafkaProducer(
   bootstrap_servers=['localhost:9092'],
   value_serializer=lambda x: dumps(x).encode('utf-8')
)

for j in range(999):
   print("Iteration", j)
   data = {'counter': j}
   producer.send('topic_test', value=data)
   sleep(0.5)


  • 로컬 kafka 인스턴스를 연결하는 KafkaProducer 객체 생성
    • bootstrap_servers
      • producer: 어느 kafka 클러스터에 topic 만들 것인지 결정해야 함 -> broker들과 연결되어야 함
      • broker들 중 하나 이상 지정
    • value_serializer
      • value를 어떻게 serialize 할 것인지
      • 전송하려는 데이터를 json 문자열로 변환(dumps(x))한 다음 UTF-8로 인코딩하여 직렬화하는 방법을 정의


  • 0.5초마다 ‘topic-test’라는 topic과 반복 카운터를 데이터로 포함하는 event 전송
  • 데이터는 ‘counter’라는 키와 정수를 값으로 갖도록 구성
    • producer.send('topic_test', value=data): key와 header는 지정되어 있지 않음


스크린샷 2024-02-03 오후 12 01 40

Consumer 객체 생성

from kafka import KafkaConsumer
from json import loads
from time import sleep

consumer = KafkaConsumer(
   'topic_test',
   bootstrap_servers=['localhost:9092'],
   auto_offset_reset='earliest',
   enable_auto_commit=True,
   group_id='my-group-id',
   value_deserializer=lambda x: loads(x.decode('utf-8'))
)
for event in consumer:
   event_data = event.value
   # Do whatever you want
   print(event_data)
   sleep(2)


  • auto_offset_reset
    • earlist: 지금 이용 가능한 가장 앞의 offset에 있는 데이터 읽기
    • latest: 지금 topic에 있는 데이터는 관심없고, 새로 생기는 것부터 읽기
  • enable_auto_commit
    • True: kafka consumer 객체가 알아서 지금 사용하고 있는 offset 값을 kafka 안에 기록
    • False: commit 함수로 명시적으로 offset 위치를 커밋 (현업에서!)
  • value_deserializer=lambda x: loads(x.decode('utf-8'))
    • 앞서 Producer에서 사용했던 value_serializer의 반대 작업 수행


  • 2초마다 ‘topic-test’에서 데이터를 읽어옴


스크린샷 2024-02-03 오후 12 07 28