Skipping in asynchroner Batchverarbeitung

Keine Kommentare

Ich möchte in diesem kleinen Artikel einen kleinen Codeschnippsel veröffentlichen, der einen kleine Lücke in Spring-Batch schließt.

Hintergrund

Für die Version 2.2.x von Spring-Batch wird in dem Paket spring-batch-integration ein AsyncItemProcessor und ein AsyncItemWriter angeboten. Beide funktionieren als Wrapper um die eigenen ItemProcessors und ItemWriter. Hierbei nutzt der AsyncItemProcessor einen TaskExecutor um die Arbeit auf mehrere Threads zu verteilen. Es wird pro Item ein FutureTask erstellt, der die Ausführung im eigenen ItemProcessor „in die Zukunft auslagert“. Dieses Future wird dann an den AsyncItemWriter übergeben, der nun auf das Ende der Ausführung wartet und dann an den eigenen ItemWriter delegiert. Mit dieser Methode kann also der Processor-Schritt im Spring-Batch parallelisiert werden.

Skipping

Wer in seinem Projekt bisher die LimitCheckingItemSkipPolicy nutzt, um Skips bis zu einem bestimmten Anzahl zuzulassen, der wird bei einer Umstellung auf eine asynchrone Verarbeitung des Processor-Schritts auf die Lücke stoßen. Der LimitCheckingItemSkipPolicy kann man Exception-Klassen mitgeben, deren Auftreten im Batch dann übersprungen wird ( – bis zum Erreichen der definierten Anzahl). Hier hat man beispielsweise eine IllegalArgumentException als skippable deklariert. Wenn man nun seine Verarbeitung mithilfe von AsyncItemProcessor und AsyncItemWriter parallelisiert, wird man feststellen, dass diese SkipPolicy nicht mehr funktioniert.

Problem

Wie schon oben erwähnt wird innerhalb der write-Methode des AsyncItemWriters das vom AsyncItemProcessor übergebene Future ausgeführt (future.get()). Wenn innerhalb dieser Ausführung nun Exceptions auftreten, werden diese in eine java.util.concurrent.ExecutionException verpackt. Leider bietet Spring-Batch hier keine eigene Lösung in dem spring-batch-integration Paket an.

Lösung

Die LimitCheckingItemSkipPolicy muss derart erweitert werden, dass sie die auftretenden ExecutionExceptions anschaut und auf die darin enthaltenen Exceptions reagiert.

package de.codecentric.batch.skip;
 
import java.util.Map;
import java.util.concurrent.ExecutionException;
 
import org.springframework.batch.core.step.skip.LimitCheckingItemSkipPolicy;
import org.springframework.classify.Classifier;
 
public class AsyncLimitCheckingItemSkipPolicy extends LimitCheckingItemSkipPolicy {
 
    public AsyncLimitCheckingItemSkipPolicy() {
        super();
    }
 
    public AsyncLimitCheckingItemSkipPolicy(int skipLimit, Classifier<Throwable, Boolean> skippableExceptionClassifier) {
        super(skipLimit, skippableExceptionClassifier);
    }
 
    public AsyncLimitCheckingItemSkipPolicy(int skipLimit, Map<Class<? extends Throwable>, Boolean> skippableExceptions) {
        super(skipLimit, skippableExceptions);
    }
 
    @Override
    public boolean shouldSkip(Throwable t, int skipCount) {
        if (t instanceof ExecutionException) {
            return super.shouldSkip(t.getCause(), skipCount);
        }
        return super.shouldSkip(t, skipCount);
    }
 
}
Ergebnis

Mithilfe dieser AsyncLimitCheckingItemSkipPolicy funktioniert nun das Skipping auch wieder wie zuvor. Anhand dieses Musters kann man auch andere vorhandene SkipPolicies so modifizieren, dass sie in der Lage sind auch bei Nutzung der asynchronen Verarbeitung wie gewohnt zu funktionieren.

Nach seinem Studium der Informatik stieß er direkt im Anschluss, im Jahr 2007, auf codecentric und ist seitdem begeisterter Arbeitnehmer.
In seinen Projekten bei der codecentric war er für diverse Versicherungen tätig und hat darin Systeme und Lösungen für Versicherungsprodukte geschaffen.

Share on FacebookGoogle+Share on LinkedInTweet about this on TwitterShare on RedditDigg thisShare on StumbleUpon

Kommentieren

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert.