Datenlookup in Spark Streaming

2 Kommentare

Bei der Verarbeitung von Streaming-Daten reichen die Rohdaten aus den Events häufig nicht aus. Meist müssen noch zusätzliche Daten hinzugezogen werden, beispielsweise Metadaten zu einem Sensor, von dem im Event nur die ID mitgeschickt wird.

In diesem Blogpost möchte ich auf verschiedene Möglichkeiten eingehen, dieses Problem in Spark Streaming zu lösen. Ich gehe in meinen Beispielen davon aus, dass die zusätzlichen Daten zunächst einmal außerhalb der Streaming-Anwendung liegen und über das Netzwerk – zum Beispiel in einer Datenbank – gelesen werden können. Alle Beispiele und Techniken beziehen sich auf Spark Streaming und nicht auf Spark Structured Streaming. Die wesentlichen Techniken sind

  • Broadcast: für statische Daten
  • MapPartitions: für volatile Daten
  • MapPartitions + Connection Broadcast: effektives Connectionhandling
  • MapWithState: Geschwindigkeit durch einen lokalen State

Broadcast

Spark besitzt einen integrierten Broadcast-Mechanismus, mit dem beim Start der Anwendung Daten auf alle Worker Nodes übertragen werden können. Dies hat insbesondere bei großen Datenmengen den Vorteil, dass die Übertragung nur einmal pro Worker Node erfolgt und nicht bei jedem Task.

Da die Daten allerdings im weiteren Verlauf nicht aktualisiert werden können, ist dies nur eine Option wenn die Metadaten statisch sind. Das heißt, es dürfen keine zusätzlichen Daten, zum Beispiel Informationen über neue Sensoren, hinzukommen, und es dürfen auch keine Daten verändert werden. Zudem muss das übertragene Objekt serialisierbar sein.

In diesem Beispiel soll jeder Sensor-Typ, gespeichert als numerische ID (1,2,…) im Stream Processing durch einen Klartextnamen ersetzt werden (Reifentemperatur, Reifendruck,..). Es wird davon ausgegangen, dass die Zuordnung Typ ID -> Name fix ist.

val namesForId: Map[Long,String] = Map(1 -> "Wheel-Temperature", 2 -> "Wheel-Pressure")
stream.map (typId => (typId,namesForId(typId)))
Ein Lookup ohne Broadcast. Die Map wird für jeden Task serialisiert und auf die Worker Nodes übertragen, auch wenn vorher schon Tasks auf dem Worker ausgeführt wurden.
val namesForId: Map[Long,String] = Map(1 -> "Wheel-Temperature", 2 -> "Wheel-Pressure")
val namesForIdBroadcast = sc.broadcast(namesForId)
stream.map (typId => (typId,namesForIdBroadcast.value(typId)))
Die Map wird über einen Broadcast auf die Worker verteilt und muss anschließend nicht mehr für jeden Task übertragen werden.

MapPartitions

Die erste Möglichkeit nicht statische Daten zu lesen, ist in einer map() Operation. Dabei sollte allerdings nicht map(), sondern mapPartitions() aufgerufen werden. Diese wird nicht für jedes einzelne Element aufgerufen, sondern für jede Partition, die dann mehrere Elemente enthält. Dies ermöglicht es, die Verbindung zur Datenbank nur einmal pro Partition aufzubauen und dann für alle Elemente wieder zu verwenden.

Für die Abfrage der Daten gibt es zwei verschiedene Möglichkeiten: Die Nutzung einer Bulk API, um alle Elemente der Partition gemeinsam zu verarbeiten, oder eine asynchrone Variante. Dabei wird für jeden Eintrag eine asynchrone, nicht blockierende Query abgesetzt und die Ergebnisse im Anschluss eingesammelt.

wikiChanges.mapPartitions(elements => {
  Session session = // create database connection and session
  PreparedStatement preparedStatement = // prepare statement, if supported by database
  elements.map(element => {
    // extract key from element and bind to prepared statement
    BoundStatement boundStatement = preparedStatement.bind(???)
    session.asyncQuery(boundStatement) // returns a Future
  })
  .map(...) //retrieve value from future
})
Ein Beispiel für den Lookup von Daten, gespeichert in Cassandra, mit mapPartitions und asynchronen Abfragen

