본문 바로가기
BackEnd/Project

[PharmNav] kafka를 통한 성능 향상

by 개발 Blog 2024. 9. 20.

레디스 기반의 캐싱이 적용된 PharmNav 프로젝트에 카프카를 도입하여 초기 데이터 적재를 개선하는 과정을 설명한다. 카프카는 대규모 데이터 전송과 분산 처리에 강점이 있는 메시징 시스템이지만, 이 프로젝트에서는 실시간 처리보다는 초기 데이터 전송에 활용된다. 사용자는 카프카로 적재된 데이터를 조회하는 기능만 사용하며, 데이터를 처리하는 것은 초기 적재 시에만 수행된다.

 

1. Kafka 의존성 추가

카프카 기능을 사용하기 위해 build.gradle 파일에 spring-kafka 의존성을 추가한다.

implementation 'org.springframework.kafka:spring-kafka'

이 의존성은 스프링과 카프카의 연동을 쉽게 해 준다.

 

2. KafkaConfig 클래스 설정

KafkaConfig 클래스는 카프카의 기본 설정을 담당한다. 이 클래스에서는 Kafka 토픽을 생성하고 관리하는 역할을 한다.

package com.example.phamnav.kafka.config;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaAdmin;

import javax.annotation.PostConstruct;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;

@Configuration
@Slf4j
public class KafkaConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${kafka.topic.name:pharmacy-data}")
    private String topicName;

    @Value("${kafka.topic.partitions:3}")
    private int partitions;

    @Value("${kafka.topic.replication-factor:3}")
    private short replicationFactor;

    @Bean
    public NewTopic pharmacyTopic() {
        NewTopic topic = TopicBuilder.name(topicName)
                .partitions(partitions)
                .replicas(replicationFactor)
                .configs(Map.of("min.insync.replicas", "2"))
                .build();
        log.info("Created Kafka topic: {}", topic);
        return topic;
    }

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        return new KafkaAdmin(configs);
    }

    @PostConstruct
    public void initializeKafkaTopic() {
        AdminClient adminClient = AdminClient.create(
                Collections.singletonMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
        );

        try {
            Set<String> existingTopics = adminClient.listTopics().names().get();
            if (!existingTopics.contains("pharmacy-data")) {
                adminClient.createTopics(Collections.singleton(pharmacyTopic())).all().get();
                log.info("pharmacy-data 토픽이 성공적으로 생성되었습니다.");
            } else {
                log.info("pharmacy-data 토픽이 이미 존재합니다.");
            }
        } catch (InterruptedException | ExecutionException e) {
            log.error("Kafka 토픽 초기화 중 오류 발생: ", e);
        } finally {
            adminClient.close();
        }
    }
}
  • @Configuration과 @Slf4j 어노테이션을 사용해 설정 클래스를 정의하고, 로그 기능을 활성화한다.
  • @Value를 사용해 application.yml에서 설정된 값을 가져온다. bootstrapServers는 카프카 클러스터의 서버 목록을, topicName, partitions, replicationFactor는 토픽의 이름과 설정을 정의한다.
  • NewTopic 빈을 생성해 카프카 토픽을 정의하고, 이 토픽이 존재하지 않으면 새로 생성한다.

이후 @PostConstruct로 카프카 토픽 초기화 메서드를 실행해 AdminClient를 사용해 토픽이 존재하는지 확인하고, 없으면 새로 생성한다.

 

3. KafkaTestController 컨트롤러

KafkaTestController는 테스트 목적으로 Kafka로 데이터를 전송하는 역할을 한다.

package com.example.phamnav.kafka.controller;

import com.example.phamnav.kafka.service.PharmacyProducer;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/kafka")
public class KafkaTestController {

    private final PharmacyProducer pharmacyProducer;

    public KafkaTestController(PharmacyProducer pharmacyProducer) {
        this.pharmacyProducer = pharmacyProducer;
    }

    @PostMapping("/send-pharmacy-data")
    public String sendPharmacyData() {
        int count = pharmacyProducer.sendPharmacyDataFromCsv();
        return "Pharmacy data sent to Kafka. Total messages sent: " + count;
    }
}
  • /kafka/send-pharmacy-data 엔드포인트를 통해 CSV 파일로부터 데이터를 읽어 카프카에 전송하는 PharmacyProducer의 메서드를 호출한다.
  • 전송한 메시지 개수를 리턴해 전송된 데이터의 상태를 확인할 수 있다.

4. PharmacyConsumer 클래스

PharmacyConsumer는 카프카에서 메시지를 소비하는 역할을 한다.

package com.example.phamnav.kafka.service;

import com.example.phamnav.pharmacy.entity.Pharmacy;
import com.example.phamnav.pharmacy.repository.PharmacyRepository;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
public class PharmacyConsumer {

    private static final Logger log = LoggerFactory.getLogger(PharmacyConsumer.class);
    private final PharmacyRepository pharmacyRepository;

    @KafkaListener(topics = "pharmacy-data", groupId = "pharmacy-group")
    public void consume(ConsumerRecord<String, String> record) {
        try {
            log.info("Received message: key = {}, value = {}", record.key(), record.value());
            String[] values = record.value().split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", -1);

            if (values.length < 5) {
                log.error("Invalid message format: {}", record.value());
                return;
            }

            Pharmacy pharmacy = Pharmacy.builder()
                    .pharmacyName(values[1].replace("\"", "").trim())
                    .pharmacyAddress(values[2].replace("\"", "").trim())
                    .latitude(Double.parseDouble(values[3].trim()))
                    .longitude(Double.parseDouble(values[4].trim()))
                    .build();

            pharmacyRepository.save(pharmacy);
            log.debug("Saved pharmacy to database: {}", pharmacy.getPharmacyName());

        } catch (Exception e) {
            log.error("Error processing message: {}", record.value(), e);
        }
    }
}
  • @KafkaListener 어노테이션을 사용해 pharmacy-data 토픽에서 메시지를 읽어온다. groupId는 동일한 컨슈머 그룹에 속한 컨슈머끼리 메시지를 나누어 처리하게 한다.
  • 메시지 포맷을 검증하고, 약국 정보를 데이터베이스에 저장하는 로직을 구현한다.

메시지 형식이 잘못되었을 경우 로그에 오류를 기록하며, 데이터가 정상일 경우 약국 정보를 데이터베이스에 저장한다.

 

5. PharmacyProducer 클래스

PharmacyProducer는 카프카로 메시지를 전송하는 역할을 한다.

package com.example.phamnav.kafka.service;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

@Service
public class PharmacyProducer {

    private static final Logger log = LoggerFactory.getLogger(PharmacyProducer.class);
    private final KafkaTemplate<String, String> kafkaTemplate;

    @Value("${pharmacy.csv.filepath}")
    private String csvFilePath;

    public PharmacyProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String message) {
        kafkaTemplate.send("pharmacy-data", message)
                .addCallback(
                        result -> log.info("Message sent successfully: {}", message),
                        ex -> log.error("Failed to send message: {}", message, ex)
                );
    }

    public int sendPharmacyDataFromCsv() {
        int count = 0;
        try (BufferedReader br = new BufferedReader(new FileReader(csvFilePath))) {
            String line;
            while ((line = br.readLine()) != null) {
                sendMessage(line);
                count++;
                if (count % 100 == 0) {
                    log.info("Sent {} messages to Kafka", count);
                }
            }
            log.info("Total {} messages sent to Kafka", count);
        } catch (IOException e) {
            log.error("Error reading CSV file: {}", e.getMessage());
        }
        return count;
    }
}
  • KafkaTemplate을 사용해 메시지를 pharmacy-data 토픽으로 전송한다. CSV 파일로부터 데이터를 읽어 메시지를 생성하고, 이를 반복적으로 카프카로 전송한다.
  • 전송 성공 및 실패에 대한 콜백을 제공하여 전송 상태를 로그로 기록한다.

6. PharmacyController 클래스 수정

기존의 PharmacyController에 카프카와 관련된 API를 추가한다.

package com.example.phamnav.pharmacy.controller;

import com.example.phamnav.kafka.service.PharmacyProducer;
import com.example.phamnav.pharmacy.cache.PharmacyRedisTemplateService;
import com.example.phamnav.pharmacy.dto.PharmacyDto;
import com.example.phamnav.pharmacy.service.PharmacyRepositoryService;
import com.example.phamnav.pharmacy.service.PharmacySearchService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;
import java.util.stream.Collectors;

@Slf4j
@RestController
@RequiredArgsConstructor
public class PharmacyController {

