[DEV] 15주차. Kafka와 Spark Streaming 기반 스트리밍 처리 (3)
1. Kafka CLI Tools
접근 방법
docker ps
를 통해 Broker의 Container ID 혹은 Container 이름 파악 (confluentinc/cp-kafka:7.3.2)- 해당 컨테이너로 로그인
docker exec -it Broker_Container_ID sh
- 다양한 kafka 관련 클라이언트 툴 사용 가능
- kafka-topics
- kafka-configs
- kafka-console-consumer
- kafka-conosole-producer
kafka-topics
- kafka topic과 관련된 다양한 유틸리티 제공
kafka-topics --bootstrap-server kafka1:9092 --list
kafka-topics --bootstrap-server kafka1:9092 --delete --topic topic_test
kafka-console-producer
- Topic을 만들고 message 생성 가능
- 기본 Partition 1개, Replica 1개
kafka-console-producer --bootstrap-server kafka1:9092 --topic test_console
- Ctrl+C를 누르기 전까지 Enter를 누르며 계속 메시지 생성!
kafka-console-consumer
- Topic에서 message 읽기
--from-beginning
옵션을 주지 않으면 default로 이미 쌓여있는 메시지는 읽지 않고 새로 들어오는 것만 읽음 (latest)--from-beginning
옵션이 earliest
kafka-console-consumer --bootstrap-server kafka1:9092 --topic test_console --from-beginning
- 웹 UI
2. Topic 파라미터 설정
다수의 Partition이나 Replica
- 먼저 KafkaAdminClient 오브젝트를 생성하고
create_topics
함수로 Topic 추가 create_topics
의 인자로는 NewTopic 클래스의 오브젝트 지정
client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
topic = NewTopic(
name=name,
num_partitions=partitions,
replication_factor=replica # 원본 포함 개수
)
client.create_topics([topic])
KafkaProducer 파라미터
파라미터 | 의미 | 기본값 |
---|---|---|
bootstrap_servers | 메시지를 보낼 때 사용할 브로커 리스트 (host:port) | localhost:9092 |
client_id | Kafka Producer의 이름 | kafka-python-(version) |
key_serializer, value_serializer | 메시지의 키와 값의 serialize 방법 지정 (함수) | |
enable_idempotence | ETL에서의 멱등성과 비슷한 컨셉 중복 메세지 전송을 막을 것인지? |
False (안막음) |
acks: 0, 1, ‘all’ | consistency level 0: 바로 리턴, 1: leader에 쓰일 때까지 대기, ‘all’: 모든 partition leader/follower에 적용될 때까지 대기 |
0 |
retries delivery.timeout.ms |
메시지 실패 시 재시도 횟수 메시지 전송 최대 시간 |
2147483647 120000 |
linger_ms, batch_size | 다수의 메시지를 동시에 보내기 위함 (배치 전송) - 메시지 송신전 대기 시간 - 메시지 송신전 데이터 크기 |
0 16384 |
Kafka Proudcer 동작
KafkaProducer로 topic 만들기
- 랜덤하게 사람 정보를 만들어 저장하는 producer 구현
- Faker 모듈 사용:
pip3 install faker
- pydantic의 BaseModel을 사용하여 메시지 클래스 구현 (Person)
pip3 install pydantic
- Topic을 먼저 만들고 진행
- Faker 모듈 사용:
- person.py
"""
Pydantic is a Python library for data parsing and validation.
It uses the type hinting mechanism of the newer versions of Python (version 3.6 onwards)
and validates the types during the runtime. Pydantic defines BaseModel class.
It acts as the base class for creating user defined models.
"""
from pydantic import BaseModel
class Person(BaseModel):
id: str
name: str
title: str
- fake_person_producer.py
import re
import uuid
from typing import List
from person import Person
from faker import Faker
from kafka.admin import NewTopic
from kafka.errors import TopicAlreadyExistsError
from kafka import KafkaAdminClient
from kafka.producer import KafkaProducer
def create_topic(bootstrap_servers, name, partitions, replica=1):
client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
try:
topic = NewTopic(
name=name,
num_partitions=partitions,
replication_factor=replica)
client.create_topics([topic])
except TopicAlreadyExistsError as e:
print(e)
pass
finally:
client.close()
def main():
topic_name = "fake_people"
bootstrap_servers = ["localhost:9092"]
# create a topic first
create_topic(bootstrap_servers, topic_name, 4)
# ingest some random people events
people: List[Person] = []
faker = Faker()
producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
client_id="Fake_Person_Producer",
)
for _ in range(100):
person = Person(id=str(uuid.uuid4()), name=faker.name(), title=faker.job().title())
people.append(person)
producer.send(
topic=topic_name,
key=re.sub(r'\s+', '-', person.title.lower()).encode('utf-8'),
value=person.model_dump_json().encode('utf-8'))
producer.flush()
if __name__ == '__main__':
main()
3. Kafka Consumer
KafkaCounsumer 파라미터
-
Topic 이름을 KafkaConsumer의 첫번째 인자로 지정하거나, 나중에 별도로 subscribe를 호출해서 지정함
- Consumer를 1개 사용하더라도
group_id
는 지정해주어야 함- 같은 id에 소속된 consumer끼리는 kafka가 partition을 나누어 줌 + 그룹 내의 consumer가 사라지면 partition을 재분배해줌
group_id
를 잘못 사용하면 아주 큰 문제가 발생할 수도 !!
enable_auto_commit
- 데이터 유실이 조금 있어도 괜찮은 경우 일반적으로 True
- 데이터 유실이 있으면 안되고, 중복된 데이터라도 다시 꼭 한번은 확인해야함 False (at_least_once)
파라미터 | 의미 | 기본값 |
---|---|---|
bootstrap_servers | 메시지를 보낼 때 사용할 브로커 리스트 (host:port) | localhost:9092 |
client_id | Kafka Consumer의 이름 | kafka-python-(version) |
group_id | Kafka Consumer Group의 이름 | |
key_deserializer, value_deserializer | 메시지의 키와 값의 deserialize 방법 지정 (함수) | |
auto_offset_reset | earliest, latest | latest |
enable_auto_commit | True: 소비자의 offset이 백그라운드에서 주기적으로 커밋 False: 명시적으로 커밋해주어야 함 offset은 별도로 리셋 가능하며, Conduktor 웹 UI에서도 가능 |
True |
Consumer의 일반적인 동작
- 다수의 partition들로부터 어떻게 읽을까?
- Consumer가 하나이고 다수의 partition들로 구성된 topic으로부터 읽어야 한다면?
- consumer는 각 partition들로부터 라운드 로빈 형태로 하나씩 읽게 됨
- 이 경우 병렬성이 떨어지고, 데이터 생산 속도에 따라 Backpressure가 심해질 수 있음
- 이를 해결하기 위한 것이 Consumer Group
- 한 프로세스에서 다수의 topic을 읽는 것 가능
- topic 수만큼 KafkaConsumer 인스턴스를 생성하고, 별도의 Group ID와 Client ID를 지정해야 함
Consumer Group
-
병렬성을 높이기 위한 것!
- Consumer가 Topic을 읽기 시작하면 해당 Topic 내 일부 partition들이 자동으로 할당됨
- Consumer의 수보다 partition의 수가 더 많은 경우, partition은 라운드 로빈 방식으로 Consumer들에게 할당됨 (한 partition은 한 consumer에게만 할당)
- 이를 통해 데이터 소비 병렬성을 늘리고 Backpressure 경감
- Consumer가 일부 중단되더라도 계속해서 데이터 처리 가능
- Consumer Group Rebalancing
- 기존 Consumer가 사라지거나 새로운 Consumer가 Group에 참여하는 경우 partition들이 다시 지정되어야 함
- Kafka가 알아서 해줌
auto commit Consumer
import json
from kafka.consumer import KafkaConsumer
def key_deserializer(key):
return key.decode('utf-8')
def value_deserializer(value):
return json.loads(value.decode('utf-8'))
def main():
topic_name = "fake_people"
bootstrap_servers = ["localhost:9092"]
consumer_group_id = "fake_people_group"
consumer = KafkaConsumer(
bootstrap_servers=bootstrap_servers,
group_id=consumer_group_id,
key_deserializer=key_deserializer,
value_deserializer=value_deserializer,
auto_offset_reset='earliest',
enable_auto_commit=True)
consumer.subscribe([topic_name])
for record in consumer:
print(f"""
Consumed person {record.value} with key '{record.key}'
from partition {record.partition} at offset {record.offset}
""")
if __name__ == '__main__':
main()
manual commit Consumer
import json
from kafka import TopicPartition, OffsetAndMetadata
from kafka.consumer import KafkaConsumer
def key_deserializer(key):
return key.decode('utf-8')
def value_deserializer(value):
return json.loads(value.decode('utf-8'))
def main():
topic_name = "fake_people"
bootstrap_servers = ["localhost:9092"]
consumer_group_id = "manual_fake_people_group"
consumer = KafkaConsumer(
bootstrap_servers=bootstrap_servers,
group_id=consumer_group_id,
key_deserializer=key_deserializer,
value_deserializer=value_deserializer,
auto_offset_reset='earliest',
enable_auto_commit=False)
consumer.subscribe([topic_name])
for record in consumer:
print(f"""
Consumed person {record.value} with key '{record.key}'
from partition {record.partition} at offset {record.offset}
""")
topic_partition = TopicPartition(record.topic, record.partition)
offset = OffsetAndMetadata(record.offset + 1, record.timestamp)
consumer.commit({
topic_partition: offset
})
if __name__ == '__main__':
main()
Consumer 옵션 - Message Processing Guarantee 방식
- 실시간 메시지 처리 및 전송 관점에서 시스템의 보장 방식
방식 | 설명 |
---|---|
Exactly Once | 각 메시지가 Consumer에게 정확히 한 번만 전달된다는 것을 보장 네트워크 문제, 장애 또는 재시도 가능성으로 아주 어려운 문제 1) Procuder 단에서는 enable_idempotence 를 True로 설정 2)Producer에서 메시지를 쓸 때와 Consumer에서 읽을 때 Transaction API 사용 |
At Least Once | 모든 메시지가 Consumer에게 적어도 한 번 이상이 되도록 보장하지만, 메시지 중복의 가능성 존재 중복 제거 매커니즘이 필요 (멱등성) |
At Mose Once | 메시지 손실 가능성에 중점 가장 흔한 메시지 전송 방식 |
4. ksqlDB
- REST API나 ksql 클라이언트 툴을 사용하여 topic을 테이블처럼 SQL로 조작
- 실행 방법
docker ps
후confluentinc/cp-ksqldb-server
의 container ID 복사docker exec -it containerID sh
- ksql 실행 후 아래 두 명령 실행
CREATE STREAM my_stream (id STRING, name STRING, title STRING) with (kafka_topic='fake_people', value_format='JSON');
SELECT * FROM my_stream;
ROWTIME
: row가 생성되었을 때의 timestamp 출력EMIT CHANGES
: 마지막 출력 이후 새로 생긴 레코드만 출력