Skipping in asynchronous batch processing

28.3.2014 | 2 minutes of reading time

With this article I want to publish a small code snippet that fills a gap in Spring-Batch code.


With version 2.2.x of Spring-Batch, Spring offers an AsyncItemProcessor and an AsyncItemWriter as part of the library spring-batch-integration. Both of them run as wrappers around own single threaded ItemProcessors and ItemWriters. The AsyncItemProcessor uses a TaskExecutor to distribute his work to separate threads. This is done by creating a FutureTask per Item “to move the processing to the future”. This Future is given to the AsyncItemWriter that itself waits for the end of processing. If the Future has completed his work the processed Item is delegated to the own ItemWriter. With this method it is easy to parallalize the processing step of a Spring-Batch.


If you are using the LimitCheckingItemSkipPolicy to handle Exceptions you would see the already mentioned gap when you migrate to asynchronous processing. When you create a LimitCheckingItemSkipPolicy you have to pass Exception classes that will be skipped at runtime ( – until reaching the given limit). For instance you can pass an IllegalArgumentException. When you parallelize your processing with AsyncItemProcessor and AsyncItemWriter you will note that the SkipPolicy is not working anymore.


As mentioned before the AsyncItemWriters write method will perform the Future (future.get()) that was passed by the AsyncItemProcessor. If there raises an Exception inside this method execution this Exception is wrapped into a java.util.concurrent.ExecutionException. Unfortunately Spring-Batch doesn’t offer a build-in solution inside spring-batch-integration.


You have to extend LimitCheckingItemSkipPolicy so that it react on the Exceptions included in upcoming ExecutionExceptions.

1package de.codecentric.batch.skip;
3import java.util.Map;
4import java.util.concurrent.ExecutionException;
6import org.springframework.batch.core.step.skip.LimitCheckingItemSkipPolicy;
7import org.springframework.classify.Classifier;
9public class AsyncLimitCheckingItemSkipPolicy extends LimitCheckingItemSkipPolicy {
11    public AsyncLimitCheckingItemSkipPolicy() {
12        super();
13    }
15    public AsyncLimitCheckingItemSkipPolicy(int skipLimit, Classifier<Throwable, Boolean> skippableExceptionClassifier) {
16        super(skipLimit, skippableExceptionClassifier);
17    }
19    public AsyncLimitCheckingItemSkipPolicy(int skipLimit, Map<Class<? extends Throwable>, Boolean> skippableExceptions) {
20        super(skipLimit, skippableExceptions);
21    }
23    @Override
24    public boolean shouldSkip(Throwable t, int skipCount) {
25        if (t instanceof ExecutionException) {
26            return super.shouldSkip(t.getCause(), skipCount);
27        }
28        return super.shouldSkip(t, skipCount);
29    }

With this custom AsyncLimitCheckingItemSkipPolicy the skipping is now working as before. You can use this pattern also to extend other SkipPolicies so that they behave as desired, also after migrating to asynchronous processing.

share post




More articles in this subject area\n

Discover exciting further topics and let the codecentric world inspire you.


Gemeinsam bessere Projekte umsetzen

Wir helfen Deinem Unternehmen

Du stehst vor einer großen IT-Herausforderung? Wir sorgen für eine maßgeschneiderte Unterstützung. Informiere dich jetzt.

Hilf uns, noch besser zu werden.

Wir sind immer auf der Suche nach neuen Talenten. Auch für dich ist die passende Stelle dabei.