Twitter’s Scalding – Scala and Hadoop hand in hand


If you have read the paper published by Google’s Jeffrey Dean and Sanjay Ghemawat (MapReduce: Simplified Data Processing on Large Clusters), you know that their work was inspired by concepts from functional languages: “Our abstraction is inspired by the map and reduce primitives present in Lisp and many other functional languages….Our use of a functional model with user-specified map and reduce operations allows us to parallelize large computations easily and to use re-execution as the primary mechanism for fault tolerance.”

Given that Scala is a programming language that combines object-oriented and functional programming and runs on the JVM, it is a fairly natural evolution to introduce Scala into the Hadoop environment. That is what Twitter engineers did. (See more on how Scala is used at Twitter: “Twitter on Scala” and “The Why and How of Scala at Twitter“.) Scala has powerful support for mapping, filtering and pattern matching (regular expressions), so it is a pretty good fit for MapReduce jobs.
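
To get a feel for why this is a good fit, here is a tiny snippet of plain Scala – nothing Hadoop- or Scalding-specific, with values made up purely for illustration – showing the mapping, filtering and regex-based pattern matching mentioned above:

// Plain Scala collections – the map/filter/reduce vocabulary of the MapReduce
// model is built right into the standard library.
val nums = List(1, 2, 3, 4, 5)
val doubled = nums.map(_ * 2)                          // List(2, 4, 6, 8, 10)
val evenSum = nums.filter(_ % 2 == 0).reduce(_ + _)    // 2 + 4 = 6

// Pattern matching against regular expressions is just as concise.
val LogLine = """(\w+)\s+(\d+)""".r
"error 42" match {
  case LogLine(level, code) => println(level + " -> " + code)   // prints "error -> 42"
  case _                    => println("no match")
}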

Scalding

Twitter’s Scala-based MapReduce implementation – Scalding – is built on top of the Cascading Java API (that is where its name comes from: it is essentially a Scala library on top of Cascading) and was open-sourced this year. The code can be found on GitHub.

Prerequisites

In order to use Scalding, you need to have Scala installed; you can grab the latest version from here. As of writing this article, the latest stable version is Scala 2.9.2, and that is what I used for my tests, too.

$ scala
Welcome to Scala version 2.9.2 (Java HotSpot(TM) 64-Bit Server VM, Java 1.6.0_31).
Type in expressions to have them evaluated.
Type :help for more information.

scala>

You also need sbt, the Scala build tool, which can be downloaded from here. Unless you want to build it from source, you can just grab the jar file and create an sbt script, as described in the sbt Getting Started wiki. The sbt script is a one-liner, but it requires quite some memory, so if you are in a cloud environment, an extra-small virtual machine might not fit your needs (that was the wall I hit in the beginning).

java -Xms512M -Xmx1536M -Xss1M -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=384M -jar `dirname $0`/sbt-launch.jar "$@"

In addition, you will need Ruby, because there is a Ruby wrapper script that makes running the Scalding Hadoop jobs easy. I used ruby 1.8.6 for my tests.

$ ruby --version
ruby 1.8.6 (2007-09-24 patchlevel 111) [x86_64-linux]

Dive into Scalding (Tribute to Mark Pilgrim)

Once you have downloaded the code from the Scalding repository using the git clone command (git clone git@github.com:twitter/scalding.git) or grabbed the zip file (as of writing this article it is named twitter-scalding-0.4.1-15-ga54a5a3.zip), you need to run the following commands:
$ sbt update
$ sbt test
$ sbt assembly

The last one creates a jar file which is used by the Ruby wrapper script to run Hadoop jobs – I will come back to this later.

Since the Getting Started examples are full of WordCounts, I decided to try something different – I wanted to implement the

$ hadoop jar hadoop-examples-*.jar grep input output Hello

code – not that this is something never seen before, just for a taste. In the appendix you will find the original Grep Hadoop code from the distribution, and you may appreciate how condensed the Scala version is. Do not get me wrong, this article is not meant to compare Java to Scala in any shape or form, but still the conciseness of Scala is remarkable.

GrepJob.scala

import com.twitter.scalding._
import scala.util.matching.Regex

class GrepJob(args: Args) extends Job(args) {
  // the pattern to grep for is passed on the command line as --regexp
  val pattern = new Regex(args("regexp"))

  TextLine(args("input"))                                               // read the input line by line
    .flatMap('line -> 'word) { line: String => line.split("\\s+") }     // split each line into words
    // 'matched rather than 'match: match is a reserved word in Scala and cannot be used as a symbol literal
    .flatMap('word -> 'matched) { word: String => pattern.findAllIn(word).toList }
    .groupBy('matched) { _.size }                                       // count the occurrences of each match
    .write(Tsv(args("output")))                                         // write tab-separated output
}
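
If the field-based syntax looks unfamiliar, here is roughly the same logic spelled out on an in-memory list with plain Scala collections – just a sketch of what the job computes (the grepCount helper is made up for illustration), not of how Scalding and Cascading actually execute it:

import scala.util.matching.Regex

// What GrepJob computes, expressed on a plain List instead of a Cascading pipe.
def grepCount(lines: List[String], pattern: Regex): Map[String, Int] =
  lines
    .flatMap(_.split("\\s+"))                         // 'line -> 'word
    .flatMap(word => pattern.findAllIn(word).toList)  // 'word -> 'matched
    .groupBy(identity)                                // groupBy('matched)
    .map { case (m, ms) => (m, ms.size) }             // { _.size }

// grepCount(List("Hello world", "Hello Hello"), new Regex("Hello")) == Map("Hello" -> 3)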

You can run the code with the aforementioned scald.rb Ruby wrapper, which is in the scripts directory. The code can be run locally or on HDFS (a similar concept to Yelp’s mrjob Python solution).

First, the standard Hadoop grep example:

$ hadoop jar /opt/hadoop/hadoop-examples-0.20.203.0.jar grep input output Hello
$ hadoop fs -cat output/part*
3       Hello

Then Scalding in local mode looks like this (but before you run it using the scald.rb script, you need to specify the host where you want to run the Hadoop job; this requires modifying the HOST variable in the scald.rb file:

HOST="my.remote.host" #where the job is rsynced to and run

In my case it was

HOST="hadoopmaster"

I also recommend modifying the REDUCERS variable in the scald.rb file; by default it is set to 100.)

$ scripts/scald.rb --local GrepJob.scala --input input.txt --output output.txt --regexp Hello
/root/scalding/twitter-scalding/target/scalding-assembly-0.4.1.jar
Hello
java -Xmx3g -cp /root/scalding/twitter-scalding/target/scalding-assembly-0.4.1.jar:/tmp/GrepJob.jar com.twitter.scalding.Tool GrepJob --local --input input.txt --output output.txt --regexp Hello
12/04/28 15:58:05 INFO util.Version: Concurrent, Inc - Cascading 2.0.0 [hadoop-0.20.2+]
12/04/28 15:58:05 INFO flow.Flow: [] starting
12/04/28 15:58:05 INFO flow.Flow: []  source: FileTap["TextLine[['num', 'line']->[ALL]]"]["input.txt"]"]
12/04/28 15:58:05 INFO flow.Flow: []  sink: FileTap["TextDelimited[[UNKNOWN]->[ALL]]"]["output.txt"]"]
12/04/28 15:58:05 INFO flow.Flow: []  parallel execution is enabled: true
12/04/28 15:58:05 INFO flow.Flow: []  starting jobs: 1
12/04/28 15:58:05 INFO flow.Flow: []  allocating threads: 1
12/04/28 15:58:05 INFO planner.FlowStep: [] starting step: (1/1) local
12/04/28 15:58:05 INFO assembly.AggregateBy: using threshold value: 100000

$ cat output.txt
Hello   3

And finally, Scalding in the default HDFS mode:

$ scripts/scald.rb --hdfs GrepJob.scala --input input/input.txt --output output --regexp Hello
/root/scalding/twitter-scalding/target/scalding-assembly-0.4.1.jar

ssh -C hadoopmaster HADOOP_CLASSPATH=/usr/share/java/hadoop-lzo-0.4.14.jar:scalding-assembly-0.4.1.jar:job-jars/GrepJob.jar hadoop jar scalding-assembly-0.4.1.jar -libjars job-jars/GrepJob.jar -Dmapred.reduce.tasks=1 GrepJob --hdfs --input input/input.txt --output output --regexp Hello
12/04/28 15:55:53 INFO hadoop.HadoopUtil: resolving application jar from found main method on: com.twitter.scalding.Tool$

12/04/28 15:55:53 INFO hadoop.HadoopPlanner: using application jar: /root/scalding-assembly-0.4.1.jar
12/04/28 15:55:53 INFO util.Version: Concurrent, Inc - Cascading 2.0.0 [hadoop-0.20.2+]
12/04/28 15:55:53 INFO flow.Flow: [] starting
12/04/28 15:55:53 INFO flow.Flow: [] source: Hfs["TextLine[['offset', 'line']->[ALL]]"]["input/input.txt"]"]
12/04/28 15:55:53 INFO flow.Flow: [] sink: Hfs["TextDelimited[[UNKNOWN]->[ALL]]"]["output"]"]
12/04/28 15:55:53 INFO flow.Flow: [] parallel execution is enabled: true
12/04/28 15:55:53 INFO flow.Flow: [] starting jobs: 1
12/04/28 15:55:53 INFO flow.Flow: [] allocating threads: 1
12/04/28 15:55:53 INFO planner.FlowStep: [] starting step: (1/1) Hfs["TextDelimited[[UNKNOWN]->[ALL]]"]["output"]"]
12/04/28 15:56:02 INFO mapred.FileInputFormat: Total input paths to process : 1
12/04/28 15:56:02 INFO planner.FlowStep: [] submitted hadoop job: job_201204241302_0034

$ hadoop fs -cat output/part*
Hello 3

The key command that is executed on the hadoopmaster server is basically a hadoop jar command with the scalding-assembly jar file (the one that was created by the sbt assembly command above):

ssh -C hadoopmaster HADOOP_CLASSPATH=/usr/share/java/hadoop-lzo-0.4.14.jar:scalding-assembly-0.4.1.jar:job-jars/GrepJob.jar hadoop jar scalding-assembly-0.4.1.jar -libjars job-jars/GrepJob.jar -Dmapred.reduce.tasks=1 GrepJob --hdfs --input input/input.txt --output output --regexp Hello

That is it. I hope you got the impression that Scala and Hadoop can really go hand in hand; they complement each other very well.

Appendix 

Grep.java

package org.apache.hadoop.examples;

import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/* Extracts matching regexs from input files and counts them. */
public class Grep extends Configured implements Tool {
  private Grep() {}                               // singleton

  public int run(String[] args) throws Exception {
    if (args.length < 3) {
      System.out.println("Grep    []");
      ToolRunner.printGenericCommandUsage(System.out);
      return -1;
    }

    Path tempDir =
      new Path("grep-temp-"+
          Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));

    JobConf grepJob = new JobConf(getConf(), Grep.class);

    try {

      grepJob.setJobName("grep-search");

      FileInputFormat.setInputPaths(grepJob, args[0]);

      grepJob.setMapperClass(RegexMapper.class);
      grepJob.set("mapred.mapper.regex", args[2]);
      if (args.length == 4)
        grepJob.set("mapred.mapper.regex.group", args[3]);

      grepJob.setCombinerClass(LongSumReducer.class);
      grepJob.setReducerClass(LongSumReducer.class);

      FileOutputFormat.setOutputPath(grepJob, tempDir);
      grepJob.setOutputFormat(SequenceFileOutputFormat.class);
      grepJob.setOutputKeyClass(Text.class);
      grepJob.setOutputValueClass(LongWritable.class);

      JobClient.runJob(grepJob);

      JobConf sortJob = new JobConf(Grep.class);
      sortJob.setJobName("grep-sort");

      FileInputFormat.setInputPaths(sortJob, tempDir);
      sortJob.setInputFormat(SequenceFileInputFormat.class);

      sortJob.setMapperClass(InverseMapper.class);

      sortJob.setNumReduceTasks(1);                 // write a single file
      FileOutputFormat.setOutputPath(sortJob, new Path(args[1]));
      sortJob.setOutputKeyComparatorClass           // sort by decreasing freq
      (LongWritable.DecreasingComparator.class);

      JobClient.runJob(sortJob);
    }
    finally {
      FileSystem.get(grepJob).delete(tempDir, true);
    }
    return 0;
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new Grep(), args);
    System.exit(res);
  }

}
