티스토리 뷰

반응형

1. kafka-python vs confluent-kafka-python

 

기능 kafka-python confluent-kafka-python
멱등성 프로듀서 지원하지않음 ✅ 지원 (enable.idempotence)
트랜잭션 지원하지않음 ✅ 지원 (transactional.id)
구현기반 Pure Python librdkafka C 라이브러리 래퍼
성능 보통 매우좋음 (네이티브 구현)
API 스타일 Python 스타일 kafka 자바 클라이언트 유사

 

 

2.실습: Confluent Kafka  Python 기능 테스트

  • 기본연결 테스트
  • 멱등성 프로듀서 테스트
  • 트랜잭션 테스트
  • 정확히 한 번 처리 (Exactly-Once Processing) 테스트

 

2-1. 기본 연결 테스트 코드 실행 

2-1-1. 도커띄우기

#시작
docker-compose up -d

[+] Running 5/5
 ✔ Container zookeeper  Started                                                                                                                                                                                              0.2s 
 ✔ Container kafka1     Started                                                                                                                                                                                              0.3s 
 ✔ Container kafka2     Started                                                                                                                                                                                              0.5s 
 ✔ Container kafka3     Started                                                                                                                                                                                              0.7s 
 ✔ Container kafka-ui   Started
 
#kafka 3개 Healthy Check!
docker-compose ps

NAME        IMAGE                             COMMAND                   SERVICE     CREATED      STATUS                     PORTS
kafka-ui    provectuslabs/kafka-ui:latest     "/bin/sh -c 'java --…"   kafka-ui    2 days ago   Up 4 minutes               0.0.0.0:8080->8080/tcp
kafka1      confluentinc/cp-kafka:7.3.2       "/etc/confluent/dock…"   kafka1      2 days ago   Up 55 seconds (healthy)    0.0.0.0:9092->9092/tcp
kafka2      confluentinc/cp-kafka:7.3.2       "/etc/confluent/dock…"   kafka2      2 days ago   Up 4 minutes (healthy)     9092/tcp, 0.0.0.0:9093->9093/tcp
kafka3      confluentinc/cp-kafka:7.3.2       "/etc/confluent/dock…"   kafka3      2 days ago   Up 55 seconds (healthy)    9092/tcp, 0.0.0.0:9094->9094/tcp
zookeeper   confluentinc/cp-zookeeper:7.3.2   "/etc/confluent/dock…"   zookeeper   2 days ago   Up 4 minutes (unhealthy)   2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp



from confluent_kafka import Producer
import socket

def delivery_report(err, msg):
    if err is not None:
        print(f'메시지 전송 실패: {err}')
    else:
        print(f'메시지 전송 성공: {msg.topic()} [{msg.partition()}] @ 오프셋 {msg.offset()}')

# 프로듀서 설정
producer_config = {
    'bootstrap.servers': 'localhost:9092,localhost:9093,localhost:9094',
    'client.id': socket.gethostname()
}

producer = Producer(producer_config)
    
# 간단한 메시지 전송
producer.produce('idempotent-topic', key='test', value='연결 테스트 메시지', callback=delivery_report)
    
# 이벤트 루프 처리 및 메시지 전송 확인
producer.flush()

 

 

2-1-2. 토픽생성 & 라이브러리 설치

# 실습용 토픽 생성
docker exec -it kafka1 kafka-topics --create --bootstrap-server kafka1:29092 --topic idempotent-topic --partitions 3 --replication-factor 3
docker exec -it kafka1 kafka-topics --create --bootstrap-server kafka1:29092 --topic order-events --partitions 3 --replication-factor 3
docker exec -it kafka1 kafka-topics --create --bootstrap-server kafka1:29092 --topic payment-events --partitions 3 --replication-factor 3
docker exec -it kafka1 kafka-topics --create --bootstrap-server kafka1:29092 --topic input-topic --partitions 3 --replication-factor 3
docker exec -it kafka1 kafka-topics --create --bootstrap-server kafka1:29092 --topic output-topic --partitions 3 --replication-factor 3

#로그
Created topic idempotent-topic.
Created topic order-events.
Created topic payment-events.
Created topic input-topic.
Created topic output-topic.

