본문 바로가기
BackEnd/Project

[BigData] Ch12. 대량 Kafka Message Join Test

by 개발 Blog 2024. 9. 19.

공부 내용을 정리하고 앞으로의 학습에 이해를 돕기 위해 작성합니다.

 

이번 글에서는 대량의 메시지를 생성하여 Kafka 컨슈머에 넘기는 작업을 다룬다. 주로 Kafka 스트리밍 환경에서 광고와 구매 로그를 처리하고, 이를 이용해 광고 효과를 분석하는 서비스 로직을 구현한다. 이를 위해 다양한 Kafka 설정과 메시지 직렬화, 대량 메시지 생성 로직을 설명한다.

 

Kafka 설정

Kafka 스트림과 프로듀서를 설정하는 과정에서, 광고 로그(WatchingAdLog)와 구매 로그(PurchaseLog), 그리고 두 데이터를 조인하여 분석 결과를 전송하는 토픽을 각각 생성하고 설정한다. 각각의 토픽에 맞춰 ProducerFactory KafkaTemplate를 설정하여, 여러 종류의 데이터를 효율적으로 처리할 수 있게 구성한다.

@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaConfig {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration myKStreamConfig() {
        Map<String, Object> myKStreamConfig = new HashMap<>();
        myKStreamConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "lecture-6");
        myKStreamConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "3.39.223.252:9092,43.203.201.212:9092,3.36.122.61:9092");
        myKStreamConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        myKStreamConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        return new KafkaStreamsConfiguration(myKStreamConfig);
    }

    @Bean
    public KafkaTemplate<String, Object> KafkaTemplateForGeneral() {
        return new KafkaTemplate<String, Object>(ProducerFactory());
    }

    @Bean
    public ProducerFactory<String, Object> ProducerFactory() {
        Map<String, Object> myConfig = new HashMap<>();
        myConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "3.39.223.252:9092,43.203.201.212:9092,3.36.122.61:9092");
        myConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        myConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, PurchaseLogOneProductSerializer.class);

        return new DefaultKafkaProducerFactory<>(myConfig);
    }

    @Bean
    public KafkaTemplate<String, Object> KafkaTemplateForWatchingAdLog() {
        return new KafkaTemplate<String, Object>(ProducerFactoryForWatchingAdLog());
    }

    @Bean
    public ProducerFactory<String, Object> ProducerFactoryForWatchingAdLog() {
        Map<String, Object> myConfig = new HashMap<>();

        myConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "3.39.223.252:9092,43.203.201.212:9092,3.36.122.61:9092");
        myConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        myConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, WatchingAdLogSerializer.class);

        return new DefaultKafkaProducerFactory<>(myConfig);
    }

    @Bean
    public KafkaTemplate<String, Object> KafkaTemplateForPurchaseLog() {
        return new KafkaTemplate<String, Object>(ProducerFactoryForPurchaseLog());
    }
    @Bean
    public ProducerFactory<String, Object> ProducerFactoryForPurchaseLog() {
        Map<String, Object> myConfig = new HashMap<>();

        myConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "3.39.223.252:9092,43.203.201.212:9092,3.36.122.61:9092");
        myConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        myConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, PurchaseLogSerializer.class);

        return new DefaultKafkaProducerFactory<>(myConfig);
    }
}

 

AdEvaluationService

AdEvaluationService는 광고 로그와 구매 로그 데이터를 실시간으로 처리하여 광고 효과를 분석하는 핵심 서비스이다. Kafka 스트림을 활용하여 두 데이터를 조인하고, 그 결과를 바탕으로 광고가 얼마나 효과적이었는지 평가한다. 

package com.example.kafkaproducer.service;

import com.example.kafkaproducer.vo.EffectOrNot;
import com.example.kafkaproducer.vo.PurchaseLog;
import com.example.kafkaproducer.vo.PurchaseLogOneProduct;
import com.example.kafkaproducer.vo.WatchingAdLog;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueStore;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.stereotype.Service;

import java.util.*;

@Service
public class AdEvaluationService {
    // 광고 데이터 중복 Join될 필요없다. --> Table
    // 광고 이력이 먼저 들어온다.
    // 구매 이력은 상품별로 들어오지 않는다. (복수개의 상품 존재) --> contain
    // 광고에 머문시간이 10초 이상되어야만 join 대상
    // 특정가격이상의 상품은 join 대상에서 제외 (100만원)
    // 광고이력 : KTable(AdLog), 구매이력 : KTable(PurchaseLogOneProduct)
    // filtering, 형 변환,
    // EffectOrNot --> Json 형태로 Topic : AdEvaluationComplete
    @Autowired
    Producer myprdc;
    @Autowired
    public void buildPipeline(StreamsBuilder sb) {

        //object의 형태별로 Serializer, Deserializer 를 설정합니다.
        JsonSerializer<EffectOrNot> effectSerializer = new JsonSerializer<>();
        JsonSerializer<PurchaseLog> purchaseLogSerializer = new JsonSerializer<>();
        JsonSerializer<WatchingAdLog> watchingAdLogSerializer = new JsonSerializer<>();
        JsonSerializer<PurchaseLogOneProduct> purchaseLogOneProductSerializer = new JsonSerializer<>();

        JsonDeserializer<EffectOrNot> effectDeserializer = new JsonDeserializer<>(EffectOrNot.class);
        JsonDeserializer<PurchaseLog> purchaseLogDeserializer = new JsonDeserializer<>(PurchaseLog.class);
        JsonDeserializer<WatchingAdLog> watchingAdLogJsonDeserializer = new JsonDeserializer<>(WatchingAdLog.class);
        JsonDeserializer<PurchaseLogOneProduct> purchaseLogOneProductDeserializer = new JsonDeserializer<>(PurchaseLogOneProduct.class);

        Serde<EffectOrNot> effectOrNotSerde = Serdes.serdeFrom(effectSerializer, effectDeserializer);
        Serde<PurchaseLog> purchaseLogSerde = Serdes.serdeFrom(purchaseLogSerializer, purchaseLogDeserializer);
        Serde<WatchingAdLog> watchingAdLogSerde = Serdes.serdeFrom(watchingAdLogSerializer, watchingAdLogJsonDeserializer);
        Serde<PurchaseLogOneProduct> purchaseLogOneProductSerdeSerde = Serdes.serdeFrom(purchaseLogOneProductSerializer, purchaseLogOneProductDeserializer);

        // adLog topic 을 consuming 하여 KTable 로 받는다.
        KTable<String, WatchingAdLog> adTable = sb.stream("adLog", Consumed.with(Serdes.String(), watchingAdLogSerde))
                .selectKey((k,v) -> v.getUserId() + "_" + v.getProductId()) // key를 userId+prodId로 생성해준다.
                .filter((k,v)-> Integer.parseInt(v.getWatchingTime()) > 10) // 광고 시청시간이 10초 이상인 데이터만 Table에 담는다.
                .toTable(Materialized.<String, WatchingAdLog, KeyValueStore<Bytes, byte[]>>as("adStore") // Key-Value Store로 만들어준다.
                        .withKeySerde(Serdes.String())
                        .withValueSerde(watchingAdLogSerde)
                );

        // purchaseLog topic 을 consuming 하여 KStream 으로 받는다.
        KStream<String, PurchaseLog> purchaseLogKStream = sb.stream("purchaseLog", Consumed.with(Serdes.String(), purchaseLogSerde));

        // 해당 KStream의 매 Row(Msg)마다 하기의 내용을 수행한다.
        purchaseLogKStream.foreach((k,v) -> {

                    // value 의 product 개수만큼 반복하여 신규 VO에 값을 Binding 한다.
                    for (Map<String, String> prodInfo:v.getProductInfo())   {
                        // price 100000 미만인 경우만 하기의 내용을 수행합니다. 위 purchaseLogKStream에서 filter조건을 주고 받아도 무관하다.
                        if (Integer.parseInt(prodInfo.get("price")) < 1000000) {
                            PurchaseLogOneProduct tempVo = new PurchaseLogOneProduct();
                            tempVo.setUserId(v.getUserId());
                            tempVo.setProductId(prodInfo.get("productId"));
                            tempVo.setOrderId(v.getOrderId());
                            tempVo.setPrice(prodInfo.get("price"));
                            tempVo.setPurchasedDt(v.getPurchasedDt());

                            // 1개의 product로 나눈 데이터를 purchaseLogOneProduct Topic으로 produce 한다.
                            myprdc.sendJoinedMsg("purchaseLogOneProduct", tempVo);

                            // 하기의 method는 samplie Data를 생산하여 Topic에 넣는다. 1개 받으면 여러개를 생성하기 때문에 무한하게 생성헌다,
//                            sendNewMsg();
                        }
                    }
                }
        );

        // product이 1개씩 나누어 producing 된 topic을 읽어 KTable 로 받아온다.
        KTable<String, PurchaseLogOneProduct> purchaseLogOneProductKTable= sb.stream("purchaseLogOneProduct", Consumed.with(Serdes.String(), purchaseLogOneProductSerdeSerde))
                // Stream의 key 지정
                .selectKey((k,v)-> v.getUserId()+ "_" +v.getProductId())
                // key-value Store로 이용이 가능하도록 생성
                .toTable(Materialized.<String, PurchaseLogOneProduct, KeyValueStore<Bytes, byte[]>>as("purchaseLogStore")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(purchaseLogOneProductSerdeSerde)
                );

        // value joiner 를 통해 Left, Right 값을 통한 Output 결과값을 bind 하거나 join 조건을 설정할 수 있다.
        ValueJoiner<WatchingAdLog, PurchaseLogOneProduct, EffectOrNot> tableStreamJoiner = (leftValue, rightValue) -> {
            EffectOrNot returnValue = new EffectOrNot();
            returnValue.setUserId(rightValue.getUserId());
            returnValue.setAdId(leftValue.getAdId());
            returnValue.setOrderId(rightValue.getOrderId());
            Map<String, String> tempProdInfo = new HashMap<>();
            tempProdInfo.put("productId", rightValue.getProductId());
            tempProdInfo.put("price", rightValue.getPrice());
            returnValue.setProductInfo(tempProdInfo);
            System.out.println("Joined!");
            return returnValue;
        };

        // table과 joiner를 입력해줍니다. stream join이 아니기에 window 설정은 불필요하다.
        adTable.join(purchaseLogOneProductKTable,tableStreamJoiner)
                // join 이 완료된 데이터를 AdEvaluationComplete Topic으로 전달한다.
                .toStream().to("AdEvaluationComplete", Produced.with(Serdes.String(), effectOrNotSerde));
    }

    public void sendNewMsg() {
        PurchaseLog tempPurchaseLog  = new PurchaseLog();
        WatchingAdLog tempWatchingAdLog = new WatchingAdLog();

        //랜덤한 ID를 생성하기 위해 아래의 함수를 사용한다.
        // random Numbers for concatenation with attrs
        Random rd = new Random();
        int rdUidNumber = rd.nextInt(9999);
        int rdOrderNumber = rd.nextInt(9999);
        int rdProdIdNumber = rd.nextInt(9999);
        int rdPriceIdNumber = rd.nextInt(90000)+10000;
        int prodCnt = rd.nextInt(9)+1;
        int watchingTime = rd.nextInt(55)+5;

        // bind value for purchaseLog
        tempPurchaseLog.setUserId("uid-" + String.format("%05d", rdUidNumber));
        tempPurchaseLog.setPurchasedDt("20230101070000");
        tempPurchaseLog.setOrderId("od-" + String.format("%05d", rdOrderNumber));
        ArrayList<Map<String, String>> tempProdInfo = new ArrayList<>();
        Map<String, String> tempProd = new HashMap<>();
        for (int i=0; i<prodCnt; i++ ){
            tempProd.put("productId", "pg-" + String.format("%05d", rdProdIdNumber));
            tempProd.put("price", String.format("%05d", rdPriceIdNumber));
            tempProdInfo.add(tempProd);
        }
        tempPurchaseLog.setProductInfo(tempProdInfo);

        // bind value for watchingAdLog
        tempWatchingAdLog.setUserId("uid-" + String.format("%05d", rdUidNumber));
        tempWatchingAdLog.setProductId("pg-" + String.format("%05d", rdProdIdNumber));
        tempWatchingAdLog.setAdId("ad-" + String.format("%05d",rdUidNumber) );
        tempWatchingAdLog.setAdType("banner");
        tempWatchingAdLog.setWatchingTime(String.valueOf(watchingTime));
        tempWatchingAdLog.setWatchingDt("20230201070000");

        // produce msg
        myprdc.sendMsgForPurchaseLog("purchaseLog", tempPurchaseLog);
        myprdc.sendMsgForWatchingAdLog("adLog", tempWatchingAdLog);
    }
}
  • 데이터 직렬화 및 역직렬화 설정
    • 서비스 시작 시, EffectOrNot, PurchaseLog, WatchingAdLog, PurchaseLogOneProduct 객체에 대한 SerializerDeserializer를 설정한다. 이는 Kafka 스트림에서 객체 데이터를 바이트 배열로 변환하고, 다시 객체로 변환하는 과정에서 사용된다. 이를 통해 각 데이터를 안정적으로 처리하고 전송할 수 있다.
  • adLog를 KTable로 처리
    • adLog 토픽의 광고 데이터를 KTable로 변환하여 중복되지 않는 광고 기록을 관리한다.
    • 광고 로그의 키는 userId와 productId를 결합하여 생성되며, 광고 시청 시간이 10초 이상인 데이터만 필터링한다. 이 조건을 충족하는 광고 로그만 KTable에 저장된다.
    • 이 KTable은 Key-Value Store로 관리되어, 이후 구매 로그와의 조인(join) 과정에서 사용된다.
  • purchaseLog를 KStream으로 처리
    • purchaseLog 토픽의 데이터를 KStream으로 처리한다. 이 스트림은 구매 로그를 실시간으로 처리하며, 각 메시지마다 여러 개의 제품 정보를 포함할 수 있다.
    • 각 구매 로그는 제품별로 나뉘어 PurchaseLogOneProduct 객체로 변환된다. 이 과정에서 가격이 100만 원 이하인 제품만 처리되며, 변환된 데이터는 purchaseLogOneProduct 토픽으로 다시 전송된다.
    • 스트림에서 데이터를 나누고 새로운 객체로 변환하는 로직은 foreach 문을 통해 처리된다.
  • purchaseLogOneProduct를 KTable로 처리
    • purchaseLogOneProduct 토픽의 데이터를 KTable로 변환하여 각 제품별로 구매 기록을 관리한다.
    • userId와 productId를 결합한 키로 데이터를 관리하며, 이를 통해 광고 로그와 효과적으로 조인할 수 있다.
  • 조인 로직
    • ValueJoiner를 사용하여 광고 로그와 구매 로그를 조인한다. 광고를 본 사용자와 실제로 제품을 구매한 사용자를 매칭하여, 광고가 효과가 있었는지 분석한다.
    • 조인된 결과는 EffectOrNot 객체로 변환되며, 이 객체에는 광고 ID, 사용자 ID, 주문 ID, 제품 정보 등이 포함된다.
    • 조인이 완료된 데이터는 AdEvaluationComplete라는 Kafka 토픽으로 전송된다.
  • 대량 메시지 생성
    • sendNewMsg 메서드를 통해 무작위로 생성된 광고 로그와 구매 로그를 대량으로 생성하고, 각각의 토픽으로 전송한다. 이를 통해 스트리밍 애플리케이션에서 대량의 데이터를 처리하는 시나리오를 테스트할 수 있다.

Producer 클래스

Producer 클래스는 Kafka 스트림에서 광고 로그와 구매 로그 데이터를 Kafka 토픽에 전송하는 역할을 담당하는 클래스이다. 이 클래스는 여러 Kafka 토픽에 메시지를 전송하는 다양한 메서드를 제공하며, 데이터 전송을 효율적으로 관리할 수 있게 한다.

package com.example.kafkaproducer.service;

import com.example.kafkaproducer.config.KafkaConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;

@Service
public class Producer {
    private static final Logger logger = LoggerFactory.getLogger(Producer.class);

    @Autowired
    KafkaConfig myConfig;

    String topicName = "defaultTopic";

    private KafkaTemplate<String, Object> kafkaTemplate;

//    public Producer(KafkaTemplate kafkaTemplate) {
//        this.kafkaTemplate = kafkaTemplate;
//    }

    public void pub(String msg) {
        kafkaTemplate = myConfig.KafkaTemplateForGeneral();
        kafkaTemplate.send(topicName, msg);
    }

    public void sendJoinedMsg (String topicNm, Object msg) {
        kafkaTemplate = myConfig.KafkaTemplateForGeneral();
        kafkaTemplate.send(topicNm, msg);
    }
    public void sendMsgForWatchingAdLog (String topicNm, Object msg) {
        kafkaTemplate = myConfig.KafkaTemplateForWatchingAdLog();
        kafkaTemplate.send(topicNm, msg);
    }

    public void sendMsgForPurchaseLog (String topicNm, Object msg) {
        kafkaTemplate = myConfig.KafkaTemplateForPurchaseLog();
        kafkaTemplate.send(topicNm, msg);
    }
}
  • KafkaTemplate 사용: Kafka 메시지를 전송하기 위해 KafkaTemplate을 사용한다. 각 메시지 타입에 맞는 KafkaTemplate을 동적으로 설정해 사용한다.
  • 메시지 전송 메서드
    • pub(String msg): 기본 토픽(defaultTopic)으로 메시지를 전송한다.
    • sendJoinedMsg(String topicNm, Object msg): 조인된 데이터를 특정 토픽으로 전송한다.
    • sendMsgForWatchingAdLog(String topicNm, Object msg): 광고 로그를 전송한다.
    • sendMsgForPurchaseLog(String topicNm, Object msg): 구매 로그를 전송한다.
  • 동적 템플릿 설정: 광고 로그와 구매 로그 등 각 데이터 타입에 맞는 KafkaTemplate을 동적으로 사용하여 적절한 토픽에 메시지를 전송한다.

직렬화 클래스

PurchaseLogSerializer와 WatchingAdLogSerializer는 PurchaseLog와 WatchingAdLog 객체를 Kafka에서 전송하기 위해 바이트 배열로 직렬화하는 클래스이다. Jackson 라이브러리를 사용해 데이터를 직렬화하고, 직렬화 과정에서 발생하는 예외를 처리하여 안정적인 데이터 전송이 가능하게 한다.

 

PurchaseLogSerializer

package com.example.kafkaproducer.util;

import com.example.kafkaproducer.vo.PurchaseLog;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Map;

public class PurchaseLogSerializer implements Serializer<PurchaseLog> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public byte[] serialize(String topic, PurchaseLog data) {
        try {
            if (data == null){
                return null;
            }
            return objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new SecurityException("Exception Occured");
        }
    }

    @Override
    public void close() {
    }

}
  • PurchaseLog 객체를 바이트 배열로 변환하는 클래스이다.
  • Jackson의 ObjectMapper를 사용하여 PurchaseLog 객체를 직렬화하며, 직렬화 중 예외가 발생하면 이를 처리하여 안전하게 전송할 수 있도록 한다.

WatchingAdLogSerializer

package com.example.kafkaproducer.util;

import com.example.kafkaproducer.vo.WatchingAdLog;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Map;

public class WatchingAdLogSerializer implements Serializer<WatchingAdLog> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public byte[] serialize(String topic, WatchingAdLog data) {
        try {
            if (data == null){
                return null;
            }
            return objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new SecurityException("Exception Occured");
        }
    }

    @Override
    public void close() {
    }

}
  • WatchingAdLog 객체를 직렬화하는 역할을 한다.
  • 이 클래스 역시 ObjectMapper를 사용해 광고 로그 데이터를 직렬화하고, 문제가 발생하면 예외 처리를 통해 오류를 관리한다.

두 클래스 모두 Kafka 스트림에서 데이터를 전송할 때 객체를 바이트 배열로 변환하여 데이터 전송을 원활하게 하고, 안정성을 높인다.

실행 테스트

이제 개발한 기능들을 테스트해본다. 서버를 실행하면, sendNewMsg() 메서드가 자동으로 광고 로그(WatchingAdLog)와 구매 로그(PurchaseLog)를 생성하고 Kafka 토픽으로 전송한다. 

 

스트림 처리 확인

서버가 실행되면, Kafka 스트림 설정에 따라 자동으로 데이터를 직렬화하고 역직렬화하는 과정을 거쳐, 광고 로그와 구매 로그 데이터가 스트림을 통해 처리된다. 이를 통해 설정한 로직에 따라 데이터가 올바르게 스트림 처리되는지 확인할 수 있다.

  1. 광고 로그 처리: adLog 토픽에서 스트림된 데이터는 광고 시청 시간이 10초 이상인 경우만 필터링하여 KTable로 변환된다.
  2. 구매 로그 처리: purchaseLog 토픽의 데이터는 KStream을 통해 실시간으로 처리되며, 각 로그 내 상품 정보는 PurchaseLogOneProduct 객체로 변환되어 조건에 따라 필터링된다.

데이터 조인 및 결과 출력

처리된 광고 로그와 구매 로그는 조인되어 EffectOrNot 객체를 생성하고, 이 결과는 AdEvaluationComplete 토픽으로 전송된다. 이 과정 역시 서버 실행 시 자동으로 이루어지며, 최종적으로 Kafka 토픽에 결과가 올바르게 저장되었는지 로그를 통해 확인할 수 있다.

이렇게 자동으로 데이터 파이프라인을 구축하여 Kafka를 사용함으로써 대량의 데이터를 효율적으로 처리할 수 있다. 이를 통해 개발한 시스템이 안정적으로 운영되는 것을 확인할 수 있었다.