본문 바로가기
BackEnd/Project

[BigData] Ch12. Kafka Stream을 활용한 KStream 개발(1)

by 개발 Blog 2024. 9. 18.

공부 내용을 정리하고 앞으로의 학습에 이해를 돕기 위해 작성합니다.

 

KafkaConfig 설정

Kafka Stream 설정을 위해 KafkaConfig 클래스에서 스트림 설정을 정의한다. Kafka Stream은 실시간 데이터 스트림 처리를 위한 기능을 제공하며, 이를 위해 애플리케이션 ID, 부트스트랩 서버 주소, 키와 값에 대한 직렬화 방법 등의 필수 정보를 설정한다. 복수의 Kafka 서버 주소를 포함시킴으로써 높은 가용성을 보장하고, 데이터 처리의 신뢰성을 높인다. 또한, 스트림 처리를 위한 병렬 처리 스레드 수도 설정할 수 있다.

@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaConfig {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration myKStreamConfig() {
        Map<String, Object> myKStreamConfig = new HashMap<>();
        myKStreamConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-test");
        myKStreamConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "13.124.237.39:9092,3.38.181.75:9092,3.35.220.144:9092");
        myKStreamConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        myKStreamConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        myKStreamConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
        return new KafkaStreamsConfiguration(myKStreamConfig);
    }

 

StreamService 구현

StreamService 클래스에서는 Kafka의 스트림 빌더를 사용하여 데이터 파이프라인을 구성한다. 여기서는 특정 토픽에서 메시지를 읽어오고, 필터링 로직을 통해 특정 조건에 맞는 메시지만을 다른 토픽으로 전송하는 기능을 구현한다. 예를 들어, "freeClass"라는 단어를 포함하는 메시지만을 추출하여 'freeClassList'라는 토픽으로 전송한다. 이런 방식은 스트림의 필터 기능을 활용한 간단한 예로, 실시간 데이터 처리에서 매우 유용하다.

package com.example.kafkaproducer.service;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Printed;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class StreamService {

    private static final Serde<String> STRING_SERDE = Serdes.String();

    @Autowired
    public void buildPipeline(StreamsBuilder sb) {

        KStream<String, String> myStream = sb.stream("test", Consumed.with(STRING_SERDE, STRING_SERDE));
        myStream.print(Printed.toSysOut());
        myStream.filter((key, value) -> value.contains("freeClass")).to("freeClassList");

    }
}

 

실행 테스트

실제로 Kafka 스트림을 테스트하기 위해서는 Kafka 콘솔 프로듀서와 콘솔 컨슈머를 사용한다. 프로듀서를 통해 'test' 토픽에 데이터를 전송하고, 컨슈머를 통해 'freeClassList' 토픽에서 필터링된 결과를 확인할 수 있다. 이는 스트림이 제대로 작동하는지 검증하는데 필수적인 단계이다.

 // producer
 ~/kafka/bin/kafka-console-producer.sh --broker-list 13.124.237.39:9092,3.38.181.75:9092,3.35.220.144:9092 --topic test
 
 // consumer
 ~/kafka/bin/kafka-console-consumer.sh --bootstrap-server 13.124.237.39:9092,3.38.181.75:9092,3.35.220.144:9092 --topic freeClassList --from-beginning

 

이렇게 Kafka Stream을 사용하여 실시간 데이터 스트림 처리의 기본적인 사례를 구현하고, Kafka의 강력한 스트림 처리 능력을 활용할 수 있다. 데이터의 실시간 분석과 반응은 많은 현대 애플리케이션에서 중요한 요소이며, Kafka는 이를 효과적으로 지원한다.