Spring Batch 2.2 – JavaConfig Part 6: Partitioning and Multi-threaded Step

Finally, the last part of the blog series! Today we’ll take a quick look at scaling batch jobs, done via partitioning and the multi-threaded step.
This is the sixth post about the new Java-based configuration features in Spring Batch 2.2. The previous posts cover a comparison between the new Java DSL and XML, JobParameters, ExecutionContexts and StepScope, profiles and environments, job inheritance, and modular configurations. You can find the JavaConfig code examples on GitHub.

Partitioning

I won’t explain partitioning in detail here, just this much: with partitioning, you need to find a way to split your data. Each partition of the data gets its own StepExecution and is executed in its own thread. The most important interface here is the Partitioner.
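The Partitioner contract is small: given a grid size, return a map of partition names to ExecutionContexts, one context per partition. As a plain-Java sketch of that idea (with simple Maps standing in for Spring’s ExecutionContext, and a hypothetical id-range split as the partitioning strategy):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java sketch of the Partitioner idea: split an id range into
// gridSize partitions, each described by its own context map.
// (Spring's real contract is Map<String, ExecutionContext> partition(int gridSize).)
public class RangePartitionerSketch {

    public static Map<String, Map<String, Integer>> partition(int minId, int maxId, int gridSize) {
        Map<String, Map<String, Integer>> partitions = new LinkedHashMap<>();
        int rangeSize = (maxId - minId + 1) / gridSize;
        int start = minId;
        for (int i = 0; i < gridSize; i++) {
            // last partition takes any remainder of the range
            int end = (i == gridSize - 1) ? maxId : start + rangeSize - 1;
            Map<String, Integer> context = new LinkedHashMap<>();
            context.put("minId", start);
            context.put("maxId", end);
            partitions.put("partition" + i, context);
            start = end + 1;
        }
        return partitions;
    }
}
```

Each worker step can then read its own boundaries from its step execution context and process only its slice of the data.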
Of course, when working with different threads we need a source for those threads, and that will be a TaskExecutor. Since that’s a very low-level component, we add it to the InfrastructureConfiguration interface:

public interface InfrastructureConfiguration {
 
	@Bean
	public abstract DataSource dataSource();
 
	@Bean
	public abstract TaskExecutor taskExecutor();
 
}

For test environments, an implementation might look like this:

@Configuration
@EnableBatchProcessing
public class StandaloneInfrastructureConfiguration implements InfrastructureConfiguration {
 
	@Bean
	public DataSource dataSource(){
		EmbeddedDatabaseBuilder embeddedDatabaseBuilder = new EmbeddedDatabaseBuilder();
		return embeddedDatabaseBuilder.addScript("classpath:org/springframework/batch/core/schema-drop-hsqldb.sql")
				.addScript("classpath:org/springframework/batch/core/schema-hsqldb.sql")
				.addScript("classpath:schema-partner.sql")
				.setType(EmbeddedDatabaseType.HSQL)
				.build();
	}
 
	@Bean
	public TaskExecutor taskExecutor() {
		ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
		// With the default unbounded queue, threads beyond the core pool size
		// are never started, so the core size must be raised as well.
		taskExecutor.setCorePoolSize(4);
		taskExecutor.setMaxPoolSize(4);
		taskExecutor.afterPropertiesSet();
		return taskExecutor;
	}
 
}

The job I used as an example in the previous blog posts reads data from one file and writes it to a database. Now we want to read data from more than one file, and we want one partition for each file.
Let’s take a look at the important parts of the job configuration:

	@Bean
	public Job flatfileToDbPartitioningJob(){
		return jobBuilders.get("flatfileToDbPartitioningJob")
				.listener(protocolListener())
				.start(partitionStep())
				.build();
	}
 
	@Bean
	public Step partitionStep(){
		return stepBuilders.get("partitionStep")
				.partitioner(flatfileToDbStep())
				.partitioner("flatfileToDbStep", partitioner())
				.taskExecutor(infrastructureConfiguration.taskExecutor())
				.build();
	}
 
	@Bean
	public Step flatfileToDbStep(){
		return stepBuilders.get("flatfileToDbStep")
				.<Partner,Partner>chunk(1)
				.reader(reader())
				.processor(processor())
				.writer(writer())
				.listener(logProcessListener())
				.build();
	}
 
	@Bean
	public Partitioner partitioner(){
		MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
		Resource[] resources;
		try {
			resources = resourcePatternResolver.getResources("file:src/test/resources/*.csv");
		} catch (IOException e) {
			throw new RuntimeException("I/O problems when resolving the input file pattern.",e);
		}
		partitioner.setResources(resources);
		return partitioner;
	}

We defined a Partitioner that looks for CSV files in a specific location and creates a partition for each file. We defined the step just like in the other examples, and then we defined a special partitionStep that combines our standard step, the partitioner and the TaskExecutor. And finally, the job uses that partitionStep.
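Under the hood, MultiResourcePartitioner creates one ExecutionContext per resource and stores the resource’s URL under the key fileName, which is what a step-scoped reader can later pick up via #{stepExecutionContext['fileName']}. A plain-Java sketch of that behavior, with simple Maps standing in for Spring’s ExecutionContext:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java sketch of MultiResourcePartitioner's behavior: one partition per
// resource, each context holding the resource's URL under the key "fileName".
public class MultiResourcePartitionerSketch {

    public static Map<String, Map<String, String>> partition(String[] resourceUrls) {
        Map<String, Map<String, String>> partitions = new LinkedHashMap<>();
        for (int i = 0; i < resourceUrls.length; i++) {
            Map<String, String> context = new LinkedHashMap<>();
            context.put("fileName", resourceUrls[i]);
            partitions.put("partition" + i, context);
        }
        return partitions;
    }
}
```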

Multi-threaded Step

This is a quite simple way of scaling: it just adds some more threads to the processing of a step. Since reading from a file isn’t suitable for this kind of scaling, we need a new use case: reading from a queue and writing to a log file. We need some more infrastructure for it:

public interface InfrastructureConfiguration {
 
	@Bean
	public abstract DataSource dataSource();
 
	@Bean
	public abstract TaskExecutor taskExecutor();
 
	@Bean
	public abstract ConnectionFactory connectionFactory();
 
	@Bean
	public abstract Queue queue();
 
	@Bean
	public abstract JmsTemplate jmsTemplate();
 
}

We are using ActiveMQ in a test environment:

@Configuration
@EnableBatchProcessing
public class StandaloneInfrastructureConfiguration implements InfrastructureConfiguration {
 
	@Bean
	public DataSource dataSource(){
		EmbeddedDatabaseBuilder embeddedDatabaseBuilder = new EmbeddedDatabaseBuilder();
		return embeddedDatabaseBuilder.addScript("classpath:org/springframework/batch/core/schema-drop-hsqldb.sql")
				.addScript("classpath:org/springframework/batch/core/schema-hsqldb.sql")
				.addScript("classpath:schema-partner.sql")
				.setType(EmbeddedDatabaseType.HSQL)
				.build();
	}
 
	@Bean
	public TaskExecutor taskExecutor() {
		ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
		// With the default unbounded queue, threads beyond the core pool size
		// are never started, so the core size must be raised as well.
		taskExecutor.setCorePoolSize(4);
		taskExecutor.setMaxPoolSize(4);
		taskExecutor.afterPropertiesSet();
		return taskExecutor;
	}
 
	@Bean
	public ConnectionFactory connectionFactory() {
		return new ActiveMQConnectionFactory("tcp://localhost:61616");
	}
 
	@Bean
	public Queue queue() {
		return new ActiveMQQueue("queueName");
	}
 
	@Bean
	public BrokerService broker() throws Exception{
		BrokerService broker = new BrokerService();
		// configure the broker
		broker.addConnector("tcp://localhost:61616");
		broker.start();
		return broker;
	}
 
	@Bean
	public JmsTemplate jmsTemplate(){
		JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory());
		jmsTemplate.setDefaultDestination(queue());
		jmsTemplate.setReceiveTimeout(500);
		return jmsTemplate;
	}
 
}

The job configuration is then quite simple:

@Configuration
public class MultiThreadedStepJobConfiguration {
 
	@Autowired
	private JobBuilderFactory jobBuilders;
 
	@Autowired
	private StepBuilderFactory stepBuilders;
 
	@Autowired
	private InfrastructureConfiguration infrastructureConfiguration;
 
	@Bean
	public Job multiThreadedStepJob(){
		return jobBuilders.get("multiThreadedStepJob")
				.listener(protocolListener())
				.start(step())
				.build();
	}
 
	@Bean
	public Step step(){
		return stepBuilders.get("step")
				.<String,String>chunk(1)
				.reader(reader())
				.processor(processor())
				.writer(writer())
				.taskExecutor(infrastructureConfiguration.taskExecutor())
				.throttleLimit(4)
				.build();
	}
 
	@Bean
	public JmsItemReader<String> reader(){
		JmsItemReader<String> itemReader = new JmsItemReader<String>();
		itemReader.setJmsTemplate(infrastructureConfiguration.jmsTemplate());
		return itemReader;
	}
 
	@Bean
	public ItemProcessor<String,String> processor(){
		return new LogItemProcessor<String>();
	}
 
	@Bean
	public ItemWriter<String> writer(){
		return new LogItemWriter<String>();
	}
 
	@Bean
	public ProtocolListener protocolListener(){
		return new ProtocolListener();
	}
 
}

The only difference from a job without scaling is the calls to taskExecutor and throttleLimit in the step definition.
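Conceptually, the multi-threaded step just hands each chunk to the TaskExecutor, and throttleLimit caps how many chunks are in flight at once. A minimal plain-Java analogue of that behavior, with a fixed pool of four workers standing in for the throttled TaskExecutor and a counter standing in for the reader–processor–writer pipeline:

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-Java analogue of a multi-threaded step: a fixed pool of four workers
// (the "throttle limit") processes items concurrently.
public class MultiThreadedStepSketch {

    public static int processAll(List<String> items) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        AtomicInteger processed = new AtomicInteger();
        for (String item : items) {
            // stand-in for reading, processing and writing one chunk
            pool.submit(() -> processed.incrementAndGet());
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed.get();
    }
}
```

Note that this only works if the reader is safe to call from multiple threads, which is why the queue-based use case fits better than a file-based one.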

Conclusion

Configuring scalability in Spring Batch jobs is easy with Java-based configuration. And again, you can see the advantage of having an interface for the infrastructure configuration: it lets you switch easily between environments.
I hope this blog series has been useful for you, and if there are any questions, don’t hesitate to comment on the blog posts!

Comments

  • Jan Omar

    12 September 2013 by Jan Omar

    The example file reader in your GitHub project is wrong. It processes the same file as many times as files are found by the Ant pattern. You have to define the reader as @StepScope and use the actual resource from the ExecutionContext. Other than that, great tutorial! Thanks!

    • Jan Omar

      12 September 2013 by Jan Omar

      To follow up, the reader should be defined along these lines:


      @StepScope
      @Bean
      public FlatFileItemReader reader(
              @Value("#{stepExecutionContext['fileName']}") String filename) {
          FlatFileItemReader itemReader = new FlatFileItemReader();
          itemReader.setLineMapper(lineMapper());
          try {
              itemReader.setResource(new UrlResource(filename));
          } catch (MalformedURLException e) {
              throw new RuntimeException(e);
          }
          return itemReader;
      }

      • Tobias Flohre

        12 September 2013 by Tobias Flohre

        Of course you’re right. I wonder how I didn’t see that ;-). Glad you liked the tutorial.

  • Nilanjan Roy

    1 February 2014 by Nilanjan Roy

    Hi,
    I have a requirement where I have to send a parameter from my job launcher to the Partitioner, based on which the partitioner will pick up a set of records and write them to a flat file. I am able to get the parameter from the job launcher in the reader, but not in the Partitioner. Any help will be highly appreciated.
    Thanks,
    Nilanjan
