아파치 카프카 애플리케이션 프로그래밍 with 자바 책 공부 내용 정리
카프카 커넥트(kafka connect)는 카프카 오픈소스에 포함된 툴 중 하나로 데이터 파이프라인 생성 시 반복 작업을 줄이고 효율적인 전송을 이루기 위한 애플리케이션이다.
커넥트는 특정한 작업 형태를 템플릿으로 만들어놓은 커넥터(connector)를 실행함으로써 반복 작업을 줄일 수 있다.
커넥터는 각 커넥터가 가진 고유한 설정값을 입력받아서 데이터를 처리한다. 예를 들어, 파일의 데이터를 토픽으로 보내는 커넥터가 있다면 파일이 존재하는 디렉토리 위치, 파일 이름을 설정해야 한다.
커넥터는 프로듀서 역할을 하는 '소스 커넥터(Source Connector)'와 컨슈머 역할을 하는 '싱크 커넥터(Sink Connector)' 2가지로 나뉜다.
카프카 2.6에 포함된 커넥트를 실행할 경우 클러스터 간 토픽 미러링을 지원하는 미러메이커2 커넥터와 파일 싱크 커넥터, 파일 소스 커넥터를 기본 플러그인으로 제공한다. 이외에 추가적인 커넥터를 사용하고 싶다면 플러그인 형태로 커넥터 jar 파일을 추가하여 사용할 수 있다. 오픈소스 커넥터는 직접 커넥터를 만들 필요가 없으면 커넥터 jar 파일을 다운로드하여 사용할 수 있다는 장점이 있다.
사용자가 커넥트에 커넥터 생성 명령을 내리면 커넥트는 내부에 커넥터와 태스크를 생성한다.
커넥터는 태스크들을 관리한다. 태스크는 커넥터에 종속되는 개념으로 실질적인 데이터 처리를 한다. 그렇기 때문에 데이터 처리를 정상적으로 하는지 확인하기 위해서는 각 태스크의 상태를 확인해야 한다.
사용자가 커넥터를 사용하여 파이프라인을 생성할 때 컨버터(converter)와 트랜스폼(transform) 기능을 옵션으로 추가할 수 있다.
컨버터는 데이터 처리를 하기 전에 스키마를 변경하도록 도와준다. JsonConvertoer, StringConverter, ByteArrayConverter를 지원하고 필요하다면 커스텀 컨버터를 작성하여 사용할 수도 있다.
트랜스폼은 데이터 처리 시 각 메세지 단위로 데이터를 간단하게 변환하기 위한 용도로 사용된다. 예를 들어, JSON 데이터를 커넥터에서 사용할 때 트랜스폼을 사용하면 특정 키를 삭제하거나 추가할 수 있다. 기본 제공 트랜스폼으로 Cast, Drop, ExtractField 등이 있다.
커넥트 실행 방법
커넥트를 실행하는 방법은 크게 두가지가 있다. 첫 번째는 '단일 모드 커넥트(standalone mode kafka connect)'이고 두번째는 '분산 모드 커넥트(distributed mode kafka connect)'이다.
단일 모드 커넥트는 1개 프로세스만 실행되는 점이 특징인데, 단일 프로세스로 실행되기 때문에 고가용성 구성이 되지 않아 단일 장애점(SPOF)이 될 수 있다.
분산 모드 커넥트는 2개 이상의 서버에서 클러스터 형태로 운영함으로써 단일 모드 커넥트 대비 안전하게 운영할 수 있다는 장점이 있다.
REST API를 사용하면 현재 실행 중인 커넥트의 커넥터 플러그인 종류, 태스크 상태, 커넥터 상태 등을 조회할 수 있다.
커넥트는 8083 포트로 호출할 수 있으며 HTTP 메서드(GET, POST, DELETE, PUT) 기반 API를 제공한다.
요청 메서드 | 호출 경로 | 설명 |
GET | / | 실행 중인 커넥트 정보 확인 |
GET | /connectors | 실행 중인 커넥터 이름 확인 |
POST | /connectors | 새로운 커넥터 생성 요청 |
GET | /connectors/{커넥터이름} | 실행 중인 커넥터 정보 확인 |
GET | /connectors/{커넥터이름}/config | 실행 중인 커넥터의 설정값 확인 |
PUT | /connectors/{커넥터이름}/config | 실행 중인 커넥터 설정값 변경 요청 |
GET | /connectors/{커넥터이름}/status | 실행 중인 커넥터 상태 확인 |
POST | /connectors/{커넥터이름}/restart | 실행 중인 커넥터 재시작 요청 |
PUT | /connectors/{커넥터이름}/pause | 커넥터 일시 중지 요청 |
PUT | /connectors/{커넥터이름}/resume | 일시중지된 커넥터 실행 요청 |
DELETE | /connectors/{커넥터이름}/ | 실행중인 커넥터 종료 |
GET | /connectors/{커넥터이름}/tasks/{태스크아이디}/status | 실행 중인 커넥터의 태스트 정보확인 |
POST | /connectors/{커넥터이름}/tasks/{태스크 아이디}/restart | 실행 중인 커넥터의 태스크 재시작 요청 |
GET | /connectors/{커넥터이름}/topics | 커넥터별 연동된 토픽 정보 확인 |
GET | /connector-plugins/ | 커넥트에 존재하는 커넥터 플러그인 확인 |
PUT | /connector-plugins/{커넥터 플러그인 이름}/config/validate | 커넥터 생성 시 설정값 유효 여부 확인 |
✨ 참고도서정보 :: 아파치 카프카 애플리케이션 프로그래밍 with 자바 (최원영)
'Framework > Kafka' 카테고리의 다른 글
[Kafka] 아파치 카프카 기본 개념 :: 카프카 커넥트 - 싱크 커넥터 (0) | 2024.11.06 |
---|---|
[Kafka] 아파치 카프카 기본 개념 :: 카프카 커넥트 - 소스 커넥터 (1) | 2024.11.05 |
[Kafka] 아파치 카프카 기본 개념 :: 카프카 스트림즈 - 스트림즈DSL (1) | 2024.10.22 |
[Kafka] 아파치 카프카 기본 개념 :: 카프카 스트림즈 (4) | 2024.10.08 |
[Kafka] 아파치 카프카 기본개념 :: 클라이언트 라이브러리 - 어드민 API (1) | 2024.09.27 |