//

Datenlookup in Spark Streaming

1.6.2017 | 7 Minuten Lesezeit

Bei der Verarbeitung von Streaming-Daten reichen die Rohdaten aus den Events häufig nicht aus. Meist müssen noch zusätzliche Daten hinzugezogen werden, beispielsweise Metadaten zu einem Sensor, von dem im Event nur die ID mitgeschickt wird.

In diesem Blogpost möchte ich auf verschiedene Möglichkeiten eingehen, dieses Problem in Spark Streaming zu lösen. Ich gehe in meinen Beispielen davon aus, dass die zusätzlichen Daten zunächst einmal außerhalb der Streaming-Anwendung liegen und über das Netzwerk – zum Beispiel in einer Datenbank – gelesen werden können. Alle Beispiele und Techniken beziehen sich auf Spark Streaming und nicht auf Spark Structured Streaming. Die wesentlichen Techniken sind

  • Broadcast: für statische Daten
  • MapPartitions: für volatile Daten
  • MapPartitions + Connection Broadcast: effektives Connectionhandling
  • MapWithState: Geschwindigkeit durch einen lokalen State

Broadcast

Spark besitzt einen integrierten Broadcast-Mechanismus, mit dem beim Start der Anwendung Daten auf alle Worker Nodes übertragen werden können. Dies hat insbesondere bei großen Datenmengen den Vorteil, dass die Übertragung nur einmal pro Worker Node erfolgt und nicht bei jedem Task.

Da die Daten allerdings im weiteren Verlauf nicht aktualisiert werden können, ist dies nur eine Option wenn die Metadaten statisch sind. Das heißt, es dürfen keine zusätzlichen Daten, zum Beispiel Informationen über neue Sensoren, hinzukommen, und es dürfen auch keine Daten verändert werden. Zudem muss das übertragene Objekt serialisierbar sein.

In diesem Beispiel soll jeder Sensor-Typ, gespeichert als numerische ID (1,2,…) im Stream Processing durch einen Klartextnamen ersetzt werden (Reifentemperatur, Reifendruck,..). Es wird davon ausgegangen, dass die Zuordnung Typ ID -> Name fix ist.

1val namesForId: Map[Long,String] = Map(1 -> "Wheel-Temperature", 2 -> "Wheel-Pressure")
2stream.map (typId => (typId,namesForId(typId)))
3
Ein Lookup ohne Broadcast. Die Map wird für jeden Task serialisiert und auf die Worker Nodes übertragen, auch wenn vorher schon Tasks auf dem Worker ausgeführt wurden.
1val namesForId: Map[Long,String] = Map(1 -> "Wheel-Temperature", 2 -> "Wheel-Pressure")
2val namesForIdBroadcast = sc.broadcast(namesForId)
3stream.map (typId => (typId,namesForIdBroadcast.value(typId)))
4
Die Map wird über einen Broadcast auf die Worker verteilt und muss anschließend nicht mehr für jeden Task übertragen werden.

MapPartitions

Die erste Möglichkeit nicht statische Daten zu lesen, ist in einer map() Operation. Dabei sollte allerdings nicht map(), sondern mapPartitions() aufgerufen werden. Diese wird nicht für jedes einzelne Element aufgerufen, sondern für jede Partition, die dann mehrere Elemente enthält. Dies ermöglicht es, die Verbindung zur Datenbank nur einmal pro Partition aufzubauen und dann für alle Elemente wieder zu verwenden.

Für die Abfrage der Daten gibt es zwei verschiedene Möglichkeiten: Die Nutzung einer Bulk API, um alle Elemente der Partition gemeinsam zu verarbeiten, oder eine asynchrone Variante. Dabei wird für jeden Eintrag eine asynchrone, nicht blockierende Query abgesetzt und die Ergebnisse im Anschluss eingesammelt.

1wikiChanges.mapPartitions(elements => {
2  Session session = // create database connection and session
3  PreparedStatement preparedStatement = // prepare statement, if supported by database
4  elements.map(element => {
5    // extract key from element and bind to prepared statement
6    BoundStatement boundStatement = preparedStatement.bind(???)
7    session.asyncQuery(boundStatement) // returns a Future
8  })
9  .map(...) //retrieve value from future
10})
11
Ein Beispiel für den Lookup von Daten, gespeichert in Cassandra, mit mapPartitions und asynchronen Abfragen

