If you have read the paper published by Google’s Jeffrey Dean and Sanjay Ghemawat (MapReduce: Simplified Data Processing on Large Clusters), you know that their work was inspired by the concepts of functional languages: “Our abstraction is inspired by the map and reduce primitives present in Lisp and many other functional languages… Our use of a functional model with user-specified map and reduce operations allows us to parallelize large computations easily and to use re-execution as the primary mechanism for fault tolerance.”
Given that Scala is a programming language that combines object-oriented and functional programming and runs on the JVM, it is a fairly natural evolution to introduce Scala into the Hadoop environment. That is what Twitter engineers did. (See more on how Scala is used at Twitter: “Twitter on Scala” and “The Why and How of Scala at Twitter”.) Scala has powerful support for mapping, filtering and pattern matching (including regular expressions), so it is a pretty good fit for MapReduce jobs.
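To get a feel for why this is a good fit, here is a tiny word count written with nothing but plain Scala collections – the same map/shuffle/reduce shape as a MapReduce job, just in memory (the input data is made up for illustration):

```scala
// Word count over an in-memory collection, using the same primitives
// that MapReduce borrows from functional languages.
object WordCountSketch {
  val lines = List("Hello Hadoop", "Hello Scala")

  val counts: Map[String, Int] = lines
    .flatMap(_.split("\\s+"))    // "map" phase: emit one word per token
    .groupBy(identity)           // "shuffle": group identical words together
    .map { case (word, occurrences) => (word, occurrences.size) } // "reduce" phase

  // counts: Map(Hello -> 2, Hadoop -> 1, Scala -> 1)
}
```

The flatMap/groupBy pipeline style above is exactly what Scalding lets you write against HDFS data instead of a local List.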
Scalding
Twitter’s Scala-based MapReduce implementation – Scalding – is based on the Cascading Java API (that is where its name comes from: it is essentially a Scala library built on top of the Cascading API) and was open-sourced this year. The code can be found on GitHub.
Prerequisites
In order to use Scalding, you need to have Scala installed; you can grab the latest version from here. As of writing this article, the latest stable version is Scala 2.9.2, which is what I used for my tests, too.
$ scala
Welcome to Scala version 2.9.2 (Java HotSpot(TM) 64-Bit Server VM, Java 1.6.0_31).
Type in expressions to have them evaluated.
Type :help for more information.

scala>
You also need sbt, the Scala build tool, which can be downloaded from here. Unless you want to build it from source, you can just grab the jar file and create an sbt launcher script, as described in the sbt Getting Started wiki. The sbt script is a one-liner, but it requires quite a bit of memory, so if you are in a cloud environment, an extra-small virtual machine might not meet your needs (that was the wall I hit in the beginning).
java -Xms512M -Xmx1536M -Xss1M -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=384M -jar `dirname $0`/sbt-launch.jar "$@"
In addition, you will need ruby, because there is a ruby wrapper script that makes running Scalding Hadoop jobs easy. I used ruby 1.8.6 for my tests.
$ ruby --version
ruby 1.8.6 (2007-09-24 patchlevel 111) [x86_64-linux]
Dive into Scalding (Tribute to Mark Pilgrim)
Once you have downloaded the code from the Scalding repository using the git clone command (git clone git@github.com:twitter/scalding.git)
or got the zip file (as of writing this article it is named twitter-scalding-0.4.1-15-ga54a5a3.zip), you need to run the following commands:
$ sbt update
$ sbt test
$ sbt assembly
The last command creates a jar file which is used by the ruby wrapper script to run Hadoop jobs – I will come back to this later.
Since the Getting Started examples are full of WordCounts, I wanted to try something different – I decided to implement the
$ hadoop jar hadoop-examples-*.jar grep input output Hello
code – not that it is anything never seen before, just to get a taste of it. In the appendix you will find the original Grep Hadoop code from the distribution, and you may appreciate how condensed the Scala code is. Do not get me wrong, this article is not meant to compare Java to Scala in any shape or form, but the conciseness of the Scala style is still remarkable.
GrepJob.scala
import com.twitter.scalding._
import scala.util.matching.Regex

class GrepJob(args : Args) extends Job(args) {
  val pattern = new Regex(args("regexp"))

  TextLine(args("input"))
    .flatMap('line -> 'word) { line : String => line.split("\\s+") }
    // 'matched rather than 'match: match is a reserved word in Scala,
    // so 'match would not compile as a symbol literal
    .flatMap('word -> 'matched) { word : String => pattern.findAllIn(word).toList }
    .groupBy('matched) { _.size }
    .write(Tsv(args("output")))
}
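Before running this on a cluster, it may help to see what the pipeline actually computes. Here is the same flatMap/groupBy flow sketched with plain Scala collections – a local illustration with made-up input, not using the Scalding API:

```scala
import scala.util.matching.Regex

// Local sketch of the GrepJob pipeline: count regex matches across all lines.
object GrepSketch {
  val pattern = new Regex("Hello")
  val lines = List("Hello world", "Hello again, Hello")

  val matchCounts: Map[String, Int] = lines
    .flatMap(_.split("\\s+"))                         // 'line -> 'word
    .flatMap(word => pattern.findAllIn(word).toList)  // 'word -> match
    .groupBy(identity)                                // groupBy on the match
    .map { case (m, ms) => (m, ms.size) }             // { _.size }

  // matchCounts: Map(Hello -> 3)
}
```

Scalding runs the same logical steps, except that each stage operates on named Cascading fields and the data lives in HDFS instead of a List.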
You can run the code with the aforementioned scald.rb ruby wrapper, which is in the scripts directory. The code can be run locally or on HDFS (a similar concept to Yelp’s mrjob Python solution).
First, the standard Hadoop grep example:
$ hadoop jar /opt/hadoop/hadoop-examples-0.20.203.0.jar grep input output Hello
$ hadoop fs -cat output/part*
3	Hello
Then Scalding in local mode looks like this. (Before you run it using the scald.rb script, you need to specify the host where you want to run the Hadoop job. This requires modifying the HOST variable in the scald.rb file:
HOST="my.remote.host" # where the job is rsynced to and run
In my case it was
HOST="hadoopmaster"
I also recommend modifying the REDUCERS variable in the scald.rb file; by default it is set to 100.)
$ scripts/scald.rb --local GrepJob.scala --input input.txt --output output.txt --regexp Hello
/root/scalding/twitter-scalding/target/scalding-assembly-0.4.1.jar
Hello
java -Xmx3g -cp /root/scalding/twitter-scalding/target/scalding-assembly-0.4.1.jar:/tmp/GrepJob.jar com.twitter.scalding.Tool GrepJob --local --input input.txt --output output.txt --regexp Hello
12/04/28 15:58:05 INFO util.Version: Concurrent, Inc - Cascading 2.0.0 [hadoop-0.20.2+]
12/04/28 15:58:05 INFO flow.Flow: [] starting
12/04/28 15:58:05 INFO flow.Flow: [] source: FileTap["TextLine[['num', 'line']->[ALL]]"]["input.txt"]
12/04/28 15:58:05 INFO flow.Flow: [] sink: FileTap["TextDelimited[[UNKNOWN]->[ALL]]"]["output.txt"]
12/04/28 15:58:05 INFO flow.Flow: [] parallel execution is enabled: true
12/04/28 15:58:05 INFO flow.Flow: [] starting jobs: 1
12/04/28 15:58:05 INFO flow.Flow: [] allocating threads: 1
12/04/28 15:58:05 INFO planner.FlowStep: [] starting step: (1/1) local
12/04/28 15:58:05 INFO assembly.AggregateBy: using threshold value: 100000

$ cat output.txt
Hello	3
And finally, Scalding in the default hdfs mode:
$ scripts/scald.rb --hdfs GrepJob.scala --input input/input.txt --output output --regexp Hello
/root/scalding/twitter-scalding/target/scalding-assembly-0.4.1.jar
ssh -C hadoopmaster HADOOP_CLASSPATH=/usr/share/java/hadoop-lzo-0.4.14.jar:scalding-assembly-0.4.1.jar:job-jars/GrepJob.jar hadoop jar scalding-assembly-0.4.1.jar -libjars job-jars/GrepJob.jar -Dmapred.reduce.tasks=1 GrepJob --hdfs --input input/input.txt --output output --regexp Hello
12/04/28 15:55:53 INFO hadoop.HadoopUtil: resolving application jar from found main method on: com.twitter.scalding.Tool$
12/04/28 15:55:53 INFO hadoop.HadoopPlanner: using application jar: /root/scalding-assembly-0.4.1.jar
12/04/28 15:55:53 INFO util.Version: Concurrent, Inc - Cascading 2.0.0 [hadoop-0.20.2+]
12/04/28 15:55:53 INFO flow.Flow: [] starting
12/04/28 15:55:53 INFO flow.Flow: [] source: Hfs["TextLine[['offset', 'line']->[ALL]]"]["input/input.txt"]
12/04/28 15:55:53 INFO flow.Flow: [] sink: Hfs["TextDelimited[[UNKNOWN]->[ALL]]"]["output"]
12/04/28 15:55:53 INFO flow.Flow: [] parallel execution is enabled: true
12/04/28 15:55:53 INFO flow.Flow: [] starting jobs: 1
12/04/28 15:55:53 INFO flow.Flow: [] allocating threads: 1
12/04/28 15:55:53 INFO planner.FlowStep: [] starting step: (1/1) Hfs["TextDelimited[[UNKNOWN]->[ALL]]"]["output"]
12/04/28 15:56:02 INFO mapred.FileInputFormat: Total input paths to process : 1
12/04/28 15:56:02 INFO planner.FlowStep: [] submitted hadoop job: job_201204241302_0034

$ hadoop fs -cat output/part*
Hello	3
The key command that is executed on the hadoopmaster server is basically a hadoop jar command with the scalding-assembly jar file (the one that was created by the sbt assembly command above):
ssh -C hadoopmaster HADOOP_CLASSPATH=/usr/share/java/hadoop-lzo-0.4.14.jar:scalding-assembly-0.4.1.jar:job-jars/GrepJob.jar hadoop jar scalding-assembly-0.4.1.jar -libjars job-jars/GrepJob.jar -Dmapred.reduce.tasks=1 GrepJob --hdfs --input input/input.txt --output output --regexp Hello
That is it. I hope you got the impression that Scala and Hadoop can really go hand in hand; they complement each other very well.
Appendix
Grep.java
package org.apache.hadoop.examples;

import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/* Extracts matching regexs from input files and counts them. */
public class Grep extends Configured implements Tool {
  private Grep() {}                               // singleton

  public int run(String[] args) throws Exception {
    if (args.length < 3) {
      System.out.println("Grep <inDir> <outDir> <regex> [<group>]");
      ToolRunner.printGenericCommandUsage(System.out);
      return -1;
    }

    Path tempDir =
      new Path("grep-temp-"+
          Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));

    JobConf grepJob = new JobConf(getConf(), Grep.class);

    try {
      grepJob.setJobName("grep-search");

      FileInputFormat.setInputPaths(grepJob, args[0]);

      grepJob.setMapperClass(RegexMapper.class);
      grepJob.set("mapred.mapper.regex", args[2]);
      if (args.length == 4)
        grepJob.set("mapred.mapper.regex.group", args[3]);

      grepJob.setCombinerClass(LongSumReducer.class);
      grepJob.setReducerClass(LongSumReducer.class);

      FileOutputFormat.setOutputPath(grepJob, tempDir);
      grepJob.setOutputFormat(SequenceFileOutputFormat.class);
      grepJob.setOutputKeyClass(Text.class);
      grepJob.setOutputValueClass(LongWritable.class);

      JobClient.runJob(grepJob);

      JobConf sortJob = new JobConf(Grep.class);
      sortJob.setJobName("grep-sort");

      FileInputFormat.setInputPaths(sortJob, tempDir);
      sortJob.setInputFormat(SequenceFileInputFormat.class);

      sortJob.setMapperClass(InverseMapper.class);

      sortJob.setNumReduceTasks(1);               // write a single file
      FileOutputFormat.setOutputPath(sortJob, new Path(args[1]));
      sortJob.setOutputKeyComparatorClass         // sort by decreasing freq
        (LongWritable.DecreasingComparator.class);

      JobClient.runJob(sortJob);
    }
    finally {
      FileSystem.get(grepJob).delete(tempDir, true);
    }
    return 0;
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new Grep(), args);
    System.exit(res);
  }
}
I cloned the workspace, and at the step:
sbt update
it downloaded a lot of stuff and then stopped with:
[warn] Note: Some unresolved dependencies have extra attributes. Check that these dependencies exist with the requested attributes.
[warn] com.eed3si9n:sbt-assembly:0.8.1 (sbtVersion=0.12, scalaVersion=2.9.2)
[warn]
sbt.ResolveException: unresolved dependency: com.eed3si9n#sbt-assembly;0.8.1: not found
What should I try next?
I found the solution to “unresolved dependency: com.eed3si9n#sbt-assembly;0.8.1” here:
https://github.com/twitter/scalding/wiki/Scala-and-sbt-for-MacPorts-users