공부 내용을 정리하고 앞으로의 학습에 이해를 돕기 위해 작성합니다.
이번 글에서는 광고와 구매 이력을 통한 스트리밍 서비스 개발을 다룬다. 사용자가 광고를 볼 때마다 해당 이력이 Kafka에 저장되며, 구매 이력 역시 지속적으로 기록된다. 광고와 구매 사이의 시간 차이는 있으나, 이를 감안하고 개발을 진행한다. 사용자가 광고를 본 제품을 구매할 경우, 광고가 효과적이었음을 판단하고 광고의 효과 비율을 분석한다.
1. Kafka 설정 변경
수정 전
초기 Kafka 설정에서는 KafkaStreamsConfiguration을 통해 스트림의 기본 설정을 구성한다. 여기에는 애플리케이션 ID, 부트스트랩 서버 주소, 기본 키와 값의 SerDe 클래스 설정이 포함된다. 이 설정은 스트리밍 애플리케이션을 위한 기초적인 환경을 제공한다.
@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaConfig {
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration myKStreamConfig() {
Map<String, Object> myKStreamConfig = new HashMap<>();
myKStreamConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-test");
myKStreamConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "13.124.237.39:9092,3.38.181.75:9092,3.35.156.112:9092");
myKStreamConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
myKStreamConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
myKStreamConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
return new KafkaStreamsConfiguration(myKStreamConfig);
}
}
수정 후
수정된 설정에서는 KafkaTemplate과 ProducerFactory를 추가하여 데이터 전송 과정을 개선한다. 특히, ProducerFactory에서는 PurchaseLogOneProductSerializer를 사용하여 특정 데이터 유형을 위한 직렬화를 설정한다. 서버 주소 역시 최신 상태로 갱신되어, 더욱 안정적인 데이터 처리가 가능하다.
@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaConfig {
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration myKStreamConfig() {
Map<String, Object> myKStreamConfig = new HashMap<>();
myKStreamConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-test");
myKStreamConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "3.39.223.252:9092,13.124.211.91:9092,3.36.122.61:9092");
myKStreamConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
myKStreamConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
myKStreamConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
return new KafkaStreamsConfiguration(myKStreamConfig);
}
@Bean
public KafkaTemplate<String, Object> KafkaTemplate() {
return new KafkaTemplate<String, Object>(ProducerFactory());
}
@Bean
public ProducerFactory<String, Object> ProducerFactory() {
Map<String, Object> myConfig = new HashMap<>();
myConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "3.39.223.252:9092,13.124.211.91:9092,3.36.122.61:9092");
myConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
myConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, PurchaseLogOneProductSerializer.class);
return new DefaultKafkaProducerFactory<>(myConfig);
}
}
수정 전
Producer 클래스가 kafkaTemplate을 통해 "kafkaProducer" 토픽으로 메시지를 보내는 역할을 수행한다. 메시지 전송 과정에서 .get() 메서드를 사용해 동기적으로 메시지 전송 결과를 기다리며, 이는 메시지가 성공적으로 전송되었는지 즉시 확인할 수 있게 한다. 에러 발생 시 예외 처리를 통해 에러 메시지를 출력하고 스택 트레이스를 제공한다.
package com.example.kafkaproducer.service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class Producer {
String topicName = "kafkaProducer";
private KafkaTemplate<String, Object> kafkaTemplate;
@Autowired
public Producer(KafkaTemplate kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void pub(String msg) {
System.out.println("Sending message: " + msg + " to topic: " + topicName);
try {
kafkaTemplate.send(topicName, msg).get(); // .get()을 사용하여 동기적으로 결과를 기다림
System.out.println("Message sent successfully");
} catch (Exception e) {
System.err.println("Error sending message: " + e.getMessage());
e.printStackTrace();
}
}
}
수정 후
기본적인 메시지 전송 메커니즘이 간소화되었다. 토픽 이름을 "defaultTopic"으로 변경하고, 동기적 결과 대기 방식인 .get() 호출을 제거하여 비동기적으로 메시지를 전송하도록 구성했다. 추가적으로 sendJoinedMsg 메서드를 도입하여 다른 토픽으로도 객체를 전송할 수 있는 유연성을 제공한다.
package com.example.kafkaproducer.service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class Producer {
String topicName = "defaultTopic";
private KafkaTemplate<String, Object> kafkaTemplate;
@Autowired
public Producer(KafkaTemplate kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void pub(String msg) {kafkaTemplate.send(topicName, msg);}
public void sendJoinedMsg(String TopicName, Object msg) {
kafkaTemplate.send(TopicName, msg);
}
}
이 변경을 통해 메시지 전송 로직이 더 간결해지고, 처리 속도가 개선될 것이다. 또한, 다양한 토픽으로의 메시지 전송이 가능해져 시스템의 유연성이 증가했다.
3. Kafka 데이터 직렬화 방법 설정
이번에는 Kafka에서 데이터를 직렬화하기 위한 클래스를 추가한다. Kafka에서 객체를 전송할 때, 해당 객체를 바이트 배열로 변환하는 직렬화 과정이 필요하다. 이 과정에서 PurchaseLogOneProduct 객체를 직렬화할 수 있도록 별도의 직렬화 클래스가 추가되었다.
CustomSerializer
Kafka로 전송할 PurchaseLogOneProduct 객체를 직렬화하는 클래스이다. 이 클래스는 ObjectMapper를 사용해 PurchaseLogOneProduct 객체를 바이트 배열로 변환한다. 만약 직렬화할 데이터가 null일 경우 직렬화를 생략하며, 직렬화 과정에서 발생할 수 있는 예외를 처리하여 오류 발생 시 안전하게 전송 실패 원인을 알 수 있도록 한다. 이 클래스는 Kafka 스트림에서 다양한 객체를 직렬화할 수 있지만, 특정 객체에 특화된 직렬화 방식이 필요할 경우 PurchaseLogOneProductSerializer와 같은 클래스를 사용하는 것이 더 적합하다.
package com.example.kafkaproducer.util;
import com.example.kafkaproducer.vo.PurchaseLogOneProduct;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Serializer;
import java.util.Map;
public class CustomSerializer implements Serializer<PurchaseLogOneProduct> {
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public byte[] serialize(String topic, PurchaseLogOneProduct data) {
try {
if (data == null){
return null;
}
System.out.println("Serializing...");
return objectMapper.writeValueAsBytes(data);
} catch (Exception e) {
throw new SecurityException("Error when serializing MessageDto to byte[]");
}
}
@Override
public void close() {
}
}
PurchaseLogOneProductSerializer
PurchaseLogOneProduct 객체 전용 직렬화 클래스이다. CustomSerializer와 마찬가지로 ObjectMapper를 사용해 객체를 바이트 배열로 변환한다. 이 클래스는 PurchaseLogOneProduct 객체를 직렬화할 때 사용되며, 특정 데이터 모델에 특화된 방식으로 직렬화를 수행한다. 데이터가 없을 경우에는 직렬화를 생략하고, 직렬화 중 오류가 발생하면 예외를 발생시켜 문제를 알릴 수 있도록 되어 있다.
package com.example.kafkaproducer.util;
import com.example.kafkaproducer.vo.PurchaseLogOneProduct;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Serializer;
import java.util.Map;
public class PurchaseLogOneProductSerializer implements Serializer<PurchaseLogOneProduct> {
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public byte[] serialize(String topic, PurchaseLogOneProduct data) {
try {
if (data == null){
return null;
}
return objectMapper.writeValueAsBytes(data);
} catch (Exception e) {
throw new SecurityException("Exception Occured");
}
}
@Override
public void close() {
}
}
4. VO 클래스 추가
Kafka 스트리밍에서 사용할 데이터 객체들을 정의한 VO(Value Object) 클래스들이 추가되었다. 이 클래스들은 광고와 구매 데이터를 관리하고 처리하기 위한 핵심 객체들로, 각각의 역할을 이해하는 것이 중요하다.
EffectOrNot
이 클래스는 특정 광고가 효과적이었는지를 판단하기 위한 정보를 담고 있다. 광고 ID(adId), 사용자 ID(userId), 주문 ID(orderId), 그리고 제품 정보(productInfo)를 포함하고 있다. productInfo는 Map으로 구성되어 있어 제품의 상세 정보를 저장할 수 있다. 이 클래스는 광고가 효과적인지 여부를 분석할 때 사용된다.
package com.example.kafkaproducer.vo;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Map;
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class EffectOrNot {
String adId; // ad-101
String userId; //
String orderId;
Map<String, String> productInfo;
}
PurchaseLog
사용자의 구매 정보를 담는 객체다. 이 클래스는 orderId, userId와 함께 구매한 제품 정보(productInfo)를 포함한다. productInfo는 여러 제품에 대한 정보를 ArrayList<Map<String, String>> 형태로 저장하여 다수의 제품을 한 번에 처리할 수 있도록 설계되었다. 이 데이터는 사용자의 구매 로그를 관리하는 데 사용된다.
package com.example.kafkaproducer.vo;
import lombok.Data;
import java.util.ArrayList;
import java.util.Map;
@Data
public class PurchaseLog {
// SAMPLE DATA
// { "orderId": "od-0005", "userId": "uid-0005", "productInfo": [{"productId": "pg-0023", "price":"12000"}, {"productId":"pg-0022", "price":"13500"}], "purchasedDt": "20230201070000", "price": 24000}
String orderId; // od-0001
String userId; // uid-0001
// ArrayList<String> productId; // {pg-0001, pg-0002}
ArrayList<Map<String, String>> productInfo;
String purchasedDt; // 2024201070000
// Long price; // 24000
}
PurchaseLogOneProduct
PurchaseLog와 유사하지만, 하나의 제품에 대한 구매 정보를 저장하는 클래스다. 여기서는 productId가 별도로 정의되어 있으며, 구매 일시(purchasedDt)와 가격(price) 정보를 함께 저장한다. 이 클래스는 특정 제품에 대한 구매 정보를 처리할 때 사용된다.
package com.example.kafkaproducer.vo;
import lombok.Data;
import java.util.ArrayList;
@Data
public class PurchaseLogOneProduct {
String orderId; // od-0001
String userId; // uid-0001
String productId; // pg-0001
String purchasedDt; // 20230201070000
String price; // 24000
}
WatchingAdLog
사용자가 광고를 본 기록을 저장하는 클래스다. 이 클래스는 userId, productId, adId, adType (광고 유형), watchingTime (광고를 본 시간), watchingDt (광고를 본 날짜) 등을 포함한다. 이 데이터는 사용자가 광고와 어떻게 상호작용했는지에 대한 정보를 관리하고 분석할 때 사용된다.
package com.example.kafkaproducer.vo;
import lombok.Data;
@Data
public class WatchingAdLog {
// SAMPLE DATA
// {"userId": "uid-0007", "productId": "pg-0007", "adId": "ad-101", "adType": "banner", "watchingTime": "30", "watchingDt": "20240201070000"}
String userId; // uid-0001
String productId; // pg-0001
String adId; // ad-101
String adType; // banner, clip, main, live
String watchingTime; // 머문시간
String watchingDt; // 2024201070000
}
이 클래스들은 Kafka 스트리밍 애플리케이션에서 광고와 구매 데이터를 처리하고, 광고 효과를 분석하는 데 핵심 역할을 한다. 각 클래스가 광고 및 구매 이력을 체계적으로 저장하고 관리할 수 있도록 설계되었으며, 다양한 상황에서 유연하게 사용할 수 있다.
5. AdEvaluationService
AdEvaluationService는 광고 데이터와 구매 데이터를 결합하여 광고 효과를 평가하는 스트리밍 애플리케이션의 핵심 클래스다. 이 클래스는 Kafka Streams를 활용하여 실시간으로 광고와 구매 데이터를 분석하고, 이를 통해 광고가 얼마나 효과적이었는지를 평가한다.
package com.example.kafkaproducer.service;
import com.example.kafkaproducer.vo.EffectOrNot;
import com.example.kafkaproducer.vo.PurchaseLog;
import com.example.kafkaproducer.vo.PurchaseLogOneProduct;
import com.example.kafkaproducer.vo.WatchingAdLog;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueStore;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
@Service
public class AdEvaluationService {
// 광고 데이터는 중복 Join될 필요없다. --> Table
// 광고 이력이 먼저 들어온다.
// 구매 이력은 상품별로 들어오지 않는다. (복수개의 상품 존재) --> contain
// 광고에 머문 시간이 10초 이상되어야만 join 대상
// 특정가격 이상의 상품은 join 대상에서 제외 (100만원)
// 광고이력 : KTable(AdLog), 구매이력 : KTable(PurchaseLogOneProduct)
// filtering, 형 변환,
// EffectOrNot --> Json 형태로 Topic : AdEvaluationComplete
@Autowired
Producer myprdc;
@Autowired
public void buildPipeline(StreamsBuilder sb) {
JsonSerializer<EffectOrNot> effectSerializer = new JsonSerializer<>();
JsonSerializer<PurchaseLog> purchaseLogJsonSerializer = new JsonSerializer<>();
JsonSerializer<WatchingAdLog> watchingLogJsonSerializer = new JsonSerializer<>();
JsonSerializer<PurchaseLogOneProduct> purchaseLogOneProductJsonSerializer = new JsonSerializer<>();
JsonDeserializer<EffectOrNot> effectDeserializer = new JsonDeserializer<>(EffectOrNot.class);
JsonDeserializer<PurchaseLog> purchaseLogDeserializer = new JsonDeserializer<>(PurchaseLog.class);
JsonDeserializer<WatchingAdLog> watchingLogDeserializer = new JsonDeserializer<>(WatchingAdLog.class);
JsonDeserializer<PurchaseLogOneProduct> purchaseLogOneProductJsonDeserializer = new JsonDeserializer<>(PurchaseLogOneProduct.class);
Serde<EffectOrNot> effectOrNotSerde = Serdes.serdeFrom(effectSerializer, effectDeserializer);
Serde<PurchaseLog> purchaseLogSerde = Serdes.serdeFrom(purchaseLogJsonSerializer, purchaseLogDeserializer);
Serde<WatchingAdLog> watchingAdLogSerde = Serdes.serdeFrom(watchingLogJsonSerializer, watchingLogDeserializer);
Serde<PurchaseLogOneProduct> purchaseLogOneProductSerde = Serdes.serdeFrom(purchaseLogOneProductJsonSerializer, purchaseLogOneProductJsonDeserializer);
// adLog stream --> table
KTable<String, WatchingAdLog> adTable = sb.stream("adLog", Consumed.with(Serdes.String(), watchingAdLogSerde))
.selectKey((k,v) -> v.getUserId() + "_" + v.getProductId())
.toTable(Materialized.<String, WatchingAdLog, KeyValueStore<Bytes, byte[]>>as("adStore")
.withKeySerde(Serdes.String())
.withValueSerde(watchingAdLogSerde)
);
KStream<String, PurchaseLog> purchaseLogKStream = sb.stream("purchaseLog", Consumed.with(Serdes.String(), purchaseLogSerde));
purchaseLogKStream.foreach((k, v) -> {
for (Map<String, String> prodInfo:v.getProductInfo()) {
if (Integer.valueOf(prodInfo.get("price")) < 1000000) {
PurchaseLogOneProduct tempVo = new PurchaseLogOneProduct();
tempVo.setUserId(v.getUserId());
tempVo.setProductId(prodInfo.get("productId"));
tempVo.setOrderId(v.getOrderId());
tempVo.setPrice(prodInfo.get("price"));
tempVo.setPurchasedDt(v.getPurchasedDt());
myprdc.sendJoinedMsg("purchaseLogOneProduct", tempVo);
}
}
});
KTable<String, PurchaseLogOneProduct> purchaseLogOneProductKTable = sb.stream("purchaseLogOneProduct", Consumed.with(Serdes.String(), purchaseLogOneProductSerde))
.selectKey((k, v) -> v.getUserId() + "_" + v.getProductId())
.toTable(Materialized.<String, PurchaseLogOneProduct, KeyValueStore<Bytes, byte[]>>as("purchaseLogStore")
.withKeySerde(Serdes.String())
.withValueSerde(purchaseLogOneProductSerde)
);
ValueJoiner<WatchingAdLog, PurchaseLogOneProduct, EffectOrNot> tableStreamJoiner = (leftValue, rightValue) -> {
EffectOrNot returnValue = new EffectOrNot();
returnValue.setUserId(leftValue.getUserId());
returnValue.setAdId(leftValue.getAdId());
returnValue.setOrderId(rightValue.getOrderId());
Map<String, String> tempProdInfo = new HashMap<>();
tempProdInfo.put("productId", rightValue.getProductId());
tempProdInfo.put("price", rightValue.getPrice());
returnValue.setProductInfo(tempProdInfo);
return returnValue;
};
adTable.join(purchaseLogOneProductKTable, tableStreamJoiner)
.toStream().to("AdEvaluationComplete", Produced.with(Serdes.String(), effectOrNotSerde));
}
}
- 광고 로그와 구매 로그를 각각 처리하고, 두 데이터를 조인(join)하여 광고의 효과를 판단한다.
- 특정 조건에 맞는 광고 데이터만 처리하도록 필터링하며, 광고 효과가 있는 경우 그 결과를 Kafka의 AdEvaluationComplete 토픽으로 전송한다.
- 직렬화 및 역직렬화 설정
- 먼저 EffectOrNot, PurchaseLog, PurchaseLogOneProduct, WatchingAdLog 객체를 직렬화 및 역직렬화하는 설정을 정의한다. 이를 통해 Kafka 스트림 내에서 객체를 바이트 배열로 변환하고, 다시 객체로 변환할 수 있다.
- 각각의 VO 객체는 JsonSerializer와 JsonDeserializer를 사용하여 Kafka 스트림에서 안전하게 전송되도록 설정된다.
- adLog 스트림을 테이블로 변환
- adLog 토픽의 광고 데이터를 Kafka 스트림에서 읽어오고, userId와 productId를 결합한 키를 설정하여 KTable로 변환한다. 이 테이블은 광고 이력을 저장하고 관리하며, 중복 데이터 없이 광고 기록을 처리할 수 있다.
- purchaseLog 스트림 처리
- 구매 로그(purchaseLog) 스트림을 처리하여 여러 제품 정보 중에서 특정 가격(100만원 이하)의 제품만 처리하도록 필터링한다. 필터링된 제품 정보는 PurchaseLogOneProduct 객체로 변환되어 purchaseLogOneProduct 토픽으로 전송된다.
- foreach 문을 사용해 제품 목록을 순회하며, 각 제품이 필터링 조건을 충족하면 새로운 구매 로그 객체를 생성해 전송하는 구조다.
- purchaseLogOneProduct 스트림을 테이블로 변환
- purchaseLogOneProduct 토픽에서 데이터를 읽어와, userId와 productId를 키로 사용하여 KTable로 변환한다. 이 테이블은 각 제품별 구매 기록을 관리하며, 광고 기록과 결합(join)할 준비를 한다.
- 조인(join) 로직
- ValueJoiner를 사용해 광고 로그(WatchingAdLog)와 구매 로그(PurchaseLogOneProduct)를 조인한다. 두 테이블을 조인하여 EffectOrNot 객체를 생성하며, 광고가 실제로 구매로 이어졌는지에 대한 데이터를 포함한다.
- 조인된 데이터는 userId, adId, orderId와 제품 정보를 포함하며, 이를 통해 광고 효과를 평가할 수 있다.
- 결과 전송
- 조인된 데이터를 toStream()을 통해 스트림으로 변환하고, AdEvaluationComplete라는 Kafka 토픽으로 전송한다. 이 토픽은 최종적으로 광고 효과가 있는지 여부를 담은 데이터를 전달하는 역할을 한다.
- 테이블과 스트림의 결합: 광고 로그는 KTable로, 구매 로그는 KStream에서 시작해 KTable로 변환된다. 이 두 데이터를 조인하여 광고와 구매가 어떻게 연관되어 있는지를 파악한다.
- 필터링: 100만원 이상의 고가 제품은 분석 대상에서 제외되며, 광고를 10초 이상 본 사용자만 조인 대상으로 고려한다.
- 효율성: 테이블을 사용하여 중복된 광고 로그를 방지하고, 실시간으로 데이터를 처리함으로써 광고 효과를 빠르게 평가할 수 있다.
6. 실행 테스트
이제 Kafka 스트리밍 파이프라인을 테스트하기 위한 토픽 생성, 프로듀서와 컨슈머 실행, 샘플 데이터 입력 및 결과를 확인하는 절차를 다룬다.
1. Kafka 토픽 생성
Kafka 스트림에서 광고 로그와 구매 로그를 처리하고 조인 결과를 저장할 토픽을 생성한다. 아래 명령어를 사용하여 총 4개의 토픽을 생성한다.
~/kafka/bin/kafka-topics.sh --create --zookeeper 3.39.223.252:2181,13.124.211.91:2181,3.36.122.61:2181 --replication-factor 3 --partitions 1 --topic adLog
~/kafka/bin/kafka-topics.sh --create --zookeeper 3.39.223.252:2181,13.124.211.91:2181,3.36.122.61:2181 --replication-factor 3 --partitions 1 --topic purchaseLog
~/kafka/bin/kafka-topics.sh --create --zookeeper 3.39.223.252:2181,13.124.211.91:2181,3.36.122.61:2181 --replication-factor 3 --partitions 1 --topic purchaseLogOneProduct
~/kafka/bin/kafka-topics.sh --create --zookeeper 3.39.223.252:2181,13.124.211.91:2181,3.36.122.61:2181 --replication-factor 3 --partitions 1 --topic AdEvaluationComplete
2. 프로듀서 실행
adLog와 purchaseLog에 샘플 데이터를 입력한다. 아래 명령어를 사용해 각각의 토픽에 데이터를 전송할 수 있는 프로듀서를 실행한다.
~/kafka/bin/kafka-console-producer.sh --broker-list 3.39.223.252:9092,13.124.211.91:9092,3.36.122.61:9092 --topic adLog
~/kafka/bin/kafka-console-producer.sh --broker-list 3.39.223.252:9092,13.124.211.91:9092,3.36.122.61:9092 --topic purchaseLog
adLog와 purchaseLog 샘플 데이터
// adLog
{"userId": "uid-0005", "productId": "pg-0022", "adId": "ad-101", "adType": "banner", "watchingTime": "30", "watchingDt": "20240201070000"}
// purchaseLog
{ "orderId": "od-0005", "userId": "uid-0005", "productInfo": [{"productId": "pg-0023", "price":"12000"}, {"productId":"pg-0022", "price":"13500"}], "purchasedDt": "20240201070000", "price": 24000}
3. 컨슈머 실행
Kafka 스트림의 데이터를 확인하기 위해 purchaseLogOneProduct와 AdEvaluationComplete 토픽에서 데이터를 수신하는 컨슈머를 실행한다.
~/kafka/bin/kafka-console-consumer.sh --bootstrap-server 3.39.223.252:9092,13.124.211.91:9092,3.36.122.61:9092 --topic purchaseLogOneProduct --from-beginning
~/kafka/bin/kafka-console-consumer.sh --bootstrap-server 3.39.223.252:9092,13.124.211.91:9092,3.36.122.61:9092 --topic AdEvaluationComplete --from-beginning
4. 결과 확인
광고 로그와 구매 로그가 정상적으로 조인되고, 최종 결과가 AdEvaluationComplete 토픽으로 전송되는 것을 확인할 수 있다.
이번 글에서는 Kafka 스트리밍을 활용해 광고와 구매 데이터를 조인하고, 광고 효과를 분석하는 과정을 다뤘다. Kafka 설정, 직렬화 처리, 데이터 흐름과 조인 로직을 통해 실시간 광고 효과 평가 시스템을 구현하였다. 마지막으로, 실행 테스트를 통해 광고와 구매 로그가 올바르게 처리되고 조인되는 것을 확인했다.
'BackEnd > Project' 카테고리의 다른 글
[BigData] Ch12. Kafka StressTest (0) | 2024.09.19 |
---|---|
[BigData] Ch12. 대량 Kafka Message Join Test (0) | 2024.09.19 |
[BigData] Ch12. Kafka Stream을 활용한 KStream 개발(4) (0) | 2024.09.18 |
[BigData] Ch12. Kafka Stream을 활용한 KStream 개발(3) - 스트림 조인 개념 (0) | 2024.09.18 |
[BigData] Ch12. Kafka Stream을 활용한 KStream 개발(2) (2) | 2024.09.18 |