Mule: Streaming mit DataWeave

Keine Kommentare

Mule legt den Datentyp für die Payload einer Nachricht nicht fest. Genauer als Object will es das Maultier nicht wissen. Häufig sind es PoJos, XML oder JSON. Da die letzten beiden nur strukturierter Text sind, müssen sie irgendwie abgelegt werden. Das geschieht entweder am Stück (String) oder über Streaming (InputStream). Die meisten Mule-Komponenten schlucken beide Varianten klaglos. Auch die in der Enterprise-Version vorhandene Transformationssprache DataWeave arbeitet serienmäßig mit beiden Varianten.

Für erfahrene Mule-Entwickler dürfte das alles nicht neu sein. Was jedoch weniger bekannt ist: Streaming funktioniert auch mit PoJos: Richtig konfiguriert kann DataWeave nicht nur eine List von Objekten erzeugen, sondern auch einen Iterator.

Vorher aber noch etwas einge Details zum Streaming von Texten. Für alte Hasen ist das quasi eine Wiederholung, für viele sind aber sicher noch einige Neuigkeiten dabei.

DataWeave Streaming mit XML, JSON und anderen Textformaten

Zieht man eine DataWeave-Transformation in den Flow und konfiguriert ein Textformat als Ausgabe (CSV, XML, JSON), so sieht man anschließend in der Payload einen java.io.InputStream (bzw. eine davon abgeleitete Klasse). Es sieht also so aus, als würde DataWeave seine Eingabe liegenlassen und erst mit der Arbeit anfangen, sobald der nächste Message Processor den Stream liest. Das stimmt jedoch nicht: Schaut man genauer hin, sieht man einen ByteArraySeekableStream. DataWeave verarbeitet seine Eingabe komplett und liefert anschließend eine Stream-Sicht auf das Ergebnis.

Aber halt: Heißt es nicht, dass durch Streaming Payloads größer als der Hauptspeicher möglich sind? Was passiert bei einer großen Eingabe – gibt es dann irgendwann eine OutOfMemoryException, weil das Byte-Array zu groß wird? Nein, Mule verwendet nur dann ein Byte-Array, wenn es nicht zu viele Daten sind. Ab der Größe von 1,5 MByte (genau: 1572864 Bytes) schaltet Mule auf einen RandomAccessFileSeekableStream um, also auf einen Stream, der über den Umweg eines Delegate auf eine Datei zeigt.

Mule kombiniert hier also die schnelle Lösung „Hauptspeicher“ mit der langsameren, aber sicheren Lösung „Dateisystem“. Wem die Grenze von 1,5 MByte nicht gefällt: Sie lässt sich ändern – über die System Property com.mulesoft.dw.buffersize. Damit gilt sie allerdings für den gesamten Server, nicht für einen einzelnen DataWeave. Alles gut? Nicht ganz: Die abstrakte Klasse InputStream enthält eine close()-Methode. Im Fall der Hauptspeicherlösung spielt sie keine große Rolle, hinter dem Byte-Array steckt nur ein Stück Speicher, um den sich auch ohne Aufruf von close() irgendwann der Garbage Collector kümmert.

Im Fall des RandomAccessFileSeekableStream ist es jedoch doppelt gefährlich: Wird hier kein close() aufgerufen, bleibt nicht nur das Datei-Handle auf Betriebssystem-Ebene geöffnet, sondern es bleibt auch die Datei auf der Platte liegen. Der Delegate-Mechanismus sorgt nämlich dafür, dass beim Aufruf von close() die Datei nicht nur geschlossen, sondern auch aus dem Dateisystem gelöscht wird.

Geschlossen?

Es stellt sich die Frage, wie es dazu kommen kann – schließlich ruft doch der nächste Message-Prozessor automatisch close() auf. Schneller als man denkt: Eventuell wird der Stream einige Schritte weitergereicht, bevor er verarbeitet wird. Tritt dann eine Exception auf, haben wir ein Problem. Oder es wird nach dem DataWeave noch eine Entscheidung per Choice-Router getroffen, und in einem der Zweige ist die Payload nicht interessant.

Es gibt mehrere Varianten, wie man sich hier ins Knie schießen kann. Dummerweise handelt es sich um ein Problem, das meistens erst in der Produktion auffällt: Wer testet schon mit großen Dateien? Und dann noch mit so vielen, dass die Platte vollläuft? Wer schaut nach Tests schon im Temp-Verzeichnis nach? Was hilft also? Nur Wissen um das Problem und Vorsicht: Im Zweifelsfall im Exception-Handler noch den Stream schließen. Generell ist es guter Stil, offene Streams nicht über viele Schritte weiterzureichen.

On Demand Streaming

