본문 바로가기
Framework/Kafka

[Kafka] 아파치 카프카 기본개념 :: 클라이언트 라이브러리 - 프로듀서 API

by 미미믹 2024. 9. 23.

아파치 카프카 애플리케이션 프로그래밍 with 자바 책 공부 내용 정리

 

프로듀서 API

프로듀서 애플리케이션은 카프카에 필요한 데이터를 선언하고 브로커의 특정 토픽의 파티션에 전송한다.
프로듀서는 데이터를 직렬화하여 카프카 브로커로 보내기 때문에 자바에서 선언 가능한 모든 형태를 브로커로 전송할 수 있다.
직렬화란 자바 또는 외부 시스템에서 사용 가능하도록 바이트 형태로 데이터를 변환하는 기술이다.
직렬화를 사용하면 프로듀서는 자바 기본형과 참조형 뿐만 아니라 동영상, 이미지 같은 바이너리 데이터도 프로듀서를 통해 전송할 수 있다.


먼저 IntelliJ를 실행하여 프로젝트를 생성하여 기본적인 프로듀서 애플리케이션을 만들어본다.


카프카 클라이언트 디펜던시와 로그 확인을 위한 slf4j를 추가해준다.

implementation('org.apache.kafka:kafka-clients:2.5.0')
implementation('org.slf4j:slf4j-simple:1.7.30')


카프카 애플리케이션을 개발하기 위한 java 파일을 생성한다.

package com.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

public class SimpleProducer {
    private final static Logger logger = LoggerFactory.getLogger(SimpleProducer.class);
    // 레코드를 전송하고자 하는 토픽의 이름
    private final static String TOPIC_NAME = "test";
    // 전송하고자 하는 카프카 클러스터 서버의 host와 IP
    private final static String BOOTSTRAP_SERVICE = "3.36.103.165:9092"; 

    public static void main(String[] args) {

        // Properties에는 KafkaProducer 인스턴스를 생성하기 위한 프로듀서 옵션들을 key/value 값으로 선언한다.
        // 필수 옵션은 반드시 선언해야하며,
        // 선택옵션은 선언하지 않아도 된다.(선언하지 않을 시, default 옵션으로 동작)
        Properties configs = new Properties();

        // 전송하고자 하는 카프카 클러스터 서버의 host와 IP
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVICE);

        // 메세지 키, 값을 직렬화 하기 위한 직렬화 클래스를 선언한다.
        // 예제에서는 String 객체를 전송하기 위해 카프카 라이브러리의 String Serializer를 사용했다.
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Properties(configs)를 KafkaProducer의 생성 파라미터로 추가하여 인스턴스를 생성한다.
        // producer 인스턴스는 ProducerRecord를 전송할 때 사용된다.
        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

        String messageValue = "testMessage"; // 메세지 값 선언

        // 카프카 브로커로 데이터를 보내기 위해 ProducerRecord를 생성한다.
        // 예제에서는 토픽 이름과 메세지 값만 선언하였다. (메세지 키는 null로 설정됨)
        // 메세지 키와 메세지 값의 타입은 직렬화 클래스와 동일하게 설정한다.
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, messageValue);

        // 생성한 ProducerRecord를 전송하기 위해 record를 파라미터로 가지는 send() 메서드를 호출했다.
        // 프로듀서에서 send()는 즉각적인 전송을 뜻하는 것이 아니라,
        // 파라미터로 들어간 record를 프로듀서 내부에 가지고 있다가 배치 형태로 묶어 브로커에 전송한다.
        // 이러한 전송 방식을 '배치 전송'이라 부른다.
        producer.send(record);

        logger.info("{}", record);

        // 프로듀셔 내부 버퍼에 가지고 있던 레코드 배치를 브로커로 전송한다.
        producer.flush();

        // producer 인스턴스의 리소스들을 안전하게 종료한다.
        producer.close();
    }
}


해당 프로듀서를 싱행하기 전에 토픽을 생성한다.

bin/kafka-topics.sh --bootstrap-sever 카프카서버IP:포트 \
 --create --topic 토픽명 --partitions 파티션수


토픽 생성까지 완료한 경우, IntelliJ에서 애플리케이션을 실행시킨다.


토픽에 데이터가 정상적으로 적재되었는지 확인해본다.

bin/kafka-console-consumer.sh --bootstrap-server 카프카서버IP:포트 \
 --topic 토픽명 --from-beginning


프로듀서 중요 개념

프로듀서는 카프카 브로커로 데이터를 전송할 때 내부적으로 파티셔너, 배치 생성 단계를 거친다.
ProducerRecord 생성 시 추가 파라미터를 사용하여 오버로딩하여 ProducerRecord의 내부 변수를 선언할 수 있다.
파티션 번호를 직접 지정하거나 타임스탬프를 설정, 메세지 키를 설정할 수도 있다.

KafkaProduecer 인스턴스가 send() 메서드를 호출하면 ProducerRecord는 파티셔너에서 토픽의 어느 파티션으로 전송될 것인지 정해진다.
파티셔너(partitioner)에 의해 구분된 레코드는 데이터를 전송하기 전에 어큐뮬레이터(accumulator)에 데이터를 버퍼로 쌓아놓고 발송한다. 버퍼에 쌓인 데이터를 배치로 묶어서 전송함으로써 카프카의 프로듀서 처리랑을 향상시키는데 상당한 도움을 준다.

카프카 클라이언트 라이브러리에서는 사용자 지정 파티셔너를 생성하기 위한 Partitioner 인터페이스를 제공한다. 해당 인터페이스를 상속받은 사용자 정의 클래스에서 메세지 키 또는 메세지 값에 따른 파티션 지정 로직을 적용할 수도 있다.
파티셔너를 통해 파티션이 지정된 데이터는 어큐물레이터에 버퍼로 쌓인다. 센더(sender) 스레드는 어큐뮬레이터에 쌓인 배치 데이터를 가져가 카프카 브로커로 전송한다.

카프카 프로듀서는 압축 옵션을 통해 브로커로 전송 시 압축 방식을 정할 수 있다. default는 압축 되지 않은 상태로 전송이다. 압축을 하면 데이터 전송 시 네트워크 처리량에 이득을 볼 수 있지만 압축하는 데에 리소스를 사용하게된다. 프로듀서에서 압축한 메세지는 컨슈머 애플리케이션이 압출을 풀게 되는데 이 때도 컨슈머 애플리케이션 리소스가 사용된다.

프로듀서 필수 옵션
bootstrap.servers 프로듀서가 데이터를 전송할 대상 카프카 클러스터에 속한 브로커의
호스트 이름:포트를 1개 이상 작성한다.
key.serializer 레코드의 메세지 키를 직렬화하는 클래스를 지정한다.
value.serializer 레코드의 메세지 값을 직렬화하는 클래스를 지정한다.
프로듀서 선택 옵션
acks 프로듀서가 전송한 데이터가 브로커들에 정상적으로 저장되었는지 전송 성공 여부를 확인할 수 있다.
0, 1, -1(all) 중 하나로 설정할 수 있다. dafault = 1
0 :: 프로듀서가 전송한 즉시 브로커에 데이터 저장 여부와 상관없이 성공
1 :: 리더 파티션에 데이터가 저장되면 성공
-1 :: == all. 토픽의 min.insync.replicas 개수에 해당하는 리더 파티션과 팔로워 파티션에 데이터가 저장되면 성공
buffer.memory 브로커로 전송할 데이터를 배치로 모으기 위해 설정한 버퍼 메모리양 지정.
dafault = 33554432(32MB)
retries 프로듀서가 브로커로부터 에러를 받고난 뒤 재전송을 시도하는 횟수.
default = 2147483647
batch.size 배치로 전송할 레코드 최대 용량. default = 16384
linger.ms 배치를 전송하기 전까지 기다리는 최소 시간. dafault = 0
partitioner.class 레코드를 파티션에 전송할 때 적용하는 파티셔너 클래스
default = org.apache.kafka.clients.producer.internals.DefaultPartitioner
enable.idempotence 멱등성 프로듀서로 동작할지 여부. default = false
transactional.id 프로듀서가 레코드를 전송할 때 레코드를 트렌젝션 단위로 묶을지 여부
그 외 옵션 https://kafka.apache.org/documentation/

✨ 참고 도서 정보 :: 아파치 카프카 애플리케이션 프로그래밍 with 자바 (최원영)