1. ItemReader란?
스프링 배치의 ItemReader는 다음과 같은 과정을 거쳐 데이터를 처리한다.
대부분의 데이터 형태는 이미 ItemReader로 제공하고 있기에 ItemReader, ItemStream 인터페이스 자체를 구현할 필요는 없다.
ItemReader는 Chunk 기반 트랜잭션을 다루며 Cursor, Paging 가 대표적인 2가지 방식이다.
2. Cursor, Paging 형식
2-1. Cursor기반 ItemReader
- JDBC ResultSet의 기본 기능이다.
- ResultSet이 Open 될 때마다 데이터베이스의 데이터가 반환된다.
- 데이터베이스와 연결 맺은 후 데이터를 Streaming 방식으로 I/O이다.
- 현재 행에서Cursor를 유지하며 다음 데이터를 호출하면 Cursor를 한 칸씩 옮기면서 데이터를 가져온다.
- 하나의 Connection으로 배치가 끝날때까지 사용되기에 Batch가 끝나기 전에 데이터베이스와 애플리케이션의 연결이 먼저 끊어질 수 있어 데이터베이스와 SocketTimeout을 충분한 값으로 설정하여야 한다.
- 모든 결과를 메모리에 할당 하기 때문에 메모리 사용량이 많아진다.
- Chunk 사이즈 만큼의 트랜잭션 단위로 데이터를 처리한다.
Cursor 기반 ItemReader 구현체
- JdbcCursorItemReader
- HibernateCursorItemReader
- StoredProcedureItemReader
- MybatisCursorItemReader
2-2. Paging기반 ItemReader
- Chunk로 데이터베이스에서 데이터를 검색
- Page Size 만큼만 한 번에 메모리로 가져오기에 메모리 사용량이 적어진다.
- 페이지 단위로 컨넥션을 맺고 끊기를 반복하기에 대량 데이터를 처리하기 좋다.
- 쿼리자체에 반환하고자하는 데이터 범위를 지정하여 사용한다. (offset, limit)
- 컨넥션 유지시간이 길지 않고 메모리를 효율적으로 사용해야 하는 데이터에 적합하다.
Paging 기반 ItemReader 구현체
- JdbcPagingItemReader
- HibernatePagingItemReader
- JpaPagingItemReader
- MybatisPagingItemReader
3. MybatisItemReader 구현
3-1. MybatisCursorItemReader
MybatisCursorItemReader로 구현시 간단하다. 한 번에 조회해온 결과를 chunk만큼 트랜잭션을 분할하여 대용량 처리를 한다.
BatchConfig.java
@Bean
public T jobStep(StepBuilderFactory steps) throws Exception {
return stepBuilderFactory.get("JOB").<T, T> chunk(1000) -- Chunk 사이즈 조절
.reader(itemReader.reader(sqlSessionFactory))
.processor(new itemProcessor())
.writer(new itemWriter())
.build();
}
ItemReader.Java
MyBatisCursorItemReader<T> databaseReader = new MyBatisCursorItemReader<>();
databaseReader.setSqlSessionFactory(sqlSessionFactory);
databaseReader.setQueryId(QueryId);
databaseReader.setParameterValues(map);
databaseReader.setSaveState(true);
return databaseReader;
데이터베이스에서 모든 결과를 메모리에 할당한 후, Chunk 사이즈만큼의 트랜잭션 단위로 데이터를 처리한다.
3-2. MyBatisPagingItemReader 구현
다음과 같이 조회 쿼리 자체에 OFFSET, LIMIT을 설정하여, 한 페이지당 조회할 데이터 위치를 파악한다.
MyBatisPagingItemReader에서는 다음 파라미터로 페이징 관련 값들에 바로 접근이 가능하다.
_page : 읽을 page 수량 (0부터 시작)
_pagesize : 한번에 읽을 페이지 수량 (리턴 받을 데이터의 수량)
_skiprows : _page * _pagesize (다음 페이지 시작 포인트, offset)
해당 값들을 쿼리에서 바로 사용 가능하며 다음과 같이 적용할 수 있다.
<select id="getEmployee" resultMap="employeeBatchResult">
SELECT id, name, job FROM employees ORDER BY id ASC
OFFSET #{_skiprows} LIMIT #{_pagesize}
</select>
한번에 가져올 페이지 사이즈 (_pagesize)는 ItemReader.Java에서 setPageSize()를 통해 설정가능하다. (쿼리의 LIMIT에 해당하는 값)
MyBatisPagingItemReader<T> databaseReader = new MyBatisPagingItemReader<>();
databaseReader.setSqlSessionFactory(sqlSessionFactory);
databaseReader.setQueryId(QueryId);
databaseReader.setParameterValues(map);
databaseReader.setPageSize(1000); -- Paging에서는 한번에 읽을 Page수량을 추가해야한다. default = 10
databaseReader.setSaveState(true);
return databaseReader;
주의사항
매 페이지를 신규 조회할때 데이터의 변경되어 전체 페이징 기준이 달라진다면 누락되거나 중복처리되는 데이터가 있을 수 있다.
같은 이유로, order by를 적절하게 하지 않을 경우 미처리, 혹은 중복처리 되는 데이터가 발견될 수 있다. 매 Paging마다 그 시점의 페이징 데이터를 조회하기 때문이다.
특히 처리완료 데이터를 마킹하면서 처리하고, 미처리 데이터를 조회조건에 넣는다면, 데이터가 처리될 때마다 특정 페이지의 값들이 달라질 것이다. 이 경우 Cursor를 사용하면 쉽게 해결되지만, 메모리 및 컨넥션 타임 문제로 Paging을 꼭 사용하여야 하는 경우에는 쿼리에서 offset을 제거하거나 _page변수를 항상 0으로 지정해 주면 된다.
MybatisPagingItemReader.java의 내부 구조를 확인해보면
@Override
protected void doReadPage() {
if (sqlSessionTemplate == null) {
sqlSessionTemplate = new SqlSessionTemplate(sqlSessionFactory, ExecutorType.BATCH);
}
Map<String, Object> parameters = new HashMap<>();
if (parameterValues != null) {
parameters.putAll(parameterValues);
}
parameters.put("_page", getPage());
parameters.put("_pagesize", getPageSize());
parameters.put("_skiprows", getPage() * getPageSize());
if (results == null) {
results = new CopyOnWriteArrayList<>();
} else {
results.clear();
}
results.addAll(sqlSessionTemplate.selectList(queryId, parameters));
}
_page는 getPage() 값을 사용하기 때문에
MyBatisPagingItemReader<T> reader = new MyBatisPagingItemReader<T>(){
@Override
public int getPage() {
return 0;
}
};
다음과 같이 매 Paging 조회마다 페이지 값을 0으로 리셋해주면 매 page를 조회할 때마다 offset = 0인 채로 조회가 가능하다.
doReadPage()를 override 하여 페이지 읽는 로직 자체를 커스터마이징 하는 것도 가능하다.
참고
https://mybatis.org/spring/batch.html
https://ojt90902.tistory.com/780
https://junuuu.tistory.com/611
https://jojoldu.tistory.com/336
'Spring' 카테고리의 다른 글
[Spring] 단위 테스트, JUnit의 개념 및 단위 테스트 코드 작성 방법 (0) | 2023.11.14 |
---|---|
[Spring] 스프링 컨테이너(Spring container)의 개념 (1) | 2023.11.07 |
[Spring] IoC(제어의 역전) & DI(의존성 주입)의 개념 (0) | 2023.11.06 |
[Spring] 스프링부트 + Mybatis 데이터소스 여러개 연결 (스프링 다중 데이터베이스 연결) (0) | 2023.10.20 |
[Spring] 스프링 트랜잭션의 개념 및 적용 (@Transactional 사용법) (0) | 2023.10.17 |