kafka
[kafka] Consumer 오프셋 관리와 리밸런싱 전략
0hyeon의
2025. 6. 23. 03:58
반응형
1.컨슈머 오프셋 관리
오프셋
- 컨슈머가 어디까지 메시지를 읽었는지 기록하는 값
- 컨슈머가 메시지를 읽은 후, 오프셋을 커밋하여 컨슈머 그룹에 알림
- poll()을 호출 할때, 카프카에 쓰여진 메시지중, 컨슈머 그룹에 속한 컨슈머들이 아직 읽지않은 레코드가 리턴됨.
오프셋 커밋
- 파티션에서 현재위치를 업데이트 하는 작업
- 파티션에서 성공적으로 처리해낸 마지막 메시지를 커밋하여, 그 전 메시지들 또한 성공적으로 처리되었음을 암묵적으로 알림 -> at least once 최소한번 전송 원칙
- kafka 특수 토픽 _consumer_offsets 에 각 파티션별로, 커밋된 오프셋을 업데이트 하도록 메시지를 전송함.
오프셋 커밋방식
- Auto Commit
- 자동으로 오프셋을 커밋하는 방식
- 빠르지만 중복 메시지 발생 가능
- enalbe.auto.commit=True 설정시,
- 5초 (auto.commit.interval.ms 의 기본값)에 한번, poll() 을 통해 받은 메시지중 마지막 메시지(바로다음)의 오프셋을 커밋함.
- 커밋한지 3초 뒤에 컨슈머가 죽으면 그 3초간의 메시지가 중복으로 처리됨.
- 장점 ; 코드 간결 , 개발자가 직접 커밋을 구현하지 않아도됨
- 단점: 메시지 처리 상태와 커밋 타이밍이 일치하지 않을수 있다. -> 3초간 중복으로 처리될수있음 (금융,결제 처럼 중요한경우 수동 커밋이 안전)
- enalbe.auto.commit=True 설정시,
- 동기 가져오기 (Synchronous Fetch)
- consumer.poll() 로 메시지 가져온후 명시적으로 commitSync() 실행
- 안전하지만 브로커가 커밋 요청에 응답할때까지 애플리케이션이 블록되어 처리량을 제한
- 에러 발생시 커밋을 재시도함.
- 특징
- 커밋이 확실히처리됐는지 즉시알수있음. 신뢰성이 높음
- 브로커 응답을 기다리는 동안 애플리케이션 스루풋이 제한 성능병목
- 처리량 < 안정성
- 특징
- 비동기 가져오기 (Asynchronous Fetch)
- commitAsync() 를 사용하여 병렬로 처리
- 브로커가 커밋에 응답할 때까지 기다리는 대신 요청만 보내고 처리를 계속함
- 에러 발생 시 커밋을 재시도 하지않음
- 브로커가 보낸 응답을 받았을때, 콜백을 지정할 수 있는 옵션 존재
- 커밋 에러 로깅 및 집계, 재시도 등 추가 처리 가능
- 특징
- 에러 발생해도 자동으로 재시도 하지않기때문에 필요시 콜백을해서 재시도로직구현 해야함
- 커밋 요청순서와 실제완료순서가 다를 수 있어 특정조건에서 예상치 못한 결과 발생
- 높은처리량 중복처리 허용
- 처리단계에따라 동기 비동기 혼합해서 사용하기도함 (현업)
- 일부처리는 비동기 에서 셧다운 직전 동기작업 사용하기도함
2.컨슈머 파티션 리밸런스
컨슈머 그룹내 컨슈머 추가/삭제 될때 파티션을 재분배하는과정
-> 리밸런싱이 자주 발생하면 성능 저하 문제 발생 가능
- 리밸런스 트리거
- 컨슈머 그룹 내에서 컨슈머가 추가/제거 되거나, 컨슈머 세션이 만료될 때 리밴런스가 발생함
- 리밴런스 과정
- 컨슈머 그룹 코디네이터 (Groop Coordinator)가 컨슈머들에게 리밴런스 시작을 알림
- 컨슈머가 기존 파티션을 반납 (리밸런스 프로토콜을 따름)
- 리더 컨슈머가 파티션 할당 전략을 실행해 컨슈머에 파티션을 재배정
- 컨슈머 그룹 코디네이터 (Groop Coordinator)가 컨슈머들에게 리밴런스 시작을 알림
- 리밸런스 완료후
- 각 컨슈머가 새로운 파티션을 받아서 다시 메시지를 소비하기 시작함
- 이전 커밋된 오프셋이 있다면 그지점부터, 없다면 매개변수 auto offset reset설정에 따라 시작위치가 결정이 됨.
리밸런스 프로토콜의 두가지 방식
조급한 리밸런스 방식(eager rebalance)
- 모든 컨슈머가 일시적으로 파티션 할당 해재됨
- 새로운 할당이 완료될 때까지 메시지 소비 중단
- 전체 파티션 재분배되므로 전체 작업중단 발생
- kafka 4.0부터 완전히 중단됨.
협력적 리밸런스 방식(cooperative rebalance)
- 점진적으로 파티션을 재할당
- 필요한 파티션을 재할당(partition 3) 하여 서비스 중단 최소화
- kafka 3.1부터 기본값
3.파티션 할당전략
- 리밸런싱 시 파티션을 컨슈머에게 어떻게 배정할지 결정하는 방법
- partition.assignment.strategy로 설정가능
- Range Assigin (기본값)
- 컨슈머가 구독하는 각 토픽의 파티션들을 연속된 그룹으로 나눠서 할당
- 토픽이 많을 경우 불균형한 할당 가능성 존재
- Round Robin Assignor
- 모든 파티션을 돌아가면서 균등하게 배정
- 전체적으로 균형 잡힌 할당 가능
- Sticky Assignor (조금더 발전된 전략)
- 파티션을 가능한 균등 할당한다.
- 리밸런스 시 가능한 많은 파티션들이 같은 컨슈머에 할당한다.
- 기존할당을 최대한 유지하며, 최소한의 변경만 수행하여 리밸런싱으로 인한 부하 감소 -> 리밸런스 후에도 컨슈머가 이전에 처리하던 파티션을 계속 처리할 가능성 높혀줌. 기존 캐시와, 내부상태 활용 성능향상에 도움
- Cooperative Sticke Assignor (현업시)
- Sticky Assignor의 장점 + 협력적 리밸런싱 지원
- 균등파티션할당, 파티션할당의 안정성, 리밸런스중 서비스중단 최소화 3가지 이점
- Sticky Assignor의 장점 + 협력적 리밸런싱 지원
- Range Assigin (기본값)
4.컨슈머 스태틱 멤버십
기존 다이나믹 멤버십 문제
- 컨슈머가 재시작되면 세션 타임아웃이 발생하여 컨슈머 그룹에서 제거됨
- 이후 다시 합류하면 전체 리밸런싱 발생
- -> 메시지 소비중단, CPU및 네트워크 부하증가
게념
- 컨슈머 그룹 내 특정 컨슈머가 재시작 되더라도 기존 파티션 할당을 유지하는 방식
- 불필요한 리밸런스를 방지하여 서비스 중단 최소화
- 컨슈머가 group.instance.id를 설정하여 고유 ID를 유지
5.서비스 개발시 고려사항
- 메시지순서보장
- 메시지 키를 설정하여 특정파티션에 할당
- 같은 키를가진 메시지가 같은 파티션에 저장되도록 유지
- 예시 : Order ID를 키로 사용하면, 같은 주문의 모든 메시지가 동일한 파티션 저장됨 -> 순서보장
- 주의할점 : 순서보장은 파티션내에서만 보장 서로 다른 파티션 간에는 순서 보장x, 컨슈머 병렬로 처리할경우 메시지 순서가 달라질수도, 순서가 중요한경우 단일 컨슈머로 처리하는것을 추천
- 중복 메시지 처리
- kafka offset 개념활용
- idempotent producer( 중복 전송방지) 사용
- 컨슈머 측에서 중복 수신 메시지 필터링 적용
- 예시:
- Exactily-Once Processing (EoS) 를 지원하는 kafka 설정 적용
- 각메시지의 고유id 부여하고, 이미처리한 거래id를 db에 기록해서 중복 처리르 막을수있음.
- commitSync() 또는 CommitAsync() 를 적절히 사용하여 중복방지
- Exactily-Once Processing (EoS) 를 지원하는 kafka 설정 적용
- 예시:
- 트랜잭셔널 메시징
- kafka 트랜잭션 프로듀서를 사용하여 메시지를 원자적으로 처리
- 트랜잭션 ID (transactional,id)설정
- commitTransaction() 을 통해 메시지를 커밋
- isolation_level=read_committed로 컨슈머 설정하여 커밋된 트랜잭션의 메시지 필터
- 예시:
- A 서비스에서 메시지생성 -> B 서비스에서 처리 / 다중 컨슈머 시나리오에서 트랜잭션 을 활용하여 데이터 일관성 보장
- 예시:
from kafka import KafkaConsumer
consumer_config = {
'bootstrap.servers': bootstrap_servers,
'group.id': group_id,
'isolation.level': 'read_committed', # 커밋된 트랜잭션만 읽기
'auto.offset.reset': 'earliest'
}
for message in consumer:
print(f"수신한 메시지: {message.value.decode()}")
- 다양한 요구사항에 맞는 컨슈머 설정 전략
- use case별 컨슈머 설정방법
요구사항 | 설정방법 |
메시지 유실 없이 읽기 | enable.auto.commit=False + commitSync() 사용 |
빠른 처리가 필요함 | fetch.min.bytes 조정 + commitAsync() 사용 |
컨슈머 확장성을 높이고 싶음 | 컨슈머 그룹을 늘려서 파티션을 병렬 처리 |
리밸런싱 성능 최적화 | sticky.assignor 또는 cooperative-sticky 사용 |
특정 컨슈머가 특정 데이터를 가져오게 하고싶음 | 수동으로 파티션할당 assign() |
반응형