#설치
pip install confluent-kafka

 

 

2-1-3. -python connection_test.py 실행

from confluent_kafka import Producer
import socket

def delivery_report(err, msg):
    if err is not None:
        print(f'메시지 전송 실패: {err}')
    else:
        print(f'메시지 전송 성공: {msg.topic()} [{msg.partition()}] @ 오프셋 {msg.offset()}')

# 프로듀서 설정
producer_config = {
    'bootstrap.servers': 'localhost:9092,localhost:9093,localhost:9094',
    'client.id': socket.gethostname()
}

try:
    # 프로듀서 인스턴스 생성
    producer = Producer(producer_config)
    
    # 간단한 메시지 전송
    producer.produce('idempotent-topic', key='test', value='연결 테스트 메시지', callback=delivery_report)
    
    # 이벤트 루프 처리 및 메시지 전송 확인
    producer.flush()
    
    print("기본 연결 테스트 성공: 브로커와 연결되었으며 메시지를 전송했습니다.")
    
except Exception as e:
    print(f"기본 연결 테스트 실패: {e}")
    
    
    
#로그
메시지 전송 성공: idempotent-topic [1] @ 오프셋 0
기본 연결 테스트 성공: 브로커와 연결되었으며 메시지를 전송했습니다.

 

 

2-2. 멱등성 프로듀서 

  • 메시지가 정확히 한 번만 저장되도록 보장
  • 메시지 방지를 위해 PID 시퀀스 번호 사용
  • 설정: enable.idempotence=True 설정하면, 정확히 한번 전송

 

 

멱등성(idempotence) 이란?

**"같은 작업을 여러 번 해도 결과가 한 번만 한 것과 동일하게 나오는 성질"**

 

 

예시:

  • POST /order 요청을 3번 보내도 실제 주문은 한 번만 생성된다면 → 멱등성 보장됨.
  • Kafka에서 같은 메시지를 재전송하더라도 중복 없이 정확히 한 번만 저장되면 → 멱등성 프로듀서.

 

✅ 보장 범위

  • ✅ 단일 파티션
  • ✅ 단일 세션
  • ❌ 여러 토픽/파티션 원자성 없음

 

 

2-2-1. 설정

# 멱등성 프로듀서 설정
producer_config = {
'bootstrap.servers': bootstrap_servers,
'client.id': f'idempotent-producer-{uuid.uuid4()}',
'enable.idempotence': True, # 멱등성 활성화 (핵심 설정)
'acks': 'all', # 자동으로 'all'로 설정됨
'retries': 5, # 재시도 횟수
'max.in.flight.requests.per.connection': 5 # 최대 5로 제한됨
}

 

 

 


 

2-3. 트랜잭션 프로듀서 

  • 여러 토픽, 파티션에 걸친 원자적 쓰기 작업 보장
  • 모두 성공하거나 모두 실패하는 원자성
  • 설정: 고유한 transactional.id 필요

 

✅ 보장 범위

  • ✅ 여러 파티션/토픽
  • ✅ 원자성(atomicity)
  • ✅ 커밋되지 않은 메시지는 컨슈머에 보이지 않음

 

#!/usr/bin/env python3
"""
confluent-kafka-python 멱등성 프로듀서 테스트
"""

import json
import time
import uuid
import sys
from confluent_kafka import Producer, KafkaException

def delivery_callback(err, msg):
    """메시지 전송 콜백"""
    if err is not None:
        print(f'메시지 전송 실패: {err}')
    else:
        print(f'메시지 전송 성공: {msg.topic()} [{msg.partition()}] @ 오프셋 {msg.offset()}')


def create_idempotent_producer(bootstrap_servers='localhost:9092,localhost:9093,localhost:9094'):
    """멱등성 프로듀서 생성"""
    # 멱등성 프로듀서 설정
    producer_config = {
        'bootstrap.servers': bootstrap_servers,
        'client.id': f'idempotent-producer-{uuid.uuid4()}',
        'enable.idempotence': True,  # 멱등성 활성화 (핵심 설정)
        'acks': 'all',               # 자동으로 'all'로 설정됨
        'retries': 5,                # 재시도 횟수
        'max.in.flight.requests.per.connection': 5  # 최대 5로 제한됨
    }
    
    print("\n[프로듀서 설정]")
    for key, value in producer_config.items():
        print(f"  {key}: {value}")
    
    # 프로듀서 생성
    producer = Producer(producer_config)
    return producer


def send_messages(producer, topic, num_messages=5):
    """메시지 전송"""
    successful_messages = 0
    
    try:
        print("\n[메시지 전송 시작]")
        for i in range(num_messages):
            # 메시지 데이터
            message = {
                'id': i,
                'message': f'멱등성 프로듀서 테스트 메시지 #{i}',
                'timestamp': time.time()
            }
            
            # 메시지 전송
            producer.produce(
                topic=topic,
                key=str(i),
                value=json.dumps(message).encode('utf-8'),
                callback=delivery_callback
            )
            
            # 이벤트 처리
            producer.poll(0)
            
            print(f"  메시지 #{i} 전송 요청됨")
            time.sleep(0.5)  # 메시지간 간격
            successful_messages += 1
        
        # 모든 메시지가 전송될 때까지 대기
        print("\n[남은 메시지 flush 중...]")
        remaining = producer.flush(timeout=10)
        
        if remaining > 0:
            print(f"❗ {remaining}개의 메시지가 전송되지 않았습니다.")
        else:
            print("✅ 모든 메시지 flush 완료")
        
    except KafkaException as e:
        print(f"\n❌ Kafka 예외 발생: {e}")
        return successful_messages
    except Exception as e:
        print(f"\n❌ 일반 예외 발생: {e}")
        return successful_messages
    
    return successful_messages


if __name__ == "__main__":
    if len(sys.argv) > 1:
        topic = sys.argv[1]
    else:
        topic = 'idempotent-topic'
        
    if len(sys.argv) > 2:
        num_messages = int(sys.argv[2])
    else:
        num_messages = 10
    
    print(f"토픽: {topic}, 메시지 수: {num_messages}")
    
    # 프로듀서 생성
    producer = create_idempotent_producer()
    
    # 메시지 전송
    successful_count = send_messages(producer, topic, num_messages)
    
    print(f"\n테스트 결과: {successful_count}/{num_messages} 메시지 전송 성공")
    
    if successful_count == num_messages:
        print("\n✅ 멱등성 프로듀서 테스트 완료")
    else:
        print("\n❌ 멱등성 프로듀서 테스트 부분 실패")

 

 

2-2-3. 테스트결과 => 멱등성 프로듀서는 단일 프로듀서 세션 내에서 중복을 방지, 여러 토픽, 파티션 걸친 원자적 쓰기는 보장x 이를 위해 트랜잭션 기능 사용해야

docker exec -it kafka1 kafka-console-consumer \
  --bootstrap-server kafka1:29092 \
  --topic idempotent-topic \
  --from-beginning \
  --max-messages 10 \
  --property print.key=true
  
  
 test    연결 테스트 메시지
2       {"id": 2, "message": "\uba71\ub4f1\uc131 \ud504\ub85c\ub4c0\uc11c \ud14c\uc2a4\ud2b8 \uba54\uc2dc\uc9c0 #2", "timestamp": 1750679584.6633348}
3       {"id": 3, "message": "\uba71\ub4f1\uc131 \ud504\ub85c\ub4c0\uc11c \ud14c\uc2a4\ud2b8 \uba54\uc2dc\uc9c0 #3", "timestamp": 1750679585.164608}
4       {"id": 4, "message": "\uba71\ub4f1\uc131 \ud504\ub85c\ub4c0\uc11c \ud14c\uc2a4\ud2b8 \uba54\uc2dc\uc9c0 #4", "timestamp": 1750679585.670042}
5       {"id": 5, "message": "\uba71\ub4f1\uc131 \ud504\ub85c\ub4c0\uc11c \ud14c\uc2a4\ud2b8 \uba54\uc2dc\uc9c0 #5", "timestamp": 1750679586.1711628}
6       {"id": 6, "message": "\uba71\ub4f1\uc131 \ud504\ub85c\ub4c0\uc11c \ud14c\uc2a4\ud2b8 \uba54\uc2dc\uc9c0 #6", "timestamp": 1750679586.673999}
0       {"id": 0, "message": "\uba71\ub4f1\uc131 \ud504\ub85c\ub4c0\uc11c \ud14c\uc2a4\ud2b8 \uba54\uc2dc\uc9c0 #0", "timestamp": 1750679583.65276}
1       {"id": 1, "message": "\uba71\ub4f1\uc131 \ud504\ub85c\ub4c0\uc11c \ud14c\uc2a4\ud2b8 \uba54\uc2dc\uc9c0 #1", "timestamp": 1750679584.158083}
8       {"id": 8, "message": "\uba71\ub4f1\uc131 \ud504\ub85c\ub4c0\uc11c \ud14c\uc2a4\ud2b8 \uba54\uc2dc\uc9c0 #8", "timestamp": 1750679587.67747}
7       {"id": 7, "message": "\uba71\ub4f1\uc131 \ud504\ub85c\ub4c0\uc11c \ud14c\uc2a4\ud2b8 \uba54\uc2dc\uc9c0 #7", "timestamp": 1750679587.176651}
Processed a total of 10 messages

 

 

 

❌ 멱등성으로 불가능한 것

  • 여러 개의 토픽이나 파티션에 메시지를 동시에 쓰는 것
    → 이건 트랜잭션 기능이 필요함
  • 중간에 프로듀서가 재시작되면 멱등성 보장 안 됨

 

트랜잭션 프로듀서 설정 id는 반드시 고유해야하고, 같은 아이디로 다른 프로듀서가 시작되면 기존 프로듀서는 사용 못함. 당연한 얘기지만 트랜잭션 프로듀서는 멱등성도 자동을 활성화됨.

 

 

  • 일반적인 단일 토픽/파티션 전송에서는 enable.idempotence=true만으로 (멱등성 프로듀서) 충분합니다.
  • 하지만 여러 이벤트를 묶어 하나의 트랜잭션처럼 처리해야 하는 경우, 즉 DB의 트랜잭션처럼 Kafka도 쓰고 싶다면 → 트랜잭션 프로듀서 사용해야 합니다. (트랜잭션 프로듀서 사용해야)

 

# 트랜잭션 ID 생성 (고유해야 함)
transaction_id = f'txn-producer-{uuid.uuid4()}'

# 트랜잭션 프로듀서 설정
producer_config = {
'bootstrap.servers': bootstrap_servers,
	'transactional.id': transaction_id, # 트랜잭션 ID (필수)
	'enable.idempotence': True, # 트랜잭션은 자동으로 멱등성 활성화
	'acks': 'all', # 필수 설정
	'transaction.timeout.ms': 60000 # 트랜잭션 타임아웃 60초
}

 

 

트랜잭션 프로듀서.py

from confluent_kafka import Producer, KafkaException

def create_transaction_producer(bootstrap_servers='localhost:9092,localhost:9093,localhost:9094'):
    """트랜잭션 프로듀서 생성"""
    # 트랜잭션 ID 생성 (고유해야 함)
    transaction_id = f'txn-producer-{uuid.uuid4()}'
    
    # 트랜잭션 프로듀서 설정
    producer_config = {
        'bootstrap.servers': bootstrap_servers,
        'transactional.id': transaction_id,  # 트랜잭션 ID (필수)
        'enable.idempotence': True,   # 트랜잭션은 자동으로 멱등성 활성화
        'acks': 'all',                # 필수 설정
        'transaction.timeout.ms': 60000  # 트랜잭션 타임아웃 60초
    }
    
    print("\n[트랜잭션 프로듀서 설정]")
    for key, value in producer_config.items():
        print(f"  {key}: {value}")
    
    # 프로듀서 생성
    producer = Producer(producer_config)
    
    # 트랜잭션 초기화
    print("\n[트랜잭션 초기화 중...]")
    producer.init_transactions()
    print("✅ 트랜잭션 초기화 성공")
    
    return producer


def run_successful_transaction(producer, order_topic, payment_topic):
    """성공하는 트랜잭션 예제"""
    print("\n----- 성공 트랜잭션 실행 -----")
    
    try:
        # 트랜잭션 시작
        producer.begin_transaction()
        print("✅ 트랜잭션 시작")
        
        # 주문 생성 이벤트 전송
        order_id = random.randint(1000, 9999)
        
        order_data = {
            'order_id': order_id,
            'customer_id': 'user123',
            'product_id': 'product456',
            'quantity': 2,
            'status': 'created',
            'timestamp': time.time()
        }
        
        producer.produce(
            topic=order_topic,
            key=f'order-{order_id}',
            value=json.dumps(order_data).encode('utf-8')
        )
        producer.poll(0)
        print(f"✅ 주문 이벤트 전송 요청됨: order_id={order_id}")
        
        # 결제 처리 이벤트 전송
        payment_data = {
            'order_id': order_id,
            'payment_method': 'credit_card',
            'amount': 129.99,
            'status': 'processing',
            'timestamp': time.time()
        }
        
        producer.produce(
            topic=payment_topic,
            key=f'order-{order_id}',
            value=json.dumps(payment_data).encode('utf-8')
        )
        producer.poll(0)
        print(f"✅ 결제 이벤트 전송 요청됨: order_id={order_id}")
        
        # 처리 지연 시뮬레이션
        time.sleep(1)
        
        # 트랜잭션 커밋
        print("[트랜잭션 커밋 중...]")
        producer.commit_transaction()
        print("✅ 트랜잭션이 성공적으로 커밋되었습니다.")
        
        return order_id, True
        
    except Exception as e:
        print(f"❌ 트랜잭션 실패: {e}")
        try:
            producer.abort_transaction()
            print("✅ 트랜잭션이 중단되었습니다.")
        except Exception as abort_e:
            print(f"❌ 트랜잭션 중단 실패: {abort_e}")
        return None, False


def run_aborted_transaction(producer, order_topic, payment_topic):
    """중단되는 트랜잭션 예제"""
    print("\n----- 중단 트랜잭션 실행 -----")
    
    try:
        # 트랜잭션 시작
        producer.begin_transaction()
        print("✅ 트랜잭션 시작")
        
        # 주문 생성 이벤트 전송
        order_id = random.randint(1000, 9999)
        
        order_data = {
            'order_id': order_id,
            'customer_id': 'user456',
            'product_id': 'product789',
            'quantity': 1,
            'status': 'created',
            'timestamp': time.time()
        }
        
        producer.produce(
            topic=order_topic,
            key=f'order-{order_id}',
            value=json.dumps(order_data).encode('utf-8')
        )
        producer.poll(0)
        print(f"✅ 주문 이벤트 전송 요청됨: order_id={order_id}")
        
        # 결제 처리 이벤트 전송 (실패 케이스)
        payment_data = {
            'order_id': order_id,
            'payment_method': 'credit_card',
            'amount': 99.99,
            'status': 'failed',  # 결제 실패
            'error_code': 'insufficient_funds',
            'timestamp': time.time()
        }
        
        producer.produce(
            topic=payment_topic,
            key=f'order-{order_id}',
            value=json.dumps(payment_data).encode('utf-8')
        )
        producer.poll(0)
        print(f"✅ 결제 이벤트 전송 요청됨: order_id={order_id}, status=failed")
        
        # 비즈니스 로직 검증 - 결제 실패 시 트랜잭션 중단
        print("[비즈니스 로직 검증 중... 결제 실패 감지]")
        time.sleep(1)
        
        # 실패 시 트랜잭션 중단
        print("[트랜잭션 중단 중...]")
        producer.abort_transaction()
        print("✅ 트랜잭션이 의도적으로 중단되었습니다.")
        
        return order_id, True
        
    except Exception as e:
        print(f"❌ 트랜잭션 처리 중 오류 발생: {e}")
        try:
            producer.abort_transaction()
            print("✅ 트랜잭션이 중단되었습니다.")
        except Exception as abort_e:
            print(f"❌ 트랜잭션 중단 실패: {abort_e}")
        return None, False


