개발놀이터

스프링 배치를 이용해서 휴면 계정을 관리해보자 본문

Spring/Spring Batch

스프링 배치를 이용해서 휴면 계정을 관리해보자

마늘냄새폴폴 2022. 8. 25. 04:44

스프링 배치에 대한 전반적인 내용은 아래의 포스팅을 참고해주세요. 때문에 스프링 배치에서 사용되는 단어에 대한 설명은 생략합니다.

 

https://coding-review.tistory.com/172

 

스프링 부트 배치 개념

대략 10만명의 회원을 거느리는 웹 서비스를 운영한다고 가정했을 때 우린 매일마다 회원들의 상태변화를 감지하고 운용할 수 있어야 합니다. 가령 오늘까지 우리 서비스에 접속하지 않은지 1년

coding-review.tistory.com

 

스프링 배치를 이용해서 휴면 계정을 관리해 보겠습니다.

 

사용한 기술 스택

-Spring Boot 2.7.2

-Spring Data JPA

 

우선 Entity를 만들어 보겠습니다. 

 

코드에 대해서 하나씩 설명하면서 진행하도록 하겠습니다.

 

Member.java

package com.capston.chatting.entity;

import com.capston.chatting.enums.MemberRole;
import com.capston.chatting.enums.MemberStatus;
import com.capston.chatting.enums.OTPApplyStatus;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

import javax.persistence.*;
import java.time.LocalDateTime;

@Entity
@Getter
@NoArgsConstructor
public class Member extends BaseTime{

    @Id @GeneratedValue(strategy = GenerationType.IDENTITY)
    @Column(name = "member_id")
    private Long id;

    private String loginId;
    private String loginPw;
    private String name;

    private int score;

    @Enumerated(EnumType.STRING)
    private MemberRole role;

    @Enumerated(EnumType.STRING)
    private MemberStatus status;

    @Setter
    private LocalDateTime updateDate;

    public Member(String loginId, String loginPw, String name, int score, MemberRole role, MemberStatus status, LocalDateTime updateDate) {
        this.loginId = loginId;
        this.loginPw = loginPw;
        this.name = name;
        this.score = score;
        this.role = role;
        this.status = status;
        this.updateDate = updateDate;
    }

    public Member setInactive() {
        this.status = MemberStatus.INACTIVE;
        return this;
    }
}

 

BaseTime을 상속하고 있는데 이 부분에 대해서는 

https://coding-review.tistory.com/173

 

@CreatedDate, @LastModifiedDate

데이터를 저장할 때 '생성된 시간 정보'와 '수정된 시간 정보'는 여러모로 많이 사용되고 또 중요합니다. JPA를 사용하면서 @CreatedDate, @LastModifiedDate를 사용하여 생성된 시간 정보, 수정된 시간 정

coding-review.tistory.com

해당 포스팅에 설명해놓았으니 위의 포스팅을 참고해주세요

 

Entity에 대한 설명은 굳이 안해도 될 것 같습니다. 맨 아래 setInactive 메서드는 1년이 지나서도 활동하지 않는 회원에 대해 MemberStatus를 ACTIVE에서 INACTIVE로 변경하기 위해서 만든 메서드입니다. 

 

MemberStatus는 enum타입으로 ACTIVE, INACTIVE 두개의 값을 가지고 있습니다. 

 

 

이제 가장 중요한 배치에 관련된 코드를 보겠습니다. 

 

Job

    @Bean
    public Job inactiveMemberJob() {
        log.info("InactiveMemberJob execution");
        return jobBuilderFactory.get("inactiveMemberJob")
                .start(inactiveJobStep())
                .preventRestart()
                .build();
    }

우선 가장 먼저 보이는건 로그네요 Job이 실행됐을 때 실행됐다는 것을 보여주기 위한 로그입니다.

 

JobBuilderFactory.get()

다음은 JobBuilderFactory에서 .get은 Job의 이름을 지정해주고 앞으로 Job을 실행할 때 이 이름으로 쓰겠다~를 지정해주는 문법입니다. 

 

start()

해당 Step을 사용하겠다는 의미입니다. Job은 여러개의 Step을 가질 수 있기 때문에 .next를 이용해서 다음 Step을 설정해줄 수 있습니다. 

 

preventRestart()

이 설정을 넣은 Job은 재시작을 막습니다. 재시작을 막는다는 의미는 항상 새로운 Job을 실행한다는 의미입니다. 

 

build()

Job을 실행하라는 의미입니다. 별다른 의미는 없습니다. 

 

 

Step

