Beliebte Suchanfragen

Cloud Native

DevOps

IT-Security

Agile Methoden

Java

//

Tutorial „Enterprise Service Bus mit Mule ESB“: Nachrichten mit Java transformieren

18.1.2013 | 13 Minuten Lesezeit

Im letzten Teil  habe ich eine Java-Komponente vorgestellt, die auf Basis des Nachrichteninhalts Properties gesetzt hat, so dass die Nachrichten anschließend über einen Choice Router sortiert werden konnten. Jetzt wird wieder eine Java-Komponente im Mittelpunkt stehen, aber diesmal wird sie den Nachrichteninhalt transformieren. Da ich zu faul war, eine Inhaltstransformation selbst zu schreiben, habe ich dies delegiert: Der Inhalt wird von einem externen Programm transformiert. Welches Programm die Arbeit erledigt und mit welchen Parametern es aufgerufen wird ist dabei konfigurierbar.  Damit habe ich ein anderes Problem elegant gelöst: Ein Beispiel für die Spring-Konfiguration von Mule-Komponenten zu finden.

Was können wir als Beispiel nutzen? Mit einem bunten Beispiel kann ich auch heute nicht dienen, dafür wird es laut: Der Mule-Flow hat die Aufgabe, ein Eingangsverzeichnis zu überwachen. Alle dort liegenden WAV-Dateien soll er in MP3-Dateien konvertieren und in einem Ausgabeverzeichnis ablegen. Als MP3-Encoder wird das Open-Source-Programm lame verwendet.

Der Flow ist damit sehr einfach, zwischen den zwei File-Endpoints befindet sich unsere Komponente:

Mule-Flow zum MP3-Encoding.

Einigen Lesern fällt jetzt vielleicht schon ein Unterschied zum letzten Beispiel auf: Dort gab es hinter dem File-Endpoint eine „File to Byte Array“ Komponente, die den ankommenden InputStream in ein Array (byte[]) umgewandelt hat. Damit haben wir zwar einen recht einfachen Zugriff auf den Inhalt der Datei, allerdings für den Preis, dass die Datei sich sofort komplett im Hauptspeicher materialisiert. Bei WAV-Dateien (50 MByte oder mehr für einen Musiktitel) ist das schon nicht mehr zu vernachlässigen, bei heutigen Rechnern aber noch möglich. Bei Video-Dateien im Gigabyte-Bereich verbietet sich dieses Vorgehen jedoch. Glücklicherweise ist Mule in der Lage, einfach den eingehenden Stream weiterzureichen. Ich werde zeigen, dass man auch auf der Ausgabeseite mit Streaming arbeiten kann, so dass die Dateien beliebig groß werden können, ohne den Hauptspeicher unnötig zu belasten. (Dafür muss natürlich auch das extern gestartete Programm Streaming unterstützen.)

Das Streaming hat jedoch einen Nebeneffekt: Die Datei bleibt während der gesamten Bearbeitung geöffnet. Wir erinnern uns vielleicht noch daran, dass die Mule-Implementierung des InputStream die Datei erst  dann löscht oder verschiebt, wenn der Stream geschlossen wird. Stellt man nun das Polling-Intervall kürzer als die Verarbeitungszeit ein, so greift Mule die Datei mehrfach auf. Alle Zugriffe ab dem zweiten führen zu einer Exception im Log, da die Datei durch den ersten Zugriff gesperrt ist. Wie lässt sich verhindern, dass Mule Dateien mehrfach aufgreift? Mule besitzt dafür einen eingebauten Mechanismus: Dateien können für die Zeit der Bearbeitung in ein Work Directory verschoben werden. Die dazu notwendigen Konfigurationsparameter findet man jedoch nicht im Standard-Dialog für File Endpoints. Ich muss hier wohl noch etwas ausholen…

Global Elements