if __name__ == "__main__":
    order_topic = 'order-events'
    payment_topic = 'payment-events'
    
    print(f"주문 토픽: {order_topic}, 결제 토픽: {payment_topic}")
    
    # 트랜잭션 프로듀서 생성
    producer = create_transaction_producer()
    
    # 성공 트랜잭션 실행
    success_order_id, success_result = run_successful_transaction(producer, order_topic, payment_topic)
    
    # 1초 대기
    time.sleep(1)
    
    # 중단 트랜잭션 실행
    aborted_order_id, abort_result = run_aborted_transaction(producer, order_topic, payment_topic)
    
    # 결과 요약
    print("\n===== 트랜잭션 테스트 결과 =====")
    print(f"✅ 성공 트랜잭션 주문 ID: {success_order_id}")
    print(f"❌ 중단 트랜잭션 주문 ID: {aborted_order_id}")
    
    # 테스트 결과 평가 (성공 트랜잭션은 커밋되고, 중단 트랜잭션은 중단되어야 함)
    if success_result and abort_result:
        print("\n✅ 트랜잭션 프로듀서 테스트 성공")
    else:
        print("\n❌ 트랜잭션 프로듀서 테스트 부분 실패")

 

 

결과

[트랜잭션 프로듀서 설정]
  bootstrap.servers: localhost:9092,localhost:9093,localhost:9094
  transactional.id: txn-producer-9e9d3056-71ea-48e8-9527-59f3a845bf70
  enable.idempotence: True
  acks: all
  transaction.timeout.ms: 60000

[트랜잭션 초기화 중...]
✅ 트랜잭션 초기화 성공

----- 성공 트랜잭션 실행 -----
✅ 트랜잭션 시작
✅ 주문 이벤트 전송 요청됨: order_id=3826
✅ 결제 이벤트 전송 요청됨: order_id=3826
[트랜잭션 커밋 중...]
✅ 트랜잭션이 성공적으로 커밋되었습니다.

----- 중단 트랜잭션 실행 -----
✅ 트랜잭션 시작
✅ 주문 이벤트 전송 요청됨: order_id=1202
✅ 결제 이벤트 전송 요청됨: order_id=1202, status=failed
[비즈니스 로직 검증 중... 결제 실패 감지]
[트랜잭션 중단 중...]
✅ 트랜잭션이 의도적으로 중단되었습니다.

===== 트랜잭션 테스트 결과 =====
✅ 성공 트랜잭션 주문 ID: 3826
❌ 중단 트랜잭션 주문 ID: 1202

 

# 트랜잭션 테스트 결과 확인 (read_uncommitted)
docker exec -it kafka1 kafka-console-consumer \
  --bootstrap-server kafka1:29092 \
  --topic order-events \
  --from-beginning \
  --max-messages 5 \
  --property print.key=true
  
order-1202      {"order_id": 1202, "customer_id": "user456", "product_id": "product789", "quantity": 1, "status": "created", "timestamp": 1750690421.83603}
order-3826      {"order_id": 3826, "customer_id": "user123", "product_id": "product456", "quantity": 2, "status": "created", "timestamp": 1750690419.7871602}
  
  


# 트랜잭션 테스트 결과 확인 (read_committed)
docker exec -it kafka1 kafka-console-consumer \
  --bootstrap-server kafka1:29092 \
  --topic order-events \
  --from-beginning \
  --max-messages 5 \
  --property print.key=true \
  --isolation-level read_committed
  
  order-3826      {"order_id": 3826, "customer_id": "user123", "product_id": "product456", "quantity": 2, "status": "created", "timestamp": 1750690419.7871602}

 

로직끝에 producer.commit_trnasaction() 이있으면 메시지 보내고, 반대로 producer.abort_transaction() 이있으면 read_committed 모드일 경우 abort된 건 안 보임

 

멱등성 프로듀서는 항상보임 단 중복은 없음 (단일파티션)

반응형
반응형
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2025/07   »
1 2 3 4 5
6 7 8 9 10 11 12
13 14 15 16 17 18 19
20 21 22 23 24 25 26
27 28 29 30 31
글 보관함