본문 바로가기
BackEnd/Project

[SNS] Ch04. Kafka를 활용한 비동기 데이터 처리

by 개발 Blog 2024. 9. 11.

공부 내용을 정리하고 앞으로의 학습에 이해를 돕기 위해 작성합니다.

 

이번 시간에는 카프카(Kafka)를 이용한 비동기 데이터 처리에 대해 다룬다. 카프카는 분산 메시징 시스템으로, 대용량 데이터의 비동기 처리에 최적화된 도구이다. 카프카를 사용하면 데이터 생산자(Producer)와 소비자(Consumer) 간의 메시지 교환을 효율적으로 처리할 수 있다. 이를 통해 실시간 데이터 스트리밍이 가능해지며, 시스템 간의 결합도를 낮추고 확장성을 높일 수 있다.

 

Kafka

위 이미지는 Kafka의 기본 구조를 보여준다. Producer(생산자), Broker(중개자), 그리고 Consumer(소비자)로 나뉘어 데이터가 어떻게 처리되는지 나타낸다.

  • Producer
    • Producer는 데이터를 생성하고 Kafka 브로커로 전송하는 주체이다.
    • Producer가 생성하는 데이터는 Event(이벤트)로 표현되며, 이 이벤트에는 고유한 Key(키)가 할당된다.
    • 이 키는 데이터를 어느 Partition에 저장할지 결정하는 기준이 된다.
  • Broker
    • Broker는 데이터를 관리하고 저장하는 Kafka의 중심 역할을 한다.
    • Producer가 보낸 이벤트는 Broker 내의 Topic(토픽)에 저장되며, 하나의 Topic은 여러 개의 Partition으로 나뉜다.
    • 각 Partition은 데이터를 순서대로 기록하고 저장하며, 메시지를 관리한다. Partition을 사용하면 데이터가 여러 서버에 분산 저장되어 확장성을 크게 높일 수 있다.
    • Topic 0부터 Topic N까지 다양한 토픽이 존재할 수 있으며, 각각의 토픽이 여러 파티션으로 나누어진다.
  • Consumer
    • Consumer는 Broker에 저장된 데이터를 가져가 처리하는 주체이다.
    • 여러 Consumer가 Consumer Group을 이루며, 이 그룹 내에서 데이터를 효율적으로 처리한다.
    • 한 Consumer Group에 속한 Consumer들은 서로 다른 파티션에서 데이터를 읽어가며, 동시에 여러 데이터를 병렬로 처리할 수 있다.
    • 이미지에서 볼 수 있듯이, 각각의 파티션으로부터 데이터를 읽어오는 Consumer가 여러 개 있을 수 있다.
    • 예를 들어, Consumer 1과 Consumer 2는 각각 다른 파티션에서 데이터를 읽어온다.

카프카는 Producer와 Consumer 사이에 Broker를 두어 데이터를 주고받는 시스템이다. 데이터를 파티셔닝 하여 분산 처리함으로써 대용량 데이터의 비동기 처리가 가능하다. Producer는 데이터를 보내고, Broker는 데이터를 저장 및 관리하며, Consumer는 해당 데이터를 받아서 처리하는 구조이다. 이러한 구조 덕분에 Kafka는 대규모 데이터를 빠르게 처리하고, 시스템의 확장성과 유연성을 보장할 수 있다.

 

실습

Kafka 설치 및 설정

로컬에서 Kafka를 사용하기 위해서는 먼저 Kafka를 설치해야 한다. Mac에서는 brew를 이용해 설치할 수 있다.

brew install kafka

 

Kafka를 실행하기 전에는 zookeeper를 먼저 실행해야 한다.

 brew services start zookeeper

 

 

그 후, Kafka를 실행한다.

brew services start kafka

 

 

Kafka 토픽 생성

이번 프로젝트에서는 alarm이라는 토픽을 생성하여 알림 이벤트를 관리한다. 아래 명령어로 alarm 토픽을 생성할 수 있다.

kafka-topics --create --topic alarm --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1

 

Kafka 프로듀서 생성

Kafka에서 이벤트를 생성하는 프로듀서를 생성하기 위해 콘솔 프로듀서를 실행한다.

kafka-console-producer --topic alarm--bootstrap-server localhost:9092

 

 

Gradle 의존성 추가

Kafka와 Spring Kafka를 연동하기 위해 build.gradle 파일에 아래와 같은 의존성을 추가한다.

implementation 'org.springframework.kafka:spring-kafka'

 

application.yml 설정

