티스토리 뷰
반응형
1.kafka connect 란?
- kafka와 다양한 외부시스템 (데이터 베이스,클라우드 스토리지, 검색엔진 등)을 쉽게 연결하는 프레임워크
- kafka 자체의 producer & consumer api를 직접 개발핮 ㅣ않고도, 데이터 스트리밍 파이프라인을 손쉽게 구축할 수 있도록 설계된 플러그인 기반 시스템
- Source Connector:외부 시스템 -> kafka 데이터 전송
- Sink Connector: kafka -> 외부시스템으로 데이터 저장 (s3, elasticsearch, hdfs)
2.kafka connect 주요특징
- Scalability (확장성)
- 클러스터 기반 확장 지원 (Standalone vs. Distributed Mode)
- 여러 Worker가 병렬로 데이터를 처리가능
- Fault Tolerance (장애 복구 지원)
- 장애 발생 시 자동 복구 가능
- 오프셋 저장을 통해 중복 또는 데이터 손실 방지
- Schema Evolution (스키마 변경 대응)
- Confluent Schema Registry와 함께 사용하면, 데이터 스키마 변경을 유연하게 처리가능
- PostgreSQL과 같은 RDBMS연결 시 테이블 스키마 변경에도 적응 가능
- Manageability(관리 편의성)
- REST API를 통해 손쉽게 Connector를 추가/수정/삭제 가능
3.kafka connect 아키텍처
4.kafka connect 주요컴포넌트
- 4-1.Connector
- Source Connector 또는 Sink Connector로 데이터를 이동 시키는 역할을 담당
- 다양한 종류의 커넥터가 플러그인 형태로 제공됨 (JDBC, Elasticsearch, s3, MongoDB등)
- 실습에서는 JDBC Source/Sink Connector를 사용하여 PostgreSQL과 연동
- -> 외부시스템과 카프카의 통합을 담당 어떤 데이터를 어떻게 이동시킬지 고수준의 로직담당 다양한종류의 커넥트가 플러그인으로 제공되어 활용만하면됨
- 4-2.TASK
- Connector가 실제로 실행하는 작업단위
- 하나의 Connector는 여러개의 Task로 분할 가능(병렬 처리 지원) -> 이를통해 처리량 늘림
- 예시: 5개의 MySQL테이블에서 데이터를 읽어와야 한다면, 5개의 Task를 실행하여 병렬 처리 가능 -> (이는 각 테이블을 불러오는 각Task에서도 병렬처리하여 속도를 높힘)
- Task는 무상태 (Stateless)이며, 관련된 모든 상태 정보는 Kafka내부의 특수 토픽에 저장됨 -> (장애발생시 다른 워커로 쉽게 이동 가능 상태정보는 카프카 내부 토픽에 저장 이런설계 덕분에 taks가 실패하더라도 중단없이 다른워커에서 작업을 이어감
- config.storage.topic -> Connector및 Task의 설정 정보 저장
- status.storage.topic -> Task 및 Connector 상태 저장( 실행중인지, 실패했는지 )
- Task실행방식
- Connector가 등록됨
- REST API또는 설정파일을 통해 Connector가 Kafka Connect에 추가됨 (하단 설정방식 참고)
- kafka connect가 taks를 생성
- takss.max값에 따라 병렬 실행할 TASK 개수를 결정
- Task 실행되고 데이터 전송시작
- 각 Task는 데이터를 읽어와서 Kafka Topic으로 보냄 (Source Connector)
- 또는 Kafka Topic 에서 데이터를 읽어 외부 시스템으로 전송 (Sink Conncetor)
- kafka 상태저장보고
- config.storage.topic에 설정 정보 저장
- status..storage.topoc에 Task 실행 상태 저장
- Task가 중단되거나 실패하면 자동 재시작
- Task가 실패하면 kafka Connect는 이를 감지하고, 자동으로 재시작하여 장애 복구 수행
- Connector가 등록됨
#설정방식
{
"name":"postgres-source-connector",
"config":{
"connector.class":"io.confluent,connect,jdbc,JdbcSourceConnector",
"connection.url":"jdbc:postgresql://postgres:5432/kafka_demo",
//... 기타 설정
}
}
#JDBC Source Connector 설정 예시
{
"name":"postgres-source-connector",
"config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",#사용할 커텍터 클래스지정
"connection.url":"jdbc:pstgresql://postgres:5432/kafka_demo",#데이터베이스 연결URL
"connection.user":"postgres",#데이터베이스 접속계정
"connection.password":"postgres",#데이터베이스 접속계정
"topic.prefix":"postgres-source-",
"table.whitelist":"source_table",
"mode":"incrementing",
"incrementing.column.name":"id",
"tasks.max":"1"#병렬로 실행할 최대 Task수
}
}
#Sink Connector설정
{
"name":"postgres-sink-connector",
"config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:postgresql://postgres:5432/kfaka_demo",
"connection.user":"postgres",
"connection.password":"postgres",
"topics":"postgres-source-source_table",#구독할 토픽목록(쉼표구분)
"table.name.format":"target_table",#저장할 테이블
"auto.create":"false",#테이블이 없을경우 자동 생성여부
"insert.mode":"upsert",#기존데이터 있으면 업데이트, 업으면삽입
"pk.mode":"record_value",#기본 키 지정방식
"pk.fields":"id",#기본 키로 사용할 필드 이름
"tasks.max":"1"
}
}
- 4-3.Worker
- kafka connect를 실행하는 노드
- standalone모드에서는 단일 Worker, Distributed 모드에서는 여러 Worker로 구성
- Confluent Platform 환경에서는 기본적으로 Distributed모드로 동작
- 4-4.Offset Storage
- KafkaConnect는 처리한 데이터의 오프셋을 저장하여, 중복 처리나 데이터 손실을 방지
- Source Connector의겨우 mode설정(incrementing, timestamp)을 통해 데이터 추적방식 지정
5. Kafka Connect REST API 주요 엔드포인트
- 커넥터 관리
#커넥터 목록조회
curl -s http://localhost:8083/connectors | jq
#커넥터 생성
curl -X POST -H "Content-Type: application/json" --data @conncetor-config.json http://localhost:8083/connectors
#커넥터 상태 확인
curl -s http://localhost:8083/connectors/{connector-name}/status | jq
#커넥터 삭제
curl -X DELETE http://localhost:8083/connectors/{connector-name}
- 커넥터 플러그인 관리
#사용 가능한 커넥터 플러그인 목록조회
curl -s http://localhost:8083/connector-plugins | jq
#특정 커넥터 플러그인의 설정 검증
curl -X PUT -H "Content-Type: application/json"
--data @connector-config.json http://localhost:8083/connector-plugins/{connector-class}/config/validate | jq
반응형
'kafka' 카테고리의 다른 글
[kafka] PostgreSQL - Kafka Connect 파이프라인 (feat. CDC) (1) | 2025.06.28 |
---|---|
[kafka] Confluent-Kafka-Python을 사용한 멱등성 Producer와 트랜잭션 (6) | 2025.06.24 |
[kafka] Consumer 그룹관리 (2) | 2025.06.23 |
[kafka] Consumer 오프셋 관리와 리밸런싱 전략 (0) | 2025.06.23 |
[kafka] Producer 파티셔닝 전략 (1) | 2025.06.22 |
반응형
공지사항
최근에 올라온 글
최근에 달린 댓글
- Total
- Today
- Yesterday
링크
TAG
- asyncio.gather
- k8s
- 위즈윅에디터
- create_task
- 비동기
- 윈도우pscale설치
- window
- asyncio
- supervised
- Python
- pscale
- iris
- 우테코
- kubectl
- 대수자료구조
- un-supervised
- next.js
- Tailwind
- nextj이미지저장
- nodejs
- semi-supervised
- CloudFlare
- datalabeling
- 42서울
- 타입스크립트
- ADT
- planetscale배포
- SSR
- helm
- 함수형프로그래밍
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | ||
6 | 7 | 8 | 9 | 10 | 11 | 12 |
13 | 14 | 15 | 16 | 17 | 18 | 19 |
20 | 21 | 22 | 23 | 24 | 25 | 26 |
27 | 28 | 29 | 30 | 31 |
글 보관함