Spring Batch: Getting Started


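Introduction to Spring Batch

Spring Batch is a batch processing framework for large data volumes. It is typically used for offline data migration and data processing, and it supports transactions, concurrency, flow control, monitoring, and both vertical and horizontal scaling, with unified interfaces and job management. Spring Batch was developed jointly by SpringSource and Accenture to give developers a common, convenient standard for batch processing.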

Official repository: github.com/spring-projects/spring-batch

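  • Spring Batch provides retry, exception handling, skip, restart, processing statistics, and resource management out of the box; these features are the main reason developers value it;
  • Spring Batch is a lightweight batch processing framework;
  • Spring Batch has a layered structure that keeps business logic separate from processing strategy and infrastructure;
  • The state of each job instance, its execution data, and its parameters are all persisted to a database;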
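
To make the retry, skip, and statistics features listed above more concrete, here is a minimal plain-Java sketch. It does not use Spring Batch's API and is not how the framework is implemented; the class name FaultTolerantSketch, the retryLimit and skipLimit parameters, and the Result fields are all hypothetical. It only mimics the idea: a failing item is retried a few times, then skipped and counted, and the run aborts once too many items have been skipped.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Conceptual sketch only: NOT Spring Batch's implementation or API.
final class FaultTolerantSketch {

    /** Run statistics, similar in spirit to the counts a batch framework records. */
    static final class Result<O> {
        final List<O> written = new ArrayList<>();
        int retryCount;
        int skipCount;
    }

    /**
     * Processes each item, retrying a failing item up to retryLimit times.
     * When retries are exhausted the item is skipped; once skipLimit items
     * have already been skipped, the next skip aborts the whole run.
     */
    static <I, O> Result<O> run(List<I> items, Function<I, O> processor,
                                int retryLimit, int skipLimit) {
        Result<O> result = new Result<>();
        for (I item : items) {
            int failures = 0;
            while (true) {
                try {
                    result.written.add(processor.apply(item));
                    break; // item succeeded
                } catch (RuntimeException e) {
                    failures++;
                    if (failures <= retryLimit) {
                        result.retryCount++; // try the same item again
                        continue;
                    }
                    if (result.skipCount >= skipLimit) {
                        throw new IllegalStateException("Skip limit exceeded at item: " + item, e);
                    }
                    result.skipCount++; // give up on this item and move on
                    break;
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // "x" cannot be parsed: it is retried twice, then skipped.
        Result<Integer> r = run(Arrays.asList("1", "x", "3"), s -> Integer.parseInt(s), 2, 1);
        System.out.println(r.written + " retries=" + r.retryCount + " skips=" + r.skipCount);
    }
}
```

In Spring Batch itself, this behaviour is configured declaratively on a step rather than hand-written; the sketch is only meant to show what the limits mean.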

Quick Start

  • Add the dependency to pom.xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
  • Create BatchConfig (any class name will do)
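import java.util.HashMap;
import java.util.Map;

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.jdbc.core.BeanPropertyRowMapper;

// Person, PersonItemProcessor and JobCompletionNotificationListener are application
// classes that this article does not show; the last two must be registered as Spring beans.
@Configuration
@EnableBatchProcessing
public class BatchConfig {

    // tag::readerwriterprocessor[]

    // Reads Person records from a comma-delimited file on the classpath.
    @Bean
    public FlatFileItemReader<Person> flatFileItemReader() {
        // Split each line on commas and name the two columns.
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[]{"firstName", "lastName"});

        // Copy the named columns onto the matching Person properties.
        BeanWrapperFieldSetMapper<Person> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
        fieldSetMapper.setTargetType(Person.class);

        DefaultLineMapper<Person> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(fieldSetMapper);

        FlatFileItemReader<Person> reader = new FlatFileItemReader<>();
        reader.setResource(new ClassPathResource("sample-data.csv"));
        reader.setLineMapper(lineMapper);
        return reader;
    }

    // Alternative reader that pages through a MySQL table; it is not wired into step1 below.
    @Bean
    public JdbcPagingItemReader<Person> jdbcPagingItemReader(DataSource dataSource) {
        // Paging needs a stable sort key.
        Map<String, Order> sortKeys = new HashMap<>();
        sortKeys.put("person_id", Order.ASCENDING);

        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("SELECT person_id,first_name,last_name");
        queryProvider.setFromClause("from people");
        queryProvider.setWhereClause("last_name=:lastName");
        queryProvider.setSortKeys(sortKeys);

        // Value bound to the :lastName named parameter.
        Map<String, Object> parameterValues = new HashMap<>();
        parameterValues.put("lastName", "DOE");

        JdbcPagingItemReader<Person> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(dataSource);
        reader.setFetchSize(100);
        reader.setQueryProvider(queryProvider);
        reader.setParameterValues(parameterValues);
        reader.setRowMapper(new BeanPropertyRowMapper<>(Person.class));
        return reader;
    }

    // Writes each chunk of Person items to the people table as one JDBC batch.
    @Bean
    public JdbcBatchItemWriter<Person> jdbcBatchItemWriter(DataSource dataSource) {
        JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<>();
        // Resolve :firstName and :lastName from the Person bean properties.
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
        writer.setSql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)");
        writer.setDataSource(dataSource);
        return writer;
    }

    /*@Bean
    public FlatFileItemWriter<Person> flatFileItemWriter(DataSource dataSource) {
        FlatFileItemWriter<Person> writer = new FlatFileItemWriter<>();
        writer.setAppendAllowed(true);
        writer.setEncoding("UTF-8");
        // writer.set(dataSource);
        return writer;
    }*/

    // end::readerwriterprocessor[]

    // tag::jobstep[]

    // The job runs the single step; RunIdIncrementer gives every launch a new run.id parameter.
    @Bean
    public Job importUserJob(JobBuilderFactory jobBuilderFactory, JobCompletionNotificationListener listener, Step step1) {
        return jobBuilderFactory.get("importUserJob")
                .incrementer(new RunIdIncrementer())
                .listener(listener)
                .start(step1)
                .build();
    }

    // The reader and writer are matched to the beans above by parameter name.
    @Bean
    public Step step1(StepBuilderFactory stepBuilderFactory, PersonItemProcessor processor,
                      ItemWriter<Person> jdbcBatchItemWriter, ItemReader<Person> flatFileItemReader) {
        /*CompositeItemProcessor compositeItemProcessor = new CompositeItemProcessor();
        compositeItemProcessor.setDelegates(Lists.newArrayList(processor, processor));*/
        return stepBuilderFactory.get("step1")
                .<Person, Person>chunk(10) // read and process items one by one, write them ten at a time
                .reader(flatFileItemReader)
                .processor(processor)
                .writer(jdbcBatchItemWriter)
                .build();
    }
    // end::jobstep[]
}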
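
The chunk(10) call in step1 means that items are read and processed one at a time and then handed to the writer in groups of up to ten, with one transaction per chunk. BatchConfig also relies on a Person class that the article does not show. The plain-Java sketch below illustrates both, under stated assumptions: the Person properties are inferred from the tokenizer names and SQL columns in the listing, while ChunkSketch, its upper-casing "processor", the in-memory "writer", and the sample names are hypothetical stand-ins rather than Spring Batch code or the author's classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

// Assumed shape of the Person bean used by BatchConfig (not shown in the article).
class Person {
    private Long personId;
    private String firstName;
    private String lastName;

    public Person() { } // bean-style mappers need a no-arg constructor

    public Person(String firstName, String lastName) {
        this.firstName = firstName;
        this.lastName = lastName;
    }

    public Long getPersonId() { return personId; }
    public void setPersonId(Long personId) { this.personId = personId; }
    public String getFirstName() { return firstName; }
    public void setFirstName(String firstName) { this.firstName = firstName; }
    public String getLastName() { return lastName; }
    public void setLastName(String lastName) { this.lastName = lastName; }

    @Override
    public String toString() { return firstName + " " + lastName; }
}

// Conceptual sketch of chunk-oriented processing; NOT Spring Batch's implementation.
final class ChunkSketch {

    /** Reads CSV lines, transforms each into a Person, and "writes" them chunk by chunk. */
    static List<List<Person>> run(List<String> csvLines, int chunkSize) {
        List<List<Person>> writtenChunks = new ArrayList<>();
        List<Person> chunk = new ArrayList<>();
        for (String line : csvLines) {
            String[] fields = line.split(",");                 // "read": tokenize one line
            Person person = new Person(                        // "process": hypothetical transformation
                    fields[0].trim().toUpperCase(Locale.ROOT),
                    fields[1].trim().toUpperCase(Locale.ROOT));
            chunk.add(person);
            if (chunk.size() == chunkSize) {                   // "write": hand over a full chunk
                writtenChunks.add(chunk);
                chunk = new ArrayList<>();
            }
        }
        if (!chunk.isEmpty()) {
            writtenChunks.add(chunk);                          // final, partially filled chunk
        }
        return writtenChunks;
    }

    public static void main(String[] args) {
        // Three made-up records with a chunk size of 2 produce chunks of 2 and 1 items.
        List<List<Person>> chunks = run(Arrays.asList("Jill,Doe", "Joe,Doe", "Justin,Doe"), 2);
        System.out.println(chunks);
    }
}
```

A larger chunk size means fewer commits but more work to redo if a chunk fails, so the value 10 in the listing is a tuning choice rather than a requirement.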

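Spring Batch's Layered Architecture

  • Infrastructure: strategy management, including retry on failure, exception handling, transactions, skip, and data input and output (text files, databases, messages)
  • Core: the heart of Spring Batch, including JobLauncher, Job, Step, and so on
  • Application: the business logic, which creates jobs and decides how they are run (on a schedule, triggered manually, and so on)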
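
The relationship between the Core and Application layers can be illustrated with a small plain-Java model. This is a sketch under assumptions, not Spring Batch's API: MiniStep, MiniJob, and launch are hypothetical simplifications of Step, Job, and JobLauncher, and the main method stands in for application code that defines a job and decides when to trigger it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BooleanSupplier;

// Conceptual model of the layers; NOT Spring Batch's classes.
final class LayerSketch {

    // "Core" stand-in: a named unit of work that reports success or failure.
    static final class MiniStep {
        final String name;
        final BooleanSupplier work; // returns true on success

        MiniStep(String name, BooleanSupplier work) {
            this.name = name;
            this.work = work;
        }
    }

    // "Core" stand-in: a job is an ordered list of steps.
    static final class MiniJob {
        final String name;
        final List<MiniStep> steps;

        MiniJob(String name, List<MiniStep> steps) {
            this.name = name;
            this.steps = steps;
        }
    }

    /** "Core" launcher stand-in: runs the steps in order and stops at the first failure. */
    static List<String> launch(MiniJob job) {
        List<String> log = new ArrayList<>();
        for (MiniStep step : job.steps) {
            boolean ok = step.work.getAsBoolean();
            log.add(job.name + "/" + step.name + ":" + (ok ? "COMPLETED" : "FAILED"));
            if (!ok) {
                break; // later steps are not executed
            }
        }
        return log;
    }

    // "Application" layer: business code defines the job and triggers it (here, manually).
    public static void main(String[] args) {
        MiniJob job = new MiniJob("importUserJob", Arrays.asList(
                new MiniStep("step1", () -> true),
                new MiniStep("step2", () -> false),
                new MiniStep("step3", () -> true)));
        System.out.println(launch(job));
    }
}
```

In this model a scheduled trigger would simply be another caller of launch; the job definition would stay the same.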

Spring Batch Execution Flow

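Author: 杨小强
Original article: http://www.jianshu.com/p/b7d7756e6e25
Copyright belongs to the author. When reposting, please credit the author, the original article, and the translator.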