kafka

[kafka] Consumer 그룹관리

0hyeon의 2025. 6. 23. 06:42
반응형
##컨슈머 그룹 오프셋 초기화


#모든 파티션 오프셋을 0 (처음) 으로 재설정
--reset-offsets --to-earliest --execute


#가장 마지막 메시지 위치로 오프셋 이동
--reset-offsets --to-latest --execute

Consumer 기본 컨셉

  • 컨슈머 그룹 : 토픽 파티션을 분산 처리하는 컨슈머 집합
    • 토픽 파티션을 분산처리 같은 그룹 id를 가진 컨슈머들이 파티션을 나눠 처리하여 병렬성 확보, 일부 컨슈머가 실패하더라도 다른 컨슈머가 이어받아 고가용성을 제공
  • 오프셋 관리 : 메시지 소비 위치를 추적하는 핵심 메커니즘 
    • 각파티션마다 어디까지 메시지를 읽었는지 처리하여 중복처리없는 연속적인  메시지 처리 가능
      • 오프셋 커밋전략
        • 자동 커밋 : 간편하지만 중복 발생가능
        • 동기식 커밋 : 안전하지만 처리량 제한 
        • 비동기식 커밋 : 높은 처리량, 에러처리 유의 필요
      • 핵심 고려사항
        • 파티션 할당 전략 : Range, RoundRobin, Sticky, Cooperative Sticky
        • 리밸런스 프로토콜 : Eager vs Cooperative
        • 컨슈머 그룹 관리 : 상태 확인, 오프셋 리셋, Lag 모니터링
        • 중복 처리 방지 : 멱등성 처리, 정확히 한 번 처리 전략
  • 리밸런싱 : 컨슈머 추가/제거 시 파티션 재할당 과정
    • 컨슈머 그룹 동적으로 확장/축소 가능, 장애시 자동으로 복구 가능

핵심 API와 호출 방식 

  • poll() 메서드 : 메시지 가져오기 기본 메커니즘
  • 이터레이터 패턴 : for message in consumer 형태의 더 간편한소비, 코드가 간편,간결 (사용하기쉽고 가독성이좋다)
  • Seek/Assign: 특정 파티션/오프셋에서 메시지 읽기

 

 

#토픽 및 컨슈머 그룹 목록 확인

docker exec -it kafka1 kafka-topics --bootstrap-server kafka1:29092 --list
> 
__consumer_offsets
events
orders


docker exec -it kafka1 kafka-consumer-groups --bootstrap-server kafka1:29092 --list
> 
order_processing_group

 

#컨슈머 그룹 상태 조회

docker exec -it kafka1 kafka-consumer-groups --bootstrap-server kafka1:29092 --describe --group order_processing_group
>(최초)
GROUP                  TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
order_processing_group orders          0          0               0               0               -               -               -
order_processing_group orders          1          0               0               0               -               -               -
order_processing_group orders          2          0               0               0               -               -               -


#상태확인 
docker exec -it kafka1 kafka-consumer-groups --bootstrap-server kafka1:29092 --describe --group order_processing_group --state
>
GROUP                     COORDINATOR (ID)          ASSIGNMENT-STRATEGY  STATE           #MEMBERS
order_processing_group    kafka2:29093 (2)          range                Stable          2



#프로듀서 가동후 (LAG이쌓임)
docker exec -it kafka1 kafka-consumer-groups --bootstrap-server kafka1:29092 --describe --group order_processing_group
>
GROUP                  TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
order_processing_group orders          0          0               4               4               -               -               -
order_processing_group orders          1          0               3               3               -               -               -
order_processing_group orders          2          0               8               8               -               -               -



#컨슈머 가동후 (LAG이줄음)
docker exec -it kafka1 kafka-consumer-groups --bootstrap-server kafka1:29092 --describe --group order_processing_group
>
GROUP                  TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                          HOST            CLIENT-ID
order_processing_group orders          0          16              16              0               order_processing_group-client-0-d087f120-fda0-457b-90b9-1d39b6b38f3e /192.168.65.1   order_processing_group-client-0
order_processing_group orders          1          12              12              0               order_processing_group-client-0-d087f120-fda0-457b-90b9-1d39b6b38f3e /192.168.65.1   order_processing_group-client-0
order_processing_group orders          2          19              19              0               order_processing_group-client-1-7b64245e-fe75-44bd-b658-4600ee70afee /192.168.65.1   order_processing_group-client-1

 

#컨슈머 그룹 오프셋 초기화

#모든 파티션 오프셋을 0 (처음) 으로 재설정
docker exec -it kafka1 kafka-consumer-groups --bootstrap-server kafka1:29092 --group order_processing_group --topic orders --reset-offsets --to-earliest --execute

#다시 가장 마지막 메시지 위치로 오프셋 이동
docker exec -it kafka1 kafka-consumer-groups --bootstrap-server kafka1:29092 --group order_processing_group --topic orders --reset-offsets --to-latest --execute

GROUP                          TOPIC                          PARTITION  NEW-OFFSET     
order_processing_group         orders                         0          155            
order_processing_group         orders                         1          140            
order_processing_group         orders                         2          133

#다시확인
docker exec -it kafka1 kafka-consumer-groups --bootstrap-server kafka1:29092 --describe --group order_processing_group

GROUP                  TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
order_processing_group orders          0          155             160             5               -               -               -
order_processing_group orders          1          140             142             2               -               -               -
order_processing_group orders          2          133             134             1               -               -               -

 

#컨슈머 그룹 삭제 시도

--delete --group order_processing_group
반응형