개발놀이터

멀티스레드 환경에서의 스프링 배치 : Parallel Steps 본문

Spring/Spring Batch

멀티스레드 환경에서의 스프링 배치 : Parallel Steps

마늘냄새폴폴 2022. 10. 23. 23:30

본 포스팅은 인프런의 정수원님의 스프링 배치 강의를 듣고 정리한 포스팅입니다. 더 자세한 내용은 강의를 참고해주세요. 

 

 

Parallel Steps

기본 개념

  • SplitState 를 사용해서 여러 개의 Flow 들을 병렬적으로 실행하는 구조
  • 실행이 다 완료된 후 FlowExecutionStatus 결과들을 취합해서 다음 단계를 결정한다. 

 

구조

Job이 Flow를 통해 SplitState를 만들어서 TaskExecutor를 통해 스레드를 생성하고 각각의 FutureTask를 통해 멀티테스킹을 유도합니다. 

 

앞선 포스팅에서 AsyncItemProcessor, AsyncItemWriter와 Multi-threaded Step 처럼 Step을 멀티스레드로 처리하는 것이 아닌 Flow를 멀티스레드로 처리하고 싶을 때 사용할 수 있습니다. 

 

이 구조를 좀 더 자세히 보면 다음과 같습니다. 

 

이 때 눈여겨 볼 점은 앞선 멀티스레드 환경에서의 배치잡들처럼 각각의 스레드간 데이터 공유가 일어나지 않아 Thread-safe 하다는 점입니다. 

 

또한, 각각의 SimpleFlow들의 결괏값으로 FlowExecutionStatus가 나오고 이를 모아 최종 실행 결과에 따라 다음 Step으로 이동할지 안할지를 결정합니다. 

 

 

API를 확인해보면 start를 통해 Flow를 시작합니다. 그 이후 split메서드를 이용해 TaskExecutor를 주입합니다. 이렇게 스레드를 만들고 flow를 추가합니다. 마지막으로 next 메서드를 통해 마지막 Flow를 실행시킵니다. 

 

순서로 나타내면 다음과 같습니다. 

 

  1. Flow 1을 생성한다. 
  2. Flow 2와 Flow 3 을 생성하고 총 3개의 Flow를 합친다. - TaskExecutor 에서 Flow 개수만큼 스레드를 생성해서 각 Flow를 실행시킨다. 
  3. Flow 4는 split 처리가 완료된 후 실행이 된다. 

 

실행

우선 실행할 Job입니다. FlowJob.java

@Configuration
@RequiredArgsConstructor
public class FlowJob {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job flowJob() {
        return jobBuilderFactory.get("flowJob")
                .start(flow1())
                .next(flow2())
//                .split(taskExecutor()).add(flow2())
                .end()
                .build();
    }

    @Bean
    public Flow flow1() {
        TaskletStep step1 = stepBuilderFactory.get("step1")
                .tasklet(tasklet())
                .build();

        TaskletStep step2 = stepBuilderFactory.get("step2")
                .tasklet(tasklet())
                .build();

        return new FlowBuilder<Flow>("flow1")
                .start(step1)
                .next(step2)
                .build();
    }

    @Bean
    public Flow flow2() {
        TaskletStep step3 = stepBuilderFactory.get("step3")
                .tasklet(tasklet())
                .build();

        return new FlowBuilder<Flow>("flow2")
                .start(step3)
                .build();
    }

    @Bean
    public Tasklet tasklet() {
        return new CustomTasklet();
    }

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(4);
        taskExecutor.setMaxPoolSize(8);
        taskExecutor.setThreadNamePrefix("async-thread");
        return taskExecutor;
    }
}

 

tasklet입니다. CustomTasklet.java

@Slf4j
public class CustomTasklet implements Tasklet {

    private long sum;

    @Override
    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
        for (int i = 0; i < 1000000000; i++) {
            sum++;
        }

        log.info("sum : {}", sum);
        log.info("Tread Name : {}", Thread.currentThread().getName());

        return RepeatStatus.FINISHED;
    }
}

 

우선 Job에서 주석처리 한 부분은 비교를 위해 주석처리를 한 것입니다. 첫 번째는 멀티스레드를 적용하지 않은 단일 스레드 환경에서의 Job이고 두 번째 실행은 멀티스레드를 적용한 Job입니다. 

 

@SpringBootTest
public class FlowJobTest {

    @Autowired
    private JobExplorer jobExplorer;

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private FlowJob config;

    @Test
    @DisplayName("Parallel Steps Test")
    public void test() throws Exception {
        jobLauncher.run(
                config.flowJob(), new JobParametersBuilder(jobExplorer)
                        .addString("requestDate", LocalDateTime.now().toString())
                        .toJobParameters()
        );
    }
}

테스트 설정은 이렇게 했습니다. 

 

1차 테스트

 

1차 테스트는 단일 스레드로 만든 Job입니다. 1.226초가 걸렸네요.

 

스레드 이름도 메인스레드이고 sum 값도 잘 찍히는 모습입니다. tasklet이 세번 실행됐으니 30억이 찍히는게 맞는 결과입니다. 

 

 

2차 테스트

 

솔직히 엄청 놀랐습니다. 왜 시간이 더 걸리는거지..? 내가 테스트를 잘못했나? 몇번을 살펴보고 테스트 해봤는데 결과는 동일했습니다. 

 

심지어 더 큰일인점은 

멀티 스레드로 잘 적용 되었는데 멀티스레드의 문제점 중 하나인 동시성 문제가 발생했습니다. 

 

sum을 10억번 더하는 과정에서 10억번이 다 완료되지 않은 상태에서 두 번째 스레드가 sum에 접근해서 생기는 문제입니다. 

 

일단 결과는 굉장히 실망이었습니다. 멀티스레드는 성능 향상을 위해 사용하는 방법 중 하나인데 오히려 성능이 떨어져버리니... 거기다 단일스레드라면 신경쓰지 않아도 되는 동시성 문제도 발생했습니다. 

 

이렇게 Parallel Steps는 구립니다! 하고 포스팅을 마치면 안되겠죠 동시성 문제정도는 해결하고 마치도록 하겠습니다. 

 

 

동시성 문제

갑자기 동시성 문제 포스팅으로 바뀐것 같지만... 동시성 문제에 대해서는 예~전에 포스팅 한 적이 있습니다. 

 

https://coding-review.tistory.com/92

 

동시성문제와 스레드 로컬

이 포스팅은 인프런 김영한 님의 스프링 핵심 원리 고급 편을 보고 각색한 포스팅입니다. 자세한 내용은 강의를 확인해주세요 이번 시간에는 스프링을 사용할 때 주의할 점과 해결방법인 스레

coding-review.tistory.com

 

위의 포스팅에서는 동시성 문제를 해결하는 방안 중 하나로 Thread-Local을 선택했습니다. 

 

자바에서 동시성 문제를 해결하는 방법은 여러가지지만 가장 대표적인 방법은 synchronized 키워드를 적는 것입니다. 

 

위의 예제에서 synchronized 를 사용하는 방법은 다음과 같습니다. 

 

@Slf4j
public class CustomTasklet implements Tasklet {

    private long sum;
    private Object lock = new Object();

    @Override
    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {

        synchronized (lock) {
            for (long i = 0; i < 1000000000; i++) {
                sum++;
            }
            log.info("sum : {}", sum);
            log.info("Tread Name : {}", Thread.currentThread().getName());
        }

        return RepeatStatus.FINISHED;
    }
}

 

이렇게 사용하면 동시성 문제가 해결됩니다. 실제로 이렇게 코드를 작성하고 테스트를 돌려보면 동시성 문제가 해결되는 것을 볼 수 있습니다. 

 

하지만 synchronized 키워드는 효과는 확실하지만 성능적인 부분에서 아쉬운 부분이 있습니다. 왜냐하면 lock 이라는게 말그대로 해당 메서드 혹은 해당 코드에 lock을 걸어버려서 synchronized 가 붙은 부분이 끝나기 전까지 다른 스레드가 접근할 수 없기 때문입니다. 

 

이 때문에 사용되는 것이 바로 스레드 로컬 (Thread-Local) 입니다. 

 

스레드 로컬의 자세한 동작방식에 대해서 알고 싶으신 분들은 위의 링크를 타고 들어가시면 자세히 적어놨으니 확인해주시면 감사하겠습니다. 

 

스레드 로컬의 사용방법은 어렵지 않습니다. 

 

@Slf4j
public class CustomTasklet implements Tasklet {

    private ThreadLocal<Long> sumStore = new ThreadLocal<>();
    private long sum;

    @Override
    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
        sumStore.set(sum);
        Long threadLocalSum = sumStore.get();

        for (long i = 0; i < 1000000000; i++) {
            threadLocalSum++;
        }

        sumStore.remove();

        log.info("sum : {}", threadLocalSum);
        log.info("Tread Name : {}", Thread.currentThread().getName());

        return RepeatStatus.FINISHED;
    }
}

이렇게 스레드 로컬을 사용하면 동시성 문제를 해결할 수 있습니다. 

 

다만 이렇게 사용하면 한가지 문제가 생기는데..

 

동시성 문제를 너무 잘 해결해버려서 우리가 원했던 10억 20억 30억이 차례대로 찍히는 것은 할 수 없었습니다. 

 

우리가 원하는 결과를 얻으려면 ExecutionContext를 사용해야 할 것 같습니다. 

 

 

정리

이렇게 Parallel Steps에 대해서 알아봤습니다. 솔직히 결과는 만족스럽지 못했습니다. 멀티스레드를 활용해서 성능 향상을 기대했는데 기대하는 수치는 나오지 않았습니다. 

 

왜 이런 결과가 나오게 됐는지 알 수 없지만 사용하는 방법부터 일반적인 방법이 아니기 때문에 대중적으로 사용되지는 않을 것 같습니다. 

 

동시성 문제에 대해서 총 정리하는 느낌으로 포스팅을 계획중입니다. 스프링 배치 포스팅이 전체적으로 마무리가 되면 동시성 문제를 해결하는 다양한 방법으로 찾아뵙도록 하겠습니다. 

 

긴 글 읽어주셔서 감사합니다.