개발/Spring Batch

Multithread step 적용하기

dev-yoon-jerry 2023. 11. 19. 15:30

많은 데이터를 S3에 업로드 할 필요가 있었는데. 집에서 AWS 프리티어를 이용해서.. 테스트 해보고 실제로 적용해보았다.

return JobBuilerFactory.get("batchJob")
	.incrementer(new RunIdIncrementer())
        .listener(listener)
        .start(checkDbIsEmptyStep)
        	.on("FAILED")
            .end()
        .from(checkDbIsEmptyStep)
        	.on("*")
            .to(s3UploadStep)
            .end()
        .end()
        .build();

DB에서 S3로 업로드하는 job이다. S3업로드 step에서 Job의 대부분의 시간이 소요되었다.

@Bean
public TaskExecutor S3UploadTaskExecutor() {
    threadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(4);
    executor.setMaxPoolSize(4);
    executor.initialize();
    return executor;
}

다음과 같이 스레드 수를 설정할 수 있다.

@Bean
public Step S3FileUploadStep(@Qualifier("pagingQueryProvider") PagingQueryProvider queryProvider,
@Qualifier("S3UploadTaskExecutor") TaskExecutor taskExecutor) throws Exception {
	return stepBuilderFactory.get("s3UploadStep")
	        .<TestDoc,TestDoc>chunk(CHUNK_SIZE)
            .reader(jdbcRepositoryItemReader(queryProvider))
            .writer(s3FileWriter())
            .faultTolerant()
            .skipPolicy(skipPolicy())
            .taskExecutor(taskExecutor) // 추가
            .build();
}

정의한 S3UploadTaskExecutor를 추가하여 task를 멀티스레드로 수행하게 하였다.,

Thread-safe체크

public JdbcPagingItemReader<TestDoc> jdbcRepositoryItemReader(){
    JdbcPagingItemReader jdbcPagingItemReader = new JdbcPagingItemReaderBuilder<TestDoc>()
      //뒤는 설정하고 .build() 하는 소스.. 생략
}

주의해야 할 점은 사용하고있는 reader인 jdbcPagingItemReader가 thread-safe인지 확인해야 한다는 것이다. 

https://docs.spring.io/spring-batch/docs/current/api/org/springframework/batch/item/database/JdbcPagingItemReader.html

결과

최적의 thread를 찾기 위해서 thread를 늘려가며 테스트 해보았다. 

  1. 기존 (1 thread) - 분당 1021개
  2. 테스트1 (2 thread) - 분당 2008개
  3. 테스트2 (4 thread) - 분당 4021개
  4. 테스트3 (6 trhead) - 분당 4018개

최적의 thread는 4개였다. 집에서 간단한 파일 올리는데는 별로 차이가 없어보이겠지만.
4시간걸리는 배치라면 1시간으로 줄일 수 있다.