Schaut man sich die Edit View im Mule Studio genauer an, fallen am unteren Rand drei Reiter auf, mit denen man verschiedene Sichten einstellen kann:

  • Message Flow: Die grafische Sicht, aus der ich schon einige Screenshots gezeigt habe.
  • Global Elements: Globale Einstellungen (nicht nur) für Endpoints, auf die man sich in den Komponenten des Flows beziehen kann.
  • Configuration XML: Die XML-Konfiguration des Mule-Projekts. Sie beginnt mit dem Import der verschiedenen XML-Schema-Dateien, dann folgen die Global Elements und anschließend die Flows.

Bisher habe ich mich immer auf den Message-Flow bezogen. In den Komponenten im Flow kann man jedoch nicht immer alles einstellen, manche Einstellungen lassen sich nur an Global Elements vornehmen. Die Komponenten können sich später darauf beziehen. Also schalten wir auf Global Elements um und klicken „Create“, um ein neues Global Element zu erzeugen. Der richtige Typ für unser Beispiel ist „File Endpoint“. Nun können wir die Konfiguration vornehmen und dem Ding einen Namen geben:

Konfiguration eines Global File Endpoints.

Wenn Mule jetzt eine WAV-Datei annimmt, verschiebt er sie sofort nach C:\MuleDemo\work. Damit das funktioniert, müssen das Eingangs- und das Work-Verzeichnis auf der gleichen Partition (Windows) bzw. auf dem gleichen Dateisystem (Unix/Linux) liegen, nur dann lässt sich eine Datei per Umbenennung verschieben. Im Feld Work File Name Pattern gibt man noch einen Ausdruck in der Mule Expression Language an, der den Dateinamen festlegt.

Wer das Tutorial bis hier gelesen hat, dem ist sicher auch noch etwas XML zuzumuten, daher zeige ich ab jetzt auch die Konfiguration im XML-Format. Damit kann man sehen, was aus den Daten im Dialog im XML wird:

1<file:connector name="wavInput"  
2                workDirectory="C:/MuleDemo/work" autoDelete="true"
3                streaming="true" validateConnections="true"
4                doc:name="File" workFileNamePattern="#[header:originalFilename]"/>

Im File Endpoint selbst konfiguriert man jetzt wie gehabt den Path und das Move to Directory, zusätzlich wird noch eine Angabe im Reiter References fällig:

Referenz auf globale Einstellungen für einen File Endpoint.

Im Dropdown Connector Reference taucht unser vorher definiertes Global Element auf, so dass ein Klick ausreicht. Man hätte übrigens auch anders herum vorgehen können: Über das Plus-Zeichen rechts neben dem Dropdown lässt sich der Create-Dialog für ein Global Element ebenfalls aufrufen.

Hier wieder das XML zum Dialog:

1<file:inbound-endpoint connector-ref="wavInput"
2                       doc:name="wav"
3                       path="C:\MuleDemo\input"
4                       responseTimeout="10000"
5                       moveToDirectory="C:\MuleDemo\processed"/>

Spring Konfiguration einer Java-Komponente

Langsam nähern wir uns dem Kern, der Java-Komponente. Da sie nicht nur mit lame, sondern mit beliebigen anderen externen Programmen funktionieren soll, muss sie konfigurierbar sein. Als Minimum müssen der Pfad zum externen Programm und die Argumente angegeben werden. Der Pfad ist ein String, die Argumente eine Liste von Strings. Wie bekommt man solche Dinge über den Dialog eingegeben, der ja nicht speziell auf unsere Komponente vorbereitet ist?

Schauen wir uns den Dialog doch mal näher an:

Spring-Konfiguration einer Java-Komponente.

Die Felder Display Name und Transformer Class hatten wir schon im letzten Teil behandelt, ihre Bedeutung ist offensichtlich. Interessant wird der Bereich darunter, mit Property überschrieben. Hier kann man sich seine Spring-Konfiguration zusammenklicken. Das externe Programm wird über die Property „command“ eingestellt. Also auf das Plus-Zeichen klicken, im Feld Name „command“ und im Feld Value den Pfad zu lame eingeben.

