아파치 카프카 애플리케이션 프로그래밍 with 자바 책 공부 내용 정리
스트림즈 DSL에는 레코드의 흐름을 추상화한 3가지 개념인 KStream, KTable, GolbalKTable이 있다.
이 3가지 개념은 컨슈머, 프로듀셔, 프로세서 API에서는 사용되지 않고 스트림즈DSL에서만 사용되는 개념이다.
KStream
KStream은 레코드의 흐름을 표현한 것으로 메세지 키와 메세지 값으로 구성되어 있다.
KStream으로 데이터를 조회하면 토픽에 존재하는(또는 KStream에 존재하는) 모든 레코드가 출력된다.
KStream은 컨슈머로 토픽을 구독하는 것과 동일한 선상에서 사용하는 것이라고 볼 수 있다.
KTable
KTable은 KStream과 다르게 메세지 키를 기준으로 묶어서 사용한다.
KStream은 토픽의 모든 레코드를 조회할 수 있지만 KTable은 유니크한 메세지 키를 기준으로 가장 최신 레코드를 사용한다. 그러므로 KTable로 데이터를 조회하면 메세지 키를 기준으로 가장 최신에 추가된 레코드의 데이터가 출력된다.
GlobalKTable
GlobalKTable은 KTable과 동일하게 메세지 키를 기준으로 묶어서 사용된다.
그러나 KTable로 선언된 토픽은 1개 파티션이 1개 테스크에 할당되어 사용되고, GlobalKTable로 선언된 토픽은 모든 파티션 데이터가 각 태스크에 할당되어 사용된다는 차이점이 있다.
GlobalKTable을 설명하는 가장 좋은 예는 KStream과 KTable이 데이터 조인(join)을 수행할 때다.
예를 들어, KStream과 KTable 데이터를 조인한다고 가정하면, 먼저 반드시 코파티셔닝(co-partitioning)되어 있어야한다.
코파티셔닝이란 조인을 하는 2개 데이터의 파티션 개수가 동일하고 파티셔닝 전략(partitioning strategy)을 동일하게 맞추는 작업이다.
파티션 개수가 동일하고 파티셔닝 전략이 같은 경우에는 동일한 메세지 키를 가진 데이터가 동일 태스크에 들어가는 것을 보장한다.
문제는 조인을 수행하려는 토픽들이 코파티셔닝되어 있음을 보장할 수 없다는 것이다.
KStream과 KTable로 사용하는 2개의 토픽이 파티션 개수가 다를 수도 있고 파티션 전략이 다를 수 있다.
이런 경우에는 조인을 수행할 수 없다. 코파티셔닝이 되어있지 않은 2개의 토픽을 조인하는 로직이 담긴 스트림즈 애플리케이션을 실행하면 TopologyException이 발생한다.
조인을 수행하려는 KStream과 KTable이 코파티셔닝이 되어 있지 않으면 KStream 또는 KTable을 리파티셔닝(repartitioning)하는 과정을 거쳐야 한다.
리파티셔닝이란 새로운 토픽에 새로운 메세지 키를 가지도록 재배열하는 과정이다.
리파티셔닝 과정을 거쳐 KStream 토픽과 KTable로 사용하는 토픽이 코파티셔닝되도록 할 수 있다.
코파티셔닝 되지 않은 KStream과 KTable을 조인해서 사용하고 싶다면 KTable을 GlobalKTable로 선언하여 사용하면 된다.
KTable과 다르게 GlobalKTable로 정의된 데이터는 스트림즈 애프릴케이션의 모든 테스크에 동일하게 공유되어 사용된다.
다만, GlobalKTable을 사용하면 각 태스크마다 GlobalKTable로 정의된 모든 데이터를 저장하고 사용하기 때문에 스크림즈 애플리케이션의 로컬 스토리지의 사용량이 증가하고 네트워크, 브러커에 부하가 생기므로 되도록이면 작은 용량의 데이터일 경우에만 사용하는 것이 좋다.
스트림즈DSL 필수 옵션 | |
bootstrap.servers | 프로듀서가 데이터를 전송할 대상 카프카 클러스터에 속한 브로커의 호스트 이름:포트를 1개 이상 작성한다. |
application.id | 스트림즈 애플리케이션을 구분하기 위한 고유한 아이디를 설정한다. |
스트림즈DSL 선택 옵션 | |
default.key.serde | 레코드의 메세지 키를 직렬화, 역직렬화하는 클래스를 지정한다. default = Serdes.ByteArray().getClass().getName() |
default.value.serde | 레코드의 메세지 값을 직렬화, 역직렬화하는 클래스를 지정한다. default = Serdes.ByteArray().getClass().getName() |
num.stream.threads | 스트림 프로세싱 실행 시 실행될 스레드 개수를 지정한다. default = 1 |
state.dir | 상태기반 데이터 처리를 할 때 데이터를 저장할 디렉토리를 지정한다. default = /tmp/kafka-streams |
그 외옵션 | https://kafka.apache.org/documentation/ |
✨ 참고도서정보 :: 아파치 카프카 애플리케이션 프로그래밍 with 자바 (최원영)
'Framework > Kafka' 카테고리의 다른 글
[Kafka] 아파치 카프카 기본 개념 :: 카프카 커넥트 - 소스 커넥터 (1) | 2024.11.05 |
---|---|
[Kafka] 아파치 카프카 기본 개념 :: 카프카 커넥트 (1) | 2024.11.01 |
[Kafka] 아파치 카프카 기본 개념 :: 카프카 스트림즈 (4) | 2024.10.08 |
[Kafka] 아파치 카프카 기본개념 :: 클라이언트 라이브러리 - 어드민 API (1) | 2024.09.27 |
[Kafka] 아파치 카프카 기본 개념 :: 클라이언트 라이브러리 - 컨슈머 API (4) | 2024.09.25 |