Skipping in asynchronous batch processing

No Comments

With this article I want to publish a small code snippet that fills a gap in Spring-Batch code.


With version 2.2.x of Spring-Batch, Spring offers an AsyncItemProcessor and an AsyncItemWriter as part of the library spring-batch-integration. Both of them run as wrappers around own single threaded ItemProcessors and ItemWriters. The AsyncItemProcessor uses a TaskExecutor to distribute his work to separate threads. This is done by creating a FutureTask per Item “to move the processing to the future”. This Future is given to the AsyncItemWriter that itself waits for the end of processing. If the Future has completed his work the processed Item is delegated to the own ItemWriter. With this method it is easy to parallalize the processing step of a Spring-Batch.


If you are using the LimitCheckingItemSkipPolicy to handle Exceptions you would see the already mentioned gap when you migrate to asynchronous processing. When you create a LimitCheckingItemSkipPolicy you have to pass Exception classes that will be skipped at runtime ( – until reaching the given limit). For instance you can pass an IllegalArgumentException. When you parallelize your processing with AsyncItemProcessor and AsyncItemWriter you will note that the SkipPolicy is not working anymore.


As mentioned before the AsyncItemWriters write method will perform the Future (future.get()) that was passed by the AsyncItemProcessor. If there raises an Exception inside this method execution this Exception is wrapped into a java.util.concurrent.ExecutionException. Unfortunately Spring-Batch doesn’t offer a build-in solution inside spring-batch-integration.


You have to extend LimitCheckingItemSkipPolicy so that it react on the Exceptions included in upcoming ExecutionExceptions.

package de.codecentric.batch.skip;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.springframework.batch.core.step.skip.LimitCheckingItemSkipPolicy;
import org.springframework.classify.Classifier;
public class AsyncLimitCheckingItemSkipPolicy extends LimitCheckingItemSkipPolicy {
    public AsyncLimitCheckingItemSkipPolicy() {
    public AsyncLimitCheckingItemSkipPolicy(int skipLimit, Classifier<Throwable, Boolean> skippableExceptionClassifier) {
        super(skipLimit, skippableExceptionClassifier);
    public AsyncLimitCheckingItemSkipPolicy(int skipLimit, Map<Class<? extends Throwable>, Boolean> skippableExceptions) {
        super(skipLimit, skippableExceptions);
    public boolean shouldSkip(Throwable t, int skipCount) {
        if (t instanceof ExecutionException) {
            return super.shouldSkip(t.getCause(), skipCount);
        return super.shouldSkip(t, skipCount);

With this custom AsyncLimitCheckingItemSkipPolicy the skipping is now working as before. You can use this pattern also to extend other SkipPolicies so that they behave as desired, also after migrating to asynchronous processing.


Your email address will not be published. Required fields are marked *