Kafka와 관련된 설정을 application.yml 파일에 추가한다.

  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: notification
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "*"
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      properties:
        enable.idempotence: false
    listener:
      ack-mode: MANUAL
    topic:
      alarm: alarm
  • consumer 설정
    • bootstrap-servers: localhost:9092: Kafka 브로커의 주소를 정의하며, 여기서는 로컬에서 Kafka 브로커가 실행 중인 localhost:9092로 설정되어 있다.
    • group-id: notification: 컨슈머 그룹 ID를 설정하며, 같은 그룹에 속한 컨슈머들은 각 파티션에서 한 번씩만 메시지를 처리한다. 여기서는 notification 그룹이 사용된다.
    • auto-offset-reset: latest: 컨슈머가 처음 시작할 때 처리할 메시지의 위치를 지정한다. latest는 가장 마지막 메시지부터 처리함을 의미한다.
    • key-deserializer 및 value-deserializer: Kafka에서 수신한 메시지의 키와 값을 어떻게 역직렬화할지 설정한다. 키는 IntegerDeserializer를 사용해 정수로 변환하고, 값은 JsonDeserializer를 사용해 JSON 데이터를 객체로 변환한다.
    • spring.json.trusted.packages: "*": JSON 역직렬화 시 신뢰할 수 있는 패키지를 정의하며, "*"는 모든 패키지를 허용한다는 뜻이다.
  • producer 설정
    • bootstrap-servers: localhost:9092: Kafka 프로듀서가 메시지를 전송할 브로커의 주소이다. 역시 로컬 Kafka 브로커인 localhost:9092를 사용한다.
    • key-serializer 및 value-serializer: Kafka 프로듀서가 메시지를 전송할 때 직렬화하는 방법을 정의한다. 키는 IntegerSerializer를 사용해 정수로 직렬화하고, 값은 JsonSerializer를 사용해 객체를 JSON 형태로 직렬화한다.
    • enable.idempotence: false: 프로듀서의 멱등성(idempotence)을 비활성화하는 설정이다. 멱등성을 활성화하면 중복 메시지 전송이 방지되지만, 여기서는 비활성화되어 있다.
  • listener 설정
    • ack-mode: MANUAL: Kafka 컨슈머가 메시지를 수동으로 처리 완료를 알리도록 설정한다. 메시지를 처리한 후 직접적으로 Kafka에게 해당 메시지를 처리 완료했음을 알리기 위해 수동 확인 모드를 사용한다.
  • topic 설정
    • alarm: alarm: Kafka에서 사용할 토픽 이름을 설정한다. alarm이라는 이름의 토픽을 사용하며, 알림과 관련된 이벤트들이 이 토픽을 통해 전송되고 처리된다.

이 설정을 통해 Spring Kafka 기반의 프로듀서와 컨슈머가 Kafka 브로커와 상호작용하며, 알림 관련 데이터를 주고받을 수 있도록 구성된다.

AlarmConsumer 클래스

AlarmConsumer 클래스는 Kafka로부터 수신된 알림 이벤트를 처리하는 역할을 하는 컨슈머 클래스이다. 이 클래스는 Kafka에서 전송된 AlarmEvent를 수신하고, 해당 알림을 처리하는 로직을 포함한다.

package com.example.sns.consumer;

import com.example.sns.model.event.AlarmEvent;
import com.example.sns.service.AlarmService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RequiredArgsConstructor
public class AlarmConsumer {

    private final AlarmService alarmService;

    @KafkaListener(topics = "${spring.kafka.topic.alarm}")
    public void consumeAlarm(AlarmEvent event, Acknowledgment ack) {
        log.info("Consume the event: {}", event);
        alarmService.send(event.getAlarmType(), event.getArgs(), event.getReceiveUserId());
        ack.acknowledge();
    }
}
  • KafkaListener 어노테이션
    • @KafkaListener(topics = "${spring.kafka.topic.alarm}")는 이 메서드가 지정된 Kafka 토픽으로부터 메시지를 수신하도록 한다. 여기서 토픽 이름은 application.yml 파일에 설정된 alarm 토픽을 참조한다. 이 토픽은 알림과 관련된 모든 이벤트가 전송되는 곳이다.
  • consumeAlarm 메서드
    • 이 메서드는 Kafka에서 수신된 AlarmEvent를 처리하는 로직을 담고 있다.
    • 이벤트가 수신되면, log.info를 통해 수신된 이벤트 정보를 로그로 기록하고, alarmService의 send 메서드를 호출하여 알림을 해당 수신자에게 전달한다. 여기서 AlarmType, AlarmArgs, 그리고 ReceiveUserId를 이용해 알림 전송을 처리한다.
  • Acknowledgment 사용
    • Acknowledgment ack는 수동으로 메시지 처리가 완료되었음을 Kafka에 알리기 위한 객체이다.
    • ack.acknowledge()는 Kafka에게 해당 메시지가 성공적으로 처리되었음을 명시적으로 알리는 역할을 한다. 이를 통해 메시지 중복 처리를 방지할 수 있다.

AlarmProducer 클래스

AlarmProducer 클래스는 Kafka를 사용하여 알림 이벤트를 비동기적으로 처리하기 위해 설계된 프로듀서 클래스이다. 이 클래스의 주요 역할은 알림(AlarmEvent)을 Kafka 브로커로 전송하는 것이다.

package com.example.sns.producer;

import com.example.sns.model.event.AlarmEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RequiredArgsConstructor
public class AlarmProducer {

    private final KafkaTemplate<Integer, AlarmEvent> kafkaTemplate;

    @Value("${spring.kafka.topic.alarm}")
    private String topic;

    public void send(AlarmEvent event){
        kafkaTemplate.send(topic, event.getReceiveUserId(), event);
        log.info("Send to Kafka finished");
    }
}
  • KafkaTemplate 사용
    • KafkaTemplate<Integer, AlarmEvent>는 Kafka로 데이터를 전송하는 데 사용되는 스프링 제공 템플릿이다. 여기서 Integer는 메시지 키의 타입이고, AlarmEvent는 전송할 이벤트의 타입이다.
    • 이를 통해 AlarmEvent 객체를 특정 Kafka 토픽에 전송할 수 있다.
  • 토픽 설정
    • @Value("${spring.kafka.topic.alarm}") 어노테이션을 사용해 application.yml 파일에 정의된 Kafka 토픽 이름을 주입받는다. 이 토픽은 알림이 전송될 대상 토픽을 지정한다.
  • send 메서드
    • send(AlarmEvent event) 메서드는 AlarmEvent 객체를 Kafka로 전송하는 역할을 한다. 이벤트의 수신자 ID(getReceiveUserId())를 메시지의 키로 사용하여, 특정 수신자에게 관련된 알림 이벤트가 전송될 수 있도록 한다.
    • 메시지를 전송한 후에는 로그를 통해 Kafka로의 전송이 완료되었음을 기록한다.

이 클래스는 Kafka의 비동기 처리 특성을 활용하여 대규모 사용자에게 알림을 효율적으로 전송하는 데 기여하며, 알림 이벤트를 별도의 처리 흐름에서 독립적으로 관리할 수 있도록 한다.

AlarmService 클래스 수정

AlarmService 클래스에서는 알림 데이터를 저장하고, Kafka로 이벤트를 전송한다.

	...
  
	private final AlarmEntityRepository alarmEntityRepository;
	private final UserEntityRepository userEntityRepository;

	public void send(AlarmType type, AlarmArgs arg, Integer receiverUserId) {
        UserEntity user = userEntityRepository.findById(receiverUserId).orElseThrow(() -> new SnsApplicationException(ErrorCode.USER_NOT_FOUND));

        // alarm save
        AlarmEntity alarmEntity = alarmEntityRepository.save(AlarmEntity.of(user, type, arg));
        emitterRepository.get(receiverUserId).ifPresentOrElse(sseEmitter -> {
            try {
                sseEmitter.send(SseEmitter.event().id(alarmEntity.getId().toString()).name(ALARM_NAME).data("new alarm"));
            } catch (IOException e) {
                emitterRepository.delete(receiverUserId);
                throw new SnsApplicationException(ErrorCode.ALARM_CONNECT_ERROR);
            }
        }, () -> log.info("No emitter founded"));
    }
    
    ...
  • 데이터베이스 저장 추가
    수정 전에는 단순히 alarmId를 이용해 알림을 전송하는 역할만 했지만, 수정 후에는 AlarmType, AlarmArgs, receiverUserId를 기반으로 알림 데이터를 데이터베이스에 저장하는 로직이 추가되었다.
    • 이를 위해 AlarmEntityRepository와 UserEntityRepository가 추가되었으며, 사용자의 존재 여부를 확인한 후 알림 데이터를 저장한다.
    • alarmEntityRepository.save()를 통해 AlarmEntity를 생성하고 저장한 후, 저장된 alarmEntity의 ID를 사용해 알림을 전송한다.
  • 사용자 확인 추가
    수정 후 코드에서는 알림을 전송하기 전에 해당 사용자가 존재하는지 확인하는 절차가 추가되었다.
    • UserEntityRepository를 통해 receiverUserId로 사용자를 조회하고, 존재하지 않을 경우 SnsApplicationException을 발생시킨다.
  • 알림 ID에서 alarmEntity ID로 변경
    수정 전에는 단순히 alarmId를 받아서 전송했지만, 수정 후에는 alarmEntity.getId()를 사용해 저장된 알림의 고유 ID를 사용하여 전송한다. 이는 데이터베이스에 저장된 알림과 연동하기 위한 중요한 변경 사항이다.

PostService 클래스 수정

PostService 클래스는 게시물의 생성, 수정, 삭제와 같은 핵심 기능을 제공하며, 이번 수정에서는 알림 기능을 비동기적으로 처리하기 위해 Kafka를 활용한 AlarmProducer로 전환하였다.

   private final AlarmProducer alarmProducer;
    
    @Transactional
    public void like(Integer postId, String userName) {
        ...
        
        // like save
        likeEntityRepository.save(LikeEntity.of(userEntity, postEntity));
        alarmProducer.send(new AlarmEvent(postEntity.getUser().getId(), AlarmType.NEW_LIKE_ON_POST, new AlarmArgs(userEntity.getId(), postEntity.getId())));
    }
	
    ...
    
    @Transactional
    public void comment(Integer postId, String userName, String comment) {
     	...

        // comment save
        commentEntityRepository.save(CommentEntity.of(userEntity, postEntity, comment));
        AlarmEntity alarmEntity = alarmEntityRepository.save(AlarmEntity.of(postEntity.getUser(), AlarmType.NEW_COMMENT_ON_POST, new AlarmArgs(userEntity.getId(), postEntity.getId())));

        alarmProducer.send(new AlarmEvent(postEntity.getUser().getId(), AlarmType.NEW_COMMENT_ON_POST, new AlarmArgs(userEntity.getId(), postEntity.getId())));
    }
  • AlarmService에서 AlarmProducer로 전환
    • 수정 전에는 alarmService.send()를 사용해 알림을 처리했다. 이는 즉시 알림을 전송하는 방식이었다.
    • 수정 후에는 AlarmProducer를 사용해 alarmProducer.send()로 알림 이벤트를 Kafka로 전송하는 방식으로 변경되었다. 이를 통해 알림 처리가 비동기적으로 이루어지며, 더 높은 확장성과 성능을 제공할 수 있다.
  • alarmService.send() 호출 제거
    • 수정 전에는 alarmService.send() 메서드로 직접 알림을 보냈지만, 수정 후에는 alarmProducer.send()로 알림 이벤트를 Kafka로 전송하는 로직으로 바뀌었다. Kafka를 사용함으로써, 알림 전송이 비동기적으로 처리되어 서비스가 더 유연하게 확장 가능해졌다.
  • AlarmProducer.send() 사용
    • AlarmProducer는 Kafka 프로듀서를 통해 알림 이벤트를 Kafka 토픽으로 전송하는 역할을 한다. 이제 알림은 실시간으로 전송되는 대신, Kafka에 의해 이벤트가 비동기적으로 처리된다.
    • AlarmProducer.send()는 AlarmEvent 객체를 생성하여 Kafka로 전송하며, 이벤트에는 알림을 받을 사용자의 ID, 알림 유형(AlarmType), 그리고 알림에 대한 추가 정보(AlarmArgs)가 포함된다.

테스트 결과

포스트맨을 통해 POST 요청으로 댓글을 달면, 카프카(Kafka)가 비동기적으로 데이터를 처리하는 것을 확인할 수 있다.

 

아래 이미지는 포스트맨을 통해 /comments 엔드포인트에 성공적으로 요청을 보낸 화면을 보여준다. 요청이 성공적으로 처리되었음을 의미하는 SUCCESS 응답을 확인할 수 있다.

 

서버 로그를 보면 카프카 프로듀서와 컨슈머가 비동기적으로 데이터를 처리하는 것을 보여준다.

  • 프로듀서 로그
    • "Send to Kafka finished"라는 로그를 통해, 댓글이 달린 후 알림 이벤트가 정상적으로 Kafka 브로커로 전송되었음을 확인할 수 있다.
  • 컨슈머 로그
    • "Consume the event: AlarmEvent"라는 로그를 통해, Kafka 브로커로부터 알림 이벤트가 정상적으로 소비되었고, 이벤트가 컨슈머에서 처리되었음을 확인할 수 있다.

아래 이미지는 터미널에서 Kafka 컨슈머 콘솔을 이용해 alarm 토픽의 메시지를 확인하는 화면이다.

 

  • 명령어는 alarm 토픽에 저장된 메시지를 처음부터(--from-beginning) 읽어오며, Kafka 브로커는 localhost:9092에서 동작하고 있다.
  • 메시지 내용은 특정 사용자가 게시물에 댓글을 남긴 후, 이를 알리는 알림 이벤트 정보가 포함되어 있다.

이번 포스팅에서는 Kafka를 이용한 비동기 데이터 처리 과정을 살펴보았다. Kafka의 Producer와 Consumer를 통해 대용량 데이터를 효율적으로 주고받으며, 시스템의 확장성을 높이는 방법을 이해할 수 있었다.