Die bisher beschriebene Streaming-Variante funktioniert zwar mit beliebiger Nachrichtengröße bei begrenztem Hauptspeicher, dafür benötigt sie jedoch externen Speicher. Der ist zwar meistens größer, aber auch endlich und kann daher zum Engpass werden. Weiterhin ist sie nicht wirklich parallel: DataWeave verarbeitet die Eingangsdaten komplett, bevor der nächste Message Processor starten kann.

Wer echte Parallelität möchte, muss im XML mode="deferred" einstellen. DataWeave gibt in diesem Fall eine Instanz zurück, die das Interface OutputHandler implementiert. Es enthält nur eine Methode: void write(MuleEvent event, OutputStream out) throws IOException.

Was passiert hier? Wenn der DataWeave-Knoten durchlaufen wird, nicht viel: DataWeave liest sein Skript ein, macht ansonsten aber nichts. Erst beim Aufruf von write(...) aus dem OutputHandler läuft der Transformationscode los und schreibt sein Ergebnis in den Ausgabestrom, der ihm sozusagen nachträglich und von hinten zur Verfügung gestellt wird.

Aber Vorsicht: Nicht alle Komponenten setzen den OutputHandler so ein, wie man es sich wünscht. Wenn man Pech hat, wird die Payload doch noch im Speicher materialisiert. Im Zweifelsfall sollte man sich nicht darauf verlassen, sondern testen. Wichtig ist es dabei, nicht nur mit kleinen Nachrichten zu testen, sondern auch mit großen, die den Hauptspeicher sprengen. Besser im Test als nach dem Livegang…

Weitere Details zu dem Thema findet man in der MuleSoft-Dokumentation unter DataWeave memory management.

DataWeave mit Java

Der Schnipsel %output application/java im Header eines DataWeave-Skripts reicht aus, um Java-Objekte zu erzeugen. Ohne Angabe konkreter Java-Klassen (PoJos) erzeugt DataWeave eine generische Struktur aus Listen und Maps. So eine Struktur ist bei kleinen Datenmengen sehr effizient. Bei größeren hat sie jedoch das Potential, den Hauptspeicher zu sprengen. Abhilfe besteht darin, auch Java-Objekte zu streamen. Wie das funktioniert, werde ich in den folgenden Abschnitten zeigen.

Beispiel: Geodaten synchronisieren

Zuerst benötigen wir aber ein (mehr oder weniger konstruiertes) Beispiel: In JSON vorliegende Geodaten (eine Menge von Punkten) sollen an einen REST-Service gesendet werden. Quelle kann eine Datei oder der HTTP-Listener von einem Post-Request sein. Wichtig ist nur, dass die Quelle einen Stream in der Payload liefert.

Als Quellformat nutze ich GeoJSON, das man sich auch leicht auf einer Karte darstellen lassen kann (siehe geojson.io). Eine Beispieldatei mit zwei Punkten auf der Weltkugel sieht folgendermaßen aus:

{
  "type": "FeatureCollection",
  "features": [
    {
      "type": "Feature",
      "properties": {
        "marker-color": "#7e7e7e",
        "marker-size": "medium",
        "marker-symbol": "",
        "name": "CC Headquarter"
      },
      "geometry": {
        "type": "Point",
        "coordinates": [
          7.00702428817749,
          51.16197720229481
        ]
      }
    },
    {
      "type": "Feature",
      "properties": {
        "marker-color": "#7e7e7e",
        "marker-size": "medium",
        "marker-symbol": "",
        "name": "MuleSoft Germany"
      },
      "geometry": {
        "type": "Point",
        "coordinates": [
          6.9658,
          50.9274
        ]
      }
    }
  ]
}

Längen- und Breitengrad stehen in einem Array (ein optionales drittes Arrayelement steht für die Höhe), weitere Daten können die Darstellung auf der Karte steuern (Art des Markers, Farbe etc.).

Das Zielformat ist etwas einfacher gestrickt:

{
  "points": [
    {
      "latitude": 51.16197720229481,
      "longitude": 7.00702428817749, 
      "name": "CC Headquarter"
    },
    {
      "latitude": 50.9274,
      "longitude": 6.9658, 
      "name": "MuleSoft Germany"
    }    
  ]
}

Neben Längen- und Breitengrad wird hier zu jedem Punkt nur der Name gespeichert. Das Skript zur Umwandlung der beiden Formate ist recht einfach:

%dw 1.0
%output application/json
---
{ 
  points: payload.features map ((feature , indexOfFeature) -> {
    name: feature.properties.name,
    latitude:  feature.geometry.coordinates[1],
    longitude: feature.geometry.coordinates[0]
  })
}

Eigentlich eine typische Situation: Für ein fachliches Problem – Liste von Punkten auf unserer Weltkugel – existieren technisch leicht inkompatible Formate. Wenn wir die Daten aus einer Datei lesen (Streaming möglich) und über HTTP-Post beim Zielservice abliefern können (Streaming möglich), dann haben wir unser Problem auch schon gelöst.

