아파치 카프카 애플리케이션 프로그래밍 with 자바 책 공부 내용 정리
카프카 클라이언트에서는 내부 옵션들을 설정하거나 조회하기 위해 AdminClient 클래스를 제공한다.
AdminClient 클래스를 활용하면 클러스터의 옵션과 관련된 부분을 자동화할 수 있다.
AdminClient 활용 예시
- 카프카 컨슈머를 멀티 스레드로 생성할 때, 구독하는 토픽의 파티션 개수만큼 스레드를 생성하고 싶을 때, 스레드 생성 전에 해당 토픽의 파티션 개수를 어드민 API를 통해 가져올 수 있다.
- AdminClient 클래스로 구현한 웹 대시보드를 통해 ACL(Access Control List)이 적용된 클러스터의 리소스 접근 권한 규칙을 추가할 수 있다.
- 특정 토픽의 데이터 양이 늘어남을 감지하고 AdminClient 클래스로 해당 토픽의 파티션을 늘릴 수 있다.
어드민 API 선언
Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "카프카서버IP:포트");
AdminClient admin = AdminClient.create(configs);
프로듀서 API 또는 컨슈머 API와는 다르게 추가 설정 없이 클러스터 정보에 대한 설정만 하면 된다.
create() 메서드로 KafkaAdminClient를 반환받는다.
KafkaAdminClient는 브로커들의 옵션들을 확인, 설정할 수 있는 유틸 클래스다.
KafkaAdminClient 주요 메서드 | |
describeCluster(DescribeClusterOptions options)
|
브로커 정보 조회 |
listTopics(ListTopicsOptions options) | 토픽 리스트 조회 |
listConsumerGroups(ListConsumerGroupsOptions) | 컨슈머 그룹 조회 |
createTopics (Collection<NewTopic> newTopic, CreateTopicsOptions options) |
신규 토픽 생성 |
createPartitions (Map<String, NewPartitions> newPartitions, CreateTopicsOptions options) |
파티션 개수 변경 |
createAcls(Collection<AclBinding> acls, CreateAclsOptions options) | 접근 제어 규칙 생성 |
package com.example;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.ConfigResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
public class KafkaAdminClient {
private final static Logger logger = LoggerFactory.getLogger(KafkaAdminClient.class);
private final static String BOOTSTRAP_SERVERS = "13.125.177.84:9092";
public static void main(String[] args) throws Exception {
Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
AdminClient admin = AdminClient.create(configs);
logger.info("== Get broker information");
for (Node node : admin.describeCluster().nodes().get()) {
logger.info("node : {}", node);
ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, node.idString());
DescribeConfigsResult describeConfigs = admin.describeConfigs(Collections.singleton(cr));
describeConfigs.all().get().forEach((broker, config) -> {
config.entries().forEach(configEntry -> logger.info(configEntry.name() + "= " + configEntry.value()));
});
}
logger.info("== Get default num.partitions");
for (Node node : admin.describeCluster().nodes().get()) {
ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, node.idString());
DescribeConfigsResult describeConfigs = admin.describeConfigs(Collections.singleton(cr));
Config config = describeConfigs.all().get().get(cr);
Optional<ConfigEntry> optionalConfigEntry = config.entries().stream().filter(v -> v.name().equals("num.partitions")).findFirst();
ConfigEntry numPartitionConfig = optionalConfigEntry.orElseThrow(Exception::new);
logger.info("{}", numPartitionConfig.value());
}
logger.info("== Topic list");
for (TopicListing topicListing : admin.listTopics().listings().get()) {
logger.info("{}", topicListing.toString());
}
logger.info("== test topic information");
Map<String, TopicDescription> topicInformation = admin.describeTopics(Collections.singletonList("test")).all().get();
logger.info("{}", topicInformation);
logger.info("== Consumer group list");
ListConsumerGroupsResult listConsumerGroups = admin.listConsumerGroups();
listConsumerGroups.all().get().forEach(v -> {
logger.info("{}", v);
});
admin.close();
}
}
어드민 API는 사용하고 나면 명시적으로 종료 메서드를 호출하여 리소스가 낭비되지 않도록 한다.
AdminClient 클래스의 close() 메서드를 사용하면 명시적으로 종료할 수 잇다.
어드민 API를 활용할 때 클러스터의 버전과 클라이언트의 버전을 맞춰서 사용해야한다.
어드민 API의 많은 부분이 버전이 올라가면서 자주 바뀌기 때문이다.
✨ 참고도서정보 :: 아파치 카프카 애플리케이션 프로그래밍 with 자바 (최원영)
'Framework > Kafka' 카테고리의 다른 글
[Kafka] 아파치 카프카 기본 개념 :: 카프카 스트림즈 - 스트림즈DSL (1) | 2024.10.22 |
---|---|
[Kafka] 아파치 카프카 기본 개념 :: 카프카 스트림즈 (4) | 2024.10.08 |
[Kafka] 아파치 카프카 기본 개념 :: 클라이언트 라이브러리 - 컨슈머 API (4) | 2024.09.25 |
[Kafka] 아파치 카프카 기본개념 :: 클라이언트 라이브러리 - 프로듀서 API (3) | 2024.09.23 |
[Kafka] 아파치 카프카 기본 개념 (2) | 2024.09.13 |