Spark 2.0 – Datasets and case classes

27.7.2016 | 6 minutes of reading time

The brand-new major 2.0 release of Apache Spark was published two days ago. One of its features is the unification of the DataFrame and Dataset APIs. While the DataFrame API has been part of Spark since the advent of Spark SQL (DataFrames replaced SchemaRDDs), the Dataset API was included as a preview in version 1.6 and aims to overcome some of the shortcomings of DataFrames with regard to type safety.

This post has five sections:

  • The problem (roughly): States the problem in a rough fashion.
  • DataFrames versus Datasets: A quick recap of DataFrames and Datasets.
  • The problem (detailed): States the problem in detail.
  • The solution: Proposes a solution to the problem.
  • Summary: Concludes the post.

The problem (roughly)

The question this blog post addresses is roughly (for details see below): Given a Dataset, how can one append a column to it containing values derived from its columns without passing strings as arguments or doing anything else that would spoil the type safety the Dataset API can provide?

DataFrames versus Datasets

DataFrames have their origin in R and Python (Pandas), where they have proven to give a concise and practical programming interface for working with tabular data with a fixed schema. Due to the popularity of R and Python among data scientists, the DataFrame concept already enjoys a certain degree of familiarity within these circles, which certainly helped Spark gain more users from this side. But the advantages of DataFrames are not limited to the API. There are also significant performance improvements over plain RDDs, because the additional structure information can be used by Spark SQL and Spark's own Catalyst optimizer.

Within the DataFrame API a tabular data set used to be described as an RDD consisting of rows with a row being an instance of type Array[Any]. Thus DataFrames basically do not take the data types of the column values into account. In contrast to this, the new Dataset API allows modelling rows of tabular data using Scala’s case classes.

While DataFrames are more dynamic in their typing, Datasets combine some of the benefits of Scala’s type checking with those of DataFrames. This can help to spot errors at an early stage but certain operations (see next section for an example) on Datasets still rely on passing column names in as String arguments rather than working with fields of an object.

This raises the question whether some of these operations can also be expressed within the type safe parts of the Datasets API alone, thus keeping the newly gained benefits of using the type system. As we will see in a particular example this requires some discipline and working with traits to circumvent a problem with inheritance that arises with case classes.

The problem (detailed)

The first lines of our exemplary CSV file bodies.csv look as follows:

id,width,height,depth,material,color
1,1.0,1.0,1.0,wood,brown
2,2.0,2.0,2.0,glass,green
3,3.0,3.0,3.0,metal,blue

Reading CSV files like this becomes much easier beginning with Spark 2.0. A SparkSession provides a fluent API for reading and writing. We can do as follows:

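import org.apache.spark.sql.DataFrame

// `spark` is the SparkSession; `schema` is the StructType declared for bodies.csv
val df: DataFrame = spark.read
                         .schema(schema)
                         .option("header", true)
                         .csv("/path/to/bodies.csv")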

Spark can infer the schema of a CSV file automatically when the inferSchema option is enabled, at the cost of an additional pass over the input file. Without it, all columns would be read as type String. To avoid both, we programmatically declare the schema as follows before the above code:

import org.apache.spark.sql.types.{DataTypes, StructField, StructType}

// One StructField per CSV column: name and expected data type
val id       = StructField("id",       DataTypes.IntegerType)
val width    = StructField("width",    DataTypes.DoubleType)
val height   = StructField("height",   DataTypes.DoubleType)
val depth    = StructField("depth",    DataTypes.DoubleType)
val material = StructField("material", DataTypes.StringType)
val color    = StructField("color",    DataTypes.StringType)

val fields = Array(id, width, height, depth, material, color)
val schema = StructType(fields)

DataFrames outperform plain RDDs across all languages supported by Spark and provide a comfortable API when it comes to working with structured data and relational algebra. But they provide weak support when it comes to types. There are mainly two reasons:

  1. For one thing, many operations on DataFrames involve passing in a String, either as a column name or as a query. This is error-prone. For example, df.select("colour") would compile without complaint and would only fail at run time, possibly blowing up a long-running job.
  2. A DataFrame is basically an RDD[Row], where a Row is just an Array[Any].
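The second point can be illustrated without Spark at all. The following is a minimal sketch in plain Scala, assuming an Array[Any] as a stand-in for an untyped row; the object name UntypedRowSketch and the values are made up for illustration and are not part of Spark's API.

```scala
import scala.util.Try

// Hypothetical stand-in for an untyped row: every value is just Any.
object UntypedRowSketch {
  val row: Array[Any] = Array(1, 1.0, 1.0, 1.0, "wood", "brown")

  // Correct position and correct type: the cast succeeds.
  val color: String = row(5).asInstanceOf[String]

  // Position 4 holds the material, which is a String. The cast to Double
  // compiles fine, but it fails with a ClassCastException once it is executed.
  val doubledMaterial: Try[Double] = Try(row(4).asInstanceOf[Double] * 2)
}
```

The compiler has no way to object to the second cast, because all it knows about row(4) is that it is of type Any.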

Spark 2.0 introduces Datasets to better address these points. The takeaway message is that instead of using type-agnostic Rows, one can use Scala's case classes or tuples to describe the contents of the rows. The (not so) magic gluing is done by calling as on a DataFrame. (Tuples would match by position and also lack the possibility to customize naming.)

// Brings the implicit encoders into scope that as[Body] needs
import spark.implicits._

final case class Body(id: Int,
                      width: Double,
                      height: Double,
                      depth: Double,
                      material: String,
                      color: String)

val ds = df.as[Body]

The DataFrame's columns are matched to the fields of the case class by name, and the types should match as well. In summary, this introduces a contract and narrows down possible sources of error. For example, one immediate benefit is that we can access fields via the dot operator and get additional IDE support:

val colors = ds.map(_.color) // Compiles!
ds.map(_.colour)             // Typo - WON'T compile!
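For contrast, here is a small plain-Scala sketch of what positional access looks like when a row is modelled as a tuple instead of a case class. It does not use Spark; the object name PositionalVersusNamed and the type alias BodyTuple are made up for this illustration.

```scala
object PositionalVersusNamed {
  final case class Body(id: Int, width: Double, height: Double, depth: Double,
                        material: String, color: String)

  // The same row modelled as a tuple: fields only have positions, no names.
  type BodyTuple = (Int, Double, Double, Double, String, String)

  val asBody: Body       = Body(1, 1.0, 1.0, 1.0, "wood", "brown")
  val asTuple: BodyTuple = (1, 1.0, 1.0, 1.0, "wood", "brown")

  // Named access documents itself.
  val colorByName: String = asBody.color

  // Positional access works, but the reader has to count columns.
  val colorByPosition: String = asTuple._6

  // Off by one: _5 is the material, yet this compiles because it is a String too.
  val colorByMistake: String = asTuple._5
}
```

Both variants are statically typed, but only the case class protects against mixing up two columns of the same type.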

Further, we can use this feature and the newly added type-safe aggregation functions to write queries with compile time safety:

import org.apache.spark.sql.expressions.scalalang.typed.{
  count => typedCount,
  sum => typedSum}

ds.groupByKey(body => body.color)
  .agg(typedCount[Body](_.id).name("count(id)"),
       typedSum[Body](_.width).name("sum(width)"),
       typedSum[Body](_.height).name("sum(height)"),
       typedSum[Body](_.depth).name("sum(depth)"))
  .withColumnRenamed("value", "group")
  .alias("Summary by color level")
  .show()
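To make clear what this query computes, the same grouping can be sketched on an ordinary Scala collection. This is only an analogue under stated assumptions: it uses a Seq instead of a Dataset, the ColorSummary class and the fourth row are invented for the example, and nothing here goes through Spark.

```scala
object TypedAggregationSketch {
  final case class Body(id: Int, width: Double, height: Double, depth: Double,
                        material: String, color: String)

  // Hypothetical result type: one row per color.
  final case class ColorSummary(color: String, count: Long,
                                sumWidth: Double, sumHeight: Double, sumDepth: Double)

  val bodies: Seq[Body] = Seq(
    Body(1, 1.0, 1.0, 1.0, "wood", "brown"),
    Body(2, 2.0, 2.0, 2.0, "glass", "green"),
    Body(3, 3.0, 3.0, 3.0, "metal", "blue"),
    Body(4, 4.0, 4.0, 4.0, "wood", "brown") // extra row, not in bodies.csv
  )

  // Group by color, then count and sum using field accessors instead of strings.
  val summary: Seq[ColorSummary] =
    bodies.groupBy(_.color).toSeq.sortBy(_._1).map { case (color, group) =>
      ColorSummary(color,
                   group.size.toLong,
                   group.map(_.width).sum,
                   group.map(_.height).sum,
                   group.map(_.depth).sum)
    }
}
```

As in the Dataset version, a typo such as _.colour or _.widht would be rejected by the compiler.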

If we wanted to compute the volume of all bodies, this would be quite straightforward in the DataFrame API. Two solutions come to mind:

import org.apache.spark.sql.functions.udf
import spark.implicits._ // enables the $"column" syntax

// 1. Solution: Using a user-defined function and appending the results as column
val volumeUDF = udf {
  (width: Double, height: Double, depth: Double) => width * height * depth
}

ds.withColumn("volume", volumeUDF($"width", $"height", $"depth"))

// 2. Solution: Using a SQL query
// The Dataset has to be registered under the table name used in the query
ds.createOrReplaceTempView("bodies")

spark.sql("""
           |SELECT *, width * height * depth
           |AS volume
           |FROM bodies
           |""".stripMargin)

But this would throw us back to working with strings again. What could a solution with case classes look like? Of course, more work might be involved here, but keeping type support could be a rewarding benefit in crucial operations.

While case classes are convenient in many regards, they do not support inheritance from one another: a case class cannot extend another case class. So we cannot declare a case class BodyWithVolume that extends Body with an additional volume field. Assuming we had such a class, we could do this:

ds.map { body =>
  val volume = body.width * body.height * body.depth
  BodyWithVolume(body.id, body.width, body.height, body.depth, body.material, body.color, volume)
}

This would of course solve our problem of adding the volume as a new field and mapping a Dataset onto a new Dataset, but as noted, a case class cannot extend another case class. Of course, no one could prevent us from declaring the classes Body and BodyWithVolume independently, without the latter extending the former. But this certainly feels awkward given their close relationship.

The solution

Are we out of luck? Not quite. We can extend both classes starting from some common traits:

trait IsIdentifiable {
  def id: Int
}

trait HasThreeDimensions {
  def width: Double
  def height: Double
  def depth: Double
}

trait ConsistsOfMaterial {
  def material: String
  def color: String
}

trait HasVolume extends HasThreeDimensions {
  def volume: Double = width * height * depth
}

final case class Body(id: Int,
                      width: Double,
                      height: Double,
                      depth: Double,
                      material: String,
                      color: String) extends
                      IsIdentifiable with
                      HasThreeDimensions with
                      ConsistsOfMaterial

final case class BodyWithVolume(id: Int,
                                width: Double,
                                height: Double,
                                depth: Double,
                                material: String,
                                color: String) extends
                                IsIdentifiable with
                                HasVolume with
                                ConsistsOfMaterial
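How these traits play together can be tried out without a Spark cluster. The following sketch repeats the definitions from above in compact form inside an object and maps a plain Seq[Body] instead of a Dataset. The helpers baseArea and withVolume are hypothetical names introduced only for this illustration, and the sketch makes no statement about how Spark would encode BodyWithVolume.

```scala
object TraitSketch {
  trait IsIdentifiable { def id: Int }
  trait HasThreeDimensions { def width: Double; def height: Double; def depth: Double }
  trait ConsistsOfMaterial { def material: String; def color: String }
  trait HasVolume extends HasThreeDimensions {
    def volume: Double = width * height * depth
  }

  final case class Body(id: Int, width: Double, height: Double, depth: Double,
                        material: String, color: String)
    extends IsIdentifiable with HasThreeDimensions with ConsistsOfMaterial

  final case class BodyWithVolume(id: Int, width: Double, height: Double, depth: Double,
                                  material: String, color: String)
    extends IsIdentifiable with HasVolume with ConsistsOfMaterial

  // Hypothetical helper: accepts Body as well as BodyWithVolume,
  // because both mix in HasThreeDimensions.
  def baseArea(b: HasThreeDimensions): Double = b.width * b.depth

  // Hypothetical helper: copies the fields over; volume comes from the trait.
  def withVolume(b: Body): BodyWithVolume =
    BodyWithVolume(b.id, b.width, b.height, b.depth, b.material, b.color)

  val bodies: Seq[Body] = Seq(
    Body(1, 1.0, 1.0, 1.0, "wood", "brown"),
    Body(2, 2.0, 2.0, 2.0, "glass", "green"),
    Body(3, 3.0, 3.0, 3.0, "metal", "blue")
  )

  // Field access stays fully typed: no column names passed as strings.
  val volumes: Seq[Double] = bodies.map(withVolume).map(_.volume)
}
```

The two case classes remain independent of each other, yet code written against a trait such as HasThreeDimensions works for both of them.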