Na und? Wo ist das Problem?

Wo liegt also das Problem? Wie gesagt, wir konstruieren ein Beispiel: Nehmen wir an, dass die Post-Requests nicht beliebig groß werden dürfen. Oder wir ein Stück Java-Code eine Berechnung auf den Koordinaten ausführen lassen wollen. In beiden Fällen können wir nicht ein großes JSON (mit Streaming) erzeugen. Schalten wir das Ausgabeformat auf Java (durch %output application/java im Header), entsteht ein anderes Problem: DataWeave erzeugt eine Liste (genauer: java.util.ArrayList) von Objekten, die vollständig im Hauptspeicher landet.

DataWeave mit Iterator

Dabei existiert eine Streaming-Lösung in Java: der gute alte Iterator. Kann Mule das auch? Einfache Antwort: Ja, das geht. Auch wenn es in der Dokumentation anscheinend vergessen wurde. Einfach im DataWeave ein as :iterator anhängen, Beispiel:

%dw 1.0
%output application/java
---
(payload.features map ((feature , indexOfFeature) -> {
  name: feature.properties.name,
  latitude:  feature.geometry.coordinates[1],
  longitude: feature.geometry.coordinates[0]
})) as :iterator

Meist – wie hier – ist es dabei noch notwendig, den Ausdruck vor as :iterator in runde Klammern einzurahmen.

Mit diesem Code erhalten wir nicht mehr eine Liste, sondern einen java.util.Iterator, der sich als Eingabe in einen „For Each“ oder „Batch“ von Mule eignet.

Wenn wir die so erhaltenen Datensätze einzeln an einen Webservice übergeben, haben wir den Teufel „Speicherverschwendung“ jedoch mit dem Beelzebub „viele kleine Aufrufe“ ausgetrieben: Es ist einfach nicht effizient, jeden Datensatz einzeln per HTTP-Post an einen Service zu übergeben.

Gruppierung

Zwischen „alle Punkte auf einmal“ und „jeden Punkt einzeln“ existiert noch die goldene Mitte: „Gruppe von Punkten“. Auch das funktioniert einfach: In „For Each“ das Attribut batchSize setzen. Innerhalb der Schleife verarbeitet der Flow dann nicht mehr einzelne Datensätze, sondern jeweils eine Liste der Größe batchSize (oder kleiner, wenn das Ende der Quelle erreicht ist).

Ein Flow könnte damit folgende Elemente enthalten:

  1. Eine Datenquelle (HTTP-listener, File-listener, etc.)
  2. Ein DataWeave mit einem Iterator als Ergebnis
  3. Ein „For Each“ mit batchSize größer 1
  4. In der Schleife:
    • Ein DataWeave, der aus der Liste von Punkten ein JSON-Dokument erzeugt
    • Ein Post an den externen REST-Service

Oder als Bild:

For-Each mit Batch

Zusammenfassung und Ausblick

Streaming ist ein mächtiges Werkzeug. Nur damit ist es möglich, Datenmengen jenseits der Hauptspeichergröße verarbeiten zu können. Mule arbeitet an vielen Stellen automatisch mit Streams, so dass man sich als Entwickler nicht darum kümmern muss. Leider nicht an allen Stellen und auch nicht ohne Tücken: Ein vergessenes close() hat unter Umständen fatale Folgen in der Produktion.

In diesem Blogpost haben wir drei Arten von Streaming mit DataWeave kennengelernt:

  1. DataWeave Default mit Textformat (z. B. JSON) als Ausgabe. Hier haben wir zwar einen Stream, aber trotzdem eine rein sequentielle Verarbeitung: erst DataWeave komplett, dann der nächste Message Processor. Mule kümmert sich nur darum, dass größere Datenmengen auf der Platte gepuffert werden.
  2. DataWeave im Modus „deferred“: Hier läuft die Transformation erst in dem Moment los, in dem eine Senke zur Verfügung steht. Damit ist auch Parallelität zwischen mehreren Message Processors möglich.
  3. DataWeave mit einem Iterator als Ausgabe. Auch hier brauchen wir nicht die Platte, und es ist echte Parallelität möglich. Außerdem kann man mit einem for-each hinter dem DataWeave bequem gruppieren.

Mit Mule 4 hat sich das Streaming übrigens grundsätzlich geändert: Dort kann man zum Beispiel einen Stream auch mehrfach lesen. Aber das ist einen eigenen Blogpost wert.

Roger Butenuth

Dr. Roger Butenuth hat in Karlsruhe Informatik studiert und anschließend in Paderborn promoviert (Kommunikation in Parallelrechnern). Er hat langjährige Erfahrung in der Projekt- und Produktentwicklung.

Kommentieren

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert.