이번 글에서는 로컬 환경에서의 Redis와 Kafka 성능을 비교한다. Redis가 로컬에서 실행되고 있기 때문에, 동일한 환경에서 Kafka도 EC2가 아닌 로컬에서 실행하도록 코드를 수정하였다. 또한 Kafka는 배치 처리가 가능하기 때문에, 이전에는 컨슈머가 한 건씩 데이터를 읽었으나, 이번에는 배치 처리가 가능하도록 코드도 함께 수정하였다.
이를 위해 Kafka 및 Redis 관련 코드와 설정 파일을 다음과 같이 수정하였다.
1. docker-compose-local.yml 수정
Redis와 Kafka를 모두 로컬 환경에서 실행할 수 있도록 네트워크 설정과 포트 설정을 조정하였다.
수정 전
- Redis와 Database 컨테이너만 정의됨.
수정 후
- Kafka, Zookeeper를 추가하여 로컬에서 실행되도록 설정.
- 각 컨테이너 간의 네트워크 연결을 위한 pharmacy-recommendation-network 추가.
version: '3'
services:
pharmacy-recommendation-redis:
container_name: pharmacy-recommendation-redis
build:
dockerfile: Dockerfile
context: ./redis
image: eunhan97/pharmacy-recommendation-redis
ports:
- "6379:6379"
networks:
- pharmacy-recommendation-network
pharmacy-recommendation-database:
container_name: pharmacy-recommendation-database
build:
dockerfile: Dockerfile
context: ./database
image: eunhan97/pharmacy-recommendation-database
environment:
- MARIADB_DATABASE=pharmacy-recommendation
- MARIADB_ROOT_PASSWORD=${SPRING_DATASOURCE_PASSWORD}
ports:
- "3307:3306"
networks:
- pharmacy-recommendation-network
zookeeper:
image: bitnami/zookeeper:latest
ports:
- "2181:2181"
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
networks:
- pharmacy-recommendation-network
kafka:
image: bitnami/kafka:latest
ports:
- "9092:9092"
environment:
KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092
depends_on:
- zookeeper
networks:
- pharmacy-recommendation-network
pharmacy-recommendation-app:
container_name: pharmacy-recommendation-app
build:
context: .
environment:
- SPRING_DATASOURCE_USERNAME=${SPRING_DATASOURCE_USERNAME}
- SPRING_DATASOURCE_PASSWORD=${SPRING_DATASOURCE_PASSWORD}
- KAKAO_REST_API_KEY=${KAKAO_REST_API_KEY}
ports:
- "8080:8080"
depends_on:
- pharmacy-recommendation-database
- pharmacy-recommendation-redis
- kafka
networks:
- pharmacy-recommendation-network
networks:
pharmacy-recommendation-network:
driver: bridge
2. KafkaConfig 수정
Kafka 설정에서 로컬 환경에 맞춰 replication factor와 min.insync.replicas를 조정하였다.
수정 전
- Replication factor: 3
- min.insync.replicas: 2
수정 후
- Replication factor: 1 (로컬 환경에서는 복제 필요 없음)
- min.insync.replicas: 1
package com.example.phamnav.kafka.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaAdmin;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@Configuration
@Slf4j
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${kafka.topic.name:pharmacy-data}")
private String topicName;
@Value("${kafka.topic.partitions:3}")
private int partitions;
@Value("${kafka.topic.replication-factor:1}") // 로컬 환경에서는 replication factor를 1로 설정
private short replicationFactor;
@Bean
public NewTopic pharmacyTopic() {
return TopicBuilder.name(topicName)
.partitions(partitions)
.replicas(replicationFactor)
.configs(Map.of("min.insync.replicas", "1")) // 로컬 환경에서는 min.insync.replicas를 1로 설정
.build();
}
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
return new KafkaAdmin(configs);
}
@Bean
public AdminClient adminClient() {
return AdminClient.create(Collections.singletonMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers));
}
}
3. KafkaConsumer 수정
기존의 Kafka 컨슈머는 한 건씩 데이터를 읽고 처리하였으나, 배치 처리로 변경하여 성능을 개선하였다.
수정 전
- 메시지 수신 시마다 개별로 데이터를 저장.
@KafkaListener(topics = "pharmacy-data", groupId = "pharmacy-group")
public void consume(ConsumerRecord<String, String> record) {
try {
log.info("Received message: key = {}, value = {}", record.key(), record.value());
String[] values = record.value().split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", -1);
if (values.length < 5) {
log.error("Invalid message format: {}", record.value());
return;
}
Pharmacy pharmacy = Pharmacy.builder()
.pharmacyName(values[1].replace("\"", "").trim())
.pharmacyAddress(values[2].replace("\"", "").trim())
.latitude(Double.parseDouble(values[3].trim()))
.longitude(Double.parseDouble(values[4].trim()))
.build();
pharmacyRepository.save(pharmacy);
log.info("Pharmacy saved to database: {}", pharmacy.getPharmacyName());
} catch (Exception e) {
log.error("Error processing message: {}", record.value(), e);
}
}
수정 후
- 100건씩 배치로 처리하여 데이터베이스에 저장.
private final List<Pharmacy> buffer = new ArrayList<>();
private static final int BATCH_SIZE = 100;
@KafkaListener(topics = "pharmacy-data", groupId = "pharmacy-group")
public void consume(ConsumerRecord<String, String> record) {
Pharmacy pharmacy = parseRecord(record);
buffer.add(pharmacy);
if (buffer.size() >= BATCH_SIZE) {
pharmacyRepository.saveAll(buffer);
buffer.clear();
log.info("Batch of {} pharmacies saved to database", BATCH_SIZE);
}
}
4. PharmacyProducer 수정
PharmacyProducer 클래스는 Redis와 Kafka에 데이터를 송신하는 역할을 한다. 기존에는 Kafka에만 데이터를 전송하는 로직이 있었으나, Redis로도 데이터를 전송할 수 있도록 추가하였다.
수정 전
- Kafka에만 데이터를 전송하는 로직이 존재.
수정 후
- Redis와 Kafka 모두에 데이터를 전송할 수 있도록 수정.
- CSV 파일을 읽어 각 메시지를 Redis와 Kafka로 각각 송신하는 메서드 추가.
@Service
public class PharmacyProducer {
private static final Logger log = LoggerFactory.getLogger(PharmacyProducer.class);
private final KafkaTemplate<String, String> kafkaTemplate;
private final StringRedisTemplate redisTemplate;
@Value("${pharmacy.csv.filepath}")
private String csvFilePath;
public PharmacyProducer(KafkaTemplate<String, String> kafkaTemplate, StringRedisTemplate redisTemplate) {
this.kafkaTemplate = kafkaTemplate;
this.redisTemplate = redisTemplate;
}
public void sendMessageToKafka(String message) {
kafkaTemplate.send("pharmacy-data", message)
.addCallback(
result -> log.info("Message sent successfully to Kafka: {}", message),
ex -> log.error("Failed to send message to Kafka: {}", message, ex)
);
}
public int sendPharmacyDataFromCsv() {
int count = 0;
try (BufferedReader br = new BufferedReader(new FileReader(csvFilePath))) {
String line;
while ((line = br.readLine()) != null) {
sendMessageToKafka(line);
count++;
if (count % 100 == 0) {
log.info("Sent {} messages to Kafka", count);
}
}
log.info("Total {} messages sent to Kafka", count);
} catch (IOException e) {
log.error("Error reading CSV file: {}", e.getMessage());
}
return count;
}
public int sendPharmacyDataToRedis() {
int count = 0;
try (BufferedReader br = new BufferedReader(new FileReader(csvFilePath))) {
String line;
while ((line = br.readLine()) != null) {
redisTemplate.opsForValue().set("pharmacy:" + count, line);
count++;
if (count % 100 == 0) {
log.info("Sent {} messages to Redis", count);
}
}
log.info("Total {} messages sent to Redis", count);
} catch (IOException e) {
log.error("Error reading CSV file: {}", e.getMessage());
}
return count;
}
}
5. application.yml 수정
application.yml 파일에서 Kafka와 Redis의 설정을 로컬 환경과 프로덕션 환경에 맞게 수정하였다. 로컬 환경에서는 로컬 Kafka 브로커와 Redis 인스턴스를 사용하고, 프로덕션 환경에서는 EC2 상의 Kafka와 Redis를 사용하도록 설정하였다.
수정 전
- 프로덕션 환경의 Kafka 서버 주소만 설정되어 있었음.
수정 후
- 로컬 환경에서 사용할 Kafka와 Redis 설정 추가.
- 포트와 URL을 로컬에 맞게 조정.
spring:
profiles:
active: local
group:
local:
- common
prod:
- common
---
spring:
config:
activate:
on-profile: common
kakao:
rest:
api:
key: ${KAKAO_REST_API_KEY}
---
spring:
config:
activate:
on-profile: local
datasource:
driver-class-name: org.mariadb.jdbc.Driver
url: jdbc:mariadb://localhost:3307/pharmacy-recommendation
username: ${SPRING_DATASOURCE_USERNAME}
password: ${SPRING_DATASOURCE_PASSWORD}
redis:
host: localhost
port: 6379
jpa:
hibernate:
ddl-auto: create
show-sql: true
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: pharmacy-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
properties:
request.timeout.ms: 60000
max.block.ms: 60000
retry.backoff.ms: 1000
retries: 3
web:
resources:
static-locations: classpath:/static/
pharmacy:
recommendation:
base:
url: http://localhost:8080/dir/
csv:
filepath: ${user.dir}/database/init/pharmacy.csv
logging:
level:
com.example.phamnav: DEBUG
org:
springframework:
data:
redis: DEBUG
kafka: DEBUG
---
spring:
config:
activate:
on-profile: prod
datasource:
driver-class-name: org.mariadb.jdbc.Driver
url: jdbc:mariadb://pharmacy-recommendation-database:3306/pharmacy-recommendation
username: ${SPRING_DATASOURCE_USERNAME}
password: ${SPRING_DATASOURCE_PASSWORD}
redis:
host: pharmacy-recommendation-redis
port: 6379
jpa:
hibernate:
ddl-auto: validate
show-sql: true
kafka:
bootstrap-servers: 52.79.207.254:9092,3.34.143.67:9092,3.38.212.167:9092
consumer:
group-id: pharmacy-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
pharmacy:
recommendation:
base:
url: http://3.38.83.24/dir/
csv:
filepath: ${user.dir}/database/init/pharmacy.csv
6. 테스트
Kafka는 Redis에 비해 눈에 띄게 빠른 속도로 데이터를 삽입하는 것을 확인할 수 있다. 데이터를 Kafka에서 처리할 때 빠른 속도는 컨슈머에서 1건씩 데이터를 삽입하더라도 배치 처리로 변경함으로써 얻은 효율성 덕분이다. 여기서 중요한 점은 동기식 처리와 비동기식 처리의 차이, 그리고 배치 처리의 이점이다.
배치 처리 전
2.3만 건
4.7만 건
9.5만 건
38만 건
배치 처리 후
2.3만 건
4.7만 건
9.5만 건
38만 건
이러한 결과는 다음과 같은 몇 가지 핵심 요소들로 인해 Kafka가 Redis보다 처리 속도가 훨씬 빠른 것을 보여준다.
1. 비동기 처리 vs 동기 처리
Kafka는 메시지를 비동기적으로 전송하여 메시지를 보내는 즉시 응답을 반환하고, 실제로 메시지가 브로커에 도달하는 것은 나중에 처리된다. 반면, Redis는 동기적으로 데이터를 저장하여, 데이터를 저장할 때까지 응답을 기다려야 하므로 처리 시간이 더 오래 걸린다.
2. 배치 처리
Kafka는 여러 메시지를 한 번에 배치로 처리할 수 있어 네트워크 오버헤드를 줄이고 전체적인 처리 속도를 높일 수 있다. 반면, Redis는 각 요청마다 하나의 데이터만 저장하므로 각 저장 작업에서 발생하는 지연이 더 크다.
3. 디스크 기반 저장 vs 메모리 기반 저장
Kafka는 데이터를 디스크에 저장하고 고성능의 순차적 디스크 쓰기를 활용한다. 이는 대량의 데이터를 빠르게 처리할 수 있게 해 준다. 반면, Redis는 메모리 기반 저장소로, 메모리의 속도에 의존하지만 데이터 양이 많아지면 메모리 부족 문제가 발생할 수 있다.
4. 네트워크 오버헤드
Kafka는 고성능 네트워크 프로토콜을 사용하여 메시지를 전송하므로 네트워크 오버헤드를 최소화하고 데이터를 빠르게 전송할 수 있다. 반면, Redis는 일반적인 TCP/IP 프로토콜을 사용하므로 네트워크 오버헤드가 더 클 수 있다.
결론적으로, Kafka는 대량의 데이터를 빠르게 처리할 수 있는 구조적 장점 덕분에 Redis보다 우수한 성능을 보여주었다. 이러한 차이는 특히 데이터 양이 많아질수록 더욱 명확하게 드러난다.
'BackEnd > Project' 카테고리의 다른 글
[PharmNav]Base62 vs Base64 비교 분석 (1) | 2024.09.24 |
---|---|
[PharmNav] Redis vs DB 조회 성능 비교 분석 (0) | 2024.09.24 |
[PharmNav] kafka를 통한 성능 향상 (0) | 2024.09.20 |
[BigData] Ch12. Kafka추가 활용 사례 (2) | 2024.09.19 |
[BigData] Ch12. Kafka StressTest (0) | 2024.09.19 |