kafka

[kafka] Consumer 오프셋 관리와 리밸런싱 전략

0hyeon의 2025. 6. 23. 03:58
반응형

1.컨슈머 오프셋 관리

 

오프셋

  • 컨슈머가 어디까지 메시지를 읽었는지 기록하는 값
  • 컨슈머가 메시지를 읽은 후, 오프셋을 커밋하여 컨슈머 그룹에 알림
  • poll()을 호출 할때, 카프카에 쓰여진 메시지중, 컨슈머 그룹에 속한 컨슈머들이 아직 읽지않은 레코드가 리턴됨.

 

오프셋 커밋

  • 파티션에서 현재위치를 업데이트 하는 작업
  • 파티션에서 성공적으로 처리해낸 마지막 메시지를 커밋하여, 그 전 메시지들 또한 성공적으로 처리되었음을 암묵적으로 알림 -> at least once 최소한번 전송 원칙 
  • kafka 특수 토픽  _consumer_offsets 에 각 파티션별로, 커밋된 오프셋을 업데이트 하도록 메시지를 전송함.

 

오프셋 커밋방식

  • Auto Commit 
    • 자동으로 오프셋을 커밋하는 방식
    • 빠르지만 중복 메시지 발생 가능
      • enalbe.auto.commit=True 설정시,
        • 5초 (auto.commit.interval.ms 의 기본값)에 한번, poll() 을 통해 받은 메시지중 마지막 메시지(바로다음)의 오프셋을 커밋함. 
        • 커밋한지 3초 뒤에 컨슈머가 죽으면 그 3초간의 메시지가 중복으로 처리됨.
      • 장점 ; 코드 간결 , 개발자가 직접 커밋을 구현하지 않아도됨
      • 단점: 메시지 처리 상태와 커밋 타이밍이 일치하지 않을수 있다. -> 3초간 중복으로 처리될수있음 (금융,결제 처럼 중요한경우 수동 커밋이 안전)
  • 동기 가져오기 (Synchronous Fetch)
    • consumer.poll() 로 메시지 가져온후 명시적으로 commitSync() 실행
    • 안전하지만 브로커가 커밋 요청에 응답할때까지 애플리케이션이 블록되어 처리량을 제한 
    • 에러 발생시 커밋을 재시도함.
      • 특징 
        • 커밋이 확실히처리됐는지 즉시알수있음. 신뢰성이 높음
        • 브로커 응답을 기다리는 동안 애플리케이션 스루풋이 제한 성능병목
        • 처리량 < 안정성
  • 비동기 가져오기 (Asynchronous Fetch)
    • commitAsync() 를 사용하여 병렬로 처리
    • 브로커가 커밋에 응답할 때까지 기다리는 대신 요청만 보내고 처리를 계속함
    • 에러 발생 시 커밋을 재시도 하지않음
    • 브로커가 보낸 응답을 받았을때, 콜백을 지정할 수 있는 옵션 존재
      • 커밋 에러 로깅 및 집계, 재시도 등 추가 처리 가능 
    • 특징
      • 에러 발생해도 자동으로 재시도 하지않기때문에 필요시 콜백을해서 재시도로직구현 해야함
      • 커밋 요청순서와 실제완료순서가 다를 수 있어 특정조건에서 예상치 못한 결과 발생
      • 높은처리량 중복처리 허용 
  • 처리단계에따라 동기 비동기 혼합해서 사용하기도함 (현업)
  • 일부처리는 비동기 에서 셧다운 직전 동기작업 사용하기도함 

 


 

 

2.컨슈머 파티션 리밸런스

컨슈머 그룹내 컨슈머 추가/삭제 될때 파티션을 재분배하는과정

-> 리밸런싱이 자주 발생하면 성능 저하 문제 발생 가능

카프카 4.0부터 완전히 사용 중단됨

 

  • 리밸런스 트리거
    • 컨슈머 그룹 내에서 컨슈머가 추가/제거 되거나, 컨슈머 세션이 만료될 때 리밴런스가 발생함
  • 리밴런스 과정
    • 컨슈머 그룹 코디네이터 (Groop Coordinator)가 컨슈머들에게 리밴런스 시작을 알림 
      • 컨슈머가 기존 파티션을 반납 (리밸런스 프로토콜을 따름)
      • 리더 컨슈머가 파티션 할당 전략을 실행해 컨슈머에 파티션을 재배정
  • 리밸런스 완료후 
    • 각 컨슈머가 새로운 파티션을 받아서 다시 메시지를 소비하기 시작함
    • 이전 커밋된 오프셋이 있다면 그지점부터, 없다면 매개변수 auto offset reset설정에 따라 시작위치가 결정이 됨.

 

 

리밸런스 프로토콜의 두가지 방식

조급한 리밸런스 방식(eager rebalance)

  • 모든 컨슈머가 일시적으로 파티션 할당 해재됨
  • 새로운 할당이 완료될 때까지 메시지 소비 중단
  • 전체 파티션 재분배되므로 전체 작업중단 발생
    • kafka 4.0부터 완전히 중단됨.

