Spring Batch
:books: Spring batch Architecture

Spring batch는 Job bean이 생성되면 JobLauncher에 의해 Job을 실행한다.
JobLauncher가 job을 실행하고, Job은 Step을 실행하게 된다.
JobRepository는 DB또는 Memory에 Spring batch가 실행할 수 있도록 Metadata를 관리하는 Class이다.
Job은 배치의 실행단위
Job은 N개의 Step을 실행할 수 있으며, 흐름(Flow)를 관리할 수 있다.
- Job Flow : A step 실행 후 조건에 따라 B Step 또는 C Step을 실행 설정
Step은 Job의 세부 실행 단위이며, N개가 등록되어 실행된다.
Step의 실행단위는 크게 2가지로 나눌 수 있다.
1) Chunk 기반 : 하나의 큰 덩어리를 n개씩 나눠서 실행 (10000개의 data를 한번 1000개씩 10번 나눠서 처리)
2) Task 기반 : 하나의 작업 기반
# Chunk 기반 Step은 ItemReader, ItemProcessor, ItemWriter가 있다.
- 여기서 Item은 배치 처리 대상 객체를 의미한다.
ItemReader는 배치 처리 대상 객체를 읽어 ItemProcessor 또는 ItemWriter에게 전달한다.
- 예를 들면 파일 또는 DB에서 데이터를 읽는다.
ItemProcessor는 input객체를 output객체로 filtering 또는 Processing 해 ItemWriter에게 전달한다.
ItemProcessor는 필수가 아닌다.
- 예를 들면, ItemReader에서 읽은 데이터를 수정 또는 ItemWriter 대상인지 filtering 한다.
- ItemProcessor는 optional 하다.
- ItemProcessor가 하는 일을 ItemReader 또는 ItemWriter가 대신할 수 있따.
ItemWriter는 배치 처리 대상 객체를 처리한다.
- DB update를 하거나 처리 대상 사용자에게 알림을 보낸다.
Spring batch metatable

배치 실행을 위한 메타 데이터가 저장되는 테이블
~ JOB
BATCH_JOB_INSTANCE
- Job이 실행되며 생성되는 최상위 계층의 테이블
- job_name과 job_key를 기준으로 하나의 row가 생성되며, 같은 job_name과 job_key가 저장될 수 없다.
- job_key는 BATCH_JOB_EXECUTION_PARAMS에 저장되는 Parameter를 나열해 암호화해 저장한다.
BATCH_JOB_EXECUTION
- Job이 실행되는 시작/종료시간, job 상태 등을 관리
BATCH_JOB_EXECTUION_PARAMS
- Job을 실행하기 위해 주입된 parameter 정보 저장
BATCH_JOB_EXECTUION_CONTEXT
- Job이 실행되며 공유해야할 데이터를 직렬화해 저장
~ STEP
BATCH_STEP_EXECUTION
- Step이 실행되는 동안 필요한 데이터 또는 실행된 결과 저장
BATCH_STEP_EXECUTION_CONTEXT
- Step이 실행되며 공유해야할 데이터를 직렬화해 저장

spring-batch-core/org.springframework/batch/core/* 에 위치
schema.sql설정
- schema-**.sql의 실행 구분은 DB종류별로 script가 구분
- spring.batch.initialize-schema config로 구분한다.
- ALWAYS(항상 실행) , EMBEDDED(내장 DB일 때만 실행/DEFAULT), NEVER(항상 실행안함)로 관리하다.
schema-mysql.sql
-- Autogenerated: do not edit this file
CREATE TABLE BATCH_JOB_INSTANCE (
JOB_INSTANCE_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT ,
JOB_NAME VARCHAR(100) NOT NULL,
JOB_KEY VARCHAR(32) NOT NULL,
constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY)
) ENGINE=InnoDB;
CREATE TABLE BATCH_JOB_EXECUTION (
JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT ,
JOB_INSTANCE_ID BIGINT NOT NULL,
CREATE_TIME DATETIME(6) NOT NULL,
START_TIME DATETIME(6) DEFAULT NULL ,
END_TIME DATETIME(6) DEFAULT NULL ,
STATUS VARCHAR(10) ,
EXIT_CODE VARCHAR(2500) ,
EXIT_MESSAGE VARCHAR(2500) ,
LAST_UPDATED DATETIME(6),
JOB_CONFIGURATION_LOCATION VARCHAR(2500) NULL,
constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID)
references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
) ENGINE=InnoDB;
CREATE TABLE BATCH_JOB_EXECUTION_PARAMS (
JOB_EXECUTION_ID BIGINT NOT NULL ,
TYPE_CD VARCHAR(6) NOT NULL ,
KEY_NAME VARCHAR(100) NOT NULL ,
STRING_VAL VARCHAR(250) ,
DATE_VAL DATETIME(6) DEFAULT NULL ,
LONG_VAL BIGINT ,
DOUBLE_VAL DOUBLE PRECISION ,
IDENTIFYING CHAR(1) NOT NULL ,
constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;
CREATE TABLE BATCH_STEP_EXECUTION (
STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT NOT NULL,
STEP_NAME VARCHAR(100) NOT NULL,
JOB_EXECUTION_ID BIGINT NOT NULL,
START_TIME DATETIME(6) NOT NULL ,
END_TIME DATETIME(6) DEFAULT NULL ,
STATUS VARCHAR(10) ,
COMMIT_COUNT BIGINT ,
READ_COUNT BIGINT ,
FILTER_COUNT BIGINT ,
WRITE_COUNT BIGINT ,
READ_SKIP_COUNT BIGINT ,
WRITE_SKIP_COUNT BIGINT ,
PROCESS_SKIP_COUNT BIGINT ,
ROLLBACK_COUNT BIGINT ,
EXIT_CODE VARCHAR(2500) ,
EXIT_MESSAGE VARCHAR(2500) ,
LAST_UPDATED DATETIME(6),
constraint JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;
CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT (
STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
SHORT_CONTEXT VARCHAR(2500) NOT NULL,
SERIALIZED_CONTEXT TEXT ,
constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID)
references BATCH_STEP_EXECUTION(STEP_EXECUTION_ID)
) ENGINE=InnoDB;
CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT (
JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
SHORT_CONTEXT VARCHAR(2500) NOT NULL,
SERIALIZED_CONTEXT TEXT ,
constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;
CREATE TABLE BATCH_STEP_EXECUTION_SEQ (
ID BIGINT NOT NULL,
UNIQUE_KEY CHAR(1) NOT NULL,
constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;
INSERT INTO BATCH_STEP_EXECUTION_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_STEP_EXECUTION_SEQ);
CREATE TABLE BATCH_JOB_EXECUTION_SEQ (
ID BIGINT NOT NULL,
UNIQUE_KEY CHAR(1) NOT NULL,
constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;
INSERT INTO BATCH_JOB_EXECUTION_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_JOB_EXECUTION_SEQ);
CREATE TABLE BATCH_JOB_SEQ (
ID BIGINT NOT NULL,
UNIQUE_KEY CHAR(1) NOT NULL,
constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;
INSERT INTO BATCH_JOB_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_JOB_SEQ);
JobExecutionContext와 StepExecutionContext
JobExecutionContext
- Job이 관리하는 Step에서 데이터 공유
StepExecutionContext
- 해당 Step에서 데이터를 공유함
Parameter
- JobParameters 객체 사용
- Spring EL
1. JobParameters 객체 사용

private Tasklet tasklet() {
List<String> items = getItems();
return ((contribution, chunkContext) -> {
StepExecution stepExecution = contribution.getStepExecution();
JobParameters jobParameters = stepExecution.getJobParameters();
String value = jobParameters.getString("chunkSize", "10");
int chunkSize = StringUtils.isNotEmpty(value) ? Integer.parseInt(value) : 10;
int fromIndex = stepExecution.getReadCount();
int toIndex = fromIndex + chunkSize;
if (fromIndex >= items.size()) {
return RepeatStatus.FINISHED;
}
// subList 메서드로 패이징 처리가능
List<String> subList = items.subList(fromIndex, toIndex);
log.info("task item size : {}", subList.size());
stepExecution.setReadCount(toIndex);
return RepeatStatus.CONTINUABLE;
});
}
2. Spring EL사용
@Bean
public Job chunkProcessingJob() {
return jobBuilderFactory.get("chunkProcessingJob")
.incrementer(new RunIdIncrementer())
.start(this.taskBaseStep())
.next(this.chunkBaseStep(null))
.build();
}
// null 을 return 할떄까지 반복
@Bean
@JobScope
public Step chunkBaseStep(@Value("{jobParameters[chunkSize]}") String chunkSize){
return stepBuilderFactory.get("chunkBaseStep")
/*
paging 처리 처럼 nice 하게 처리할 수 있다.
100개의 데이터를 10개씩 나눈다는 뜻.
<input type , output type>
*/
.<String, String>chunk(StringUtils.isNotEmpty(chunkSize) ? Integer.parseInt(chunkSize) : 10)
.reader(itemReader())
.processor(itemProcessor())
.writer(itemWriter())
.build();
}
@JobScope와 @StepScope?
scope 는 어떤 시점에 bean을 생성/소멸 시킬 지 bean의 lifecycle을 설정
기본은 싱글톤 scope이다.
JobScope, StepScope
@JobScope와 @StepScope는 Spring Batch에서 사용되는 어노테이션입니다. Spring Batch는 대용량 데이터 처리를 위한 배치 작업을 지원하는 프레임워크로, 많은 양의 데이터를 처리하고, 복잡한 로직을 수행하며, 대용량 처리에 대한 안정성과 확장성을 제공합니다.
@JobScope 어노테이션은 Spring Batch 작업에서 Job의 인스턴스를 생성할 때 사용됩니다. Job 인스턴스는 Spring의 ApplicationContext에 등록되며, Job 실행 전에 인스턴스가 생성됩니다. 이 어노테이션을 사용하면 Job 파라미터를 사용할 수 있으며, Job 파라미터는 Job 실행 전에 설정됩니다. 예를 들어, @JobScope를 사용하여 Job 파라미터를 주입하면, Job 실행 전에 Job 파라미터를 설정할 수 있습니다.
@StepScope 어노테이션은 Step의 인스턴스를 생성할 때 사용됩니다. Step 인스턴스는 Job 실행 중에 동적으로 생성됩니다. StepScope는 Step 파라미터를 사용할 수 있으며, Step 파라미터는 Step 실행 전에 설정됩니다. 예를 들어, @StepScope를 사용하여 Step 파라미터를 주입하면, Step 실행 전에 Step 파라미터를 설정할 수 있습니다.
따라서, @JobScope와 @StepScope는 Spring Batch의 Job과 Step 실행 시점에 파라미터를 주입하여 동적으로 Job과 Step을 구성할 수 있도록 합니다.
@Scope(value = "job", proxyMode = ScopedProxyMode.TARGET_CLASS)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface JobScope {
}
ItemReader interface 구조

