Datenbankoperationen in Mule 4 optimieren

Keine Kommentare

Häufig geht es in Mule-Projekten darum, Daten aus irgendeiner Quelle effizient in einer Datenbank abzulegen. Heute zeige ich, mit welchen Strategien man dabei die Performance optimieren kann.

Aufgabenstellung

Da es hier primär um Datenbankoperationen geht, habe ich den Rest bewusst einfach gestaltet: Über einen POST-Request wird eine CSV-Datei an Mule geschickt, z.B. mit curl:

curl -X POST \
     -H "Content-Type: application/csv" \
     --data-binary @coordinates.csv \
     http://localhost:8080/foreach

Die Datei enthält Koordinaten (Breiten-/Längengrad) und zusätzlich die Höhe pro Zeile:

latitude,longitude,altitude
7.144122,51.280051,759.578857
7.155711,51.275112,752.368896

Die Datenbanktabelle enthält zusätzlich eine ID-Spalte. Diese ist vom Dateianfang an ab 0 mitzuzählen, so dass die Reihenfolge der Zeilen rekonstruierbar ist.

Erster einfacher Mule-Flow

Die Basislösung ist recht simpel: Der POST-Request wird von einem http-listener entgegengenommen. Ein einfaches DataWeave-Script wandelt nicht nur das CSV-Format in eine Java-Struktur (ArrayList of Map) um, sondern zählt auch die ID hoch:

%dw 2.0
output application/java
---
payload map (row, index) -> {
	id: index,
	longitude: row.longitude,
	latitude: row.latitude,
	altitude: row.altitude
}

Ein For-Each Scope iteriert über die Zeilen ArrayList, in dem For-Each steht ein Insert, der je eine Zeile in die Datenbank einfügt. Die Query besteht aus einem simplen Insert-Statement:

Insert-Statement

Insgesamt erhält man den folgenden Flow:

Flow mit foreach

Damit wir die Performance der verschiedenen Varianten einfach vergleichen können, mache ich noch eine Zeitmessung und gebe die Laufzeit am Ende per Logger aus. Der Aufrufer erhält als Ergebnis ein JSON-Dokument als Erfolgsmeldung.

Mein Laptop benötigt mit einer MariaDB als Backend für eine CSV-Datei mit 10.000 Zeilen ca. 40 Sekunden, das heißt, es werden ca. 250 Zeilen / Sekunde verarbeitet. Für eine Million Zeilen wären das – hochgerechnet – 4.000 Sekunden, also mehr als eine Stunde.

Kann ich jetzt optimieren? Sicherlich, sonst hätte ich den Titel anders gewählt :-). Sollte ich jetzt optimieren? Beraterantwort: Kommt drauf an. Wie sind die nichtfunktionalen Anforderungen? Muss ich täglich – in einem Batch – 1.000 Zeilen importieren? Stören mich die dann zu erwartenden vier Sekunden? Vermutlich nicht. Muss ich täglich eine oder 100 Millionen Zeilen importieren? Letzteres würde mit der beim Test eingesetzten Hardware nicht mehr funktionieren: 100 Millionen durch 250 Zeilen/s ergibt 4,6 Tage, also zu lang für einen Tag. Dann wäre eine Optimierung zwingend erforderlich.

Erste Analyse und Performance-Verbesserung

Eigentlich gehört vor irgendwelche Veränderungen eine Analyse mit einem Profiler, damit man weiß, wo die Zeit verbraucht wird. Mit etwas Hintergrundwissen ist hier aber auch so klar, was passiert. Ein DataWeave kann auf gängiger Hardware deutlich mehr als 250 Zeilen/s verarbeiten. Wer es messen möchte, kann das For-Each mit dem Insert testweise weglassen.

Schauen wir uns die Schleife mit dem Insert also etwas genauer an, insbesondere den Advanced-Reiter vom Insert:

Transaktionshandling konfigurieren

Im Dropdown „Transactional action“ steht der Default-Wert: JOIN_IF_POSSIBLE. Was bedeutet das? Wenn schon eine Transaktion auf der konfigurierten JDBC-Verbindung läuft, nimmt das Insert-Statement an dieser Transaktion teil. Ansonsten – was hier der Fall ist – wird für das Statement eine Transaktion aufgemacht, das Statement ausgeführt und die Transaktion anschließend per Commit beendet.

Wer jetzt auf die Idee kommt, dass es mit NOT_SUPPORTED besser wird: Zu kurz gedacht. Ohne Transaktion gibt es bei SQL-Datenbanken nicht, das heißt dann – genau wie vorher – eine Transaktion pro Statement und damit gleich langsam.

