Finally, the last part of the blog series! Today we’ll take a quick look at scaled batch jobs, done via partitioning and multi-threaded steps.
This is the sixth post about the new Java based configuration features in Spring Batch 2.2. The previous posts covered a comparison between the new Java DSL and XML, JobParameters, ExecutionContexts and StepScope, profiles and environments, job inheritance, and modular configurations. You can find the JavaConfig code examples on Github.
Partitioning
I won’t explain partitioning in detail here, just this: with partitioning you need to find a way to partition your data. Each partition of the data gets its own StepExecution and is executed in its own thread. The most important interface here is the Partitioner.
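To make the contract concrete: a Partitioner has a single method that returns one ExecutionContext per partition name. Here is a minimal custom implementation sketch that splits a hypothetical ID space into ranges (the interface and ExecutionContext are Spring Batch’s; the minId/maxId keys and the range size are made up for illustration):
import java.util.HashMap;
import java.util.Map;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;

public class IdRangePartitioner implements Partitioner {

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> partitions = new HashMap<String, ExecutionContext>();
        for (int i = 0; i < gridSize; i++) {
            ExecutionContext context = new ExecutionContext();
            // hypothetical range bounds; a step-scoped reader would pick
            // these up from the step execution context
            context.putInt("minId", i * 1000);
            context.putInt("maxId", (i + 1) * 1000 - 1);
            partitions.put("partition" + i, context);
        }
        return partitions;
    }

}
For our use case, though, we won’t need a custom implementation, because Spring Batch ships with one that fits: the MultiResourcePartitioner used below.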
Of course, when working with different threads we’ll need a source of those threads, and that’ll be a TaskExecutor. Since that’s a very low level component, we add it to the InfrastructureConfiguration interface:
public interface InfrastructureConfiguration {

    @Bean
    public abstract DataSource dataSource();

    @Bean
    public abstract TaskExecutor taskExecutor();

}
For test environments, this is a possible implementation:
@Configuration
@EnableBatchProcessing
public class StandaloneInfrastructureConfiguration implements InfrastructureConfiguration {

    @Bean
    public DataSource dataSource(){
        EmbeddedDatabaseBuilder embeddedDatabaseBuilder = new EmbeddedDatabaseBuilder();
        return embeddedDatabaseBuilder.addScript("classpath:org/springframework/batch/core/schema-drop-hsqldb.sql")
                .addScript("classpath:org/springframework/batch/core/schema-hsqldb.sql")
                .addScript("classpath:schema-partner.sql")
                .setType(EmbeddedDatabaseType.HSQL)
                .build();
    }

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        // without a core pool size the default of one core thread plus an
        // unbounded queue would keep the pool at a single thread
        taskExecutor.setCorePoolSize(4);
        taskExecutor.setMaxPoolSize(4);
        taskExecutor.afterPropertiesSet();
        return taskExecutor;
    }

}
The job that I used as an example during the last blog posts read data from one file and wrote that data to a database. Now we want to read data from more than one file, and we want a partition for each file.
Let’s take a look at the important parts of the job configuration:
@Bean
public Job flatfileToDbPartitioningJob(){
    return jobBuilders.get("flatfileToDbPartitioningJob")
            .listener(protocolListener())
            .start(partitionStep())
            .build();
}

@Bean
public Step partitionStep(){
    return stepBuilders.get("partitionStep")
            .partitioner(flatfileToDbStep())
            .partitioner("flatfileToDbStep", partitioner())
            .taskExecutor(infrastructureConfiguration.taskExecutor())
            .build();
}

@Bean
public Step flatfileToDbStep(){
    return stepBuilders.get("flatfileToDbStep")
            .<Partner,Partner>chunk(1)
            .reader(reader())
            .processor(processor())
            .writer(writer())
            .listener(logProcessListener())
            .build();
}

@Bean
public Partitioner partitioner(){
    MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
    Resource[] resources;
    try {
        resources = resourcePatternResolver.getResources("file:src/test/resources/*.csv");
    } catch (IOException e) {
        throw new RuntimeException("I/O problems when resolving the input file pattern.", e);
    }
    partitioner.setResources(resources);
    return partitioner;
}
We defined a Partitioner that looks for CSV files in a given location and creates a partition for each file. We defined the step just like in the other examples, and then we defined a special partitionStep that combines our standard step, the partitioner and the TaskExecutor. And finally, the job uses that partitionStep.
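One detail not shown above: each partition needs to know which file it should read. The MultiResourcePartitioner writes each resource’s URL into the step execution context under the key fileName by default, so a step-scoped reader can bind to it via late binding. A sketch of what that could look like (the lineMapper() call stands in for the line mapping set up in the earlier posts):
@Bean
@StepScope
public FlatFileItemReader<Partner> reader(
        @Value("#{stepExecutionContext['fileName']}") Resource resource){
    FlatFileItemReader<Partner> itemReader = new FlatFileItemReader<Partner>();
    // the resource is injected per partition from the step execution context
    itemReader.setResource(resource);
    itemReader.setLineMapper(lineMapper()); // as defined in the earlier posts
    return itemReader;
}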
Multi-threaded step
This is quite a simple way of scaling: it just adds more threads to the processing of a step. Since reading from a file isn’t suitable for this kind of scaling (the FlatFileItemReader isn’t thread-safe), we need a new use case, and it’ll be reading from a queue and writing to a log file. We need some more infrastructure for it:
public interface InfrastructureConfiguration {

    @Bean
    public abstract DataSource dataSource();

    @Bean
    public abstract TaskExecutor taskExecutor();

    @Bean
    public abstract ConnectionFactory connectionFactory();

    @Bean
    public abstract Queue queue();

    @Bean
    public abstract JmsTemplate jmsTemplate();

}
For the test environment we are using ActiveMQ with an embedded broker:
@Configuration
@EnableBatchProcessing
public class StandaloneInfrastructureConfiguration implements InfrastructureConfiguration {

    @Bean
    public DataSource dataSource(){
        EmbeddedDatabaseBuilder embeddedDatabaseBuilder = new EmbeddedDatabaseBuilder();
        return embeddedDatabaseBuilder.addScript("classpath:org/springframework/batch/core/schema-drop-hsqldb.sql")
                .addScript("classpath:org/springframework/batch/core/schema-hsqldb.sql")
                .addScript("classpath:schema-partner.sql")
                .setType(EmbeddedDatabaseType.HSQL)
                .build();
    }

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        // without a core pool size the default of one core thread plus an
        // unbounded queue would keep the pool at a single thread
        taskExecutor.setCorePoolSize(4);
        taskExecutor.setMaxPoolSize(4);
        taskExecutor.afterPropertiesSet();
        return taskExecutor;
    }

    @Bean
    public ConnectionFactory connectionFactory() {
        return new ActiveMQConnectionFactory("tcp://localhost:61616");
    }

    @Bean
    public Queue queue() {
        return new ActiveMQQueue("queueName");
    }

    @Bean
    public BrokerService broker() throws Exception {
        BrokerService broker = new BrokerService();
        // configure the broker
        broker.addConnector("tcp://localhost:61616");
        broker.start();
        return broker;
    }

    @Bean
    public JmsTemplate jmsTemplate(){
        JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory());
        jmsTemplate.setDefaultDestination(queue());
        jmsTemplate.setReceiveTimeout(500);
        return jmsTemplate;
    }

}
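Before a test run the queue needs some content, otherwise the step has nothing to read. A quick way to fill it could look like this sketch, using the jmsTemplate bean defined above (the message count is arbitrary):
JmsTemplate jmsTemplate = infrastructureConfiguration.jmsTemplate();
for (int i = 0; i < 100; i++) {
    // convertAndSend uses the template's default destination, our queue
    jmsTemplate.convertAndSend("message " + i);
}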
The job configuration is then quite simple:
@Configuration
public class MultiThreadedStepJobConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilders;

    @Autowired
    private StepBuilderFactory stepBuilders;

    @Autowired
    private InfrastructureConfiguration infrastructureConfiguration;

    @Bean
    public Job multiThreadedStepJob(){
        return jobBuilders.get("multiThreadedStepJob")
                .listener(protocolListener())
                .start(step())
                .build();
    }

    @Bean
    public Step step(){
        return stepBuilders.get("step")
                .<String,String>chunk(1)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .taskExecutor(infrastructureConfiguration.taskExecutor())
                .throttleLimit(4)
                .build();
    }

    @Bean
    public JmsItemReader<String> reader(){
        JmsItemReader<String> itemReader = new JmsItemReader<String>();
        itemReader.setJmsTemplate(infrastructureConfiguration.jmsTemplate());
        return itemReader;
    }

    @Bean
    public ItemProcessor<String,String> processor(){
        return new LogItemProcessor<String>();
    }

    @Bean
    public ItemWriter<String> writer(){
        return new LogItemWriter<String>();
    }

    @Bean
    public ProtocolListener protocolListener(){
        return new ProtocolListener();
    }

}
The only difference from a job without scaling is the calls to taskExecutor and throttleLimit in the step definition. The throttleLimit caps how many chunks are processed concurrently, so it shouldn’t exceed the pool size of the TaskExecutor.
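To see the scaling in action, the job can be launched like any other job, for example from a test method that declares throws Exception (a sketch; the JobLauncher bean is provided by @EnableBatchProcessing, and the class names are the ones from this post):
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(
        StandaloneInfrastructureConfiguration.class,
        MultiThreadedStepJobConfiguration.class);
JobLauncher jobLauncher = context.getBean(JobLauncher.class);
Job job = context.getBean("multiThreadedStepJob", Job.class);
// each run needs distinct JobParameters to get a new JobInstance
JobExecution jobExecution = jobLauncher.run(job, new JobParametersBuilder()
        .addLong("time", System.currentTimeMillis()).toJobParameters());
System.out.println(jobExecution.getExitStatus());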
Conclusion
Configuring scalability in Spring Batch jobs is easy with Java based configuration. And again, you can see the advantage of having an interface for the infrastructure configuration: it makes switching between environments easy.
I hope this blog series was useful for you, and if there are any questions, don’t hesitate to comment on the blog posts!