    private final PharmacyProducer pharmacyProducer;
    private final PharmacyRedisTemplateService pharmacyRedisTemplateService;
    private final PharmacySearchService pharmacySearchService;

    @PostMapping("/pharmacy/kafka/send")
    public String sendPharmacyDataToKafka() {
        int count = pharmacyProducer.sendPharmacyDataFromCsv();
        return "Pharmacy data sent to Kafka. Total messages sent: " + count;
    }

    @GetMapping("/pharmacy/search")
    public List<PharmacyDto> searchPharmacy() {
        return pharmacySearchService.searchPharmacyDtoList();
    }
}
  • @PostMapping("/pharmacy/kafka/send") 메서드를 통해 CSV 데이터를 카프카로 전송하는 기능을 구현한다. 이는 PharmacyProducer의 sendPharmacyDataFromCsv 메서드를 호출해 데이터를 전송한다.
  • 기존 Redis 관련 기능은 그대로 유지되며, 검색 기능을 제공하는 /pharmacy/search 엔드포인트도 추가한다.

7. Pharmacy 엔티티 수정

Pharmacy 엔티티에 PharmacyDto를 받는 새로운 생성자를 추가한다.

package com.example.phamnav.pharmacy.entity;

import com.example.phamnav.BaseTimeEntity;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;

import com.example.phamnav.pharmacy.dto.PharmacyDto;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;

@Entity(name = "pharmacy")
@Getter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Pharmacy extends BaseTimeEntity {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private String pharmacyName;
    private String pharmacyAddress;
    private double latitude;
    private double longitude;

    // PharmacyDto를 받는 새로운 생성자
    public Pharmacy(PharmacyDto pharmacyDto) {
        this.pharmacyName = pharmacyDto.getPharmacyName();
        this.pharmacyAddress = pharmacyDto.getPharmacyAddress();
        this.latitude = pharmacyDto.getLatitude();
        this.longitude = pharmacyDto.getLongitude();
    }

    public void changePharmacyAddress(String address){
        this.pharmacyAddress = address;
    }
}
  • 이 생성자는 Kafka로부터 받은 데이터를 DTO 형식으로 변환하여 사용할 수 있도록 한다.
  • 기존의 약국 주소 변경 메서드인 changePharmacyAddress도 그대로 유지된다.

8. PharmacySearchService 수정

기존 PharmacySearchService 클래스에 Redis 캐싱 로직을 강화하고, 카프카 연동을 위한 일부 변경사항을 반영한다.

package com.example.phamnav.pharmacy.service;

import com.example.phamnav.pharmacy.cache.PharmacyRedisTemplateService;
import com.example.phamnav.pharmacy.dto.PharmacyDto;
import com.example.phamnav.kafka.service.PharmacyProducer;
import com.example.phamnav.pharmacy.entity.Pharmacy;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.stream.Collectors;

@Slf4j
@Service
@RequiredArgsConstructor
public class PharmacySearchService {

    private final PharmacyRepositoryService pharmacyRepositoryService;
    private final PharmacyRedisTemplateService pharmacyRedisTemplateService;

    public List<PharmacyDto> searchPharmacyDtoList() {
        // Redis에서 데이터 조회
        List<PharmacyDto> pharmacyDtoList = pharmacyRedisTemplateService.findAll();
        if(!pharmacyDtoList.isEmpty()){
            log.info("Redis findAll success!");
            return pharmacyDtoList;
        }

        // DB에서 데이터 조회
        pharmacyDtoList = pharmacyRepositoryService.findAll()
                .stream()
                .map(this::convertToPharmacyDto)
                .collect(Collectors.toList());

        // Redis에 데이터 저장
        pharmacyDtoList.forEach(pharmacyRedisTemplateService::save);

        return pharmacyDtoList;
    }

    private PharmacyDto convertToPharmacyDto(Pharmacy pharmacy) {
        return PharmacyDto.builder()
                .id(pharmacy.getId())
                .pharmacyAddress(pharmacy.getPharmacyAddress())
                .pharmacyName(pharmacy.getPharmacyName())
                .latitude(pharmacy.getLatitude())
                .longitude(pharmacy.getLongitude())
                .build();
    }
}
  • Redis에서 데이터를 먼저 조회하고, 없는 경우 데이터베이스에서 조회한 후 Redis에 저장하는 방식을 취한다.

9. application.yml 설정 변경

카프카 설정을 추가하여 로컬 및 프로덕션 환경에서 카프카를 사용할 수 있도록 설정한다.

spring:
  profiles:
    active: local # default profile
    group:
      local:
        - common
      prod:
        - common

---
spring:
  config:
    activate:
      on-profile: common

kakao:
  rest:
    api:
      key: ${KAKAO_REST_API_KEY}

---
spring:
  config:
    activate:
      on-profile: local
  datasource:
    driver-class-name: org.mariadb.jdbc.Driver
    url: jdbc:mariadb://localhost:3306/pharmacy-recommendation
    username: ${SPRING_DATASOURCE_USERNAME}
    password: ${SPRING_DATASOURCE_PASSWORD}
  redis:
    host: localhost
    port: 6379
  jpa:
    hibernate:
      ddl-auto: create
    show-sql: true

  kafka:
    bootstrap-servers: 52.79.207.254:9092,3.34.143.67:9092,3.38.212.167:9092
    consumer:
      group-id: pharmacy-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    properties:
      request.timeout.ms: 60000
      max.block.ms: 60000
      retry.backoff.ms: 1000
      retries: 3

  web:
    resources:
      static-locations: classpath:/static/

pharmacy:
  recommendation:
    base:
      url: http://localhost:8080/dir/
  csv:
    filepath: ${user.dir}/database/init/pharmacy.csv
logging:
  level:
    com.example.phamnav: DEBUG
    org:
      springframework:
        data:
          redis: DEBUG
          kafka: DEBUG
---
spring:
  config:
    activate:
      on-profile: prod
  datasource:
    driver-class-name: org.mariadb.jdbc.Driver
    url: jdbc:mariadb://pharmacy-recommendation-database:3306/pharmacy-recommendation
    username: ${SPRING_DATASOURCE_USERNAME}
    password: ${SPRING_DATASOURCE_PASSWORD}
  redis:
    host: pharmacy-recommendation-redis
    port: 6379
  jpa:
    hibernate:
      ddl-auto: validate # prod 배포시 validate
    show-sql: true
  kafka:
    bootstrap-servers: 52.79.207.254:9092,3.34.143.67:9092,3.38.212.167:9092
    consumer:
      group-id: pharmacy-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
pharmacy:
  recommendation:
    base:
      url: http://3.38.83.24/dir/
  csv:
    filepath: /path/to/your/pharmacy.csv
  • spring.kafka.bootstrap-servers에 카프카 서버 주소를 추가하고, 프로듀서 및 컨슈머 설정을 명시한다.
  • Redis 설정도 추가되어 있으며, 로컬과 프로덕션 환경에서 각각 다른 설정을 적용할 수 있도록 구분되어 있다.

10. 실행 테스트 

PharmNav 프로젝트에 카프카를 통한 데이터 적재와 레디스 캐싱 기능을 테스트한다. 다음과 같은 단계로 진행된다.

 

도커 실행

로컬 환경에서 도커를 이용하여 필요한 서비스를 구동시킨다.

docker-compose -f docker-compose-local.yml up

 

초기 데이터 적재

포스트맨(Postman)을 이용하여 카프카로 초기 데이터를 전송하는 POST 요청을 수행한다. 이 요청을 통해 총 2만 건의 데이터가 카프카로 전송된다.

 

데이터 구독 확인

카프카 컨슈머를 이용하여 구독된 메시지를 확인하고 일부 데이터를 로그를 통해 검증하며, 정상적으로 데이터베이스에 저장되었는지 확인한다.

 

실제 데이터 요청 테스트

웹 브라우저를 통해 http://localhost:8080으로 접속하여 실제 데이터 요청을 수행하고 레디스를 통해 캐싱된 데이터로 응답이 이루어지는 것을 확인한다.

 

이 테스트를 통해 카프카와 레디스의 결합이 데이터 처리의 효율성과 안정성을 크게 향상시키는 것을 확인할 수 있었다. 서버 부하 감소와 빠른 데이터 접근을 가능하게 함으로써, 애플리케이션의 성능을 극대화하였다. 또한, 시스템 장애 발생 시에도 데이터 손실 없이 신속하게 복구할 수 있는 환경을 구축하였다.