작성
·
1.2K
0
package io.springbatch.springbatchlecture.retry.partitioning;
import io.springbatch.springbatchlecture.dbitemreader.Customer;
import io.springbatch.springbatchlecture.dbwriter.Customer2;
import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.core.partition.support.SimplePartitioner;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.database.builder.JdbcPagingItemReaderBuilder;
import org.springframework.batch.item.database.builder.JpaPagingItemReaderBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import javax.persistence.EntityManagerFactory;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Spring Batch configuration for a single-step, chunk-oriented job that reads
 * {@code Customer} entities through JPA, maps each one to a {@code Customer2},
 * and inserts the result into the {@code Customer2} table through JDBC.
 *
 * <p>All collaborators are constructor-injected (Lombok {@code @RequiredArgsConstructor}).
 */
@Configuration
@RequiredArgsConstructor
public class SimpleTestConfig {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final EntityManagerFactory emf;
    private final DataSource dataSource;

    /**
     * Generates the ids assigned to {@code Customer2} items by {@link #batchProcessor()}.
     * Initialized in place, so it is not part of the generated constructor.
     */
    private final AtomicLong myId = new AtomicLong();

    /**
     * Defines the job (registered under the name {@code "partitioningJob"}) that runs
     * {@link #slaveStep()} and uses a {@link RunIdIncrementer} for its job parameters.
     *
     * @return the configured job
     */
    @Bean
    public Job batchJob200() {
        return jobBuilderFactory.get("partitioningJob")
                .incrementer(new RunIdIncrementer())
                .start(slaveStep())
                .build();
    }

    /**
     * Defines the chunk-oriented step: read {@code Customer}, convert to {@code Customer2},
     * write, with a chunk size of 1000.
     *
     * <p>The JPA reader from {@link #batchReader()} is used; {@link #pagingItemReader()} is
     * a JDBC alternative that is defined but not wired in here.
     *
     * @return the configured step
     */
    @Bean
    public Step slaveStep() {
        return stepBuilderFactory.get("slaveStepMaster")
                .<Customer, Customer2>chunk(1000)
                .reader(batchReader())
                .processor(batchProcessor())
                .writer(batchWriter())
                .build();
    }

    /**
     * Step-scoped processor that copies birth date, first name and last name from the
     * incoming {@code Customer} into a new {@code Customer2} and assigns it the next
     * value of {@link #myId}.
     *
     * @return the processor
     */
    @Bean
    @StepScope
    public ItemProcessor<? super Customer, ? extends Customer2> batchProcessor() {
        // Trace output: runs each time this factory method is invoked.
        System.out.println("itemProcessor Here");
        return (ItemProcessor<Customer, Customer2>) item -> Customer2.builder()
                .id(myId.incrementAndGet())
                .birthDate(item.getBirthDate())
                .firstName(item.getFirstName())
                .lastName(item.getLastName())
                .build();
    }

    /**
     * Step-scoped JDBC batch writer that inserts {@code Customer2} items, binding the
     * named SQL parameters from the item's bean properties.
     *
     * @return the writer
     */
    @Bean
    @StepScope
    public ItemWriter<? super Customer2> batchWriter() {
        return new JdbcBatchItemWriterBuilder<Customer2>()
                .sql("INSERT INTO Customer2(customer2_id, birth_date, first_name, last_name) values (:id, :birthDate, :firstName, :lastName)")
                .dataSource(dataSource)
                .beanMapped()
                .build();
    }

    /**
     * Step-scoped JPA paging reader over all {@code Customer} entities, capped at
     * 1000 items.
     *
     * <p>The return type is deliberately the concrete {@link JpaPagingItemReader} rather
     * than {@code ItemReader}. The {@code @StepScope} proxy is built from the declared
     * return type; if that type is only {@code ItemReader}, the proxy is not an
     * {@code ItemStream}, the step never calls {@code open()} on it, and the reader's
     * {@code EntityManager} is never created. Declaring the concrete class keeps the
     * stream callbacks visible while remaining assignable wherever an
     * {@code ItemReader<? extends Customer>} is expected.
     *
     * @return the reader
     */
    @Bean
    @StepScope
    public JpaPagingItemReader<Customer> batchReader() {
        return new JpaPagingItemReaderBuilder<Customer>()
                .name("partitionStepJpaReader")
                .currentItemCount(0)
                .entityManagerFactory(emf)
                .maxItemCount(1000)
                .queryString("select c from Customer c")
                .build();
    }

