본문 바로가기
BackEnd/Project

[PT Manager] Ch03. Batch 수업 종료 후 이용권 차감

by 개발 Blog 2024. 8. 28.

공부 내용을 정리하고 앞으로의 학습에 이해를 돕기 위해 작성합니다.

지난 시간에 이어 이번에는 멀티스레드로 배치 작업을 수행하는 방법을 더 살펴본다. 이번 작업의 목적은 수업이 종료된 이후, 해당 수업에 대해 이용권을 자동으로 차감하는 것이다.

 

수업 종료 후 이용권 차감 기능

수업이 종료된 데이터를 기반으로, 예약된 수업(booking)과 그에 연결된 이용권(pass) 데이터를 조회하여, 남은 횟수를 차감하고 업데이트하는 작업을 수행한다. 이 과정에서 AsyncItemProcessor와 AsyncItemWriter를 사용해 성능을 최적화한다.

 

동작 방식

  • UsePassesReader: 예약 데이터를 읽어온다. 종료된 수업이며, 이용권 차감이 아직 이루어지지 않은 데이터를 대상으로 한다.
  • AsyncItemProcessor: ItemProcessor에 별도의 스레드를 할당해 비동기로 작업을 처리한다. 복잡한 계산이 포함된 경우 이 방식으로 성능을 향상시킬 수 있다.
  • AsyncItemWriter: 비동기 처리된 데이터를 실제로 저장하는 단계로, 최종적으로 이용권 차감과 사용 여부 업데이트 작업을 수행한다.

BookingRepository

BookingRepository는 예약된 수업 데이터에 접근하고, 수업 종료 후 이용권의 사용 여부를 업데이트하는 역할을 담당한다.

package com.example.pass.repository.booking;

import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;

import javax.transaction.Transactional;

public interface BookingRepository extends JpaRepository<BookingEntity, Integer> {
    @Transactional
    @Modifying
    @Query(value = "UPDATE BookingEntity b" +
            "          SET b.usedPass = :usedPass," +
            "              b.modifiedAt = CURRENT_TIMESTAMP" +
            "        WHERE b.passSeq = :passSeq")
    int updateUsedPass(Integer passSeq, boolean usedPass);
}
  • updateUsedPass(Integer passSeq, boolean usedPass)
    • 이 메서드는 예약된 수업에서 해당 이용권의 사용 상태를 업데이트한다. usedPass가 true로 설정되면 해당 이용권이 사용된 것으로 표시된다.
    • 배치 작업에서 예약된 수업의 이용권 사용 여부를 최종적으로 반영할 때 사용된다. 배치 작업이 성공적으로 완료되면, 해당 수업의 예약 상태가 업데이트된다.

PassRepository

PassRepository는 이용권(pass) 데이터를 관리하며, 이용권의 남은 횟수를 업데이트하는 역할을 한다.

package com.example.pass.repository.pass;

import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;

import javax.transaction.Transactional;

public interface PassRepository extends JpaRepository<PassEntity, Integer> {
    @Transactional
    @Modifying
    @Query(value = "UPDATE PassEntity p" +
            "          SET p.remainingCount = :remainingCount," +
            "              p.modifiedAt = CURRENT_TIMESTAMP" +
            "        WHERE p.passSeq = :passSeq")
    int updateRemainingCount(Integer passSeq, Integer remainingCount);
}
  • updateRemainingCount(Integer passSeq, Integer remainingCount)
    • 이용권이 정상적으로 차감되었는지 확인하며, 잔여 횟수를 정확하게 관리하는 데 사용된다. 
    • 이 메서드는 특정 이용권의 남은 횟수를 차감하여 업데이트한다. 차감이 완료되면, 데이터베이스에 즉시 반영된다.

UsePassesJobConfig

수업 종료 후 이용권 차감을 자동으로 처리하는 배치 작업을 구성한다. 이 작업은 예약된 수업 데이터를 읽어와 이용권의 남은 횟수를 차감하고, 해당 데이터를 업데이트하는 과정을 포함한다. 이 배치 작업은 멀티스레드 환경에서 성능을 최적화하기 위해 AsyncItemProcessor와 AsyncItemWriter를 사용한다.

package com.example.pass.job.pass;

import com.example.pass.repository.booking.BookingEntity;
import com.example.pass.repository.booking.BookingRepository;
import com.example.pass.repository.booking.BookingStatus;
import com.example.pass.repository.pass.PassEntity;
import com.example.pass.repository.pass.PassRepository;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.integration.async.AsyncItemProcessor;
import org.springframework.batch.integration.async.AsyncItemWriter;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JpaCursorItemReader;
import org.springframework.batch.item.database.builder.JpaCursorItemReaderBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

import javax.persistence.EntityManagerFactory;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.concurrent.Future;

@Configuration
public class UsePassesJobConfig {
    private final int CHUNK_SIZE = 10;

    // @EnableBatchProcessing로 인해 Bean으로 제공된 JobBuilderFactory, StepBuilderFactory
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final EntityManagerFactory entityManagerFactory;
    private final PassRepository passRepository;
    private final BookingRepository bookingRepository;

    public UsePassesJobConfig(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory, EntityManagerFactory entityManagerFactory, PassRepository passRepository, BookingRepository bookingRepository) {
        this.jobBuilderFactory = jobBuilderFactory;
        this.stepBuilderFactory = stepBuilderFactory;
        this.entityManagerFactory = entityManagerFactory;
        this.passRepository = passRepository;
        this.bookingRepository = bookingRepository;
    }

    @Bean
    public Job usePassesJob() {
        return this.jobBuilderFactory.get("usePassesJob")
                .start(usePassesStep())
                .build();
    }

    @Bean
    public Step usePassesStep() {
        return this.stepBuilderFactory.get("usePassesStep")
                .<BookingEntity, Future<BookingEntity>>chunk(CHUNK_SIZE)
                .reader(usePassesItemReader())
                .processor(usePassesAsyncItemProcessor())
                .writer(usePassesAsyncItemWriter())
                .build();

    }

    @Bean
    public JpaCursorItemReader<BookingEntity> usePassesItemReader() {
        return new JpaCursorItemReaderBuilder<BookingEntity>()
                .name("usePassesItemReader")
                .entityManagerFactory(entityManagerFactory)
                // 상태(status)가 완료이며, 종료 일시(endedAt)이 과거인 예약이 이용권 차감 대상이 됩니다.
                .queryString("select b from BookingEntity b join fetch b.passEntity where b.status = :status and b.usedPass = false and b.endedAt < :endedAt")
                .parameterValues(Map.of("status", BookingStatus.COMPLETED, "endedAt", LocalDateTime.now()))
                .build();
    }

    // 이 프로젝트에서는 적합하지 않지만, ItemProcessor의 수행이 오래걸려 병목이 생기는 경우에 AsyncItemProcessor, AsyncItemWriter를 사용하면 성능을 향상시킬 수 있습니다.
    @Bean
    public AsyncItemProcessor<BookingEntity, BookingEntity> usePassesAsyncItemProcessor() {
        AsyncItemProcessor<BookingEntity, BookingEntity> asyncItemProcessor = new AsyncItemProcessor<>();
        asyncItemProcessor.setDelegate(usePassesItemProcessor()); // usePassesItemProcessor로 위임하고 결과를 Future에 저장합니다.
        asyncItemProcessor.setTaskExecutor(new SimpleAsyncTaskExecutor());
        return asyncItemProcessor;

    }

    @Bean
    public ItemProcessor<BookingEntity, BookingEntity> usePassesItemProcessor() {
        return bookingEntity -> {
            // 이용권 잔여 횟수는 차감합니다.
            PassEntity passEntity = bookingEntity.getPassEntity();
            passEntity.setRemainingCount(passEntity.getRemainingCount() - 1);
            bookingEntity.setPassEntity(passEntity);

            // 이용권 사용 여부를 업데이트합니다.
            bookingEntity.setUsedPass(true);
            return bookingEntity;

        };
    }
    @Bean
    public AsyncItemWriter<BookingEntity> usePassesAsyncItemWriter() {
        AsyncItemWriter<BookingEntity> asyncItemWriter = new AsyncItemWriter<>();
        asyncItemWriter.setDelegate(usePassesItemWriter()); // usePassesItemWriter 최종 결과값을 넘겨주고 작업을 위임합니다.
        return asyncItemWriter;
    }

    @Bean
    public ItemWriter<BookingEntity> usePassesItemWriter() {
        return bookingEntities -> {
            for (BookingEntity bookingEntity : bookingEntities) {
                // 잔여 횟수를 업데이트 합니다.
                int updatedCount = passRepository.updateRemainingCount(bookingEntity.getPassSeq(), bookingEntity.getPassEntity().getRemainingCount());
                // 잔여 횟수가 업데이트 완료되면, 이용권 사용 여부를 업데이트합니다.
                if (updatedCount > 0) {
                    bookingRepository.updateUsedPass(bookingEntity.getPassSeq(), bookingEntity.isUsedPass());
                }
            }
        };
    }
}

Job 설정 (usePassesJob)

  • 이 부분은 배치 작업의 전체 흐름을 정의한다. usePassesJob은 하나의 스텝(usePassesStep)으로 구성되어 있다. 이 작업은 종료된 수업을 읽고, 이용권을 차감한 뒤 업데이트하는 단일 프로세스를 수행한다.

Step 설정 (usePassesStep)

이 스텝은 배치 프로세스의 핵심이다. chunk(CHUNK_SIZE)는 데이터 처리 단위를 설정하며, 이 예제에서는 10개씩 데이터를 처리한다. 각 chunk는 다음 세 가지 작업을 순차적으로 수행한다:

  • Reader: 예약 데이터를 읽어온다 (usePassesItemReader).
  • Processor: 비동기로 남은 이용권 횟수를 차감하고 사용 여부를 업데이트한다 (usePassesAsyncItemProcessor).
  • Writer: 비동기 처리된 결과를 데이터베이스에 저장한다 (usePassesAsyncItemWriter).

ItemReader 설정 (usePassesItemReader)

  • JpaCursorItemReader를 사용하여 예약 데이터를 읽어온다. 이 리더는 JPA 쿼리를 통해 상태가 "완료"이고 종료 시간이 현재 시점보다 이전인 예약 데이터를 가져온다.
  • usedPass가 false인 데이터를 조회하여, 아직 이용권이 차감되지 않은 데이터를 대상으로 한다.

AsyncItemProcessor 설정 (usePassesAsyncItemProcessor)

  • 이 부분은 비동기로 데이터 처리를 수행한다. AsyncItemProcessor는 내부적으로 usePassesItemProcessor에 작업을 위임하고, 비동기로 결과를 처리한다.
  • SimpleAsyncTaskExecutor를 사용해 독립적인 스레드를 할당하여 성능을 높인다.

ItemProcessor 설정 (usePassesItemProcessor)

  • ItemProcessor는 예약된 수업 데이터를 처리하여 이용권의 남은 횟수를 차감한다.
  • 먼저 이용권의 잔여 횟수를 하나 줄이고, 예약 정보에서 usedPass 상태를 true로 설정하여 해당 이용권이 사용되었음을 나타낸다.

AsyncItemWriter 설정 (usePassesAsyncItemWriter)

  • AsyncItemWriter는 비동기로 처리된 결과를 데이터베이스에 저장한다.
  • usePassesItemWriter에 작업을 위임하여 최종적으로 데이터를 저장하며, 비동기 환경에서도 효율적인 처리가 가능하다.

ItemWriter 설정 (usePassesItemWriter)

  • 이 최종 단계에서는 처리된 예약 데이터를 데이터베이스에 저장한다. 먼저 passRepository를 통해 남은 횟수를 차감한 후, bookingRepository를 통해 이용권의 사용 여부를 업데이트한다.
  • 두 단계로 나누어 처리하여 데이터의 일관성을 유지한다.

아이템 프로세서에서 병목 현상이 발생할 경우, 멀티스레드로 작업을 분산 처리하여 성능을 크게 향상시킬 수 있다. 이번 구현에서는 AsyncItemProcessor와 AsyncItemWriter를 활용해 병렬 처리가 가능하도록 설계했다. 이를 통해 대량의 데이터를 효율적으로 처리하고, 전체 배치 작업의 성능을 극대화할 수 있었다.