본문 바로가기
Framework/Kafka

[Kafka] 아파치 카프카 기본 개념 :: 카프카 커넥트 - 싱크 커넥터

by 미미믹 2024. 11. 6.

아파치 카프카 애플리케이션 프로그래밍 with 자바 책 공부 내용 정리


싱크 커넥터는 토픽의 데이터를 타깃 애플리케이션 또는 타깃 파일로 저장하는 역할을 한다.


싱크 커넥터를 만들 때는 connect-api 라이브러리를 추가해야 한다.
connect-api 라이브러리에는 커넥터를 개발하기 위한 클래스들이 포함되어 있다.

 

소스 커넥터를 만들 때 필요한 클래스는 2개다. 첫 번째는 SinkConnector이고 두번째는 SinkTask이다.
SinkConnector는 태스크를 실행하기 전에 사용자로부터 입력받은 설정값을 초기화하고 초기화하고 어떤 태스크 클래스를 사용할 것인지 정의하는 데에 사용한다. SinkConnector에는 실질적인 데이터 처리 로직이 들어가지 않는다.
SinkTask가 커넥트에서 컨슈머 역할을 하고 데이터를 저장하는 코드를 가지게 된다.

먼저, SinkConnector를 상속받은 사용자의 클래스를 생성해야 한다. (extends SinkConnector)

package com.example;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;

import java.util.List;
import java.util.Map;

// SinkConnector를 상속받은 사용자 정의 클래스를 선언한다.
// 사용자가 지정한 이 클래스 이름은 최종적으로 커넥트에서 호출할 때 사용되므로 명확하게 어떻게 사용되는 커넥터인지 알 수 있는 이름을 적으면 좋다.
// 예를 들어, mongoDB에서 토픽의 데이터를 저장하는 싱크 커텍터는 MongoDbSinkConnector로 선언하면 좋다.
public class TestSinkConnector extends SinkConnector {
    
    // 사용자가 JSON 또는 config 파일 형태로 입력한 설정값을 초기화하는 메서드다.
    // 만약 올바른 값이 아니라면 여기서 ConnectException()을 호출하여 커넥터를 종료할 수 있다.
    @Override
    public void start(Map<String, String> map) {

    }

    // 이 커넥터가 사용할 태스크 클래스를 지정한다.
    @Override
    public Class<? extends Task> taskClass() {
        return null;
    }

    // 태스크 개수가 2개 이상인 경우 태스크마다 각기 다른 옵션을 설정할 때 사용한다.
    @Override
    public List<Map<String, String>> taskConfigs(int i) {
        return List.of();
    }

    // 커넥터가 종료될 때 필요한 로직을 작성한다.
    @Override
    public void stop() {

    }

    // 커넥터가 사용할 설정값에 대한 정보를 받는다.
    // 커넥터의 설정값은 ConfigDef 클래스를 통해 각 설정의 이름, 기본값, 중요도, 설명을 정의할 수 있다.
    @Override
    public ConfigDef config() {
        return null;
    }

    // 커넥터의 버전을 리턴한다.
    // 커넥트에 포함된 커넥터 플러그인을 조회할 때 이 버전이 노출된다.
    // 커넥터를 지속적으로 유지보수하고 신규 배포할 때 이 메서드가 리턴하는 버전 값을 변경해야 한다.
    @Override
    public String version() {
        return "";
    }
}


다음은 SinkTask를 살펴본다. SinkTask를 상속받은 사용자 클래스는 5개 메서드를 구현해야 한다.
(extends SinkTask)

package com.example;

import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

import java.util.List;
import java.util.Map;

public class TestSinkTask extends SinkTask {
    
    // 태스크의 버전을 저장한다.
    // 보통 커넥터의 version() 메서드에서 지정한 버전과 동일한 버전으로 작성하는 것이 일반적이다.
    @Override
    public String version() {
        return "";
    }

    // 태스크가 시작할 때 필요한 로직을 작성한다.
    // 태스크는 실질적으로 데이터를 처리하는 역할을 하므로 데이터 처리에 필요한 모든 리소스를 여기서 초기화하면 좋다.
    // 예를 들어, mongoDB 싱크 커넥터를 구현한다면 이 메서드에서 mongoDB 커넥션을 맺는다.
    @Override
    public void start(Map<String, String> map) {
        
    }

    // 싱크 애플리케이션 또는 싱크 파일에 저장할 데이터를 토픽에서 주기적으로 가져오는 메서드이다.
    // 토픽의 데이터들은 여러 개의 SinkRecord를 묶어 파라미터로 사용할 수 있다.
    // SinkRecord는 토픽의 한 개 레코드이며 토픽, 파티션, 타임스탬프 등의 정보를 담고있다.
    @Override
    public void put(Collection<SinkRecord> records) {
    
    }
    
    // put() 메서드를 통해 가져온 데이터를 일정 주기로 싱크 애플리케이션 또는 싱크 파일에 저장할 때 사용하는 로직이다.
    // 예를 들어, JDBC 커넥션을 맺어서 MySQL에 데이터를 저장할 때 put() 메서드에서는 데이터를 insert하고 flush() 메서드에서는 commit을 수행하여 트랜잭션을 끝낼 수 있다.
    // put() 메서드에서 레코드를 저장하는 로직을 넣을 수도 있으며 이 경우에는 flush() 메서드에는 로직을 구현하지 않아도 된다.
    @Override
    public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
    
    }

    // 태스크가 종료될 때 필요한 로직을 작성한다.
    // 태스크에서 사용한 리소스를 종료해야 할 때 여기에 종료 코드를 구현한다.
    @Override
    public void stop() {

    }
}

✨ 참고도서정보 :: 아파치 카프카 애플리케이션 프로그래밍 with 자바 (최원영)