Yelp’s mrjob – a python package for Hadoop jobs

This time I cover mrjob, a Python package open sourced by Yelp for writing Hadoop jobs. It supports your own Hadoop cluster (based on Hadoop Streaming) as well as AWS Elastic MapReduce (EMR). It requires Python 2.5+ (see the project documentation for more details).

HP Cloud Services

The cloud service I used for testing mrjob is from HP: HP Public Cloud Services. That cloud offering alone would deserve a dedicated post. I signed up for their private beta and liked it, though it is way behind the sophistication of the AWS service portfolio. It is still free, but it seems that HP is planning to charge for the service from mid-May 2012 onwards. It is based on OpenStack (more precisely, I used HP Cloud Compute, which relies on Nova from the OpenStack suite).

To control the virtual machines on HP Cloud Services, you need to log in (sign up first) and go to Manage. Once you have activated the service, go to Manage Servers and click Create. You can pick from various Linux distributions: CentOS, Ubuntu, Fedora and Debian. (Originally, before embarking on the mrjob exercise, I picked CentOS 5.6, which I would not have done had I known about mrjob in advance: CentOS 5.6 comes with Python 2.4, and mrjob needs Python 2.5+. That is fine, though; the python26 and python26-devel packages just had to be installed along the way, see the necessary steps in Appendix A below. CentOS 6.2, which comes with Python 2.6, could have saved this extra work.) I created two instances for Hadoop, one for the master and one for the slave.

The HP Cloud Services dashboard looks like this:

[Screenshot: HP Cloud Services dashboard]


Yelp’s mrjob can be downloaded from GitHub. I used mrjob v0.3.2 for my tests; the code itself is around 15,000 lines. It supports local mode (mainly for quick tests and debugging), hadoop mode (running on a Hadoop cluster) and emr mode (running on AWS EMR).

The “Hello World!” of Hadoop – wordcount – looks as follows:

from mrjob.job import MRJob


class MRWordCountUtility(MRJob):

    def __init__(self, *args, **kwargs):
        super(MRWordCountUtility, self).__init__(*args, **kwargs)
        self.chars = 0
        self.words = 0
        self.lines = 0

    def mapper(self, _, line):
        # Don't actually yield anything for each line. Instead, collect them
        # and yield the sums when all lines have been processed. The results
        # will be collected by the reducer.
        self.chars += len(line) + 1  # +1 for newline
        self.words += sum(1 for word in line.split() if word.strip())
        self.lines += 1

    def mapper_final(self):
        yield('chars', self.chars)
        yield('words', self.words)
        yield('lines', self.lines)

    def reducer(self, key, values):
        yield(key, sum(values))

if __name__ == '__main__':
    MRWordCountUtility.run()

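Since mapper() never yields anything, it may not be obvious how the totals come out right. The following dependency-free sketch (plain Python 3, no mrjob; WordCountSketch and run_locally are hypothetical names) imitates, in a much simplified form, what happens on the cluster: each map task gets its own job instance, calls mapper() for every line of its input split and mapper_final() once at the end, the emitted values are grouped by key, and reducer() sums the partial counts from all map tasks.

```python
from collections import defaultdict


class WordCountSketch(object):
    """Hypothetical, mrjob-free stand-in for MRWordCountUtility."""

    def __init__(self):
        self.chars = 0
        self.words = 0
        self.lines = 0

    def mapper(self, _, line):
        # Same bookkeeping as the job above; nothing is emitted per line.
        self.chars += len(line) + 1  # +1 for newline
        self.words += sum(1 for word in line.split() if word.strip())
        self.lines += 1
        return []

    def mapper_final(self):
        # Called once per map task, after its last input line.
        yield ('chars', self.chars)
        yield ('words', self.words)
        yield ('lines', self.lines)

    def reducer(self, key, values):
        yield (key, sum(values))


def run_locally(splits):
    """Run one fresh job instance per input split, group by key, then reduce."""
    grouped = defaultdict(list)
    for split in splits:
        task = WordCountSketch()
        for line in split:
            for key, value in task.mapper(None, line):
                grouped[key].append(value)
        for key, value in task.mapper_final():
            grouped[key].append(value)
    reducer_job = WordCountSketch()
    results = {}
    for key in sorted(grouped):  # keys reach the reducer sorted
        for out_key, out_value in reducer_job.reducer(key, grouped[key]):
            results[out_key] = out_value
    return results


if __name__ == '__main__':
    splits = [['hello world', 'hello mrjob'], ['goodbye hadoop']]
    print(run_locally(splits))  # → {'chars': 39, 'lines': 3, 'words': 6}
```

Because every map task emits only three pairs, the reducer receives at most three values per map task, which keeps the shuffle tiny no matter how large the input is.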
This is plain and simple, so let us have a look at another example that comes with the tarball (author: Jordan Andersen). This is the classic travelling salesman problem: “given a list of cities and their pairwise distances, the task is to find the shortest possible route that visits each city exactly once and returns to the origin city”. For mathematically inclined readers, there are more rigorous treatments of the problem available.
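The definition translates directly into a brute-force search. Here is a minimal single-machine sketch in plain Python 3 (no Hadoop, no numpy; tour_cost and brute_force_tsp are hypothetical helper names) that fixes the start city and tries every ordering of the remaining N-1 cities, i.e. the same search space the MapReduce version distributes:

```python
from itertools import permutations


def tour_cost(graph, tour):
    """Sum the edge costs along consecutive node pairs of the tour."""
    return sum(graph[a][b] for a, b in zip(tour, tour[1:]))


def brute_force_tsp(graph, start_node=0):
    """Return (cost, tour) of the cheapest closed tour starting at start_node.

    graph is a square list of lists of edge costs. Fixing the start node
    leaves (N-1)! orderings of the remaining nodes to try.
    """
    others = [n for n in range(len(graph)) if n != start_node]
    best = None
    for order in permutations(others):
        tour = [start_node] + list(order) + [start_node]
        candidate = (tour_cost(graph, tour), tour)
        if best is None or candidate < best:
            best = candidate
    return best


if __name__ == '__main__':
    # Four cities; the cheapest round trip is 0-1-2-3-0 with cost 1+1+1+1 = 4.
    graph = [[0, 1, 5, 1],
             [1, 0, 1, 5],
             [5, 1, 0, 1],
             [1, 5, 1, 0]]
    print(brute_force_tsp(graph))  # → (4, [0, 1, 2, 3, 0])
```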

Using Hadoop and mrjob, the travelling salesman problem can be given a brute-force MapReduce solution. It is a 2-step process: in the first step, the mapper defines reasonably sized chunks of work for the second step; in the second step, each mapper calculates the shortest and longest paths within its chunk of the (N-1)! possible tours (N is the number of cities/nodes in the graph); eventually, the reducer picks the overall winners.
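The work split of the first step boils down to integer arithmetic: number the (N-1)! tours 0, 1, ..., (N-1)!-1 and hand each second-step mapper a contiguous range of those numbers. A minimal Python 3 sketch of that partitioning follows; split_tour_range and its parameters are hypothetical, and the thresholds (chunks of 100 tours for small problems, otherwise about 100 chunks) are taken from the splitter() method in the example code.

```python
from math import factorial


def split_tour_range(num_nodes, num_chunks=100, min_chunk=100):
    """Split the tour indices [0, (N-1)!) into contiguous (low, high) ranges.

    Small problems get fixed-size chunks of min_chunk tours; larger ones
    are cut into roughly num_chunks pieces (plus a short remainder chunk).
    """
    num_tours = factorial(num_nodes - 1)
    if num_tours < min_chunk * num_chunks:
        step_size = min_chunk
    else:
        step_size = num_tours // num_chunks
    bounds = list(range(0, num_tours, step_size)) + [num_tours]
    return list(zip(bounds[:-1], bounds[1:]))


if __name__ == '__main__':
    print(split_tour_range(4))        # → [(0, 6)]
    print(len(split_tour_range(10)))  # → 101
```

Each (low, high) pair becomes one key for the second step, so the number of second-step map inputs stays around 100 however large (N-1)! gets.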

The travelling salesman mapreduce code looks like this:

__author__ = 'Jordan Andersen'

from mrjob.job import MRJob
from scipy.misc.common import factorial
import sys
import numpy

try:
    import simplejson as json
    json  # quiet "redefinition of unused ..." warning from pyflakes
except ImportError:
    import json

def map_int_to_tour(num_nodes, i, start_node):
    """Gets a unique tour through a graph given an integer and starting node.

    num_nodes -- the number of nodes in the graph being toured
    i -- the integer to be mapped to the set of tours for the graph
    start_node -- the node index to begin and end the tour on
    """
    nodes_remaining = range(0,start_node) + range(start_node + 1, num_nodes)
    tour = []

    while len(nodes_remaining) > 0:
        num_nodes = len(nodes_remaining)
        next_step = nodes_remaining[i % num_nodes]
        nodes_remaining.remove(next_step)
        tour.append(next_step)
        i = i / num_nodes

    tour = [start_node] + tour + [start_node]
    return tour

def cost_tour(graph, tour):
    """Calculates the travel cost of given tour through a given graph.

    graph -- A square numpy.matrix representing the travel cost of each edge on
            the graph.
    tour -- A list of integers representing a tour through the graph where each
            entry is the index of a node on the graph.
    """
    steps = zip(tour[0:-1], tour[1:])
    cost = sum([ graph[step_from,step_to] for step_from, step_to in steps])
    return cost

class MRSalesman(MRJob):

    def steps(self):
        """Defines the two steps, which are as follows:

        1.  Mapper splits the problem into reasonable chunks by mapping each
            possible tour to the integers and assigning each Step 2 mapper a
            range of tours to cost.
        2.  The mapper takes a range of tours and a description of the trip and
            yields the longest and shortest tours. The reducer yields the
            longest of the long and the shortest of the short tours.

        Notice the first step has no reducer. This allows all of the keys put
        out by the first step to be inputs to step 2's mappers without having
        to be reduced.
        """
        return ([self.mr(mapper=self.splitter),
                 self.mr(mapper=self.mapper,
                         reducer=self.reducer,
                         mapper_final=self.mapper_final)])

    def __init__(self, *args, **kwargs):
        """Initializes an instance of the MRSalesman class. See MRJob for
        arguments.

        Some instance variables are initialized here that will be modified
        while mapping in step 2 and output by the step 2 mapper_final.
        """
        super(MRSalesman, self).__init__(*args, **kwargs)
        self.shortest_length = sys.maxint
        self.shortest_path = []
        self.longest_length = 0
        self.longest_path = []

    def splitter(self, key, line):
        """The mapper for step 1. Splits the range of possible tours into
        reasonably sized chunks for the consumption of the step 2 mappers.

        At this point the 'line' input should come directly from the first line
        of the one-line json file that contains the edge cost graph and the
        starting node. The key is not relevant.
        """
        #loading the json description of the trip to get at the size
        #of the edge costgraph
        sales_trip = json.loads(line)
        m = numpy.matrix(sales_trip['graph'])
        num_nodes = m.shape[0]
        num_tours = factorial(num_nodes - 1)

        #Here we break down the full range of possible tours into smaller
        #pieces. Each piece is passed along as a key along with the trip
        #description.
        step_size = int(100 if num_tours < 100**2 else num_tours / 100)
        steps = range(0, num_tours, step_size) + [num_tours]
        ranges = zip(steps[0:-1], steps[1:])

        for range_low, range_high in ranges:
            #The key represents the range of tours to cost
            yield( ("%d-%d"%(range_low,range_high), sales_trip ))

    def mapper(self, key, sales_trip):
        """Mapper for step 2. Finds the shortest and longest tours through a
        small range of all possible tours through the graph.

        At this step the key will contain a string describing the range of
        tours to cost. The sales_trip has the edge cost graph and the starting
        node in a dict.
        """
        #This first line makes this function a generator function rather than a
        #normal function, which MRJob requires in its mapper functions. You need
        #to do this when all the output comes from the mapper_final.
        if False: yield
        matrix = numpy.matrix(sales_trip['graph'])
        num_nodes = matrix.shape[0]

        #The key represents the range of tours to cost
        range_low, range_high = map(int,key.split('-'))
        for i in range(range_low,range_high):

            tour = map_int_to_tour(num_nodes, i, sales_trip['start_node'])
            cost = cost_tour(matrix, tour)

            if cost < self.shortest_length:
                self.shortest_length = cost
                self.shortest_path = tour

            if cost > self.longest_length:
                self.longest_length = cost
                self.longest_path = tour

    def mapper_final(self):
        """Mapper_final for step 2. Outputs winners found by mapper."""
        yield ('shortest', (self.shortest_length, self.shortest_path))
        yield ('longest', (self.longest_length, self.longest_path))

    def reducer(self, key, winners):
        """Reducer for Step 2. Takes the shortest and longest from several
        mappers and/or reducers and yields the overall winners in each category.

        The winners are a list of winners from several mappers OR reducers for
        the given key.

        Run this reducer enough and eventually you get to the final winner in
        each key/category.
        """
        if key == "shortest":
            yield (key ,min(winners))
        if key == "longest":
            yield (key ,max(winners))

if __name__ == '__main__':
    MRSalesman.run()

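The least obvious part of the job is map_int_to_tour(), which turns a plain integer into a tour. Each loop iteration uses i % k to pick one of the k unvisited nodes and i / k to carry the rest of the number forward, so i is effectively read as a mixed-radix number (the factorial number system), and every integer in [0, (N-1)!) should decode to a different tour. The sketch below is a Python 3 port of that function (using // because / no longer truncates in Python 3) plus an illustrative check of that claim; all_tours_distinct is a hypothetical helper, not part of the mrjob example.

```python
from itertools import permutations
from math import factorial


def map_int_to_tour(num_nodes, i, start_node):
    """Python 3 port of the example's map_int_to_tour()."""
    nodes_remaining = [n for n in range(num_nodes) if n != start_node]
    tour = []
    while nodes_remaining:
        k = len(nodes_remaining)
        next_step = nodes_remaining[i % k]  # current mixed-radix digit picks a node
        nodes_remaining.remove(next_step)
        tour.append(next_step)
        i //= k  # drop that digit; '/' in the Python 2 original
    return [start_node] + tour + [start_node]


def all_tours_distinct(num_nodes, start_node=0):
    """Check that [0, (N-1)!) decodes to every tour exactly once."""
    decoded = {tuple(map_int_to_tour(num_nodes, i, start_node))
               for i in range(factorial(num_nodes - 1))}
    others = [n for n in range(num_nodes) if n != start_node]
    expected = {(start_node,) + p + (start_node,) for p in permutations(others)}
    return decoded == expected


if __name__ == '__main__':
    print(map_int_to_tour(4, 0, 0))  # → [0, 1, 2, 3, 0]
    print(all_tours_distinct(6))     # → True
    # The step-2 reducer relies on pair ordering: min()/max() compare length first.
    print(min([(3.0, [0, 2, 1, 0]), (2.5, [0, 1, 2, 0])]))  # → (2.5, [0, 1, 2, 0])
```

Since the set of decoded tours has to equal the full set of (N-1)! permutations, no two integers can map to the same tour, which is what makes the integer ranges of the first step a clean partition of the work.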
The code requires Numpy and Scipy, which can be installed by following the instructions on their install page. On Linux, the official Scipy and Numpy releases are source code only and require other packages such as LAPACK (Linear Algebra PACKage) and ATLAS (Automatically Tuned Linear Algebra Software). I compiled and installed these from source as well, which is quite an extensive exercise and not for the faint-hearted; see Appendix B for detailed install instructions.

Once the necessary packages (ATLAS, LAPACK, Numpy, Scipy) are installed, the first thing to do is to test the code in local mode. For a 10-node example, it can be run like this:

$ python -r local example_graphs/10_nodes.json

The result is:

Streaming final output from /tmp/mr_travelling_salesman.root.20120415.220200.910545/output
"longest"    [8.5095326196766621, [8, 9, 0, 4, 7, 5, 1, 2, 3, 6, 8]]
"shortest"    [2.2185043035773591, [8, 2, 9, 5, 3, 4, 1, 6, 0, 7, 8]]
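The input is a single line of JSON: the splitter's docstring says it holds the edge cost graph and the starting node, and the code reads them as sales_trip['graph'] and sales_trip['start_node']. To try other sizes, a file in that shape can be generated with a short script. The sketch below is hypothetical: make_trip and write_trip are my names, and random symmetric costs in [0, 1) are an assumption, not necessarily how the bundled example_graphs files were made.

```python
import json
import random


def make_trip(num_nodes, start_node=0, seed=None):
    """Build a trip description with a random symmetric edge cost graph.

    Costs are drawn uniformly from [0, 1); the diagonal (a city to itself) is 0.
    """
    rng = random.Random(seed)
    graph = [[0.0] * num_nodes for _ in range(num_nodes)]
    for a in range(num_nodes):
        for b in range(a + 1, num_nodes):
            graph[a][b] = graph[b][a] = rng.random()
    return {'graph': graph, 'start_node': start_node}


def write_trip(path, trip):
    """Write the trip as one JSON line, the shape the splitter expects."""
    with open(path, 'w') as f:
        f.write(json.dumps(trip) + '\n')


if __name__ == '__main__':
    print(json.dumps(make_trip(3, seed=42)))
```

write_trip('my_8_nodes.json', make_trip(8)) would then produce a file that can be passed to the job in place of example_graphs/10_nodes.json. Keep the node count modest: with 13 nodes there are already 12! = 479,001,600 tours to cost.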

Now it is time to run the code on the Hadoop cluster. Debug mode can be enabled using the --verbose flag:

$ python -r hadoop --verbose --cmdenv PYTHONPATH=/usr/local/lib64/python2.6/site-packages --cmdenv LD_LIBRARY_PATH=/usr/local/atlas/lib example_graphs/10_nodes.json

What mrjob does behind the scenes is as follows:

– it creates a temporary HDFS input directory using hadoop fs -mkdir

– it copies the input file (10_nodes.json) into the HDFS temp directory using hadoop fs -put example_graphs/10_nodes.json

– it creates another temporary HDFS files directory, again using hadoop fs -mkdir

– it copies the code into this HDFS directory using hadoop fs -put, and it also copies the mrjob package as a tarball (mrjob.tar.gz) into the same HDFS files directory

– then it runs the first step: /opt/hadoop/bin/hadoop jar /opt/hadoop/contrib/streaming/hadoop-streaming- -files 'hdfs:///user/root/tmp/mrjob/mr_travelling_salesman.root.20120415.220633.429682/files/' -archives 'hdfs:///user/root/tmp/mrjob/mr_travelling_salesman.root.20120415.220633.429682/files/mrjob.tar.gz#mrjob.tar.gz' -cmdenv LD_LIBRARY_PATH=/usr/local/atlas/lib -cmdenv PYTHONPATH=/usr/local/lib64/python2.6/site-packages:mrjob.tar.gz -input hdfs:///user/root/tmp/mrjob/mr_travelling_salesman.root.20120415.220633.429682/input -output hdfs:///user/root/tmp/mrjob/mr_travelling_salesman.root.20120415.220633.429682/step-output/1 -mapper 'python --step-num=0 --mapper' -jobconf mapred.reduce.tasks=0

Note the PYTHONPATH and LD_LIBRARY_PATH environment settings on the command line: they point to the directories where Numpy/Scipy and the ATLAS/LAPACK libraries were installed, respectively.

– then it runs the second step: /opt/hadoop/bin/hadoop jar /opt/hadoop/contrib/streaming/hadoop-streaming- -files 'hdfs:///user/root/tmp/mrjob/mr_travelling_salesman.root.20120415.220633.429682/files/' -archives 'hdfs:///user/root/tmp/mrjob/mr_travelling_salesman.root.20120415.220633.429682/files/mrjob.tar.gz#mrjob.tar.gz' -cmdenv LD_LIBRARY_PATH=/usr/local/atlas/lib -cmdenv PYTHONPATH=/usr/local/lib64/python2.6/site-packages:mrjob.tar.gz -input hdfs:///user/root/tmp/mrjob/mr_travelling_salesman.root.20120415.220633.429682/step-output/1 -output hdfs:///user/root/tmp/mrjob/mr_travelling_salesman.root.20120415.220633.429682/output -mapper 'python --step-num=1 --mapper' -reducer 'python --step-num=1 --reducer'

– once the job is finished, it outputs the result using hadoop fs -cat:
"longest"    [8.5095326196766621, [8, 9, 0, 4, 7, 5, 1, 2, 3, 6, 8]]
"shortest"    [2.2185043035773591, [8, 2, 9, 5, 3, 4, 1, 6, 0, 7, 8]]

– finally it removes the temporary directories using hadoop fs -rmr
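To make the sequence above concrete, here is a dry-run sketch in Python 3 that only assembles the hadoop command lines as argument lists, in the order listed; it executes nothing. The function name streaming_commands, its parameters and the directory layout are hypothetical, modeled on the paths in the log; the -archives mrjob.tar.gz argument and the PYTHONPATH suffix that mrjob adds are left out for brevity, and the part-* glob for the output files is an assumption. This is not how mrjob itself builds the commands.

```python
def streaming_commands(hadoop_bin, streaming_jar, script, work_dir,
                       input_file, cmdenv=None):
    """Return the hadoop commands of a two-step job as argument lists.

    Dry run only: nothing is executed here.
    """
    env_args = []
    for name, value in sorted((cmdenv or {}).items()):
        env_args += ['-cmdenv', '%s=%s' % (name, value)]
    input_dir = work_dir + '/input'
    files_dir = work_dir + '/files'
    step1_out = work_dir + '/step-output/1'
    final_out = work_dir + '/output'

    def step(num, step_input, step_output, with_reducer):
        args = ([hadoop_bin, 'jar', streaming_jar,
                 '-files', files_dir + '/' + script]
                + env_args
                + ['-input', step_input, '-output', step_output,
                   '-mapper', 'python %s --step-num=%d --mapper' % (script, num)])
        if with_reducer:
            args += ['-reducer',
                     'python %s --step-num=%d --reducer' % (script, num)]
        else:
            args += ['-jobconf', 'mapred.reduce.tasks=0']  # map-only step
        return args

    return [
        [hadoop_bin, 'fs', '-mkdir', input_dir],
        [hadoop_bin, 'fs', '-put', input_file, input_dir],
        [hadoop_bin, 'fs', '-mkdir', files_dir],
        [hadoop_bin, 'fs', '-put', script, files_dir],
        step(0, input_dir, step1_out, with_reducer=False),
        step(1, step1_out, final_out, with_reducer=True),
        [hadoop_bin, 'fs', '-cat', final_out + '/part-*'],
        [hadoop_bin, 'fs', '-rmr', work_dir],
    ]


if __name__ == '__main__':
    for cmd in streaming_commands('/opt/hadoop/bin/hadoop', 'hadoop-streaming.jar',
                                  'my_job.py', 'hdfs:///tmp/mrjob/my_job.run1',
                                  'example_graphs/10_nodes.json',
                                  {'LD_LIBRARY_PATH': '/usr/local/atlas/lib'}):
        print(' '.join(cmd))
```

Such argument lists could be handed to subprocess.check_call() one by one, in order; the second step can only start once the first has finished, because its -input is the first step's -output.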

Brilliant, isn’t it?

Appendix A. Installing Python 2.6 on CentOS 5.6

$ wget
$ rpm -ivh epel-release-5-4.noarch.rpm
$  yum install python26
$ yum install python26-devel

Note that this will leave python (/usr/bin/python) hard-linked to /usr/bin/python2.4. It is not going to work, even if you run the Hadoop code using python26 (python26 -r hadoop …), because in the background the mapper and reducer are invoked with plain python (see the details above, e.g. -mapper 'python --step-num=0 --mapper'). mrjob relies on Python 2.5+ syntax; more specifically, it uses the 'with' statement, see:

        with self.make_runner() as runner:

            if not self.options.no_output:
                for line in runner.stream_output():

This fails with a SyntaxError in Python 2.4. Thus you need to remove the /usr/bin/python hard link to /usr/bin/python2.4 and relink it to /usr/bin/python26:

$ rm /usr/bin/python
$ ln /usr/bin/python26 /usr/bin/python

This may actually break yum, so if you also need to use yum in the meantime, you have to relink python to python2.4. Pain in the neck. That is why I wrote that you'd better go with CentOS 6.2 or any other distribution that comes with Python 2.6 by default.
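The 'with' statement is what lets the job hand cleanup (such as deleting temporary directories) over to the runner object. Python 2.4 rejects the syntax outright, Python 2.5 accepts it only after from __future__ import with_statement, and Python 2.6 enables it by default. Below is a minimal, hypothetical Python 3 sketch of the context manager protocol (__enter__/__exit__) that such a runner implements; TempDirRunner is my name, and this is not mrjob's runner class.

```python
import os
import shutil
import tempfile


class TempDirRunner(object):
    """Hypothetical runner that owns a scratch directory for one job."""

    def __init__(self):
        self.work_dir = tempfile.mkdtemp(prefix='mrjob_sketch_')

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Runs on normal exit and when an exception escapes the block.
        shutil.rmtree(self.work_dir, ignore_errors=True)
        return False  # do not swallow exceptions


if __name__ == '__main__':
    with TempDirRunner() as runner:
        path = runner.work_dir
        print(os.path.isdir(path))  # → True
    print(os.path.isdir(path))      # → False
```

Because __exit__ also runs when the block raises, the temporary directories disappear even if the job fails halfway.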

Appendix B. Installing ATLAS+LAPACK and Numpy/Scipy

To install ATLAS+LAPACK from source, you need to run a few commands like:

$  yum install gcc
$  yum install gcc-gfortran
$  yum install gcc-c++

Download lapack  (Linear Algebra PACKage) from
$  tar xvzf lapack-3.4.0.tgz
$  cd lapack-3.4.0
$  cp INSTALL/
$  cd SRC
$  make

it creates liblapack.a in the parent directory (..)

Download ATLAS (Automatically Tuned Linear Algebra Software) from http://sourcefor

$  bunzip2 atlas3.8.4.tar.bz2
$  tar xvf atlas3.8.4.tar
$  mv ATLAS ATLAS3.8.4
$  cd ATLAS3.8.4
$  mkdir ATLAS3.8.4_Centos5
$  cd ATLAS3.8.4_Centos5/
$  ../configure -Fa alg -fPIC --with-netlib-lapack=/root/lapack/lapack-3.4.0

$ make build                          # tune & build lib
$ make check                                     # sanity check correct answer
$ make time                                        # check if lib is fast
$ cd lib
$ make shared
$ make ptshared
$ cd ..
$ make install                                  # copy libs to install dir

Download NumPy from
Download SciPy from

Set the following environment variables (you can also put them in .bash_profile and then source it):

$ export PYTHONPATH=/usr/local/lib64/python2.6/site-packages
$ export LD_LIBRARY_PATH=/usr/local/atlas/lib

For Numpy you need to set site.cfg as follows:

[DEFAULT]
library_dirs = /usr/local/atlas/lib
include_dirs = /usr/local/atlas/include

[blas_opt]
libraries = ptf77blas, ptcblas, atlas

[lapack_opt]
libraries = lapack, ptf77blas, ptcblas, atlas
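site.cfg is an INI-style file. Assuming the entries are grouped into [DEFAULT], [blas_opt] and [lapack_opt] sections (blas_opt and lapack_opt being the sections numpy's build consults for optimized BLAS/LAPACK), the directories listed once under [DEFAULT] are inherited by every other section, so both lookups search /usr/local/atlas/lib. The Python 3 sketch below only illustrates those standard INI semantics with the configparser module; read_site_cfg is a hypothetical helper, not numpy's build machinery.

```python
import configparser

SITE_CFG = """\
[DEFAULT]
library_dirs = /usr/local/atlas/lib
include_dirs = /usr/local/atlas/include

[blas_opt]
libraries = ptf77blas, ptcblas, atlas

[lapack_opt]
libraries = lapack, ptf77blas, ptcblas, atlas
"""


def read_site_cfg(text):
    """Return {section: {'libraries': [...], 'library_dirs': str}}."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    result = {}
    for section in parser.sections():  # [DEFAULT] itself is not listed
        result[section] = {
            'libraries': [name.strip()
                          for name in parser.get(section, 'libraries').split(',')],
            # Not set in the section itself, so it falls back to [DEFAULT].
            'library_dirs': parser.get(section, 'library_dirs'),
        }
    return result


if __name__ == '__main__':
    for section, values in read_site_cfg(SITE_CFG).items():
        print(section, values)
```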

Then build and install (python should be python2.6):

$ python build
$ python install --prefix=/usr/local

For Scipy you need to run the same commands:
$ python build
$ python install --prefix=/usr/local
