티스토리 뷰

kafka

[kafka] Kafka-Connect 알아보기

0hyeon의 2025. 6. 28. 11:05
반응형

 

 


 

 

1.kafka connect 란?

  • kafka와 다양한 외부시스템 (데이터 베이스,클라우드 스토리지, 검색엔진 등)을 쉽게 연결하는 프레임워크
  • kafka 자체의 producer & consumer api를 직접 개발핮 ㅣ않고도, 데이터 스트리밍 파이프라인을 손쉽게 구축할 수 있도록 설계된 플러그인 기반 시스템
  • Source Connector:외부 시스템 -> kafka 데이터 전송
  • Sink Connector: kafka -> 외부시스템으로 데이터 저장 (s3, elasticsearch, hdfs)

 

 


2.kafka connect 주요특징

  • Scalability (확장성)
    • 클러스터 기반 확장 지원 (Standalone vs. Distributed Mode)
    • 여러 Worker가 병렬로 데이터를 처리가능
  • Fault Tolerance (장애 복구 지원)
    • 장애 발생 시 자동 복구 가능
    • 오프셋 저장을 통해 중복 또는 데이터 손실 방지 
  • Schema Evolution (스키마 변경 대응)
    • Confluent Schema Registry와 함께 사용하면, 데이터 스키마 변경을 유연하게 처리가능
    • PostgreSQL과 같은 RDBMS연결 시 테이블 스키마 변경에도 적응 가능
  • Manageability(관리 편의성)
    • REST API를 통해 손쉽게 Connector를 추가/수정/삭제 가능

 

 


 

3.kafka connect 아키텍처

https://debezium.io/documentation/reference/3.0/architecture.html

 

4.kafka connect 주요컴포넌트 

  • 4-1.Connector
    • Source Connector 또는 Sink Connector로 데이터를 이동 시키는 역할을 담당
    • 다양한 종류의 커넥터가 플러그인 형태로 제공됨 (JDBC, Elasticsearch, s3, MongoDB등)
    • 실습에서는 JDBC Source/Sink Connector를 사용하여 PostgreSQL과 연동
      • -> 외부시스템과 카프카의 통합을 담당 어떤 데이터를 어떻게 이동시킬지 고수준의 로직담당 다양한종류의 커넥트가 플러그인으로 제공되어 활용만하면됨
  • 4-2.TASK
    • Connector가 실제로 실행하는 작업단위
    • 하나의 Connector는 여러개의 Task로 분할 가능(병렬 처리 지원) -> 이를통해 처리량 늘림
      1. 예시: 5개의 MySQL테이블에서 데이터를 읽어와야 한다면, 5개의 Task를 실행하여 병렬 처리 가능 -> (이는 각 테이블을 불러오는 각Task에서도 병렬처리하여 속도를 높힘)
      2. Task는 무상태 (Stateless)이며, 관련된 모든 상태 정보는 Kafka내부의 특수 토픽에 저장됨 -> (장애발생시 다른 워커로 쉽게 이동 가능 상태정보는 카프카 내부 토픽에 저장 이런설계 덕분에 taks가 실패하더라도 중단없이 다른워커에서 작업을 이어감
        • config.storage.topic -> Connector및 Task의 설정 정보 저장
        • status.storage.topic -> Task 및 Connector 상태 저장( 실행중인지, 실패했는지 )
    • Task실행방식
      1. Connector가 등록됨
        • REST API또는 설정파일을 통해 Connector가 Kafka Connect에 추가됨 (하단 설정방식 참고)
      2. kafka connect가 taks를 생성
        • takss.max값에 따라 병렬 실행할 TASK 개수를 결정
      3. Task 실행되고 데이터 전송시작
        • 각 Task는 데이터를 읽어와서 Kafka Topic으로 보냄 (Source Connector)
        • 또는 Kafka Topic 에서 데이터를 읽어 외부 시스템으로 전송 (Sink Conncetor)
      4. kafka 상태저장보고
        • config.storage.topic에 설정 정보 저장
        • status..storage.topoc에 Task 실행 상태 저장
      5. Task가 중단되거나 실패하면 자동 재시작
        • Task가 실패하면 kafka Connect는 이를 감지하고, 자동으로 재시작하여 장애 복구 수행

 

#설정방식

{
	"name":"postgres-source-connector",
    "config":{
    	"connector.class":"io.confluent,connect,jdbc,JdbcSourceConnector",
        "connection.url":"jdbc:postgresql://postgres:5432/kafka_demo",
        //... 기타 설정
    }
}

 

#JDBC Source Connector 설정 예시

{
	"name":"postgres-source-connector",
    "config":{
    	"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",#사용할 커텍터 클래스지정
        "connection.url":"jdbc:pstgresql://postgres:5432/kafka_demo",#데이터베이스 연결URL
        "connection.user":"postgres",#데이터베이스 접속계정
        "connection.password":"postgres",#데이터베이스 접속계정
        "topic.prefix":"postgres-source-",
        "table.whitelist":"source_table",
        "mode":"incrementing",
        "incrementing.column.name":"id",
        "tasks.max":"1"#병렬로 실행할 최대 Task수
    } 
}

 

#Sink Connector설정

{
	"name":"postgres-sink-connector",
    "config":{
    	"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url":"jdbc:postgresql://postgres:5432/kfaka_demo",
        "connection.user":"postgres",
        "connection.password":"postgres",
        "topics":"postgres-source-source_table",#구독할 토픽목록(쉼표구분)
        "table.name.format":"target_table",#저장할 테이블
        "auto.create":"false",#테이블이 없을경우 자동 생성여부
        "insert.mode":"upsert",#기존데이터 있으면 업데이트, 업으면삽입
        "pk.mode":"record_value",#기본 키 지정방식
        "pk.fields":"id",#기본 키로 사용할 필드 이름 
        "tasks.max":"1"
    }
}
  •  4-3.Worker
    • kafka connect를 실행하는 노드
    • standalone모드에서는 단일 Worker, Distributed 모드에서는 여러 Worker로 구성
    • Confluent Platform 환경에서는 기본적으로 Distributed모드로 동작
  • 4-4.Offset Storage
    • KafkaConnect는 처리한 데이터의 오프셋을 저장하여, 중복 처리나 데이터 손실을 방지
    • Source Connector의겨우 mode설정(incrementing, timestamp)을 통해 데이터 추적방식 지정 

 

5. Kafka Connect REST API 주요 엔드포인트

  • 커넥터 관리
#커넥터 목록조회

curl -s http://localhost:8083/connectors | jq
#커넥터 생성

curl -X POST -H "Content-Type: application/json" --data @conncetor-config.json http://localhost:8083/connectors
#커넥터 상태 확인

curl -s http://localhost:8083/connectors/{connector-name}/status | jq
#커넥터 삭제

curl -X DELETE http://localhost:8083/connectors/{connector-name}

 

  • 커넥터 플러그인 관리
#사용 가능한 커넥터 플러그인 목록조회

curl -s http://localhost:8083/connector-plugins | jq

 

#특정 커넥터 플러그인의 설정 검증

curl -X PUT -H "Content-Type: application/json"
--data @connector-config.json http://localhost:8083/connector-plugins/{connector-class}/config/validate | jq

 

반응형
반응형
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2025/07   »
1 2 3 4 5
6 7 8 9 10 11 12
13 14 15 16 17 18 19
20 21 22 23 24 25 26
27 28 29 30 31
글 보관함