협력적 리밸런스 방식(cooperative rebalance)

  • 점진적으로 파티션을 재할당 
  • 필요한 파티션을 재할당(partition 3) 하여 서비스 중단 최소화
    • kafka 3.1부터 기본값

 

 


3.파티션 할당전략 

  • 리밸런싱 시 파티션을 컨슈머에게 어떻게 배정할지 결정하는 방법
  • partition.assignment.strategy로 설정가능
    • Range Assigin (기본값)
      • 컨슈머가 구독하는 각 토픽의 파티션들을 연속된 그룹으로 나눠서 할당
      • 토픽이 많을 경우 불균형한 할당 가능성 존재
    • Round Robin Assignor
      • 모든 파티션을 돌아가면서 균등하게 배정
      • 전체적으로 균형 잡힌 할당 가능 
    • Sticky Assignor (조금더 발전된 전략)
      • 파티션을 가능한 균등 할당한다.
      • 리밸런스 시 가능한 많은 파티션들이 같은 컨슈머에 할당한다.
        • 기존할당을 최대한 유지하며, 최소한의 변경만 수행하여 리밸런싱으로 인한 부하 감소 -> 리밸런스 후에도 컨슈머가 이전에 처리하던 파티션을 계속 처리할 가능성 높혀줌. 기존 캐시와, 내부상태 활용 성능향상에 도움
    • Cooperative Sticke Assignor (현업시)
      • Sticky Assignor의 장점 + 협력적 리밸런싱 지원
        • 균등파티션할당, 파티션할당의 안정성, 리밸런스중 서비스중단 최소화 3가지 이점 

 


4.컨슈머 스태틱 멤버십

기존 다이나믹 멤버십 문제

  • 컨슈머가 재시작되면 세션 타임아웃이 발생하여 컨슈머 그룹에서 제거됨
  • 이후 다시 합류하면 전체 리밸런싱 발생
  • -> 메시지 소비중단, CPU및 네트워크 부하증가

 

게념

  • 컨슈머 그룹 내 특정 컨슈머가 재시작 되더라도 기존 파티션 할당을 유지하는 방식 
  • 불필요한 리밸런스를 방지하여 서비스 중단 최소화
  • 컨슈머가 group.instance.id를 설정하여 고유 ID를 유지

 


5.서비스 개발시 고려사항

  • 메시지순서보장
    • 메시지 키를 설정하여 특정파티션에 할당
    • 같은 키를가진 메시지가 같은 파티션에 저장되도록 유지
    • 예시 : Order ID를 키로 사용하면, 같은 주문의 모든 메시지가 동일한 파티션 저장됨 -> 순서보장 
    • 주의할점 : 순서보장은 파티션내에서만 보장 서로 다른 파티션 간에는 순서 보장x,  컨슈머 병렬로 처리할경우 메시지 순서가 달라질수도, 순서가 중요한경우 단일 컨슈머로 처리하는것을 추천
  • 중복 메시지 처리 
    • kafka offset 개념활용
    • idempotent producer( 중복 전송방지) 사용
    • 컨슈머 측에서 중복 수신 메시지 필터링 적용
      • 예시:
        • Exactily-Once Processing (EoS) 를 지원하는 kafka 설정 적용
          • 각메시지의 고유id 부여하고, 이미처리한 거래id를 db에 기록해서 중복 처리르 막을수있음.
        • commitSync() 또는 CommitAsync() 를 적절히 사용하여 중복방지 
  • 트랜잭셔널 메시징
    • kafka 트랜잭션 프로듀서를 사용하여 메시지를 원자적으로 처리
    • 트랜잭션 ID (transactional,id)설정
    • commitTransaction() 을 통해 메시지를 커밋
    • isolation_level=read_committed로 컨슈머 설정하여 커밋된 트랜잭션의 메시지 필터
      • 예시:
        • A 서비스에서 메시지생성 -> B 서비스에서 처리 / 다중 컨슈머 시나리오에서 트랜잭션 을 활용하여 데이터 일관성 보장 
from kafka import KafkaConsumer

consumer_config = {
        'bootstrap.servers': bootstrap_servers,
        'group.id': group_id,
        'isolation.level': 'read_committed',  # 커밋된 트랜잭션만 읽기
        'auto.offset.reset': 'earliest'
}

for message in consumer:
	print(f"수신한 메시지: {message.value.decode()}")

 

 

  • 다양한 요구사항에 맞는 컨슈머 설정 전략
    • use case별 컨슈머 설정방법
요구사항 설정방법
메시지 유실 없이 읽기 enable.auto.commit=False + commitSync() 사용
빠른 처리가 필요함 fetch.min.bytes 조정 + commitAsync() 사용
컨슈머 확장성을 높이고 싶음 컨슈머 그룹을 늘려서 파티션을 병렬 처리
리밸런싱 성능 최적화 sticky.assignor 또는 cooperative-sticky 사용
특정 컨슈머가 특정 데이터를 가져오게 하고싶음 수동으로 파티션할당 assign()
반응형