Auf der Datenbankverbindung habe ich übrigens einen Connection-Pool konfiguriert, für die Transaktionen (Statements) muss also jeweils „nur“ eine Connection aus dem Pool geholt und wieder zurückgegeben werden. Ohne Pool müsste immer eine neue Connection geholt werden, das wäre deutlich langsamer.

Eine erste Optimierung besteht darin, nur eine Transaktion zu verwenden. Das ist auch nicht weiter schwierig: Ich habe einfach das For-Each in einen Try Scope verschoben und dessen transaktionales Verhalten auf ALWAYS_BEGIN gestellt:

Flow mit einer großen Transaktion

Zusammen mit dem JOIN_IF_POSSIBLE laufen jetzt alle Insert-Statements in einer Transaktion ab. Was bringt es? Eine Menge, jetzt werden 3.817 Zeilen/s verarbeitet, also mehr als die zehnfache Performance verglichen mit der ersten Lösung. Für eine Million Zeilen sind das 262 Sekunden, also gut vier Minuten. Das hat sich ja schon mal gelohnt…

Genug optimiert?

Die Optimierung mit einer großen Transaktion über die gesamte Datenmenge hat jedoch Folgen:

  1. Transaktionen in der Datenbank gibt es nicht umsonst, um sie ggf. wieder zurückrollen zu können, muss die Datenbank Speicher anfordern (RAM oder Massenspeicher). Je nach Datenbank ist dieser teilweise stark limitiert, so dass Transaktionen ab einer gewissen Größe scheitern.
  2. Andere Prozesse, die auf der gleichen Tabelle (bei komplizierten Jobs ggf. auch mehrere Tabellen) arbeiten, werden unter Umständen länger blockiert und laufen im schlimmsten Fall in Timeouts.
  3. Der gesamte Import läuft aus Sicht anderer Prozesse atomar ab.

Die ersten beiden Punkte können problematisch sein, der dritte dagegen kann sogar Teil der Anforderung sein, wenn andere Prozesse keinen unvollständigen Import sehen dürfen.

Die goldene Mitte

Wenn eine Transaktion zu groß ist und eine Transaktion für jede Zeile zu klein ist, wie wäre es dann mit der goldenen Mitte? Wie muss die Mitte aussehen? Groß genug, damit der Overhead für die Transaktionsverwaltung (weitgehend) unter den Tisch fällt, klein genug, damit wir kein Speicherproblem bekommen. Bei kleinen Datensätzen ist mein erster Reflex immer die 1.000 als Anzahl, im Detail sollte man das aber im Kontext evaluieren. Die Größe der Datensätze und die Latenz zwischen Anwendung und Datenbank können den „richtigen“ Wert beeinflussen.

Wie bekommt man die Datensätze aufgeteilt? Ganz einfach: Das ist ein Feature des For-Each Scope, hier habe ich die Batch Size vom default Wert 1 auf 1.000 hochgesetzt:

Batch-Size im For-Each konfigurieren

Zusätzlich vertauscht man noch den Try Scope und For-Each Scope, das heißt, For-Each außen, Try innen. Das sieht dann so aus:

Flow mit For-Each und Bulk-Insert

Was passiert nun? Die Collection, die den For-Each füttert, wird nicht in einzelne Objekte aufgeteilt, sondern in Päckchen von 1.000 Objekten. Moment, wie geht denn das mit dem Insert? Der kann doch nur einen Datensatz einfügen. Stimmt, der normale Insert schon, für den müsste man jetzt eine neue Schleife einfügen. Ich bin hier jedoch zwei Schritte auf einmal gegangen und habe den Insert durch einen „Bulk-Insert“ ersetzt. Was macht „Bulk-Insert“? Es akzeptiert nicht ein einzelnes Objekt, sondern ein Array von Objekten. Eine Instanz von PreparedStatement in Java kann nämlich ein Statement nicht nur mit einem Parametersatz, sondern mehreren ausführen. Dazu wird mit addBatch() ein neuer Parametersatz eingeleitet. Diese Funktion reicht Mule mit dem „Bulk-Insert“ durch.

In unserem Beispiel werden dadurch 999 von 1.000 Round-Trips zur Datenbank eingespart. Trotz der kleineren Transaktionen erreichen wir immer noch beinahe eine Verdopplung des Durchsatzes auf 6.410 Zeilen/s. Den gleichen Trick hätte man auch bei der großen Transaktion anwenden können, die Gründe dagegen habe ich jedoch bereits dargelegt. Insgesamt sind wir gegenüber der Ursprungslösung jetzt schon um den Faktor 25 schneller geworden.

Parellelität

Nachdem (!) die Möglichkeiten der sequentiellen Varianten ausgereizt sind (zumindest aus meiner Sicht) habe ich in der nächsten Variante Parallelität eingebaut. Um Teile parallel auszuführen, muss das Gesamtproblem – n Zeilen – erst mal zerlegt werden. Im For-Each erledigt das die „Batch size“. Wie geht es noch? DataWeave kennt dazu ein divideBy:

%dw 2.0
output application/java
import * from dw::core::Arrays
---
payload divideBy 1000

Damit wird ein beliebig großes Array in kleinere Arrays aus Päckchen der Größe 1.000 zerlegt. Die Päckchen landen dann wieder in einem Array, wobei das letzte Päckchen ggf. kleiner als 1.000 ist. Das Ergebnis schicken wir jetzt in ein Parallel-For-Each und erhalten folgenden Flow:

Flow mit Parallel-For-Each

Läuft schon mal, sogar schneller als die sequentielle Variante, 8.850 Zeilen/s. Wirklich überzeugt hat mich die Zahl jedoch nicht, von meinen 12 Cores (ok, virtuell, echt sind nur 6) hatte ich mehr erwartet. Woran könnte das liegen? In der Dokumentation zu Parallel-For-Each steht:

By default, all routes run in parallel.

„All routes in parallel“ könnte zu viel des Guten sein! In den meisten Fällen startet Mule 4 – im Gegensatz zu Mule 3 – automatisch mit einer „guten“ Zahl von Threads, hier übertreibt er aber. Mittels „Max concurrency“ lässt sich das Maultier jedoch bändigen, nachdem ich die Parallelität auf 12 Threads reduziert habe, lief es deutlich besser: 16.949 Zeilen/s, also beinahe eine Verdoppelung zur „unbounded“ Variante. Dies gilt zwar für meine Parameter, aber bitte nicht verallgemeinern. Selbst ausprobieren und messen.

Zusammenfassung

In der Grafik habe ich die Varianten mit ihrer Performance zusammengefasst:

Balkendiagramm mit Performance (Zeilen/s) der vorgestellten Lösungen

Bitte die Zahlen nicht zu genau betrachten, da die Unterschiede zwischen den Varianten sehr deutlich sind, habe ich nicht allzu genau gemessen (JIT warmlaufen lassen, mehrere Messungen, Durchschnitt bilden, etc.). Auch wenn am Ende die parallele Variante am schnellsten ist, die Parallelisierung hat „nur noch“ den Faktor drei gebracht. Viel wichtiger ist die Optimierung im Sequentiellen, dort ist der Hebel viel größer. Zentraler Punkt ist die Reduktion von Overhead:

  1. Anzahl von Transaktionen klein halten: Statements gruppieren
  2. Anzahl von Netzwerkroundtrips zur Datenbank klein halten: Bulk-Insert

Diese Effekte sind auf meinem Laptop noch harmlos, schließlich musste zur Datenbank nur über localhost kommuniziert werden. Schon ein einzelner Netzwerkhop führt zu einer höheren Latency, was die Effekte noch deutlich verstärken kann.

Die Optimierung im sequentiellen Fall benötigt auch kaum Ressourcen: Nur etwas mehr Speicher, um das Prepared Statement mit den Parametersätzen zu puffern, bevor es zur Datenbank geschickt wird. Die parallelen Varianten dagegen benötigen nicht nur eigene Threads sondern setzen die Datenbank durch parallele Verbindungen mehr unter Stress. Dazu liegen die Daten in der Tabelle vermutlich nicht mehr so schön in der gleichen Reihenfolge, wie sie ursprünglich in der CSV-Datei lagen und wie sie später ggf. wieder benötigt werden. Je nach Anwendungsfall kann dies relevant sein (Performance beim Lesen).

One more Thing…

Du hast Spaß an Integrationsprojekten und Interesse an API-Management? Mein Team und ich suchen Verstärkung!

Tags

Dr. Roger Butenuth hat in Karlsruhe Informatik studiert und anschließend in Paderborn promoviert (Kommunikation in Parallelrechnern). Er hat langjährige Erfahrung in der Projekt- und Produktentwicklung.

Über 1.000 Abonnenten sind up to date!

Die neuesten Tipps, Tricks, Tools und Technologien.
Jede Woche direkt in deine Inbox.

Kostenfrei anmelden und immer auf dem neuesten Stand bleiben!
(Keine Sorge, du kannst dich jederzeit abmelden.)

Kommentieren

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert.