티스토리 뷰
반응형
1. kafka-python vs confluent-kafka-python
기능 | kafka-python | confluent-kafka-python |
멱등성 프로듀서 | ❌ 지원하지않음 | ✅ 지원 (enable.idempotence) |
트랜잭션 | ❌ 지원하지않음 | ✅ 지원 (transactional.id) |
구현기반 | Pure Python | librdkafka C 라이브러리 래퍼 |
성능 | 보통 | 매우좋음 (네이티브 구현) |
API 스타일 | Python 스타일 | kafka 자바 클라이언트 유사 |
2.실습: Confluent Kafka Python 기능 테스트
- 기본연결 테스트
- 멱등성 프로듀서 테스트
- 트랜잭션 테스트
- 정확히 한 번 처리 (Exactly-Once Processing) 테스트
2-1. 기본 연결 테스트 코드 실행
2-1-1. 도커띄우기
#시작
docker-compose up -d
[+] Running 5/5
✔ Container zookeeper Started 0.2s
✔ Container kafka1 Started 0.3s
✔ Container kafka2 Started 0.5s
✔ Container kafka3 Started 0.7s
✔ Container kafka-ui Started
#kafka 3개 Healthy Check!
docker-compose ps
NAME IMAGE COMMAND SERVICE CREATED STATUS PORTS
kafka-ui provectuslabs/kafka-ui:latest "/bin/sh -c 'java --…" kafka-ui 2 days ago Up 4 minutes 0.0.0.0:8080->8080/tcp
kafka1 confluentinc/cp-kafka:7.3.2 "/etc/confluent/dock…" kafka1 2 days ago Up 55 seconds (healthy) 0.0.0.0:9092->9092/tcp
kafka2 confluentinc/cp-kafka:7.3.2 "/etc/confluent/dock…" kafka2 2 days ago Up 4 minutes (healthy) 9092/tcp, 0.0.0.0:9093->9093/tcp
kafka3 confluentinc/cp-kafka:7.3.2 "/etc/confluent/dock…" kafka3 2 days ago Up 55 seconds (healthy) 9092/tcp, 0.0.0.0:9094->9094/tcp
zookeeper confluentinc/cp-zookeeper:7.3.2 "/etc/confluent/dock…" zookeeper 2 days ago Up 4 minutes (unhealthy) 2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp
from confluent_kafka import Producer
import socket
def delivery_report(err, msg):
if err is not None:
print(f'메시지 전송 실패: {err}')
else:
print(f'메시지 전송 성공: {msg.topic()} [{msg.partition()}] @ 오프셋 {msg.offset()}')
# 프로듀서 설정
producer_config = {
'bootstrap.servers': 'localhost:9092,localhost:9093,localhost:9094',
'client.id': socket.gethostname()
}
producer = Producer(producer_config)
# 간단한 메시지 전송
producer.produce('idempotent-topic', key='test', value='연결 테스트 메시지', callback=delivery_report)
# 이벤트 루프 처리 및 메시지 전송 확인
producer.flush()
2-1-2. 토픽생성 & 라이브러리 설치
# 실습용 토픽 생성
docker exec -it kafka1 kafka-topics --create --bootstrap-server kafka1:29092 --topic idempotent-topic --partitions 3 --replication-factor 3
docker exec -it kafka1 kafka-topics --create --bootstrap-server kafka1:29092 --topic order-events --partitions 3 --replication-factor 3
docker exec -it kafka1 kafka-topics --create --bootstrap-server kafka1:29092 --topic payment-events --partitions 3 --replication-factor 3
docker exec -it kafka1 kafka-topics --create --bootstrap-server kafka1:29092 --topic input-topic --partitions 3 --replication-factor 3
docker exec -it kafka1 kafka-topics --create --bootstrap-server kafka1:29092 --topic output-topic --partitions 3 --replication-factor 3
#로그
Created topic idempotent-topic.
Created topic order-events.
Created topic payment-events.
Created topic input-topic.
Created topic output-topic.
#설치
pip install confluent-kafka
2-1-3. -python connection_test.py 실행
from confluent_kafka import Producer
import socket
def delivery_report(err, msg):
if err is not None:
print(f'메시지 전송 실패: {err}')
else:
print(f'메시지 전송 성공: {msg.topic()} [{msg.partition()}] @ 오프셋 {msg.offset()}')
# 프로듀서 설정
producer_config = {
'bootstrap.servers': 'localhost:9092,localhost:9093,localhost:9094',
'client.id': socket.gethostname()
}
try:
# 프로듀서 인스턴스 생성
producer = Producer(producer_config)
# 간단한 메시지 전송
producer.produce('idempotent-topic', key='test', value='연결 테스트 메시지', callback=delivery_report)
# 이벤트 루프 처리 및 메시지 전송 확인
producer.flush()
print("기본 연결 테스트 성공: 브로커와 연결되었으며 메시지를 전송했습니다.")
except Exception as e:
print(f"기본 연결 테스트 실패: {e}")
#로그
메시지 전송 성공: idempotent-topic [1] @ 오프셋 0
기본 연결 테스트 성공: 브로커와 연결되었으며 메시지를 전송했습니다.
2-2. 멱등성 프로듀서
- 메시지가 정확히 한 번만 저장되도록 보장
- 중복 메시지 방지를 위해 PID와 시퀀스 번호 사용
- 설정: enable.idempotence=True 설정하면, 정확히 한번 전송
멱등성(idempotence) 이란?
**"같은 작업을 여러 번 해도 결과가 한 번만 한 것과 동일하게 나오는 성질"**
예시:
- POST /order 요청을 3번 보내도 실제 주문은 한 번만 생성된다면 → 멱등성 보장됨.
- Kafka에서 같은 메시지를 재전송하더라도 중복 없이 정확히 한 번만 저장되면 → 멱등성 프로듀서.
✅ 보장 범위
- ✅ 단일 파티션
- ✅ 단일 세션
- ❌ 여러 토픽/파티션 원자성 없음
2-2-1. 설정
# 멱등성 프로듀서 설정
producer_config = {
'bootstrap.servers': bootstrap_servers,
'client.id': f'idempotent-producer-{uuid.uuid4()}',
'enable.idempotence': True, # 멱등성 활성화 (핵심 설정)
'acks': 'all', # 자동으로 'all'로 설정됨
'retries': 5, # 재시도 횟수
'max.in.flight.requests.per.connection': 5 # 최대 5로 제한됨
}
2-3. 트랜잭션 프로듀서
- 여러 토픽, 파티션에 걸친 원자적 쓰기 작업 보장
- 모두 성공하거나 모두 실패하는 원자성
- 설정: 고유한 transactional.id 필요
✅ 보장 범위
- ✅ 여러 파티션/토픽
- ✅ 원자성(atomicity)
- ✅ 커밋되지 않은 메시지는 컨슈머에 보이지 않음
#!/usr/bin/env python3
"""
confluent-kafka-python 멱등성 프로듀서 테스트
"""
import json
import time
import uuid
import sys
from confluent_kafka import Producer, KafkaException
def delivery_callback(err, msg):
"""메시지 전송 콜백"""
if err is not None:
print(f'메시지 전송 실패: {err}')
else:
print(f'메시지 전송 성공: {msg.topic()} [{msg.partition()}] @ 오프셋 {msg.offset()}')
def create_idempotent_producer(bootstrap_servers='localhost:9092,localhost:9093,localhost:9094'):
"""멱등성 프로듀서 생성"""
# 멱등성 프로듀서 설정
producer_config = {
'bootstrap.servers': bootstrap_servers,
'client.id': f'idempotent-producer-{uuid.uuid4()}',
'enable.idempotence': True, # 멱등성 활성화 (핵심 설정)
'acks': 'all', # 자동으로 'all'로 설정됨
'retries': 5, # 재시도 횟수
'max.in.flight.requests.per.connection': 5 # 최대 5로 제한됨
}
print("\n[프로듀서 설정]")
for key, value in producer_config.items():
print(f" {key}: {value}")
# 프로듀서 생성
producer = Producer(producer_config)
return producer
def send_messages(producer, topic, num_messages=5):
"""메시지 전송"""
successful_messages = 0
try:
print("\n[메시지 전송 시작]")
for i in range(num_messages):
# 메시지 데이터
message = {
'id': i,
'message': f'멱등성 프로듀서 테스트 메시지 #{i}',
'timestamp': time.time()
}
# 메시지 전송
producer.produce(
topic=topic,
key=str(i),
value=json.dumps(message).encode('utf-8'),
callback=delivery_callback
)
# 이벤트 처리
producer.poll(0)
print(f" 메시지 #{i} 전송 요청됨")
time.sleep(0.5) # 메시지간 간격
successful_messages += 1
# 모든 메시지가 전송될 때까지 대기
print("\n[남은 메시지 flush 중...]")
remaining = producer.flush(timeout=10)
if remaining > 0:
print(f"❗ {remaining}개의 메시지가 전송되지 않았습니다.")
else:
print("✅ 모든 메시지 flush 완료")
except KafkaException as e:
print(f"\n❌ Kafka 예외 발생: {e}")
return successful_messages
except Exception as e:
print(f"\n❌ 일반 예외 발생: {e}")
return successful_messages
return successful_messages
if __name__ == "__main__":
if len(sys.argv) > 1:
topic = sys.argv[1]
else:
topic = 'idempotent-topic'
if len(sys.argv) > 2:
num_messages = int(sys.argv[2])
else:
num_messages = 10
print(f"토픽: {topic}, 메시지 수: {num_messages}")
# 프로듀서 생성
producer = create_idempotent_producer()
# 메시지 전송
successful_count = send_messages(producer, topic, num_messages)
print(f"\n테스트 결과: {successful_count}/{num_messages} 메시지 전송 성공")
if successful_count == num_messages:
print("\n✅ 멱등성 프로듀서 테스트 완료")
else:
print("\n❌ 멱등성 프로듀서 테스트 부분 실패")
2-2-3. 테스트결과 => 멱등성 프로듀서는 단일 프로듀서 세션 내에서 중복을 방지, 여러 토픽, 파티션 걸친 원자적 쓰기는 보장x 이를 위해 트랜잭션 기능 사용해야
docker exec -it kafka1 kafka-console-consumer \
--bootstrap-server kafka1:29092 \
--topic idempotent-topic \
--from-beginning \
--max-messages 10 \
--property print.key=true
test 연결 테스트 메시지
2 {"id": 2, "message": "\uba71\ub4f1\uc131 \ud504\ub85c\ub4c0\uc11c \ud14c\uc2a4\ud2b8 \uba54\uc2dc\uc9c0 #2", "timestamp": 1750679584.6633348}
3 {"id": 3, "message": "\uba71\ub4f1\uc131 \ud504\ub85c\ub4c0\uc11c \ud14c\uc2a4\ud2b8 \uba54\uc2dc\uc9c0 #3", "timestamp": 1750679585.164608}
4 {"id": 4, "message": "\uba71\ub4f1\uc131 \ud504\ub85c\ub4c0\uc11c \ud14c\uc2a4\ud2b8 \uba54\uc2dc\uc9c0 #4", "timestamp": 1750679585.670042}
5 {"id": 5, "message": "\uba71\ub4f1\uc131 \ud504\ub85c\ub4c0\uc11c \ud14c\uc2a4\ud2b8 \uba54\uc2dc\uc9c0 #5", "timestamp": 1750679586.1711628}
6 {"id": 6, "message": "\uba71\ub4f1\uc131 \ud504\ub85c\ub4c0\uc11c \ud14c\uc2a4\ud2b8 \uba54\uc2dc\uc9c0 #6", "timestamp": 1750679586.673999}
0 {"id": 0, "message": "\uba71\ub4f1\uc131 \ud504\ub85c\ub4c0\uc11c \ud14c\uc2a4\ud2b8 \uba54\uc2dc\uc9c0 #0", "timestamp": 1750679583.65276}
1 {"id": 1, "message": "\uba71\ub4f1\uc131 \ud504\ub85c\ub4c0\uc11c \ud14c\uc2a4\ud2b8 \uba54\uc2dc\uc9c0 #1", "timestamp": 1750679584.158083}
8 {"id": 8, "message": "\uba71\ub4f1\uc131 \ud504\ub85c\ub4c0\uc11c \ud14c\uc2a4\ud2b8 \uba54\uc2dc\uc9c0 #8", "timestamp": 1750679587.67747}
7 {"id": 7, "message": "\uba71\ub4f1\uc131 \ud504\ub85c\ub4c0\uc11c \ud14c\uc2a4\ud2b8 \uba54\uc2dc\uc9c0 #7", "timestamp": 1750679587.176651}
Processed a total of 10 messages
❌ 멱등성으로 불가능한 것
- 여러 개의 토픽이나 파티션에 메시지를 동시에 쓰는 것
→ 이건 트랜잭션 기능이 필요함 - 중간에 프로듀서가 재시작되면 멱등성 보장 안 됨
트랜잭션 프로듀서 설정 id는 반드시 고유해야하고, 같은 아이디로 다른 프로듀서가 시작되면 기존 프로듀서는 사용 못함. 당연한 얘기지만 트랜잭션 프로듀서는 멱등성도 자동을 활성화됨.
- 일반적인 단일 토픽/파티션 전송에서는 enable.idempotence=true만으로 (멱등성 프로듀서) 충분합니다.
- 하지만 여러 이벤트를 묶어 하나의 트랜잭션처럼 처리해야 하는 경우, 즉 DB의 트랜잭션처럼 Kafka도 쓰고 싶다면 → 트랜잭션 프로듀서 사용해야 합니다. (트랜잭션 프로듀서 사용해야)
# 트랜잭션 ID 생성 (고유해야 함)
transaction_id = f'txn-producer-{uuid.uuid4()}'
# 트랜잭션 프로듀서 설정
producer_config = {
'bootstrap.servers': bootstrap_servers,
'transactional.id': transaction_id, # 트랜잭션 ID (필수)
'enable.idempotence': True, # 트랜잭션은 자동으로 멱등성 활성화
'acks': 'all', # 필수 설정
'transaction.timeout.ms': 60000 # 트랜잭션 타임아웃 60초
}
트랜잭션 프로듀서.py
from confluent_kafka import Producer, KafkaException
def create_transaction_producer(bootstrap_servers='localhost:9092,localhost:9093,localhost:9094'):
"""트랜잭션 프로듀서 생성"""
# 트랜잭션 ID 생성 (고유해야 함)
transaction_id = f'txn-producer-{uuid.uuid4()}'
# 트랜잭션 프로듀서 설정
producer_config = {
'bootstrap.servers': bootstrap_servers,
'transactional.id': transaction_id, # 트랜잭션 ID (필수)
'enable.idempotence': True, # 트랜잭션은 자동으로 멱등성 활성화
'acks': 'all', # 필수 설정
'transaction.timeout.ms': 60000 # 트랜잭션 타임아웃 60초
}
print("\n[트랜잭션 프로듀서 설정]")
for key, value in producer_config.items():
print(f" {key}: {value}")
# 프로듀서 생성
producer = Producer(producer_config)
# 트랜잭션 초기화
print("\n[트랜잭션 초기화 중...]")
producer.init_transactions()
print("✅ 트랜잭션 초기화 성공")
return producer
def run_successful_transaction(producer, order_topic, payment_topic):
"""성공하는 트랜잭션 예제"""
print("\n----- 성공 트랜잭션 실행 -----")
try:
# 트랜잭션 시작
producer.begin_transaction()
print("✅ 트랜잭션 시작")
# 주문 생성 이벤트 전송
order_id = random.randint(1000, 9999)
order_data = {
'order_id': order_id,
'customer_id': 'user123',
'product_id': 'product456',
'quantity': 2,
'status': 'created',
'timestamp': time.time()
}
producer.produce(
topic=order_topic,
key=f'order-{order_id}',
value=json.dumps(order_data).encode('utf-8')
)
producer.poll(0)
print(f"✅ 주문 이벤트 전송 요청됨: order_id={order_id}")
# 결제 처리 이벤트 전송
payment_data = {
'order_id': order_id,
'payment_method': 'credit_card',
'amount': 129.99,
'status': 'processing',
'timestamp': time.time()
}
producer.produce(
topic=payment_topic,
key=f'order-{order_id}',
value=json.dumps(payment_data).encode('utf-8')
)
producer.poll(0)
print(f"✅ 결제 이벤트 전송 요청됨: order_id={order_id}")
# 처리 지연 시뮬레이션
time.sleep(1)
# 트랜잭션 커밋
print("[트랜잭션 커밋 중...]")
producer.commit_transaction()
print("✅ 트랜잭션이 성공적으로 커밋되었습니다.")
return order_id, True
except Exception as e:
print(f"❌ 트랜잭션 실패: {e}")
try:
producer.abort_transaction()
print("✅ 트랜잭션이 중단되었습니다.")
except Exception as abort_e:
print(f"❌ 트랜잭션 중단 실패: {abort_e}")
return None, False
def run_aborted_transaction(producer, order_topic, payment_topic):
"""중단되는 트랜잭션 예제"""
print("\n----- 중단 트랜잭션 실행 -----")
try:
# 트랜잭션 시작
producer.begin_transaction()
print("✅ 트랜잭션 시작")
# 주문 생성 이벤트 전송
order_id = random.randint(1000, 9999)
order_data = {
'order_id': order_id,
'customer_id': 'user456',
'product_id': 'product789',
'quantity': 1,
'status': 'created',
'timestamp': time.time()
}
producer.produce(
topic=order_topic,
key=f'order-{order_id}',
value=json.dumps(order_data).encode('utf-8')
)
producer.poll(0)
print(f"✅ 주문 이벤트 전송 요청됨: order_id={order_id}")
# 결제 처리 이벤트 전송 (실패 케이스)
payment_data = {
'order_id': order_id,
'payment_method': 'credit_card',
'amount': 99.99,
'status': 'failed', # 결제 실패
'error_code': 'insufficient_funds',
'timestamp': time.time()
}
producer.produce(
topic=payment_topic,
key=f'order-{order_id}',
value=json.dumps(payment_data).encode('utf-8')
)
producer.poll(0)
print(f"✅ 결제 이벤트 전송 요청됨: order_id={order_id}, status=failed")
# 비즈니스 로직 검증 - 결제 실패 시 트랜잭션 중단
print("[비즈니스 로직 검증 중... 결제 실패 감지]")
time.sleep(1)
# 실패 시 트랜잭션 중단
print("[트랜잭션 중단 중...]")
producer.abort_transaction()
print("✅ 트랜잭션이 의도적으로 중단되었습니다.")
return order_id, True
except Exception as e:
print(f"❌ 트랜잭션 처리 중 오류 발생: {e}")
try:
producer.abort_transaction()
print("✅ 트랜잭션이 중단되었습니다.")
except Exception as abort_e:
print(f"❌ 트랜잭션 중단 실패: {abort_e}")
return None, False
if __name__ == "__main__":
order_topic = 'order-events'
payment_topic = 'payment-events'
print(f"주문 토픽: {order_topic}, 결제 토픽: {payment_topic}")
# 트랜잭션 프로듀서 생성
producer = create_transaction_producer()
# 성공 트랜잭션 실행
success_order_id, success_result = run_successful_transaction(producer, order_topic, payment_topic)
# 1초 대기
time.sleep(1)
# 중단 트랜잭션 실행
aborted_order_id, abort_result = run_aborted_transaction(producer, order_topic, payment_topic)
# 결과 요약
print("\n===== 트랜잭션 테스트 결과 =====")
print(f"✅ 성공 트랜잭션 주문 ID: {success_order_id}")
print(f"❌ 중단 트랜잭션 주문 ID: {aborted_order_id}")
# 테스트 결과 평가 (성공 트랜잭션은 커밋되고, 중단 트랜잭션은 중단되어야 함)
if success_result and abort_result:
print("\n✅ 트랜잭션 프로듀서 테스트 성공")
else:
print("\n❌ 트랜잭션 프로듀서 테스트 부분 실패")
결과
[트랜잭션 프로듀서 설정]
bootstrap.servers: localhost:9092,localhost:9093,localhost:9094
transactional.id: txn-producer-9e9d3056-71ea-48e8-9527-59f3a845bf70
enable.idempotence: True
acks: all
transaction.timeout.ms: 60000
[트랜잭션 초기화 중...]
✅ 트랜잭션 초기화 성공
----- 성공 트랜잭션 실행 -----
✅ 트랜잭션 시작
✅ 주문 이벤트 전송 요청됨: order_id=3826
✅ 결제 이벤트 전송 요청됨: order_id=3826
[트랜잭션 커밋 중...]
✅ 트랜잭션이 성공적으로 커밋되었습니다.
----- 중단 트랜잭션 실행 -----
✅ 트랜잭션 시작
✅ 주문 이벤트 전송 요청됨: order_id=1202
✅ 결제 이벤트 전송 요청됨: order_id=1202, status=failed
[비즈니스 로직 검증 중... 결제 실패 감지]
[트랜잭션 중단 중...]
✅ 트랜잭션이 의도적으로 중단되었습니다.
===== 트랜잭션 테스트 결과 =====
✅ 성공 트랜잭션 주문 ID: 3826
❌ 중단 트랜잭션 주문 ID: 1202
# 트랜잭션 테스트 결과 확인 (read_uncommitted)
docker exec -it kafka1 kafka-console-consumer \
--bootstrap-server kafka1:29092 \
--topic order-events \
--from-beginning \
--max-messages 5 \
--property print.key=true
order-1202 {"order_id": 1202, "customer_id": "user456", "product_id": "product789", "quantity": 1, "status": "created", "timestamp": 1750690421.83603}
order-3826 {"order_id": 3826, "customer_id": "user123", "product_id": "product456", "quantity": 2, "status": "created", "timestamp": 1750690419.7871602}
# 트랜잭션 테스트 결과 확인 (read_committed)
docker exec -it kafka1 kafka-console-consumer \
--bootstrap-server kafka1:29092 \
--topic order-events \
--from-beginning \
--max-messages 5 \
--property print.key=true \
--isolation-level read_committed
order-3826 {"order_id": 3826, "customer_id": "user123", "product_id": "product456", "quantity": 2, "status": "created", "timestamp": 1750690419.7871602}
로직끝에 producer.commit_trnasaction() 이있으면 메시지 보내고, 반대로 producer.abort_transaction() 이있으면 read_committed 모드일 경우 abort된 건 안 보임
멱등성 프로듀서는 항상보임 단 중복은 없음 (단일파티션)
반응형
'kafka' 카테고리의 다른 글
[kafka] PostgreSQL - Kafka Connect 파이프라인 (feat. CDC) (1) | 2025.06.28 |
---|---|
[kafka] Kafka-Connect 알아보기 (1) | 2025.06.28 |
[kafka] Consumer 그룹관리 (2) | 2025.06.23 |
[kafka] Consumer 오프셋 관리와 리밸런싱 전략 (0) | 2025.06.23 |
[kafka] Producer 파티셔닝 전략 (1) | 2025.06.22 |
반응형
공지사항
최근에 올라온 글
최근에 달린 댓글
- Total
- Today
- Yesterday
링크
TAG
- 우테코
- supervised
- un-supervised
- 윈도우pscale설치
- helm
- CloudFlare
- ADT
- planetscale배포
- create_task
- 대수자료구조
- nextj이미지저장
- asyncio
- Python
- 타입스크립트
- kubectl
- pscale
- asyncio.gather
- Tailwind
- next.js
- nodejs
- 비동기
- datalabeling
- iris
- 42서울
- 함수형프로그래밍
- window
- semi-supervised
- 위즈윅에디터
- SSR
- k8s
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | ||
6 | 7 | 8 | 9 | 10 | 11 | 12 |
13 | 14 | 15 | 16 | 17 | 18 | 19 |
20 | 21 | 22 | 23 | 24 | 25 | 26 |
27 | 28 | 29 | 30 | 31 |
글 보관함