Overview

Skipping in asynchronous batch processing

No Comments

With this article I want to publish a small code snippet that fills a gap in Spring-Batch code.

Background

With version 2.2.x of Spring-Batch, Spring offers an AsyncItemProcessor and an AsyncItemWriter as part of the library spring-batch-integration. Both of them run as wrappers around own single threaded ItemProcessors and ItemWriters. The AsyncItemProcessor uses a TaskExecutor to distribute his work to separate threads. This is done by creating a FutureTask per Item “to move the processing to the future”. This Future is given to the AsyncItemWriter that itself waits for the end of processing. If the Future has completed his work the processed Item is delegated to the own ItemWriter. With this method it is easy to parallalize the processing step of a Spring-Batch.

Skipping

If you are using the LimitCheckingItemSkipPolicy to handle Exceptions you would see the already mentioned gap when you migrate to asynchronous processing. When you create a LimitCheckingItemSkipPolicy you have to pass Exception classes that will be skipped at runtime ( – until reaching the given limit). For instance you can pass an IllegalArgumentException. When you parallelize your processing with AsyncItemProcessor and AsyncItemWriter you will note that the SkipPolicy is not working anymore.

Problem

As mentioned before the AsyncItemWriters write method will perform the Future (future.get()) that was passed by the AsyncItemProcessor. If there raises an Exception inside this method execution this Exception is wrapped into a java.util.concurrent.ExecutionException. Unfortunately Spring-Batch doesn’t offer a build-in solution inside spring-batch-integration.

Solution

You have to extend LimitCheckingItemSkipPolicy so that it react on the Exceptions included in upcoming ExecutionExceptions.

package de.codecentric.batch.skip;
 
import java.util.Map;
import java.util.concurrent.ExecutionException;
 
import org.springframework.batch.core.step.skip.LimitCheckingItemSkipPolicy;
import org.springframework.classify.Classifier;
 
public class AsyncLimitCheckingItemSkipPolicy extends LimitCheckingItemSkipPolicy {
 
    public AsyncLimitCheckingItemSkipPolicy() {
        super();
    }
 
    public AsyncLimitCheckingItemSkipPolicy(int skipLimit, Classifier<Throwable, Boolean> skippableExceptionClassifier) {
        super(skipLimit, skippableExceptionClassifier);
    }
 
    public AsyncLimitCheckingItemSkipPolicy(int skipLimit, Map<Class<? extends Throwable>, Boolean> skippableExceptions) {
        super(skipLimit, skippableExceptions);
    }
 
    @Override
    public boolean shouldSkip(Throwable t, int skipCount) {
        if (t instanceof ExecutionException) {
            return super.shouldSkip(t.getCause(), skipCount);
        }
        return super.shouldSkip(t, skipCount);
    }
 
}
Conclusion

With this custom AsyncLimitCheckingItemSkipPolicy the skipping is now working as before. You can use this pattern also to extend other SkipPolicies so that they behave as desired, also after migrating to asynchronous processing.

Comment

Your email address will not be published. Required fields are marked *