Event-Zeit-Verarbeitung in Apache Spark und Apache Flink

Keine Kommentare

Mit dem neuen Release von Spark 2.1 wurden die Eventzeit-Fähigkeiten von Spark Structured Streaming ausgebaut. Höchste Zeit also den Stand der Unterstützung genauer unter die Lupe zu nehmen und mit Apache Flinkausgestattet mit einem breiten Support für Event-Time-Verarbeitung – zu vergleichen. In diesem Artikel werde ich beschreiben, wie die drei grundsätzlichen Lösungskonzepte für Eventzeitverarbeitung – Watermarks, Trigger und Akkumulatoren – funktionieren und dann ihre Implementierung in Spark und Flink vergleichen. Basis des Vergleichs ist Spark in Version 2.1 und Flink in Version 1.2.

Für einen weitergehenden Vergleich der Frameworks sei auf Verteilte Stream Processing Frameworks für Fast Data & Big Data – Ein Meer an Möglichkeiten verwiesen. Dort werden auch weitere Grundlagen beschrieben, die in diesem Artikel vorausgesetzt werden, beispielsweise zum Stafeful & Window Stream Processing sowie zum Unterschied zwischen Event und Verarbeitungszeit.

Zentrales Problem der Eventzeitverarbeitung sind unsortierte sowie verspätete Events. Ereignisse werden nicht exakt zur der Zeit vom System beobachtet zu der sie auftreten. Die Differenz zwischen Eventzeit und Verarbeitungszeit ist dabei nicht konstant. Insbesondere bei zeitlichen Auswertungen erhöht dies die Komplexität des Systems und damit auch der Pogrammierung. Wenn beispielsweise alle Ereignisse zwischen 12 und 13 Uhr verarbeitet werden sollen (eine typische Window Operation), stellen sich mehrere Fragen: Wie soll mit verspäteten Events umgegangen werden? Wie lange soll auf verspätete Einträge gewartet werden? Wie soll das Resultat bei verspäteten Events aktualisiert werden? Google hat mit der Dataflow API, die nun in Apache Beam aufgegangen ist, verschiedene Lösungskonzepte für diese Probleme vorgestellt. Sowohl Spark und Flink implementieren nicht explizit die Dataflow API, allerdings sind die Konzepte ähnlich und können deshalb verglichen werden.

Die Dataflow Konzepte sind ein Weg denen Problemen bei der Eventzeit Verarbeitung zu begegnen. Zum Beispiel nutzt Kafka Streams andere Strategien für den Eventzeit Support. Mehr dazu hoffentlich in einem folgenden Blogpost.

Watermarks

Um zu ermitteln, bis zu welcher Zeit Events bereits verarbeitet wurden, gibt es Watermarks. Diese zeigen – wie bei einem Wasserstand –  den bislang erreichten “Zeitpegel” an. Beim Erreichen eines Watermarks wird das Ergebnis der Berechnung dann materialisiert. Wenn beispielsweise der Watermark als der Zeitpunkt des neusten Events abzüglich eines fixen Puffers von 30 Sekunden definiert ist, bedeutet dies “Es wird angenommen, dass nun alle Events bis zum Zeitpunkt x angekommen sind. x ist in diesem Fall der Watermark definiert als Zeitpunkt(neuestes Event) – 30 Sekunden

Zudem gibt es noch heuristische Watermarks, bei denen der Puffer – zum Beispiel basierend auf empirischen Messungen – dynamisch angepasst wird. So könnte das System beobachten, dass Events nachts immer mit deutlicher Verzögerung empfangen werden. Der Puffer könnte auf dieser Grundlage erhöht werden. Ebenso könnte aus der Verzögerung der letzten Stunde die erwartete Verzögerung extrapoliert werden. Außerdem gibt es noch den – zumeist unrealistischen – perfekten Watermark, bei dem die Verarbeitung des Event direkt beim Auftreten des Events stattfindet.

Häufig wird im Zusammenhang mit Watermarks auch die “allowed lateness “ genannt – auf Deutsch „die erlaubte Verspätung“. Bei Spark beispielsweise bezeichnet diese den oben beschriebenen Puffer bei den Watermarks. Im Dataflow Model hingegen ist die allowed lateness eine zusätzliche Zeitspanne nach dem Watermark, in der Events nicht ignoriert werden, sondern nachträglich das Ergebnis beeinflussen können. Dazu wird in der Regel über Trigger definiert, wie und wie oft Events verarbeitet werden sollen, die innerhalb der allowed lateness auftreten.

Unabhängig von der Interpretation der allowed lateness müssen alle Daten für die Berechnung des Ergebnisses so lange gespeichert bleiben, bis die allowed lateness verstrichen ist, um die Ergebnisse aktualisieren zu können. Anschließend werden die Daten automatisch gelöscht.

Trigger

Während Watermarks den aktuellen Stand der eingegangen Daten anzeigen, lösen Trigger die Berechnung aus. Der Watermark Trigger ist dabei der Einfachste: Dieser wird ausgelöst, sobald der Watermark das Ende des Zeitfenster erreicht – zum Beispiel 13 Uhr bei einem Fenster von 12 bis 13 Uhr. Mit dem oben definierten Watermark mit einem fixen Puffer von 30 Sekunden wird dies um 13:00:30 der Fall sein.

Weitere Trigger sind möglich:

  • auf der Verarbeitungszeit, zum Beispiel alle 2 Minuten
  • auf der Eventanzahl, beispielsweise alle 100 Events
  • bei speziellen Events, zum Beispiel das Ende einer Datei, einem technischen Flush Event oder auch basierend auf dem Inhalt des Events.
  • Kombinationen aus den vorherigen dreien

Trigger werden typischerweise genutzt, um bereits vor Erreichen des Watermarks Zwischenresultate zu ermitteln oder um die Ergebnisse bei verspäteten Events zu aktualisieren. Möglich wäre es zum Beispiel bei einem Fenster von 10 Minuten, jede Minute zu triggern, dann noch einmal beim Watermark und letztendlich bei jedem verspäteten Event bis die allowed lateness erreicht ist.

Akkumulatoren

Wenn nun das Ergebnis durch die Nutzung von Triggern mehrmals berechnet wird, muss der Entwickler definieren wie mit den einzelnen Teilergebnissen umgegangen wird. Dafür gibt es drei Varianten:

  • Discarding: Bei jedem Trigger wird nur das neue Teilergebnis weitergegeben und die bis hierhin ermittelten Resultate werden anschließend gelöscht.
  • Accumulating: Bei jedem Trigger wird das aktuelle Ergebnisse aktualisiert und weitergeben. Das Teilergebnis wird nicht gelöscht.
  • Accumulating & Retracting: Wie Accumulating mit der zusätzlichen Information über das vorherige Ergebnis, sodass nachfolgende Operationen ihre Ergebnisse einfach korrigieren können. Dieser Modus ist zwar im DataFlow Model vorgesehen, wird bislang aber von keinem Framework implementiert.

Die Wahl des passenden Akkumulators hängt stark von der nachfolgenden Verarbeitung und letztendlichen Senke ab. Wenn diese in der Lage ist Ergebnisse zu aktualisieren, kann der Accumulating Modus genutzt werden. Wenn allerdings jedes Teilergebnis nur einmal als Input für den nächsten Schritt dienen darf, muss der Discarding Mode verwendet werden.

Unterstützung in Spark

Spark benötigt  keinen speziellen Modus zur Eventzeitverarbeitung. Intern unterscheidet sich zunächst nichts von der Verarbeitungszeit. Um in Spark ein Fenster für die Eventzeit zu definieren, muss als Erstes nach dieser gruppiert werden:

val words = ... // streaming DataFrame mit dem Schema { timestamp: Timestamp, word: String }
// Gruppieren nach Window und Word, berechnen der Anzahl für jede Gruppe
val windowedCounts = words.groupBy(
    window($"timestamp", "10 minutes", "5 minutes"),
    $"word"
).count()

Dies unterscheidet sich nicht wesentlich von einem groupBy nach einem Schlüssel nur dass der Schlüssel ein Zeitfenster ist. In diesem Fall ist es ein Zeitfenster von 10 Minuten mit einem Sliding Intervall von 5 Minuten. Zusätzlich können die Einträge noch nach einem fachlichen Key gruppiert werden, in diesem Fall das „word“. Durch das anschließende Zählen erhält man einen WordCount für ein 10 Minuten Fenster.

Im obigen Beispiel werden alle Daten unendlich lange gespeichert, damit auch bei verspäteten Events das Ergebnis noch aktualisiert werden kann. Mit Watermarks kann diese Zeit begrenzt werden:

val windowedCounts = words
   .withWatermark("timestamp", "10 minutes")
   .groupBy(
       window($"timestamp", "10 minutes", "5 minutes"),
       $"word")
   .count()

Nun wartet Spark nur 10 Minuten auf verspätete Daten. Anschließend werden die Daten für diese Fenster gelöscht. Aktuell kann nur ein Watermark mit einer fixen erlaubten Verspätung genutzt werden. Bei Spark ist der Watermark aktuell gleichbedeutend zur allowed lateness.

Spark implementiert aktuell zwei verschiedene Modi zur Ausgabe des Ergebnisses:

  • Beim Append Mode wird das Ergebnis des Fensters nach Erreichen des Watermarks, hier also 10 Minuten nach dem Ende des Zeitfensters, ausgegeben. Der Watermark fügt hier eine zusätzliche Latenz hinzu.
  • Beim Complete Mode wird das bislang errechnete Ergebnis bei jedem Trigger ausgegeben. Allerdings unterstützt Spark bislang nur Trigger auf Basis der Verarbeitungszeit. Zeitunabhängig oder gar zusammengesetzte Trigger werden aktuell noch nicht unterstützt.

In Verbindung mit Watermarks kann zudem nur der Append Mode genutzt werden, da für den Complete Mode alle bisherigen Daten vorhanden sein müssen und dementsprechend nicht nach Erreichen des Watermarks gelöscht werden können.

Spark bietet somit eine rudimentäre Unterstützung für Watermarks (nur fixe Verspätung) und Trigger (nur auf Verarbeitungszeit). Bei den Akkumulationen entsprechen die implementierten Output Modes von Spark am ehesten dem Accumulating Modus der Datastream API, da immer das komplette Ergebnis für ein Zeitfenster nach Ablauf der erlaubten Verzögerung ermittelt wird.

Unterstützung in Flink

Flink muss zunächst mitgeteilt werden, dass die Verarbeitung nun nach der Eventzeit erfolgen soll. Dies geschieht per

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTIme);

Abhängig von der Zeit-Charakteristik verhalten sich beispielsweise die verschiedenen Window Implementierungen unterschiedlich. Außerdem wird die Eventzeit der Stream Quelle berücksichtigt. Falls die Quelle keine Eventzeit bereitstellt, muss die Eventzeit manuell aus dem Event extrahiert werden mittels Timestamp Assignern. Ebenso muss der Watermark für diese Events festgelegt werden.

stream.assignTimestampsAndWatermarks(new TimestampAndWatermarkAssigner());

Der Entwickler kann den Assigner dabei vollständig selber implementieren oder vordefinierte Implementierungen erweitern. So sind insbesondere für die Watermarks unterschiedliche Implementierungen möglich:

  • mit einem fixen Abstand
  • mit einem dynamischen, aber begrenzten Abstand
  • oder basierend auf bestimmten Events.
class FixedWatermarkGenerator extends AssignerWithPeriodicWatermarks[SomeEvent] {
 
   override def extractTimestamp(element: SomeEvent, previousElementTimestamp: Long): Long = {
       element.getEventTimestamp
   }
 
   override def getCurrentWatermark(): Watermark = {
       // the watermark is 10s behind the current time
       new Watermark(System.currentTimeMillis() - 10000)
   }
}

Der Watermark wird genutzt, um den Zeitpunkt zu bestimmen, an dem die meisten Daten verarbeitet wurden. Zu diesem Zeitpunkt wird dann eine Berechnung durchgeführt. Im obigen Beispiel liegt der Watermark immer 10 Sekunden hinter der aktuellen Zeit. Zusätzlich zum Watermark kann noch eine allowed lateness angegeben werden. Diese bezeichnet die Zeit, die ein Event nach einem Watermark verspätet sein darf. Definiert wird die allowed lateness immer zusammen mit einer Window Operation.

stream
   .assignTimestampsAndWatermarks(new TimestampAndWatermarkAssigner());
   .keyBy(event -> event.someKey)
   .window((SlidingEventTimeWindows.of(Time.minutes(15), Time.minutes(5)))
   .allowedLateness(Time.minutes(2))
   .apply()

Der obige Auszug zeigt ein Sliding Window mit einer Länge von 15 Minuten, einem Slidingintervall von 5 Minuten und einer allowed lateness von 2 Minuten.

Wenn es um die Berechnung eines Fensters geht, bietet Flink neben dem Watermark weitere Trigger. Diese Trigger können sowohl auf die Zeit (maßgebend ist die definierte Charakteristik, siehe oben), auf eine bestimmten Anzahl (zum Beispiel alle 100 Events) oder auch auf  eine Mischung aus beiden reagieren. Ebenso können auch dynamisch Trigger registriert werden, die in der Zukunft ausgeführt werden, beispielsweise eine bestimmte Zeit nach einem Event.

Bleibt zuletzt noch die Frage nach den Akkumulatoren: Standardmäßig werden die vollständigen aktualisierten Daten an nachfolgende Operationen innerhalb der Streaming Anwendung weitergegeben. Stattdessen kann auch ein Fire and Purge Trigger genutzt werden, bei dem alle aktuellen Daten nach dem Trigger gelöscht werden und alle weiteren Trigger nur die neuen Daten weitergeben. Dies ist dann effektiv ein Discarding Akkumulator.

Flink bietet somit mit den unterschiedlichen Watermarks und den flexiblen Triggern sowie Windows umfangreichen Support bei der Eventzeit Verarbeitung. Trotz der Flexibilität ist es ohne größeren Aufwand möglich Standardfälle zu implementieren.

Zusammenfassung und Empfehlung

Es ist offensichtlich, dass Flink schon deutlich länger an der Unterstützung für Eventzeit Verarbeitung arbeitet. So ist es auch zu erklären, dass deutlich mehr Konzepte bereits unterstützt werden. Flink arbeitet zudem kontinuierlich daran, die Dataflow Konzepte vollständig zu unterstützen, beispielsweise mit der Implementierung des Retracting Akkumulators.
Spark hingegen hat erst mit Structured Streaming begonnen die Eventzeitverarbeitung zu unterstützen. Bislang wurden vor allem die Grundlagen geschaffen und mit der Version 2.1 wurde erste Konzepte implementiert. Es ist zu erwarten, dass Spark im Laufe des Jahres die wesentlichen Funktionen implementieren wird. Wer aktuell allerdings Stream Processing mit Eventzeitverarbeitung benötigt, sollte mit Flink starten.

Matthias Niehoff

Matthias beschäftigt sich insbesondere mit Big-Data- und Streaming-Anwendungen auf Basis von Apache Cassandra und Apache Spark. Dabei verliert er aber auch andere Tools und Werkzeuge im Big-Data-Bereich nicht aus den Augen. Matthias teilt seine Erfahrungen auf Konferenzen, Meet-ups und Usergroups.

Share on FacebookGoogle+Share on LinkedInTweet about this on TwitterShare on RedditDigg thisShare on StumbleUpon

Kommentieren

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert.