개발놀이터

Chunk 지향 처리 : ItemWriter (심화) 본문

Spring/Spring Batch

Chunk 지향 처리 : ItemWriter (심화)

마늘냄새폴폴 2022. 10. 11. 02:39

본 포스팅은 인프런의 정수원님의 스프링 배치 강의를 듣고 정리한 포스팅입니다. 더 자세한 내용은 강의를 참고해주세요. 

 

 

ItemReader와 마찬가지로 ItemWriter 또한 Flat File이나 XML, json과 같이 데이터를 저장할 수 있지만 분량 관계상 DB와 관련된 JdbcItemWriter, JpaItemWriter 이 두가지에 대해서만 포스팅 할 예정입니다. 이 부분 참고해서 봐주시면 감사하겠습니다. 

 

 

JdbcBatchItemWriter

기본 개념

  • JdbcCursorItemReader 설정과 마찬가지로 DataSource 를 지정하고 SQL 속성에 실행할 쿼리를 설정
  • JDBC의 Batch 기능을 사용하여 bulk insert/update/delete 방식으로 처리
  • 단건 처리가 아닌 일괄 처리이기 때문에 성능에 이점을 가진다. 

 

JdbcBatchItemWriter의 핵심적인 내용은 bulk로 처리한다는 것입니다. bulk로 처리한다는게 무엇일까요? 

 

JPA를 공부해보신 분들이라면 어떤 내용인지 아시겠지만 그렇지 않은 분들을 위해 간단하게 설명하고 넘어가도록 하겠습니다. 

 

기존의 insert/update/delete 처리방식은 어떤 방식인지 먼저 알아야합니다. 

 

기존의 방식은 예를 들어 1000건의 데이터가 있다고 가정했을 때 한건 한건씩 데이터를 처리했습니다. 그렇게 처리할 때마다 DB Connection에 연결을 해야하고 트랜잭션 처리도 있을 것입니다. 

 

이렇게 1000개를 반복하다 보면 엄청난 리소스 낭비일 것입니다. 

 

bulk 연산은 Chunk Size 만큼 일종의 버퍼를 두고 한번에 insert/update/delete 처리를 한다는 개념으로 받아들이시면 될 것 같습니다. 

 

때문에 리소스 낭비가 적고 위에서 언급한대로 성능상 이점을 가지고 갈 수 있는 것입니다.

 

API

 

동작 방식

  1. 우선 Step 과정에서 List로 items 들을 받아오게 될 것입니다. 이렇게 받아온 List들은 JdbcBatchItemWriter에 전달됩니다. 
  2. 넘어오는 items 의 타입이 Map인지 아닌지를 그 다음에 판별하게 됩니다. 만약 Pojo 기반이라면 beanMapped() 메서드를 통해 BeanPropertyItemSqlParameterSourceProvider (좀 기네요 ㅎㅎ..) 를 사용하고 Map 기반이라면 columnMapped() 메서드를 통해 ColumnMapItemPreparedStatementSetter 를 사용합니다. 
  3. 둘 중 하나의 클래스로 SQL문을 받아서 JdbcTemplate이 Batch 처리를 일괄 처리합니다. 

 

 

JpaItemWriter

기본 개념

  • JPA Entity 기반으로 데이터를 처리하며 EntityManagerFactory를 주입받아 사용한다. 
  • Entity 를 하나씩 chunk 크기 만큼 insert 혹은 merge 한 다음 flush 한다.
  • ItemReader 나 ItemProcessor로 부터 아이템을 전달 받을 때는 Entity 클래스 타입으로 받아야 한다. 

 

세 번째 내용이 무슨 내용인지 한번 알아보도록 하겠습니다.

 

JdbcBatchItemWriter와 다르게 JpaItemWriter는 Map이나 Pojo 클래스로 데이터를 저장할 수 없다는 의미입니다. 

 