배치 대상 데이터를 읽기 위한 설정
Step에 ItemReader는 필수
기본 제공되는 ItemReader 구현체
- file, jdbc, jpa, hibernate, kafka, etc...
ItemReader 구현체가 없으면 직접 개발
ItemStream은 ExecutionContext로 read, write 정보를 저장
FlatFileItemReader 클래스로 파일에 저장된 데이터를 읽어 객체에 매핑 - csv 파일 read
private FlatFileItemReader<Person> csvFileItemReader() throws Exception {
// csv -> Pesron 객체로 매핑
DefaultLineMapper<Person> lineMapper = new DefaultLineMapper<>();// 데이터 읽을 수 있는 설정 -> line mapper 객체 생성
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
tokenizer.setNames("id", "name", "age", "address");
lineMapper.setLineTokenizer(tokenizer);
lineMapper.setFieldSetMapper(fieldSet -> {
int id =fieldSet.readInt("id");
String name = fieldSet.readString("name");
String age = fieldSet.readString("age");
String address = fieldSet.readString("address");
return new Person(id,name,age,address);
});
FlatFileItemReader<Person> itemReader = new FlatFileItemReaderBuilder<Person>()
.name("csvFileItemReader")
.encoding("UTF-8")
.resource(new ClassPathResource("test.csv")) //resources 밑에 파일을 읽을 수 있는 클래스
.linesToSkip(1) // 첫 row는 필드명, skip한다는 뜻.
.lineMapper(lineMapper) //line 한 줄씩 읽도록
.build();
itemReader.afterPropertiesSet(); // 정상적으로 설정됐는지 검증하는 메서드
/*
이 기종간의 통신시 DB 데이터를 받을 수 없어, 이렇게 사용하는 경우가 많다.
*/
return itemReader;
}
JDBC 데이터 읽기 - Cursor 기반 / paging 기반


