Apache Spark – a Fast Big Data Analytics Engine


Introduction

There are different approaches in the big data world to make Hadoop more suitable for ad-hoc, interactive queries and iterative data processing. As is well known, the Hadoop MapReduce framework is primarily designed for batch processing, which makes it less suitable for ad-hoc data exploration, machine learning processes and the like. Big data vendors are trying to address this challenge by replacing MapReduce with alternatives. In the case of SQL on Hadoop, there are various initiatives: Cloudera Impala, Pivotal HAWQ, and the Hortonworks Stinger initiative, which aims to improve Hive performance significantly.

Apache Spark is another increasingly popular alternative: it replaces MapReduce with a more performant execution engine while still using Hadoop HDFS as the storage engine for large data sets.

Spark Architecture

From an architecture perspective, Apache Spark is based on two key concepts: Resilient Distributed Datasets (RDDs) and a directed acyclic graph (DAG) execution engine. Spark supports two types of RDDs: parallelized collections, which are based on existing Scala collections, and Hadoop datasets, which are created from files stored on HDFS. RDDs support two kinds of operations: transformations and actions. Transformations create new datasets from the input (e.g. map or filter are transformations), whereas actions return a value after executing calculations on the dataset (e.g. reduce or count are actions).
The DAG engine eliminates the rigid multi-stage execution model of MapReduce and offers significant performance improvements.
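To make the distinction concrete, here is a minimal sketch using Spark's Scala API (it assumes a SparkContext named sc, as provided by the Spark shell, and a hypothetical numbers.txt file on HDFS; the path is illustrative only):

// Transformations are lazy: they only describe how to derive a new RDD.
val lines   = sc.textFile("hdfs://localhost/user/cloudera/numbers.txt") // hypothetical path
val numbers = lines.map(_.trim.toInt)       // transformation: RDD[String] -> RDD[Int]
val evens   = numbers.filter(_ % 2 == 0)    // transformation: still nothing has been computed
// Actions trigger the actual computation and return a value to the driver.
val evenCount = evens.count()               // action: number of even values
val evenSum   = evens.reduce(_ + _)         // action: sum of the even values

Nothing is read from HDFS until the first action (count) is called; Spark then builds the DAG of transformations and schedules it as a set of tasks.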

Figure 1: Spark Architecture

Installing Spark

Spark is written in Scala, so before you install Spark you need to install Scala. Scala binaries can be downloaded from http://www.scala-lang.org.

$ wget http://www.scala-lang.org/files/archive/scala-2.10.4.tgz
$ tar xvf scala-2.10.4.tgz 
$ ln -s scala-2.10.4 scala
$ vi .bashrc
export SCALA_HOME=/home/istvan/scala
export PATH=$SCALA_HOME/bin:$PATH

You can validate your Scala installation by running the Scala REPL (the Scala command line interpreter); below is an example of executing the classic HelloWorld program in Scala:

$ scala
Welcome to Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.6.0_32).
Type in expressions to have them evaluated.
Type :help for more information.

scala> object HelloWorld {
     |     def main(args: Array[String]) {
     |         println("Hello, world!")
     |     }
     | }
defined module HelloWorld

scala> HelloWorld.main(null)
Hello, world!

Then you can download the Spark binaries from http://spark.apache.org/downloads.html. There are a couple of pre-compiled versions depending on your Hadoop distribution; we are going to use the Spark binaries built for the Cloudera CDH4 distribution.

$ wget http://d3kbcqa49mib13.cloudfront.net/spark-0.9.0-incubating-bin-cdh4.tgz
$ tar xvf spark-0.9.0-incubating-bin-cdh4.tgz 
$ ln -s spark-0.9.0-incubating-bin-cdh4 spark

Using Spark shell

Now we are ready to run the Spark shell, which is a command line interpreter for Spark:

$ bin/spark-shell
14/03/31 15:54:51 INFO HttpServer: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
14/03/31 15:54:51 INFO HttpServer: Starting HTTP Server
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 0.9.0
      /_/

Using Scala version 2.10.3 (Java HotSpot(TM) 64-Bit Server VM, Java 1.6.0_32)
Type in expressions to have them evaluated.
Type :help for more information.
....
14/03/31 15:54:59 INFO HttpServer: Starting HTTP Server
...
Created spark context..
Spark context available as sc.

scala>

In our example we are going to process Apache weblogs that follow the common logfile format with the following fields: hostname, timestamp, request, HTTP status code and number of bytes. The test file used in this example is based on the public NASA weblog from August 1995, see http://ita.ee.lbl.gov/html/contrib/NASA-HTTP.html. The file was cleaned and modified to use a tab-separated format.
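For orientation, a record in the cleaned, tab-separated file would look roughly like the line below (a hypothetical, illustrative example showing the five fields in order, separated by tabs; it is not a line copied from the actual file):

uplherc.upl.com	[01/Aug/1995:00:00:07 -0400]	GET / HTTP/1.0	304	0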

Let us assume that you want to count how many hits the NASA web server got in August, 1995. In order to get the result, you can run the following commands in spark-shell:

Spark context available as sc.

scala> val accessLog = sc.textFile("hdfs://localhost/user/cloudera/weblog/NASA_access_log_Aug95")
14/03/31 16:18:16 INFO MemoryStore: ensureFreeSpace(82970) called with curMem=0, maxMem=311387750
14/03/31 16:18:16 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 81.0 KB, free 296.9 MB)
accessLog: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:12

scala> accessLog.count()
14/03/31 16:18:24 INFO FileInputFormat: Total input paths to process : 1
14/03/31 16:18:24 INFO SparkContext: Starting job: count at <console>:15
14/03/31 16:18:24 INFO DAGScheduler: Got job 0 (count at <console>:15) with 2 output partitions (allowLocal=false)
...
14/03/31 16:18:26 INFO HadoopRDD: Input split: hdfs://localhost/user/cloudera/weblog/NASA_access_log_Aug95:134217728+27316431
..
14/03/31 16:18:26 INFO SparkContext: Job finished: count at <console>:15, took 2.297566932 s
res0: Long = 1569898

The first command creates an RDD from the NASA Apache access log stored on HDFS (hdfs://localhost/user/cloudera/weblog/NASA_access_log_Aug95). The second command, an action executed on the accessLog RDD, counts the number of lines in the file. Note that the execution took around 2 seconds.

If you want to know how many requests were initiated from one particular host (e.g. beta.xerox.com in our example), you need to execute a filter transformation and then run the count action on the filtered dataset:

scala> val filteredLog = accessLog.filter(line => line.contains("beta.xerox.com"))

filteredLog: org.apache.spark.rdd.RDD[String] = FilteredRDD[2] at filter at <console>:14

scala> filteredLog.count()
14/03/31 16:19:35 INFO SparkContext: Starting job: count at <console>:17
14/03/31 16:19:35 INFO DAGScheduler: Got job 1 (count at <console>:17) with 2 output partitions (allowLocal=false)
...
14/03/31 16:19:35 INFO Executor: Running task ID 2
...
14/03/31 16:19:35 INFO HadoopRDD: Input split: hdfs://localhost/user/cloudera/weblog/NASA_access_log_Aug95:0+134217728
14/03/31 16:19:36 INFO Executor: Serialized size of result for 2 is 563
...
14/03/31 16:19:36 INFO HadoopRDD: Input split: hdfs://localhost/user/cloudera/weblog/NASA_access_log_Aug95:134217728+27316431
...
14/03/31 16:19:37 INFO DAGScheduler: Stage 1 (count at <console>:17) finished in 1.542 s
14/03/31 16:19:37 INFO SparkContext: Job finished: count at <console>:17, took 1.549706892 s
res1: Long = 2318

You can also execute more complex logic and define your own functions, thanks to Scala's language capabilities. Let us assume that we need to calculate the total number of bytes served by the NASA web server in August 1995. The number of bytes is the last (5th) field of each line in the weblog file, and unfortunately there are cases when the field is '-' rather than the expected integer value. The standard toInt Scala String method throws an exception when you try to convert a non-numeric value to an integer. Thus we need to detect whether a given string is a number and, if not, return 0. We can do this with a custom method (convertToInt) that extends the standard Scala String class via an implicit conversion, making it available on the String data type. Then we can combine this custom method with the standard Spark RDD operations to calculate the total number of bytes served by the NASA webserver.

scala> def isNumeric(input: String): Boolean = input.forall(_.isDigit)
isNumeric: (input: String)Boolean


scala> class StringHelper(s:String) {
     |    def convertToInt():Int = if (isNumeric(s)) s.toInt else 0
     | }
defined class StringHelper


scala> implicit def stringWrapper(str: String) = new StringHelper(str)
warning: there were 1 feature warning(s); re-run with -feature for details
stringWrapper: (str: String)StringHelper


scala> "123".convertToInt
res2: Int = 123

scala> "-".convertToInt
res4: Int = 0

scala> accessLog.map(line=>line.split("\t")).map(line=>line(4).convertToInt).sum
14/04/01 13:12:39 INFO SparkContext: Starting job: sum at <console>:21
14/04/01 13:12:39 INFO DAGScheduler: Got job 2 (sum at <console>:21) with 2 output partitions (allowLocal=false)
...
14/04/01 13:12:40 INFO HadoopRDD: Input split: hdfs://localhost/user/cloudera/weblog/NASA_access_log_Aug95:0+134217728
... 
14/04/01 13:12:45 INFO SparkContext: Job finished: sum at <console>:21, took 5.53186208 s
res4: Double = 2.6828341424E10

As you can see, the result is 26.8 GB and the calculation was executed in 5.5 seconds.

Programming Spark

In addition to spark-shell, which can be used to execute operations interactively, you can also write and build your code in Scala, Java or Python. Let us take an example of how you can implement the weblog application in Scala.

In order to build your application, you need to follow the directory structure as shown below:

./
./simple.sbt
./src/main/scala/WeblogApp.scala
./project/build.properties
./sbt

You can copy the sbt build tool from your Spark home directory (cp -af $SPARK_HOME/sbt/* ./sbt).

The simple.sbt build file should look something like this:

name := "Weblog Project"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "0.9.0-incubating"

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.0.0-cdh4.4.0"

And the WeblogApp.scala code is as follows:

import org.apache.spark._

object WeblogApp {
    def main(args: Array[String]) {
        val file = "hdfs://localhost/user/cloudera/weblog/NASA_access_log_Aug95";
        val sc = new SparkContext("local", "WeblogApp",
                                  System.getenv("SPARK_HOME"), 
                                  SparkContext.jarOfClass(this.getClass))
        val accessLog = sc.textFile(file)

        println("Number of entries: " + accessLog.count())
    }
}

Then you can build and run the application using the Scala sbt build tool:

$ sbt/sbt package
$ sbt/sbt run
Launching sbt from sbt/sbt-launch-0.12.4.jar
[info] Loading project definition from /home/istvan/project
[info] Set current project to Weblog Project (in build file:/home/istvan/)
[info] Running WeblogApp 
...
14/04/01 14:48:58 INFO SparkContext: Starting job: count at WeblogApp.scala:12
14/04/01 14:48:58 INFO DAGScheduler: Got job 0 (count at WeblogApp.scala:12) with 2 output partitions (allowLocal=false)
...
14/04/01 14:49:01 INFO SparkContext: Job finished: count at WeblogApp.scala:12, took 2.67797083 s
Number of entries: 1569898
14/04/01 14:49:01 INFO ConnectionManager: Selector thread was interrupted!
[success] Total time: 9 s, completed 01-Apr-2014 14:49:01

Conclusion

Apache Spark has started gaining significant momentum and is considered a promising alternative to MapReduce for supporting ad-hoc queries and iterative processing logic. It offers interactive code execution via the Python and Scala REPLs, but you can also write, compile and run applications in Scala and Java. There are various tools running on top of Spark such as Shark (SQL on Hadoop), MLlib (machine learning), Spark Streaming and GraphX. It will be interesting to see how it evolves.

Twitter’s Scalding – Scala and Hadoop hand in hand


If you have read the paper published by Google's Jeffrey Dean and Sanjay Ghemawat (MapReduce: Simplified Data Processing on Large Clusters), you know that their work was inspired by the concepts of functional languages: "Our abstraction is inspired by the map and reduce primitives present in Lisp and many other functional languages... Our use of a functional model with user-specified map and reduce operations allows us to parallelize large computations easily and to use re-execution as the primary mechanism for fault tolerance."

Given that Scala is a programming language that combines object-oriented and functional programming and runs on the JVM, it is a fairly natural evolution to introduce Scala into the Hadoop environment. That is what Twitter's engineers did. (See more on how Scala is used at Twitter: "Twitter on Scala" and "The Why and How of Scala at Twitter".) Scala has powerful support for mapping, filtering and pattern matching (regular expressions), so it is a pretty good fit for MapReduce jobs.
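As a tiny, self-contained illustration of these language features (a hypothetical example on an in-memory Scala collection, not Hadoop code):

val logLines    = List("GET /index.html 200", "POST /login 500", "GET /about.html 200")
val serverError = "\\b5\\d\\d\\b".r                                                 // regex for 5xx status codes
val failed      = logLines.filter(line => serverError.findFirstIn(line).isDefined)  // filtering
val failedPaths = failed.map(line => line.split("\\s+")(1))                         // mapping
println(failedPaths)                                                                // prints List(/login)

The same map/filter style carries over almost verbatim to Scalding pipelines, as the GrepJob example below shows.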

Scalding

Twitter's Scala-based MapReduce implementation – Scalding – is built on the Cascading Java API (that is where its name comes from: it is essentially a Scala library on top of the Cascading API) and was open-sourced this year. The code can be found on GitHub.

Prerequisites

In order to use Scalding, you need to have Scala installed; you can grab the latest version from http://www.scala-lang.org. As of writing this article, the latest stable version is Scala 2.9.2, which is also what I used for my tests.

$ scala
Welcome to Scala version 2.9.2 (Java HotSpot(TM) 64-Bit Server VM, Java 1.6.0_31).
Type in expressions to have them evaluated.
Type :help for more information.

scala>

You also need sbt, the Scala build tool, which can be downloaded from the sbt project site. Unless you want to build the code from source, you can just grab the jar file and create an sbt script, as described in the sbt Getting Started wiki. The sbt script is a one-liner, but it requires quite some memory, so if you are in a cloud environment, an extra small virtual machine might not meet your needs (that was the wall I hit in the beginning).

java -Xms512M -Xmx1536M -Xss1M -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=384M -jar `dirname $0`/sbt-launch.jar "$@"

In addition, you will need Ruby, because there is a Ruby wrapper script that makes it easy to run the Scalding Hadoop job. I used Ruby 1.8.6 for my tests.

ruby --version
ruby 1.8.6 (2007-09-24 patchlevel 111) [x86_64-linux]

Dive into Scalding (Tribute to Mark Pilgrim)

Once you have downloaded the code from the Scalding repository using git clone (git clone git@github.com:twitter/scalding.git) or grabbed the zip file (as of writing this article it is named twitter-scalding-0.4.1-15-ga54a5a3.zip), you need to run the following commands:
$ sbt update
$ sbt test
$ sbt assembly

The last command creates a jar file which is used by the Ruby wrapper script to run Hadoop jobs – I will come back to this later.

Since the Getting Started examples are full of WordCounts, I was going to try something different – I wanted to implement the 

$ hadoop jar hadoop-examples-*.jar grep input output Hello

code – not that this is anything never seen before, just for a taste. In the appendix you can see the original Grep code from the Hadoop distribution, and you may appreciate how condensed the Scala version is. Do not get me wrong, this article is not meant to compare Java to Scala in any shape or form, but the Scala style is still remarkable.

GrepJob.scala

import com.twitter.scalding._
import scala.util.matching.Regex

class GrepJob(args : Args) extends Job(args) {
    val pattern = new Regex(args("regexp"))

    TextLine(args("input"))
    .flatMap('line -> 'word)  { line : String => line.split("\\s+") }
    .flatMap('word -> 'match) { word : String => pattern.findAllIn(word).toList }
    .groupBy('match) { _.size }
    .write(Tsv(args("output")))
}

You can run the code with the aforementioned scald.rb Ruby wrapper, which is in the scripts directory. The code can be run locally or on HDFS (a similar concept to Yelp's mrjob Python solution).

First, the standard Hadoop grep example:

$ hadoop jar /opt/hadoop/hadoop-examples-0.20.203.0.jar grep input output Hello
$ hadoop fs -cat output/part*
3       Hello

Then Scalding in local mode looks like this. But before you run it using the scald.rb script, you need to specify the host where you want to run the Hadoop job; this requires modifying the HOST variable in the scald.rb file:

HOST="my.remote.host" # where the job is rsynced to and run

In my case it was

HOST="hadoopmaster"

I also recommend modifying the REDUCERS variable in the scald.rb file; by default it is set to 100.

$ scripts/scald.rb --local GrepJob.scala --input input.txt --output output.txt --regexp Hello
/root/scalding/twitter-scalding/target/scalding-assembly-0.4.1.jar
Hello
java -Xmx3g -cp /root/scalding/twitter-scalding/target/scalding-assembly-0.4.1.jar:/tmp/GrepJob.jar com.twitter.scalding.Tool GrepJob --local --input input.txt --output output.txt --regexp Hello
12/04/28 15:58:05 INFO util.Version: Concurrent, Inc - Cascading 2.0.0 [hadoop-0.20.2+]
12/04/28 15:58:05 INFO flow.Flow: [] starting
12/04/28 15:58:05 INFO flow.Flow: []  source: FileTap["TextLine[['num', 'line']->[ALL]]"]["input.txt"]"]
12/04/28 15:58:05 INFO flow.Flow: []  sink: FileTap["TextDelimited[[UNKNOWN]->[ALL]]"]["output.txt"]"]
12/04/28 15:58:05 INFO flow.Flow: []  parallel execution is enabled: true
12/04/28 15:58:05 INFO flow.Flow: []  starting jobs: 1
12/04/28 15:58:05 INFO flow.Flow: []  allocating threads: 1
12/04/28 15:58:05 INFO planner.FlowStep: [] starting step: (1/1) local
12/04/28 15:58:05 INFO assembly.AggregateBy: using threshold value: 100000

$ cat output.txt
Hello   3

And finally Scalding with default, hdfs mode:

$ scripts/scald.rb --hdfs GrepJob.scala --input input/input.txt --output output --regexp Hello
/root/scalding/twitter-scalding/target/scalding-assembly-0.4.1.jar

ssh -C hadoopmaster HADOOP_CLASSPATH=/usr/share/java/hadoop-lzo-0.4.14.jar:scalding-assembly-0.4.1.jar:job-jars/GrepJob.jar hadoop jar scalding-assembly-0.4.1.jar -libjars job-jars/GrepJob.jar -Dmapred.reduce.tasks=1 GrepJob --hdfs --input input/input.txt --output output --regexp Hello
12/04/28 15:55:53 INFO hadoop.HadoopUtil: resolving application jar from found main method on: com.twitter.scalding.Tool$

12/04/28 15:55:53 INFO hadoop.HadoopPlanner: using application jar: /root/scalding-assembly-0.4.1.jar
12/04/28 15:55:53 INFO util.Version: Concurrent, Inc - Cascading 2.0.0 [hadoop-0.20.2+]
12/04/28 15:55:53 INFO flow.Flow: [] starting
12/04/28 15:55:53 INFO flow.Flow: [] source: Hfs["TextLine[['offset', 'line']->[ALL]]"]["input/input.txt"]"]
12/04/28 15:55:53 INFO flow.Flow: [] sink: Hfs["TextDelimited[[UNKNOWN]->[ALL]]"]["output"]"]
12/04/28 15:55:53 INFO flow.Flow: [] parallel execution is enabled: true
12/04/28 15:55:53 INFO flow.Flow: [] starting jobs: 1
12/04/28 15:55:53 INFO flow.Flow: [] allocating threads: 1
12/04/28 15:55:53 INFO planner.FlowStep: [] starting step: (1/1) Hfs["TextDelimited[[UNKNOWN]->[ALL]]"]["output"]"]
12/04/28 15:56:02 INFO mapred.FileInputFormat: Total input paths to process : 1
12/04/28 15:56:02 INFO planner.FlowStep: [] submitted hadoop job: job_201204241302_0034

$ hadoop fs -cat output/part*
Hello 3

The key command that is executed on the hadoopmaster server is basically a hadoop jar command with the scalding-assembly jar file (the one that was created by the sbt assembly command above):

ssh -C hadoopmaster HADOOP_CLASSPATH=/usr/share/java/hadoop-lzo-0.4.14.jar:scalding-assembly-0.4.1.jar:job-jars/GrepJob.jar hadoop jar scalding-assembly-0.4.1.jar -libjars job-jars/GrepJob.jar -Dmapred.reduce.tasks=1 GrepJob --hdfs --input input/input.txt --output output --regexp Hello

That is it. I hope you got the impression that Scala and Hadoop can really go hand in hand; they complement each other very well.

Appendix 

Grep.java

package org.apache.hadoop.examples;

import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/* Extracts matching regexs from input files and counts them. */
public class Grep extends Configured implements Tool {
  private Grep() {}                               // singleton

  public int run(String[] args) throws Exception {
    if (args.length < 3) {
      System.out.println("Grep    []");
      ToolRunner.printGenericCommandUsage(System.out);
      return -1;
    }

    Path tempDir =
      new Path("grep-temp-"+
          Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));

    JobConf grepJob = new JobConf(getConf(), Grep.class);

    try {

      grepJob.setJobName("grep-search");

      FileInputFormat.setInputPaths(grepJob, args[0]);

      grepJob.setMapperClass(RegexMapper.class);
      grepJob.set("mapred.mapper.regex", args[2]);
      if (args.length == 4)
        grepJob.set("mapred.mapper.regex.group", args[3]);

      grepJob.setCombinerClass(LongSumReducer.class);
      grepJob.setReducerClass(LongSumReducer.class);

      FileOutputFormat.setOutputPath(grepJob, tempDir);
      grepJob.setOutputFormat(SequenceFileOutputFormat.class);
      grepJob.setOutputKeyClass(Text.class);
      grepJob.setOutputValueClass(LongWritable.class);

      JobClient.runJob(grepJob);

      JobConf sortJob = new JobConf(Grep.class);
      sortJob.setJobName("grep-sort");

      FileInputFormat.setInputPaths(sortJob, tempDir);
      sortJob.setInputFormat(SequenceFileInputFormat.class);

      sortJob.setMapperClass(InverseMapper.class);

      sortJob.setNumReduceTasks(1);                 // write a single file
      FileOutputFormat.setOutputPath(sortJob, new Path(args[1]));
      sortJob.setOutputKeyComparatorClass           // sort by decreasing freq
      (LongWritable.DecreasingComparator.class);

      JobClient.runJob(sortJob);
    }
    finally {
      FileSystem.get(grepJob).delete(tempDir, true);
    }
    return 0;
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new Grep(), args);
    System.exit(res);
  }

}