1. Kafka CLI Tools

접근 방법

  • docker ps를 통해 Broker의 Container ID 혹은 Container 이름 파악 (confluentinc/cp-kafka:7.3.2)
  • 해당 컨테이너로 로그인
    • docker exec -it Broker_Container_ID sh
  • 다양한 kafka 관련 클라이언트 툴 사용 가능
    • kafka-topics
    • kafka-configs
    • kafka-console-consumer
    • kafka-conosole-producer

kafka-topics

  • kafka topic과 관련된 다양한 유틸리티 제공
kafka-topics --bootstrap-server kafka1:9092 --list
kafka-topics --bootstrap-server kafka1:9092 --delete --topic topic_test

스크린샷 2024-02-03 오후 1 36 11

kafka-console-producer

  • Topic을 만들고 message 생성 가능
  • 기본 Partition 1개, Replica 1개
kafka-console-producer --bootstrap-server kafka1:9092 --topic test_console
  • Ctrl+C를 누르기 전까지 Enter를 누르며 계속 메시지 생성!

스크린샷 2024-02-03 오후 1 38 02

kafka-console-consumer

  • Topic에서 message 읽기
    • --from-beginning 옵션을 주지 않으면 default로 이미 쌓여있는 메시지는 읽지 않고 새로 들어오는 것만 읽음 (latest)
    • --from-beginning 옵션이 earliest
kafka-console-consumer --bootstrap-server kafka1:9092 --topic test_console --from-beginning

스크린샷 2024-02-03 오후 1 38 13


  • 웹 UI 스크린샷 2024-02-03 오후 1 41 34

2. Topic 파라미터 설정

다수의 Partition이나 Replica

  • 먼저 KafkaAdminClient 오브젝트를 생성하고 create_topics 함수로 Topic 추가
  • create_topics의 인자로는 NewTopic 클래스의 오브젝트 지정
client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
topic = NewTopic(
    name=name,
    num_partitions=partitions,
    replication_factor=replica    # 원본 포함 개수
)
client.create_topics([topic])

KafkaProducer 파라미터

파라미터 의미 기본값
bootstrap_servers 메시지를 보낼 때 사용할 브로커 리스트 (host:port) localhost:9092
client_id Kafka Producer의 이름 kafka-python-(version)
key_serializer, value_serializer 메시지의 키와 값의 serialize 방법 지정 (함수)  
enable_idempotence ETL에서의 멱등성과 비슷한 컨셉
중복 메세지 전송을 막을 것인지?
False (안막음)
acks: 0, 1, ‘all’ consistency level
0: 바로 리턴, 1: leader에 쓰일 때까지 대기, ‘all’: 모든 partition leader/follower에 적용될 때까지 대기
0
retries
delivery.timeout.ms
메시지 실패 시 재시도 횟수
메시지 전송 최대 시간
2147483647
120000
linger_ms, batch_size 다수의 메시지를 동시에 보내기 위함 (배치 전송)
- 메시지 송신전 대기 시간
- 메시지 송신전 데이터 크기
0
16384

Kafka Proudcer 동작

스크린샷 2024-02-03 오후 2 02 39

KafkaProducer로 topic 만들기

  • 랜덤하게 사람 정보를 만들어 저장하는 producer 구현
    • Faker 모듈 사용: pip3 install faker
    • pydantic의 BaseModel을 사용하여 메시지 클래스 구현 (Person)
      • pip3 install pydantic
    • Topic을 먼저 만들고 진행


  • person.py
"""
Pydantic is a Python library for data parsing and validation. 
It uses the type hinting mechanism of the newer versions of Python (version 3.6 onwards)
and validates the types during the runtime. Pydantic defines BaseModel class.
It acts as the base class for creating user defined models.
"""
from pydantic import BaseModel


class Person(BaseModel):
    id: str
    name: str
    title: str


  • fake_person_producer.py
import re
import uuid
from typing import List
from person import Person

from faker import Faker
from kafka.admin import NewTopic
from kafka.errors import TopicAlreadyExistsError
from kafka import KafkaAdminClient
from kafka.producer import KafkaProducer


def create_topic(bootstrap_servers, name, partitions, replica=1):
    client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    try:
        topic = NewTopic(
            name=name,
            num_partitions=partitions,
            replication_factor=replica)
        client.create_topics([topic])
    except TopicAlreadyExistsError as e:
        print(e)
        pass
    finally:
        client.close()


def main():
    topic_name = "fake_people"
    bootstrap_servers = ["localhost:9092"]

    # create a topic first
    create_topic(bootstrap_servers, topic_name, 4)

    # ingest some random people events
    people: List[Person] = []
    faker = Faker()
    producer = KafkaProducer(
        bootstrap_servers=bootstrap_servers,
        client_id="Fake_Person_Producer",
        
    )

    for _ in range(100):
        person = Person(id=str(uuid.uuid4()), name=faker.name(), title=faker.job().title())
        people.append(person)
        producer.send(
            topic=topic_name,
            key=re.sub(r'\s+', '-', person.title.lower()).encode('utf-8'),
            value=person.model_dump_json().encode('utf-8'))

    producer.flush()

if __name__ == '__main__':
    main()

3. Kafka Consumer

KafkaCounsumer 파라미터

  • Topic 이름을 KafkaConsumer의 첫번째 인자로 지정하거나, 나중에 별도로 subscribe를 호출해서 지정함

  • Consumer를 1개 사용하더라도 group_id는 지정해주어야 함
    • 같은 id에 소속된 consumer끼리는 kafka가 partition을 나누어 줌 + 그룹 내의 consumer가 사라지면 partition을 재분배해줌
    • group_id를 잘못 사용하면 아주 큰 문제가 발생할 수도 !!
  • enable_auto_commit
    • 데이터 유실이 조금 있어도 괜찮은 경우 일반적으로 True
    • 데이터 유실이 있으면 안되고, 중복된 데이터라도 다시 꼭 한번은 확인해야함 False (at_least_once)


파라미터 의미 기본값
bootstrap_servers 메시지를 보낼 때 사용할 브로커 리스트 (host:port) localhost:9092
client_id Kafka Consumer의 이름 kafka-python-(version)
group_id Kafka Consumer Group의 이름  
key_deserializer, value_deserializer 메시지의 키와 값의 deserialize 방법 지정 (함수)  
auto_offset_reset earliest, latest latest
enable_auto_commit True: 소비자의 offset이 백그라운드에서 주기적으로 커밋
False: 명시적으로 커밋해주어야 함
offset은 별도로 리셋 가능하며, Conduktor 웹 UI에서도 가능
True

Consumer의 일반적인 동작

  • 다수의 partition들로부터 어떻게 읽을까?


  • Consumer가 하나이고 다수의 partition들로 구성된 topic으로부터 읽어야 한다면?
    • consumer는 각 partition들로부터 라운드 로빈 형태로 하나씩 읽게 됨
    • 이 경우 병렬성이 떨어지고, 데이터 생산 속도에 따라 Backpressure가 심해질 수 있음
    • 이를 해결하기 위한 것이 Consumer Group
  • 한 프로세스에서 다수의 topic을 읽는 것 가능
    • topic 수만큼 KafkaConsumer 인스턴스를 생성하고, 별도의 Group ID와 Client ID를 지정해야 함

Consumer Group

  • 병렬성을 높이기 위한 것!

  • Consumer가 Topic을 읽기 시작하면 해당 Topic 내 일부 partition들이 자동으로 할당됨
  • Consumer의 수보다 partition의 수가 더 많은 경우, partition은 라운드 로빈 방식으로 Consumer들에게 할당됨 (한 partition은 한 consumer에게만 할당)
    • 이를 통해 데이터 소비 병렬성을 늘리고 Backpressure 경감
    • Consumer가 일부 중단되더라도 계속해서 데이터 처리 가능
  • Consumer Group Rebalancing
    • 기존 Consumer가 사라지거나 새로운 Consumer가 Group에 참여하는 경우 partition들이 다시 지정되어야 함
    • Kafka가 알아서 해줌

auto commit Consumer

import json

from kafka.consumer import KafkaConsumer


def key_deserializer(key):
    return key.decode('utf-8')


def value_deserializer(value):
    return json.loads(value.decode('utf-8'))


def main():
    topic_name = "fake_people"
    bootstrap_servers = ["localhost:9092"]
    consumer_group_id = "fake_people_group"

    consumer = KafkaConsumer(
        bootstrap_servers=bootstrap_servers,
        group_id=consumer_group_id,
        key_deserializer=key_deserializer,
        value_deserializer=value_deserializer,
        auto_offset_reset='earliest',
        enable_auto_commit=True)

    consumer.subscribe([topic_name])
    for record in consumer:
        print(f"""
            Consumed person {record.value} with key '{record.key}'
            from partition {record.partition} at offset {record.offset}
        """)


if __name__ == '__main__':
    main()

manual commit Consumer

import json

from kafka import TopicPartition, OffsetAndMetadata
from kafka.consumer import KafkaConsumer


def key_deserializer(key):
    return key.decode('utf-8')


def value_deserializer(value):
    return json.loads(value.decode('utf-8'))


def main():
    topic_name = "fake_people"
    bootstrap_servers = ["localhost:9092"]
    consumer_group_id = "manual_fake_people_group"

    consumer = KafkaConsumer(
        bootstrap_servers=bootstrap_servers,
        group_id=consumer_group_id,
        key_deserializer=key_deserializer,
        value_deserializer=value_deserializer,
        auto_offset_reset='earliest',
        enable_auto_commit=False)

    consumer.subscribe([topic_name])
    for record in consumer:
        print(f"""
            Consumed person {record.value} with key '{record.key}'
            from partition {record.partition} at offset {record.offset}
        """)

        topic_partition = TopicPartition(record.topic, record.partition)
        offset = OffsetAndMetadata(record.offset + 1, record.timestamp)
        consumer.commit({
            topic_partition: offset
        })

if __name__ == '__main__':
    main()

Consumer 옵션 - Message Processing Guarantee 방식

  • 실시간 메시지 처리 및 전송 관점에서 시스템의 보장 방식
방식 설명
Exactly Once 각 메시지가 Consumer에게 정확히 한 번만 전달된다는 것을 보장
네트워크 문제, 장애 또는 재시도 가능성으로 아주 어려운 문제
1) Procuder 단에서는 enable_idempotence를 True로 설정
2)Producer에서 메시지를 쓸 때와 Consumer에서 읽을 때 Transaction API 사용
At Least Once 모든 메시지가 Consumer에게 적어도 한 번 이상이 되도록 보장하지만, 메시지 중복의 가능성 존재
중복 제거 매커니즘이 필요 (멱등성)
At Mose Once 메시지 손실 가능성에 중점
가장 흔한 메시지 전송 방식

4. ksqlDB

  • REST API나 ksql 클라이언트 툴을 사용하여 topic을 테이블처럼 SQL로 조작
  • 실행 방법
    • docker psconfluentinc/cp-ksqldb-server의 container ID 복사
    • docker exec -it containerID sh
    • ksql 실행 후 아래 두 명령 실행
CREATE STREAM my_stream (id STRING, name STRING, title STRING) with (kafka_topic='fake_people', value_format='JSON');

SELECT * FROM my_stream;


스크린샷 2024-02-06 오전 10 17 21

스크린샷 2024-02-06 오전 10 17 38


  • ROWTIME: row가 생성되었을 때의 timestamp 출력
  • EMIT CHANGES: 마지막 출력 이후 새로 생긴 레코드만 출력

스크린샷 2024-02-06 오전 10 18 57