본문 바로가기
Framework/Kafka

[Kafka] 아파치 카프카 클라이언트 :: 어드민 API

by 미미믹 2024. 9. 27.

아파치 카프카 애플리케이션 프로그래밍 with 자바 책 공부 내용 정리


카프카 클라이언트에서는 내부 옵션들을 설정하거나 조회하기 위해 AdminClient 클래스를 제공한다.
AdminClient 클래스를 활용하면 클러스터의 옵션과 관련된 부분을 자동화할 수 있다.

AdminClient 활용 예시
- 카프카 컨슈머를 멀티 스레드로 생성할 때, 구독하는 토픽의 파티션 개수만큼 스레드를 생성하고 싶을 때, 스레드 생성 전에 해당 토픽의 파티션 개수를 어드민 API를 통해 가져올 수 있다.
- AdminClient 클래스로 구현한 웹 대시보드를 통해 ACL(Access Control List)이 적용된 클러스터의 리소스 접근 권한 규칙을 추가할 수 있다.
- 특정 토픽의 데이터 양이 늘어남을 감지하고 AdminClient 클래스로 해당 토픽의 파티션을 늘릴 수 있다.

어드민 API 선언

Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "카프카서버IP:포트");
AdminClient admin = AdminClient.create(configs);

프로듀서 API 또는 컨슈머 API와는 다르게 추가 설정 없이 클러스터 정보에 대한 설정만 하면 된다.
create() 메서드로 KafkaAdminClient를 반환받는다.
KafkaAdminClient는 브로커들의 옵션들을 확인, 설정할 수 있는 유틸 클래스다.

KafkaAdminClient 주요 메서드
describeCluster(DescribeClusterOptions options)
브로커 정보 조회
listTopics(ListTopicsOptions options) 토픽 리스트 조회
listConsumerGroups(ListConsumerGroupsOptions) 컨슈머 그룹 조회
createTopics
(Collection<NewTopic> newTopic, CreateTopicsOptions options)
신규 토픽 생성
createPartitions
(Map<String, NewPartitions> newPartitions, CreateTopicsOptions options)
파티션 개수 변경
createAcls(Collection<AclBinding> acls, CreateAclsOptions options) 접근 제어 규칙 생성

 

package com.example;

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.ConfigResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

public class KafkaAdminClient {
    private final static Logger logger = LoggerFactory.getLogger(KafkaAdminClient.class);
    private final static String BOOTSTRAP_SERVERS = "13.125.177.84:9092";

    public static void main(String[] args) throws Exception {

        Properties configs = new Properties();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        AdminClient admin = AdminClient.create(configs);

        logger.info("== Get broker information");
        for (Node node : admin.describeCluster().nodes().get()) {
            logger.info("node : {}", node);
            ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, node.idString());
            DescribeConfigsResult describeConfigs = admin.describeConfigs(Collections.singleton(cr));
            describeConfigs.all().get().forEach((broker, config) -> {
                config.entries().forEach(configEntry -> logger.info(configEntry.name() + "= " + configEntry.value()));
            });
        }

        logger.info("== Get default num.partitions");
        for (Node node : admin.describeCluster().nodes().get()) {
            ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, node.idString());
            DescribeConfigsResult describeConfigs = admin.describeConfigs(Collections.singleton(cr));
            Config config = describeConfigs.all().get().get(cr);
            Optional<ConfigEntry> optionalConfigEntry = config.entries().stream().filter(v -> v.name().equals("num.partitions")).findFirst();
            ConfigEntry numPartitionConfig = optionalConfigEntry.orElseThrow(Exception::new);
            logger.info("{}", numPartitionConfig.value());
        }

        logger.info("== Topic list");
        for (TopicListing topicListing : admin.listTopics().listings().get()) {
            logger.info("{}", topicListing.toString());
        }

        logger.info("== test topic information");
        Map<String, TopicDescription> topicInformation = admin.describeTopics(Collections.singletonList("test")).all().get();
        logger.info("{}", topicInformation);

        logger.info("== Consumer group list");
        ListConsumerGroupsResult listConsumerGroups = admin.listConsumerGroups();
        listConsumerGroups.all().get().forEach(v -> {
            logger.info("{}", v);
        });

        admin.close();
    }
}


어드민 API는 사용하고 나면 명시적으로 종료 메서드를 호출하여 리소스가 낭비되지 않도록 한다.
AdminClient 클래스의 close() 메서드를 사용하면 명시적으로 종료할 수 잇다.

어드민 API를 활용할 때 클러스터의 버전과 클라이언트의 버전을 맞춰서 사용해야한다.
어드민 API의 많은 부분이 버전이 올라가면서 자주 바뀌기 때문이다.


✨ 참고도서정보 :: 아파치 카프카 애플리케이션 프로그래밍 with 자바 (최원영)