본문 바로가기
Framework/Kafka

[Kafka] 아파치 카프카 CLI 명령어

by 미미믹 2024. 9. 5.

kafka-topics.sh
  • 카프카 토픽 생성
    --replication-factor 1로 설정할 경우 복제를 하지 않고 사용하겠단 의미이며, 2면 1개의 복제본을 생성한다.
    복제 수의 최소 설정은 1이고, 최대 설정은 카프카 클러스터의 브로커 수이다.
bin/kafka-topics.sh \
  --create \
  --bootstrap-server 카프카브로커IP:포트 \
  [--partitions 파티션수] \
  [--replication-factor 복제수(1이면복제X)] \
  [--config retention.ms=토픽데이터유지시간] \
  --topic 토픽명

 

  • 카프카 토픽 조회
    --exclude-internal 옵션 추가 시 내부 관리를 위한 인터널 토픽을 조회 목록에서 제외할 수 있다.
bin/kafka-topics.sh --bootstrap-server 카프카브로커IP:포트 --list [--exclude-internal]

 

  • 카프카 토픽 상세 조회
bin/kafka-topics.sh --bootstrap-server 카프카브로커IP:포트 --describe --topic 토픽명

 

  • 토픽 옵션 수정
bin/kafka-topics.sh --bootstrap-server 카프카브로커IP:포트 --topic 토픽명 \
  --alter --partitions 파티션수(늘리기만 가능)

 

  • 토픽 삭제
bin/kafka-topics.sh --bootstrap-server 카프카브로커IP:포트 --topic 토픽명 --delete

 


kafka-configs.sh
bin/kafka-configs.sh --bootstrap-server 카프카브로커IP:포트 \
  --entity-type topics --entity-name 토픽명 \
  --alter --add-config retention.ms=보관시간

 


kafka-console-producer.sh

생성된 토픽에 데이터를 넣을 수 있다.
메세지 키 없이 값만 넘길 경우 키는 자바의 null로 기본 설정된다.
메세지 키 사용 설정 후 구분자를 지정하지 않을 경우, 기본 설정은 \t이다.
메세지 키 사용 설정 후 구분자를 넣지 않고 엔터를 누르면 KafkaException 오류가 호출되고 종류된다.

bin/kafka-console-producer.sh --bootstrap-server 카프카브로커IP:포트 --topic 토픽명 \
  [--property parse.key=true --property key.separator="구분자"]

 


kafka-console-consumer.sh

토픽에 전송한 데이터를 확인할 수 있다.
--from-beginning 옵션을 주면 처음 데이터부터 볼 수 있다.

bin/kafka-console-consumer.sh --bootstrap-server 카프카브로커IP:포트 --topic 토픽명 \
  [--property parse.key=true --property key.separator="구분자"] \
  [--group 컨슈머그룹명] \
  [--max-messages 메세지수] \
  [--partition 파티션번호] \
  [--from-beginning]

 


kafka-consumer-groups.sh
  • 컨슈머 그룹 목록 조회
bin/kafka-consumer-groups.sh --bootstrap-server 카프카브로커IP:포트 --list

 

  • 특정 컨슈머 그룹 상세 내용 조회
bin/kafka-consumer-groups.sh --bootstrap-server 카프카브로커IP:포트 \
  --group 그룹명 --describe

 

  • 컨슈머 그룹 삭제
bin/kafka-consumer-groups.sh --bootstrap-server 카프카브로커IP:포트 --delete --group 그룹명

 


kafka-verifiable-producer.sh

kafka-verifiable-producer를 사용하면 String 타입 메시지 값을 코드 없이 주고받을 수 있다.
카프카 클러스터 설치 후 토픽에 데이터를 전송하여 간단한 네트워크 통신 테스트를 할때 유용하다.

bin/kafka-verifiable-producer.sh --bootstrap-server 카프카브로커IP:포트 \
  --max-messages 보내는데이터수 \
  --topic 데이터를받을토픽명

--max-messages는 kafka-verifiable-producer.sh로 보내는 데이터 개수를 지정한다.
-1로 설정하면 종료될 때까지 계속 데이터를 보낸다.

최초 실행 시점이 startup_compelete와 함께 출력된다.
메세지별로 보낸 시간과 메세지 키, 메세지 값, 토픽, 저장된 파티션, 저장된 오프셋 번호가 출력된다.

 

kafka-verifiable-consumer.sh

kafka-verifiable-producer에 전송된 데이터를 볼 수 있다.

bin/kafka-verifiable-consumer.sh --bootstrap-server 카프카브로커IP:포트 \
  --topic 토픽명 \
  --group-id 컨슈머그룹명

 


kafka-delete-records.sh

적재된 토픽의 데이터를 지울 수 있다.
이미 적재된 토픽의 데이터 중 가장 오래된 데이터부터 특정 시점의 오프셋까지 삭제할 수 있다.
토픽의 특정 레코드 하나만 삭제되는 것이 아니라 파티션에 존재하는 가장 오래된 오프셋부터 지정한 오프셋까지 삭제된다.

vi delete-topic.json

{
  "partitions": [
    {
      "topic": 토픽명,
      "partition": 파티션번호,
      "offset": 오프셋번호
    }
  ],
  "version": 1
}

삭제하고자 하는 데이터에 대한 정보를 파일로 저장해서 사용해야한다.

bin/kafka-delete-records.sh --bootstrap-server 카프카브로커IP:포트 --offset-json-file 파일명