    /**
     * Step-scoped JDBC paging reader over the {@code customer} table, sorted by
     * {@code customer_id} ascending and mapped onto {@code Customer} by bean properties.
     *
     * <p>Not wired into {@link #slaveStep()} in this class. No where-clause is set; a
     * range filter on {@code customer_id} would be the place to restrict each partition.
     * TODO: revisit the earlier lecture on {@code @StepScope}.
     *
     * @return the reader
     */
    @Bean
    @StepScope
    public JdbcPagingItemReader<Customer> pagingItemReader() {
        // Trace output: runs each time this factory method is invoked.
        System.out.println("Target Created");

        HashMap<String, Order> sortKeys = new HashMap<>();
        sortKeys.put("customer_id", Order.ASCENDING);

        return new JdbcPagingItemReaderBuilder<Customer>()
                .name("pagingBuilder")
                .dataSource(dataSource)
                .fetchSize(1000)
                .beanRowMapper(Customer.class)
                .selectClause("customer_id, first_name, last_name, birth_date")
                .fromClause("from customer")
                .sortKeys(sortKeys)
                .build();
    }

    /**
     * Exposes a {@link SimplePartitioner} bean.
     *
     * <p>The partitioner is returned as-is: calling {@code partition(n)} here and
     * discarding the returned map has no effect, so no such call is made.
     *
     * @return the partitioner
     */
    @Bean
    public Partitioner partitioner() {
        return new SimplePartitioner();
    }
}
답변 1
2
네
원인은 잘 파악하신 게 맞습니다.
말씀하신 것처럼 open() 메서드는 ItemStream 에서 정의한 메서드입니다.
그런데 JpaPagingItemReader 를 생성하는 구문에서 리턴 타입이 ItemReader 로 되어 있습니다.
// Declaration quoted from the question: the declared return type is ItemReader only,
// so ItemStream is not part of the bean's declared type.
@Bean
@StepScope
public ItemReader<? extends Customer> batchReader() {
return new JpaPagingItemReaderBuilder<Customer>()
.name("partitionStepJpaReader")
.currentItemCount(0)
.entityManagerFactory(emf)
.maxItemCount(1000)
.queryString("select c from Customer c")
.build();
}
즉 ItemStream 타입은 빠져 있습니다.
그렇기 때문에 @StepScope 에 의해 생성되는 Proxy 객체도 선언된 리턴 타입인 ItemReader 타입으로만 만들어집니다. Proxy 는 실행 시점에 실제 타겟빈을 찾아 호출하지만, 스텝은 등록된 Reader 가 ItemStream 타입인지 먼저 확인한 뒤에 open() 메서드를 실행합니다. Proxy 가 ItemStream 타입이 아니므로 open() 이 호출되지 않고, 그 결과 아래 구문이 실행되지 못해서 결론적으로 EntityManager 가 생성되지 못한 것입니다.
// Excerpt quoted by the answer (attributed there to JpaPagingItemReader).
// Runs the parent doOpen(), then creates the EntityManager from the factory and
// fails fast if none could be obtained.
@Override
protected void doOpen() throws Exception {
super.doOpen();
entityManager = entityManagerFactory.createEntityManager(jpaPropertyMap);
if (entityManager == null) {
throw new DataAccessResourceFailureException("Unable to obtain an EntityManager");
}
// set entityManager to queryProvider, so it participates
// in JpaPagingItemReader's managed transaction
if (queryProvider != null) {
queryProvider.setEntityManager(entityManager);
}
}
그래서 JpaPagingItemReader 를 생성하는 메서드의 리턴 타입을 ItemReader 와 ItemStream 을 모두 포함하는 타입(예: ItemStreamReader)으로 선언하시면 됩니다.
// Corrected declaration: only the return type changes, from ItemReader to
// ItemStreamReader; the builder chain is the same as in the question.
@Bean
@StepScope
public ItemStreamReader<? extends Customer> batchReader() {
return new JpaPagingItemReaderBuilder<Customer>()
.name("partitionStepJpaReader")
.currentItemCount(0)
.entityManagerFactory(emf)
.maxItemCount(1000)
.queryString("select c from Customer c")
.build();
}
그리고 스프링 배치에서 다형성을 활용할 필요가 크지 않다면 리턴 타입을 구현 클래스 타입으로 선언해도 나쁘지 않을 것 같습니다.
// Alternative: declare the concrete reader class as the return type.
// The builder calls are elided ("....."); the snippet is illustrative, not complete code.
@Bean
@StepScope
public JpaPagingItemReader<? extends Customer> batchReader() {
return new JpaPagingItemReaderBuilder<Customer>()
.....
.build();
}
저도 같은 문제에 봉착했었는데 이런 원리였군요!!!