Das obige Beispiel zeigt schematisch einen Lookup mittels mapPartitions: Teure Operationen wie das Herstellen der Verbindung erfolgen nur einmal pro Partition. Für jedes Element wird eine asynchrone, nicht blockierende Query abgesetzt und anschließend die Werte aus den Futures ermittelt. Einige Bibliotheken zum Lesen aus Datenbanken nutzen im Wesentlichen dieses Muster, so beispielsweise das joinWithCassandraTable aus dem Spark Cassandra Connector .

Warum wird die Connection nicht einmal zu Beginn des Jobs erstellt und dann für jede Partition genutzt? Dafür müsste die Connection serialisiert werden, um dann für jeden Task an die Worker übertragen zu werden. Die Datenmenge dabei wäre in der Tat nicht zu groß, allerdings sind die meisten Connection-Objekte nicht serialisierbar.

Broadcast Connection + MapPartitions

Trotzdem ist es eine gute Idee, die Connection nicht für jede Partition neu aufzubauen, sondern nur einmal pro Worker. Um dies zu erreichen, wird nicht die Connection gebroadcastet, da diese nicht serialisierbar ist (siehe oben), sondern eine Factory, die beim ersten Aufruf die Connection aufbaut und bei allen weiteren aufrufen diese Connection dann zurück gibt. Diese Funktion wird dann in mapPartitions() aufgerufen, um die Verbindung zur Datenbank zu erhalten.

In Scala ist es nicht nötig, dafür eine Funktion zu verwenden. Hier kann ein lazy val genutzt werden. Der lazy val wird innerhalb einer Wrapper-Klasse definiert. Diese Klasse ist serialisierbar und kann somit gebroadcastet werden. Auf den Worker Nodes wird dann beim ersten Aufruf eine Instanz der nicht serialisierbaren Connection-Klasse erzeugt.

1class DatabaseConnection extends Serializable {
2  lazy val connection: AConnection = {
3    // all the stuff to create the connection
4    new AConnection(???)
5  }
6}
7val connectionBroadcast = sc.broadcast(new DatabaseConnection)
8incomingStream.mapPartitions(elements => {
9  val connection = connectionBroadcast.value.connection
10  // see above
11})
12

mapWithState()

Alle bislang gezeigten Lösungsansätze holen die Daten bei Bedarf aus einer Datenbank. Dies bedeutet in der Regel einen Zugriff über ein Netzwerk für jeden Eintrag oder zumindest für jede Partition. Effizienter wäre es, die Daten direkt in-memory verfügbar zu haben.

Spark selbst bietet mit mapWithState() eine Möglichkeit, Daten mittels einen States zu verändern und im Gegenzug ebenfalls den State anzupassen. Der State wird dabei anhand eines Schlüssels verwaltet. Mittels dieses Schlüssels werden die Daten im Cluster verteilt, sodass nicht auf jedem Worker Node alle Daten vorgehalten werden müssen. Ein eingehender Stream muss dementsprechend auch als Schlüssel-Wert-Paar aufgebaut sein.

Dieser keyed-State kann auch für einen Lookup genutzt werden. Mittels initialState() kann ein RDD als initialer Zustand übergeben werden. Jegliche Updates können dann allerdings nur noch basierend auf einem Schlüssel durchgeführt werden. Dies gilt ebenso für das Löschen von Einträgen. Es ist nicht möglich, den State komplett zu löschen oder neu zu laden.

Um den State zu aktualisieren, müssen zusätzliche Notification Events im Stream vorhanden sein. Diese können beispielsweise aus einem separaten Kafka Topic kommen und mit dem eigentlichen Datenstream zusammengeführt werden (union()). Die gesendete Datenmenge kann dabei von einer einfachen Benachrichtigung mit einer ID, die dann für das Lesen der neuen Daten genutzt wird, bis hin zu dem kompletten neuen Datensatz reichen.

Geschrieben wird in das Kafka Topic zum Beispiel wenn Metadaten aktualisiert oder neu angelegt werden. Außerdem können zeitgesteuert Events in das Kafka Topic eingestellt werden oder aber durch einen Custom Receiver in Spark selber erzeugt werden.

Eine einfache Implementierung kann so aussehen. Zunächst werden hier die Streams von Kafka gelesen und die Schlüssel zusätzlich mit einer Markierung des Datentyps ergänzt (data oder notification). Anschließend werden beide Streams zu einem gemeinsamen Stream vereint und in mapWithState() verarbeitet. Der State wurde zuvor spezifiziert indem die Funktion des States angegeben wurde.

1val kafkaParams = Map("metadata.broker.list" -> brokers)
2val notifications = notificationsFromKafka
3  .map(entry => ((entry._1, "notification"), entry._2))
4val data = dataFromKafka
5  .map(entry => ((entry._1, "data"), entry._2))
6val lookupState = StateSpec.function(lookupWithState _)
7notifications
8  .union(data)
9  .mapWithState(lookupState)
10

Die lookupWithState Funktion beschreibt die Verarbeitung im State. Folgende Parameter werden dabei übergeben:

  • batchTime: die Startzeit des aktuellen Microbatches
  • key: der Schlüssel, in diesem Fall der Originalschlüssel aus dem Stream, zusammen mit der Typmarkierung (data oder notification)
  • valueOpt: der Wert zum Schlüssel im Stream
  • state: der Wert, der im State zum Schlüssel gespeichert ist

Zurückgegeben wird ein Tupel bestehend aus dem Originalschlüssel und dem Originalwert sowie einer zufälligen Zahl, die aus dem State entnommen wird oder – falls noch nicht im State vorhanden – zufällig gewählt wird.

1def lookupWithState(batchTime: Time, key: (String, String), valueOpt: Option[String], state: State[Long]): Option[((String, String), Long)] = {
2  key match {
3    case (originalKey, "notification") =>
4      // retrieve new value from notification or external system
5      val newValue = Random.nextLong()
6      state.update(newValue)
7      None // no downstream processing for notifications
8    case (originalKey, "data") =>
9      valueOpt.map(value => {
10        val stateVal = state.getOption() match {
11          // check if there is a state for the key
12          case Some(stateValue) => stateValue
13          case None =>
14            val newValue = Random.nextLong()
15            state.update(newValue)
16            newValue
17        }
18      ((originalKey, value), stateVal)
19      })
20  }
21}
22

Zudem kann auch noch der timeout-Mechanismus des mapWithState() genutzt werden, um Events nach einer bestimmten Zeit ohne Aktualisierung aus dem State zu entfernen.

Fazit

Das Laden zusätzlicher Informationen ist ein häufiges Problem in Streaming-Anwendungen. Mit Spark Streaming gibt es eine Reihe von Möglichkeiten, dies zu bewerkstelligen.

Am einfachsten ist der Broadcast statischer Daten beim Start der Anwendung. Für volatile Daten ist das Lesen per Partition einfach zu implementieren und bietet bereits eine solide Performance. Bei der Benutzung der Spark States kann die Geschwindigkeit noch gesteigert werden, allerdings ist es insgesamt aufwendiger zu entwickeln.

Optimalerweise liegen die Daten immer aktuell direkt auf dem Worker Node vor, auf dem die Daten verarbeitet werden. Dies ist beispielsweise bei der Benutzung des Spark States der Fall. Kafka Streams verfolgt diesen Ansatz noch konsequenter. Hier wird eine Tabelle als Stream behandelt und – vorausgesetzt die Streams sind identisch partitioniert – genauso im Cluster verteilt wie der originale Stream. So sind lokale Lookups möglich.

Auch bei Apache Flink wird an effizienten Lookups gearbeitet, hier unter dem Titel Side Inputs .

Beitrag teilen

Gefällt mir

0

//

Weitere Artikel in diesem Themenbereich

Entdecke spannende weiterführende Themen und lass dich von der codecentric Welt inspirieren.

//

Gemeinsam bessere Projekte umsetzen

Wir helfen Deinem Unternehmen

Du stehst vor einer großen IT-Herausforderung? Wir sorgen für eine maßgeschneiderte Unterstützung. Informiere dich jetzt.

Hilf uns, noch besser zu werden.

Wir sind immer auf der Suche nach neuen Talenten. Auch für dich ist die passende Stelle dabei.