Amazon Elastic MapReduce


Amazon Elastic MapReduce (EMR) is a service in the AWS portfolio for processing and analysing vast amounts of data. It is based on Hadoop (at the time of writing, Hadoop 0.20.205) and relies on other AWS services such as EC2 and S3.

Data processing applications can be implemented using various technologies such as Hive, Pig, Java (custom JAR) and Streaming (e.g. Python or Ruby). This post demonstrates how to use Hive on Amazon Elastic MapReduce: the sample application calculates the average price of Apple stock for every year from 1984 to 2012. At the time of writing, the Hive version is 0.7.1. (Side note: as the results will show, AAPL started at an average price of around 25 USD in 1984, fell to around 18 USD in 1997, and now averages 496.32138, or roughly 500; quite some numbers for a company that has been in Infinite Loop for decades…)

How to create Elastic MapReduce Jobs?

There are three steps to manage an EMR jobflow:

1./ Upload the script (i.e. the Hive .q file) and the data to be processed onto S3. If you are unfamiliar with AWS, this is a good place to start to understand its structure and how to use it.

The test data used in this post was downloaded from the Yahoo! Finance website (historical data for AAPL stock). Go to http://finance.yahoo.com/q/hp?s=AAPL+Historical+Prices and scroll down to the Download to Spreadsheet link. This produces a CSV file (~6,950 lines) with the following columns: Date,Open,High,Low,Close,Volume,Adj Close. Remove the header (the first line) so that only the data rows remain in the CSV file.
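Removing the header can also be scripted. The following is only a minimal sketch in Python: the function name strip_header is hypothetical, the rows in the example are made up, and it assumes the header line matches the column list given above exactly.

```python
import io

# Header line as described above; assumed to match the download exactly.
EXPECTED_HEADER = "Date,Open,High,Low,Close,Volume,Adj Close"


def strip_header(src, dst):
    """Copy CSV lines from src to dst, dropping the header line if present.

    Blank lines are skipped. Returns the number of data lines written.
    """
    written = 0
    for index, line in enumerate(src):
        if index == 0 and line.strip() == EXPECTED_HEADER:
            continue  # skip the column names
        if line.strip():
            dst.write(line)
            written += 1
    return written


if __name__ == "__main__":
    # Made-up rows, for illustration only.
    sample = io.StringIO(
        "Date,Open,High,Low,Close,Volume,Adj Close\n"
        "2012-02-10,10.0,11.0,9.0,10.5,1000,10.5\n"
        "2012-02-09,9.0,10.0,8.0,9.5,2000,9.5\n"
    )
    cleaned = io.StringIO()
    print(strip_header(sample, cleaned), "data lines kept")
```

For a real file, the same function can be called with two file objects opened for reading and writing respectively.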

Steps to upload the input files:

a./ go to the AWS S3 console and create the “stockprice” bucket.

b./ create folders under stockprice bucket: apple/input, apple/output and hive-scripts.

c./ upload apple.q hive-script into //stockprice/hive-scripts folder

d./ upload the csv input file containing AAPL stock prices into //stockprice/apple/input folder

2./ Create an Elastic MapReduce jobflow:

a./ navigate to https://console.aws.amazon.com/elasticmapreduce/home

b./ select “Create New Job Flow”

c./ configure job parameters:

d./ configure EC2 instances:

e./ define EC2 key pair:

f./ optionally, configure debugging by defining an S3 log path and selecting “Enable Debugging”. I highly recommend doing so while you are in the development phase:

g./ Set no bootstrap actions:

h./ review the configuration before you hit the run button:

i./ create job flow:

j./ you can follow the job flow status as it moves from STARTING to RUNNING to SHUTDOWN.

Should any issues occur, you can check stderr, stdout and syslog from the “Debug” menu.

3./ Check the result:

After a few minutes of number crunching, the output is generated in the //stockprice/apple/output folder (e.g. in a file named 000000). The file is plain text with year and average price columns (separated by SOH, start of heading, ASCII 001):

1984 25.578625
1985 20.193676
1986 32.46103
1987 53.889683
1988 41.54008
1989 41.659763
1990 37.562687
1991 52.495533
1992 54.803387
1993 41.02672
…

2010 259.84247
2011 364.00433
2012 496.32138
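Because the SOH separator is invisible in most editors, it can be easier to read the output file programmatically. The sketch below is one possible way to do that in Python; the function name parse_hive_output is hypothetical, and the sample string simply reuses two rows from the listing above.

```python
SOH = "\x01"  # start of heading (Ctrl-A), the separator described above


def parse_hive_output(text, delimiter=SOH):
    """Parse 'year<delimiter>average' lines into a list of (int, float) tuples."""
    rows = []
    for line in text.splitlines():
        if not line.strip():
            continue  # ignore empty lines
        year, avg_price = line.split(delimiter)
        rows.append((int(year), float(avg_price)))
    return rows


if __name__ == "__main__":
    # Two rows taken from the result listing above, with explicit SOH separators.
    sample = "1984\x0125.578625\n2012\x01496.32138\n"
    for year, avg_price in parse_hive_output(sample):
        print(year, avg_price)
```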

Appendix

The hive code to process the data (apple.q) looks like this:

CREATE EXTERNAL TABLE stockprice (
yyyymmdd STRING, open_price FLOAT, high_price FLOAT, low_price FLOAT, close_price FLOAT, stock_volume INT, adjclose_price FLOAT
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
LOCATION 's3://stockprice/apple/input/';

CREATE TABLE tmp_stockprice (
year INT, close_price FLOAT
)
STORED AS SEQUENCEFILE;

INSERT OVERWRITE TABLE tmp_stockprice
SELECT YEAR(sp.yyyymmdd), sp.close_price
FROM stockprice sp;

CREATE TABLE avg_yearly_stockprice (
year INT, avg_price FLOAT
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LINES TERMINATED BY '\n'
STORED AS TEXTFILE;

INSERT OVERWRITE TABLE avg_yearly_stockprice
SELECT tmp_sp.year, AVG(tmp_sp.close_price)
FROM tmp_stockprice tmp_sp
GROUP BY tmp_sp.year;

INSERT OVERWRITE DIRECTORY 's3://stockprice/apple/output/'
SELECT * FROM avg_yearly_stockprice;

Alternatively, you can define a LOCATION for avg_yearly_stockprice (making it an external table, in the same way as the stockprice table) instead of using INSERT OVERWRITE DIRECTORY.
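To sanity-check the Hive logic on a handful of rows before starting a job flow, the same aggregation (extract the year, then average the closing price per year) can be sketched locally. This Python version is only an illustration, not part of the EMR job: the function name yearly_average_close is hypothetical, the rows are made up, and it assumes the dates are in YYYY-MM-DD form. Results may differ in the last digits from Hive's, since the Hive tables store FLOAT values.

```python
import csv
import io
from collections import defaultdict


def yearly_average_close(csv_file):
    """Return {year: average close price} for header-less rows of
    Date,Open,High,Low,Close,Volume,Adj Close."""
    totals = defaultdict(float)
    counts = defaultdict(int)
    for row in csv.reader(csv_file):
        if not row:
            continue  # skip empty lines
        year = int(row[0][:4])          # assumes dates look like YYYY-MM-DD
        totals[year] += float(row[4])   # fifth column is Close
        counts[year] += 1
    return {year: totals[year] / counts[year] for year in sorted(totals)}


if __name__ == "__main__":
    # Made-up rows, for illustration only.
    sample = io.StringIO(
        "2012-01-03,1,1,1,30.0,100,30.0\n"
        "2011-01-04,1,1,1,20.0,100,20.0\n"
        "2011-01-03,1,1,1,10.0,100,10.0\n"
    )
    for year, avg_price in yearly_average_close(sample).items():
        print(year, avg_price)
```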


Spring Data – Apache Hadoop


  

Spring for Apache Hadoop is a Spring project that supports writing applications which benefit from the integration of the Spring Framework and Hadoop. This post describes how to use Spring Data Apache Hadoop in an Amazon EC2 environment using the “Hello World” equivalent of Hadoop programming: a Wordcount application.

1./ Launch an Amazon Web Services EC2 instance.

– Navigate to AWS EC2 Console (“https://console.aws.amazon.com/ec2/home”):

– Select Launch Instance, then Classic Wizard, and click Continue. My test environment was a 32-bit “Basic Amazon Linux AMI 2011.09”, instance type Micro (t1.micro, 613 MB), with security group quick-start-1, which allows ssh login. Select your existing key pair (or create a new one). You can of course select another AMI and instance type, depending on your preference. (If you opt for a Windows 2008 based instance, you also need to install cygwin as an additional Hadoop prerequisite besides the Java JDK and ssh; see the “Install Apache Hadoop” section.)

2./ Download Apache Hadoop. At the time of writing, 1.0.0 is the latest stable version of Apache Hadoop, and it is the version used for testing here. I downloaded hadoop-1.0.0.tar.gz and copied it into the /home/ec2-user directory using the pscp command from my PC running Windows:

c:\downloads>pscp -i mykey.ppk hadoop-1.0.0.tar.gz  ec2-user@ec2-176-34-201-185.eu-west-1.compute.amazonaws.com:/home/ec2-user

(the host name above, in the form ec2-<ip-address>.<region>.compute.amazonaws.com, can be found in the AWS EC2 console, under Instance Description, in the Public DNS field)

3./ Install Apache Hadoop:

As prerequisites, you need Java JDK 1.6 and ssh installed; see the Apache Single-Node Setup Guide. (ssh comes preinstalled with the Basic Amazon Linux AMI.) Then install Hadoop itself:

$ cd  ~   # change directory to ec2-user home (/home/ec2-user)

$ tar xvzf hadoop-1.0.0.tar.gz

$ ln -s hadoop-1.0.0  hadoop

$ cd hadoop/conf

$ vi hadoop-env.sh   # edit as below

export JAVA_HOME=/opt/jdk1.6.0_29

$ vi core-site.xml    # edit as below; this defines the namenode to run on localhost and listen on port 9000

<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://localhost:9000</value>
</property>
</configuration>

$ vi hdfs-site.xml  # edit as below; this sets the file system replication factor to 1 (the default is 3, which is what a production environment would normally use)

<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
</configuration>

$ vi mapred-site.xml  # edit as below; this defines the jobtracker to run on localhost and listen on port 9001

<configuration>
<property>
<name>mapred.job.tracker</name>
<value>localhost:9001</value>
</property>
</configuration>

$ cd ~/hadoop

$ bin/hadoop namenode -format

$ bin/start-all.sh

At this stage all Hadoop daemons are running in pseudo-distributed mode. You can verify this by running:

$ ps -ef | grep java

You should see 5 java processes: namenode, secondarynamenode, datanode, jobtracker and tasktracker.
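If you prefer not to count the processes by eye, the check can be automated. The sketch below is a hypothetical helper in Python that takes the text printed by ps -ef and reports which of the five daemons are missing. It assumes each daemon's main class appears as a separate argument on its java command line, using the Hadoop 1.0 class names; on other versions these names should be treated as assumptions.

```python
# Main class of each daemon as it appears on the java command line.
# These are the Hadoop 1.0 class names; other versions may differ.
EXPECTED_DAEMONS = {
    "namenode": "org.apache.hadoop.hdfs.server.namenode.NameNode",
    "secondarynamenode": "org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode",
    "datanode": "org.apache.hadoop.hdfs.server.datanode.DataNode",
    "jobtracker": "org.apache.hadoop.mapred.JobTracker",
    "tasktracker": "org.apache.hadoop.mapred.TaskTracker",
}


def missing_daemons(ps_output):
    """Return the sorted names of expected daemons not found in `ps -ef` output."""
    running = set()
    for line in ps_output.splitlines():
        tokens = line.split()
        for name, main_class in EXPECTED_DAEMONS.items():
            # Compare whole tokens so that NameNode does not match
            # SecondaryNameNode by accident.
            if main_class in tokens:
                running.add(name)
    return sorted(set(EXPECTED_DAEMONS) - running)
```

One way to use it is to pass in the result of subprocess.check_output(["ps", "-ef"], text=True); an empty list then means all five daemons were found.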

4./ Install Spring Data Hadoop

Download the Spring Data Hadoop package from the SpringSource community download site. At the time of writing, the latest available version is spring-data-hadoop-1.0.0.M1.zip (a milestone release).

$ cd ~

$ unzip spring-data-hadoop-1.0.0.M1.zip   # the package is a zip archive, so use unzip rather than tar

$ ln -s spring-data-hadoop-1.0.0.M1 spring-data-hadoop

5./ Build and Run Spring Data Hadoop Wordcount example

$ cd spring-data-hadoop/spring-data-hadoop-1.0.0.M1/samples/wordcount

Spring Data Hadoop uses Gradle as its build tool. Check the build.gradle build file. The original version shipped in the download package does not compile: it complains about thrift, version 0.2.0, and jdo2-api, version 2.3-ec.

Add the datanucleus.org Maven repository (http://www.datanucleus.org/downloads/maven2/) to the build.gradle file to resolve jdo2-api.

Unfortunately, there seems to be no Maven repository for thrift 0.2.0. Download thrift-0.2.0.jar and thrift-0.2.0.pom, e.g. from this repo: http://people.apache.org/~rawson/repo, and then add the jar to the local Maven repository:

$ mvn install:install-file -DgroupId=org.apache.thrift  -DartifactId=thrift  -Dversion=0.2.0 -Dfile=thrift-0.2.0.jar  -Dpackaging=jar

$ vi build.gradle  # modify the build file to refer to the datanucleus Maven repo for jdo2-api and the local repo for thrift

repositories {
// Public Spring artefacts
mavenCentral()
maven { url "http://repo.springsource.org/libs-release" }
maven { url "http://repo.springsource.org/libs-milestone" }
maven { url "http://repo.springsource.org/libs-snapshot" }
maven { url "http://www.datanucleus.org/downloads/maven2/" }
maven { url "file:///home/ec2-user/.m2/repository" }
}

I also modified the META-INF/spring/context.xml file in order to run hadoop file system commands manually:

$ cd /home/ec2-user/spring-data-hadoop/spring-data-hadoop-1.0.0.M1/samples/wordcount/src/main/resources

$ vi META-INF/spring/context.xml   # remove clean-script and also the dependency on it for JobRunner.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:hdp="http://www.springframework.org/schema/hadoop"
xmlns:p="http://www.springframework.org/schema/p"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/hadoop http://www.springframework.org/schema/hadoop/spring-hadoop.xsd">

<context:property-placeholder location="hadoop.properties"/>

<hdp:configuration>
fs.default.name=${hd.fs}
</hdp:configuration>

<hdp:job id="wordcount-job" validate-paths="false"
input-path="${wordcount.input.path}" output-path="${wordcount.output.path}"
mapper="org.apache.hadoop.examples.WordCount.TokenizerMapper"
reducer="org.apache.hadoop.examples.WordCount.IntSumReducer"/>

<!-- simple job runner -->
<bean id="runner" class="org.springframework.data.hadoop.mapreduce.JobRunner" p:jobs-ref="wordcount-job"/>

</beans>

Copy the sample file, nietzsche-chapter-1.txt, to the Hadoop file system (/user/ec2-user/input directory):

$ cd ~/spring-data-hadoop/spring-data-hadoop-1.0.0.M1/samples/wordcount/src/main/resources/data

$ hadoop fs -mkdir /user/ec2-user/input

$ hadoop fs -put nietzsche-chapter-1.txt /user/ec2-user/input/data

$ cd ../../../..   # go back to samples/wordcount directory

$ ../gradlew

Verify the result:

$ hadoop fs -cat /user/ec2-user/output/part-r-00000 | more

"AWAY 1
"BY 1
"Beyond 1
"By 2
"Cheers 1
"DE 1
"Everywhere 1
"FROM" 1
"Flatterers 1
"Freedom 1
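The first entries all begin with a quotation mark because the TokenizerMapper from the Hadoop WordCount example splits lines on whitespace only, so punctuation stays attached to the words, and the keys come out sorted with the quote character ahead of the letters. The following Python sketch mimics that map and reduce logic on a made-up input. It is an illustration of the technique, not the code the job actually runs, and the function names are hypothetical.

```python
from collections import Counter


def map_tokens(line):
    """Mapper step: emit (word, 1) for each whitespace-separated token."""
    return [(token, 1) for token in line.split()]


def reduce_counts(pairs):
    """Reducer step: sum the counts emitted for each word."""
    totals = Counter()
    for word, count in pairs:
        totals[word] += count
    return totals


def word_count(lines):
    """Run the map and reduce steps and return (word, count) pairs sorted by word."""
    pairs = []
    for line in lines:
        pairs.extend(map_tokens(line))
    return sorted(reduce_counts(pairs).items())


if __name__ == "__main__":
    # Made-up input; punctuation is deliberately left attached to the words.
    for word, count in word_count(['"By the way', 'By the "By']):
        print(word, count)
```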

Welcome to BigHadoop


BigData, and particularly Hadoop/MapReduce, represents a quickly growing part of Business Intelligence and Data Analytics. In his frequently quoted article on O’Reilly Radar, “What is big data?”, Edd Dumbill gives a good introduction to the big data landscape.

Three V-words recur when experts attempt to define what big data is all about: Volume (terabytes and petabytes of information), Velocity (data is literally streaming in at unprecedented speed) and Variety (structured and unstructured data). You can convert these V-words into a fourth one: Value. BigData promises insights into things that have remained hidden until now.

The intention of this blog is to cover a range of technologies: cloud computing, which provides the infrastructure; Hadoop distributions, which are used to crunch the numbers; and mobile analytics, which can provide easy access to the results of complex algorithms and enormous computing capacity.