공부 내용을 정리하고 앞으로의 학습에 이해를 돕기 위해 작성합니다.
지난 시간에 이어 이번에는 멀티스레드로 배치 작업을 수행하는 방법을 더 살펴본다. 이번 작업의 목적은 수업이 종료된 이후, 해당 수업에 대해 이용권을 자동으로 차감하는 것이다.
수업 종료 후 이용권 차감 기능
수업이 종료된 데이터를 기반으로, 예약된 수업(booking)과 그에 연결된 이용권(pass) 데이터를 조회하여, 남은 횟수를 차감하고 업데이트하는 작업을 수행한다. 이 과정에서 AsyncItemProcessor와 AsyncItemWriter를 사용해 성능을 최적화한다.
동작 방식
- UsePassesReader: 예약 데이터를 읽어온다. 종료된 수업이며, 이용권 차감이 아직 이루어지지 않은 데이터를 대상으로 한다.
- AsyncItemProcessor: ItemProcessor에 별도의 스레드를 할당해 비동기로 작업을 처리한다. 복잡한 계산이 포함된 경우 이 방식으로 성능을 향상시킬 수 있다.
- AsyncItemWriter: 비동기 처리된 데이터를 실제로 저장하는 단계로, 최종적으로 이용권 차감과 사용 여부 업데이트 작업을 수행한다.
BookingRepository
BookingRepository는 예약된 수업 데이터에 접근하고, 수업 종료 후 이용권의 사용 여부를 업데이트하는 역할을 담당한다.
package com.example.pass.repository.booking;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import javax.transaction.Transactional;
public interface BookingRepository extends JpaRepository<BookingEntity, Integer> {
@Transactional
@Modifying
@Query(value = "UPDATE BookingEntity b" +
" SET b.usedPass = :usedPass," +
" b.modifiedAt = CURRENT_TIMESTAMP" +
" WHERE b.passSeq = :passSeq")
int updateUsedPass(Integer passSeq, boolean usedPass);
}
- updateUsedPass(Integer passSeq, boolean usedPass)
- 이 메서드는 예약된 수업에서 해당 이용권의 사용 상태를 업데이트한다. usedPass가 true로 설정되면 해당 이용권이 사용된 것으로 표시된다.
- 배치 작업에서 예약된 수업의 이용권 사용 여부를 최종적으로 반영할 때 사용된다. 배치 작업이 성공적으로 완료되면, 해당 수업의 예약 상태가 업데이트된다.
PassRepository
PassRepository는 이용권(pass) 데이터를 관리하며, 이용권의 남은 횟수를 업데이트하는 역할을 한다.
package com.example.pass.repository.pass;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import javax.transaction.Transactional;
public interface PassRepository extends JpaRepository<PassEntity, Integer> {
@Transactional
@Modifying
@Query(value = "UPDATE PassEntity p" +
" SET p.remainingCount = :remainingCount," +
" p.modifiedAt = CURRENT_TIMESTAMP" +
" WHERE p.passSeq = :passSeq")
int updateRemainingCount(Integer passSeq, Integer remainingCount);
}
- updateRemainingCount(Integer passSeq, Integer remainingCount)
- 이용권이 정상적으로 차감되었는지 확인하며, 잔여 횟수를 정확하게 관리하는 데 사용된다.
- 이 메서드는 특정 이용권의 남은 횟수를 차감하여 업데이트한다. 차감이 완료되면, 데이터베이스에 즉시 반영된다.
UsePassesJobConfig
수업 종료 후 이용권 차감을 자동으로 처리하는 배치 작업을 구성한다. 이 작업은 예약된 수업 데이터를 읽어와 이용권의 남은 횟수를 차감하고, 해당 데이터를 업데이트하는 과정을 포함한다. 이 배치 작업은 멀티스레드 환경에서 성능을 최적화하기 위해 AsyncItemProcessor와 AsyncItemWriter를 사용한다.
package com.example.pass.job.pass;
import com.example.pass.repository.booking.BookingEntity;
import com.example.pass.repository.booking.BookingRepository;
import com.example.pass.repository.booking.BookingStatus;
import com.example.pass.repository.pass.PassEntity;
import com.example.pass.repository.pass.PassRepository;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.integration.async.AsyncItemProcessor;
import org.springframework.batch.integration.async.AsyncItemWriter;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JpaCursorItemReader;
import org.springframework.batch.item.database.builder.JpaCursorItemReaderBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import javax.persistence.EntityManagerFactory;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.concurrent.Future;
@Configuration
public class UsePassesJobConfig {
private final int CHUNK_SIZE = 10;
// @EnableBatchProcessing로 인해 Bean으로 제공된 JobBuilderFactory, StepBuilderFactory
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final EntityManagerFactory entityManagerFactory;
private final PassRepository passRepository;
private final BookingRepository bookingRepository;
public UsePassesJobConfig(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory, EntityManagerFactory entityManagerFactory, PassRepository passRepository, BookingRepository bookingRepository) {
this.jobBuilderFactory = jobBuilderFactory;
this.stepBuilderFactory = stepBuilderFactory;
this.entityManagerFactory = entityManagerFactory;
this.passRepository = passRepository;
this.bookingRepository = bookingRepository;
}
@Bean
public Job usePassesJob() {
return this.jobBuilderFactory.get("usePassesJob")
.start(usePassesStep())
.build();
}
@Bean
public Step usePassesStep() {
return this.stepBuilderFactory.get("usePassesStep")
.<BookingEntity, Future<BookingEntity>>chunk(CHUNK_SIZE)
.reader(usePassesItemReader())
.processor(usePassesAsyncItemProcessor())
.writer(usePassesAsyncItemWriter())
.build();
}
@Bean
public JpaCursorItemReader<BookingEntity> usePassesItemReader() {
return new JpaCursorItemReaderBuilder<BookingEntity>()
.name("usePassesItemReader")
.entityManagerFactory(entityManagerFactory)
// 상태(status)가 완료이며, 종료 일시(endedAt)이 과거인 예약이 이용권 차감 대상이 됩니다.
.queryString("select b from BookingEntity b join fetch b.passEntity where b.status = :status and b.usedPass = false and b.endedAt < :endedAt")
.parameterValues(Map.of("status", BookingStatus.COMPLETED, "endedAt", LocalDateTime.now()))
.build();
}
// 이 프로젝트에서는 적합하지 않지만, ItemProcessor의 수행이 오래걸려 병목이 생기는 경우에 AsyncItemProcessor, AsyncItemWriter를 사용하면 성능을 향상시킬 수 있습니다.
@Bean
public AsyncItemProcessor<BookingEntity, BookingEntity> usePassesAsyncItemProcessor() {
AsyncItemProcessor<BookingEntity, BookingEntity> asyncItemProcessor = new AsyncItemProcessor<>();
asyncItemProcessor.setDelegate(usePassesItemProcessor()); // usePassesItemProcessor로 위임하고 결과를 Future에 저장합니다.
asyncItemProcessor.setTaskExecutor(new SimpleAsyncTaskExecutor());
return asyncItemProcessor;
}
@Bean
public ItemProcessor<BookingEntity, BookingEntity> usePassesItemProcessor() {
return bookingEntity -> {
// 이용권 잔여 횟수는 차감합니다.
PassEntity passEntity = bookingEntity.getPassEntity();
passEntity.setRemainingCount(passEntity.getRemainingCount() - 1);
bookingEntity.setPassEntity(passEntity);
// 이용권 사용 여부를 업데이트합니다.
bookingEntity.setUsedPass(true);
return bookingEntity;
};
}
@Bean
public AsyncItemWriter<BookingEntity> usePassesAsyncItemWriter() {
AsyncItemWriter<BookingEntity> asyncItemWriter = new AsyncItemWriter<>();
asyncItemWriter.setDelegate(usePassesItemWriter()); // usePassesItemWriter 최종 결과값을 넘겨주고 작업을 위임합니다.
return asyncItemWriter;
}
@Bean
public ItemWriter<BookingEntity> usePassesItemWriter() {
return bookingEntities -> {
for (BookingEntity bookingEntity : bookingEntities) {
// 잔여 횟수를 업데이트 합니다.
int updatedCount = passRepository.updateRemainingCount(bookingEntity.getPassSeq(), bookingEntity.getPassEntity().getRemainingCount());
// 잔여 횟수가 업데이트 완료되면, 이용권 사용 여부를 업데이트합니다.
if (updatedCount > 0) {
bookingRepository.updateUsedPass(bookingEntity.getPassSeq(), bookingEntity.isUsedPass());
}
}
};
}
}
Job 설정 (usePassesJob)
- 이 부분은 배치 작업의 전체 흐름을 정의한다. usePassesJob은 하나의 스텝(usePassesStep)으로 구성되어 있다. 이 작업은 종료된 수업을 읽고, 이용권을 차감한 뒤 업데이트하는 단일 프로세스를 수행한다.
Step 설정 (usePassesStep)
이 스텝은 배치 프로세스의 핵심이다. chunk(CHUNK_SIZE)는 데이터 처리 단위를 설정하며, 이 예제에서는 10개씩 데이터를 처리한다. 각 chunk는 다음 세 가지 작업을 순차적으로 수행한다:
- Reader: 예약 데이터를 읽어온다 (usePassesItemReader).
- Processor: 비동기로 남은 이용권 횟수를 차감하고 사용 여부를 업데이트한다 (usePassesAsyncItemProcessor).
- Writer: 비동기 처리된 결과를 데이터베이스에 저장한다 (usePassesAsyncItemWriter).
ItemReader 설정 (usePassesItemReader)
- JpaCursorItemReader를 사용하여 예약 데이터를 읽어온다. 이 리더는 JPA 쿼리를 통해 상태가 "완료"이고 종료 시간이 현재 시점보다 이전인 예약 데이터를 가져온다.
- usedPass가 false인 데이터를 조회하여, 아직 이용권이 차감되지 않은 데이터를 대상으로 한다.
AsyncItemProcessor 설정 (usePassesAsyncItemProcessor)
- 이 부분은 비동기로 데이터 처리를 수행한다. AsyncItemProcessor는 내부적으로 usePassesItemProcessor에 작업을 위임하고, 비동기로 결과를 처리한다.
- SimpleAsyncTaskExecutor를 사용해 독립적인 스레드를 할당하여 성능을 높인다.
ItemProcessor 설정 (usePassesItemProcessor)
- ItemProcessor는 예약된 수업 데이터를 처리하여 이용권의 남은 횟수를 차감한다.
- 먼저 이용권의 잔여 횟수를 하나 줄이고, 예약 정보에서 usedPass 상태를 true로 설정하여 해당 이용권이 사용되었음을 나타낸다.
AsyncItemWriter 설정 (usePassesAsyncItemWriter)
- AsyncItemWriter는 비동기로 처리된 결과를 데이터베이스에 저장한다.
- usePassesItemWriter에 작업을 위임하여 최종적으로 데이터를 저장하며, 비동기 환경에서도 효율적인 처리가 가능하다.
ItemWriter 설정 (usePassesItemWriter)
- 이 최종 단계에서는 처리된 예약 데이터를 데이터베이스에 저장한다. 먼저 passRepository를 통해 남은 횟수를 차감한 후, bookingRepository를 통해 이용권의 사용 여부를 업데이트한다.
- 두 단계로 나누어 처리하여 데이터의 일관성을 유지한다.
아이템 프로세서에서 병목 현상이 발생할 경우, 멀티스레드로 작업을 분산 처리하여 성능을 크게 향상시킬 수 있다. 이번 구현에서는 AsyncItemProcessor와 AsyncItemWriter를 활용해 병렬 처리가 가능하도록 설계했다. 이를 통해 대량의 데이터를 효율적으로 처리하고, 전체 배치 작업의 성능을 극대화할 수 있었다.
'BackEnd > Project' 카테고리의 다른 글
[PT Manager] Ch03. Web 프로젝트 생성 및 git 설정 (0) | 2024.08.28 |
---|---|
[PT Manager] Ch03. Batch 통계 데이터 생성 (0) | 2024.08.28 |
[PT Manager] Ch03. Batch 예약된 수업 전 알람(외부 채널 알람 연동) (0) | 2024.08.28 |
[PT Manager] Ch03. Batch 이용권 일괄 지급 (0) | 2024.08.27 |
[PT Manager] Ch03. Batch 이용 기간에 따른 만료 (0) | 2024.08.27 |