그렇다는 얘기는 ItemReader로 값을 받아와서 ItemProcessor로 적당히 Entity 클래스로 변환해주어야 ItemWriter에서 처리할 수 있다는 뜻이겠지요. 

 

그럼 왜 Entity로 받아와야 할까요?

 

그 이유는 EntityManager가 Item 을 Persist나 merge로 바로 insert 하기 때문입니다. 따라서 ItemWriter에는 반드시 Entity클래스가 넘어와야 합니다. 

 

 

JdbcBatchItemWriter와 다른 점 중 하나는 JpaItemWriter는 bulk 처리가 안된다는 것을 알 수 있습니다. 따라서 위에서 설명드렸다시피 성능상 큰 단점을 가지고 있습니다. 

 

둘의 성능 차이에 대해서는 포스팅 마지막에 테스트를 통해 수치상으로 확인해보도록 하겠습니다. 

 

API

 

동작 방식

  1. Step이 Item을 List 형태로 ItemWriter에 전달합니다. 
  2. 그럼 JpaItemWriter는 EntityManager를 통해 값을 저장합니다. 이 때 usePersist 값이 true 면 persist를 false라면 merge를 수행합니다. (기본 값은 true 입니다.)
  3. 모든 작업이 완료되면 DB에 데이터들을 밀어 넣습니다. 

 

 

성능 테스트 JdbcBatchItemWriter VS JpaItemWriter

둘의 차이에 대해 성능 테스트를 한번 해보도록 하겠습니다. 

 

우선 사용한 데이터 접근 방식은 Spring Data JPA를 사용했음을 미리 알려드리겠습니다. 

 

먼저 Entity 클래스입니다. 

@Getter
@Setter
@Entity
@NoArgsConstructor
public class Member {

    @Id @GeneratedValue(strategy = GenerationType.IDENTITY)
    @Column(name = "member_id")
    private Long id;

    private String name;
    private int age;

    public Member(String name, int age) {
        this.name = name;
        this.age = age;
    }

    public Member setName(String name) {
        this.name = name;
        return this;
    }
}

 

 

JdbcBatchItemWriter를 사용한 Job 입니다. JdbcWriterJob.java

@Configuration
@RequiredArgsConstructor
public class JdbcWriterJob {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final DataSource dataSource;
    private final EntityManagerFactory emf;

    private static final int chunk = 1000;

    @Bean
    public Job jdbcJob() {
        return jobBuilderFactory.get("jdbcJob")
                .start(jdbcStep())
                .build();
    }

    @Bean
    public Step jdbcStep() {
        return stepBuilderFactory.get("jdbcStep")
                .<Member, Member>chunk(chunk)
                .reader(jdbcReader())
                .processor(jdbcProcessor())
                .writer(jdbcWriter())
                .allowStartIfComplete(true)
                .build();
    }

    @Bean
    public JpaPagingItemReader<Member> jdbcReader() {
        return new JpaPagingItemReaderBuilder<Member>()
                .pageSize(chunk)
                .entityManagerFactory(emf)
                .queryString("select m from Member m")
                .name("jdbcReader")
                .build();
    }

    @Bean
    public ItemProcessor<Member, Member> jdbcProcessor() {
        return member -> {
            return member.setName(member.getName() + "test");
        };
    }

    @Bean
    public JdbcBatchItemWriter<Member> jdbcWriter() {
        return new JdbcBatchItemWriterBuilder<Member>()
                .dataSource(dataSource)
                .sql("update member set name = :name, age = :age where id = :id")
                .build();
    }
}

 

JpaItemWriter를 사용한 Job입니다. JpaWriterJob.java

@Configuration
@RequiredArgsConstructor
public class JpaWriterJob {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final EntityManagerFactory emf;

    private static final int chunk = 1000;

    @Bean
    public Job jpaJob() {
        return jobBuilderFactory.get("jpaJob")
                .start(jpaStep())
                .build();
    }

    @Bean
    public Step jpaStep() {
        return stepBuilderFactory.get("jpaStep")
                .<Member, Member>chunk(chunk)
                .reader(jpaReader())
                .processor(jpaProcessor())
                .writer(jpaWriter())
                .allowStartIfComplete(true)
                .build();
    }

    @Bean
    public JpaPagingItemReader<Member> jpaReader() {
        return new JpaPagingItemReaderBuilder<Member>()
                .entityManagerFactory(emf)
                .pageSize(chunk)
                .name("jpaReader")
                .queryString("select m from Member m")
                .build();
    }

    @Bean
    public ItemProcessor<Member, Member> jpaProcessor() {
        return member -> {
            return member.setName(member.getName() + "test");
        };
    }

    @Bean
    public JpaItemWriter<Member> jpaWriter() {
        return new JpaItemWriterBuilder<Member>()
                .entityManagerFactory(emf)
                .build();
    }
}

 

둘의 다른 점은 확인해보시면 아시겠지만 빈 이름과 ItemWriter의 차이만 존재합니다. 

 

테스트를 실행하기 전 미리 10만행의 데이터를 데이터베이스에 적재해둔 상태입니다. 이 10만행으로 둘의 성능 차이를 테스트 해보도록 하겠습니다. 

 

테스트 코드입니다. 

@SpringBootTest
public class BatchTest {

    @Autowired
    MemberRepository memberRepository;

    @Autowired
    JobLauncher jobLauncher;

    @Autowired
    JobExplorer jobExplorer;

    @Autowired
    JdbcWriterJob jdbcWriterJob;

    @Autowired
    JpaWriterJob jpaWriterJob;

    @Test
    @DisplayName("jdbcBatchWriterTest")
    public void jdbcWriterTest() throws Exception {
        jobLauncher.run(
                jdbcWriterJob.jdbcJob(), new JobParametersBuilder(jobExplorer)
                        .addString("requestDate", LocalDateTime.now().toString())
                        .toJobParameters()
        );
    }

    @Test
    @DisplayName("jpaItemWriter")
    public void jpaWriterTest() throws Exception {
        jobLauncher.run(
                jpaWriterJob.jpaJob(), new JobParametersBuilder(jobExplorer)
                        .addString("requestDate", LocalDateTime.now().toString())
                        .toJobParameters()
        );
    }
}

 

이제 돌려보도록 하겠습니다. 

 

그랬을 때 결과는?

 

 

JdbcBatchItemWriter는 0.57초 JpaItemWriter는 33.259초가 걸린 것을 확인할 수 있습니다. 

 

참고로 데이터가 1만행이었을 때는 JdbcBatchItemWriter가 0.13초 JpaItemWriter가 1.1초로 큰 차이가 있진 않았지만 확실히 데이터가 많아지면 많아질수록 더 급격한 차이를 보입니다. 

 

직접 실행해보면서 느낀 점은 확실히 JdbcBatchItemWriter는 쿼리문 자체가 많이 날라가지 않는걸 볼 수 있습니다. 반면에 JpaItemWriter는 update문이 한줄씩 쭉쭉쭉 올라가는 모습을 볼 수 있었습니다. 

 

 

*주의!

ItemWriter에 대한 단위 테스트가 아닌 통합테스트이므로 단위테스트를 할 경우 조금 다른 결과를 얻을 수 있을 것입니다. 하지만 스프링 배치는 단위 테스트가 굉장히 복잡하기 때문에 통합 테스트로 진행했다는 점 유의해주세요! 

 

 

오늘은 ItemWriter 그중에서도 DB에 접근하는 JdbcBatchItemWriter, JpaItemWriter에 대해서 알아보고 성능 차이도 테스트 해봤습니다. 여기까지 긴 글 읽어주셔서 감사합니다. 다음엔 ItemProcessor 심화에 대해서 알아보도록 하겠습니다.