본문 바로가기
Framework/Kafka

[Kafka] 아파치 카프카 기본 개념 :: 카프카 커넥트 - 소스 커넥터

by 미미믹 2024. 11. 5.

아파치 카프카 애플리케이션 프로그래밍 with 자바 책 공부 내용 정리


소스 커넥터는 소스 애플리케이션 또는 소스 파일로부터 데이터를 가져와 토픽으로 넣는 역할을 한다.


소스 커넥터를 만들 때는 connect-api 라이브러리를 추가해야 한다.
connect-api 라이브러리에는 커넥터를 개발하기 위한 클래스들이 포함되어 있다.


소스 커넥터를 만들 때 필요한 클래스는 2개다. 첫 번째는 SourceConnector이고 두번째는 SourceTask이다.
SourceConnector는 태스크를 실행하기 전 커넥터 설정파일을 초기화하고 어떤 태스크 클래스를 사용할 것인지 정의하는 데에 사용한다. 그렇기 때문에 SourceConnector에는 실질적인 데이터를 다루는 부분이 들어가지 않는다.
SourceTask가 실제로 데이터를 다루는 클래스라고 볼 수 있다. SourceTask는 소스 애플리케이션 또는 소스 파일로부터 데이터를 가져와서 토픽으로 데이터를 보내는 역할을 수행한다.

먼저, SourceConnector를 상속받은 사용자의 클래스를 생성해야 한다. (extends SourceConnector)

package com.example;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;

import java.util.List;
import java.util.Map;

// SourceConnector를 상속받은 사용자 정의 클래스를 선언한다.
// 사용자가 지정한 이 클래스 이름은 최종적으로 커넥트에서 호출할 때 사용되므로 명확하게 어떻게 사용되는 커넥터인지 알 수 있는 이름을 적으면 좋다.
// 예를 들어, mongoDB에서 데이터를 가져와서 토픽으로 저장하는 소스 커텍터라면 MongoDbSourceConnector로 선언할 수 있다.
public class TestSourceConnector extends SourceConnector {
    
    // 사용자가 JSON 또는 config 파일 형태로 입력한 설정값을 초기화하는 메서드다.
    // 만약 올바른 값이 아니라면 여기서 ConnectException()을 호출하여 커넥터를 종료할 수 있다.
    // 예를 들어, JDBC 소스 커넥터라면 JDBC 커넥션 URL값을 검증하는 로직을 넣을 수 있다.
    // 만약 비정상적인 URL값이라면 커넥션을 맺을 필요 없이 커넥터를 종료시킬 수 있다.
    @Override
    public void start(Map<String, String> map) {

    }

    // 이 커넥터가 사용할 태스크 클래스를 지정한다.
    @Override
    public Class<? extends Task> taskClass() {
        return null;
    }

    // 태스크 개수가 2개 이상인 경우 태스크마다 각기 다른 옵션을 설정할 때 사용한다.
    @Override
    public List<Map<String, String>> taskConfigs(int i) {
        return List.of();
    }

    // 커넥터가 종료될 때 필요한 로직을 작성한다.
    @Override
    public void stop() {

    }

    // 커넥터가 사용할 설정값에 대한 정보를 받는다.
    // 커넥터의 설정값은 ConfigDef 클래스를 통해 각 설정의 이름, 기본값, 중요도, 설명을 정의할 수 있다.
    @Override
    public ConfigDef config() {
        return null;
    }

    // 커넥터의 버전을 리턴한다.
    // 커넥트에 포함된 커넥터 플러그인을 조회할 때 이 버전이 노출된다.
    // 커넥터를 지속적으로 유지보수하고 신규 배포할 때 이 메서드가 리턴하는 버전 값을 변경해야 한다.
    @Override
    public String version() {
        return "";
    }
}


다음은 SourceTask를 살펴본다. SourceTask를 상속받은 사용자 클래스는 4개 메서드를 구현해야 한다.
(extends SourceTask)

package com.example;

import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

import java.util.List;
import java.util.Map;

public class TestSourceTask extends SourceTask {
    
    // 태스크의 버전을 저장한다.
    // 보통 커넥터의 version() 메서드에서 지정한 버전과 동일한 버전으로 작성하는 것이 일반적이다.
    @Override
    public String version() {
        return "";
    }

    // 태스크가 시작할 때 필요한 로직을 작성한다.
    // 태스크는 실질적으로 데이터를 처리하는 역할을 하므로 데이터 처리에 필요한 모든 리소스를 여기서 초기화하면 좋다.
    // 예를 들어, JDBC 소스 커넥터를 구현한다면 이 메서드에서 JDBC 커넥션을 맺는다.
    @Override
    public void start(Map<String, String> map) {
        
    }

    // 소스 애플리케이션 또는 소스 파일로부터 데이터를 읽어오는 로직을 작성한다.
    // 데이터를 읽어오면 토픽으로 보낼 데이터를 SourceRecord로 정의한다.
    // SourceRecord클래스는 토픽으로 데이터를 정의하기 위해 사용한다.
    // List<SourceRecord> 인스턴스에 데이터를 담아 리턴하면 데이터가 토픽으로 전송된다.
    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        return List.of();
    }

    // 태스크가 종료될 때 필요한 로직을 작성한다.
    // JDBC 소스 커넥터를 구현했다면 이 메서드에서 JDBC 커넥션을 종료하는 로직을 추가하면 된다.
    @Override
    public void stop() {

    }
}

✨ 참고도서정보 :: 아파치 카프카 애플리케이션 프로그래밍 with 자바 (최원영)