[Spring] 스프링 배치 ItemReader의 개념, (MybatisCursorItemReader, MybatisPagingItemReader 구현)

1. ItemReader란?

스프링 배치의 ItemReader는 다음과 같은 과정을 거쳐 데이터를 처리한다.

 

출처:https://jojoldu.tistory.com/336

대부분의 데이터 형태는 이미 ItemReader로 제공하고 있기에 ItemReader, ItemStream 인터페이스 자체를 구현할 필요는 없다.

ItemReader는 Chunk 기반 트랜잭션을 다루며 Cursor, Paging 가 대표적인 2가지 방식이다.

2. Cursor, Paging 형식

출처:https://ojt90902.tistory.com/780

2-1. Cursor기반 ItemReader

  • JDBC ResultSet의 기본 기능이다.
  • ResultSet이 Open 될 때마다 데이터베이스의 데이터가 반환된다.
  • 데이터베이스와 연결 맺은 후 데이터를 Streaming 방식으로 I/O이다.
  • 현재 행에서Cursor를 유지하며 다음 데이터를 호출하면 Cursor를 한 칸씩 옮기면서 데이터를 가져온다.
  • 하나의 Connection으로 배치가 끝날때까지 사용되기에 Batch가 끝나기 전에 데이터베이스와 애플리케이션의 연결이 먼저 끊어질 수 있어 데이터베이스와 SocketTimeout을 충분한 값으로 설정하여야 한다.
  • 모든 결과를 메모리에 할당 하기 때문에 메모리 사용량이 많아진다.
  • Chunk 사이즈 만큼의 트랜잭션 단위로 데이터를 처리한다.

Cursor 기반 ItemReader 구현체

- JdbcCursorItemReader
- HibernateCursorItemReader
- StoredProcedureItemReader
- MybatisCursorItemReader

2-2. Paging기반 ItemReader

  • Chunk로 데이터베이스에서 데이터를 검색
  • Page Size 만큼만 한 번에 메모리로 가져오기에 메모리 사용량이 적어진다.
  • 페이지 단위로 컨넥션을 맺고 끊기를 반복하기에 대량 데이터를 처리하기 좋다.
  • 쿼리자체에 반환하고자하는 데이터 범위를 지정하여 사용한다. (offset, limit)
  • 컨넥션 유지시간이 길지 않고 메모리를 효율적으로 사용해야 하는 데이터에 적합하다.

Paging 기반 ItemReader 구현체

- JdbcPagingItemReader
- HibernatePagingItemReader
- JpaPagingItemReader
- MybatisPagingItemReader

 

3. MybatisItemReader 구현

3-1. MybatisCursorItemReader

MybatisCursorItemReader로 구현시 간단하다. 한 번에 조회해온 결과를 chunk만큼 트랜잭션을 분할하여 대용량 처리를 한다.

BatchConfig.java

@Bean
public T jobStep(StepBuilderFactory steps) throws Exception {
	return stepBuilderFactory.get("JOB").<T, T> chunk(1000) -- Chunk 사이즈 조절
			.reader(itemReader.reader(sqlSessionFactory))
			.processor(new itemProcessor())
			.writer(new itemWriter())
			.build();
}

ItemReader.Java

MyBatisCursorItemReader<T> databaseReader = new MyBatisCursorItemReader<>();
databaseReader.setSqlSessionFactory(sqlSessionFactory);
databaseReader.setQueryId(QueryId);
databaseReader.setParameterValues(map);
databaseReader.setSaveState(true);
return databaseReader;

 

데이터베이스에서 모든 결과를 메모리에 할당한 후, Chunk 사이즈만큼의 트랜잭션 단위로 데이터를 처리한다.

3-2. MyBatisPagingItemReader 구현

다음과 같이 조회 쿼리 자체에 OFFSET, LIMIT을 설정하여, 한 페이지당 조회할 데이터 위치를 파악한다.

MyBatisPagingItemReader에서는 다음 파라미터로 페이징 관련 값들에 바로 접근이 가능하다.

_page : 읽을 page 수량 (0부터 시작)
_pagesize : 한번에 읽을 페이지 수량 (리턴 받을 데이터의 수량)
_skiprows : _page * _pagesize (다음 페이지 시작 포인트, offset)

해당 값들을 쿼리에서 바로 사용 가능하며 다음과 같이 적용할 수 있다.

<select id="getEmployee" resultMap="employeeBatchResult">
  SELECT id, name, job FROM employees ORDER BY id ASC 
  OFFSET #{_skiprows} LIMIT #{_pagesize}
</select>

한번에 가져올 페이지 사이즈 (_pagesize)는 ItemReader.Java에서 setPageSize()를 통해 설정가능하다. (쿼리의 LIMIT에 해당하는 값)

MyBatisPagingItemReader<T> databaseReader = new MyBatisPagingItemReader<>();
databaseReader.setSqlSessionFactory(sqlSessionFactory);
databaseReader.setQueryId(QueryId);
databaseReader.setParameterValues(map);
databaseReader.setPageSize(1000); -- Paging에서는 한번에 읽을 Page수량을 추가해야한다. default = 10
databaseReader.setSaveState(true);
return databaseReader;

주의사항

매 페이지를 신규 조회할때 데이터의 변경되어 전체 페이징 기준이 달라진다면 누락되거나 중복처리되는 데이터가 있을 수 있다.

같은 이유로, order by를 적절하게 하지 않을 경우 미처리, 혹은 중복처리 되는 데이터가 발견될 수 있다. 매 Paging마다 그 시점의 페이징 데이터를 조회하기 때문이다.

 

특히 처리완료 데이터를 마킹하면서 처리하고, 미처리 데이터를 조회조건에 넣는다면, 데이터가 처리될 때마다 특정 페이지의 값들이 달라질 것이다. 이 경우 Cursor를 사용하면 쉽게 해결되지만, 메모리 및 컨넥션 타임 문제로 Paging을 꼭 사용하여야 하는 경우에는 쿼리에서 offset을 제거하거나 _page변수를 항상 0으로 지정해 주면 된다. 

 

MybatisPagingItemReader.java의 내부 구조를 확인해보면

@Override
  protected void doReadPage() {
    if (sqlSessionTemplate == null) {
      sqlSessionTemplate = new SqlSessionTemplate(sqlSessionFactory, ExecutorType.BATCH);
    }
    Map<String, Object> parameters = new HashMap<>();
    if (parameterValues != null) {
      parameters.putAll(parameterValues);
    }
    parameters.put("_page", getPage());
    parameters.put("_pagesize", getPageSize());
    parameters.put("_skiprows", getPage() * getPageSize());
    if (results == null) {
      results = new CopyOnWriteArrayList<>();
    } else {
      results.clear();
    }
    results.addAll(sqlSessionTemplate.selectList(queryId, parameters));
  }

_page는 getPage() 값을 사용하기 때문에

MyBatisPagingItemReader<T> reader = new MyBatisPagingItemReader<T>(){
	@Override
	public int getPage()	{
		return 0;
	}
};

다음과 같이 매 Paging 조회마다 페이지 값을 0으로 리셋해주면 매 page를 조회할 때마다 offset = 0인 채로 조회가 가능하다.

 

doReadPage()를 override 하여 페이지 읽는 로직 자체를 커스터마이징 하는 것도 가능하다.

 

참고

https://mybatis.org/spring/batch.html

https://ojt90902.tistory.com/780

https://junuuu.tistory.com/611

https://jojoldu.tistory.com/336