본문 바로가기
BackEnd/Project

[BigData] Ch12. Kafka Stream을 활용한 KStream 개발(4)

by 개발 Blog 2024. 9. 18.

공부 내용을 정리하고 앞으로의 학습에 이해를 돕기 위해 작성합니다.

 

이번 글에서는 KStream이 아닌 K-Table을 활용하여 조인(join)을 구현하는 방법을 다룬다.

 

K-Stream과 K-Table의 차이점

  • KStream은 메시지가 오는 대로 모든 메시지를 처리한다.
  • K-Table은 최신 상태만을 유지하며 중복된 키 값이 들어왔을 때는 가장 최신의 값으로 덮어쓴다. 즉, 실시간으로 상태(state)를 관리하는 테이블이라고 할 수 있다.

K-Table을 이용한 조인 구현

package com.example.kafkaproducer.service;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class KTableService {

    @Autowired
    public void buildPipeline(StreamsBuilder sb) {

        KTable<String, String> leftTable = sb.stream("leftTopic1", Consumed.with(Serdes.String(), Serdes.String())).toTable();
        KTable<String, String> rightTable = sb.stream("rightTopic1", Consumed.with(Serdes.String(), Serdes.String())).toTable();

        ValueJoiner<String, String, String> stringJoiner = (leftValue, rightValue) -> {
            return "[StringJoiner]" + leftValue + "-" + rightValue;
        };

        KTable<String, String> joinTable = leftTable.join(rightTable, stringJoiner);
        joinTable.toStream().to("joinedMsg1");
    }
}
  • leftTablerightTable 생성
    • StreamsBuilder를 통해 각각 leftTopic1과 rightTopic1에서 스트림을 받아와서 KTable로 변환한다.
  • ValueJoiner
    • ValueJoiner는 두 테이블에서 조인할 값들을 어떻게 결합할지 정의하는 함수다. 여기서는 간단히 두 값을 연결하여 출력하는 방식으로 구현했다.
  • K-Table Join
    • leftTable과 rightTable을 join() 메서드를 사용해 조인한다. 조인된 결과는 joinTable로 저장된다.
  • toStream()을 통해 스트림으로 변환
    • 조인된 결과는 다시 스트림으로 변환되어 joinedMsg1 토픽으로 전송된다.

실행 테스트

Kafka Streams를 이용한 KTable 조인 기능을 테스트하기 위해 다음과 같은 과정을 수행했다.

leftTopic1과 rightTopic1에 각각 데이터를 입력했고, joinedMsg1 토픽을 통해 조인 결과를 확인할 수 있었다.

 

결과 분석

  • 중복 출력: "abcd-efgh"와 "fifi-wiwi"가 두 번씩 출력된 것은 동일한 키("table")로 여러 번 데이터가 입력되었기 때문이다. KTable은 각 키의 최신 값을 유지하므로, 새로운 값이 들어올 때마다 조인 결과가 업데이트되어 출력된다.
  • 최종 결과: 마지막 두 줄은 각 토픽의 "table" 키에 대한 최종 값들의 조인 결과를 나타낸다.
  • 무시된 데이터: "ttable:123"와 "nonTable:45"는 키가 "table"이 아니므로 조인에 포함되지 않는다.

조인 로직

  • KTable 조인은 키를 기준으로 이루어진다. 이 테스트에서는 "table"이 키로 사용되었다.
  • 한쪽 토픽에 새로운 데이터가 들어오면, 다른 쪽 토픽의 같은 키를 가진 가장 최근 데이터와 조인된다.
  • 양쪽 토픽 모두에 데이터가 있을 때만 조인 결과가 출력된다.

이 테스트 결과를 통해 Kafka Streams의 KTable 조인이 예상대로 작동하고 있음을 확인할 수 있었다. 각 키에 대해 최신 값들이 성공적으로 조인되었으며, 조인 조건에 맞지 않는 데이터는 적절히 무시되었다.