다음은 Step입니다. 

    @Bean
    public Step inactiveJobStep() {
        log.info("InactiveMemberStep execution");
        return stepBuilderFactory.get("inactiveMemberStep")
                .<Member, Member>chunk(10)
                .reader(inactiveMemberReader())
                .processor(inactiveMemberProcessor())
                .writer(inactiveMemberWriter())
                .build();
    }

로그는 무시하고 바로 넘어가겠습니다. 

 

stepBuilderFactory.get()

StepBuilderFactory에서 StepBuilder를 만들고 Step을 생성하는데 이름을 inactiveMemberStep으로 하겠다. 앞으로 이 Step은 저 이름으로 사용하겠다. 라는 의미입니다. 

 

<Member, Member> chunk(10)

앞선 포스팅을 보셨다면 chunk에 대해서 바로 이해하실겁니다. 하지만 앞선 포스팅을 보지 않으신 분들을 위해 간단하게 설명해드리겠습니다. 

 

chunk는 간단하게 설명해서 트랜잭션 커밋을 이루는 단위입니다. 

 

너무 대충 설명했나요? 

 

트랜잭션은 트랜잭션의 특징(ACID) 중에 A에 해당하는 Atom 즉 원자성을 보장해야 합니다. 예를 들어서 10건의 데이터가 있으면 5건까지는 잘 되다가 6건째부터 갑자기 예외가 터져서 프로그램이 다운됐다. 그럼 5건의 데이터만 완료되고 나머지 5건의 데이터는 완료되지 않은 상황이 벌어집니다. 이건 꽤 큰일이죠

 

엥 그게 왜? 라고 하신다면 아직 트랜잭션에 대해 감이 없으신것입니다. 

 

예를 들어서 10명의 사원에게 월급을 보내는 로직이라고 가정해봅시다. 자 사원 1부터 순서대로 10까지 300만원의 월급이 들어갑니다. 근데 5명의 월급은 정상적으로 들어갔는데 6번째에서 예외가 터져서 나머지 5명은 월급이 안들어갔습니다. 이때 우리는 어떻게 해야할까요?

 

1. 월급을 처음부터 다시 준다. 

이 방법은 잘못됐습니다. 그렇게 되면 처음 받았던 5명은 월급을 또 받게 되니까요.

 

2. 못준 사람에게 월급을 준다. 

이 방법도 좋은 방법은 아닐겁니다. 프로그램이 용빼는 재주가 있는 것도 아니고 받았는지 안받았는지 어떤 수로 알까요? 

 

3. 월급 줬던 사람것을 다시 돌려받고 처음부터 다시 준다. 

이 방법이 가장 현실적인 방법일 것입니다. 

 

따라서 트랜잭션은 위의 세가지 방법 중에서 3번을 채택하고 있습니다.

 

자 이제 우리는 트랜잭션의 원자성에 대해서 간단하게 알아봤습니다. 그럼 chunk가 트랜잭션의 원자성을 보장하기 위한 단위라는 것도 아시겠죠. 

 

하지만 chunk는 단순히 원자성을 보장해주는 정도의 수준이 아닙니다. chunk의 매력에 대해서 좀 더 설명해보겠습니다. 그리고 이 매력을 알게 되면 왜 스프링 배치가 대용량 데이터를 관리하는데에 특화됐다고 하는지 아실겁니다. 

 

자 위의 예제는 큰 문제가 있습니다. 만약 10건이 아니라 100만건이라면 어떨까요? 100만건이라면 두가지 문제가 있습니다. 

 

1. 100만건을 모두 메모리에 올린다

JVM이 버텨줄지 모르겠네요...

 

2. 중간에 끊기는 경우

100만건을 다 메모리에 올렸다고 가정해봅시다. 50만건까지는 잘 이행됐습니다. 하지만 50만1번째에서 예외가 터졌습니다. 그럼 50만건의 성공을 전부 롤백하고 다시 처음부터 시작해야합니다. 이는 다시 1번 문제로 돌아갑니다. 

 

때문에 우리는 chunk를 이용해서 100만건이라면 chunk 사이즈를 1만건으로 잡아두고 50만건이 쭉쭉 성공하고 51만번째에서 예외가 터졌다면 chunk 사이즈인 1만건에 대해서만 롤백을 하면 됩니다. 이는 더 효율적이죠

 

때문에 위의 코드는 Processor에서 사용할 데이터 건수가 10건이라는 것을 의미합니다. 앞에 있는 <Member, Member>인 이유도 아시겠죠? Processor에서 사용할 Input과 Output이기 때문입니다. 

 

reader(), processor(), writer()

이건 간단해보입니다. reader로 inactiveMemberReader()를, processor로 inactiveMemberProcessor를, writer로 inactiveMemberWriter를 사용하겠다는 의미입니다. 

 

 

Reader

    @Bean
    public JpaCursorItemReader<Member> jpaCursorItemReader() {
        Map<String, Object> map = new HashMap<>();
        map.put("updateDate", LocalDateTime.now().minusYears(1));
        map.put("status", MemberStatus.ACTIVE);

        return new JpaCursorItemReaderBuilder<Member>()
                .name("jpaCursorItemReader")
                .entityManagerFactory(entityManagerFactory)
                .queryString("select m from Member m where m.updateDate <= :updateDate and m.status = :status")
                .parameterValues(map)
                .build();
    }

저는 JPA를 사용하고 있기 때문에 JpaCursorItemReader를 사용했습니다. ItemReader는 다양한 구현체를 갖고있으니 필요한 Reader를 사용하면 좋을 것 같습니다.

 

name()

해당 reader의 이름은 이것이다! 하고 선언해주는 메서드입니다. 

 

entityManagerFactory()

JPQL을 실행하기 위한 EntityManagerFactory를 주입받아서 사용하면 됩니다. 

 

queryString()

우리가 실행할 쿼리입니다. 해당 쿼리는 JPQL로 넘어가서 실행됩니다.

 

parameterValues()

JPQL의 각종 조건을 담아두는 메서드입니다. Map에 담아서 넘겨주면 자동으로 값을 꺼내와서 매칭시킵니다.

 

 

Processor

    @Bean
    public ItemProcessor<Member, Member> inactiveMemberProcessor() {
        return new ItemProcessor<Member, Member>() {
            @Override
            public Member process(Member member) throws Exception {
                log.info("InactiveMemberProcessor execution");
                return member.setInactive();
            }
        };
    }

Input과 Output을 정의하는 제네릭에 Member를 넣습니다. 우리는 Member에서 Member로 이동할거니까요.

 

이제 Entity에서 setInactive의 리턴타입을 Member로 했는지 이해되실겁니다. Processor에서 거쳐간 Member가 Writer에서도 사용되어야 하기 때문이죠

 

현재 저는 ItemProcessor를 Override해서 리턴하고 있습니다. 이 방법도 물론 좋은 방법이지만 ItemProcessor 인터페이스를 직접 까보면 메서드가 하나밖에 없는것을 알 수 있습니다. 

 

메서드가 하나밖에 없다는 뜻은? 해당 메서드를 오버라이딩할 때 람다를 사용할 수 있다는 얘기입니다. 따라서 위의 코드는 이렇게 변경될 수 있습니다.

 

    @Bean
    public ItemProcessor<Member, Member> inactiveMemberProcessor() {
        return member -> {
            log.info("InactiveMemberProcessor execution");
            return member.setInactive();
        };
    }

이렇게 하는게 좀 더 깔끔해보이겠죠? 하지만 람다에 익숙하지 않으신 분은 굳이 저렇게까지 쓰지 않아도 됩니다. 솔직히 저도 얼마전까지만 해도 람다의 사용에 대해 거부감을 느끼고 있었거든요. 람다가 가독성을 올려준다고 하는데 람다의 어디가 가독성을 올려준다는 것인지 이해하지 못했거든요. 지금은 아주 잘 사용하고 있지만 저도 처음엔 람다를 사용하지 않았습니다. 

 

아무튼 람다가 편하신 분들은 위의 방법을 추천드립니다. 

 

 

Writer

@Bean
public ItemWriter<Member> inactiveMemberWriter() {
    log.info("InactiveMemberWriter execution");
    return ((List<? extends Member> members) -> {
        memberRepository.saveAll(members);
    });
}

이제 Processor에서 가공한 데이터를 가지고 직접 데이터베이스에 넣어주는 작업인 ItemWriter입니다. Spring Data JPA에서 제공해주는 기본적인 메서드인 saveAll을 사용해서 데이터를 넣어주는 모습입니다. 

 

 

전체적인 코드는 다음과 같습니다. 

 

package com.capston.chatting.config.batch;

import com.capston.chatting.entity.Member;
import com.capston.chatting.enums.MemberStatus;
import com.capston.chatting.repository.MemberRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.util.List;

@Slf4j
@RequiredArgsConstructor
@Configuration
public class InactiveMemberJob {

    private final MemberRepository memberRepository;
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job inactiveMemberJob() {
        log.info("InactiveMemberJob execution");
        return jobBuilderFactory.get("inactiveMemberJob")
                .start(inactiveJobStep())
                .preventRestart()
                .build();
    }

    @Bean
    public Step inactiveJobStep() {
        log.info("InactiveMemberStep execution");
        return stepBuilderFactory.get("inactiveMemberStep")
                .<Member, Member>chunk(10)
                .reader(inactiveMemberReader())
                .processor(inactiveMemberProcessor())
                .writer(inactiveMemberWriter())
                .build();
    }

    @Bean
    public JpaCursorItemReader<Member> jpaCursorItemReader() {
        return new JpaCursorItemReaderBuilder<Member>()
                .name("jpaCursorItemReader")
                .entityManagerFactory(entityManagerFactory)
                .queryString("select m from Member m")
                .build();
    }

    @Bean
    public ItemProcessor<Member, Member> inactiveMemberProcessor() {
        ItemProcessor<Member, Member> memberItemProcessor = (member) -> {
            log.info("InactiveMemberProcessor execution");
            return member.setInactive();
        };
        return memberItemProcessor;
//        return member -> {
//            log.info("InactiveMemberProcessor 작동");
//            return member.setInactive();
//        };
    }

    @Bean
    public ItemWriter<Member> inactiveMemberWriter() {
        log.info("InactiveMemberWriter execution");
        return ((List<? extends Member> members) -> {
            memberRepository.saveAll(members);
        });
    }
}

 

이제 실행해보면 잘 실행되는 것을 알 수 있습니다. 

 

하지만 한가지 궁금합니다. Reader -> Processor -> Writer로 이어지는 이 하나의 로테이션에서 스프링 배치는 어떻게 데이터를 이동시킬까요? 우리가 평소에 사용하는 방법은 메서드안에 파라미터를 넣어서 데이터를 전달합니다. 근데 배치에서는 아무데도 파라미터가 안보이는데요? 

 

그런 의문을 가진 당신! 정말 예리하시군요. 코드를 그냥 주어진대로 보는 것이 아니라 그런 의문을 품는다는 것 자체가 높게 평가할만 합니다. 

 

해당 질문을 여기서 설명하기엔 지금도 스크롤 압박이 살짝 있으니 관련해서 스택오버플로우에 올려져있는 질문을 링크해드리겠습니다. 

 

https://stackoverflow.com/questions/48092171/how-data-is-passed-from-reader-to-processor-and-to-writer/48694672#48694672

 

How data is passed from Reader to Processor and to Writer

I am new to spring batch. I want to understand how the data is passed from Reader to Processor and from processor to Writer? So basically in Reader we will be having read() method which will return...

stackoverflow.com

 

이제 포스팅은 마치겠습니다. 긴 글 읽어주시느라 고생 많으셨습니다. 다음 포스팅엔 스프링 배치 + 스케줄러를 이용해서 특정 시간이 되면 배치를 실행시키는 작업을 진행해보도록 하겠습니다. 감사합니다. 

 

 

CF)

근데 잠시만요!

 

Job, Step, Reader, Writer 의 로그는 다 찍혔는데 Processor의 로그는 안찍혔는데요? 라고 하시는 분들! 

 

혹시 저같은 분이 있을까봐 알려드립니다. 해당 질문은 저도 답답해서 직접 올렸던 스택 오버플로우 질문의 링크를 첨부해 드리겠습니다. 

 

https://stackoverflow.com/questions/73434102/spring-batch-return-is-not-working-so-logs-and-another-code-does-not-working

 

Spring Batch return is not working so logs and another code does not working

I was working Spring Batch for inactive accounts. But the return type doesn't work. In inactiveMemberProcessor log.info("InactiveMemberProcessor execution") doesn't work. Not only the log...

stackoverflow.com

 

영어가 약하신 분들을 위해 간단하게 설명해드리자면 

 

According to the log message Step already complete or not restartable, so no action to execute, your step does not seem to be executed. So probably that is the reason you are not seeing your log message.

 

You can set allowStartIfComplete to true on your step and that should work.

 

로그 메세지에 따르면 Step은 이미 완료되었거나 재시작할 수 없는 상태입니다. 그래서 아무런 실행에 대한 액션이 없는 것이지요. 당신의 Step은 실행자체가 된것 같지 않습니다. 그러니 아마 이러한 이유때문에 로그가 찍히지 않은 것 같은데요. Step의 속성중에 allowStartIfComplete를 true 로 줘보시겠어요? 

 

라는 뜻입니다. 

 

따라서 저 답변대로 Step에 있는 속성중에 allowStartIfComplete를 true로 주면 됩니다. 

    @Bean
    public Step inactiveJobStep() {
        log.info("InactiveMemberStep execution");
        return stepBuilderFactory.get("inactiveMemberStep")
                .<Member, Member>chunk(10)
                .reader(inactiveMemberReader())
                .processor(inactiveMemberProcessor())
                .writer(inactiveMemberWriter())
                .allowStartIfComplete(true)
                .build();
    }