개발놀이터

스프링 배치와 Querydsl 본문

Spring/Spring Batch

스프링 배치와 Querydsl

마늘냄새폴폴 2022. 9. 11. 16:50

서론

요즘 배치에 대해서 깊이있게 공부중입니다. 

 

하지만 한가지 아쉬운 점이 있더라구요. 바로 JPA를 사용하면 배치에서 기본으로 제공해주는 JpaCursorItemReader, JpaPagingItemReader가 순수 JPQL을 사용해야 한다는 것이었습니다.

 

아시다시피 순수 JPQL은 문자열로 쿼리를 만들기 때문에 안전성 부분에서 큰 부담이 됩니다. 흔히 말하는 휴먼 에러를 피할 수 없습니다. 순간 잘못해서 공백을 한칸 더 추가한다거나 혹은 오타가 나기 시작하면 어디서 틀렸는지 찾기도 만만치않게 힘들죠 

 

그래서 아 Querydsl에 관련된 ItemReader가 있으면 정말 좋겠다 라는 생각을 하고 있었습니다. 그래서 혹시 있을까 싶어서 구글링을 해봤는데 우아한형제들 기술블로그에 향로(jojoldu)님이 작성하신 QuerydslPagingItemReader에 대한 포스팅을 발견했습니다. 

 

도입부부터 너무 감명받아서 길이가 좀 되는 포스팅을 두세번씩 읽었습니다. 그리고 실제 프로젝트에 적용하기 위해서 굉장히 여러번 봤습니다. 

 

너무 감명깊어서 블로그에 포스팅을 옮기려는데 솔직히 해당 포스팅의 길이가 상당합니다. 그래서 제 포스팅을 보시는 분들은 압축해서 알려드리기 위해서 해당 포스팅을 정리해보았습니다. 우아한형제들 기술블로그에 포스팅된 게시글은 Reference 부분에서 링크를 적어놓도록 하겠습니다. 그럼 이제 본격적으로 시작해보겠습니다.

 

 

QuerydslPagingItemReader

Querydsl은 단순 JPQL에서보다 복잡한 쿼리를 사용할 때 용이합니다. 여러개의 join과 복잡한 order by, group by를 Native Query로 만들기엔 다소 무리가 있기 때문에 Querydsl은 JPA를 사용한다면 복잡한 쿼리에서 많이 사용합니다.

 

하지만 Querydsl을 사용함에 있어서 한가지 문제가 있는데요, 바로 스프링 배치에서는 QuerydsPagingItemReader를 제공하지 않는다는 것입니다. 

 

스프링 배치에서 JpaPagingItemReader와 Querydsl을 함께 사용하기란 정말 복잡한 문제가 뒤따랐습니다. 

 

물론 Querydsl을 포기하고 JpaPagingItemReader를 이용해도 되지만 그렇게 되면 Querydsl의 타입 안전성, 자동완성, 컴파일 단계 문법 체크, 공백 이슈 대응을 지원받을 수 없습니다. 더군다나 페이징 성능 향상을 위한 Offset이 제거된 페이징 처리는 JpaPagingItemReader에서도 불가능하여 매번 별도의 Reader를 만들 수 밖에 없었습니다.

 

그래서 QuerydslPagingItemReader를 만들게 되었습니다. 

 

QuerydslPagingItemReader의 컨셉은 아주 단순합니다. JpaPagingItemReader에서 JPQL이 수행되는 부분만 교체하는 것입니다. 

 

JPQL이 수행되는 부분은 어딜까요? 기본적인 chunk 지향구조에서 doReadPage() 메서드에서 수행됩니다.

chunk 지향구조

 

 

그래서 해당 부분에 createQuery() 부분을 오버라이딩 하려고 봤더니 private여서 확장할 수 없게 되었습니다. 그래서 JpaPagingItemReader를 전부 복사해서 구현할 수 밖에 없었습니다.

 

public class QuerydslPagingItemReader<T> extends AbstractPagingItemReader<T> {

    protected final Map<String, Object> jpaPropertyMap = new HashMap<>();
    protected EntityManagerFactory entityManagerFactory;
    protected EntityManager entityManager;
    protected Function<JPAQueryFactory, JPAQuery<T>> queryFunction;
    protected boolean transacted = true;//default value

    protected QuerydslPagingItemReader() {
        setName(ClassUtils.getShortName(QuerydslPagingItemReader.class));
    }

    public QuerydslPagingItemReader(EntityManagerFactory entityManagerFactory,
                                    int pageSize,
                                    Function<JPAQueryFactory, JPAQuery<T>> queryFunction) {
        this();
        this.entityManagerFactory = entityManagerFactory;
        this.queryFunction = queryFunction;
        setPageSize(pageSize);
    }

    public void setTransacted(boolean transacted) {
        this.transacted = transacted;
    }

    @Override
    protected void doOpen() throws Exception {
        super.doOpen();

        entityManager = entityManagerFactory.createEntityManager(jpaPropertyMap);
        if (entityManager == null) {
            throw new DataAccessResourceFailureException("Unable to obtain an EntityManager");
        }
    }

    @Override
    @SuppressWarnings("unchecked")
    protected void doReadPage() {

        clearIfTransacted();

        JPAQuery<T> query = createQuery()
                .offset(getPage() * getPageSize())
                .limit(getPageSize());

        initResults();

        fetchQuery(query);
    }

    protected void clearIfTransacted() {
        if (transacted) {
            entityManager.clear();
        }
    }

    protected JPAQuery<T> createQuery() {
        JPAQueryFactory queryFactory = new JPAQueryFactory(entityManager);
        return queryFunction.apply(queryFactory);
    }

    protected void initResults() {
        if (CollectionUtils.isEmpty(results)) {
            results = new CopyOnWriteArrayList<>();
        } else {
            results.clear();
        }
    }

    protected void fetchQuery(JPAQuery<T> query) {
        if (!transacted) {
            List<T> queryResult = query.fetch();
            for (T entity : queryResult) {
                entityManager.detach(entity);
                results.add(entity);
            }
        } else {
            results.addAll(query.fetch());
        }
    }

    @Override
    protected void doJumpToPage(int itemIndex) {
    }

    @Override
    protected void doClose() throws Exception {
        entityManager.close();
        super.doClose();
    }
}

 

모든 코드를 복사했기 때문에 달라진 부분만 보자면 먼저 람다 표현식을 사용할 수 있도록

 

Function<JPAQueryFactory, JPAQuery<T>> queryFunction

 

이 생성자 인자로 추가되었습니다.

 

두번째로는 트랜잭션과 관련된 코드가 사라졌습니다. 이 옵션을 제거하더라도 스프링 배치에서는 기본적으로 chunk 단위로 트랜잭션이 보장되고 있기 때문에 chunk 단위 롤백 등 트랜잭션 관리는 잘 작동되는 것을 확인했습니다. 

 

 

QuerydslNoOffsetPagingItemReader

MySQL은 특성상 페이징이 뒤로 갈수록 느려집니다. (꼭 MySQL만 그런건 아니고 많은 RDBMS가 비슷하게 작동합니다.)

 

예를 들어서 limit 10000, 20이라고 하면 10,020개의 행을 읽어야 합니다. 그리고 이 중 앞의 10,000개의 행을 버립니다.

 

따라서 QuerydslNoOffsetPagingItemReader에서의 컨셉은 읽기 시작한 부분을 지정해 매번 첫 페이지만 읽도록 하는 방식입니다. 이는 쿼리가 매번 이전 페이지의 행을 건너 뛸 수 있음을 의미합니다. 즉, 아무리 페이지가 뒤로 가더라도 처음 페이지를 읽은 것과 같은 효과를 가지게 됩니다. 

 

QuerydslNoOffsetPagingItemReader를 사용할 때의 고려사항은 다음과 같습니다.

1. 기존 배치 잡들의 Reader 쿼리에서 order by, group by가 없는지?

  • 어떤 순서로 읽는게 중요하지 않고, 대량의 데이터를 가공하는게 중요한 경우에 사용하면 된다.
  • order by, group by가 PK 이외의 다른 기준으로 복잡하게 사용해야 한다면 QuerydslNoOffsetPagingItemReader를 활용하기에는 어려움이 있다. 

기존의 where 절에서 추가적으로 id < 마지막조회ID를 추가하는 것이다. 

 

이에대한 특징은 이와 같다.

  • offset이 제거된 limit 쿼리
  • 조회된 페이지의 마지막 id값을 캐시
  • 캐시된 마지막 id값을 다음 페이지 쿼리 조건문에 추가
  • 정렬된 기준에 따라 조회 조건에 마지막 id의 조건이 자동 포함

asc는 id > 마지막id

desc는 id < 마지막id 

이런 형식으로 구성됩니다. 

 

여기서 좀 더 보편적으로 사용하기 위해서는 몇개의 조건이 추가됩니다.

-id 뿐만 아니라 다른 필드들도 정렬 조건에 사용할 수 있어야 함

  --모든 테이블의 PK필드가 꼭 id가 아닐 수 있음

  --PK필드 외에도 인덱스 필드를 사용할 수도 있음

  --order by가 별도의 필드를 필요할 수도 있음

-Long(bigint) 외에도 정렬 기준이 가능해야 함

  --String (varchar), Integer (int) 등도 언제든 조건으로 사용할 수 있음

-어떤 필드가 대상일지 문자열이 아닌, QClass 필드로 직접 지정할 수 있어야 함

  --즉, "id"가 아닌 QProduct.product.id가 되어야 함을 의미

  --문자열로 지정할 경우 오타, 필드 변경에 대해 컴파일 체크가 안되기 때문에 Querydsl의 QClass 필드로 지정

 

위의 여러 기능을 좀 더 관리하기 쉽도록 ItemReader외에 2개의 클래스를 추가로 개발해야 합니다.

-QuerydslNoOffsetOptions

  --어떤 필드를 기준으로 사용할지 결정하는 추상 클래스이다.

  --해당 필드의 타입에 따라 NumberOptions, StringOptions와 같은 하위 구현체를 사용한다.

package org.springframework.batch.item.querydsl.reader.options;

import com.querydsl.core.types.Path;
import com.querydsl.jpa.impl.JPAQuery;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.item.querydsl.reader.expression.Expression;

import javax.annotation.Nonnull;
import java.lang.reflect.Field;

public abstract class QuerydslNoOffsetOptions<T> {
    protected Log logger = LogFactory.getLog(getClass());

    protected final String fieldName;
    protected final Expression expression;

    public QuerydslNoOffsetOptions(@Nonnull Path field,
                                   @Nonnull Expression expression) {
        String[] qField = field.toString().split("\\.");
        this.fieldName = qField[qField.length-1];
        this.expression = expression;

        if (logger.isDebugEnabled()) {
            logger.debug("fieldName= " + fieldName);
        }
    }

    public String getFieldName() {
        return fieldName;
    }

    public abstract void initKeys(JPAQuery<T> query, int page);

    protected abstract void initFirstId(JPAQuery<T> query);
    protected abstract void initLastId(JPAQuery<T> query);

    public abstract JPAQuery<T> createQuery(JPAQuery<T> query, int page);

    public abstract void resetCurrentId(T item);

    protected Object getFiledValue(T item) {
        try {
            Field field = item.getClass().getDeclaredField(fieldName);
            field.setAccessible(true);
            return field.get(item);
        } catch (NoSuchFieldException | IllegalAccessException e) {
            logger.error("Not Found or Not Access Field= " + fieldName, e);
            throw new IllegalArgumentException("Not Found or Not Access Field");
        }
    }

    public boolean isGroupByQuery(JPAQuery<T> query) {
        return isGroupByQuery(query.toString());
    }

    public boolean isGroupByQuery(String sql) {
        return sql.contains("group by");

    }

}

 

 

-Expression

  --where, order by 조건을 만들어주는 클래스이다.

  --정렬 조건이 asc인지, desc인지에 따라 where 조건문을 자동으로 결정하는 역할도 한다.

package org.springframework.batch.item.querydsl.reader.expression;

import com.querydsl.core.types.OrderSpecifier;
import com.querydsl.core.types.dsl.BooleanExpression;
import com.querydsl.core.types.dsl.NumberPath;
import com.querydsl.core.types.dsl.StringPath;

/**
 * Created by jojoldu@gmail.com on 23/01/2020
 * Blog : http://jojoldu.tistory.com
 * Github : http://github.com/jojoldu
 */
public enum Expression {
    ASC(WhereExpression.GT, OrderExpression.ASC),
    DESC(WhereExpression.LT, OrderExpression.DESC);

    private final WhereExpression where;
    private final OrderExpression order;

    Expression(WhereExpression where, OrderExpression order) {
        this.where = where;
        this.order = order;
    }

    public boolean isAsc() {
        return this == ASC;
    }

    public BooleanExpression where (StringPath id, int page, String currentId) {
        return where.expression(id, page, currentId);
    }

    public <N extends Number & Comparable<?>> BooleanExpression where (NumberPath<N> id, int page, N currentId) {
        return where.expression(id, page, currentId);
    }

    public OrderSpecifier<String> order (StringPath id) {
        return isAsc() ? id.asc() : id.desc();
    }

    public <N extends Number & Comparable<?>> OrderSpecifier<N> order (NumberPath<N> id) {
        return isAsc() ? id.asc() : id.desc();
    }
}

 

그래서 결국 QuerydslNoOffsetPagingItemReader는 이렇게 정리할 수 있습니다. 

public class QuerydslNoOffsetPagingItemReader<T> extends QuerydslPagingItemReader<T> {

    private QuerydslNoOffsetOptions<T> options;

    private QuerydslNoOffsetPagingItemReader() {
        super();
        setName(ClassUtils.getShortName(QuerydslNoOffsetPagingItemReader.class));
    }

    public QuerydslNoOffsetPagingItemReader(EntityManagerFactory entityManagerFactory,
                                            int pageSize,
                                            QuerydslNoOffsetOptions<T> options,
                                            Function<JPAQueryFactory, JPAQuery<T>> queryFunction) {
        this();
        super.entityManagerFactory = entityManagerFactory;
        super.queryFunction = queryFunction;
        this.options = options;
        setPageSize(pageSize);
    }

    @Override
    @SuppressWarnings("unchecked")
    protected void doReadPage() {

        clearIfTransacted();

        JPAQuery<T> query = createQuery().limit(getPageSize());

        initResults();

        fetchQuery(query);

        resetCurrentIdIfNotLastPage(); // 조회된 페이지의 마지막 ID 캐시
    }

    @Override
    protected JPAQuery<T> createQuery() {
        JPAQueryFactory queryFactory = new JPAQueryFactory(entityManager);
        options.initFirstId(queryFunction.apply(queryFactory), getPage()); // 제일 첫번째 페이징시 시작해야할 ID 찾기

        return options.createQuery(queryFunction.apply(queryFactory), getPage()); // 캐시된 ID를 기준으로 페이징 쿼리 생성
    }

    private void resetCurrentIdIfNotLastPage() {
        if (isNotEmptyResults()) {
            options.resetCurrentId(getLastItem());
        }
    }

    // 조회결과가 Empty이면 results에 null이 담긴다
    private boolean isNotEmptyResults() {
        return !CollectionUtils.isEmpty(results) && results.get(0) != null;
    }

    private T getLastItem() {
        return results.get(results.size() - 1);
    }
}

 

그리고 이제 사용방법은 이렇습니다.

 

네 여기까지 QuerydslPagingItemReader 와 QuerydslNoOffsetPagingItemReader에 대해서 알아봤습니다. 이제 배치를 사용할 때도 Querydsl을 사용할 수 있다니 가슴이 두근두근한걸요. 그럼 다음 포스팅에서 뵙겠습니다.

 

Reference

https://techblog.woowahan.com/2662/

 

Spring Batch와 Querydsl | 우아한형제들 기술블로그

{{item.name}} Spring Batch와 QuerydslItemReader 안녕하세요 우아한형제들 정산시스템팀 이동욱입니다. 올해는 무슨 글을 기술 블로그에 쓸까 고민하다가, 1월초까지 생각했던 것은 팀에 관련된 주제였습

techblog.woowahan.com

https://github.com/jojoldu/spring-batch-querydsl

 

GitHub - jojoldu/spring-batch-querydsl: 스프링배치와 QuerydslPagingItemReader

스프링배치와 QuerydslPagingItemReader. Contribute to jojoldu/spring-batch-querydsl development by creating an account on GitHub.

github.com