채널톡 아이콘

인프런 커뮤니티 질문&답변

박성우님의 프로필 이미지

작성한 질문수 1

스프링 배치

slave 청크의 실패 횟수에 따라 나머지 청크를 중지시키는 방법이 있을까요?

작성

·

79

0

 

        return new StepBuilder("apiStep", jobRepository)
                .<HelloVO, Hello>chunk(CHUNK_SIZE, transactionManager)
                .reader(helloReader(null))
                .processor(helloVO -> {
                    LocalDate date = helloVO.getDate();
                    if(date.getDayOfMonth() == 10){
                        throw new IllegalStateException("!! " + helloVO);
                    }
                    return helloVO.toEntity();
                })
                .writer(v -> log.info("write = {}", v))
                 .listener(partitionLimitSkipListener(null))
                .build();

 

안녕하세요.

만약 위 상황에서 파티션이 1000개라고 가정할 경우 천 건의 청크가 동작하게 될텐데요. 만약 첫 번재 값이 timeout으로 예외가 발생할 경우, 남은 999건 동일한 timeout이 발생할 수 있다고 예상되고 이런 에러는 장애로 확장될 수 있다고 예상됩니다. 실제로 위의 throw ex 상황에서 failed로 처리된 1000건의 step이 발생함을 확인하였습니다. 이런 문제를 최소화 해야 하는 것에 목적이 있습니다.

이를 해결하기 faultTolerant로 사용하고 chunkListener의 afterError메서드로 step context까지 도달하여 setTerminal 을 하였으나... 이 방법으로는 해결이 되지 않더라고요. 결국 결국 아래와 같이 operator로 처리하였습니다.

 

@Slf4j
public class ChunkExceptionCounterListener implements ChunkListener {
    private int failureCount = 0;
    private static final int MAX_FAILURES = 3;
    private final JobOperator jobOperator;

    public ChunkExceptionCounterListener(JobOperator jobOperator) {
        this.jobOperator = jobOperator;
    }

    @Override
    public void afterChunkError(ChunkContext context) {
        failureCount++;
        log.error("chunk {} exceeds error {}/{}", context.getStepContext().getStepName(), failureCount, MAX_FAILURES);
        if (failureCount >= MAX_FAILURES) {
            log.error("stop job! with afterChunkError!");
            context.getStepContext().getStepExecution().setTerminateOnly(); // 안됨..ㅠ 
            context.getStepContext().getStepExecution().getJobExecution().setExitStatus(ExitStatus.FAILED);
            Long jobExecutionId = context.getStepContext().getStepExecution().getJobExecutionId();
            try {
                jobOperator.stop(jobExecutionId); // 이건 된다! 
            } catch (NoSuchJobExecutionException e) {
                throw new RuntimeException(e);
            } catch (JobExecutionNotRunningException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

 

위 방식이 좋은 방식인지는 솔직히 모르겠습니다. 일단 기대하는 것처럼 interrupt로 에러로 남은 모든 step이 stopped되어 좋긴 하였는데, 제가 혼자서 구현한 방식인지라... 최적의 방식인지는 잘 모르겠네요.

 

더불어 retry template에서 recover callback 메서드를 구현할 경우 청크 파티션 일부의 에러를 마스터 슬레이브 전체에 대한 정지 처리로 할 수 있던 것으로 기억하는데, 이 정도의 튜닝은 제공하는지도 궁금합니다. 감사합니다^^

 

답변 2

0

정수원님의 프로필 이미지
정수원
지식공유자

네 이 부분은 리스너를 통한 커스텀 구현도 방법은 되겠으나 스프링배치의 구조적인 방식과는 조금 거리가 멀고 강제종요로 인해 리소스 해제나 트랜잭션 처리에도 이슈가 될 수 있습니다

보통 스텝이 실패할 경우 skip 이나 retry 를 사용하여 오류를 관리합니다

예를 들어 다음과 같은 경우를 생각해 볼 수 있습니다

return new StepBuilder("apiStep", jobRepository)

.<HelloVO, Hello>chunk(CHUNK_SIZE, transactionManager)

.reader(helloReader(null))

.processor(helloVO -> {

LocalDate date = helloVO.getDate();

if(date.getDayOfMonth() == 10){

throw new IllegalStateException("!! " + helloVO);

}

return helloVO.toEntity();

})

.writer(v -> log.info("write = {}", v))

.faultTolerant()

.skipPolicy(new LimitCheckingItemSkipPolicy(MAX_FAILURES, IllegalStateException.class))

.listener(skipListener)

.build();

즉 최대실패갯수에 도달할 경우 어떤 처리를 하도록 하는거죠

위 코드는 하나의 예시라고 이해하시면 됩니다

0

인프런 AI 인턴님의 프로필 이미지

안녕하세요, 인프런 AI 인턴입니다.

상황설명해주신대로 스프링 배치에서 청크 단위의 처리 과정 중 일정 횟수의 실패가 발생하였을 때, 전체 Job을 중지시키는 방법에 대한 질문이시군요. 제공해주신 코드와 설명에 따르면, ChunkExceptionCounterListener 클래스를 통해 실패 횟수를 카운팅하고, 특정 횟수에 도달했을 때 JobOperator를 사용하여 Job을 중지시키는 방식으로 구현하셨습니다. 이 접근 방식은 실제 환경에서 필요한 기능을 구현하는 하나의 유효한 방법입니다.

해당 방식은 이미 스프링 배치의 기능을 활용하고, 추가로 비즈니스 요구사항에 맞춰 필요한 로직을 구현한 것으로, 특정 조건에서 전체 작업을 중단시키는 요구사항을 충족시킵니다. 스프링 배치에서는 이와 같이 커스텀 리스너와 JobOperator를 사용하여 작업 실행 도중에 Job을 중지시킬 수 있는 방법을 제공하지만, 이 방법이 항상 “최적”이라고 할 수는 없습니다. 상황에 따라 다른 접근방식이 더 적합할 수 있으며, 중요한 것은 요구사항을 효과적으로 충족시키는 것입니다.

retry template에서의 recover callback 메서드 구현 관련하여, 이는 재시도 로직 후에 복구 메커니즘을 제공하는 방식으로, 파티션 레벨에서의 실패 처리를 다루기 위해 사용될 수 있습니다. 반면, 귀하께서 설명한 “특정 횟수의 실패 후 전체 Job을 중지”하는 로직은 주로 Job 레벨에서 관리되므로, ChunkListener와 같은 커스텀 구현 방식이 필요합니다.

결국, 스프링 배치에서는 다양한 확장 포인트와 커스터마이즈 방법을 제공합니다. 이를 통해 다양한 요구사항에 대응할 수 있는 맞춤형 솔루션을 구현할 수 있습니다. 귀하의 접근 방식은 이러한 맥락에서 유효한 해결책 중 하나로 보입니다. 더 나은 방식이 있는지에 대한 여부는, 해당 기능의 특정 요구사항, 성능 특성, 개발 및 유지보수의 용이성 등 다양한 측면을 고려하여 결정할 수 있을 것입니다.