#### Cursor 기반 조회
배치 처리가 완료될 때 까지 DB Connection이 연결
DB Connection 빈도가 낮아 성능이 좋은 반면, 긴 Connection 유지 시간 필요
하나의 Connection에서 처리되기 때문에, Thread Safe 하지 않음
모든 결과를 메모리에 할당하기 때문에, 더 많은 메모리를 사용
#### Paging 기반 조회
페이징 단위로 DB Connection을 연결
DB Connection 빈도가 높아 비교적 성능이 낮은 반면, 짧은 Connection 유지 시간 필요
매번 Connection을 하기 때문에 Thread Safe
페이징 단위의 결과만 메모리에 할당하기 때문에, 비교적 더 적은 메모리를 사용
JdbcBatchItemWriter
- jdbcBatchItemWriter는 jdbc를 이용해 db에 writer
- jdbcBatchItemWriter는 bulk insert/update/delete 처리
insert into person (name,age,address) values (1,2,3),(4,5,6) .... (10000,100001,10002) ;
ItemProcessor interface 구조
ItemProcessor는 Spring Batch에서 사용되는 인터페이스로, 배치 처리 중에 각 항목에 대한 비즈니스 로직을 적용하는 데 사용됩니다.
ItemProcessor 인터페이스는 두 가지 타입 매개 변수를 사용하며, 각각 입력 및 출력 유형을 나타냅니다.
이 인터페이스는 process 메서드를 정의하며, 이를 구현하여 각 항목에 대한 처리를 수행해야 합니다.
ItemProcessor 인터페이스의 기본 구조는 다음과 같습니다:
public interface ItemProcessor<I, O> {
O process(I item) throws Exception;
}
/*
여기서 I는 입력 타입, O는 출력 타입입니다.
ItemProcessor를 사용하려면 이 인터페이스를 구현하는 클래스를 작성해야 합니다.
예를 들어, Person 객체를 입력으로 받아 연령을 기준으로 필터링하고 출력으로 동일한 Person 객체를 반환하는 PersonItemProcessor를 작성하고 싶다면 다음과 같이 구현할 수 있습니다:
*/
import org.springframework.batch.item.ItemProcessor;
public class PersonItemProcessor implements ItemProcessor<Person, Person> {
@Override
public Person process(Person item) throws Exception {
if (item.getAge() >= 18) {
return item;
}
return null;
}
}
/*
위 예제에서 process 메서드는 18세 이상의 Person 객체를 반환하고, 그렇지 않은 경우 null을 반환합니다.
pring Batch는 반환된 값이 null인 경우 해당 항목을 출력에 포함하지 않습니다.
이렇게 구현한 ItemProcessor는 Spring Batch의 ItemReader로부터 읽은 항목들에 대해 처리 로직을 적용한 후, 처리된 결과를 ItemWriter에 전달합니다.
이를 통해 각 항목에 대해 필요한 비즈니스 로직을 쉽게 적용할 수 있습니다.
*/