Apache Spark – a Fast Big Data Analytics Engine


Introduction

There are different approaches in the big data world to make Hadoop more suitable for ad-hoc, interactive queries and iterative data processing. As is well known, the Hadoop MapReduce framework is primarily designed for batch processing, which makes it less suitable for ad-hoc data exploration, machine learning processes and the like. Big data vendors are trying to address this challenge by replacing MapReduce with alternatives. In the case of SQL on Hadoop, there are various initiatives: Cloudera Impala, Pivotal HAWQ or the Hortonworks Stinger initiative, which aims to improve Hive performance significantly.

Apache Spark is another increasingly popular alternative: it replaces MapReduce with a more performant execution engine while still using Hadoop HDFS as the storage engine for large data sets.

Spark Architecture

From an architecture perspective, Apache Spark is based on two key concepts: Resilient Distributed Datasets (RDD) and a directed acyclic graph (DAG) execution engine. With regards to datasets, Spark supports two types of RDDs: parallelized collections that are based on existing Scala collections, and Hadoop datasets that are created from files stored on HDFS. RDDs support two kinds of operations: transformations and actions. Transformations create new datasets from the input (e.g. map or filter operations are transformations), whereas actions return a value after executing calculations on the dataset (e.g. reduce or count operations are actions).
The DAG execution engine eliminates the rigid two-stage execution model of MapReduce and offers significant performance improvements.

Spark-Architecture
Figure 1: Spark Architecture
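
Spark also ships with a Python API (PySpark); purely as an illustration of the transformation/action distinction described above, here is a minimal sketch assuming a local PySpark installation (it is not part of the weblog example that follows):

# Minimal PySpark sketch: transformations are lazy, actions trigger execution.
from pyspark import SparkContext

sc = SparkContext("local", "rdd-demo")

numbers = sc.parallelize([1, 2, 3, 4, 5])     # parallelized collection RDD
squares = numbers.map(lambda x: x * x)        # transformation: nothing is executed yet
evens = squares.filter(lambda x: x % 2 == 0)  # another lazy transformation
print evens.count()                           # action: triggers the DAG execution, prints 2
print evens.reduce(lambda a, b: a + b)        # action: 4 + 16 = 20

Only the two actions (count and reduce) cause Spark to build and run the execution graph; the map and filter calls merely record the lineage of the dataset.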

Installing Spark

Spark is written in Scala so before you install Spark, you need to install Scala. Scala binaries can be downloaded from http://www.scala-lang.org.

$ wget http://www.scala-lang.org/files/archive/scala-2.10.4.tgz
$ tar xvf scala-2.10.4.tgz 
$ ln -s scala-2.10.4 scala
$ vi .bashrc
export SCALA_HOME=/home/istvan/scala
export PATH=$SCALA_HOME/bin:$PATH

You can validate your Scala installation by running the Scala REPL (the Scala command line interpreter); below is an example of how to execute the classic HelloWorld program in Scala:

$ scala
Welcome to Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.6.0_32).
Type in expressions to have them evaluated.
Type :help for more information.

scala> object HelloWorld {
     |     def main(args: Array[String]) {
     |         println("Hello, world!")
     |     }
     | }
defined module HelloWorld

scala> HelloWorld.main(null)
Hello, world!

Then you can download the Spark binaries from http://spark.apache.org/downloads.html. There are a couple of pre-compiled versions depending on your Hadoop distribution; we are going to use the Spark binaries built for the Cloudera CDH4 distribution.

$ wget http://d3kbcqa49mib13.cloudfront.net/spark-0.9.0-incubating-bin-cdh4.tgz
$ tar xvf spark-0.9.0-incubating-bin-cdh4.tgz 
$ ln -s spark-0.9.0-incubating-bin-cdh4 spark

Using Spark shell

Now we are ready to run the Spark shell, the command line interpreter for Spark:

$ bin/spark-shell
14/03/31 15:54:51 INFO HttpServer: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
14/03/31 15:54:51 INFO HttpServer: Starting HTTP Server
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 0.9.0
      /_/

Using Scala version 2.10.3 (Java HotSpot(TM) 64-Bit Server VM, Java 1.6.0_32)
Type in expressions to have them evaluated.
Type :help for more information.
....
14/03/31 15:54:59 INFO HttpServer: Starting HTTP Server
...
Created spark context..
Spark context available as sc.

scala>

In our example we are going to process Apache weblogs that follow the common logfile format with the following fields: hostname, timestamp, request, HTTP status code and number of bytes. The test file that we are using in this example is based on the public NASA weblog from August 1995, see http://ita.ee.lbl.gov/html/contrib/NASA-HTTP.html. The file was cleaned and converted to a tab-separated format.

Let us assume that you want to count how many hits the NASA web server got in August, 1995. In order to get the result, you can run the following commands in spark-shell:

Spark context available as sc.

scala> val accessLog = sc.textFile("hdfs://localhost/user/cloudera/weblog/NASA_access_log_Aug95")
14/03/31 16:18:16 INFO MemoryStore: ensureFreeSpace(82970) called with curMem=0, maxMem=311387750
14/03/31 16:18:16 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 81.0 KB, free 296.9 MB)
accessLog: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:12

scala> accessLog.count()
14/03/31 16:18:24 INFO FileInputFormat: Total input paths to process : 1
14/03/31 16:18:24 INFO SparkContext: Starting job: count at <console>:15
14/03/31 16:18:24 INFO DAGScheduler: Got job 0 (count at <console>:15) with 2 output partitions (allowLocal=false)
...
14/03/31 16:18:26 INFO HadoopRDD: Input split: hdfs://localhost/user/cloudera/weblog/NASA_access_log_Aug95:134217728+27316431
..
14/03/31 16:18:26 INFO SparkContext: Job finished: count at <console>:15, took 2.297566932 s
res0: Long = 1569898

The first command creates an RDD from the NASA Apache access log stored on HDFS (hdfs://localhost/user/cloudera/weblog/NASA_access_log_Aug95). The second command (an action executed on the accessLog RDD) will count the number of lines in the file. Note that the execution time is around 2 seconds.

If you want to know how many requests were initiated from one particular host (e.g. beta.xerox.com in our example), you need to execute a filter transformation and then run the count action on the filtered dataset:

scala> val filteredLog = accessLog.filter(line => line.contains("beta.xerox.com"))

filteredLog: org.apache.spark.rdd.RDD[String] = FilteredRDD[2] at filter at <console>:14

scala> filteredLog.count()
14/03/31 16:19:35 INFO SparkContext: Starting job: count at <console>:17
14/03/31 16:19:35 INFO DAGScheduler: Got job 1 (count at <console>:17) with 2 output partitions (allowLocal=false)
...
14/03/31 16:19:35 INFO Executor: Running task ID 2
...
14/03/31 16:19:35 INFO HadoopRDD: Input split: hdfs://localhost/user/cloudera/weblog/NASA_access_log_Aug95:0+134217728
14/03/31 16:19:36 INFO Executor: Serialized size of result for 2 is 563
...
14/03/31 16:19:36 INFO HadoopRDD: Input split: hdfs://localhost/user/cloudera/weblog/NASA_access_log_Aug95:134217728+27316431
...
14/03/31 16:19:37 INFO DAGScheduler: Stage 1 (count at <console>:17) finished in 1.542 s
14/03/31 16:19:37 INFO SparkContext: Job finished: count at <console>:17, took 1.549706892 s
res1: Long = 2318

You can also execute more complex logic and define your own functions, thanks to Scala's language capabilities. Let us assume that we need to calculate the total number of bytes generated by the NASA web server in August 1995. The number of bytes is the last (5th) field in each line of the weblog file and, unfortunately, there are cases when the field is '-' rather than the expected integer value. The standard toInt Scala String method throws an exception when converting a non-numeric value to an Integer, so we need to detect whether a given string is numeric and return 0 if it is not. This requires a custom function (convertToInt) that extends the standard Scala String class via an implicit conversion, making it available on the String data type. Then we can use this custom function together with the standard RDD operations to calculate the total number of bytes generated by the NASA web server.

scala> def isNumeric(input: String): Boolean = input.forall(_.isDigit)
isNumeric: (input: String)Boolean


scala> class StringHelper(s:String) {
     |    def convertToInt():Int = if (isNumeric(s)) s.toInt else 0
     | }
defined class StringHelper


scala> implicit def stringWrapper(str: String) = new StringHelper(str)
warning: there were 1 feature warning(s); re-run with -feature for details
stringWrapper: (str: String)StringHelper


scala> "123".convertToInt
res2: Int = 123

scala> "-".convertToInt
res4: Int = 0

scala> accessLog.map(line=>line.split("\t")).map(line=>line(4).convertToInt).sum
14/04/01 13:12:39 INFO SparkContext: Starting job: sum at <console>:21
14/04/01 13:12:39 INFO DAGScheduler: Got job 2 (sum at <console>:21) with 2 output partitions (allowLocal=false)
...
14/04/01 13:12:40 INFO HadoopRDD: Input split: hdfs://localhost/user/cloudera/weblog/NASA_access_log_Aug95:0+134217728
... 
14/04/01 13:12:45 INFO SparkContext: Job finished: sum at <console>:21, took 5.53186208 s
res4: Double = 2.6828341424E10

As you can see, the result is roughly 26.8 GB and the calculation was executed in 5.5 seconds.

Programming Spark

In addition to spark-shell, which can be used to execute operations interactively, you can also write and build your code using the Scala, Java or Python programming languages. Let us take an example of how to implement the weblog application in Scala.

In order to build your application, you need to follow the directory structure as shown below:

./
./simple.sbt
./src/main/scala/WeblogApp.scala
./project/build.properties
./sbt

You can copy the sbt build tool from your Spark home directory (cp -af $SPARK_HOME/sbt/* ./sbt).

The simple.sbt build file should look something like this:

name := "Weblog Project"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "0.9.0-incubating"

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.0.0-cdh4.4.0"

And the WeblogApp.scala code is as follows:

import org.apache.spark._

object WeblogApp {
    def main(args: Array[String]) {
        val file = "hdfs://localhost/user/cloudera/weblog/NASA_access_log_Aug95";
        val sc = new SparkContext("local", "WeblogApp",
                                  System.getenv("SPARK_HOME"), 
                                  SparkContext.jarOfClass(this.getClass))
        val accessLog = sc.textFile(file)

        println("Number of entries: " + accessLog.count())
    }
}

Then you can build and run the application using the Scala sbt build tool:

$ sbt/sbt package
$ sbt/sbt run
Launching sbt from sbt/sbt-launch-0.12.4.jar
[info] Loading project definition from /home/istvan/project
[info] Set current project to Weblog Project (in build file:/home/istvan/)
[info] Running WeblogApp 
...
14/04/01 14:48:58 INFO SparkContext: Starting job: count at WeblogApp.scala:12
14/04/01 14:48:58 INFO DAGScheduler: Got job 0 (count at WeblogApp.scala:12) with 2 output partitions (allowLocal=false)
...
14/04/01 14:49:01 INFO SparkContext: Job finished: count at WeblogApp.scala:12, took 2.67797083 s
Number of entries: 1569898
14/04/01 14:49:01 INFO ConnectionManager: Selector thread was interrupted!
[success] Total time: 9 s, completed 01-Apr-2014 14:49:01

Conclusion

Apache Spark has started gaining significant momentum and is considered a promising alternative to MapReduce for supporting ad-hoc queries and iterative processing logic. It offers interactive code execution using the Python and Scala REPLs, but you can also write and compile your applications in Scala and Java. There are various tools running on top of Spark such as Shark (SQL on Hadoop), MLlib (machine learning), Spark Streaming and GraphX. It will be interesting to see how it evolves.

Pivotal Hadoop Distribution and HAWQ realtime query engine


Introduction

SQL on Hadoop and the support for interactive, ad-hoc queries in Hadoop are in increasing demand, and all the vendors are providing their answers to these requirements. In the open source world Cloudera's Impala, Apache Drill (backed by MapR) and Hortonworks' Stinger initiative are competing in this market, just to mention a few key players. There are also strong offerings from BI and analytics vendors such as Pivotal (HAWQ), Teradata (SQL-H) or IBM (BigSQL).
In this post we will cover Pivotal Hadoop Distribution (Pivotal HD) and HAWQ, Pivotal’s interactive distributed SQL query engine.

Getting started with Pivotal HD

Pivotal HD contains the most well-known open source components such as HDFS, MapReduce, YARN, Hive, Pig, HBase, Flume, Sqoop and Mahout. There are also additional components available such as the Command Center, Unified Storage Services, Data Loader, Spring and HAWQ as an add-on. (Pivotal also has an offering called GemFire XD, a distributed in-memory data grid, but that is out of scope for our current discussion.)

PivotalHD_ArchitectDiagram

Let us take an example of how to use Pivotal HD to answer the following question: what was the highest price ever of the Apple, Google and Nokia stocks, and when did each stock reach its peak value?

First we are going to develop a MapReduce algorithm to calculate these values and then we will run SQL queries in HAWQ to get the same result. Our test environment is based on the Pivotal HD Single Node virtual machine running on VMware Player, using a 64-bit CentOS 6.4 distribution. The Pivotal HD virtual machine does not contain Eclipse, so we had to download that separately from eclipse.org.

Once the environment is set up, the next step is to create a Maven project.

$ mvn archetype:generate -DarchetypeGroupId=org.apache.maven.archetypes -DarchetypeArtifactId=maven-archetype-quickstart -DgroupId=highest_stock_price -DartifactId=highest_stock_price

This command will create a pom.xml where we have the basic project settings and junit added as a dependency. Then we need to edit pom.xml and add the other relevant dependencies and build settings.
After that we can start writing our Hadoop application in Eclipse. The code is also uploaded to Github (https://github.com/iszegedi/Pivotal-HD-and-HAWQ-blog) for your reference.

pivotal-eclipse-dev

The key Java classes are HighestStockPriceDriver.java, which is the main driver of our MapReduce application, HighestStockPriceMapper.java, which contains the map() function, and HighestStockPriceReducer.java, which contains the reduce() function.
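
The actual implementation in the repository is written in Java following this Driver/Mapper/Reducer structure. Purely as an illustration of the underlying logic (and not the code from the repository), the same algorithm can be sketched as a Hadoop Streaming style Python script, where the mapper emits symbol/price pairs and the reducer keeps the maximum adjusted close price per symbol:

#!/usr/bin/env python
# Simplified sketch of the highest-stock-price logic in Hadoop Streaming style.
# This is an illustration only, not the Java code from the Github repository.
import sys

def mapper():
    # Input lines: Symbol,Date,Open,High,Low,Close,Volume,AdjClose
    for line in sys.stdin:
        fields = line.strip().split(',')
        if len(fields) == 8:
            symbol, date, adjclose = fields[0], fields[1], fields[7]
            print '%s\t%s,%s' % (symbol, adjclose, date)

def reducer():
    best = {}  # symbol -> (max adjusted close, date)
    for line in sys.stdin:
        symbol, value = line.strip().split('\t')
        adjclose, date = value.split(',')
        if symbol not in best or float(adjclose) > best[symbol][0]:
            best[symbol] = (float(adjclose), date)
    for symbol, (adjclose, date) in sorted(best.items()):
        print '%s:\t%s\t%.2f' % (symbol, date, adjclose)

if __name__ == '__main__':
    if len(sys.argv) > 1 and sys.argv[1] == 'map':
        mapper()
    else:
        reducer()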

Then we can compile the code and package it into a jar file:

$ mvn clean compile
$ mvn -DskipTests package

The next step is to copy our data sets into a Hadoop HDFS directory.

$ hadoop fs -mkdir /stock_demo/input
$ hadoop fs -put *.csv /stock_demo/input/
$ hadoop fs -ls /stock_demo/input/
Found 3 items
-rw-r--r--   1 gpadmin hadoop     403395 2013-12-31 00:25 /stock_demo/input/apple.csv
-rw-r--r--   1 gpadmin hadoop     134696 2013-12-31 00:25 /stock_demo/input/google.csv
-rw-r--r--   1 gpadmin hadoop     248405 2013-12-31 00:25 /stock_demo/input/nokia.csv

The format of the files (apple.csv, nokia.csv, google.csv) is as follows (the columns are Symbol, Date, Open, High, Low, Close, Volume, Adj Close):

$ head -5 apple.csv
AAPL,2013-09-06,498.44,499.38,489.95,498.22,12788700,498.22
AAPL,2013-09-05,500.25,500.68,493.64,495.27,8402100,495.27
AAPL,2013-09-04,499.56,502.24,496.28,498.69,12299300,498.69
AAPL,2013-09-03,493.10,500.60,487.35,488.58,11854600,488.58
AAPL,2013-08-30,492.00,492.95,486.50,487.22,9724900,487.22

Now we are ready to run our MapReduce algorithm on the data sets:

$ hadoop jar target/highest_stock_price-1.0.jar highest_stock_price/HighestStockPriceDriver /stock_demo/input/ /stock_demo/output/

$ hadoop fs -cat /stock_demo/output/part*
AAPL:	2012-09-19	685.76
GOOG:	2013-07-15	924.69
NOK:	2000-06-19	42.24

We can check the status of the Hadoop job using the following command:

$ hadoop job -status job_1388420266428_0001
DEPRECATED: Use of this script to execute mapred command is deprecated.
Instead use the mapred command for it.

13/12/31 00:31:15 INFO service.AbstractService: Service:org.apache.hadoop.yarn.client.YarnClientImpl is inited.
13/12/31 00:31:15 INFO service.AbstractService: Service:org.apache.hadoop.yarn.client.YarnClientImpl is started.
13/12/31 00:31:17 INFO mapred.ClientServiceDelegate: Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server

Job: job_1388420266428_0001
Job File: hdfs://pivhdsne:8020/user/history/done/2013/12/31/000000/job_1388420266428_0001_conf.xml
Job Tracking URL : http://localhost:19888/jobhistory/job/job_1388420266428_0001
Uber job : false
Number of maps: 3
Number of reduces: 1
map() completion: 1.0
reduce() completion: 1.0
Job state: SUCCEEDED
....
....

This shows us that 3 mappers and 1 reducer were run. It also shows the number of input and output records and bytes.

HAWQ interactive distributed query engine

The common complaints with regards to classic Hadoop MapReduce are that it requires fairly extensive Java experience and that it is tuned for batch data processing rather than for exploratory data analysis using ad-hoc, interactive queries. That is where HAWQ can come to the rescue.

HAWQ is a massively parallel SQL query engine. The underlying engine is based on PostgreSQL (version 8.2.15, as of writing this post) so it can support the standard SQL statements out of the box. The key architecture components are HAWQ master, HAWQ segments, HAWQ storage and HAWQ interconnect.

HAWQ-architecture

HAWQ master is responsible for accepting the connections from the clients and it also manages the system tables containing metadata about HAWQ itself (however, no user data is stored on the master). The master then parses and optimises the queries and develops an execution plan which is then dispatched to the segments.

HAWQ segments are the processing units; they are responsible for executing the local database operations on their own data sets.

HAWQ-query-execution

HAWQ stores all the user data in HDFS. HAWQ interconnect refers to the UDP based inter-process communication between the segments.

Now let us see how we can answer the same question about stock prices that we did with our MapReduce job.

First we need to log in to our client (psql, the same client that we know well from PostgreSQL databases) and create our schema and table:

$ psql
psql (8.2.15)
Type "help" for help.

gpadmin=# create schema stock_demo;
gpadmin=# create table stock_demo.stock
gpadmin-# (
gpadmin(# symbol TEXT,
gpadmin(# date TEXT,
gpadmin(# open NUMERIC(6,2),
gpadmin(# high NUMERIC(6,2),
gpadmin(# low NUMERIC(6,2),
gpadmin(# close NUMERIC(6,2),
gpadmin(# volume INTEGER,
gpadmin(# adjclose NUMERIC(6,2)
gpadmin(# )
gpadmin-# with (appendonly=true) distributed randomly;

The next step is to load the data into this HAWQ table; we can use the following commands to do this:

$ cat google.csv | psql -c "COPY stock_demo.stock FROM STDIN DELIMITER E'\,' NULL E'';"
$ cat nokia.csv | psql -c "COPY stock_demo.stock FROM STDIN DELIMITER E'\,' NULL E'';"
$ cat apple.csv | psql -c "COPY stock_demo.stock FROM STDIN DELIMITER E'\,' NULL E'';"

Now we can log in again to our psql client and run the SQL queries:

gpadmin=# select count(*) from stock_demo.stock;
 count 
-------
 14296
(1 row)

gpadmin=# select symbol, date, adjclose from stock_demo.stock where adjclose in
gpadmin-# ( select max(adjclose) as max_adj_close from stock_demo.stock 
gpadmin(#   group by symbol )
gpadmin-# order by symbol;
 symbol |    date    | adjclose 
--------+------------+----------
 AAPL   | 2012-09-19 |   685.76
 GOOG   | 2013-07-15 |   924.69
 NOK    | 2000-06-19 |    42.24
(3 rows)

These SQL queries relied on a HAWQ internal table, thus we had to load the data into it from our local file system. HAWQ also supports the notion of external tables using PXF (Pivotal eXtension Framework). PXF is an external table interface in HAWQ that allows reading data directly from HDFS directories. It has a concept of fragmenters, accessors and resolvers, which are used to split the data files into smaller chunks and read them into HAWQ without the need to explicitly load them into HAWQ internal tables.

If we want to use an external table, we need to create it using the following SQL statement:

gpadmin=# create external table stock_demo.stock_pxf
gpadmin-# (
gpadmin(# symbol TEXT,
gpadmin(# date TEXT,
gpadmin(# open NUMERIC(6,2),
gpadmin(# high NUMERIC(6,2),
gpadmin(# low NUMERIC(6,2),
gpadmin(# close NUMERIC(6,2),
gpadmin(# volume INTEGER,
gpadmin(# adjclose NUMERIC(6,2)
gpadmin(# )
gpadmin-# location ('pxf://pivhdsne:50070/stock_demo/input/*.csv?Fragmenter=HdfsDataFragmenter&Accessor=TextFileAccessor&Resolver=TextResolver')
gpadmin-# format 'TEXT' (delimiter = E'\,');

Then we can run the same queries against the external table as before:

gpadmin=# select count(*) from stock_demo.stock_pxf;
 count 
-------
 14296
(1 row)

gpadmin=# select symbol, date, adjclose from stock_demo.stock_pxf where adjclose in 
gpadmin-# ( select max(adjclose) as max_adj_close from stock_demo.stock_pxf
gpadmin(#   group by symbol )
gpadmin-# order by symbol;
 symbol |    date    | adjclose 
--------+------------+----------
 AAPL   | 2012-09-19 |   685.76
 GOOG   | 2013-07-15 |   924.69
 NOK    | 2000-06-19 |    42.24
(3 rows)

Conclusion

SQL on Hadoop is gaining significant momentum; the demand for running ad-hoc, interactive queries as well as batch data processing on top of Hadoop is increasing. Most of the key players in the big data world have started providing solutions to address these needs. 2014 promises to be an interesting year to see how these offerings evolve.

OpenStack Savanna – Fast Hadoop Cluster Provisioning on OpenStack


Introduction

OpenStack is one of the most popular open source cloud computing projects providing an Infrastructure as a Service solution. Its key core components are Compute (Nova), Networking (Neutron, formerly known as Quantum), Storage (object and block storage: Swift and Cinder, respectively), OpenStack Dashboard (Horizon), Identity Service (Keystone) and Image Service (Glance).

openstack-software-diagram

There are other officially incubated projects like Metering (Ceilometer) and Orchestration and Service Definition (Heat).

Savanna is Hadoop as a Service for OpenStack, introduced by Mirantis. It is still in an early phase (version 0.2 was released in summer 2013) and, according to its roadmap, version 1.0 is targeted for official OpenStack incubation. In principle Heat could also be used for Hadoop cluster provisioning, but Savanna is specifically tuned for providing Hadoop-specific API functionality, while Heat is meant for generic purposes.

Savanna Architecture

Savanna is integrated with the core OpenStack components such as Keystone, Nova, Glance, Swift and Horizon. It has a REST API that supports the Hadoop cluster provisioning steps.

savanna-architecture

The Savanna API is implemented as a WSGI server that, by default, listens on port 8386. In addition, Savanna can be integrated with Horizon, the OpenStack Dashboard, to create a Hadoop cluster from the management console. Savanna also comes with a Vanilla plugin that deploys a Hadoop cluster image. The standard out-of-the-box Vanilla plugin supports Hadoop version 1.1.2.
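
Since the API is plain REST over HTTP, it can be called from any HTTP client. As a quick illustration, here is a hedged Python sketch using the requests library that lists the clusters registered in Savanna; the authentication token and tenant id are the example values obtained from Keystone later in this post:

# Hedged sketch: calling the Savanna REST API with the requests library.
# AUTH_TOKEN and TENANT_ID come from Keystone (see the provisioning section below).
import requests

AUTH_TOKEN = "bdb582c836e3474f979c5aa8f844c000"
TENANT_ID = "2f46e214984f4990b9c39d9c6222f572"
SAVANNA_URL = "http://localhost:8386/v1.0/%s" % TENANT_ID

response = requests.get(SAVANNA_URL + "/clusters",
                        headers={"X-Auth-Token": AUTH_TOKEN})
print response.status_code
print response.text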

Installing Savanna

The simplest option to try out Savanna is to use devstack in a virtual machine. I was using an Ubuntu 12.04 virtual instance for my tests. In that environment we need to execute the following commands to install devstack and the Savanna API:

$ sudo apt-get install git-core
$ git clone https://github.com/openstack-dev/devstack.git
$ vi localrc  # edit localrc
ADMIN_PASSWORD=nova
MYSQL_PASSWORD=nova
RABBIT_PASSWORD=nova
SERVICE_PASSWORD=$ADMIN_PASSWORD
SERVICE_TOKEN=nova

# Enable Swift
ENABLED_SERVICES+=,swift

SWIFT_HASH=66a3d6b56c1f479c8b4e70ab5c2000f5
SWIFT_REPLICAS=1
SWIFT_DATA_DIR=$DEST/data

# Force checkout prerequsites
# FORCE_PREREQ=1

# keystone is now configured by default to use PKI as the token format which produces huge tokens.
# set UUID as keystone token format which is much shorter and easier to work with.
KEYSTONE_TOKEN_FORMAT=UUID

# Change the FLOATING_RANGE to whatever IPs VM is working in.
# In NAT mode it is subnet VMWare Fusion provides, in bridged mode it is your local network.
FLOATING_RANGE=192.168.55.224/27

# Enable auto assignment of floating IPs. By default Savanna expects this setting to be enabled
EXTRA_OPTS=(auto_assign_floating_ip=True)

# Enable logging
SCREEN_LOGDIR=$DEST/logs/screen

$ ./stack.sh  # this will take a while to execute
$ sudo apt-get install python-setuptools python-virtualenv python-dev
$ virtualenv savanna-venv
$ savanna-venv/bin/pip install savanna
$ mkdir savanna-venv/etc
$ cp savanna-venv/share/savanna/savanna.conf.sample savanna-venv/etc/savanna.conf
# To start Savanna API:
$ savanna-venv/bin/python savanna-venv/bin/savanna-api --config-file savanna-venv/etc/savanna.conf

To install Savanna UI integrated with Horizon, we need to run the following commands:

$ sudo pip install savanna-dashboard
$ cd /opt/stack/horizon/openstack-dashboard
$ vi settings.py
HORIZON_CONFIG = {
    'dashboards': ('nova', 'syspanel', 'settings', 'savanna'),

INSTALLED_APPS = (
    'savannadashboard',
    ....

$ cd /opt/stack/horizon/openstack-dashboard/local
$ vi local_settings.py
SAVANNA_URL = 'http://localhost:8386/v1.0'

$ sudo service apache2 restart

Provisioning a Hadoop cluster

As a first step, we need to configure Keystone related environment variables to get the authentication token:

ubuntu@ip-10-59-33-68:~$ vi .bashrc
$ export OS_AUTH_URL=http://127.0.0.1:5000/v2.0/
$ export OS_TENANT_NAME=admin
$ export OS_USERNAME=admin
$ export OS_PASSWORD=nova
ubuntu@ip-10-59-33-68:~$ source .bashrc

ubuntu@ip-10-59-33-68:~$ env | grep OS
OS_PASSWORD=nova
OS_AUTH_URL=http://127.0.0.1:5000/v2.0/
OS_USERNAME=admin
OS_TENANT_NAME=admin
ubuntu@ip-10-59-33-68:~$ keystone token-get
+-----------+----------------------------------+
|  Property |              Value               |
+-----------+----------------------------------+
|  expires  |       2013-08-09T20:31:12Z       |
|     id    | bdb582c836e3474f979c5aa8f844c000 |
| tenant_id | 2f46e214984f4990b9c39d9c6222f572 |
|  user_id  | 077311b0a8304c8e86dc0dc168a67091 |
+-----------+----------------------------------+

$ export AUTH_TOKEN="bdb582c836e3474f979c5aa8f844c000"
$ export TENANT_ID="2f46e214984f4990b9c39d9c6222f572"

Then we need to create the Glance image that we want to use for our Hadoop cluster. In our example we have used Mirantis’s vanilla image but we can also build our own image:

$ wget http://savanna-files.mirantis.com/savanna-0.2-vanilla-1.1.2-ubuntu-12.10.qcow2
$ glance image-create --name=savanna-0.2-vanilla-hadoop-ubuntu.qcow2 --disk-format=qcow2 --container-format=bare < ./savanna-0.2-vanilla-1.1.2-ubuntu-12.10.qcow2
ubuntu@ip-10-59-33-68:~/devstack$ glance image-list
+--------------------------------------+-----------------------------------------+-------------+------------------+-----------+--------+
| ID                                   | Name                                    | Disk Format | Container Format | Size      | Status |
+--------------------------------------+-----------------------------------------+-------------+------------------+-----------+--------+
| d0d64f5c-9c15-4e7b-ad4c-13859eafa7b8 | cirros-0.3.1-x86_64-uec                 | ami         | ami              | 25165824  | active |
| fee679ee-e0c0-447e-8ebd-028050b54af9 | cirros-0.3.1-x86_64-uec-kernel          | aki         | aki              | 4955792   | active |
| 1e52089b-930a-4dfc-b707-89b568d92e7e | cirros-0.3.1-x86_64-uec-ramdisk         | ari         | ari              | 3714968   | active |
| d28051e2-9ddd-45f0-9edc-8923db46fdf9 | savanna-0.2-vanilla-hadoop-ubuntu.qcow2 | qcow2       | bare             | 551699456 | active |
+--------------------------------------+-----------------------------------------+-------------+------------------+-----------+--------+
$  export IMAGE_ID=d28051e2-9ddd-45f0-9edc-8923db46fdf9

Then we install httpie, an open source command line HTTP client that can be used to send REST requests to the Savanna API:

$ sudo pip install httpie

From now on we will use httpie to send Savanna commands. We need to register the image with Savanna:

$ export SAVANNA_URL="http://localhost:8386/v1.0/$TENANT_ID"
$ http POST $SAVANNA_URL/images/$IMAGE_ID X-Auth-Token:$AUTH_TOKEN username=ubuntu

HTTP/1.1 202 ACCEPTED
Content-Length: 411
Content-Type: application/json
Date: Thu, 08 Aug 2013 21:28:07 GMT

{
    "image": {
        "OS-EXT-IMG-SIZE:size": 551699456,
        "created": "2013-08-08T21:05:55Z",
        "description": "None",
        "id": "d28051e2-9ddd-45f0-9edc-8923db46fdf9",
        "metadata": {
            "_savanna_description": "None",
            "_savanna_username": "ubuntu"
        },
        "minDisk": 0,
        "minRam": 0,
        "name": "savanna-0.2-vanilla-hadoop-ubuntu.qcow2",
        "progress": 100,
        "status": "ACTIVE",
        "tags": [],
        "updated": "2013-08-08T21:28:07Z",
        "username": "ubuntu"
    }
}

$ http $SAVANNA_URL/images/$IMAGE_ID/tag X-Auth-Token:$AUTH_TOKEN tags:='["vanilla", "1.1.2", "ubuntu"]'

HTTP/1.1 202 ACCEPTED
Content-Length: 532
Content-Type: application/json
Date: Thu, 08 Aug 2013 21:29:25 GMT

{
    "image": {
        "OS-EXT-IMG-SIZE:size": 551699456,
        "created": "2013-08-08T21:05:55Z",
        "description": "None",
        "id": "d28051e2-9ddd-45f0-9edc-8923db46fdf9",
        "metadata": {
            "_savanna_description": "None",
            "_savanna_tag_1.1.2": "True",
            "_savanna_tag_ubuntu": "True",
            "_savanna_tag_vanilla": "True",
            "_savanna_username": "ubuntu"
        },
        "minDisk": 0,
        "minRam": 0,
        "name": "savanna-0.2-vanilla-hadoop-ubuntu.qcow2",
        "progress": 100,
        "status": "ACTIVE",
        "tags": [
            "vanilla",
            "ubuntu",
            "1.1.2"
        ],
        "updated": "2013-08-08T21:29:25Z",
        "username": "ubuntu"
    }
}

Then we need to create the node group templates (JSON files) that will be sent to Savanna. There is one template for the master node (namenode, jobtracker) and another template for the worker nodes such as datanode and tasktracker. The Hadoop version is 1.1.2.

$ vi ng_master_template_create.json
{
    "name": "test-master-tmpl",
    "flavor_id": "2",
    "plugin_name": "vanilla",
    "hadoop_version": "1.1.2",
    "node_processes": ["jobtracker", "namenode"]
}

$ vi ng_worker_template_create.json
{
    "name": "test-worker-tmpl",
    "flavor_id": "2",
    "plugin_name": "vanilla",
    "hadoop_version": "1.1.2",
    "node_processes": ["tasktracker", "datanode"]
}

$ http $SAVANNA_URL/node-group-templates X-Auth-Token:$AUTH_TOKEN < ng_master_template_create.json
HTTP/1.1 202 ACCEPTED
Content-Length: 387
Content-Type: application/json
Date: Thu, 08 Aug 2013 21:58:00 GMT

{
    "node_group_template": {
        "created": "2013-08-08T21:58:00",
        "flavor_id": "2",
        "hadoop_version": "1.1.2",
        "id": "b3a79c88-b6fb-43d2-9a56-310218c66f7c",
        "name": "test-master-tmpl",
        "node_configs": {},
        "node_processes": [
            "jobtracker",
            "namenode"
        ],
        "plugin_name": "vanilla",
        "updated": "2013-08-08T21:58:00",
        "volume_mount_prefix": "/volumes/disk",
        "volumes_per_node": 0,
        "volumes_size": 10
    }
}

$ http $SAVANNA_URL/node-group-templates X-Auth-Token:$AUTH_TOKEN < ng_worker_template_create.json
HTTP/1.1 202 ACCEPTED
Content-Length: 388
Content-Type: application/json
Date: Thu, 08 Aug 2013 21:59:41 GMT

{
    "node_group_template": {
        "created": "2013-08-08T21:59:41",
        "flavor_id": "2",
        "hadoop_version": "1.1.2",
        "id": "773b2cfb-1e05-46f4-923f-13edc7d6aac6",
        "name": "test-worker-tmpl",
        "node_configs": {},
        "node_processes": [
            "tasktracker",
            "datanode"
        ],
        "plugin_name": "vanilla",
        "updated": "2013-08-08T21:59:41",
        "volume_mount_prefix": "/volumes/disk",
        "volumes_per_node": 0,
        "volumes_size": 10
    }
}

The next step is to define the cluster template:

$ vi cluster_template_create.json

{
    "name": "demo-cluster-template",
    "plugin_name": "vanilla",
    "hadoop_version": "1.1.2",
    "node_groups": [
        {
            "name": "master",
            "node_group_template_id": "b3a79c88-b6fb-43d2-9a56-310218c66f7c",
            "count": 1
        },
        {
            "name": "workers",
            "node_group_template_id": "773b2cfb-1e05-46f4-923f-13edc7d6aac6",
            "count": 2
        }
    ]
}

http $SAVANNA_URL/cluster-templates X-Auth-Token:$AUTH_TOKEN < cluster_template_create.json
HTTP/1.1 202 ACCEPTED
Content-Length: 815
Content-Type: application/json
Date: Fri, 09 Aug 2013 07:04:24 GMT

{
    "cluster_template": {
        "anti_affinity": [],
        "cluster_configs": {},
        "created": "2013-08-09T07:04:24",
        "hadoop_version": "1.1.2",
        "id": "{
    "name": "cluster-1",
    "plugin_name": "vanilla",
    "hadoop_version": "1.1.2",
    "cluster_template_id" : "64c4117b-acee-4da7-937b-cb964f0471a9",
    "user_keypair_id": "stack",
    "default_image_id": "3f9fc974-b484-4756-82a4-bff9e116919b"
}",
        "name": "demo-cluster-template",
        "node_groups": [
            {
                "count": 1,
                "flavor_id": "2",
                "name": "master",
                "node_configs": {},
                "node_group_template_id": "b3a79c88-b6fb-43d2-9a56-310218c66f7c",
                "node_processes": [
                    "jobtracker",
                    "namenode"
                ],
                "volume_mount_prefix": "/volumes/disk",
                "volumes_per_node": 0,
                "volumes_size": 10
            },
            {
                "count": 2,
                "flavor_id": "2",
                "name": "workers",
                "node_configs": {},
                "node_group_template_id": "773b2cfb-1e05-46f4-923f-13edc7d6aac6",
                "node_processes": [
                    "tasktracker",
                    "datanode"
                ],
                "volume_mount_prefix": "/volumes/disk",
                "volumes_per_node": 0,
                "volumes_size": 10
            }
        ],
        "plugin_name": "vanilla",
        "updated": "2013-08-09T07:04:24"
    }
}

Now we are ready to create the Hadoop cluster:

$ vi cluster_create.json
{
    "name": "cluster-1",
    "plugin_name": "vanilla",
    "hadoop_version": "1.1.2",
    "cluster_template_id" : "64c4117b-acee-4da7-937b-cb964f0471a9",
    "user_keypair_id": "savanna",
    "default_image_id": "d28051e2-9ddd-45f0-9edc-8923db46fdf9"
}

$ http $SAVANNA_URL/clusters X-Auth-Token:$AUTH_TOKEN < cluster_create.json
HTTP/1.1 202 ACCEPTED
Content-Length: 1153
Content-Type: application/json
Date: Fri, 09 Aug 2013 07:28:14 GMT

{
    "cluster": {
        "anti_affinity": [],
        "cluster_configs": {},
        "cluster_template_id": "64c4117b-acee-4da7-937b-cb964f0471a9",
        "created": "2013-08-09T07:28:14",
        "default_image_id": "d28051e2-9ddd-45f0-9edc-8923db46fdf9",
        "hadoop_version": "1.1.2",
        "id": "d919f1db-522f-45ab-aadd-c078ba3bb4e3",
        "info": {},
        "name": "cluster-1",
        "node_groups": [
            {
                "count": 1,
                "created": "2013-08-09T07:28:14",
                "flavor_id": "2",
                "instances": [],
                "name": "master",
                "node_configs": {},
                "node_group_template_id": "b3a79c88-b6fb-43d2-9a56-310218c66f7c",
                "node_processes": [
                    "jobtracker",
                    "namenode"
                ],
                "updated": "2013-08-09T07:28:14",
                "volume_mount_prefix": "/volumes/disk",
                "volumes_per_node": 0,
                "volumes_size": 10
            },
            {
                "count": 2,
                "created": "2013-08-09T07:28:14",
                "flavor_id": "2",
                "instances": [],
                "name": "workers",
                "node_configs": {},
                "node_group_template_id": "773b2cfb-1e05-46f4-923f-13edc7d6aac6",
                "node_processes": [
                    "tasktracker",
                    "datanode"
                ],
                "updated": "2013-08-09T07:28:14",
                "volume_mount_prefix": "/volumes/disk",
                "volumes_per_node": 0,
                "volumes_size": 10
            }
        ],
        "plugin_name": "vanilla",
        "status": "Validating",
        "updated": "2013-08-09T07:28:14",
        "user_keypair_id": "savanna"
    }
}

After a while we can run the nova command to check whether the instances are created and running:

$ nova list
+--------------------------------------+-----------------------+--------+------------+-------------+----------------------------------+
| ID                                   | Name                  | Status | Task State | Power State | Networks                         |
+--------------------------------------+-----------------------+--------+------------+-------------+----------------------------------+
| 1a9f43bf-cddb-4556-877b-cc993730da88 | cluster-1-master-001  | ACTIVE | None       | Running     | private=10.0.0.2, 192.168.55.227 |
| bb55f881-1f96-4669-a94a-58cbf4d88f39 | cluster-1-workers-001 | ACTIVE | None       | Running     | private=10.0.0.3, 192.168.55.226 |
| 012a24e2-fa33-49f3-b051-9ee2690864df | cluster-1-workers-002 | ACTIVE | None       | Running     | private=10.0.0.4, 192.168.55.225 |
+--------------------------------------+-----------------------+--------+------------+-------------+----------------------------------+

Now we can log in to the Hadoop master instance and run the required Hadoop commands:

$ ssh -i savanna.pem ubuntu@10.0.0.2
$ sudo chmod 777 /usr/share/hadoop
$ sudo su hadoop
$ cd /usr/share/hadoop
$ hadoop jar hadoop-example-1.1.2.jar pi 10 100

Savanna UI via Horizon

In order to create the node group templates, the cluster template and the cluster itself, we have used a command line tool – httpie – to send REST API calls. The same functionality is also available via Horizon, the standard OpenStack dashboard.

First we need to register the image with Savanna:

savanna-image-registry

Then we need to create the nodegroup templates:

savanna-noderoup-template

After that we have to create the cluster template:

savanna-cluster-template

And finally we have to create the cluster:

savann-cluster

Splunk Storm – Machine Data Processing in the Cloud


Introduction

Splunk is a platform to process machine data from various sources such as weblogs, syslogs and log4j logs. It can also work with JSON and CSV file formats, thus any application that produces JSON or CSV output can be seen as a source for Splunk. As the volume and variety of machine data are increasing, Splunk is becoming a more and more interesting player in the big data world, too.

Splunk can be considered a search engine for IT data. Splunk collects data from multiple sources, indexes it, and users can search it using Splunk's proprietary language, SPL (Search Processing Language). The search results can then be used to create reports and dashboards to visualize the data.

Splunk Architecture

Under the hood the Splunk architecture has the following key components:
Forwarders are used to forward data to Splunk receiver instances; receiver instances are normally indexers.
Indexers are Splunk instances that index the data. Indexes are stored in files; there are two types of files: raw data files, which store the data in compressed format, and index files, which contain metadata for search queries. During indexing, Splunk extracts default fields and identifies events based on timestamps, or creates timestamps if none are found.
Search head and search peers: in a distributed environment the search head manages the search requests, directs them to the search peers and then merges the results back to the users.
Splunk Web is a graphical user interface based on a Python application server.
SplunkArchitecture

Splunk Storm

Splunk Storm is the cloud service version of Splunk. Splunk Storm runs in the Amazon cloud and uses both Elastic Block Storage (EBS) and the Simple Storage Service (S3).

The pricing plan is based on a monthly fee that depends on the volume of data you want to store. As of writing this article there is a free tier with 1 GB storage, while, for example, a 100 GB storage volume costs 400 USD per month and the maximum 1 TB storage volume costs 3,000 USD per month.

To get started, we need to sign up and create a project.
Splunk-1

Then we can define the data inputs. There are four options: upload a file, use forwarders, use the API (it is still in beta) or use network data sent directly from the servers.

As a first test, we will use data files uploaded from a local directory. We used a sample Apache web access log and a syslog available from http://www.monitorware.com/en/logsamples/.

It takes some time to index the files; then they become available for search queries.

Splunk-11

We can run a search query to identify all HTTP client side error codes:

"source="access_log.txt" status>="400" AND status <="500"

Splunk-15

If we want to identify all the access log entries with HTTP POST method, we can run the following search query:

source="access_log.txt" method="POST"

In a similar way, if we want to find all the messages from the uploaded syslog file that were generated by the kernel process then we can run the following query:

source="syslog-messages.txt" process="kernel"

Splunk-16

Splunk forwarder and Twitter API

As a next example, we want to test output generated by our own program using the Twitter API. The program will generate JSON output in a file using a Python-based Twitter API library. The directory is monitored by a Splunk forwarder and once the file is created in the predefined directory, the forwarder will send it to Splunk Storm.

First we need to create an application in Twitter via the https://dev.twitter.com portal. The application will have its consumer_key, consumer_secret, access_token_key and access_token_secret, which are required by the Twitter API.

Twitter-6

The Twitter API library that we are going to use for the Python application can be downloaded from Github: https://github.com/bear/python-twitter.git.

This API depends on oauth2, simplejson and httplib2, so we need to install them first. Then we can get the code from Github and build and install the package.

$ git clone https://github.com/bear/python-twitter.git

# Build and Install:
$ python setup.py build
$ python setup.py install

The Twitter application code – twtr.py –  is as follows:

# twtr.py
import sys
import twitter

if len(sys.argv) < 3:
    print "Usage: " + sys.argv[0] + " keyword count"
    sys.exit(1)

keyword = sys.argv[1]
count = sys.argv[2]
# Twitter API 1.1. Count - up to a maximum of 100
# https://dev.twitter.com/docs/api/1.1/get/search/tweets
if int(count) > 100:
    count = 100

api = twitter.Api(consumer_key="CONSUMER_KEY", consumer_secret="CONSUMER_SECRET", access_token_key="ACCESS_TOKEN_KEY", access_token_secret="ACCESS_TOKEN_SECRET")

search_result = api.GetSearch(term=keyword, count=count)

for s in search_result:
    print s.AsJsonString()

The Python program can be run as follows:

$ python twtr.py "big data" 100

Installing Splunk forwarder

Then we need to install the Splunk forwarder, see http://www.splunk.com/download/universalforwarder. We also need to download the Splunk credentials that will allow the forwarder to send data to our project. Once the forwarder and the credentials are installed, we can log in and add a directory (twitter_status) to be monitored by our forwarder. We defined the sourcetype as json_no_timestamp.

# Download splunk forwarder
$ wget -O splunkforwarder-5.0.3-163460-Linux-x86_64.tgz 'http://www.splunk.com/page/download_track?file=5.0.3/universalforwarder/linux/splunkforwarder-5.0.3-163460-Linux-x86_64.tgz&ac=&wget=true&name=wget&typed=releases&elq=8ccba442-db76-4fc8-b36b-36252bb61257'

# Install and start splunk forwarder
$ tar xvzf splunkforwarder-5.0.3-163460-Linux-x86_64.tgz
$ export SPLUNK_HOME=/home/ec2-user/splunkforwarder
$ $SPLUNK_HOME/bin/splunk start
# Install project credentials
$ $SPLUNK_HOME/bin/splunk install app ./stormforwarder_2628fbc8d76811e2b09622000a1cdcf0.spl -auth admin:changeme
App '/home/ec2-user/stormforwarder_2628fbc8d76811e2b09622000a1cdcf0.spl' installed

# Login
$SPLUNK_HOME/bin/splunk login -auth admin:changeme

# Add monitor (directory or file)
$SPLUNK_HOME/bin/splunk add monitor /home/ec2-user/splunk_blog/twitter_status -sourcetype json_no_timestamp
Added monitor of '/home/ec2-user/splunk_blog/twitter_status'.

Now we are ready to run the Python code using Twitter API:

$ python twtr.py "big data" 100 | tee twitter_status/twitter_status.txt

The program creates a twitter_status.txt file under the twitter_status directory, which is monitored by the Splunk forwarder. The forwarder sends the output file to Splunk Storm. After some time it will appear under the inputs section as an authenticated forwarder. The file will be shown as a source together with the previously uploaded Apache access log and syslog.
SPlunk-17

Splunk-18

If we want to search for users with location London, the search query looks like this:

source="/home/ec2-user/splunk_blog/twitter_status/twitter_status.txt" user.location="London, UK"

We can also define a search query to show the top 10 timezones from the Twitter results, and from the search result it is easy to create a report with just a few clicks on the web user interface. The report allows choosing among multiple visualization options like column, area or pie chart types, etc.

source="/home/ec2-user/splunk_blog/twitter_status/twitter_status.txt" | top limit=10 user.time_zone

SPlunk-22

Splunk-25

Conclusion

As mentioned at the beginning of this article, the variety and volume of machine-generated data are increasing dramatically; sensor data, application logs, web access logs, syslogs, database and filesystem audit logs are just a few examples of the potential data sources that require attention but can be difficult to process and analyse in a timely manner. Splunk is a great tool to deal with the ever increasing data volume, and with Splunk Storm users can start analysing their data in the cloud without hassle.

Hadoop REST API – WebHDFS


Introduction

Hadoop provides a native Java API to support file system operations such as creating, renaming or deleting files and directories, opening, reading or writing files, setting permissions, etc. A very basic example of how to read and write files from Hadoop can be found on the Apache wiki.

This is great for applications running within the Hadoop cluster, but there may be use cases where an external application needs to manipulate HDFS, e.g. to create directories and write files to them, or to read the content of files stored on HDFS. Hortonworks developed an additional API to support these requirements based on standard REST functionality.

WebHDFS REST API

The WebHDFS concept is based on standard HTTP operations like GET, PUT, POST and DELETE. Operations like OPEN, GETFILESTATUS and LISTSTATUS use HTTP GET, others like CREATE, MKDIRS, RENAME and SETPERMISSION rely on HTTP PUT. The APPEND operation is based on HTTP POST, while DELETE uses HTTP DELETE.

Authentication can be based on the user.name query parameter (as part of the HTTP query string) or, if security is turned on, on Kerberos.

The standard URL format is as follows: http://host:port/webhdfs/v1/<path>?op=operation&user.name=username, where <path> is the HDFS file or directory the operation applies to.

In some cases the namenode returns an HTTP 307 Temporary Redirect with a Location URL referring to the appropriate datanode. The client then needs to follow that URL to execute the file operation on that particular datanode.
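
The same flow can be scripted from an external application as well; below is a hedged Python sketch using the requests library (host names, paths and the user name are illustrative) that performs a CREATE operation by following the temporary redirect manually:

# Hedged sketch: two-step WebHDFS CREATE with the requests library.
# Host, path and user name are illustrative placeholders.
import requests

NAMENODE = "http://localhost:50070"
path = "/tmp/webhdfs/webhdfs-test.txt"
params = {"op": "CREATE", "user.name": "istvan"}

# Step 1: ask the namenode, but do not follow the redirect automatically.
r1 = requests.put(NAMENODE + "/webhdfs/v1" + path,
                  params=params, allow_redirects=False)
datanode_url = r1.headers["Location"]   # HTTP 307 with the datanode URL

# Step 2: send the file content to the datanode URL.
with open("webhdfs-test.txt", "rb") as f:
    r2 = requests.put(datanode_url, data=f)
print r2.status_code                    # 201 Created on success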

By default the namenode and datanode ports are 50070 and 50075, respectively; see more details about the default HDFS ports on the Cloudera blog.

In order to configure WebHDFS, we need to edit hdfs-site.xml as follows:

        <property>
           <name>dfs.webhdfs.enabled</name>
           <value>true</value>
        </property>

WebHDFS examples

As the simplest approach, we can use curl to invoke WebHDFS REST API

1./ Check directory status

$ curl -i "http://localhost:50070/webhdfs/v1/tmp?user.name=istvan&op=GETFILESTATUS"
HTTP/1.1 200 OK
Content-Type: application/json
Expires: Thu, 01-Jan-1970 00:00:00 GMT
Set-Cookie: hadoop.auth="u=istvan&p=istvan&t=simple&e=1370210454798&s=zKjRgOMQ1Q3NB1kXqHJ6GPa6TlY=";Path=/
Transfer-Encoding: chunked
Server: Jetty(6.1.26)

{"FileStatus":{"accessTime":0,"blockSize":0,"group":"supergroup","length":0,"modificationTime":1370174432465,"owner":"istvan","pathSuffix":"","permission":"755","replication":0,"type":"DIRECTORY"}}

This is similar to executing the Hadoop ls filesystem command:

$ bin/hadoop fs -ls /
Warning: $HADOOP_HOME is deprecated.

Found 1 items
drwxr-xr-x - istvan supergroup 0 2013-06-02 13:00 /tmp

2./ Create a directory

$ curl -i -X PUT "http://localhost:50070/webhdfs/v1/tmp/webhdfs?user.name=istvan&op=MKDIRS"
HTTP/1.1 200 OK
Content-Type: application/json
Expires: Thu, 01-Jan-1970 00:00:00 GMT
Set-Cookie: hadoop.auth="u=istvan&p=istvan&t=simple&e=1370210530831&s=YGwbkw0xRVpEAgbZpX7wlo56RMI=";Path=/
Transfer-Encoding: chunked
Server: Jetty(6.1.26)

The equivalent Hadoop filesystem command is as follows:

$ bin/hadoop fs -ls /tmp
Warning: $HADOOP_HOME is deprecated.

Found 2 items
drwxr-xr-x   - istvan supergroup          0 2013-06-02 12:17 /tmp/hadoop-istvan
drwxr-xr-x   - istvan supergroup          0 2013-06-02 13:02 /tmp/webhdfs

3./ Create a file

Creating a file requires two steps: first we run the command against the namenode, then we follow the redirect and execute the WebHDFS call against the appropriate datanode.

Step 1:

curl -i -X PUT "http://localhost:50070/webhdfs/v1/tmp/webhdfs/webhdfs-test.txt?user.name=istvan&op=CREATE"
HTTP/1.1 307 TEMPORARY_REDIRECT
Content-Type: application/octet-stream
Expires: Thu, 01-Jan-1970 00:00:00 GMT
Set-Cookie: hadoop.auth="u=istvan&p=istvan&t=simple&e=1370210936666&s=BLAIjTpNwurdsgvFxNL3Zf4bzpg=";Path=/
Location: http://istvan-pc:50075/webhdfs/v1/tmp/webhdfs/webhdfs-test.txt?op=CREATE&user.name=istvan&overwrite=false
Content-Length: 0
Server: Jetty(6.1.26)

Step 2:

$ curl -i -T webhdfs-test.txt "http://istvan-pc:50075/webhdfs/v1/tmp/webhdfs/webhdfs-test.txt?op=CREATE&user.name=istvan&overwrite=false"
HTTP/1.1 100 Continue

HTTP/1.1 201 Created
Content-Type: application/octet-stream
Location: webhdfs://0.0.0.0:50070/tmp/webhdfs/webhdfs-test.txt
Content-Length: 0
Server: Jetty(6.1.26)

To validate the result of the WebHDFS API we can run the following Hadoop filesystem command:

$ bin/hadoop fs -ls /tmp/webhdfs
Warning: $HADOOP_HOME is deprecated.

Found 1 items
-rw-r--r--   1 istvan supergroup         20 2013-06-02 13:09 /tmp/webhdfs/webhdfs-test.txt

4./ Open and read a file

In this case we run curl with the -L option to follow the HTTP temporary redirect URL.

$ curl -i -L "http://localhost:50070/webhdfs/v1/tmp/webhdfs/webhdfs-test.txt?op=OPEN&user.name=istvan"
HTTP/1.1 307 TEMPORARY_REDIRECT
Content-Type: application/octet-stream
Expires: Thu, 01-Jan-1970 00:00:00 GMT
Set-Cookie: hadoop.auth="u=istvan&p=istvan&t=simple&e=1370211032526&s=suBorvpvTUs6z/sw5n5PiZWsUnU=";Path=/
Location: http://istvan-pc:50075/webhdfs/v1/tmp/webhdfs/webhdfs-test.txt?op=OPEN&user.name=istvan&offset=0
Content-Length: 0
Server: Jetty(6.1.26)

HTTP/1.1 200 OK
Content-Type: application/octet-stream
Content-Length: 20
Server: Jetty(6.1.26)

Hadoop WebHDFS test

The corresponding Hadoop filesystem command is as follows:

$ bin/hadoop fs -cat /tmp/webhdfs/webhdfs-test.txt
Warning: $HADOOP_HOME is deprecated.

Hadoop WebHDFS test

5./ Rename a directory

$ curl -i -X PUT "http://localhost:50070/webhdfs/v1/tmp/webhdfs?op=RENAME&user.name=istvan&destination=/tmp/webhdfs-new"
HTTP/1.1 200 OK
Content-Type: application/json
Expires: Thu, 01-Jan-1970 00:00:00 GMT
Set-Cookie: hadoop.auth="u=istvan&p=istvan&t=simple&e=1370211103159&s=Gq/EBWZTBaoMk0tkGoodV+gU6jc=";Path=/
Transfer-Encoding: chunked
Server: Jetty(6.1.26)

To validate the result we can run the following Hadoop filesystem command:

$ bin/hadoop fs -ls /tmp
Warning: $HADOOP_HOME is deprecated.

Found 2 items
drwxr-xr-x   - istvan supergroup          0 2013-06-02 12:17 /tmp/hadoop-istvan
drwxr-xr-x   - istvan supergroup          0 2013-06-02 13:09 /tmp/webhdfs-new

6./ Delete a directory

This scenario results in an exception if the directory is not empty since a non-empty directory cannot be deleted.

$ curl -i -X DELETE "http://localhost:50070/webhdfs/v1/tmp/webhdfs-new?op=DELETE&user.name=istvan"
HTTP/1.1 403 Forbidden
Content-Type: application/json
Expires: Thu, 01-Jan-1970 00:00:00 GMT
Set-Cookie: hadoop.auth="u=istvan&p=istvan&t=simple&e=1370211266383&s=QFIJMWsy61vygFExl91Sgg5ME/Q=";Path=/
Transfer-Encoding: chunked
Server: Jetty(6.1.26)

{"RemoteException":{"exception":"IOException","javaClassName":"java.io.IOException","message":"/tmp/webhdfs-new is non empty"}}

First the file in the directory needs to be deleted and then the empty directory can be deleted, too.

$ curl -i -X DELETE "http://localhost:50070/webhdfs/v1/tmp/webhdfs-new/webhdfs-test.txt?op=DELETE&user.name=istvan"
HTTP/1.1 200 OK
Content-Type: application/json
Expires: Thu, 01-Jan-1970 00:00:00 GMT
Set-Cookie: hadoop.auth="u=istvan&p=istvan&t=simple&e=1370211375617&s=cG6727hbqGkrk/GO4yNRiZw4QxQ=";Path=/
Transfer-Encoding: chunked
Server: Jetty(6.1.26)

$ bin/hadoop fs -ls /tmp/webhdfs-new
Warning: $HADOOP_HOME is deprecated.

$ curl -i -X DELETE "http://localhost:50070/webhdfs/v1/tmp/webhdfs-new?op=DELETE&user.name=istvan&destination=/tmp/webhdfs-new"
HTTP/1.1 200 OK
Content-Type: application/json
Expires: Thu, 01-Jan-1970 00:00:00 GMT
Set-Cookie: hadoop.auth="u=istvan&p=istvan&t=simple&e=1370211495893&s=hZcZFDOL0x7exEhn14RlMgF4a/c=";Path=/
Transfer-Encoding: chunked
Server: Jetty(6.1.26)

$ bin/hadoop fs -ls /tmp
Warning: $HADOOP_HOME is deprecated.

Found 1 items
drwxr-xr-x   - istvan supergroup          0 2013-06-02 12:17 /tmp/hadoop-istvan

Conclusion

WebHDFS provides a simple, standard way to execute Hadoop filesystem operations from an external client that does not necessarily run on the Hadoop cluster itself. The requirement for WebHDFS is that the client needs a direct connection to the namenode and the datanodes via the predefined ports. Hadoop HDFS over HTTP – which was inspired by HDFS Proxy – addresses these limitations by providing a proxy layer based on a preconfigured Tomcat bundle; it is interoperable with the WebHDFS API but does not require the firewall ports to be open for the client.

Amazon Web Services Redshift – Data Warehouse in the Cloud


Introduction

Amazon Web Services made its fully managed, petabyte-scale data warehouse cloud service publicly available in February 2013. It promises a high performance, secure, easily scalable data warehouse solution that costs one tenth of a traditional data warehouse (less than 1,000 USD/TB/year, according to the AWS Introduction to Redshift presentation: http://aws.amazon.com/redshift/), is compatible with traditional BI tools and can be up and running within minutes. As of writing this article the service is available in the US East region only, but it is supposed to be rolled out to other regions, too. The service is manageable via the regular AWS tools: the AWS Management Console, command line tools (aws commands based on Python) and an API based on HTTP requests/responses.

Under the hood

Under the hood, AWS Redshift is based on PostgreSQL 8.0.2. The architecture consists of one leader node – responsible for managing the communication with the clients, developing the execution plan and then distributing the compiled code to the compute nodes – and one or more compute nodes that execute the code and send the results back to the leader node for aggregation. A compute node is either a 2-core machine with 15 GB RAM and 2 TB storage (dubbed an XL node) or a 16-core machine with 120 GB RAM and 16 TB storage (dubbed an 8XL node). More details about the Redshift architecture can be found at http://docs.aws.amazon.com/redshift/latest/dg/c_internal_arch_system_operation.html

AWS-Redshift-Arch

Launching a cluster

The easiest way to launch a cluster is via AWS console.

We need to define the basic attributes like cluster identifier, database name, database port, master username and password:

AWS-RedShift1

Then we need to select the node type (XL or 8XL) and the number of compute nodes. A cluster can be single- or multi-node; the minimum config is a single XL node cluster, while the maximum config is sixteen 8XL nodes – that amounts to 256 cores, 1,920 GB of RAM and 256 TB of storage.

AWS-Redshift2

Then we can configure additional parameters (like database encryption or security groups).

AWS-Redshift4

We can then review the configuration and are ready to launch the service:

AWS-RedShift5

The status will be “creating” for a while, then it becomes “available”. This is when the JDBC URL becomes known and can be used for configuring the clients.

AWS-RedShift8

In order to make the service accessible, we need to configure the security options: either a security group – if Redshift is going to be accessed from EC2 – or a CIDR/IP (Classless Inter-Domain Routing IP range) – if Redshift is to be accessed from the public Internet. The system will automatically recognise the IP address of the client connected to the AWS console.

AWS-RedShift6

And that is it! From then on, clients can connect to the Redshift cluster.

We used SQLWorkbench to test the service, as suggested by the AWS Redshift documentation. It is a Java-based open source SQL tool. The connection parameters are the standard JDBC attributes:

AWS-Redshift9
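
Since Redshift speaks the PostgreSQL protocol, the cluster can also be reached programmatically. Below is a minimal sketch in Python using psycopg2; the endpoint, database name and credentials are placeholders standing in for the values shown in the cluster's JDBC URL:

import psycopg2

# Hypothetical connection parameters copied from the cluster's JDBC URL
conn = psycopg2.connect(
    host="mycluster.xxxxxxxx.us-east-1.redshift.amazonaws.com",
    port=5439,                    # Redshift's default port
    dbname="mydb",
    user="master",
    password="secret",
)
cur = conn.cursor()
cur.execute("select version();")
print(cur.fetchone())
conn.close()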

The PostgreSQL version can be checked using

select version();

version
PostgreSQL 8.0.2 on i686-pc-linux-gnu, compiled by GCC gcc (GCC) 3.4.2 20041017 (Red Hat 3.4.2-6.fc3), Redshift 1.0.546

We tested the service with Amazon stock prices downloaded from Yahoo Finance.

The content has been uploaded to an S3 bucket called stockprices (s3://stockprices/amzn.csv) and made publicly readable (open/download).
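
The upload and the public-read permission can also be handled from code. A hedged sketch using the boto3 AWS SDK for Python (bucket and file names as above; AWS credentials are assumed to be configured in the environment):

import boto3

s3 = boto3.client("s3")
# Upload the CSV and make the object publicly readable
s3.upload_file("amzn.csv", "stockprices", "amzn.csv",
               ExtraArgs={"ACL": "public-read"})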

Then we could create the appropriate table using a standard SQL command:

CREATE TABLE stockprice (
     stockdate date not null,
     stockopen decimal(6,2),
     stockhigh decimal(6,2),
     stocklow decimal(6,2),
     stockclose decimal(6,2),
     stockvolume integer,
     stockadjclose decimal(6,2)
     );

Table 'stockprice' created

Execution time: 0.15s

desc stockprice
COLUMN_NAME	DATA_TYPE	PK	NULLABLE	DEFAULT	AUTOINCREMENT	REMARKS	POSITION
stockdate	date	NO	YES		NO		1
stockopen	numeric(6,2)	NO	YES		NO		2
stockhigh	numeric(6,2)	NO	YES		NO		3
stocklow	numeric(6,2)	NO	YES		NO		4
stockclose	numeric(6,2)	NO	YES		NO		5
stockvolume	integer	NO	YES		NO		6
stockadjclose	numeric(6,2)	NO	YES		NO		7

To load the data into the stockprice table, we had to use the copy command with the S3 source file (it could also be an Amazon DynamoDB source).

copy stockprice from 's3://stockprices/amzn.csv' CREDENTIALS 'aws_access_key_id=XXXXXXX;aws_secret_access_key=XXXXXXX' delimiter ',';

If there is any error during the load operation (e.g. an incorrect data format), it can be investigated by running a select statement on the stl_load_errors table.

And then we can run our SQL statements to analyse the data.

select * from stockprice order by stockadjclose desc limit 100;

stockdate	stockopen	stockhigh	stocklow	stockclose	stockvolume	stockadjclose
2013-01-25	275.00	284.72	274.40	283.99	4968100	283.99
2013-01-28	283.78	284.48	274.40	276.04	4321400	276.04
2013-03-05	274.00	276.68	269.99	275.59	3686200	275.59
2013-03-13	275.24	276.50	272.64	275.10	1884200	275.10
2013-03-08	275.00	275.44	271.50	274.19	1879800	274.19
2013-03-12	271.00	277.40	270.36	274.13	3246200	274.13
2013-03-07	274.10	274.80	271.85	273.88	1939000	273.88
2013-03-06	275.76	276.49	271.83	273.79	2050700	273.79
2013-01-24	269.37	276.65	269.37	273.46	3417000	273.46
2013-03-04	265.36	273.30	264.14	273.11	3453000	273.11
2013-01-30	283.00	284.20	267.11	272.76	13075400	272.76
2013-01-14	268.00	274.26	267.54	272.73	4275000	272.73
2013-01-18	270.83	274.50	269.60	272.12	2942000	272.12
2013-01-15	270.68	272.73	269.30	271.90	2326900	271.90
2013-03-11	273.43	273.99	270.40	271.24	1904900	271.24

The AWS console supports various cluster management functions: we can reboot the cluster, modify parameters, and resize it by choosing a different node type (XL -> 8XL) or by increasing/decreasing the number of nodes. We can also delete the cluster via the AWS console.
AWS-Redshift11
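
These management operations can also be scripted. A minimal sketch with the boto3 AWS SDK (the cluster identifier is hypothetical; each call below is an independent example):

import boto3

redshift = boto3.client("redshift", region_name="us-east-1")

# Inspect the current status of a (hypothetical) cluster
desc = redshift.describe_clusters(ClusterIdentifier="my-cluster")
print(desc["Clusters"][0]["ClusterStatus"])

# Resize the cluster to four nodes
redshift.modify_cluster(ClusterIdentifier="my-cluster", NumberOfNodes=4)

# Reboot the cluster
redshift.reboot_cluster(ClusterIdentifier="my-cluster")

# Delete the cluster without taking a final snapshot
redshift.delete_cluster(ClusterIdentifier="my-cluster",
                        SkipFinalClusterSnapshot=True)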

Conclusion

Amazon Web Services Redshift is another big step towards making cloud services available for enterprise computing. It offers a data warehouse capability that takes minimal effort to start up and scales as operations demand. It is a great complement to other database services such as DynamoDB for NoSQL requirements and RDS for relational database services.

Prediction API – Machine Learning from Google


Introduction

One of the exciting APIs among the 50+ APIs offered by Google is the Prediction API. It provides pattern matching and machine learning capabilities like recommendation or categorization. The notion is similar to the machine learning capabilities found in other solutions (e.g. in Apache Mahout): we can train the system with a set of training data, and applications based on the Prediction API can then recommend (“predict”) what products a user might like, categorize spam, etc.

In this post we go through an example of how to categorize SMS messages as either spam or valuable texts (“ham”).

Using Prediction API

In order to use the Prediction API, the service needs to be enabled via the Google API console. To upload training data, the Prediction API also requires Google Cloud Storage.

The dataset used in this post is from the UCI Machine Learning Repository, which has 235 datasets publicly available; this post is based on the SMS Spam Collection dataset.

To upload the training data, we first need to create a bucket in Google Cloud Storage. From the Google API console we need to click on Google Cloud Storage and then on Google Cloud Storage Manager: this opens a webpage where we can create new buckets and upload or delete files.

GoogleStorage2

The UCI SMS Spam Collection file is not suitable as-is for the Prediction API; it needs to be converted into the following format (the categories – ham/spam – need to be quoted, as does the SMS text):

“ham” “Go until jurong point, crazy.. Available only in bugis n great world la e buffet… Cine there got amore wat…”
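
A few lines of Python are enough for the conversion. This is a sketch that assumes the raw UCI file (tab-separated label and text, named SMSSpamCollection) and writes the quoted fields as comma-separated values, which is what the Prediction API's CSV training format expects; the output file name is arbitrary:

# Convert the tab-separated UCI file into quoted training rows
with open("SMSSpamCollection") as src, open("sms_spam_training.csv", "w") as dst:
    for line in src:
        label, _, text = line.rstrip("\n").partition("\t")
        text = text.replace('"', '""')          # escape embedded double quotes
        dst.write('"%s","%s"\n' % (label, text))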

GoogleStorage4

The Google Prediction API offers a handful of commands that can be invoked via a REST interface. The simplest way of testing the Prediction API is to use the Prediction API explorer.

GooglePrediction1

Once the training data is available on Google Cloud Storage, we can start training the machine learning system behind the Prediction API. To begin training our model, we need to run prediction.trainedmodels.insert. All commands require authentication, which is based on the OAuth 2.0 standard.

GooglePrediction2

In the insert menu we need to specify the fields that we want included in the response. In the request body we need to define an id (this will be used to refer to the model in the commands used later on), a storageDataLocation where we have uploaded the training data (the Google Cloud Storage path) and the modelType (which can be regression or classification; for spam filtering it is classification):

GooglePrediction-SpamInsert1
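
The same insert can also be issued from code. A hedged sketch using the Python client library that appears later in this post (http is assumed to be an httplib2.Http object authorized with OAuth 2.0 credentials, and the storage path is a placeholder):

from apiclient.discovery import build   # older import path of the google-api-python-client

service = build("prediction", "v1.4", http=http)
body = {
    "id": "bighadoop-00001",                    # model id, referenced by later commands
    "storageDataLocation": "mybucket/sms_spam_training.csv",  # placeholder GCS path
    "modelType": "classification",              # regression or classification
}
service.trainedmodels().insert(body=body).execute()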

The training runs for a while; we can check its status using the prediction.trainedmodels.get command. The status field will be RUNNING and then change to DONE once the training is finished.

GooglePrediction-SpamGet1

GooglePrediction-SpamGet2
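
Continuing the sketch above, the training status can also be polled with the same service object:

# The status field in the response moves from RUNNING to DONE when training completes
resp = service.trainedmodels().get(id="bighadoop-00001").execute()
print(resp)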

Now we are ready to run our test against the machine learning system, which will classify whether a given text is spam or ham. The Prediction API command for this action is prediction.trainedmodels.predict. In the id field we have to refer to the id that we defined in the prediction.trainedmodels.insert command (bighadoop-00001), and we also need to specify the request body – the input is a csvInstance containing the text that we want to get categorized (e.g. “Free entry”):

GooglePrediction-SpamPredict1
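
In code, the same prediction request looks roughly like this, following the pattern used in the predict.py sample shown below:

# Ask the trained model to classify a piece of text
body = {"input": {"csvInstance": ["Free entry"]}}
result = service.trainedmodels().predict(id="bighadoop-00001", body=body).execute()
print(result)   # contains the predicted label and the per-class scores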

The system then returns the category (spam) and the scores (0.822158 for spam, 0.177842 for ham):

GooglePrediction-SpamPredict2

Google Prediction API libraries

Google also offers a featured sample application, called Try-Prediction, that includes all the code required to run it on Google App Engine. The code is written in both Python and Java, and the application can be tested at http://try-prediction.appspot.com.

For instance, if we enter a quote from Niels Bohr for the Language Detection model – “Prediction is very difficult, especially if it’s about the future.” – it will return that the text is most likely English (54.4%).

TryPrediction

The key part of the Python code is in predict.py:

class PredictAPI(webapp.RequestHandler):
  '''This class handles Ajax prediction requests, i.e. not user initiated
     web sessions but remote procedure calls initiated from the Javascript
     client code running the browser.
  '''

  def get(self):
    try:
      # Read server-side OAuth 2.0 credentials from datastore and
      # raise an exception if credentials not found.
      credentials = StorageByKeyName(CredentialsModel, USER_AGENT, 
                                    'credentials').locked_get()
      if not credentials or credentials.invalid:
        raise Exception('missing OAuth 2.0 credentials')

      # Authorize HTTP session with server credentials and obtain  
      # access to prediction API client library.
      http = credentials.authorize(httplib2.Http())
      service = build('prediction', 'v1.4', http=http)
      papi = service.trainedmodels()

      # Read and parse JSON model description data.
      models = parse_json_file(MODELS_FILE)

      # Get reference to user's selected model.
      model_name = self.request.get('model')
      model = models[model_name]

      # Build prediction data (csvInstance) dynamically based on form input.
      vals = []
      for field in model['fields']:
        label = field['label']
        val = str(self.request.get(label))
        vals.append(val)
      body = {'input' : {'csvInstance' : vals }}
      logging.info('model:' + model_name + ' body:' + str(body))

      # Make a prediction and return JSON results to Javascript client.
      ret = papi.predict(id=model['model_id'], body=body).execute()
      self.response.out.write(json.dumps(ret))

    except Exception, err:
      # Capture any API errors here and pass response from API back to
      # Javascript client embedded in a special error indication tag.
      err_str = str(err)
      if err_str[0:len(ERR_TAG)] != ERR_TAG:
        err_str = ERR_TAG + err_str + ERR_END
      self.response.out.write(err_str)

The Java version of the Prediction web application is as follows:

public class PredictServlet extends HttpServlet {

  @Override
  protected void doGet(HttpServletRequest request,
                       HttpServletResponse response) throws ServletException, 
                                                            IOException {
    Entity credentials = null;
    try {
      // Retrieve server credentials from app engine datastore.
      DatastoreService datastore = 
        DatastoreServiceFactory.getDatastoreService();
      Key credsKey = KeyFactory.createKey("Credentials", "Credentials");
      credentials = datastore.get(credsKey);
    } catch (EntityNotFoundException ex) {
      // If can't obtain credentials, send exception back to Javascript client.
      response.setContentType("text/html");
      response.getWriter().println("exception: " + ex.getMessage());
    }

    // Extract tokens from retrieved credentials.
    AccessTokenResponse tokens = new AccessTokenResponse();
    tokens.accessToken = (String) credentials.getProperty("accessToken");
    tokens.expiresIn = (Long) credentials.getProperty("expiresIn");
    tokens.refreshToken = (String) credentials.getProperty("refreshToken");
    String clientId = (String) credentials.getProperty("clientId");
    String clientSecret = (String) credentials.getProperty("clientSecret");
    tokens.scope = IndexServlet.scope;

    // Set up the HTTP transport and JSON factory
    HttpTransport httpTransport = new NetHttpTransport();
    JsonFactory jsonFactory = new JacksonFactory();

    // Get user requested model, if specified.
    String model_name = request.getParameter("model");

    // Parse model descriptions from models.json file.
    Map<String, Object> models = 
      IndexServlet.parseJsonFile(IndexServlet.modelsFile);

    // Setup reference to user specified model description.
    Map<String, Object> selectedModel = 
      (Map<String, Object>) models.get(model_name);

    // Obtain model id (the name under which model was trained), 
    // and iterate over the model fields, building a list of Strings
    // to pass into the prediction request.
    String modelId = (String) selectedModel.get("model_id");
    List<Object> params = new ArrayList<Object>();
    List<Map<String, String>> fields = 
      (List<Map<String, String>>) selectedModel.get("fields");
    for (Map<String, String> field : fields) {
      // This loop is populating the input csv values for the prediction call.
      String label = field.get("label");
      String value = request.getParameter(label);
      params.add(value);
    }

    // Set up OAuth 2.0 access of protected resources using the retrieved
    // refresh and access tokens, automatically refreshing the access token 
    // whenever it expires.
    GoogleAccessProtectedResource requestInitializer = 
      new GoogleAccessProtectedResource(tokens.accessToken, httpTransport, 
                                        jsonFactory, clientId, clientSecret, 
                                        tokens.refreshToken);

    // Now populate the prediction data, issue the API call and return the
    // JSON results to the Javascript AJAX client.
    Prediction prediction = new Prediction(httpTransport, requestInitializer, 
                                           jsonFactory);
    Input input = new Input();
    InputInput inputInput = new InputInput();
    inputInput.setCsvInstance(params);
    input.setInput(inputInput);
    Output output = 
      prediction.trainedmodels().predict(modelId, input).execute();
    response.getWriter().println(output.toPrettyString());
  }
}

Besides Python and Java, Google also offers .NET, Objective-C, Ruby, Go, JavaScript, PHP and other client libraries for the Prediction API.