본문 바로가기
Framework/Kafka

[Kafka] 아파치 카프카 클라이언트 :: 컨슈머 API

by 미미믹 2024. 9. 25.

아파치 카프카 애플리케이션 프로그래밍 with 자바 책 공부 내용 정리

 

컨슈머 API

프로듀서가 전송한 데이터는 카프카 브로커에 적재된다. 컨슈머는 적재된 데이터를 사용하기 위해 브로커로부터 데이터를 가져와서 필요한 처리를 한다.


IntelliJ로 프로젝트를 생성해 기본적인 애플리케이션을 만들어본다.


카프카 클라이언트 디펜던시와 로그 확인을 위한 slf4j를 추가해준다.
클라이언트 버전은 책의 버전과 같은 버전으로 하였다.

implementation('org.apache.kafka:kafka-clients:2.5.0')
implementation('org.slf4j:slf4j-simple:1.7.30')


컨슈머 애플리케이션을 개발하기 위한 자바 파일을 생성한다.

package com.example;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class SimpleConsumer {
    private final static Logger logger = LoggerFactory.getLogger(SimpleConsumer.class);
    // 토픽명
    private final static String TOPIC_NAME = "test";
    // 토픽의 데이터를 가져올 카프카 클러스터의 IP, 포트
    private final static String BOOTSTRAP_SERVERS = "13.125.177.84:9092";
    // 컨슈머 그룹 이름
    // 그룹을 선언하지 않을 경우 어떤 그룹에도 속하지 않음
    private final static String GROUP_ID = "test-group";

    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);

        // 프로듀서가 직렬화하여 전송한 데이터를 역직렬화하기위해 지정
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // Properties로 지정한 카프카 컨슈머 옵션을 파라미터로 받아 KafkaConsumer 인스턴스 생성
        // 이 인스턴스를 통해 데이터를 가져옴
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);

        // 컨슈머에게 토픽 할당하기 위해 subscribe() 메서드 사용
        // Collection 타입의 String 값들을 받는데, 1개 이상의 토픽 이름을 받을 수 있음
        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        // poll() 메서드를 호출해 데이터를 가져와서 처리한다.
        // 지속적으로 데이터를 처리하기위해 반복 호출해야됨
        while (true) {
            // poll() 메서드를 통해 ConsumerRecord 리스트를 반환받음
            // poll() 메서드는 Duration 타입을 인자로 받는다.
            // 해당 인자 값은 브로커로부터 데이터를 가져올 때 컨슈머 버퍼에 데이터를 기다리기 위한 타임아웃 간격
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));

            for (ConsumerRecord<String, String> record : records) {
                logger.info("{}", record);
            }
        }
    }
}


프로젝트를 실행시켜 컨슈머를 동작시킨다.
카프카 컨슈머 애플리케이션이 실행되면서 카프카 라이브러리 로그가 출력됨과 동시에 컨슈머가 test 토픽을 구독하면서 컨슈머는 브로커로부터 polling을 시작한다.

컨슈머에 할당된 옵션 및 토픽 정보 등이 로그로 출력된다.

test 토픽에 데이터를 넣어주기 위해 kafka-console-producer 명령으로 데이터를 넣어보자.

bin/kafka-console-producer.sh --boostrap-server 카프카서버IP:포트 --topic 토픽명
> 넣어줄 데이터

test 토픽으로부터 데이터를 polling하여 로그로 출력했다.
가져온 레코드의 파티션 번호, 오프셋, 레코드가 브로커에 들어간 날짜, 메세지 키, 메세지 값을 확인할 수 있다.


컨슈머 중요 개념

토픽의 파티션으로부터 데이터를 가져가기 위해 컨슈머를 운영하는 방법은 크게 2가지가 있다.
첫번째는 1개 이상의 컨슈머로 이루어진 컨슈머 그룹을 운영하는 것이고,
두번째는 토픽의 특정 파티션만 구독하는 컨슈머를 운영하는 것이다.

컨슈머 그룹으로 운영하는 방법은 컨슈머를 각 컨슈머 그룹으로부터 격리된 환경에서 안전하게 운영할 수 있도록 도와주는 카프카의 독특한 방식이다. 컨슈머 그룹으로 묶인 컨슈머들은 토픽의 1개 이상 파티션들에 할당되어 데이터를 가져갈 수 있다.
컨슈머 그룹으로 묶인 컨슈머가 토픽을 구독해서 데이터를 가져갈 때, 1개의 파티션은 최대 1개의 컨슈머에 할당 가능하다. 컨슈머는 여러 개의 파티션에 할당될 수 있다.
이러한 특징으로 컨슈머 그룹의 컨슈머 개수는 가져가고자 하는 토픽의 파티션 개수보다 같거나 작아야한다.

컨슈머 그룹은 다른 컨슈머 그룹과 격리되는 특징을 가지고있다. 따라서 카프카 프로듀서가 보낸 데이터를 각기 다른 역할을 하는 컨슈머 그룹끼리 영향을 받지 않게 처리할 수 있다는 장점이 있다.

컨슈머 그룹의 컨슈머에 장애가 발생하면, 장애가 발생한 컨슈머에 할당된 파티션은 장애가 발생하지 않은 컨슈머에 소유권이 넘어간다. 이러한 과정을 '리밸런싱(rebalancing)'이라고 부른다.
리밸런싱은 크게 두가지 상황에서 일어나늗네, 첫번째는 컨슈머가 추가되는 상황이고 두번째는 컨슈머가 제외되는 상황이다. 리밸런싱이 발생할 때 파티션의 소유권을 컨슈머로 재할당하는 과정에서 해당 컨슈머 그룹의 컨슈머들이 토픽의 데이터를 읽을 수 없기 때문에 리밸런싱이 자주 일어나서는 안된다.
그룹 조정자(group coordinator)는 리밸런싱을 발동시키는 역할을 하는데, 컨슈머 그룹의 컨슈머가 추가되고 삭제될 때를 감지한다. 카프카 브로커 중 한 대가 그룹 조정자의 역할을 수행한다.

오프셋 커밋은 컨슈머 애플리케이션에서 명시적, 비명시적으로 수행할 수 있다.
기본 옵션은 poll() 메서드가 수행될 때 일정 간격마다 오프셋을 커밋하도록 enable.auto.commit=true로 설정되어있다. 이렇게 일정 간격마다 자동으로 커밋되는 것을 '비명시 오프셋 커밋'이라고 부른다. 이 옵션은 poll() 메서드가 auto.commit.interval.ms에 설정된 값 이상 지났을 때 그 시점까지 읽은 레코드의 오프셋을 커밋한다.
명시적으로 오프셋을 커밋하려면 poll() 메서드 호출 이후 반환받은 데이터의 처리가 완료되고 commitSync() 메서드를 호출하면 된다.
비동기 오프셋 커밋은 commitAsync() 메서드를 사용한다.

컨슈머 애플리케이션을 실행하게 되면 내부에서 Fetcher 인스턴스가 생성되어 poll() 메서드를 호출하기 전에 미리 레코드들을 내부 큐로 가져오고, 이후에 사용자가 명시적으로 poll() 메서드를 호출하면 컨슈머는 내부 큐에 있는 레코드들을 반환받아 처리를 수행한다.

 

컨슈머 필수 옵션
bootstrap.servers 프로듀서가 데이터를 전송할 대상 카프카 클러스터에 속한 브로커의 호스트 이름:포트를 1개 이상 작성한다.
key.deserializer 레코드의 메세지 키를 역직렬화하는 클래스를 지정한다.
value.deserializer 레코드의 메세지 값을 역직렬화하는 클래스를 지정한다.
컨슈머 선택 옵션
group.id 커슈머 그룹 아이디를 지정한다. subscribe() 메서드로 토픽을 구독하여 사용할 때는 이 옵션이 필수이다.
default = null
auto.offset.reset 컨슈머 그룹이 특정 파티션을 읽을 때 저장된 컨슈머 오프셋이 없는 경우 어느 오프셋부터 읽을지 선택하는 옵션이다.
이미 컨슈머 오프셋이 있다면 이 옵션값은 무시된다.
latest, earliest, none 중 1개를 설정할 수 있다.
lastest :: 가장 높은(가장 최근에 넣은) 오프셋부터
earliest :: 가장 낮은(가장 오래전에 넣은) 오프셋부터
none :: 커밋 기록이 없으면 오류 반환, 있으면 커밋 기록 이후 오프셋부터
default = lastest
enable.auto.commit 자동 커밋 여부. default = true
auto.commit.interval.ms 자동 커밋일 경우 오프셋 커밋 간격. default = 5000
max.poll.records poll() 메서드를 통해 반환되는 레코드 개수 지정. default = 500
session.timeout.ms 컨슈머와 브로커가 연결이 끊기는 최대 시간. 이 시간 내에 하트비트를 전송하지 않으면 브로커는 컨슈머에 이슈가 발생했다 가정하고 리밸런싱 시작.
default = 10000(하트비트 시간 간격의 3배)
hearbeat.interval.ms 하트비트를 전송하는 시간 간격. default = 3000
max.poll.interval.ms poll() 메서드를 호출하는 간격의 최대 시간. 메서드 호출 이후 데이터를 처리하는 데에 시간이 너무 많이 걸리는 경우 비정상으로 판단하고 리밸런싱 시작
default = 300000
isolation.level 트랜잭션 프로듀서가 레코드를 트랜잭션 단위로 보낼 경우 사용
read_committed, read_uncommitted로 설정할 수 있다.
read_committed :: 커밋이 완료된 레코드만 읽는다.
read_uncommitted :: 커밋 여부와 관계없이 파티션에 있는 모든 레코드를 읽는다.
default = read_uncommitted
그 외 옵션 https://kafka.apache.org/documentation/

✨ 참고 도서 정보 :: 아파치 카프카 애플리케이션 프로그래밍 with 자바 (최원영)