Twitter’s Scalding – Scala and Hadoop hand in hand

If you have read the paper by Google's Jeffrey Dean and Sanjay Ghemawat (MapReduce: Simplified Data Processing on Large Clusters), you know that their work was inspired by functional languages: "Our abstraction is inspired by the map and reduce primitives present in Lisp and many other functional languages….Our use of a functional model with user-specified map and reduce operations allows us to parallelize large computations easily and to use re-execution as the primary mechanism for fault tolerance."
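The pairing the authors describe can be seen in miniature without any cluster at all. A toy word count, written as an explicit map, shuffle, and reduce over in-memory data (purely illustrative, of course; real MapReduce distributes each phase):

```python
from functools import reduce
from itertools import groupby

lines = ["Hello world", "Hello Hadoop"]

# map: every input record emits (word, 1) pairs
pairs = [(word, 1) for line in lines for word in line.split()]

# shuffle: bring identical keys together
pairs.sort(key=lambda kv: kv[0])
grouped = {key: [v for _, v in grp]
           for key, grp in groupby(pairs, key=lambda kv: kv[0])}

# reduce: fold the values of each key into a single count
counts = {key: reduce(lambda a, b: a + b, values)
          for key, values in grouped.items()}
# counts == {'Hadoop': 1, 'Hello': 2, 'world': 1}
```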

Given that Scala is a programming language that combines object-oriented and functional programming and runs on the JVM, it is a fairly natural evolution to introduce Scala into the Hadoop environment. That is what Twitter engineers did. (See more on how Scala is used at Twitter: "Twitter on Scala" and "The Why and How of Scala at Twitter".) Scala has powerful support for mapping, filtering, and pattern matching (regular expressions), so it is a pretty good fit for MapReduce jobs.


Twitter's Scala-based MapReduce implementation, Scalding, is built on the Cascading Java API (that is where its name comes from: it is essentially a Scala library on top of Cascading) and was open-sourced this year. The code can be found on GitHub.


In order to use Scalding, you need to have Scala installed; you can grab the latest version from here. As of writing this article, the latest stable version is Scala 2.9.2, which is what I used for my tests, too.

$ scala
Welcome to Scala version 2.9.2 (Java HotSpot(TM) 64-Bit Server VM, Java 1.6.0_31).
Type in expressions to have them evaluated.
Type :help for more information.


You also need sbt, the Scala build tool, which can be downloaded from here. Unless you want to build the code from source, you can just grab the jar file and create an sbt script, as described in the sbt Getting Started wiki. The sbt script is a one-liner, but it requires quite some memory, so if you are in a cloud environment, an extra-small virtual machine might not fit your needs (that was the wall that I hit in the beginning).

java -Xms512M -Xmx1536M -Xss1M -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=384M -jar `dirname $0`/sbt-launch.jar "$@"

In addition, you will need Ruby, because there is a Ruby wrapper script that runs the Scalding Hadoop jobs in an easy way. I had ruby 1.8.6 for my tests.

$ ruby --version
ruby 1.8.6 (2007-09-24 patchlevel 111) [x86_64-linux]

Dive into Scalding (Tribute to Mark Pilgrim)

Once you have downloaded the code from the Scalding repository (either with the git clone command or as a zip file), you need to run the following commands:
$ sbt update
$ sbt test
$ sbt assembly

The latter creates a jar file which is used by the ruby wrapper script to run Hadoop jobs; I will come back to this later.

Since the Getting Started examples are full of WordCounts, I wanted to try something different: I decided to implement the

$ hadoop jar hadoop-examples-*.jar grep input output Hello

code. Not that this is anything never seen before; it is just to get a taste. In the appendix you will see the original Grep Hadoop code from the distribution, and you may appreciate the condensed Scala code. Do not get me wrong: this article is not meant to compare Java to Scala in any shape or form, but still, the style of Scala is remarkable.


import com.twitter.scalding._
import scala.util.matching.Regex

class GrepJob(args : Args) extends Job(args) {
    val pattern = new Regex(args("regexp"))

    TextLine(args("input"))
        .flatMap('line -> 'word)  { line : String => line.split("\\s+") }
        .flatMap('word -> 'match) { word : String => pattern.findAllIn(word).toList }
        .groupBy('match) { _.size }
        .write(Tsv(args("output")))
}

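For readers who do not speak Scalding's fields-based API, the three operations in the job above can be mimicked in plain Python. This is only a sketch of the semantics (split each line into words, extract every regex match, count occurrences per match), not how Scalding actually executes:

```python
import re
from collections import Counter

def grep_pipeline(lines, regexp):
    """Plain-Python analogue of GrepJob: flatMap lines to words, flatMap
    words to regex matches, then group the matches and count group sizes."""
    pattern = re.compile(regexp)
    words = [w for line in lines for w in re.split(r"\s+", line) if w]  # flatMap('line -> 'word)
    matches = [m for w in words for m in pattern.findall(w)]            # flatMap('word -> 'match)
    return Counter(matches)                                             # groupBy('match) { _.size }
```

Feeding it three occurrences of "Hello" yields a count of 3 for that pattern, matching the Hadoop grep output shown below.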
You can run the code with the aforementioned scald.rb ruby wrapper, which is in the scripts directory. The code can be run locally or on HDFS (a similar concept to Yelp's mrjob Python solution).

First, the standard Hadoop grep example:

$ hadoop jar /opt/hadoop/hadoop-examples- grep input output Hello
$ hadoop fs -cat output/part*
3       Hello

Then Scalding in local mode looks like this. (Before you run it using the scald.rb script, you need to specify the host where you want to run the Hadoop job. This requires modifying the HOST variable in the scald.rb file:

HOST="" #where the job is rsynced to and run

In my case it was set to the hadoopmaster server.

I also recommend modifying the REDUCERS variable in the scald.rb file; by default it is set to 100.)

$ scripts/scald.rb --local GrepJob.scala --input input.txt --output output.txt --regexp Hello
java -Xmx3g -cp /root/scalding/twitter-scalding/target/scalding-assembly-0.4.1.jar:/tmp/GrepJob.jar com.twitter.scalding.Tool GrepJob --local --input input.txt --output output.txt --regexp Hello
12/04/28 15:58:05 INFO util.Version: Concurrent, Inc - Cascading 2.0.0 [hadoop-0.20.2+]
12/04/28 15:58:05 INFO flow.Flow: [] starting
12/04/28 15:58:05 INFO flow.Flow: []  source: FileTap["TextLine[['num', 'line']->[ALL]]"]["input.txt"]"]
12/04/28 15:58:05 INFO flow.Flow: []  sink: FileTap["TextDelimited[[UNKNOWN]->[ALL]]"]["output.txt"]"]
12/04/28 15:58:05 INFO flow.Flow: []  parallel execution is enabled: true
12/04/28 15:58:05 INFO flow.Flow: []  starting jobs: 1
12/04/28 15:58:05 INFO flow.Flow: []  allocating threads: 1
12/04/28 15:58:05 INFO planner.FlowStep: [] starting step: (1/1) local
12/04/28 15:58:05 INFO assembly.AggregateBy: using threshold value: 100000

$ cat output.txt
Hello   3

And finally, Scalding in the default hdfs mode:

$ scripts/scald.rb --hdfs GrepJob.scala --input input/input.txt --output output --regexp Hello

ssh -C hadoopmaster HADOOP_CLASSPATH=/usr/share/java/hadoop-lzo-0.4.14.jar:scalding-assembly-0.4.1.jar:job-jars/GrepJob.jar hadoop jar scalding-assembly-0.4.1.jar -libjars job-jars/GrepJob.jar -Dmapred.reduce.tasks=1 GrepJob --hdfs --input input/input.txt --output output --regexp Hello
12/04/28 15:55:53 INFO hadoop.HadoopUtil: resolving application jar from found main method on: com.twitter.scalding.Tool$

12/04/28 15:55:53 INFO hadoop.HadoopPlanner: using application jar: /root/scalding-assembly-0.4.1.jar
12/04/28 15:55:53 INFO util.Version: Concurrent, Inc - Cascading 2.0.0 [hadoop-0.20.2+]
12/04/28 15:55:53 INFO flow.Flow: [] starting
12/04/28 15:55:53 INFO flow.Flow: [] source: Hfs["TextLine[['offset', 'line']->[ALL]]"]["input/input.txt"]"]
12/04/28 15:55:53 INFO flow.Flow: [] sink: Hfs["TextDelimited[[UNKNOWN]->[ALL]]"]["output"]"]
12/04/28 15:55:53 INFO flow.Flow: [] parallel execution is enabled: true
12/04/28 15:55:53 INFO flow.Flow: [] starting jobs: 1
12/04/28 15:55:53 INFO flow.Flow: [] allocating threads: 1
12/04/28 15:55:53 INFO planner.FlowStep: [] starting step: (1/1) Hfs["TextDelimited[[UNKNOWN]->[ALL]]"]["output"]"]
12/04/28 15:56:02 INFO mapred.FileInputFormat: Total input paths to process : 1
12/04/28 15:56:02 INFO planner.FlowStep: [] submitted hadoop job: job_201204241302_0034

$ hadoop fs -cat output/part*
Hello 3

The key command executed on the hadoopmaster server is basically a hadoop jar command with the scalding-assembly jar file (the one that was created by the sbt assembly command above):

ssh -C hadoopmaster HADOOP_CLASSPATH=/usr/share/java/hadoop-lzo-0.4.14.jar:scalding-assembly-0.4.1.jar:job-jars/GrepJob.jar hadoop jar scalding-assembly-0.4.1.jar -libjars job-jars/GrepJob.jar -Dmapred.reduce.tasks=1 GrepJob --hdfs --input input/input.txt --output output --regexp Hello

That is it. I hope you got the impression that Scala and Hadoop can really go hand in hand; they complement each other very well.


Appendix: the original Grep code from the Hadoop distribution

package org.apache.hadoop.examples;

import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/* Extracts matching regexs from input files and counts them. */
public class Grep extends Configured implements Tool {
  private Grep() {}                               // singleton

  public int run(String[] args) throws Exception {
    if (args.length < 3) {
      System.out.println("Grep <inDir> <outDir> <regex> [<group>]");
      ToolRunner.printGenericCommandUsage(System.out);
      return -1;
    }

    Path tempDir =
      new Path("grep-temp-"+
          Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));

    JobConf grepJob = new JobConf(getConf(), Grep.class);

    try {

      grepJob.setJobName("grep-search");

      FileInputFormat.setInputPaths(grepJob, args[0]);

      grepJob.setMapperClass(RegexMapper.class);
      grepJob.set("mapred.mapper.regex", args[2]);
      if (args.length == 4)
        grepJob.set("mapred.mapper.regex.group", args[3]);

      grepJob.setCombinerClass(LongSumReducer.class);
      grepJob.setReducerClass(LongSumReducer.class);

      FileOutputFormat.setOutputPath(grepJob, tempDir);
      grepJob.setOutputFormat(SequenceFileOutputFormat.class);
      grepJob.setOutputKeyClass(Text.class);
      grepJob.setOutputValueClass(LongWritable.class);

      JobClient.runJob(grepJob);

      JobConf sortJob = new JobConf(Grep.class);
      sortJob.setJobName("grep-sort");

      FileInputFormat.setInputPaths(sortJob, tempDir);
      sortJob.setInputFormat(SequenceFileInputFormat.class);

      sortJob.setMapperClass(InverseMapper.class);

      sortJob.setNumReduceTasks(1);                 // write a single file
      FileOutputFormat.setOutputPath(sortJob, new Path(args[1]));
      sortJob.setOutputKeyComparatorClass           // sort by decreasing freq
        (LongWritable.DecreasingComparator.class);

      JobClient.runJob(sortJob);
    }
    finally {
      FileSystem.get(grepJob).delete(tempDir, true);
    }
    return 0;
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new Grep(), args);
    System.exit(res);
  }
}


Yelp’s mrjob – a python package for Hadoop jobs

This time I cover mrjob, a Python package open-sourced by Yelp for writing Hadoop jobs. It supports both your own Hadoop cluster (based on Hadoop streaming) and AWS Elastic MapReduce (EMR). It requires Python 2.5+; see more details here.

HP Cloud Services

The cloud service that I used for testing mrjob is from HP: HP Public Cloud Services. That cloud offering alone would deserve a dedicated post; I signed up for their private beta and liked it, though it is way behind the sophistication of the AWS service portfolio. It is still free, but it seems that HP is planning to charge for the service from mid-May 2012 onwards. It is based on OpenStack (more precisely, I used HP Cloud Compute, which relies on Nova from the OpenStack suite).

To control the virtual machines on HP Cloud Services, you need to sign up, log in, and go to Manage. Once you have activated the service, you go to Manage Servers and click on Create. You can pick various flavours of Linux distributions, from CentOS to Ubuntu to Fedora to Debian. (Originally, before embarking on the mrjob exercise, I picked CentOS 5.6, which I should not have done had I known in advance about mrjob, because CentOS 5.6 comes with Python 2.4 and mrjob needs Python 2.5+. But that is fine; python26 and python26-devel just had to be installed along the way, see the necessary steps in Appendix A below. CentOS 6.2, which comes with Python 2.6, could have saved this extra work.) I created two instances for Hadoop: one for the master and one for the slave.

The HP Cloud Services dashboard looks like this:


Yelp's mrjob can be downloaded from GitHub. I used mrjob v0.3.2 for my tests; the code itself is around 15,000 lines. It supports local mode (mainly for quick tests and debugging), hadoop mode running on a Hadoop cluster, and emr mode running on AWS EMR.

The “Hello World!” of Hadoop – wordcount – looks as follows:

from mrjob.job import MRJob

class MRWordCountUtility(MRJob):

    def __init__(self, *args, **kwargs):
        super(MRWordCountUtility, self).__init__(*args, **kwargs)
        self.chars = 0
        self.words = 0
        self.lines = 0

    def mapper(self, _, line):
        # Don't actually yield anything for each line. Instead, collect them
        # and yield the sums when all lines have been processed. The results
        # will be collected by the reducer.
        self.chars += len(line) + 1  # +1 for newline
        self.words += sum(1 for word in line.split() if word.strip())
        self.lines += 1

    def mapper_final(self):
        yield('chars', self.chars)
        yield('words', self.words)
        yield('lines', self.lines)

    def reducer(self, key, values):
        yield(key, sum(values))

if __name__ == '__main__':

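Note the in-mapper aggregation trick here: mapper yields nothing per line, and mapper_final emits the accumulated totals once per mapper task, leaving the reducer with very little to sum. A hand-rolled driver (hypothetical; this is not how mrjob invokes these methods) illustrates why the totals survive being split across several mapper tasks:

```python
from collections import defaultdict

class WordCountLike:
    """Stand-in with the same mapper/mapper_final shape as MRWordCountUtility."""
    def __init__(self):
        self.chars = self.words = self.lines = 0

    def mapper(self, _, line):
        self.chars += len(line) + 1          # +1 for the newline
        self.words += len(line.split())
        self.lines += 1

    def mapper_final(self):
        yield ('chars', self.chars)
        yield ('words', self.words)
        yield ('lines', self.lines)

def run_local(lines):
    # pretend there are two mapper tasks, each owning half the input,
    # then reduce (sum) the per-task totals by key
    tasks = [lines[:len(lines)//2], lines[len(lines)//2:]]
    grouped = defaultdict(list)
    for chunk in tasks:
        job = WordCountLike()
        for line in chunk:
            job.mapper(None, line)
        for key, value in job.mapper_final():
            grouped[key].append(value)
    return {key: sum(values) for key, values in grouped.items()}
```

Because the per-task partial sums are added in the reduce step, the result is the same no matter how the lines are partitioned across mappers.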
This is plain and simple, so let us have a look at another example (author: Jordan Andersen) that comes with the tarball. This is the classic travelling salesman problem: "given a list of cities and their pairwise distances, the task is to find the shortest possible route that visits each city exactly once and returns to the origin city". For mathematically inclined people, this is another source to get more details about the problem.

Using Hadoop and mrjob, the travelling salesman problem can have a brute-force MapReduce solution. It is a 2-step process: in the first step, the mapper defines reasonably sized chunks for the second step, where each mapper then calculates the shortest and longest path among the possible (N-1)! tours (N being the number of cities/nodes in the graph), and eventually the reducer picks the overall winner.
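Stripped of the chunking, the search each step-2 mapper performs amounts to the following brute-force sketch (plain Python with itertools, ignoring mrjob entirely):

```python
from itertools import permutations

def brute_force_tsp(graph, start_node):
    """Enumerate all (N-1)! round trips from start_node over an adjacency
    matrix (list of lists) and return ((shortest_cost, shortest_tour),
    (longest_cost, longest_tour))."""
    n = len(graph)
    others = [i for i in range(n) if i != start_node]
    best = worst = None
    for perm in permutations(others):
        tour = [start_node] + list(perm) + [start_node]
        # cost of the tour is the sum of the edge costs along consecutive steps
        cost = sum(graph[a][b] for a, b in zip(tour, tour[1:]))
        if best is None or cost < best[0]:
            best = (cost, tour)
        if worst is None or cost > worst[0]:
            worst = (cost, tour)
    return best, worst
```

The MapReduce version below does exactly this, except that each mapper only walks its assigned slice of the (N-1)! tours.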

The travelling salesman mapreduce code looks like this:

__author__ = 'Jordan Andersen'

from mrjob.job import MRJob
from scipy.misc.common import factorial
import sys
import numpy

try:
    import simplejson as json
    json  # quiet "redefinition of unused ..." warning from pyflakes
except ImportError:
    import json

def map_int_to_tour(num_nodes, i, start_node):
    """Gets a unique tour through a graph given an integer and starting node.

    num_nodes -- the number of nodes in the graph being toured
    i -- the integer to be mapped to the set of tours for the graph
    start_node -- the node index to begin and end the tour on
    """
    nodes_remaining = range(0, start_node) + range(start_node + 1, num_nodes)
    tour = []

    while len(nodes_remaining) > 0:
        num_nodes = len(nodes_remaining)
        next_step = nodes_remaining[i % num_nodes]
        tour.append(next_step)
        nodes_remaining.remove(next_step)
        i = i / num_nodes

    tour = [start_node] + tour + [start_node]
    return tour

def cost_tour(graph, tour):
    """Calculates the travel cost of given tour through a given graph.

    graph -- A square numpy.matrix representing the travel cost of each edge on
            the graph.
    tour -- A list of integers representing a tour through the graph where each
            entry is the index of a node on the graph.
    """
    steps = zip(tour[0:-1], tour[1:])
    cost = sum([ graph[step_from,step_to] for step_from, step_to in steps])
    return cost

class MRSalesman(MRJob):

    def steps(self):
        """Defines the two steps, which are as follows:

        1.  Mapper splits the problem into reasonable chunks by mapping each
            possible tour to the integers and assigning each Step 2 mapper a
            range of tours to cost.
        2.  The mapper takes a range of tours and a description of the trip and
            yields the longest and shortest tours. The reducer yields the
            longest of the long and the shortest of the short tours.

        Notice the first step has no reducer. This allows all of the keys put
        out by the first step to be inputs to step 2's mappers without having
        to be reduced.
        """
        return ([self.mr(mapper = self.splitter),
                 self.mr(mapper = self.mapper,
                         reducer = self.reducer,
                         mapper_final = self.mapper_final)])

    def __init__(self, *args, **kwargs):
        """Initializes an instance of the MRSalesman class. See MRJob for
        details.

        Some instance variables are initialized here that will be modified
        while mapping in step 2 and output by the step 2 mapper_final.
        """
        super(MRSalesman, self).__init__(*args, **kwargs)
        self.shortest_length = sys.maxint
        self.shortest_path = []
        self.longest_length = 0
        self.longest_path = []

    def splitter(self, key, line):
        """The mapper for step 1. Splits the range of possible tours into
        reasonably sized chunks for the consumption of the step 2 mappers.

        At this point the 'line' input should come directly from the first line
        of the one-line json file that contains the edge cost graph and the
        starting node. The key is not relevant.
        """
        #loading the json description of the trip to get at the size
        #of the edge cost graph
        sales_trip = json.loads(line)
        m = numpy.matrix(sales_trip['graph'])
        num_nodes = m.shape[0]
        num_tours = factorial(num_nodes - 1)

        #Here we break down the full range of possible tours into smaller
        #pieces. Each piece is passed along as a key along with the trip
        step_size = int(100 if num_tours < 100**2 else num_tours / 100)
        steps = range(0, num_tours, step_size) + [num_tours]
        ranges = zip(steps[0:-1], steps[1:])

        for range_low, range_high in ranges:
            #The key represents the range of tours to cost
            yield( ("%d-%d"%(range_low,range_high), sales_trip ))

    def mapper(self, key, sales_trip):
        """Mapper for step 2. Finds the shortest and longest tours through a
        small range of all possible tours through the graph.

        At this step the key will contain a string describing the range of
        tours to cost. The sales_trip has the edge cost graph and the starting
        node in a dict.
        """
        #This first line makes this function a generator function rather than a
        #normal function, which MRJob requires in its mapper functions. You need
        #to do this when all the output comes from the mapper_final.
        if False: yield
        matrix = numpy.matrix(sales_trip['graph'])
        num_nodes = matrix.shape[0]

        #The key represents the range of tours to cost
        range_low, range_high = map(int,key.split('-'))
        for i in range(range_low,range_high):

            tour = map_int_to_tour(num_nodes, i, sales_trip['start_node'])
            cost = cost_tour(matrix, tour)

            if cost < self.shortest_length:
                self.shortest_length = cost
                self.shortest_path = tour

            if cost > self.longest_length:
                self.longest_length = cost
                self.longest_path = tour

    def mapper_final(self):
        """Mapper_final for step 2. Outputs winners found by mapper."""
        yield ('shortest', (self.shortest_length, self.shortest_path))
        yield ('longest', (self.longest_length, self.longest_path))

    def reducer(self, key, winners):
        """Reducer for Step 2. Takes the shortest and longest from several
        mappers and/or reducers and yields the overall winners in each category.

        The winners are a list of winners from several mappers OR reducers for
        the given key.

        Run this reducer enough and eventually you get to the final winner in
        each key/category.
        """
        if key == "shortest":
            yield (key, min(winners))
        if key == "longest":
            yield (key, max(winners))

if __name__ == '__main__':

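The interesting piece above is map_int_to_tour, which decodes an integer into a unique permutation by reading it as a factorial-base number indexing into a shrinking list of unvisited nodes. A compact restatement of the same idea (a sketch, not the module's own code) shows why the splitter can safely hand each step-2 mapper a disjoint integer range:

```python
from math import factorial

def int_to_tour(num_nodes, i, start_node):
    """Decode an integer in [0, (num_nodes-1)!) into a unique round trip
    from start_node: each division step consumes one factorial digit of i."""
    remaining = [n for n in range(num_nodes) if n != start_node]
    tour = []
    while remaining:
        k = len(remaining)
        tour.append(remaining.pop(i % k))  # pick the (i mod k)-th unvisited node
        i //= k                            # move on to the next factorial digit
    return [start_node] + tour + [start_node]

# for a 4-node graph there are factorial(3) == 6 tours from node 0,
# and the integers 0..5 map to 6 distinct permutations
```

Since every integer below (N-1)! decodes to a different tour, splitting 0..(N-1)!-1 into ranges partitions the search space with no overlap and no gaps.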
The code requires NumPy and SciPy, which can be installed using the instructions from the install page. On Linux, the official SciPy and NumPy releases are source-code only and require other packages such as LAPACK (Linear Algebra PACKage) and ATLAS (Automatically Tuned Linear Algebra Software). I compiled and installed those from source as well, which is quite an extensive exercise, not for the faint-hearted. See Appendix B for detailed install instructions.

Once the necessary packages (ATLAS, LAPACK, NumPy, SciPy) are installed, the first thing is to test the code in local mode. It can be run like this for a 10-node example:

$ python -r local example_graphs/10_nodes.json

The result is:

Streaming final output from /tmp/mr_travelling_salesman.root.20120415.220200.910545/output
"longest"    [8.5095326196766621, [8, 9, 0, 4, 7, 5, 1, 2, 3, 6, 8]]
"shortest"    [2.2185043035773591, [8, 2, 9, 5, 3, 4, 1, 6, 0, 7, 8]]

Now it is time to run the code on the Hadoop cluster. Debug mode can be enabled using the --verbose flag:

$ python -r hadoop --verbose --cmdenv PYTHONPATH=/usr/local/lib64/python2.6/site-packages --cmdenv LD_LIBRARY_PATH=/usr/local/atlas/lib example_graphs/10_nodes.json

What the code does is as follows:

– It creates a temporary HDFS input directory using hadoop fs -mkdir

– it copies the input file (10_nodes.json) into the HDFS temp directory using hadoop fs -put example_graphs/10_nodes.json

– it creates another temporary HDFS files directory, again using hadoop fs -mkdir

– it copies the code into this HDFS directory using hadoop fs -put, and it also copies the zipped mrjob package (mrjob.tar.gz) into the same HDFS files directory

– then it runs the first step: /opt/hadoop/bin/hadoop jar /opt/hadoop/contrib/streaming/hadoop-streaming- -files 'hdfs:///user/root/tmp/mrjob/mr_travelling_salesman.root.20120415.220633.429682/files/' -archives 'hdfs:///user/root/tmp/mrjob/mr_travelling_salesman.root.20120415.220633.429682/files/mrjob.tar.gz#mrjob.tar.gz' -cmdenv LD_LIBRARY_PATH=/usr/local/atlas/lib -cmdenv PYTHONPATH=/usr/local/lib64/python2.6/site-packages:mrjob.tar.gz -input hdfs:///user/root/tmp/mrjob/mr_travelling_salesman.root.20120415.220633.429682/input -output hdfs:///user/root/tmp/mrjob/mr_travelling_salesman.root.20120415.220633.429682/step-output/1 -mapper 'python --step-num=0 --mapper' -jobconf mapred.reduce.tasks=0

Notice the PYTHONPATH and LD_LIBRARY_PATH environment settings in the command line: they indicate the directories where NumPy and SciPy plus the ATLAS/LAPACK libraries were installed, respectively.

– Then it runs the second step: /opt/hadoop/bin/hadoop jar /opt/hadoop/contrib/streaming/hadoop-streaming- -files 'hdfs:///user/root/tmp/mrjob/mr_travelling_salesman.root.20120415.220633.429682/files/' -archives 'hdfs:///user/root/tmp/mrjob/mr_travelling_salesman.root.20120415.220633.429682/files/mrjob.tar.gz#mrjob.tar.gz' -cmdenv LD_LIBRARY_PATH=/usr/local/atlas/lib -cmdenv PYTHONPATH=/usr/local/lib64/python2.6/site-packages:mrjob.tar.gz -input hdfs:///user/root/tmp/mrjob/mr_travelling_salesman.root.20120415.220633.429682/step-output/1 -output hdfs:///user/root/tmp/mrjob/mr_travelling_salesman.root.20120415.220633.429682/output -mapper 'python --step-num=1 --mapper' -reducer 'python --step-num=1 --reducer'

– once the job is finished, it outputs the result using hadoop fs -cat:
"longest"    [8.5095326196766621, [8, 9, 0, 4, 7, 5, 1, 2, 3, 6, 8]]
"shortest"    [2.2185043035773591, [8, 2, 9, 5, 3, 4, 1, 6, 0, 7, 8]]

– finally it removes the temporary directories using hadoop fs -rmr
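The -mapper 'python … --mapper' and -reducer 'python … --reducer' arguments above hint at the Hadoop streaming contract underneath: mappers and reducers are plain processes that read lines on stdin and write tab-separated key/value lines on stdout, with a sort in between. A stripped-down word-count simulation of that contract (illustrative only; mrjob additionally serializes keys and values as JSON, as sketched here):

```python
import json

def streaming_mapper(stdin_lines):
    # a streaming mapper emits key<TAB>value lines on stdout
    for line in stdin_lines:
        for word in line.split():
            yield "%s\t%s" % (json.dumps(word), json.dumps(1))

def streaming_reducer(sorted_lines):
    # Hadoop sorts mapper output by key before the reducer sees it,
    # so identical keys arrive contiguously and can be summed in one pass
    current, total = None, 0
    for line in sorted_lines:
        key, value = line.split("\t")
        if key != current:
            if current is not None:
                yield "%s\t%s" % (current, json.dumps(total))
            current, total = key, 0
        total += json.loads(value)
    if current is not None:
        yield "%s\t%s" % (current, json.dumps(total))

# simulate: map -> sort (the "shuffle") -> reduce
out = list(streaming_reducer(sorted(streaming_mapper(["b a", "a"]))))
```

Everything else mrjob does (HDFS staging, temp directories, step chaining) is plumbing around this simple line-oriented protocol.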

Brilliant, isn’t it?

Appendix A. Installing Python 2.6 on CentOS 5.6

$ wget
$ rpm -ivh epel-release-5-4.noarch.rpm
$  yum install python26
$ yum install python26-devel

Note that this will leave python (/usr/bin/python) hard-linked to /usr/bin/python2.4. That is not going to work, even if you run the Hadoop code using python26 (python26 -r hadoop …), because in the background the mapper and reducer will be invoked as python (see the details above, e.g. -mapper 'python --step-num=0 --mapper'). mrjob relies on Python 2.5+ syntax; more specifically, it uses the 'with' statement, see:

        with self.make_runner() as runner:

            if not self.options.no_output:
                for line in runner.stream_output():

This will fail with an invalid syntax error in Python 2.4. Thus you need to remove the /usr/bin/python hard link to /usr/bin/python2.4 and relink it to /usr/bin/python26:

$ rm /usr/bin/python
$ ln /usr/bin/python26 /usr/bin/python

This may actually break yum, so if you also need to use yum in the meantime, you have to relink python back to python2.4. A pain in the neck. That is why I wrote that you would be better off with CentOS 6.2 or any other distribution that comes with Python 2.6 by default.

Appendix B. Installing ATLAS+LAPACK and NumPy/SciPy

To install ATLAS+LAPACK from source, you need to run a few commands like:

$  yum install gcc
$  yum install gcc-gfortran
$  yum install gcc-c++

Download LAPACK (Linear Algebra PACKage) from
$  tar xvzf lapack-3.4.0.tgz
$  cd lapack-3.4.0
$  cp INSTALL/
$  cd SRC
$  make

This creates liblapack.a in the parent directory (..).

Download ATLAS (Automatically Tuned Linear Algebra Software) from SourceForge:

$  bunzip2 atlas3.8.4.tar.bz2
$  tar xvf atlas3.8.4.tar
$  mv ATLAS ATLAS3.8.4
$  cd ATLAS3.8.4
$  mkdir ATLAS3.8.4_Centos5
$  cd ATLAS3.8.4_Centos5/
$  ../configure -Fa alg -fPIC --with-netlib-lapack=/root/lapack/lapack-3.4.0

$ make build                          # tune & build lib
$ make check                                     # sanity check correct answer
$ make time                                        # check if lib is fast
$ cd lib
$ make shared
$ make ptshared
$ cd ..
$ make install                                  # copy libs to install dir

Download NumPy from
Download SciPy from

Set the following environment variables (you can also do it in .bash_profile and then source it):

$ export PYTHONPATH=/usr/local/lib64/python2.6/site-packages
$ export LD_LIBRARY_PATH=/usr/local/atlas/lib

For NumPy you need to set site.cfg as follows:

[DEFAULT]
library_dirs = /usr/local/atlas/lib
include_dirs = /usr/local/atlas/include

[blas_opt]
libraries = ptf77blas, ptcblas, atlas

[lapack_opt]
libraries = lapack, ptf77blas, ptcblas, atlas

Then build and install (python should be python2.6):

$ python build
$ python install --prefix=/usr/local

For SciPy you need to run the same commands:
$ python build
$ python install --prefix=/usr/local