
Calculating Pi with Apache Spark

16.4.2016 | 9 minutes of reading time

Apache Spark is a system for cluster computing and part of the increasingly popular SMACK stack. The aim of this blog post is to give a beginner's introduction to setting up a mini Spark cluster of virtual machines (VMs) using Vagrant and to running a small example application on it that approximates \(\pi\).

The cluster

To set up the Vagrant cluster on your local machine, you first need to install Oracle VirtualBox on your system. After that, it suffices to clone the Git repository from here into a working directory of your choice.

Once in the working directory, we can spin up the cluster using the console command vagrant up. The cluster is deployed in standalone mode and consists of a designated master node named sparkmaster and a configurable number of worker nodes. The nodes are assigned consecutive static IP addresses, and the workers are accessible via password-less SSH from the master node.

The following table summarizes the hostnames and IP addresses of the nodes for later reference. It also includes the URLs of the web UIs that Spark provides on the nodes once the cluster is running:

Node name        IP address        Web UI
sparkmaster      192.168.33.100    http://192.168.33.100:8080
sparkworker-01   192.168.33.101    http://192.168.33.101:8081
sparkworker-02   192.168.33.102    http://192.168.33.102:8082
...              ...               ...

After the cluster is up, we can use the command vagrant ssh <nodename> to connect to the node named nodename. For example, let us connect to the master node via vagrant ssh sparkmaster and have a look at its Spark installation directory:

vagrant@sparkmaster:~$ ls -F $SPARK_HOME
CHANGES.txt  NOTICE  README.md	bin/   data/  examples/  licenses/  python/
LICENSE      R/      RELEASE	conf/  ec2/   lib/	 logs/	    sbin/

Spark comes with a couple of important directories containing executables and configuration files:

  • First of all, the directory SPARK_HOME/bin contains the spark-shell script for running Spark’s REPL (read-evaluate-print loop), which allows interactive data exploration. More important for our purposes, however, is the spark-submit script: it is used to submit Spark applications packaged as a JAR to the cluster.
  • Next, SPARK_HOME/conf contains the configuration files slaves and spark-env.sh. The first lists the hostnames of all VMs to be used as slaves, while the second sets environment variables and options used by Spark.
  • Finally, the directory SPARK_HOME/sbin contains the shell scripts for starting and stopping the master and worker instances on the designated machines, either individually or in one go via the start-all.sh and stop-all.sh scripts.

We will start the master on the VM named sparkmaster while all the other VMs will be used as slaves. This can be achieved by running the start-all.sh script on sparkmaster:

vagrant@sparkmaster:~$ $SPARK_HOME/sbin/start-all.sh

We can check that (hopefully) everything went smoothly by inspecting the log files in the SPARK_HOME/logs directory on each individual machine. As mentioned, the master and slave instances can be stopped again by running the stop-all.sh script on sparkmaster.

Inspecting the web UI

More information is available from the Spark master's web UI at http://192.168.33.100:8080 (see the table above).

Here we find the following information:

  • A list of all workers in the cluster under the section heading Workers.
  • Information on Running Applications and Completed Applications.

The UI remains reachable as long as we do not deliberately stop the master by invoking one of the corresponding stop scripts.

Submitting an application to the cluster

To actually submit an application to our cluster, we make use of the SPARK_HOME/bin/spark-submit script. To test this, and to verify that our cluster is set up properly, we use the example application for computing an approximation of \(\pi\) via Monte Carlo sampling that ships with the Spark installation (code: GitHub).

For convenience, the shared /vagrant folder contains a shell script for submitting the example application to the cluster:

spark-submit \
--class de.codecentric.SparkPi \
--master spark://192.168.33.100:7077 \
--conf spark.eventLog.enabled=true \
/vagrant/jars/spark-pi-example-1.0.jar 10

Besides a reference to the main class in the JAR and the path to the JAR itself, we pass the IP address and port of the Spark master instance and enable event logging. Event logging allows us to look at detailed information in the web UI even after the application has finished. The argument 10 determines the size of the random sample used and also the degree of parallelism; see below.

If we invoke this script, the result of the computation is printed to the console. Note also the corresponding entry under finished applications after switching to the master web UI in our browser:

vagrant@sparkmaster:~$ /vagrant/scripts/submit-script-pi.sh
Pi is roughly 3.13918

How is \(\pi\) approximated here?

This computation is based on the following heuristic: By definition \(\pi\) is the area \(A_{\mathrm{Circle}}\) of a circle with radius \(r=1\) (generally, \(\pi\cdot r^2\) is the area of a circle of radius \(r\)).

  • One then circumscribes this unit circle with a square whose area equals \(A_{\mathrm{Square}}=4\). The ratio of these two areas thus equals \(\frac{A_{\mathrm{Circle}}}{A_{\mathrm{Square}}}=\frac{\pi}{4}\) and gives the geometric probability that a point inside the square also lies inside the circle.
  • Now let us assume that we pick a huge number \(n\) of points randomly inside the circumscribed square, for example by throwing darts or dropping rain drops onto it. A certain number \(n_{\mathrm{in}}\) of these points will end up inside the area described by the circle, while the remaining \(n_{\mathrm{out}}\) points will lie outside of it (but inside the square). Thus \(n_{\mathrm{in}}+n_{\mathrm{out}}=n\), and the observed fraction of points lying inside the circle is \(\frac{n_{\mathrm{in}}}{n}\).
  • Heuristically, one has \(\frac{A_{\mathrm{Circle}}}{A_{\mathrm{Square}}}\approx\frac{n_{\mathrm{in}}}{n}\) and hence \(\pi\approx\frac{4\cdot n_{\mathrm{in}}}{n}\).
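To make the last step concrete with some made-up numbers: if we threw \(n=1000\) points and \(n_{\mathrm{in}}=790\) of them happened to land inside the circle, the resulting estimate would be \(\pi\approx\frac{4\cdot 790}{1000}=3.16\).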

It goes without saying that this algorithm is non-deterministic, and results will likely vary from run to run.

To wrap things up: the beauty of this approach is that it paves a way to approximate \(\pi\) simply by counting which fraction of a total population of points randomly thrown at the circumscribed square ends up inside the circle, something that can be distributed in a trivial fashion. And this is exactly what the aforementioned Spark application does! An interactive visualization of the above may be found here: Link.
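To see the core of the idea without any cluster machinery, here is a minimal single-machine sketch in plain Scala (the method name estimatePi and the sample size are made up for illustration; the Spark application below distributes exactly this kind of counting):

import scala.math.random

def estimatePi(n: Int): Double = {
  // count how many of n random points in [-1,1] x [-1,1] fall inside the unit circle
  val inside = (1 to n).count { _ =>
    val x = random * 2 - 1
    val y = random * 2 - 1
    x * x + y * y < 1
  }
  4.0 * inside / n
}

println(estimatePi(1000000)) // prints a value close to 3.14, varying from run to run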

In the following, we drill down into some of the basic concepts of Spark by looking at the code of SparkPi. This includes the concept of an RDD, Spark's abstraction for handling data distributed on a cluster.

Resilient Distributed Datasets (RDDs)

Within the Spark world, the core abstraction is that of a Resilient Distributed Dataset. The rationale is that we want to create, distribute, and process data within a cluster, starting from various kinds of input data, e.g. text files or plain Java/Scala collections. Spark structures these input data into RDDs, which one can basically think of as Java/Scala collections that are distributed over the cluster in partitions. Spark provides a functional-programming-style API for Java/Scala that allows one to

  • create new RDDs from various input sources, like files residing in HDFS, etc.,
  • create new RDDs from already existing ones by so-called transformations, or
  • create final Java/Scala values from existing RDDs by so-called actions.

To make these distributed datasets resilient, i.e. fault-tolerant, Spark keeps track of the dependencies between the input data and the intermediate RDDs created from it in an RDD dependency graph. In case of failure, this graph allows Spark to replay exactly those parts of the computation that are necessary to recreate the RDD at hand. It is important to note that RDDs are computed lazily: only creating a final Java/Scala value via an action triggers the actual execution of a computation. Since the dependency graph in Spark is a directed acyclic graph (DAG), this name is frequently used to refer to it, for example in the web UI.
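As a minimal sketch of these notions (assuming an already created SparkContext sc, as in the application below; the values are made up for illustration):

val numbers = sc.parallelize(1 to 10)   // create an RDD from a plain Scala collection
val squares = numbers.map(i => i * i)   // transformation: defines a new RDD, nothing is computed yet
val sum     = squares.reduce(_ + _)     // action: triggers the actual, distributed computation
println(sum)                            // 385

Only the call to reduce starts any work on the cluster; the two preceding lines merely extend the dependency graph.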

Writing a simple Spark application

To illustrate the ideas outlined in the previous section, let us rewrite the application SparkPi step by step. We will follow the original source but allow ourselves to deviate a little from it in order to stress where and how RDDs are created and transformed. To begin with, the basic skeleton of the main application looks as follows:

import scala.math.random
import org.apache.spark._

object SparkApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Spark Pi")
    val sc = new SparkContext(conf)

    // Application code goes here...

    sc.stop()
  }
}

The main entry point to every Spark application is the creation of a SparkContext object. It provides a connection to the Spark cluster, holds context information about the cluster as well as the application itself, and is used to create RDDs from input data. For example, we are able to set the name of the application, which will also appear in the Spark web UIs, to "Spark Pi". Further parameters can be passed to the Spark context at runtime, as has already happened in the above usage of the submit script; there the IP address of the master node is passed to the Spark context.
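As a side note, the master URL could also be hard-coded on the SparkConf via its setMaster method instead of being passed on the command line; a minimal sketch using the master address of our cluster might look as follows, although leaving the master to spark-submit keeps the JAR independent of a concrete cluster:

val conf = new SparkConf()
  .setAppName("Spark Pi")
  .setMaster("spark://192.168.33.100:7077") // normally supplied via spark-submit --master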

The main step in the application code is to create a huge number n of random sample points using the parallelize method provided by the Spark context sc. It allows us to create an initial RDD from any Scala collection. In our case this collection, xs, consists of the first n consecutive numbers. The resulting RDD is divided into a number of partitions given by slices. Next, this RDD is transformed via map into the RDD sample, which contains n random points \((x,y)\) inside the square \([-1,1]\times [-1,1]\). Finally, we filter the sample for the points that lie in the interior of the unit disc and count them in order to obtain an approximate value for \(\pi\). Here, counting is the final action that triggers the evaluation of all previous RDDs along the dependency graph.

val slices = if (args.length > 0) args(0).toInt else 2
val n = math.min(100000L * slices, Int.MaxValue).toInt
val xs = 1 until n
val rdd = sc.parallelize(xs, slices)
            .setName("'Initial rdd'")
val sample = rdd.map { i =>
  val x = random * 2 - 1
  val y = random * 2 - 1
  (x, y)
}.setName("'Random points sample'")

val inside = sample.filter { case (x, y) => x * x + y * y < 1 }.setName("'Random points inside circle'")

val count = inside.count()

println("Pi is roughly " + 4.0 * count / n)

We can find a visual representation of the dependency graph of the final RDD inside after running the application by clicking either the corresponding application ID or name (here “SparkPi”) in the master web UI under the section “Completed Applications”. There one finds a link labeled “Application Detail UI”, which leads to more detailed information about the jobs and stages involved in the application. Our application consists of a single job with only one stage, and by clicking on the corresponding link in the “Application Detail UI”, we finally arrive at a visual representation of the dependency graph.

Notice that we were able to set names for debugging and monitoring purposes in the application code using the setName method provided by the RDD class, and that these names also appear in the visual representation of the dependency graph. This is helpful, for example, when it comes to identifying performance bottlenecks in larger applications that involve more intricate ways of creating and transforming RDDs.

That’s all! If you want, you can stop the cluster using vagrant halt or get rid of it completely with vagrant destroy -f after exiting the master’s shell.

Summary

In conclusion, we described how to set up a small Spark cluster using Vagrant, and how to write and submit a simple application to the cluster. Finally, we saw how to make basic use of the web UI for monitoring purposes.
