인프런 커뮤니티 질문&답변

김기훈님의 프로필 이미지
김기훈

작성한 질문수

스프링 배치

Repeat

reader 에서 반복 제어를 하고 싶은데 도움 부탁드립니다.

작성

·

237

0

안녕하세요. 강의보면서 따라해보고 있는데 응용이 잘되지 않아 질문드립니다.

 

JdbcPagingItemReader, ItemProcessor, JdbcBatchItemWriter 를 사용하여 스텝을 구성하고 여러 스텝으로 잡 하나를 만들어서 배치를 돌리려고 합니다.

그런데 read 하는 테이블의 데이터가 실시간으로 계속 쌓인다면 read 가 끝나지 않으므로 배치가 멈추지 않고 계속 돌게 됩니다.

read 할 데이터 개수가 고정일 때는 배치가 반복되는게 어차피 read 가 끝나면 멈춰서 문제가 안됐었는데,

실시간으로 쌓이는 상황을 생각하니 모니터링 등을 고민하다가 특정 시간대에 돌리고 멈췄다가 다시 돌리는 식으로 방향을 잡았습니다.

그래서 repeatTemplate 를 사용해서 특정 개수만큼만 돌리고 잡을 완료시키고 싶습니다.

그리고 나머지는 이후에 다른 잡에서 하려고 합니다.

 

보여주신 예제는 processor 에서 반복 제어를 하였는데 저는 read 에서 무한 반복을 막고 싶은 것이라서 reader 에 넣어봤습니다. 그런데 테스트해봤을 때 반복 제어가 되지 않고 데이터 있는 개수만큼 다 돌고 끝나게 됩니다.

(현재 로컬 환경에서는 실시간으로 쌓이진 않지만 중간에 repeateTemplate 조건에 맞게 끊길 수 있는 개수로 테스트했습니다.)

repeatTemplate 설정을 했는데 설정이 안먹고 제대로 설정도 안된 느낌입니다..

 

@Configuration 파일에서 일부 코드입니다.

jdbcPagingItemReader() 에 코드를 추가했습니다.

 

@Configuration
@RequiredArgsConstructor
public class ReportConfig {
    private final DataSource dataSource;

    private static final int CHUNK_SIZE = 2;


    /**
     * READ TARGET
     */
    @Bean
    public JdbcPagingItemReader<RequestReportTargetDto> jdbcPagingItemReader() throws Exception {

// 여기서부터 
        RepeatTemplate repeatTemplate = new RepeatTemplate();

        repeatTemplate.setCompletionPolicy(new SimpleCompletionPolicy(3));

        repeatTemplate.iterate(new RepeatCallback() {
            @Override
            public RepeatStatus doInIteration(RepeatContext context) throws Exception {
                System.out.println("ReportConfig.doInIteration");
                return RepeatStatus.CONTINUABLE;
            }
        });
// 여기까지 추가했습니다.

        HashMap<String, Object> parameters = new HashMap<>();
        parameters.put("reqReportYn", "N");

        return new JdbcPagingItemReaderBuilder<RequestReportTargetDto>()
                .name("jdbcPagingItemReader")
                .dataSource(dataSource)
                .fetchSize(CHUNK_SIZE)
                .rowMapper(new BeanPropertyRowMapper<>(RequestReportTargetDto.class))
                .pageSize(CHUNK_SIZE)
                .queryProvider(createQueryProvider())
                .parameterValues(parameters)
                .build();
    }


    @Bean
    public PagingQueryProvider createQueryProvider() throws Exception {
        SqlPagingQueryProviderFactoryBean qp = new SqlPagingQueryProviderFactoryBean();
        qp.setDataSource(dataSource);
        qp.setSelectClause("M_ID, ....");
        qp.setFromClause("from TARGET_TEST");
        qp.setWhereClause("where REQ_REPORT_YN = :reqReportYn");

        Map<String, Order> sortKeys = new HashMap<>();
        sortKeys.put("M_ID", Order.ASCENDING);

        qp.setSortKeys(sortKeys);

        return qp.getObject();
    }




    @Bean
    public Step step(JobRepository jobRepository, PlatformTransactionManager transactionManager,
                     JdbcPagingItemReader<RequestReportTargetDto> reader) {
        return new StepBuilder("jdbcPagingStep", jobRepository)
                .<RequestReportTargetDto, List<RequestReportDto>> chunk(CHUNK_SIZE, transactionManager)
                .reader(reader)
                .processor(reportProcessor)
                .writer(jdbcBatchListItemWriter())
                .transactionManager(transactionManager)
                .allowStartIfComplete(true) // 테스트 위해서 
                .build();
    }

    @Bean
    public Step step2(JobRepository jobRepository, PlatformTransactionManager transactionManager, JdbcPagingItemReader<RequestReportTargetDto> reader) {
        return new StepBuilder("jdbcPagingStep2", jobRepository)
                .<RequestReportTargetDto, List<RequestReportDto>> chunk(CHUNK_SIZE, transactionManager)
                .reader(reader)
                .processor(orgReportProcessor)
                .writer(jdbcBatchListItemWriter2())
                .transactionManager(transactionManager)
                .allowStartIfComplete(true)// 테스트 위해서 
                .build();
    }

    @Bean
    public Job job(JobRepository jobRepository, Step step, Step step2) {
        return new JobBuilder("reportJob", jobRepository)
                .start(step)
                .next(step2)
                .build();
    }


}

 

reader 를 설정파일 말고 따로 클래스로 빼서 ItemWriter implements 해서 구현체로 만들어야 하나 싶은데 그러면 JdbcPagingItemReader 를 쓰기가 어려워집니다.

검색 많이 해봤는데 생각보다 참고할 수 있는 자료가 적어서 응용이 어렵습니다ㅠ

참고할만한 예제가 혹시 있을까요? 아니면 어떤 식으로 하면 좋을지 알려주시면 또 찾아보고 해보겠습니다.

 

답변 1

0

안녕하세요, 인프런 AI 인턴입니다.
김기훈님의 스프링 배치와 관련된 질문에 답변 드리겠습니다.

스프링 배치에서 JdbcPagingItemReader를 사용한 read 과정의 반복을 제어하고자 할 때는 RepeatTemplate을 사용하기보다는 다른 접근이 필요합니다. RepeatTemplate는 스프링 배치의 청크 기반 프로세싱에서 의도한 사용 방법이 아니며 청크 처리 방식과는 맞지 않습니다. 데이터가 실시간으로 쌓이는 환경에서 JdbcPagingItemReader를 통한 데이터 읽기를 통제하려면, 읽어야 할 데이터의 범위를 한정지어 순회하는 것이 필요합니다. 예를 들어, 배치 작업을 시작하기 전에 읽을 데이터의 범위를 결정하고 해당 범위만큼만 읽어 처리하도록 설계할 수 있습니다.

처리할 데이터의 범위를 결정하는 방법은 여러 가지가 있습니다. 예를 들면, 배치를 실행할 때마다 읽어야 할 데이터의 최대 ID 값을 파라미터로 전달하고 이를 기준으로 쿼리의 WHERE 조건을 설정하여 데이터를 읽는 방법이나, 특정 timestamp 이전의 데이터만을 조회하여 처리하는 방법 등이 있습니다.

참고로, 스프링 배치가 제공하는 기능들 중에는 배치의 실행 컨텍스트(ExecutionContext)를 이용하여 이전 배치 실행에서 처리한 마지막 데이터의 키 값을 저장하고, 다음 실행 때 이 값을 사용하여 다음 데이터부터 처리를 계속하는 방식으로도 제어할 수 있습니다.

단순히 특정 개수만큼만 데이터를 읽고 싶다면, JdbcPagingItemReadersetMaxItemCountsetCurrentItemCount 메소드를 사용하여 읽을 데이터의 개수와 시작점을 설정할 수 있습니다.

아래는 스프링 배치 문서에서 관련 정보를 참조할 수 있는 링크입니다. 추가적으로 필요한 정보를 검색하고 적용해 보시길 권장드립니다.

더 깊이있는 도움이 필요하시면, 스프링 배치에 관한 더 세부적인 자료나 커뮤니티 글 등을 참조하시거나, 스프링 배치 관련 질문을 스프링 커뮤니티에 직접 올려 전문가의 도움을 받으시는 것도 좋은 방법입니다.

김기훈님의 프로필 이미지
김기훈

작성한 질문수

질문하기