Etwas komplizierter wird es bei der Liste von Argumenten (als Strings). Nachdem wir den Namen der Liste – „arguments“ – angegeben haben, wechseln wir in den Reiter Advanced. Dort findet sich wieder ein Plus-Button, darüber lässt sich die Liste einfügen. Als Value Type gibt man java.lang.String an. Anschließend kann man mittels weiterer Klicks auf den nächsten Plus-Button die einzelnen Argumente hinzufügen. Da die Liste bereits auf String typisiert ist, müssen für die einzelnen Elemente nur noch die Werte angegeben werden.

Die weiteren Argumente – deren Bedeutung ich später erkläre – lassen sich also analog zum Pfad konfigurieren. Im XML enthält man folgenden Schnipsel:

1<custom-transformer class="de.codecentric.external.ExternalToolTransformer"
2                    doc:name="wav2mp3">
3    <spring:property name="maxExternalProcesses" value="3"/>
4    <spring:property name="command" 
5                     value="C:/Program Files (x86)/Exact Audio Copy/lame.exe" />
6    <spring:property name="regex" value="(.*)\.wav"/>
7    <spring:property name="replacement" value="$1.mp3"/>
8    <spring:property name="arguments">
9        <spring:list merge="default" value-type="java.lang.String">
10            <spring:value>-h</spring:value>
11            <spring:value>-</spring:value>
12            <spring:value>-</spring:value>
13        </spring:list>
14    </spring:property>
15</custom-transformer>

Die Struktur hat Mule nicht neu erfunden, hier sieht man eine ganz gewöhnliche Spring-Konfiguration. Das ist praktisch, wer Spring bereits kennt, muss sich nicht umgewöhnen. Außerdem kann man damit beliebig komplexe Konfigurationen für seine Java-Komponenten erfinden. Neu ist nur die grafische Oberfläche dafür. Spring-Kenner sind hier jedoch vermutlich mit XML schneller am Ziel.

Was muss man auf der Java-Seite machen, damit Spring dort die Konfiguration in die Komponente injizieren kann? Nicht viel: Nur Get- uns Set-Methoden für die einzelnen Properties. Für „command“ muss der Typ String sein, für „arguments“ List. Spring kann auch einige einfache Typumwandlungen automatisch vornehmen, so verwendet das Get-Set-Paar für „maxExternalProcesses“ int als Typ.

Spring prüft die einzelnen Werte allerdings nicht auf Konsistenz. Dafür bietet sich die Methode initialize() an, die man dazu überladen kann:

1@Override
2public void initialise() throws InitialisationException {
3    super.initialise();
4    File cmdFile = new File(command);
5    if (!cmdFile.canExecute()) {
6        throw new InitialisationException(
7            MessageFactory.createStaticMessage(cmdFile.getAbsolutePath()
8                                               + " is not an executable file"), this);
9    }
10    if (maxExternalProcesses <= 0) {
11        throw new InitialisationException(
12            MessageFactory.createStaticMessage(
13                        "maxExternalProcesses must be positive, but is: "
14                        + maxExternalProcesses), this);
15    }
16}

Geprüft wird hier, ob sich hinter dem angegebenen „command“ eine ausführbare Datei verbirgt und ob die Anzahl der externen Prozesse eine positive Zahl ist. Nimmt man diese Prüfung erst später vor, bekommt man die Fehlermeldungen nicht schon beim Hochfahren von Mule, sondern erst zu dem Zeitpunkt, wo die ersten Messages (WAV-Dateien) durch Mule transformiert werden.

Streaming, Threads und der ganze Rest

Nach dem ganzen Vorgeplänkel und einigen Nebenschauplätzen wie Konfiguration nähern wir uns jetzt dem Kern, der sich in der Methode transformMessage(...) befindet. Die Methode hat zwei Aufgaben zu lösen:

  1. Es muss dafür gesorgt werden, dass die Ausgabedatei eine neue Endung bekommt (.wav -> .mp3)
  2. Der InputStream in der Message Payload kommt als WAV-Stream an, muss jedoch in einen ein mp3-Stream konvertiert werden.

Name

Fangen wir mit dem einfacheren Teil – der Dateiendung – an. Der Dateiname wird in der Message Property „originalFilename“ (im Scope inbound) geliefert, für den schreibenden File Endpoint muss der neue Dateiname in der Property „filename“ (im Scope outbound) stehen. Man erliegt hier vielleicht schnell der Versuchung und fängt mit Hilfe von indexOf() an den String zu zerlegen. Man sollte jedoch einige Randbedingungen beachten:

  • Wir verarbeiten unter Umständen keine Dateien, sondern Streams aus Quellen, die überhaupt keine Dateinamen kennen, hier kann/muss also auch keine Property gesetzt werden
  • Eventuell soll der Dateiname vollständig erhalten bleiben, ohne irgendwelche Änderungen.
  • Wenn er geändert werden soll, dann unter Umständen nicht nur am Ende.

Die Konvertierung des Dateinamens nutzt reguläre Ausdrücke. Konfiguriert man einen regulären Ausdruck (als String-Property „regex“), so wird dieser im Setter direkt in ein Pattern übersetzt:

1public void setRegex(String regex) {
2    this.regex = Pattern.compile(regex);
3}

Die Transformationslogik wird nur aktiv, wenn ein Dateiname und ein regulärer Ausdruck vorhanden sind. Zusätzlich muss der reguläre Ausdruck auch noch „matchen“. Über die Methode replaceAll(replacement) wird der alte in den neuen Dateinamen umgewandelt. Reguläre Ausdrücke erlauben es, Teilausdrücke zu definieren und diese beim Ersetzen zu referenzieren. In unserem Beispiel passt (.*)\.wav auf die WAV-Dateien, wobei alles vor der Endung mittels runder Klammern zu einem Teilausdruck wird. Dieser lässt sich dann im Replacement $1.mp3 verwenden (der Teil $1). Man erhält so mit wenigen Zeilen eine flexible Ersetzungslogik:

1String originalName = message.getProperty("originalFilename", PropertyScope.INBOUND);
2String filename = "";
3if (originalName != null) {
4    if (regex != null) {
5        filename = regex.matcher(originalName).replaceAll(replacement);
6    } else {
7        filename = originalName;
8    }
9    message.setProperty("filename", filename, PropertyScope.OUTBOUND);
10}

Die Konfigurationssyntax mit regulären Ausdrücken ist zwar mit Sicherheit nicht fachbereichstauglich, Mule-Flows werden aber auch eher von der IT-Profis konfiguriert, denen reguläre Ausdrücke nicht fremd sein sollten.

Inhalt

So, alle einfachen Teilprobleme sind erledigt, jetzt geht es an die Transformation des Inhalts, der in Form eines InputStream geliefert wird und auch in der gleichen Form weitergereicht werden soll. Als Nebenbedingung ist zu beachten, dass sowohl die Eingabe als auch die Ausgabe nie vollständig im Speicher liegen sollen. Die Konvertierung delegieren wir an einen externen Prozess, der in Java schnell gestartet ist:

1Process proc = Runtime.getRuntime().exec(cmdarray);

Dazu muss nur vorher aus dem Kommando und der Liste von Argumenten ein String-Array zusammengebaut werden. Der Prozess läuft parallel zu unserer virtuellen Maschine mit Mule. Mit dem Prozess können wir über drei Streams kommunizieren:

  1. Die Standardeingabe des Prozesses, aus unserer Sicht ein OutputStream.
  2. Die Standardausgabe des Prozesses, aus unserer Sicht ein InputStream.
  3. Die Fehlerausgabe des Prozesses, aus unserer Sicht ebenfalls ein InputStream.

Der Stream der Standardausgabe passt genau als Payload für unsere Message, um ihn müssen wir uns nicht weiter kümmern. Was aber mit den beiden anderen? Wir bekommen die Daten als Eingabe-Stream, müssen sie aber an einen Ausgabe-Stream liefern. (Wenn man unter Unix Prozesse direkt startet, dann hätte man den Eingabe-Stream per fork/exec direkt weitergeben können, aber wir machen hier ja Java…) Es bleibt also nichts anderes übrig, als die Daten aus einem Stream zu lesen und sie direkt in den anderen zu kopieren. Falls man dies direkt in transformMessage(...) implementiert, läuft der Flow in einen Deadlock: Zwischen Mule und dem externen Programm befindet sich ein Puffer begrenzter Größe, typischerweise kann man hier mit 4-8 KByte rechnen. Das gleiche gilt für den Rückweg vom externen Programm zu Mule. Dazu kommt noch die Datenmenge, die das externe Programm intern puffert. Wenn die Eingabe größer als die Summe dieser drei Puffer ist – was bei WAV-Dateien ziemlich sicher so sein wird – laufen die drei Puffer irgendwann voll, bevor Mule am anderen Ende liest. Warum? Weil Mule den Flow nicht weiter treibt, solange wir uns noch in der Methode transformMessage(…) befinden, Mule wartet also auch, ein klassischer Deadlock. Das Kopieren muss also in den Hintergrund verlegt werden, wozu ein einfacher Copy-Thread dient.

Wenn keine Fehler passieren, könnte man den Stream mit der Fehlerausgabe ignorieren. Aber geht alles gut? Wenn nicht, kann das externe Programm blockieren, weil der Puffer zwischen Programm und Mule belegt ist. Außerdem verschwinden Fehlermeldungen im Nirwana, was die Fehlersuche nicht gerade erleichtert. Daher wird noch ein Thread gestartet, der die Fehlerausgabe in einzelne Zeilen aufteilt und über einen org.apache.log4j.Logger ausgibt (Mule setzt log4j ein).

Die bisher vorgestellte Lösung funktioniert ohne Deadlock, sie hat jedoch noch einen Nachteil: Unter Umständen startet sie ziemlich viele externe Programme parallel, dazu kommen noch die erwähnten Java-Threads. Das kann dann wieder zu Ressourcen-Engpässen führen. Warum ist das so? Vergegenwärtigen wir uns den Ablauf, wenn im Eingabeverzeichnis viele Dateien liegen. Mule arbeitet bei Flows wie in unserem Beispiel mit drei Threadpools (die Größe lässt sich konfigurieren): Der erste für eingehende Endpoints, der zweite für die Verarbeitung, der dritte für ausgehende Endpoints. Die ersten beiden haben in unserem Fall jedoch nicht viel zu tun: Datei sehen, ins Work-Verzeichnis verschieben, öffnen, an Java-Komponente übergeben. Die Methode transformMessage(...) wird immer direkt zurückkehren, da sie nur einige Threads im Hintergrund startet. Eine „Bremse“ existiert nur bei im dritten Threadpool: Er bearbeitet eine begrenzte Anzahl von Dateien parallel. In Bezug auf den Ressourcenverbrauch ist das jedoch zu spät, schließlich sind die externen Programme alle schon gestartet, auch wenn von ihnen nur eine begrenzte Anzahl läuft und die anderen nach kurzer Zeit darauf warten, dass ihnen jemand die geschriebenen Daten abnimmt.

Wie kann man dafür sorgen, dass nicht beliebig viele externe Programme (und damit interne Threads) parallel gestartet werden? Als erstes führt man eine Konfigurationsvariable ein, mit der die Anzahl eingestellt werden kann: „maxExternalProcesses“, vorbelegt mit eins. Immer wenn ein neues externes Programm gestartet wird, zählt man intern die Anzahl der externen Programme hoch. Ist die Grenze erreicht, wartet man, bis sie wieder unterschritten ist (mit wait()). Für das Runterzählen dient wiederum ein Thread, der auf das Ende des externen Programms wartet und anschließend per notifyAll() alle wartenden Threads weckt. Der Zugriff auf die Anzahl der extern laufenden Programme und das wait() / notifyAll() müssen natürlich jeweils in synchronized{} eingeschlossen werden. Wie viele externe Programme man parallel starten sollte lässt sich nicht allgemein beantworten. Im Fall von lame reicht eins jedoch aus, da lame intern mit Multithreading arbeitet und damit die vorhanden CPU-Cores bereits auslastet.

Ich möchte den Artikel nicht mit zu vielen Code-Schnipseln aufblähen, den vollständigen Code es aber zum Donwload: ExternalToolTransformer.java

Zusammenfassung

Was haben wir gelernt? Zuerst einmal, dass selbst einfach aussehende Endpoints wie der File Endpoint noch Überraschungen bieten: Es lässt sich nicht alles im Endpoint konfigurieren, für einige Parameter muss man zusätzlich ein Global Element anlegen. Dies gilt auch für viele andere Endpoints. Wenn man bei Mule mal eine Option nicht im Endpoint oder einer Komponente findet, lohnt immer ein Blick auf die Global Elements.

Unsere Komponente sollte konfigurierbar sein, dafür hat sich die Spring-Konfiguration von Java-Komponenten als einfacher Weg erwiesen. Wer Spring kennt, muss hier nicht einmal Neuland betreten. Komplex wurde es erst innerhalb der Komponente, da für die Kommunikation mit dem externen Programm eigene Threads gestartet werden müssen. Das ist jedoch ein allgemeines Java-Thema und nicht Mule geschuldet. Trotzdem hält sich die Länge des Java-Codes in Grenzen, es sind weniger als 300 Zeilen Code. Die ersten 100 davon sind nur die Imports, Variablendeklarationen sowie Getter und Setter, der eigentlich Kern besteht aus weniger als 200 Zeilen. Dafür lassen sich jetzt beliebige externe Programme zur Transformation einsetzen, sofern sie Streaming unterstützen.

Weitere Teile dieser Artikelserie

  1. Was ist ein ESB und wofür kann man ihn nutzen?
  2. Tutorial “Enterprise Service Bus mit Mule ESB”: Hello World/Bus
  3. Tutorial “Enterprise Service Bus mit Mule ESB”: MuleMessage und Java-Komponenten
  4. Tutorial „Enterprise Service Bus mit Mule ESB“: Nachrichten mit Java transformieren
  5. Tutorial „Enterprise Service Bus mit Mule ESB“: Integration von CICS Programmen
  6. Tutorial “Enterprise Service Bus mit Mule ESB”: Transport, Connector, Endpoint: Ein paar Grundlagen…
  7. Tutorial “Enterprise Service Bus mit Mule ESB”: Performance und Threads
  8. Tutorial “Enterprise Service Bus mit Mule ESB”: Steuerung und Kontrolle per JMX
  9. Veröffentlichen von Informationen zu Mule Applikationen im Maven-Umfeld
  10. Tutorial “Enterprise Service Bus mit Mule ESB”: Exceptions und Email

Beitrag teilen

Gefällt mir

0

//

Gemeinsam bessere Projekte umsetzen.

Wir helfen deinem Unternehmen.

Du stehst vor einer großen IT-Herausforderung? Wir sorgen für eine maßgeschneiderte Unterstützung. Informiere dich jetzt.

Hilf uns, noch besser zu werden.

Wir sind immer auf der Suche nach neuen Talenten. Auch für dich ist die passende Stelle dabei.