개발놀이터

스프링 배치 도메인 이해 : ExecutionContext 본문

Spring/Spring Batch

스프링 배치 도메인 이해 : ExecutionContext

마늘냄새폴폴 2022. 9. 23. 21:16

본 포스팅은 인프런의 정수원님의 스프링 배치 강의를 듣고 정리한 포스팅입니다. 더 자세한 내용은 강의를 참고해주세요. 

 

 

ExecutionContext

기본 개념

  • 프레임워크에서 유지 및 관리하는 키/값으로 된 컬렉션으로 StepExecution 또는 JobExecution 객체의 상태를 저장하는 공유 객체
  • DB에 직렬화 한 값으로 저장됨 - { "key" : "value" }
  • 공유 범위
    • Step 범위 - 각 Step의 StepExecution에 저장되며 Step간 서로 공유 안됨
    • Job 범위 - 각 Job의 JobExecution에 저장되며 Job간 서로 공유 안되며 해당 Job의 Step간 서로 공유됨
  • Job 재 시작시 이미 처리한 Row 데이터는 건너뛰고 이후로 수행하도록 할 때 상태 정보를 활용한다. 

 

JobInstance나 JobParameter, JobExecution, StepExecution은 어느정도는 직관적이라고 생각했는데... ExecutionContext는 뭐냐?

 

ExecutionContext는 스프링 배치에서의 세션입니다.

 

우리는 세션을 어떠한 경우에 사용하는지 복기해보면 됩니다. 

 

세션은 페이지에서 페이지간 이동이 긴 경우 꽤 오랜 시간동안 데이터를 잡아두고 있어야 하는 경우에 사용합니다. 

 

ExecutionContext는 이렇게 배치 내에서 데이터를 유지하고 싶은 경우에 사용합니다. ExecutionContext를 사용할 수 있는 방법은 두가지입니다. tasklet에서 사용하는 방법과 chunk 지향처리에서 사용하는 방법이죠.

 

tasklet에서 사용하는 방법에 대해서 우선 알아보겠습니다.

 

tasklet 에서의 ExecutionContext

@Component
public class ExecutionContextTasklet implements Tasklet {
	
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
    	
        ExecutionContext jobExecutionContext = chunkContext.getStepContext().getstepExecution().getJobExecution().getExecutionContext();
        ExecutionContext stepExecutionContext = chunkContext.getStepContext().getStepExecution().getExecutionContext();
        
        jobExecutionContext.put("name", "name123");
        stepExecutionContext.put("age", 25);
        
        return RepeatStatus.FINISHED;
    }
}

 

이렇게 선언한 뒤에는 다른 tasklet에서 

 

ExecutionContext jobExecutionContext = chunkContext.getStepContext().getStepExecution().getJobExecution().getExecutionContext();
ExecutionContext stepExecutionContext = chunkContext.getStepContext().getStepExecution().getExecutionContext();

jobExecutionContext.get("name");
stepExecutionContext.get("age");

 

이런식으로 뽑아 쓸 수 있습니다. 

 

그런데 왜 ExecutionContext를 두개를 만들었을까요? 둘이 꺼내는 방식도 다르네요.

 

그 이유는 JobExecution에서 꺼낸 ExecutionContext와 StepExecution에서 꺼낸 ExecutionContext는 같지만 다른 방식으로 작동합니다. 그 이유는 JobExecutionContext의 경우 Commit 시점에 저장되는 반면 StepExecutionContext는 실행 사이에 저장이 되기 때문입니다. 

 

이 내용은 위에서도 언급되어있습니다. 적용 범위가 서로 다르다는 것을요. StepExecutionContext는 Step간 공유가 안됩니다. 다만 JobExecutionContext는 Step간 공유가 됩니다. 따라서 세션처럼 사용하고 싶다면 JobExecutionContext를 사용하는 것이 옳겠습니다. 

 

 

chunk 지향 처리에서의 ExecutionContext

chunk 지향 처리에서의 ExecutionContext를 사용하는 방법은 조금 귀찮긴 합니다. 하지만 Step간 데이터를 보존해야 하는 경우에는 한번쯤 고려해볼만 합니다. 

 

우선 ExecutionContext를 사용할 수 있는 super 클래스를 만들어야 합니다. 

 

SuperStepExecution.java

package io.sprintbatch.springbatchlecture;

import lombok.Getter;
import lombok.Setter;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.item.ExecutionContext;

@Getter
@Setter
public class SuperStepExecution<T> {

    private StepExecution stepExecution;

    protected void putData(String key, T data) {
        if (this.stepExecution == null) {
            throw new NullPointerException("StepExecution is null");
        }

        ExecutionContext jobContext = stepExecution.getJobExecution().getExecutionContext();
        jobContext.put(key, data);
    }

    protected Object getData(String key) {
        if (this.stepExecution == null) {
            throw new NullPointerException("StepExecution is null");
        }

        ExecutionContext jobContext = stepExecution.getJobExecution().getExecutionContext();

        return jobContext.get(key);
    }
}

 

그리고 chunk 지향 처리에 따른 ItemReader, ItemProcessor, ItemWriter에 대한 구현 클래스를 만들어야 합니다. 제가 이번에 사용한 예제에서는 Step1에서 ItemProcessor에서 ExecutionContext에 값을 넣어주고 Step2의 ItemReader에서 값을 꺼내는 형태로 작업했습니다. 

 

먼저 Step1의 ItemProcessor를 구현한 MemberProcessor클래스입니다. 

 

package io.sprintbatch.springbatchlecture.processor;

import io.sprintbatch.springbatchlecture.SuperStepExecution;
import io.sprintbatch.springbatchlecture.entity.Member;
import io.sprintbatch.springbatchlecture.service.MemberService;
import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;

@Component
@StepScope
@RequiredArgsConstructor
public class MemberProcessor extends SuperStepExecution<String> implements ItemProcessor<Member, Member> {

    private final MemberService memberService;

    @Override
    public Member process(Member member) throws Exception {
        return memberService.setName(member);
    }

    @BeforeStep
    public void saveStepExecution(StepExecution stepExecution) {
        super.setStepExecution(stepExecution);
        super.putData("name", "user1");
    }
}

 

그 다음은 Step2의 ItemReader 구현체인 MemberReader클래스입니다.

 

package io.sprintbatch.springbatchlecture.reader;

import io.sprintbatch.springbatchlecture.SuperStepExecution;
import io.sprintbatch.springbatchlecture.entity.Member;
import io.sprintbatch.springbatchlecture.repository.MemberRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Component
@StepScope
@Slf4j
@RequiredArgsConstructor
public class MemberReader extends SuperStepExecution<Member> implements ItemReader<Member> {

    private boolean isRead;
    private final MemberRepository memberRepository;

    @PostConstruct
    public void init() {
        isRead = false;
    }

    @Override
    public Member read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        Object name = super.getData("name");
        log.info("name : {}", name);
        return memberRepository.findById(0L).orElse(null);
    }

    @BeforeStep
    public void retrieveInterStepData(StepExecution stepExecution) {
        super.setStepExecution(stepExecution);
    }
}

 

chunk 지향 처리에 대한 구현체에서 주의하셔야 할 점은 두가지입니다. 바로 @Component와 @StepScope입니다. 이 두개를 반드시 붙여주셔야 ExecutionContext를 제대로 사용할 수 있습니다. 마치 JobParameter를 이용할 때 @Bean 과 @StepScope, @JobScope 를 붙이는 것과 비슷합니다. 

 

하지만 @StepScope는 ItemReader, ItemProcessor, ItemWriter에 붙이는 어노테이션이고 @JobScope는 Step에 붙이는 어노테이션 이라는 차이점이 있습니다. 

 

 

이제 구조적으로 분석해보도록 하겠습니다. 우선 각각의 구현 클래스에서는 @BeforeStep 어노테이션을 통해 Step이 시작되기 전 StepExecution을 주입받습니다. 

 

그리고 주입받은 StepExecution을 바탕으로 각각의 값을 저장하거나 가져올 수 있습니다. 

 

이렇게 저장한 뒤 Job에서 의존성 주입으로 해당 구현체들을 주입한 후 Job을 실행해보면 다음과 같이 로그가 찍히는 것을 확인할 수 있습니다. 

 

 

이로써 각기 다른 Step에서 데이터를 보존할 수 있었습니다. 

 

 

JobExecutionListener, StepExecutionListener를 이용한 (SpEL을 이용한) ExecutionContext

우선 LIstener를 설정해줍니다. Listener에 대해서는 추후에 포스팅 하겠지만 지금은 그냥 Job이나 Step이 시작하기 전과 끝난 후에 작업할 수 있는 용도로 쓰인다고만 알아주셔도 충분합니다. 

 

Listener를 설정하는 방법은 다음과 같습니다. 

 

먼저 Listener 클래스를 만듭니다. 

package io.sprintbatch.springbatchlecture.listener;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;

public class CustomStepListener implements StepExecutionListener {

    @Override
    public void beforeStep(StepExecution stepExecution) {
        stepExecution.getJobExecution().getExecutionContext().put("name2", "user2");
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        return null;
    }
}

JobExecutionListener를 만들던 StepExecutionListener를 만들던 사용방법은 똑같습니다. 편하신 것으로 만들어주시면 됩니다. 

 

위와 같이 클래스를 만들어주고 Step이 있는 곳으로 갑니다. 그리고 다음과 같이 설정해줍니다. 

 

이 다음 @StepScope, @JobScope와 @Bean을 붙여줍니다. @StepScope는 ItemReader, ItemProcessor, ItemWriter, tasklet에 붙이는 어노테이션이고 @JobScope는 Step에 붙이는 어노테이션입니다. 

 

이렇게 설정하면 끝입니다. 이 후에 Job을 실행해보면?

 

위와 같이 로그가 찍히는 것을 볼 수 있습니다. 

 

 

이렇게 ExecutionContext에 대해서 알아봤습니다. 흠... 이건 써먹을곳이 있을것 같으면서도 안쓸것 같으면서도 아리송하네요.

 

긴 글 읽어주셔서 감사합니다. 다음에는 JobLauncher, JobRepository에 대해서 포스팅 해보겠습니다. 감사합니다. 

 

Reference

https://wckhg89.github.io/archivers/springbatch1

 

스프링 배치(스프링 Boot 기반)삽질기 1탄 - Step간 데이터 공유 « 구피개발일기

회사에서 스프링 배치를 이용한 배치성 프로그램을 제작을 하게 되었습니다. 스프링배치를 많이 다뤄본 경험이 부족하여 이번 프로젝트를 진행하며 많은 삽질을 경험했는데, 삽질 내용을 남기

wckhg89.github.io