Synchroner Batch mit Mule 4

Keine Kommentare

Während in Mule 3 der Batch noch eine eigenständige Komponente war und Batches sich in der Konfiguration auf der gleichen Ebene wie Flows befanden, ist der Batch in Mule 4 zu einem sogenannten Scope geworden, der jetzt innerhalb eines Flows lebt. Auf den ersten Blick könnte man daher denken, er würde auch synchron im Flow ausgeführt. Das ist jedoch nicht so: Der Flow läuft sofort weiter, während der Batch parallel dazu ausgeführt wird. (Dies ist auch so dokumentiert, siehe Mule Batch Processing.) Ich werde in diesem Blogpost zeigen, wie man auf einen Mule Batch warten kann.

In vielen Fällen ergeben asynchrone Batches auch Sinn, schließlich verbindet man einen Batch gedanklich mit „dauert länger“ oder „läuft in der Nacht“. Die vom Mule-Batch gebotenen Features wie Parallelisierung, Gruppierung von Datensätzen etc. können jedoch auch in einem synchronen Kontext sinnvoll sein: Unter Umständen möchte man einen REST call per Batch parallelisieren, wobei der Aufrufer an dem Ergebnis des Batches interessiert ist. Die Bordmittel von Mule reichen hier nicht mehr aus. Mit ein wenig Java lässt sich das Problem jedoch lösen.

Problemdefinition

Bevor ich die Lösung zeige, stelle ich hier erst mal ein (für die Praxis sinnfreies) Beispielproblem vor: Ein Flow wird per HTTP gestartet. Der erste Schritt füllt die Payload mit einer Liste der Zahlen von 1 bis 13. Der Batch macht nichts anderes, als die Zahlen per Logger auszugeben. Im Flow hinter dem Batch folgt ein Logger und eine Transformation von Java nach JSON, damit im Browser eine lesbare Ausgabe erfolgt.

Hier die grafische Darstellung:

HTTP Flow, der im Batch eine Liste von Zahlen per Logger ausgibt.

Würde der Batch synchron im Flow laufen, müsste die Log-Ausgabe „after batch“ immer hinter den 13 Zahlen ausgegeben werden. Typischerweise ist es jedoch umgekehrt. Da die Ausgabe in zwei getrennten Threads erfolgt, ist auch das nicht garantiert. An der Ausgabe im REST-Client oder Browser lässt sich erkennen, dass der Batch noch läuft. Die Ausgabe zeigt eine Instanz von BatchJobInstance mit dem entsprechenden Status:

  "status": "EXECUTING"

Wenn der Flow auf den Batch warten soll, benötigen wir irgendeine Form der Synchronisation.

Wie es nicht geht

Wenn es mit Mule Bordmitteln nicht mehr weiter geht, liegt der Ausweg über eine Java-Klasse nahe. In Java existieren dazu Bordmittel, die auf das Synchronisationsproblem zugeschnitten sind: Der Aufruf wait() legt den aktuellen Thread schlafen, bis ihn ein anderer Thread auf der selben Instanz per notify() wieder weckt.

Wir benötigen also eine Instanz einer Java-Klasse, auf der wir am Ende des Batches (in der Completion Phase) notify() aufrufen. Hinter dem Batch können wir dann den Flow-Thread durch den Aufruf von wait() aus der selben Instanz blockieren. Mit DataWeave geht das recht einfach (siehe auch Mule-Dokumentation):

MySynchronizerClass::new();

Wenn wir das Ergebnis vor Aufruf des Batches in einer Flow-Variablen ablegen, so ist diese sowohl innerhalb des Batches (Process Records und Completion Phase) als auch dahinter verfügbar. Es wird nur nicht funktionieren! Warum? Weil der Batch darauf ausgelegt ist, mit großen Datenmengen (jenseits der Hauptspeichergröße) zu arbeiten und einen Server-Restart zu überleben. In der Loading Phase werden daher alle Daten – Records und Flow-Variablen – serialisiert ausgelagert. Als Folge arbeitet man innerhalb des Batches auf einer Kopie der Flow-Variablen. Ein notify() auf der Kopie löst natürlich keine Deblockierung eines wait() auf dem Originalobjekt aus.

Schattenbuchhaltung

Wie können wir eine Instanz einer Klasse sowohl im Flow als auch im Batch nutzen? Es bleibt nur der Weg über static Member-Variablen in Java. Eine kleine Hilfsklasse BatchJobSynchronizer erledigt dies. Sie enthält eine statische HashMap mit einer Job-Id als Key und einem StateHolder als Value.

Mit Hilfe einiger statischer Methoden können wir jetzt das Problem lösen:

  1. String createJob(String prefix) erzeugt eine Job-Id und den dazugehörigen Job.
  2. void finishJob(String id, Object result) markiert den Job als beendet und sorgt dafür, dass wartende Threads geweckt werden.
  3. Object waitUntilFinished(String id) ruft man auf, wenn man auf das Ende des Jobs warten möchte.

Es bietet sich an, den Batch-Job in Mule nicht mit einer generierten Batch-Job-Id zu starten, sondern die Id der Schattenbuchhaltung zu übernehmen. Bei eventueller Fehlersuche im Debugger oder mittels Log-Ausgaben lassen sich Zusammenhänge so leichter erkennen. Der bei Beendigung des Jobs angegebene Parameter result wird an die Warte-Methode durchgereicht. Über ihn kann das BatchJobResult-Objekt aus der Completion Phase an den Flow gegeben werden.

Mit den Veränderungen sieht unser Batch damit folgendermaßen aus:

HTTP Flow, der im Batch eine Liste von Zahlen per Logger ausgibt. Der Flow wartet auf den Batch.

Vor dem Batch werden im DataWeave nicht nur die Eingabedaten erzeugt, sondern es wird auch die „Schattenbuchhaltung“ erledigt. In der Completion Phase wird finishJob(...) aufgerufen, auf das hinter dem Batch gewartet wird. Die restlichen Ergänzungen sind Log-Ausgaben, so dass der zeitliche Ablauf nachvollzogen werden kann.

Ergebnisse aus dem Batch

Mit der bisherigen Lösung können wir schon auf das Ende des Batches warten und erhalten Meta-Daten aus dem Batch, z. B. die Gesamtzahl an Records oder die Zahl der Fehler. Daten aus der Ausführungsphase fließen jedoch nicht an den Flow zurück. Wenn Daten aus der Ausführungsphase gefragt sind, so geht es meistens um Zusammenfassungen wie Durchschnittswerte oder Summen. Der Mule Batch bietet dafür einen speziellen Mechanismus an: Den Batch Aggregator. Er existiert in zwei Geschmacksrichtungen: Entweder mit einer Size zur Gruppierung von Records oder mit Streaming. Die zweite Richtung ist für uns interessant: Dabei werden alle Records in Form eines Iterator weitergegeben. Dieser lässt sich in DataWeave konsumieren, in unserem Beispiel berechnen wir die Summe der Zahlen und übergeben sie direkt an finishJob(...):

%dw 2.0
import java!de::codecentric::mule::batch::BatchJobSynchronizer
output application/java
---
BatchJobSynchronizer::finishJob(vars.jobId, payload reduce($$ + $))

Es ist damit garantiert, dass der Flow erst weiter läuft, wenn alle Records verarbeitet worden sind. So sieht der Flow damit aus:

HTTP Flow, der im Batch eine Liste von Zahlen per Logger ausgibt und summiert. Der Flow wartet auf den Batch und gibt die Summe zurück.

Zusammenfassung

In Mule 4 benötigt man zwar gegenüber Mule 3 viel weniger Java-Code, manchmal reicht DataWeave allerdings immer noch nicht aus. Der Aufwand, den von Haus aus asynchronen Batch synchron auszuführen, hält sich mit weniger 100 Zeilen Java-Code jedoch in Grenzen. Der gesamte Code – Java und Mule-Projekt – liegt übrigens auf GitHub.

Die parallele Ausführung kann nicht nur mehrere CPUs beschäftigen; gerade im Integrationskontext ist es oft viel wichtiger, die Latency beim Aufruf externer (entfernter) System zu „verstecken“. Muss es immer der Batch sein? Ein klares „ja“, wenn auch die Features wie Error-Handling oder die Auslagerung aus dem Hauptspeicher benötigt werden. In einfachen Fällen reicht gegebenenfalls auch ein Parallel foreach.

Roger Butenuth

Dr. Roger Butenuth hat in Karlsruhe Informatik studiert und anschließend in Paderborn promoviert (Kommunikation in Parallelrechnern). Er hat langjährige Erfahrung in der Projekt- und Produktentwicklung.

Kommentieren

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert.