kafka
[kafka] Consumer 그룹관리
0hyeon의
2025. 6. 23. 06:42
반응형
##컨슈머 그룹 오프셋 초기화
#모든 파티션 오프셋을 0 (처음) 으로 재설정
--reset-offsets --to-earliest --execute
#가장 마지막 메시지 위치로 오프셋 이동
--reset-offsets --to-latest --execute
Consumer 기본 컨셉
- 컨슈머 그룹 : 토픽 파티션을 분산 처리하는 컨슈머 집합
- 토픽 파티션을 분산처리 같은 그룹 id를 가진 컨슈머들이 파티션을 나눠 처리하여 병렬성 확보, 일부 컨슈머가 실패하더라도 다른 컨슈머가 이어받아 고가용성을 제공
- 오프셋 관리 : 메시지 소비 위치를 추적하는 핵심 메커니즘
- 각파티션마다 어디까지 메시지를 읽었는지 처리하여 중복처리없는 연속적인 메시지 처리 가능
- 오프셋 커밋전략
- 자동 커밋 : 간편하지만 중복 발생가능
- 동기식 커밋 : 안전하지만 처리량 제한
- 비동기식 커밋 : 높은 처리량, 에러처리 유의 필요
- 핵심 고려사항
- 파티션 할당 전략 : Range, RoundRobin, Sticky, Cooperative Sticky
- 리밸런스 프로토콜 : Eager vs Cooperative
- 컨슈머 그룹 관리 : 상태 확인, 오프셋 리셋, Lag 모니터링
- 중복 처리 방지 : 멱등성 처리, 정확히 한 번 처리 전략
- 오프셋 커밋전략
- 각파티션마다 어디까지 메시지를 읽었는지 처리하여 중복처리없는 연속적인 메시지 처리 가능
- 리밸런싱 : 컨슈머 추가/제거 시 파티션 재할당 과정
- 컨슈머 그룹 동적으로 확장/축소 가능, 장애시 자동으로 복구 가능
핵심 API와 호출 방식
- poll() 메서드 : 메시지 가져오기 기본 메커니즘
- 이터레이터 패턴 : for message in consumer 형태의 더 간편한소비, 코드가 간편,간결 (사용하기쉽고 가독성이좋다)
- Seek/Assign: 특정 파티션/오프셋에서 메시지 읽기
#토픽 및 컨슈머 그룹 목록 확인
docker exec -it kafka1 kafka-topics --bootstrap-server kafka1:29092 --list
>
__consumer_offsets
events
orders
docker exec -it kafka1 kafka-consumer-groups --bootstrap-server kafka1:29092 --list
>
order_processing_group
#컨슈머 그룹 상태 조회
docker exec -it kafka1 kafka-consumer-groups --bootstrap-server kafka1:29092 --describe --group order_processing_group
>(최초)
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
order_processing_group orders 0 0 0 0 - - -
order_processing_group orders 1 0 0 0 - - -
order_processing_group orders 2 0 0 0 - - -
#상태확인
docker exec -it kafka1 kafka-consumer-groups --bootstrap-server kafka1:29092 --describe --group order_processing_group --state
>
GROUP COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE #MEMBERS
order_processing_group kafka2:29093 (2) range Stable 2
#프로듀서 가동후 (LAG이쌓임)
docker exec -it kafka1 kafka-consumer-groups --bootstrap-server kafka1:29092 --describe --group order_processing_group
>
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
order_processing_group orders 0 0 4 4 - - -
order_processing_group orders 1 0 3 3 - - -
order_processing_group orders 2 0 8 8 - - -
#컨슈머 가동후 (LAG이줄음)
docker exec -it kafka1 kafka-consumer-groups --bootstrap-server kafka1:29092 --describe --group order_processing_group
>
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
order_processing_group orders 0 16 16 0 order_processing_group-client-0-d087f120-fda0-457b-90b9-1d39b6b38f3e /192.168.65.1 order_processing_group-client-0
order_processing_group orders 1 12 12 0 order_processing_group-client-0-d087f120-fda0-457b-90b9-1d39b6b38f3e /192.168.65.1 order_processing_group-client-0
order_processing_group orders 2 19 19 0 order_processing_group-client-1-7b64245e-fe75-44bd-b658-4600ee70afee /192.168.65.1 order_processing_group-client-1
#컨슈머 그룹 오프셋 초기화
#모든 파티션 오프셋을 0 (처음) 으로 재설정
docker exec -it kafka1 kafka-consumer-groups --bootstrap-server kafka1:29092 --group order_processing_group --topic orders --reset-offsets --to-earliest --execute
#다시 가장 마지막 메시지 위치로 오프셋 이동
docker exec -it kafka1 kafka-consumer-groups --bootstrap-server kafka1:29092 --group order_processing_group --topic orders --reset-offsets --to-latest --execute
GROUP TOPIC PARTITION NEW-OFFSET
order_processing_group orders 0 155
order_processing_group orders 1 140
order_processing_group orders 2 133
#다시확인
docker exec -it kafka1 kafka-consumer-groups --bootstrap-server kafka1:29092 --describe --group order_processing_group
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
order_processing_group orders 0 155 160 5 - - -
order_processing_group orders 1 140 142 2 - - -
order_processing_group orders 2 133 134 1 - - -
#컨슈머 그룹 삭제 시도
--delete --group order_processing_group
반응형