개발놀이터

멀티 스레드 환경에서의 스프링 배치 : AsyncItemProcessor/AsyncItemWriter 본문

Spring/Spring Batch

멀티 스레드 환경에서의 스프링 배치 : AsyncItemProcessor/AsyncItemWriter

마늘냄새폴폴 2022. 10. 12. 20:40

본 포스팅은 인프런의 정수원님의 스프링 배치 강의를 듣고 정리한 포스팅입니다. 더 자세한 내용은 강의를 참고해주세요. 

 

 

AsyncItemProcessor/AsyncItemWriter

기본 개념

  • Step 안에서 ItemProcessor 가 비동기적으로 동작하는 구조
  • AsyncItemProcessor와 AsyncItemWriter가 함께 구성이 되어야 함
  • AsyncItemProcessor 로부터 AsyncItemWriter 가 받는 최종 결괏값은 List<Future<T>> 타입이며 비동기 실행이 완료될 때까지 대기한다. 
  • spring-batch-integration 의존성이 필요하다. 

 

AsyncItemProcessor는 작업 처리를 ItemProcessor에게 맡기고 ItemProcessor는 Async한 작업과정을 거치는 것입니다. 

 

ItemReader는 데이터를 AsyncItemProcessor에 전달하고 AsyncItemProcessor는 ItemProcessor에 작업을 위임합니다. 위임할 때 별도의 스레드로 위임하고 결괏값을 List<Future> 타입으로 받아옵니다.

 

List<Future>에는 Item이 들어있고 이 Item을 기반으로 AsyncItemWriter가 ItemWriter에게 작업을 위임해서 작업을 마무리 짓습니다. 

 

 

구조

AsyncItemProcessor에는 실제 prcess 를 수행하는 delegate라는 객체가 있고 Thread를 할당해주는 TaskExecutor도 있습니다. 

 

내부적으로는 Task 로서 Callable을 실행시키고 결과를 FutureTask로 받았습니다. 그리고 이 task 라는 하나의 작업을 TaskExecutor를 이용해 execute 하는 모습입니다. 

 

이 구조를 하나의 Job의 관점에서 보기 편하게 그림으로 살펴보도록 하겠습니다.

 

 

Job은 TaskletStep을 실행하고 TaskletStep의 구현체인 ChunkOrientedTasklet이 실행됩니다. 이 때 순차적으로 ItemReader, ItemProcessor, ItemWriter가 실행됩니다.

 

ItemReader에서 값을 가져와서 Chunk<I>에 담아 ItemProcessor 에 넘기는데 ItemProcessor가 아닌 AsyncItemPrcessor가 실행되고 내부적으로 TaskExecutor는 스레드를 생성해 ItemPrcessor를 실행하고 반환값으로 FutureTask를 받습니다. 

 

이후 ItemWriter 대신 AsyncItemWriter가 실행되고 AsynsItemProcessor에서 받아온 Future<T>를 List로 받아 내부적으로 아이템들을 List에 넣어 DB에 작업을 끝마칩니다. 

 

이 때 ItemWriter가 write하는 과정에서 비동기 실행 결과 값들을 모두 받아오기 전까지 대기(wait())하고 있다가 ItemProcessor의 모든 과정이 끝나면 그 때 작업을 끝냅니다. 

 

 

사용 방법

사용 방법은 아주 간단합니다. 

 

@Configuration
@RequiredArgsConstructor
public class JpaWriterJob {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final EntityManagerFactory emf;

    private static final int chunk = 1000;

    @Bean
    public Job jpaJob() {
        return jobBuilderFactory.get("jpaJob")
                .start(jpaStep())
                .build();
    }

    @Bean
    public Step jpaStep() {
        return stepBuilderFactory.get("jpaStep")
                .<Member, Member>chunk(chunk)
                .reader(jpaReader())
                .processor(asyncItemProcessor())
                .writer(asyncItemWriter())
                .build();
    }

    @Bean
    public JpaPagingItemReader<Member> jpaReader() {
        return new JpaPagingItemReaderBuilder<Member>()
                .entityManagerFactory(emf)
                .pageSize(chunk)
                .name("jpaReader")
                .queryString("select m from Member m")
                .build();
    }

    @Bean
    public AsyncItemProcessor asyncItemProcessor() {
        AsyncItemProcessor<Member, Member> asyncItemProcessor = new AsyncItemProcessor<>();

        asyncItemProcessor.setDelegate(jpaProcessor());
        asyncItemProcessor.setTaskExecutor(new SimpleAsyncTaskExecutor());

        return asyncItemProcessor;
    }

    @Bean
    public AsyncItemWriter asyncItemWriter() {
        AsyncItemWriter<Member> asyncItemWriter = new AsyncItemWriter<>();

        asyncItemWriter.setDelegate(jpaWriter());

        return asyncItemWriter;
    }

    @Bean
    public ItemProcessor<Member, Member> jpaProcessor() {
        return member -> {
            Thread.sleep(30);
            return member.setName(member.getName() + "test");
        };
    }

    @Bean
    public JpaItemWriter<Member> jpaWriter() {
        return new JpaItemWriterBuilder<Member>()
                .entityManagerFactory(emf)
                .build();
    }
}

 

기존에 만들던대로 ItemProcessor와 ItemWriter를 만들어주고 AsyncItemProcessor, AsyncItemWriter를 빈으로 등록하는 작업을 끝내면 완성입니다. 

 

눈여겨 볼 점이라면 setDelegate를 통해 ItemProcessor, ItemWriter를 주입받는다는 점과 AsyncItemProcessor에서 TaskExecutor를 SimpleAsyncTaskExecutor로 주입받는다는 점입니다. 

 

그리고 확연한 성능 차이를 보기 위해 jpaProcessor() 메서드의 process() 작업에 0.03초의 텀을 주었습니다. 

 

실제 같았으면 동기적 실행이기 때문에 0.03초의 딜레이가 큰 영향을 줍니다. 실제로 테스트 해보고 싶으시다면 새로운 Step을 만들고 그 Step의 processor, writer에 AsyncItemProcessor가 아닌 일반 ItemProcessor를 넣어보면 됩니다. 

 

 

정리

이번 포스팅에선 멀티 스레드 환경에서의 스프링 배치 그 첫 번째 시간 AsyncItemProcessor, AsyncItemWriter에 대해서 알아봤습니다. 

 

이번 AsyncItemProcessor, AsyncItemWriter는 여러번 테스트 해본 결과 Reader, Writer의 성능을 커버해주진 못했습니다. 

 

즉, 직접적인 성능 차이가 있는 JpaItemWriter와 JdbcBatchItemWriter 두개 중 JpaItemWriter는 멀티 스레드로 돌렸음에도 여전히 성능상  많이 뒤쳐지는 것을 확인했습니다. 

 

10만건의 데이터를 기준으로 JpaItemWriter는 평균 34~35초 정도 걸렸는데 멀티 스레드를 사용하더라도 대충 평균 정도로 나왔습니다. 멀티 스레드로 붙여도 능사가 아니라는 말이겠죠. 

 

아마 제 생각엔 멀티 스레드가 강력한 효과가 발휘하는 때는 바로 process 단계에서 오래 걸리는 작업인 경우인 것 같습니다. 

 

예를 들어 SMTP를 이용해 이메일을 보낸다고 생각해보면 이메일 하나 보내는데 1~2초정도 걸리기 때문에 데이터가 많아지면 엄청난 차이가 보일 것으로 생각됩니다. 이 때 멀티 스레드로 붙여주면 꽤나 큰 성능 향상이 있을 것으로 예측됩니다.