Das obige Beispiel zeigt schematisch einen Lookup mittels mapPartitions: Teure Operationen wie das Herstellen der Verbindung erfolgen nur einmal pro Partition. Für jedes Element wird eine asynchrone, nicht blockierende Query abgesetzt und anschließend die Werte aus den Futures ermittelt. Einige Bibliotheken zum Lesen aus Datenbanken nutzen im Wesentlichen dieses Muster, so beispielsweise das joinWithCassandraTable aus dem Spark Cassandra Connector.

Warum wird die Connection nicht einmal zu Beginn des Jobs erstellt und dann für jede Partition genutzt? Dafür müsste die Connection serialisiert werden, um dann für jeden Task an die Worker übertragen zu werden. Die Datenmenge dabei wäre in der Tat nicht zu groß, allerdings sind die meisten Connection-Objekte nicht serialisierbar.

Broadcast Connection + MapPartitions

Trotzdem ist es eine gute Idee, die Connection nicht für jede Partition neu aufzubauen, sondern nur einmal pro Worker. Um dies zu erreichen, wird nicht die Connection gebroadcastet, da diese nicht serialisierbar ist (siehe oben), sondern eine Factory, die beim ersten Aufruf die Connection aufbaut und bei allen weiteren aufrufen diese Connection dann zurück gibt. Diese Funktion wird dann in mapPartitions() aufgerufen, um die Verbindung zur Datenbank zu erhalten.

In Scala ist es nicht nötig, dafür eine Funktion zu verwenden. Hier kann ein lazy val genutzt werden. Der lazy val wird innerhalb einer Wrapper-Klasse definiert. Diese Klasse ist serialisierbar und kann somit gebroadcastet werden. Auf den Worker Nodes wird dann beim ersten Aufruf eine Instanz der nicht serialisierbaren Connection-Klasse erzeugt.

class DatabaseConnection extends Serializable {
  lazy val connection: AConnection = {
    // all the stuff to create the connection
    new AConnection(???)
  }
}
val connectionBroadcast = sc.broadcast(new DatabaseConnection)
incomingStream.mapPartitions(elements => {
  val connection = connectionBroadcast.value.connection
  // see above
})

mapWithState()

Alle bislang gezeigten Lösungsansätze holen die Daten bei Bedarf aus einer Datenbank. Dies bedeutet in der Regel einen Zugriff über ein Netzwerk für jeden Eintrag oder zumindest für jede Partition. Effizienter wäre es, die Daten direkt in-memory verfügbar zu haben.

Stateful stream procssing within an operation

Spark selbst bietet mit mapWithState() eine Möglichkeit, Daten mittels einen States zu verändern und im Gegenzug ebenfalls den State anzupassen. Der State wird dabei anhand eines Schlüssels verwaltet. Mittels dieses Schlüssels werden die Daten im Cluster verteilt, sodass nicht auf jedem Worker Node alle Daten vorgehalten werden müssen. Ein eingehender Stream muss dementsprechend auch als Schlüssel-Wert-Paar aufgebaut sein.

Dieser keyed-State kann auch für einen Lookup genutzt werden. Mittels initialState() kann ein RDD als initialer Zustand übergeben werden. Jegliche Updates können dann allerdings nur noch basierend auf einem Schlüssel durchgeführt werden. Dies gilt ebenso für das Löschen von Einträgen. Es ist nicht möglich, den State komplett zu löschen oder neu zu laden.

Um den State zu aktualisieren, müssen zusätzliche Notification Events im Stream vorhanden sein. Diese können beispielsweise aus einem separaten Kafka Topic kommen und mit dem eigentlichen Datenstream zusammengeführt werden (union()). Die gesendete Datenmenge kann dabei von einer einfachen Benachrichtigung mit einer ID, die dann für das Lesen der neuen Daten genutzt wird, bis hin zu dem kompletten neuen Datensatz reichen.

Geschrieben wird in das Kafka Topic zum Beispiel wenn Metadaten aktualisiert oder neu angelegt werden. Außerdem können zeitgesteuert Events in das Kafka Topic eingestellt werden oder aber durch einen Custom Receiver in Spark selber erzeugt werden.

Data Lookup in Spark Streaming using mapWithState

Eine einfache Implementierung kann so aussehen. Zunächst werden hier die Streams von Kafka gelesen und die Schlüssel zusätzlich mit einer Markierung des Datentyps ergänzt (data oder notification). Anschließend werden beide Streams zu einem gemeinsamen Stream vereint und in mapWithState() verarbeitet. Der State wurde zuvor spezifiziert indem die Funktion des States angegeben wurde.

val kafkaParams = Map("metadata.broker.list" -> brokers)
val notifications = notificationsFromKafka
  .map(entry => ((entry._1, "notification"), entry._2))
val data = dataFromKafka
  .map(entry => ((entry._1, "data"), entry._2))
val lookupState = StateSpec.function(lookupWithState _)
notifications
  .union(data)
  .mapWithState(lookupState)

Die lookupWithState Funktion beschreibt die Verarbeitung im State. Folgende Parameter werden dabei übergeben:

  • batchTime: die Startzeit des aktuellen Microbatches
  • key: der Schlüssel, in diesem Fall der Originalschlüssel aus dem Stream, zusammen mit der Typmarkierung (data oder notification)
  • valueOpt: der Wert zum Schlüssel im Stream
  • state: der Wert, der im State zum Schlüssel gespeichert ist

Zurückgegeben wird ein Tupel bestehend aus dem Originalschlüssel und dem Originalwert sowie einer zufälligen Zahl, die aus dem State entnommen wird oder – falls noch nicht im State vorhanden – zufällig gewählt wird.

def lookupWithState(batchTime: Time, key: (String, String), valueOpt: Option[String], state: State[Long]): Option[((String, String), Long)] = {
  key match {
    case (originalKey, "notification") =>
      // retrieve new value from notification or external system
      val newValue = Random.nextLong()
      state.update(newValue)
      None // no downstream processing for notifications
    case (originalKey, "data") =>
      valueOpt.map(value => {
        val stateVal = state.getOption() match {
          // check if there is a state for the key
          case Some(stateValue) => stateValue
          case None =>
            val newValue = Random.nextLong()
            state.update(newValue)
            newValue
        }
      ((originalKey, value), stateVal)
      })
  }
}

Zudem kann auch noch der timeout-Mechanismus des mapWithState() genutzt werden, um Events nach einer bestimmten Zeit ohne Aktualisierung aus dem State zu entfernen.

Fazit

Das Laden zusätzlicher Informationen ist ein häufiges Problem in Streaming-Anwendungen. Mit Spark Streaming gibt es eine Reihe von Möglichkeiten, dies zu bewerkstelligen.

Am einfachsten ist der Broadcast statischer Daten beim Start der Anwendung. Für volatile Daten ist das Lesen per Partition einfach zu implementieren und bietet bereits eine solide Performance. Bei der Benutzung der Spark States kann die Geschwindigkeit noch gesteigert werden, allerdings ist es insgesamt aufwendiger zu entwickeln.

Optimalerweise liegen die Daten immer aktuell direkt auf dem Worker Node vor, auf dem die Daten verarbeitet werden. Dies ist beispielsweise bei der Benutzung des Spark States der Fall. Kafka Streams verfolgt diesen Ansatz noch konsequenter. Hier wird eine Tabelle als Stream behandelt und – vorausgesetzt die Streams sind identisch partitioniert – genauso im Cluster verteilt wie der originale Stream. So sind lokale Lookups möglich.

Auch bei Apache Flink wird an effizienten Lookups gearbeitet, hier unter dem Titel Side Inputs.

Matthias Niehoff

Matthias beschäftigt sich insbesondere mit Big-Data- und Streaming-Anwendungen auf Basis von Apache Cassandra und Apache Spark. Dabei verliert er aber auch andere Tools und Werkzeuge im Big-Data-Bereich nicht aus den Augen. Matthias teilt seine Erfahrungen auf Konferenzen, Meet-ups und Usergroups.

Share on FacebookGoogle+Share on LinkedInTweet about this on TwitterShare on RedditDigg thisShare on StumbleUpon

Kommentare

  • Timo Walther

    20. September 2017 von Timo Walther

    Effizientere Lookups in Flink sind bereits seit Version 1.2 unter dem Stichwort Async IO enthalten. Für Interessierte empfehle ich folgende Seite: https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/asyncio.html

    • Matthias Niehoff

      20. September 2017 von Matthias Niehoff

      Hi Timo,

      danke für den Hinweis.

      AysncIO hilft natürlich für effiziente, asynchrone Lookups, vergleichbar mit mapPartitions + asynchrone Queries in Spark Streaming. Flink nimmt einem hier noch etwas Boilerplate Code ab.
      SideInputs gehen dann ja noch einen Schritt weiter und würde die Daten nicht erst (remote) lesen, sondern bereits im State vorhalten, oder liege ich da falsch?

Kommentieren

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert.