Apache Phoenix – an SQL Driver for HBase


Introduction

HBase is one of the most popular NoSQL databases. It is available in all major Hadoop distributions and is also part of AWS Elastic MapReduce as an additional application. Out of the box it offers its own data model operations such as Get, Put, Scan and Delete, but it does not offer SQL-like capabilities, as opposed to, for instance, Cassandra's query language, CQL.
Apache Phoenix is a SQL layer on top of HBase that supports the most common SQL operations such as CREATE TABLE, SELECT, UPSERT, DELETE, etc. It was originally developed by Salesforce.com engineers for internal use and was later open sourced. In 2013 it became an Apache incubator project.

Architecture

We have covered HBase in more detail in this article. Just a quick recap: the HBase architecture is based on three key components: the HBase Master server, the HBase Region Servers and Zookeeper.

HBase-Architecture

The client needs to find the Region Servers in order to work with the data stored in HBase. In essence, regions are the basic elements for distributing tables across the cluster. In order to find the Region Servers, the client first has to talk to Zookeeper.

HBase-Lookup

The key elements in the HBase data model are tables, column families, columns and rowkeys. The tables are made of columns and rows. The individual elements at the column and row intersections (cells in HBase terms) are versioned based on timestamps. The rows are identified by rowkeys, which are kept sorted; these rowkeys can be considered the primary keys, and all the data in the table can be accessed via them.

The columns are grouped into column families; at table creation time you do not have to specify all the columns, only the column families. Columns have a prefix derived from the column family plus their own qualifier, so a column name looks like this: ‘contents:html’.
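To make the data model more tangible, here is a minimal Java sketch of storing and reading such a cell with the native HBase client API (HBase 0.94 style); the table name, rowkey and cell value are made up for illustration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseDataModelExample {
    public static void main(String[] args) throws Exception {
        // Reads hbase-site.xml (Zookeeper quorum etc.) from the classpath
        Configuration conf = HBaseConfiguration.create();

        // 'webpage' is a made-up table with a 'contents' column family
        HTable table = new HTable(conf, "webpage");

        // The rowkey identifies the row; 'contents:html' is family 'contents', qualifier 'html'
        Put put = new Put(Bytes.toBytes("com.example/index.html"));
        put.add(Bytes.toBytes("contents"), Bytes.toBytes("html"), Bytes.toBytes("<html>...</html>"));
        table.put(put);

        // Read the same cell back via its rowkey
        Get get = new Get(Bytes.toBytes("com.example/index.html"));
        Result result = table.get(get);
        byte[] html = result.getValue(Bytes.toBytes("contents"), Bytes.toBytes("html"));
        System.out.println(Bytes.toString(html));

        table.close();
    }
}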

As we have seen, HBase's classic data model is not designed with SQL in mind; under the hood it is a sorted multidimensional map. That is where Phoenix comes to the rescue: it offers a SQL skin on HBase. Phoenix is implemented as a JDBC driver. From an architecture perspective, a Java client using JDBC can be configured to work with the Phoenix driver and can connect to HBase using SQL-like statements. We will demonstrate how to use SQuirreL, a popular Java-based graphical SQL client, together with Phoenix.
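As an illustration, a minimal sketch of such a JDBC client could look like the following; it assumes the Phoenix client jar is on the classpath and reuses the jdbc:phoenix:localhost connect string and the WEB_STAT table that appear later in this post:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class PhoenixJdbcExample {
    public static void main(String[] args) throws Exception {
        // The driver class shipped with Phoenix (see the sqlline session below)
        Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");

        // The connect string points at the Zookeeper quorum used by HBase
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
             Statement stmt = conn.createStatement()) {

            // An ordinary SQL query; Phoenix translates it into HBase scans
            ResultSet rs = stmt.executeQuery(
                    "SELECT HOST, SUM(ACTIVE_VISITOR) FROM WEB_STAT GROUP BY HOST");
            while (rs.next()) {
                System.out.println(rs.getString(1) + " -> " + rs.getLong(2));
            }
        }
    }
}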

Getting Started with Phoenix

You can download Phoenix from the Apache download site. Different Phoenix versions are compatible with different HBase versions, so please read the Phoenix documentation to ensure you have the correct setup. In our tests we used Phoenix 3.0.0 with HBase 0.94; the Hadoop distribution was Cloudera CDH4.4 with Hadoop v1. The Phoenix package contains both Hadoop version 1 and version 2 drivers for the clients, so we had to use the appropriate Hadoop-1 files; see the details later on when talking about the SQuirreL client.

Once you have unzipped the downloaded Phoenix package, you need to copy the relevant Phoenix jar files to the HBase region servers in order to ensure that the Phoenix client can communicate with them; otherwise you may get an error message saying that the client and server jars are not compatible.

$ cd ~/phoenix/phoenix-3.0.0-incubating/common
$ cp phoenix-3.0.0-incubating-client-minimal.jar  /usr/lib/hbase/lib
$ cp phoenix-core-3.0.0-incubating.jar /usr/lib/hbase/lib

After copying the jar files to the region servers, we had to restart them.

Phoenix provides a command line tool called sqlline – a utility written in Python. Its functionality is similar to Oracle SQL*Plus or the MySQL command line client; it is not too sophisticated, but it does the job for simple use cases.

Before you start using sqlline, you can create a sample database table, populate it and run some simple queries as follows:

$ cd ~/phoenix/phoenix-3.0.0-incubating/bin
$ ./psql.py localhost ../examples/web_stat.sql ../examples/web_stat.csv ../examples/web_stat_queries.sql

This will run a CREATE TABLE statement:

CREATE TABLE IF NOT EXISTS WEB_STAT (
     HOST CHAR(2) NOT NULL,
     DOMAIN VARCHAR NOT NULL,
     FEATURE VARCHAR NOT NULL,
     DATE DATE NOT NULL,
     USAGE.CORE BIGINT,
     USAGE.DB BIGINT,
     STATS.ACTIVE_VISITOR INTEGER
     CONSTRAINT PK PRIMARY KEY (HOST, DOMAIN, FEATURE, DATE)
);

Then load the data stored in the web_stat CSV file:

NA,Salesforce.com,Login,2013-01-01 01:01:01,35,42,10
EU,Salesforce.com,Reports,2013-01-02 12:02:01,25,11,2
EU,Salesforce.com,Reports,2013-01-02 14:32:01,125,131,42
NA,Apple.com,Login,2013-01-01 01:01:01,35,22,40
NA,Salesforce.com,Dashboard,2013-01-03 11:01:01,88,66,44
...

And then run a few sample queries on the table, e.g.:

-- Average CPU and DB usage by Domain
SELECT DOMAIN, AVG(CORE) Average_CPU_Usage, AVG(DB) Average_DB_Usage 
FROM WEB_STAT 
GROUP BY DOMAIN 
ORDER BY DOMAIN DESC;

Now you can connect to HBase using sqlline:

[cloudera@localhost bin]$ ./sqlline.py localhost
..
Connecting to jdbc:phoenix:localhost
Driver: org.apache.phoenix.jdbc.PhoenixDriver (version 3.0)
Autocommit status: true
Transaction isolation: TRANSACTION_READ_COMMITTED
..
Done
sqlline version 1.1.2
0: jdbc:phoenix:localhost> select count(*) from web_stat;
+------------+
|  COUNT(1)  |
+------------+
| 39         |
+------------+
1 row selected (0.112 seconds)
0: jdbc:phoenix:localhost> select host, sum(active_visitor) from web_stat group by host;
+------+---------------------------+
| HOST | SUM(STATS.ACTIVE_VISITOR) |
+------+---------------------------+
| EU   | 698                       |
| NA   | 1639                      |
+------+---------------------------+
2 rows selected (0.294 seconds)
0: jdbc:phoenix:localhost>

Using SQuirreL with Phoenix

If you prefer to use a graphical SQL client with Phoenix, you can download e.g. SQuirreL from here. After that, the first step is to copy the appropriate Phoenix driver jar file to the SQuirreL lib directory:

$ cd ~/phoenix
$ cp phoenix-3.0.0-incubating/hadoop-1/phoenix-3.0.0-incubating-client.jar ~/squirrel/lib

Now you are ready to configure the JDBC driver in SQuirreL client, as shown in the picture below:

Squirrel-1

Then you can connect to Phoenix using the appropriate connect string (jdbc:phoenix:localhost in our test scenario):

Squirrel-2

Once connected, you can start executing your SQL queries:
Squirrel-3

Phoenix on Amazon Web Services – AWS Elastic MapReduce with Phoenix

You can also use Phoenix with AWS Elastic MapReduce. When you create a cluster, you need to specify the Apache Hadoop version, then configure HBase as an additional application and define the bootstrap action to load Phoenix onto your AWS EMR cluster. See the details in the pictures below:

AWS-EMR-3

AWS-EMR-5

Once the cluster is running, you can log in to the master node using ssh and check your Phoenix configuration.
AWS-EMR-9

Conclusion

SQL is one of the most popular languages used by data scientists and it is likely to remain so. With the advent of Big Data and NoSQL databases the volume, variety and velocity of the data have significantly increased, but the demand for traditional, well-known languages to process that data has not changed much. SQL on Hadoop solutions are gaining momentum. Apache Phoenix is an interesting open source player offering a SQL layer on top of HBase.

Pivotal Hadoop Distribution and HAWQ realtime query engine


Introduction

SQL on Hadoop and the support for interactive, ad-hoc queries in Hadoop is in increasing demand, and all the vendors are providing their answer to these requirements. In the open source world Cloudera's Impala, Apache Drill (backed by MapR) and Hortonworks' Stinger initiative are competing in this market, to mention just a few key players. There are also strong offerings from BI and analytics vendors such as Pivotal (HAWQ), Teradata (SQL-H) or IBM (BigSQL).
In this post we will cover Pivotal Hadoop Distribution (Pivotal HD) and HAWQ, Pivotal’s interactive distributed SQL query engine.

Getting started with Pivotal HD

Pivotal HD contains the most well-known open source components such as HDFS, MapReduce, YARN, Hive, Pig, HBase, Flume, Sqoop and Mahout. There are also additional components available such as the Command Center, Unified Storage Services, Data Loader, Spring and HAWQ as an add-on. (Pivotal has an offering called GemFire XD which is a distributed in-memory data grid but that is out of scope for our current discussion).

PivotalHD_ArchitectDiagram

Let us take an example of how to use Pivotal HD to answer the following question: what was the highest price ever of the Apple, Google and Nokia stocks, and when did those stocks reach their peak values?

First we are going to develop a MapReduce algorithm to calculate these values, and then we will run SQL queries in HAWQ to get the same result. Our test environment is based on the Pivotal HD Single Node virtual machine running on VMware Player and using a 64-bit CentOS 6.4 distribution. The Pivotal HD virtual machine does not contain Eclipse, so we had to download that separately from eclipse.org.

Once we have the environment set up, the next step is to create a Maven project.

$ mvn archetype:generate -DarchetypeGroupId=org.apache.maven.archetypes -DarchetypeArtifactId=maven-archetype-quickstart -DgroupId=highest_stock_price -DartifactId=highest_stock_price

This command will create a pom.xml with the basic project settings and JUnit added as a dependency. Then we need to edit pom.xml and add the other relevant dependencies and build settings.
After that we can start writing our Hadoop application in Eclipse. The code is also uploaded to Github (https://github.com/iszegedi/Pivotal-HD-and-HAWQ-blog) for your reference.

pivotal-eclipse-dev

The key Java classes are HighestStockPriceDriver.java, the main driver of our MapReduce application; HighestStockPriceMapper.java, which contains the map() function; and HighestStockPriceReducer.java, which implements the reduce() function.
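The actual source is on GitHub; as a rough idea of the mapper logic, a simplified sketch (not the exact code from the repository) could look like this, emitting the stock symbol as key and the date together with the adjusted close price as value:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Parses one CSV line (Symbol,Date,Open,High,Low,Close,Volume,Adj Close) and emits
// the symbol as key and "date<TAB>adjClose" as value; the reducer can then keep the
// highest adjusted close price per symbol.
public class HighestStockPriceMapper
        extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split(",");
        if (fields.length < 8) {
            return; // skip malformed lines
        }
        String symbol = fields[0];
        String date = fields[1];
        String adjClose = fields[7];
        context.write(new Text(symbol), new Text(date + "\t" + adjClose));
    }
}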

Then we can compile the code and package it into a jar file:

$ mvn clean compile
$ mvn -DskipTests package

The next step is to copy our data sets into a Hadoop HDFS directory.

$ hadoop fs -mkdir /stock_demo/input
$ hadoop fs -put *.csv /stock_demo/input/
$ hadoop fs -ls /stock_demo/input/
Found 3 items
-rw-r--r--   1 gpadmin hadoop     403395 2013-12-31 00:25 /stock_demo/input/apple.csv
-rw-r--r--   1 gpadmin hadoop     134696 2013-12-31 00:25 /stock_demo/input/google.csv
-rw-r--r--   1 gpadmin hadoop     248405 2013-12-31 00:25 /stock_demo/input/nokia.csv

The format of the files (apple.csv, nokia.csv, google.csv) is as follows (the columns are Symbol, Date, Open, High, Low, Close, Volume, Adj Close):

$ head -5 apple.csv
AAPL,2013-09-06,498.44,499.38,489.95,498.22,12788700,498.22
AAPL,2013-09-05,500.25,500.68,493.64,495.27,8402100,495.27
AAPL,2013-09-04,499.56,502.24,496.28,498.69,12299300,498.69
AAPL,2013-09-03,493.10,500.60,487.35,488.58,11854600,488.58
AAPL,2013-08-30,492.00,492.95,486.50,487.22,9724900,487.22

Now we are ready to run our MapReduce algorithm on the data sets:

$ hadoop jar target/highest_stock_price-1.0.jar highest_stock_price/HighestStockPriceDriver /stock_demo/input/ /stock_demo/output/

$ hadoop fs -cat /stock_demo/output/part*
AAPL:	2012-09-19	685.76
GOOG:	2013-07-15	924.69
NOK:	2000-06-19	42.24

We can check the status of the Hadoop job using the following command:

$ hadoop job -status job_1388420266428_0001
DEPRECATED: Use of this script to execute mapred command is deprecated.
Instead use the mapred command for it.

13/12/31 00:31:15 INFO service.AbstractService: Service:org.apache.hadoop.yarn.client.YarnClientImpl is inited.
13/12/31 00:31:15 INFO service.AbstractService: Service:org.apache.hadoop.yarn.client.YarnClientImpl is started.
13/12/31 00:31:17 INFO mapred.ClientServiceDelegate: Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server

Job: job_1388420266428_0001
Job File: hdfs://pivhdsne:8020/user/history/done/2013/12/31/000000/job_1388420266428_0001_conf.xml
Job Tracking URL : http://localhost:19888/jobhistory/job/job_1388420266428_0001
Uber job : false
Number of maps: 3
Number of reduces: 1
map() completion: 1.0
reduce() completion: 1.0
Job state: SUCCEEDED
....
....

This shows us that 3 mappers and 1 reducer were run. It also shows the number of input and output records and bytes.

HAWQ interactive distributed query engine

The common complaints with regard to classic Hadoop MapReduce are that it requires fairly extensive Java experience and that it is tuned for batch-type data processing; it is not really suitable for exploratory data analysis using ad-hoc interactive queries. That is where HAWQ can come to the rescue.

HAWQ is a massively parallel SQL query engine. The underlying engine is based on PostgreSQL (version 8.2.15, as of writing this post) so it can support the standard SQL statements out of the box. The key architecture components are HAWQ master, HAWQ segments, HAWQ storage and HAWQ interconnect.

HAWQ-architecture

HAWQ master is responsible for accepting the connections from the clients and it also manages the system tables containing metadata about HAWQ itself (however, no user data is stored on the master). The master then parses and optimises the queries and develops an execution plan which is then dispatched to the segments.

HAWQ segments are the processing units; they are responsible for executing the local database operations on their own data sets.

HAWQ-query-execution

HAWQ stores all the user data in HDFS. HAWQ interconnect refers to the UDP-based inter-process communication between the segments.

Now let us see how we can answer the same question about stock prices that we did with our MapReduce job.

First we need to log in to our client (psql, which is the same client we know well from PostgreSQL databases) and create our schema and table:

$ psql
psql (8.2.15)
Type "help" for help.

gpadmin=# create schema stock_demo;
gpadmin=# create table stock_demo.stock
gpadmin-# (
gpadmin(# symbol TEXT,
gpadmin(# date TEXT,
gpadmin(# open NUMERIC(6,2),
gpadmin(# high NUMERIC(6,2),
gpadmin(# low NUMERIC(6,2),
gpadmin(# close NUMERIC(6,2),
gpadmin(# volume INTEGER,
gpadmin(# adjclose NUMERIC(6,2)
gpadmin(# )
gpadmin-# with (appendonly=true) distributed randomly;

The next step is to load the data into this HAWQ table; we can use the following commands to do this:

$ cat google.csv | psql -c "COPY stock_demo.stock FROM STDIN DELIMITER E'\,' NULL E'';"
$ cat nokia.csv | psql -c "COPY stock_demo.stock FROM STDIN DELIMITER E'\,' NULL E'';"
$ cat apple.csv | psql -c "COPY stock_demo.stock FROM STDIN DELIMITER E'\,' NULL E'';"

Now we can log in again to our psql client and run the SQL queries:

gpadmin=# select count(*) from stock_demo.stock;
 count 
-------
 14296
(1 row)

gpadmin=# select symbol, date, adjclose from stock_demo.stock where adjclose in
gpadmin-# ( select max(adjclose) as max_adj_close from stock_demo.stock 
gpadmin(#   group by symbol )
gpadmin-# order by symbol;
 symbol |    date    | adjclose 
--------+------------+----------
 AAPL   | 2012-09-19 |   685.76
 GOOG   | 2013-07-15 |   924.69
 NOK    | 2000-06-19 |    42.24
(3 rows)

These SQL queries relied on a HAWQ internal table, thus we had to load the data into it from our local file system. HAWQ also supports the notion of external tables using PXF (Pivotal eXtension Framework). PXF is an external table interface in HAWQ that allows reading data directly from HDFS directories. It has a concept of fragmenters, accessors and resolvers, which are used to split the data files into smaller chunks and read them into HAWQ without the need to explicitly load them into HAWQ internal tables.

If we want to use an external table, we need to create it using the following SQL statement:

gpadmin=# create external table stock_demo.stock_pxf
gpadmin-# (
gpadmin(# symbol TEXT,
gpadmin(# date TEXT,
gpadmin(# open NUMERIC(6,2),
gpadmin(# high NUMERIC(6,2),
gpadmin(# low NUMERIC(6,2),
gpadmin(# close NUMERIC(6,2),
gpadmin(# volume INTEGER,
gpadmin(# adjclose NUMERIC(6,2)
gpadmin(# )
gpadmin-# location ('pxf://pivhdsne:50070/stock_demo/input/*.csv?Fragmenter=HdfsDataFragmenter&Accessor=TextFileAccessor&Resolver=TextResolver')
gpadmin-# format 'TEXT' (delimiter = E'\,');

Then we can run the same queries against the external table as before:

gpadmin=# select count(*) from stock_demo.stock_pxf;
 count 
-------
 14296
(1 row)

gpadmin=# select symbol, date, adjclose from stock_demo.stock_pxf where adjclose in 
gpadmin-# ( select max(adjclose) as max_adj_close from stock_demo.stock_pxf
gpadmin(#   group by symbol )
gpadmin-# order by symbol;
 symbol |    date    | adjclose 
--------+------------+----------
 AAPL   | 2012-09-19 |   685.76
 GOOG   | 2013-07-15 |   924.69
 NOK    | 2000-06-19 |    42.24
(3 rows)

Conclusion

SQL on Hadoop is gaining significant momentum; the demand to be able to run ad-hoc, interactive queries as well as batch data processing on top of Hadoop is increasing. Most of the key players in the big data world have started providing solutions to address these needs. 2014 looks to be an interesting year to see how these offerings evolve.

OpenStack Savanna – Fast Hadoop Cluster Provisioning on OpenStack


Introduction

OpenStack is one of the most popular open source cloud computing projects providing an Infrastructure as a Service solution. Its key core components are Compute (Nova), Networking (Neutron, formerly known as Quantum), Storage (object and block storage – Swift and Cinder, respectively), the OpenStack Dashboard (Horizon), the Identity Service (Keystone) and the Image Service (Glance).

openstack-software-diagram

There are other official incubated projects like Metering (Ceilometer) and Orchestration and Service Definition (Heat).

Savanna is Hadoop as a Service for OpenStack, introduced by Mirantis. It is still in an early phase (v0.2 was released in the summer of 2013), and according to its roadmap version 1.0 is targeted for official OpenStack incubation. In principle, Heat could also be used for Hadoop cluster provisioning, but Savanna is especially tuned to provide Hadoop-specific API functionality, while Heat is meant for generic purposes.

Savanna Architecture

Savanna is integrated with the core OpenStack components such as Keystone, Nova, Glance, Swift and Horizon. It has a REST API that supports the Hadoop cluster provisioning steps.

savanna-architecture

The Savanna API is implemented as a WSGI server that, by default, listens on port 8386. In addition, Savanna can be integrated with Horizon, the OpenStack Dashboard, to create a Hadoop cluster from the management console. Savanna also comes with a Vanilla plugin that deploys a Hadoop cluster image. The standard out-of-the-box Vanilla plugin supports Hadoop version 1.1.2.

Installing Savanna

The simplest option to try out Savanna is to use devstack in a virtual machine. I was using an Ubuntu 12.04 virtual instance in my tests. In that environment we need to execute the following commands to install devstack and the Savanna API:

$ sudo apt-get install git-core
$ git clone https://github.com/openstack-dev/devstack.git
$ vi localrc  # edit localrc
ADMIN_PASSWORD=nova
MYSQL_PASSWORD=nova
RABBIT_PASSWORD=nova
SERVICE_PASSWORD=$ADMIN_PASSWORD
SERVICE_TOKEN=nova

# Enable Swift
ENABLED_SERVICES+=,swift

SWIFT_HASH=66a3d6b56c1f479c8b4e70ab5c2000f5
SWIFT_REPLICAS=1
SWIFT_DATA_DIR=$DEST/data

# Force checkout prerequisites
# FORCE_PREREQ=1

# keystone is now configured by default to use PKI as the token format which produces huge tokens.
# set UUID as keystone token format which is much shorter and easier to work with.
KEYSTONE_TOKEN_FORMAT=UUID

# Change the FLOATING_RANGE to whatever IPs VM is working in.
# In NAT mode it is subnet VMWare Fusion provides, in bridged mode it is your local network.
FLOATING_RANGE=192.168.55.224/27

# Enable auto assignment of floating IPs. By default Savanna expects this setting to be enabled
EXTRA_OPTS=(auto_assign_floating_ip=True)

# Enable logging
SCREEN_LOGDIR=$DEST/logs/screen

$ ./stack.sh  # this will take a while to execute
$ sudo apt-get install python-setuptools python-virtualenv python-dev
$ virtualenv savanna-venv
$ savanna-venv/bin/pip install savanna
$ mkdir savanna-venv/etc
$ cp savanna-venv/share/savanna/savanna.conf.sample savanna-venv/etc/savanna.conf
# To start Savanna API:
$ savanna-venv/bin/python savanna-venv/bin/savanna-api --config-file savanna-venv/etc/savanna.conf

To install the Savanna UI integrated with Horizon, we need to run the following commands:

$ sudo pip install savanna-dashboard
$ cd /opt/stack/horizon/openstack-dashboard
$ vi settings.py
HORIZON_CONFIG = {
    'dashboards': ('nova', 'syspanel', 'settings', 'savanna'),

INSTALLED_APPS = (
    'savannadashboard',
    ....

$ cd /opt/stack/horizon/openstack-dashboard/local
$ vi local_settings.py
SAVANNA_URL = 'http://localhost:8386/v1.0'

$ sudo service apache2 restart

Provisioning a Hadoop cluster

As a first step, we need to configure Keystone related environment variables to get the authentication token:

ubuntu@ip-10-59-33-68:~$ vi .bashrc
$ export OS_AUTH_URL=http://127.0.0.1:5000/v2.0/
$ export OS_TENANT_NAME=admin
$ export OS_USERNAME=admin
$ export OS_PASSWORD=nova
ubuntu@ip-10-59-33-68:~$ source .bashrc

ubuntu@ip-10-59-33-68:~$ env | grep OS
OS_PASSWORD=nova
OS_AUTH_URL=http://127.0.0.1:5000/v2.0/
OS_USERNAME=admin
OS_TENANT_NAME=admin
ubuntu@ip-10-59-33-68:~$ keystone token-get
+-----------+----------------------------------+
|  Property |              Value               |
+-----------+----------------------------------+
|  expires  |       2013-08-09T20:31:12Z       |
|     id    | bdb582c836e3474f979c5aa8f844c000 |
| tenant_id | 2f46e214984f4990b9c39d9c6222f572 |
|  user_id  | 077311b0a8304c8e86dc0dc168a67091 |
+-----------+----------------------------------+

$ export AUTH_TOKEN="bdb582c836e3474f979c5aa8f844c000"
$ export TENANT_ID="2f46e214984f4990b9c39d9c6222f572"

Then we need to create the Glance image that we want to use for our Hadoop cluster. In our example we have used Mirantis’s vanilla image but we can also build our own image:

$ wget http://savanna-files.mirantis.com/savanna-0.2-vanilla-1.1.2-ubuntu-12.10.qcow2
$ glance image-create --name=savanna-0.2-vanilla-hadoop-ubuntu.qcow2 --disk-format=qcow2 --container-format=bare < ./savanna-0.2-vanilla-1.1.2-ubuntu-12.10.qcow2
ubuntu@ip-10-59-33-68:~/devstack$ glance image-list
+--------------------------------------+-----------------------------------------+-------------+------------------+-----------+--------+
| ID                                   | Name                                    | Disk Format | Container Format | Size      | Status |
+--------------------------------------+-----------------------------------------+-------------+------------------+-----------+--------+
| d0d64f5c-9c15-4e7b-ad4c-13859eafa7b8 | cirros-0.3.1-x86_64-uec                 | ami         | ami              | 25165824  | active |
| fee679ee-e0c0-447e-8ebd-028050b54af9 | cirros-0.3.1-x86_64-uec-kernel          | aki         | aki              | 4955792   | active |
| 1e52089b-930a-4dfc-b707-89b568d92e7e | cirros-0.3.1-x86_64-uec-ramdisk         | ari         | ari              | 3714968   | active |
| d28051e2-9ddd-45f0-9edc-8923db46fdf9 | savanna-0.2-vanilla-hadoop-ubuntu.qcow2 | qcow2       | bare             | 551699456 | active |
+--------------------------------------+-----------------------------------------+-------------+------------------+-----------+--------+
$  export IMAGE_ID=d28051e2-9ddd-45f0-9edc-8923db46fdf9

Then we installed httpie, an open source HTTP client that can be used to send REST requests to the Savanna API:

$ sudo pip install httpie

From now on we will use httpie to send Savanna commands. We need to register the image with Savanna:

$ export SAVANNA_URL="http://localhost:8386/v1.0/$TENANT_ID"
$ http POST $SAVANNA_URL/images/$IMAGE_ID X-Auth-Token:$AUTH_TOKEN username=ubuntu

HTTP/1.1 202 ACCEPTED
Content-Length: 411
Content-Type: application/json
Date: Thu, 08 Aug 2013 21:28:07 GMT

{
    "image": {
        "OS-EXT-IMG-SIZE:size": 551699456,
        "created": "2013-08-08T21:05:55Z",
        "description": "None",
        "id": "d28051e2-9ddd-45f0-9edc-8923db46fdf9",
        "metadata": {
            "_savanna_description": "None",
            "_savanna_username": "ubuntu"
        },
        "minDisk": 0,
        "minRam": 0,
        "name": "savanna-0.2-vanilla-hadoop-ubuntu.qcow2",
        "progress": 100,
        "status": "ACTIVE",
        "tags": [],
        "updated": "2013-08-08T21:28:07Z",
        "username": "ubuntu"
    }
}

$ http $SAVANNA_URL/images/$IMAGE_ID/tag X-Auth-Token:$AUTH_TOKEN tags:='["vanilla", "1.1.2", "ubuntu"]'

HTTP/1.1 202 ACCEPTED
Content-Length: 532
Content-Type: application/json
Date: Thu, 08 Aug 2013 21:29:25 GMT

{
    "image": {
        "OS-EXT-IMG-SIZE:size": 551699456,
        "created": "2013-08-08T21:05:55Z",
        "description": "None",
        "id": "d28051e2-9ddd-45f0-9edc-8923db46fdf9",
        "metadata": {
            "_savanna_description": "None",
            "_savanna_tag_1.1.2": "True",
            "_savanna_tag_ubuntu": "True",
            "_savanna_tag_vanilla": "True",
            "_savanna_username": "ubuntu"
        },
        "minDisk": 0,
        "minRam": 0,
        "name": "savanna-0.2-vanilla-hadoop-ubuntu.qcow2",
        "progress": 100,
        "status": "ACTIVE",
        "tags": [
            "vanilla",
            "ubuntu",
            "1.1.2"
        ],
        "updated": "2013-08-08T21:29:25Z",
        "username": "ubuntu"
    }
}

Then we need to create the node group templates (JSON files) that will be sent to Savanna. There is one template for the master nodes (namenode, jobtracker) and another template for the worker nodes running datanode and tasktracker. The Hadoop version is 1.1.2.

$ vi ng_master_template_create.json
{
    "name": "test-master-tmpl",
    "flavor_id": "2",
    "plugin_name": "vanilla",
    "hadoop_version": "1.1.2",
    "node_processes": ["jobtracker", "namenode"]
}

$ vi ng_worker_template_create.json
{
    "name": "test-worker-tmpl",
    "flavor_id": "2",
    "plugin_name": "vanilla",
    "hadoop_version": "1.1.2",
    "node_processes": ["tasktracker", "datanode"]
}

$ http $SAVANNA_URL/node-group-templates X-Auth-Token:$AUTH_TOKEN < ng_master_template_create.json
HTTP/1.1 202 ACCEPTED
Content-Length: 387
Content-Type: application/json
Date: Thu, 08 Aug 2013 21:58:00 GMT

{
    "node_group_template": {
        "created": "2013-08-08T21:58:00",
        "flavor_id": "2",
        "hadoop_version": "1.1.2",
        "id": "b3a79c88-b6fb-43d2-9a56-310218c66f7c",
        "name": "test-master-tmpl",
        "node_configs": {},
        "node_processes": [
            "jobtracker",
            "namenode"
        ],
        "plugin_name": "vanilla",
        "updated": "2013-08-08T21:58:00",
        "volume_mount_prefix": "/volumes/disk",
        "volumes_per_node": 0,
        "volumes_size": 10
    }
}

$ http $SAVANNA_URL/node-group-templates X-Auth-Token:$AUTH_TOKEN < ng_worker_template_create.json
HTTP/1.1 202 ACCEPTED
Content-Length: 388
Content-Type: application/json
Date: Thu, 08 Aug 2013 21:59:41 GMT

{
    "node_group_template": {
        "created": "2013-08-08T21:59:41",
        "flavor_id": "2",
        "hadoop_version": "1.1.2",
        "id": "773b2cfb-1e05-46f4-923f-13edc7d6aac6",
        "name": "test-worker-tmpl",
        "node_configs": {},
        "node_processes": [
            "tasktracker",
            "datanode"
        ],
        "plugin_name": "vanilla",
        "updated": "2013-08-08T21:59:41",
        "volume_mount_prefix": "/volumes/disk",
        "volumes_per_node": 0,
        "volumes_size": 10
    }
}

The next step is to define the cluster template:

$ vi cluster_template_create.json

{
    "name": "demo-cluster-template",
    "plugin_name": "vanilla",
    "hadoop_version": "1.1.2",
    "node_groups": [
        {
            "name": "master",
            "node_group_template_id": "b3a79c88-b6fb-43d2-9a56-310218c66f7c",
            "count": 1
        },
        {
            "name": "workers",
            "node_group_template_id": "773b2cfb-1e05-46f4-923f-13edc7d6aac6",
            "count": 2
        }
    ]
}

$ http $SAVANNA_URL/cluster-templates X-Auth-Token:$AUTH_TOKEN < cluster_template_create.json
HTTP/1.1 202 ACCEPTED
Content-Length: 815
Content-Type: application/json
Date: Fri, 09 Aug 2013 07:04:24 GMT

{
    "cluster_template": {
        "anti_affinity": [],
        "cluster_configs": {},
        "created": "2013-08-09T07:04:24",
        "hadoop_version": "1.1.2",
        "id": "{
    "name": "cluster-1",
    "plugin_name": "vanilla",
    "hadoop_version": "1.1.2",
    "cluster_template_id" : "64c4117b-acee-4da7-937b-cb964f0471a9",
    "user_keypair_id": "stack",
    "default_image_id": "3f9fc974-b484-4756-82a4-bff9e116919b"
}",
        "name": "demo-cluster-template",
        "node_groups": [
            {
                "count": 1,
                "flavor_id": "2",
                "name": "master",
                "node_configs": {},
                "node_group_template_id": "b3a79c88-b6fb-43d2-9a56-310218c66f7c",
                "node_processes": [
                    "jobtracker",
                    "namenode"
                ],
                "volume_mount_prefix": "/volumes/disk",
                "volumes_per_node": 0,
                "volumes_size": 10
            },
            {
                "count": 2,
                "flavor_id": "2",
                "name": "workers",
                "node_configs": {},
                "node_group_template_id": "773b2cfb-1e05-46f4-923f-13edc7d6aac6",
                "node_processes": [
                    "tasktracker",
                    "datanode"
                ],
                "volume_mount_prefix": "/volumes/disk",
                "volumes_per_node": 0,
                "volumes_size": 10
            }
        ],
        "plugin_name": "vanilla",
        "updated": "2013-08-09T07:04:24"
    }
}

Now we are ready to create the Hadoop cluster:

$ vi cluster_create.json
{
    "name": "cluster-1",
    "plugin_name": "vanilla",
    "hadoop_version": "1.1.2",
    "cluster_template_id" : "64c4117b-acee-4da7-937b-cb964f0471a9",
    "user_keypair_id": "savanna",
    "default_image_id": "d28051e2-9ddd-45f0-9edc-8923db46fdf9"
}

$ http $SAVANNA_URL/clusters X-Auth-Token:$AUTH_TOKEN < cluster_create.json
HTTP/1.1 202 ACCEPTED
Content-Length: 1153
Content-Type: application/json
Date: Fri, 09 Aug 2013 07:28:14 GMT

{
    "cluster": {
        "anti_affinity": [],
        "cluster_configs": {},
        "cluster_template_id": "64c4117b-acee-4da7-937b-cb964f0471a9",
        "created": "2013-08-09T07:28:14",
        "default_image_id": "d28051e2-9ddd-45f0-9edc-8923db46fdf9",
        "hadoop_version": "1.1.2",
        "id": "d919f1db-522f-45ab-aadd-c078ba3bb4e3",
        "info": {},
        "name": "cluster-1",
        "node_groups": [
            {
                "count": 1,
                "created": "2013-08-09T07:28:14",
                "flavor_id": "2",
                "instances": [],
                "name": "master",
                "node_configs": {},
                "node_group_template_id": "b3a79c88-b6fb-43d2-9a56-310218c66f7c",
                "node_processes": [
                    "jobtracker",
                    "namenode"
                ],
                "updated": "2013-08-09T07:28:14",
                "volume_mount_prefix": "/volumes/disk",
                "volumes_per_node": 0,
                "volumes_size": 10
            },
            {
                "count": 2,
                "created": "2013-08-09T07:28:14",
                "flavor_id": "2",
                "instances": [],
                "name": "workers",
                "node_configs": {},
                "node_group_template_id": "773b2cfb-1e05-46f4-923f-13edc7d6aac6",
                "node_processes": [
                    "tasktracker",
                    "datanode"
                ],
                "updated": "2013-08-09T07:28:14",
                "volume_mount_prefix": "/volumes/disk",
                "volumes_per_node": 0,
                "volumes_size": 10
            }
        ],
        "plugin_name": "vanilla",
        "status": "Validating",
        "updated": "2013-08-09T07:28:14",
        "user_keypair_id": "savanna"
    }
}

After a while we can run the nova command to check if the instances are created and running:

$ nova list
+--------------------------------------+-----------------------+--------+------------+-------------+----------------------------------+
| ID                                   | Name                  | Status | Task State | Power State | Networks                         |
+--------------------------------------+-----------------------+--------+------------+-------------+----------------------------------+
| 1a9f43bf-cddb-4556-877b-cc993730da88 | cluster-1-master-001  | ACTIVE | None       | Running     | private=10.0.0.2, 192.168.55.227 |
| bb55f881-1f96-4669-a94a-58cbf4d88f39 | cluster-1-workers-001 | ACTIVE | None       | Running     | private=10.0.0.3, 192.168.55.226 |
| 012a24e2-fa33-49f3-b051-9ee2690864df | cluster-1-workers-002 | ACTIVE | None       | Running     | private=10.0.0.4, 192.168.55.225 |
+--------------------------------------+-----------------------+--------+------------+-------------+----------------------------------+

Now we can log in to the Hadoop master instance and run the required Hadoop commands:

$ ssh -i savanna.pem ubuntu@10.0.0.2
$ sudo chmod 777 /usr/share/hadoop
$ sudo su hadoop
$ cd /usr/share/hadoop
$ hadoop jar hadoop-examples-1.1.2.jar pi 10 100

Savanna UI via Horizon

In order to create the node group templates, the cluster template and the cluster itself, we have used a command line tool – httpie – to send REST API calls. The same functionality is also available via Horizon, the standard OpenStack dashboard.

First we need to register the image with Savanna:

savanna-image-registry

Then we need to create the nodegroup templates:

savanna-nodegroup-template

After that we have to create the cluster template:

savanna-cluster-template

And finally we have to create the cluster:

savann-cluster

Hadoop REST API – WebHDFS


Introduction

Hadoop provides a native Java API to support file system operations such as creating, renaming or deleting files and directories, opening, reading or writing files, setting permissions, etc. A very basic example of how to read and write files from Hadoop can be found on the Apache wiki.

This is great for applications running within the Hadoop cluster, but there may be use cases where an external application needs to manipulate HDFS – for example, it needs to create directories and write files to them, or read the content of files stored on HDFS. Hortonworks developed an additional API to support these requirements based on standard REST functionality.

WebHDFS REST API

The WebHDFS concept is based on HTTP operations like GET, PUT, POST and DELETE. Operations like OPEN, GETFILESTATUS and LISTSTATUS use HTTP GET; others like CREATE, MKDIRS, RENAME and SETPERMISSION rely on HTTP PUT. The APPEND operation is based on HTTP POST, while DELETE uses HTTP DELETE.

Authentication can be based on the user.name query parameter (as part of the HTTP query string), or, if security is turned on, it relies on Kerberos.

The standard URL format is as follows: http://host:port/webhdfs/v1/<path>?op=operation&user.name=username

In some cases the namenode returns a URL using the HTTP 307 Temporary Redirect mechanism, with a location URL referring to the appropriate datanode. The client then needs to follow that URL to execute the file operation on that particular datanode.

By default the namenode and datanode ports are 50070 and 50075, respectively; see more details about the default HDFS ports on the Cloudera blog.

In order to configure WebHDFS, we need to edit hdfs-site.xml as follows:

        <property>
           <name>dfs.webhdfs.enabled</name>
           <value>true</value>
        </property>

WebHDFS examples

As the simplest approach, we can use curl to invoke the WebHDFS REST API.

1./ Check directory status

$ curl -i "http://localhost:50070/webhdfs/v1/tmp?user.name=istvan&op=GETFILESTATUS"
HTTP/1.1 200 OK
Content-Type: application/json
Expires: Thu, 01-Jan-1970 00:00:00 GMT
Set-Cookie: hadoop.auth="u=istvan&p=istvan&t=simple&e=1370210454798&s=zKjRgOMQ1Q3NB1kXqHJ6GPa6TlY=";Path=/
Transfer-Encoding: chunked
Server: Jetty(6.1.26)

{"FileStatus":{"accessTime":0,"blockSize":0,"group":"supergroup","length":0,"modificationTime":1370174432465,"owner":"istvan","pathSuffix":"","permission":"755","replication":0,"type":"DIRECTORY"}}

This is similar to executing the Hadoop ls filesystem command:

$ bin/hadoop fs -ls /
Warning: $HADOOP_HOME is deprecated.

Found 1 items
drwxr-xr-x - istvan supergroup 0 2013-06-02 13:00 /tmp
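The same GETFILESTATUS call can be issued from any HTTP client, not only curl. A minimal Java sketch using HttpURLConnection, reusing the URL, port and user name from the example above:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class WebHdfsStatusExample {
    public static void main(String[] args) throws Exception {
        // GETFILESTATUS maps to HTTP GET; the namenode answers directly (no redirect needed)
        URL url = new URL("http://localhost:50070/webhdfs/v1/tmp?user.name=istvan&op=GETFILESTATUS");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");

        System.out.println("HTTP status: " + conn.getResponseCode());
        try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
            String line;
            while ((line = in.readLine()) != null) {
                System.out.println(line); // the JSON FileStatus document
            }
        }
        conn.disconnect();
    }
}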

2./ Create a directory

$ curl -i -X PUT "http://localhost:50070/webhdfs/v1/tmp/webhdfs?user.name=istvan&op=MKDIRS"
HTTP/1.1 200 OK
Content-Type: application/json
Expires: Thu, 01-Jan-1970 00:00:00 GMT
Set-Cookie: hadoop.auth="u=istvan&p=istvan&t=simple&e=1370210530831&s=YGwbkw0xRVpEAgbZpX7wlo56RMI=";Path=/
Transfer-Encoding: chunked
Server: Jetty(6.1.26)

The equivalent Hadoop filesystem command is as follows:

$ bin/hadoop fs -ls /tmp
Warning: $HADOOP_HOME is deprecated.

Found 2 items
drwxr-xr-x   - istvan supergroup          0 2013-06-02 12:17 /tmp/hadoop-istvan
drwxr-xr-x   - istvan supergroup          0 2013-06-02 13:02 /tmp/webhdfs

3./ Create a file

Creating a file requires two steps: first we need to run the command against the namenode, then follow the redirect and execute the WebHDFS call against the appropriate datanode.

Step 1:

curl -i -X PUT "http://localhost:50070/webhdfs/v1/tmp/webhdfs/webhdfs-test.txt?user.name=istvan&op=CREATE"
HTTP/1.1 307 TEMPORARY_REDIRECT
Content-Type: application/octet-stream
Expires: Thu, 01-Jan-1970 00:00:00 GMT
Set-Cookie: hadoop.auth="u=istvan&p=istvan&t=simple&e=1370210936666&s=BLAIjTpNwurdsgvFxNL3Zf4bzpg=";Path=/
Location: http://istvan-pc:50075/webhdfs/v1/tmp/webhdfs/webhdfs-test.txt?op=CREATE&user.name=istvan&overwrite=false
Content-Length: 0
Server: Jetty(6.1.26)

Step 2:

$ curl -i -T webhdfs-test.txt "http://istvan-pc:50075/webhdfs/v1/tmp/webhdfs/webhdfs-test.txt?op=CREATE&user.name=istvan&overwrite=false"
HTTP/1.1 100 Continue

HTTP/1.1 201 Created
Content-Type: application/octet-stream
Location: webhdfs://0.0.0.0:50070/tmp/webhdfs/webhdfs-test.txt
Content-Length: 0
Server: Jetty(6.1.26)

To validate the result of the WebHDFS API we can run the following Hadoop filesystem command:

$ bin/hadoop fs -ls /tmp/webhdfs
Warning: $HADOOP_HOME is deprecated.

Found 1 items
-rw-r--r--   1 istvan supergroup         20 2013-06-02 13:09 /tmp/webhdfs/webhdfs-test.txt
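The same two-step create flow can also be scripted from Java. A minimal sketch: send the initial PUT to the namenode without following redirects, read the Location header, then upload the file content to the returned datanode URL (the path, user name and file content are taken from the example above):

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class WebHdfsCreateExample {
    public static void main(String[] args) throws Exception {
        // Step 1: ask the namenode where to write; do not follow the 307 redirect automatically
        URL nnUrl = new URL("http://localhost:50070/webhdfs/v1/tmp/webhdfs/webhdfs-test.txt"
                + "?user.name=istvan&op=CREATE");
        HttpURLConnection nn = (HttpURLConnection) nnUrl.openConnection();
        nn.setRequestMethod("PUT");
        nn.setInstanceFollowRedirects(false);
        System.out.println("Namenode returned HTTP " + nn.getResponseCode()); // expect 307
        String datanodeUrl = nn.getHeaderField("Location"); // datanode URL returned by the namenode
        nn.disconnect();

        // Step 2: PUT the actual file content to the datanode URL
        HttpURLConnection dn = (HttpURLConnection) new URL(datanodeUrl).openConnection();
        dn.setRequestMethod("PUT");
        dn.setDoOutput(true);
        try (OutputStream out = dn.getOutputStream()) {
            out.write("Hadoop WebHDFS test\n".getBytes(StandardCharsets.UTF_8));
        }
        System.out.println("Create returned HTTP " + dn.getResponseCode()); // expect 201 Created
        dn.disconnect();
    }
}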

4./ Open and read a file

In this case we run curl with the -L option to follow the HTTP temporary redirect URL.

$ curl -i -L "http://localhost:50070/webhdfs/v1/tmp/webhdfs/webhdfs-test.txt?op=OPEN&user.name=istvan"
HTTP/1.1 307 TEMPORARY_REDIRECT
Content-Type: application/octet-stream
Expires: Thu, 01-Jan-1970 00:00:00 GMT
Set-Cookie: hadoop.auth="u=istvan&p=istvan&t=simple&e=1370211032526&s=suBorvpvTUs6z/sw5n5PiZWsUnU=";Path=/
Location: http://istvan-pc:50075/webhdfs/v1/tmp/webhdfs/webhdfs-test.txt?op=OPEN&user.name=istvan&offset=0
Content-Length: 0
Server: Jetty(6.1.26)

HTTP/1.1 200 OK
Content-Type: application/octet-stream
Content-Length: 20
Server: Jetty(6.1.26)

Hadoop WebHDFS test

The corresponding Hadoop filesystem command is as follows:

$ bin/hadoop fs -cat /tmp/webhdfs/webhdfs-test.txt
Warning: $HADOOP_HOME is deprecated.

Hadoop WebHDFS test

5./ Rename a directory

$ curl -i -X PUT "http://localhost:50070/webhdfs/v1/tmp/webhdfs?op=RENAME&user.name=istvan&destination=/tmp/webhdfs-new"
HTTP/1.1 200 OK
Content-Type: application/json
Expires: Thu, 01-Jan-1970 00:00:00 GMT
Set-Cookie: hadoop.auth="u=istvan&p=istvan&t=simple&e=1370211103159&s=Gq/EBWZTBaoMk0tkGoodV+gU6jc=";Path=/
Transfer-Encoding: chunked
Server: Jetty(6.1.26)

To validate the result we can run the following Hadoop filesystem command:

$ bin/hadoop fs -ls /tmp
Warning: $HADOOP_HOME is deprecated.

Found 2 items
drwxr-xr-x   - istvan supergroup          0 2013-06-02 12:17 /tmp/hadoop-istvan
drwxr-xr-x   - istvan supergroup          0 2013-06-02 13:09 /tmp/webhdfs-new

6./ Delete a directory

This scenario results in an exception if the directory is not empty since a non-empty directory cannot be deleted.

$ curl -i -X DELETE "http://localhost:50070/webhdfs/v1/tmp/webhdfs-new?op=DELETE&user.name=istvan"
HTTP/1.1 403 Forbidden
Content-Type: application/json
Expires: Thu, 01-Jan-1970 00:00:00 GMT
Set-Cookie: hadoop.auth="u=istvan&p=istvan&t=simple&e=1370211266383&s=QFIJMWsy61vygFExl91Sgg5ME/Q=";Path=/
Transfer-Encoding: chunked
Server: Jetty(6.1.26)

{"RemoteException":{"exception":"IOException","javaClassName":"java.io.IOException","message":"/tmp/webhdfs-new is non empty"}}

First the file in the directory needs to be deleted and then the empty directory can be deleted, too.

$ curl -i -X DELETE "http://localhost:50070/webhdfs/v1/tmp/webhdfs-new/webhdfs-test.txt?op=DELETE&user.name=istvan"
HTTP/1.1 200 OK
Content-Type: application/json
Expires: Thu, 01-Jan-1970 00:00:00 GMT
Set-Cookie: hadoop.auth="u=istvan&p=istvan&t=simple&e=1370211375617&s=cG6727hbqGkrk/GO4yNRiZw4QxQ=";Path=/
Transfer-Encoding: chunked
Server: Jetty(6.1.26)

$ bin/hadoop fs -ls /tmp/webhdfs-new
Warning: $HADOOP_HOME is deprecated.

$ curl -i -X DELETE "http://localhost:50070/webhdfs/v1/tmp/webhdfs-new?op=DELETE&user.name=istvan&destination=/tmp/webhdfs-new"
HTTP/1.1 200 OK
Content-Type: application/json
Expires: Thu, 01-Jan-1970 00:00:00 GMT
Set-Cookie: hadoop.auth="u=istvan&p=istvan&t=simple&e=1370211495893&s=hZcZFDOL0x7exEhn14RlMgF4a/c=";Path=/
Transfer-Encoding: chunked
Server: Jetty(6.1.26)

$ bin/hadoop fs -ls /tmp
Warning: $HADOOP_HOME is deprecated.

Found 1 items
drwxr-xr-x   - istvan supergroup          0 2013-06-02 12:17 /tmp/hadoop-istvan

Conclusion

WebHDFS provides a simple, standard way to execute Hadoop filesystem operations from an external client that does not necessarily run on the Hadoop cluster itself. The requirement for WebHDFS is that the client needs a direct connection to the namenode and the datanodes via the predefined ports. Hadoop HDFS over HTTP – which was inspired by HDFS Proxy – addresses this limitation by providing a proxy layer based on a preconfigured Tomcat bundle; it is interoperable with the WebHDFS API but does not require the firewall ports to be open for the client.

Mahout on Windows Azure – Machine Learning Using Microsoft HDInsight


Introduction

Our last post was about Microsoft and Hortonworks' joint effort to deliver Hadoop on Microsoft Windows Azure, dubbed HDInsight. One of the key Microsoft HDInsight components is Mahout, a scalable machine learning library that provides a number of algorithms relying on the Hadoop platform. Machine learning supports a wide range of use cases, from email spam filtering to fraud detection to recommending books or movies, similar to Amazon.com's features. These algorithms can be divided into three main categories: recommenders/collaborative filtering, categorization and clustering. More details about these algorithms can be read on the Apache Mahout wiki.

Recommendation engine on Azure

A standard recommender example in machine learning is a movie recommender. Surprisingly enough, this example is not among the provided HDInsight examples, so we need to implement it on our own using mahout-0.5 components.

The movie recommender is an item-based recommendation algorithm: the key concept is that, given a large dataset of users, movies and values indicating how much a user liked a particular movie, the algorithm will recommend movies to the users. A commonly used dataset for movie recommendation is from GroupLens.

The downloadable file that is part of the 100K dataset (u.data) is not suitable for Mahout as-is, because its format is:

user item value timestamp
196 242 3 881250949
186 302 3 891717742
22 377 1 878887116
....

Mahout requires the data to be in the following format: userid,itemid,value – so the content has to be converted to:

196,242,3
186,302,3
22,377,1
....
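The conversion itself is trivial; a small Java sketch that strips the timestamp column and rewrites the tab-separated u.data file into the comma-separated recommend.csv used below (the file names are only placeholders):

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.PrintWriter;

public class ConvertMovieLensData {
    public static void main(String[] args) throws Exception {
        // u.data is whitespace separated: user, item, rating, timestamp
        try (BufferedReader in = new BufferedReader(new FileReader("u.data"));
             PrintWriter out = new PrintWriter(new FileWriter("recommend.csv"))) {
            String line;
            while ((line = in.readLine()) != null) {
                String[] f = line.split("\\s+");
                // keep userid,itemid,value and drop the timestamp
                out.println(f[0] + "," + f[1] + "," + f[2]);
            }
        }
    }
}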

There is no web-based console to execute Mahout on Azure; we need to go to the Remote Desktop tile to download the RDP configuration and then log in to the Azure headnode via RDP. Then we have to open the Hadoop command line to get a prompt.

c:>c:\apps\dist\hadoop-1.1.0-SNAPSHOT\bin\hadoop jar c:\apps\dist\mahout-0.5\mahout-examples-0.5-job.jar org.apache.mahout.driver.MahoutDriver recommenditembased --input recommend.csv --output recommendout --tempDir recommendtmp --usersFile user-ids.txt --similarityClassname SIMILARITY_EUCLIDEAN_DISTANCE --numRecommendations 5

The standard mahout.cmd seems to have a few bugs: if we run mahout.cmd, it throws an error complaining about java usage. I had to modify the file to remove setting the HADOOP_CLASSPATH environment variable, see the commented-out line below:

@rem run it
if not [%MAHOUT_LOCAL%] == [] (
    echo "MAHOUT_LOCAL is set, running locally"
    %JAVA% %JAVA_HEAP_MAX% %MAHOUT_OPTS% -classpath %MAHOUT_CLASSPATH% %CLASS% %*
) else (
    if [%MAHOUT_JOB%] == [] (
        echo "ERROR: Could not find mahout-examples-*.job in %MAHOUT_HOME% or %MAHOUT_HOME%\examples\target"
        goto :eof
    ) else (
@rem  set HADOOP_CLASSPATH=%MAHOUT_CLASSPATH%
        if /I [%1] == [hadoop] (
            echo Running: %HADOOP_HOME%\bin\%*
            call %HADOOP_HOME%\bin\%*
        ) else (
            echo Running: %HADOOP_HOME%\bin\hadoop jar %MAHOUT_JOB% %CLASS% %*
            call %HADOOP_HOME%\bin\hadoop jar %MAHOUT_JOB% %CLASS% %*
        )
    )
)

After this change we can run mahout as expected:

c:\apps\dist\mahout-0.5\bin>mahout.cmd recommenditembased --input recommend.csv --output recommendout --tempDir recommendtmp --usersFile user-ids.txt --similarityClassname SIMILARITY_EUCLIDEAN_DISTANCE --numRecommendations 5

The input argument defines the path to the input directory, and the output argument determines the path to the output directory.

The numRecommendations argument specifies the number of recommendations per user.

The usersFile defines the users to recommend for (in our case it contained only 3 users: 112, 286 and 301):

c:>hadoop fs -cat user-ids.txt
112
286
301

The similarityClassname is the name of the distributed similarity class; it can be SIMILARITY_EUCLIDEAN_DISTANCE, SIMILARITY_LOGLIKELIHOOD, SIMILARITY_PEARSON_CORRELATION, etc. This class determines the algorithm used to calculate the similarities between the items.

The execution of MapReduce tasks can be monitored via Hadoop MapReduce admin console:

mahout-recommender-console

mahout-recommender-console1

Once the job is finished, we need to use hadoop filesystem commands to display the output file produced by the RecommenderJob:

c:\apps\dist\hadoop-1.1.0-SNAPSHOT>hadoop fs -ls .
Found 5 items
drwxr-xr-x   - istvan supergroup          0 2012-12-21 11:00 /user/istvan/.Trash
-rw-r--r--   3 istvan supergroup    1079173 2012-12-23 22:40 /user/istvan/recommend.csv
drwxr-xr-x   - istvan supergroup          0 2012-12-24 12:24 /user/istvan/recommendout
drwxr-xr-x   - istvan supergroup          0 2012-12-24 12:22 /user/istvan/recommendtmp
-rw-r--r--   3 istvan supergroup         15 2012-12-23 22:40 /user/istvan/user-ids.txt

c:\apps\dist\hadoop-1.1.0-SNAPSHOT>hadoop fs -ls recommendout
Found 3 items
-rw-r--r--   3 istvan supergroup          0 2012-12-24 12:24 /user/istvan/recommendout/_SUCCESS
drwxr-xr-x   - istvan supergroup          0 2012-12-24 12:23 /user/istvan/recommendout/_logs
-rw-r--r--   3 istvan supergroup        153 2012-12-24 12:24 /user/istvan/recommendout/part-r-00000

c:\apps\dist\hadoop-1.1.0-SNAPSHOT>hadoop fs -cat recommendout/part-r*
112     [1228:5.0,1473:5.0,1612:5.0,1624:5.0,1602:5.0]
286     [1620:5.0,1617:5.0,1615:5.0,1612:5.0,1611:5.0]
301     [1620:5.0,1607:5.0,1534:5.0,1514:5.0,1503:5.0]

Thus the RecommenderJob recommends items 1228, 1473, 1612, 1624 and 1602 to user 112; items 1620, 1617, 1615, 1612 and 1611 to user 286; and items 1620, 1607, 1534, 1514 and 1503 to user 301.
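As a side note, Mahout also ships a non-distributed Taste API that implements the same item-based approach; a sketch like the one below, run locally against the converted recommend.csv, is a convenient way to sanity-check the distributed results (the class names are from the Mahout Taste library; the scores will not necessarily match the RecommenderJob output exactly):

import java.io.File;
import java.util.List;
import org.apache.mahout.cf.taste.impl.model.file.FileDataModel;
import org.apache.mahout.cf.taste.impl.recommender.GenericItemBasedRecommender;
import org.apache.mahout.cf.taste.impl.similarity.EuclideanDistanceSimilarity;
import org.apache.mahout.cf.taste.model.DataModel;
import org.apache.mahout.cf.taste.recommender.RecommendedItem;
import org.apache.mahout.cf.taste.similarity.ItemSimilarity;

public class LocalItemBasedRecommender {
    public static void main(String[] args) throws Exception {
        // recommend.csv holds the converted userid,itemid,value triples
        DataModel model = new FileDataModel(new File("recommend.csv"));

        // Same similarity measure as passed to the distributed job above
        ItemSimilarity similarity = new EuclideanDistanceSimilarity(model);
        GenericItemBasedRecommender recommender =
                new GenericItemBasedRecommender(model, similarity);

        // Top 5 recommendations for user 112, as in the usersFile example
        List<RecommendedItem> items = recommender.recommend(112L, 5);
        for (RecommendedItem item : items) {
            System.out.println(item.getItemID() + " : " + item.getValue());
        }
    }
}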

For those inclined to theory and scientific papers, I suggest reading the paper by Sarwar, Karypis, Konstan and Riedl that provides the background of item-based recommendation algorithms.

Mahout examples on Azure

Hadoop on Azure comes with two predefined Mahout examples: one for classification and one for clustering. They need to be executed from the command line – in a similar way as described above for the item-based recommendation engine.

mahout

The classification demo is based on a naive Bayes classifier: first you train your classifier with a set of known data, and then you run the algorithm on the actual data set. This concept is called supervised learning.

To run the classification example we need to download the 20news-bydate.tar.gz file from http://people.csail.mit.edu/jrennie/20Newsgroups/20news-bydate.tar.gz and unpack it under the mahout-0.5/examples/bin/work directory. The data set has two subsets, one for training the classifier and the other one for running the test. Then we can run the command:

c:\apps\dist\mahout-0.5\examples\bin> build-20news-bayes.cmd

This will kick off the Hadoop MapReduce job and after a while it will spit out the confusion matrix based on the Bayes algorithm. The confusion matrix tells us which categories were correctly identified by the classifier and which were not.

For instance, it has a category called rec.motorcycles (column a), and the classifier correctly identified 381 items out of 398 belonging to this category, while it incorrectly classified 9 items as belonging to rec.autos (column f), 2 items as belonging to sci.electronics (column n), etc.

WORK_PATH=c:\apps\dist\mahout-0.5\examples\bin\work\20news-bydate\
Running: c:\apps\dist\hadoop-1.1.0-SNAPSHOT\bin\hadoop jar c:\apps\dist\mahout-0.5\bin\..\\mahout-examples-0.5-job.jar org.apache.mahout.driver.MahoutDriver testclassifier   -m examples/bin/work/20news-bydate/bayes-model   -d examples/bin/work/20news-bydate/bayes-test-input   -type bayes   -ng 1   -source hdfs   -method "mapreduce"

12/12/24 17:55:58 INFO mapred.JobClient:     Map output records=7532
12/12/24 17:55:59 INFO bayes.BayesClassifierDriver: ============================
===========================
Confusion Matrix
-------------------------------------------------------
a       b       c       d       e       f       g       h       i       j
k       l       m       n       o       p       q       r       s       t
u       <--Classified as
381     0       0       0       0       9       1       0       0       0
1       0       0       2       0       1       0       0       3       0
0        |  398         a     = rec.motorcycles
1       284     0       0       0       0       1       0       6       3
11      0       66      3       0       1       6       0       4       9
0        |  395         b     = comp.windows.x
2       0       339     2       0       3       5       1       0       0
0       0       1       1       12      1       7       0       2       0
0        |  376         c     = talk.politics.mideast
4       0       1       327     0       2       2       0       0       2
1       1       0       5       1       4       12      0       2       0
0        |  364         d     = talk.politics.guns
7       0       4       32      27      7       7       2       0       12
0       0       6       0       100     9       7       31      0       0
0        |  251         e     = talk.religion.misc
10      0       0       0       0       359     2       2       0       1
3       0       1       6       0       1       0       0       11      0
0        |  396         f     = rec.autos
0       0       0       0       0       1       383     9       1       0
0       0       0       0       0       0       0       0       3       0
0        |  397         g     = rec.sport.baseball
1       0       0       0       0       0       9       382     0       0
0       0       1       1       1       0       2       0       2       0
0        |  399         h     = rec.sport.hockey
2       0       0       0       0       4       3       0       330     4
4       0       5       12      0       0       2       0       12      7
0        |  385         i     = comp.sys.mac.hardware
0       3       0       0       0       0       1       0       0       368
0       0       10      4       1       3       2       0       2       0
0        |  394         j     = sci.space
0       0       0       0       0       3       1       0       27      2
291     0       11      25      0       0       1       0       13      18
0        |  392         k     = comp.sys.ibm.pc.hardware
8       0       1       109     0       6       11      4       1       18
0       98      1       3       11      10      27      1       1       0
0        |  310         l     = talk.politics.misc
0       11      0       0       0       3       6       0       10      6
11      0       299     13      0       2       13      0       7       8
0        |  389         m     = comp.graphics
6       0       1       0       0       4       2       0       5       2
12      0       8       321     0       4       14      0       8       6
0        |  393         n     = sci.electronics
2       0       0       0       0       0       4       1       0       3
1       0       3       1       372     6       0       2       1       2
0        |  398         o     = soc.religion.christian
4       0       0       1       0       2       3       3       0       4
2       0       7       12      6       342     1       0       9       0
0        |  396         p     = sci.med
0       1       0       1       0       1       4       0       3       0
1       0       8       4       0       2       369     0       1       1
0        |  396         q     = sci.crypt
10      0       4       10      1       5       6       2       2       6
2       0       2       1       86      15      14      152     0       1
0        |  319         r     = alt.atheism
4       0       0       0       0       9       1       1       8       1
12      0       3       6       0       2       0       0       341     2
0        |  390         s     = misc.forsale
8       5       0       0       0       1       6       0       8       5
50      0       40      2       1       0       9       0       3       256
0        |  394         t     = comp.os.ms-windows.misc
0       0       0       0       0       0       0       0       0       0
0       0       0       0       0       0       0       0       0       0
0        |  0           u     = unknown
Default Category: unknown: 20

12/12/24 17:55:59 INFO driver.MahoutDriver: Program took 129826 ms

c:\apps\dist\mahout-0.5\examples\bin

Again, for those interested in theory and scientific papers, I suggest reading the following webpage.

Microsoft and Hadoop – Windows Azure HDInsight


Introduction

Traditionally, Microsoft Windows used to be a sort of stepchild in the Hadoop world – the ‘hadoop’ command to manage actions from the command line and the startup/shutdown scripts were written with Linux/*nix in mind, assuming bash. Thus if you wanted to run Hadoop on Windows, you had to install cygwin. The Apache Hadoop documentation also states the following (quotes from the Hadoop R1.1.0 documentation):
“•GNU/Linux is supported as a development and production platform. Hadoop has been demonstrated on GNU/Linux clusters with 2000 nodes
•Win32 is supported as a development platform. Distributed operation has not been well tested on Win32, so it is not supported as a production platform.”

Microsoft and Hortonworks joined forces to make Hadoop available on Windows Server for on-premise deployments as well as on Windows Azure to support big data in the cloud, too.

This post covers Windows Azure HDInsight (Hadoop on Azure, see https://www.hadooponazure.com). As of this writing, the service requires an invitation to participate in the CTP (Community Technology Preview), but the invitation process is managed very efficiently – after filling in the survey, I received the service access code within a couple of days.

New Cluster Request

The first step is to request a new cluster: you need to define the cluster name and the credentials to be able to log in to the headnode. By default the cluster consists of 3 nodes.

After a few minutes you will have a running cluster; then click on the “Go to Cluster” link to navigate to the main page.

WordCount with HDInsight on Azure

No Hadoop test is complete without the standard WordCount application – Microsoft Azure HDInsight provides an example file (davinci.txt) and the Java jar file to run wordcount, the Hello World of Hadoop.

First you need to go to the JavaScript console to upload the text file using fs.put():

js> fs.put()

Choose File ->  Browse
Destination: /user/istvan/example/data/davinci

Create a Job:

CreateJob

The actual command that Microsoft Azure HDInsight executes is as follows:

c:\apps\dist\hadoop-1.1.0-SNAPSHOT\bin\hadoop.cmd jar c:\apps\Jobs\templates\634898986181212311.hadoop-examples-1.1.0-SNAPSHOT.jar wordcount /user/istvan/example/data/davinci davinci-output

You can validate the output from the JavaScript console:

js> result = fs.read("davinci-output")
"(Lo)cra"	1
"1490	1
"1498,"	1
"35"	1
"40,"	1
"AS-IS".	1
"A_	1
"Absoluti	1
"Alack!	1

Microsoft HDInsight Streaming – Hadoop job in C#

Hadoop Streaming is a utility that supports running external map and reduce jobs. These external jobs can be written in various programming languages such as Python or Ruby – and since we are talking about Microsoft HDInsight, the example had better be based on .NET C#…

The demo application for C# streaming is again a wordcount example, using imitations of the Unix cat and wc commands. You could run the demo from the “Samples” tile, but I prefer to demonstrate Hadoop Streaming from the command line to have a closer look at what is going on under the hood.

In order to run the Hadoop command line from the Windows cmd prompt, you need to log in to the HDInsight headnode using Remote Desktop. First click on the “Remote Desktop” tile, then log in to the remote node using the credentials you defined at cluster creation time. Once you have logged in, click on the Hadoop Command Line shortcut.

In the Hadoop Command Line window, go to the Hadoop distribution directory (as of writing this post, Microsoft Azure HDInsight is based on Hadoop 1.1.0):

c:> cd \apps\dist
c:> hadoop fs -get /example/apps/wc.exe .
c:> hadoop fs -get /example/apps/cat.exe .
c:> cd \apps\dist\hadoop-1.1.0-SNAPSHOT
c:\apps\dist\hadoop-1.1.0-SNAPSHOT> hadoop jar lib\hadoop-streaming.jar -input "/user/istvan/example/data/davinci" -output "/user/istvan/example/dataoutput" -mapper "..\..\jars\cat.exe" -reducer "..\..\jars\wc.exe" -file "c:\Apps\dist\wc.exe" -file "c:\Apps\dist\cat.exe"
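
Once the job has finished, you can inspect the result directly from the same prompt (a quick sketch – the part file name assumes the default single reducer and may differ in your run):

c:\apps\dist\hadoop-1.1.0-SNAPSHOT> hadoop fs -cat /user/istvan/example/dataoutput/part-00000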

The C# code for wc.exe is as follows:

using System;
using System.IO;
using System.Linq;

namespace wc
{
    class wc
    {
        static void Main(string[] args)
        {
            string line;
            var count = 0;

            if (args.Length > 0){
                Console.SetIn(new StreamReader(args[0]));
            }

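            // Count word separators in each input line. Console.ReadLine() strips the trailing
            // newline, so the '\n' check never matches and the sum approximates the number of words.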
            while ((line = Console.ReadLine()) != null) {
                count += line.Count(cr => (cr == ' ' || cr == '\n'));
            }
            Console.WriteLine(count);
        }
    }
}

And the code for cat.exe is:

using System;
using System.IO;

namespace cat
{
    class cat
    {
        static void Main(string[] args)
        {
            if (args.Length > 0)
            {
                Console.SetIn(new StreamReader(args[0])); 
            }

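            // Echo each line of standard input (or of the file passed as the first argument) to standard output.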
            string line;
            while ((line = Console.ReadLine()) != null) 
            {
                Console.WriteLine(line);
            }

        }

    }
}

Interactive console

Microsoft Azure HDInsight comes with two types of interactive console: one is the standard Hadoop Hive console, the other is unique in the Hadoop world – it is based on JavaScript.

Let us start with Hive. You need to upload your data using the JavaScript fs.put() method as described above. Then you can create your Hive table and run a select query as follows:

CREATE TABLE stockprice (yyyymmdd STRING, open_price FLOAT, high_price FLOAT, low_price FLOAT, close_price FLOAT, stock_volume INT, adjclose_price FLOAT)
row format delimited fields terminated by ',' lines terminated by '\n' location '/user/istvan/input/';

select yyyymmdd, high_price, stock_volume from stockprice order by high_price desc;

InteractiveHive

InteractiveHive-Select

The other flavor of the HDInsight interactive console is based on JavaScript – as mentioned before, this is a unique offering from Microsoft; in fact, the JavaScript commands are converted into Pig statements.

JavascriptConsole

The syntax resembles a LINQ-style query, though it is not the same:

js> pig.from("/user/istvan/input/goog_stock.csv", "date,open,high,low,close,volume,adjclose", ",").select("date, high, volume").orderBy("high DESC").to("result")

js> result = fs.read("result")
05/10/2012	774.38	2735900
04/10/2012	769.89	2454200
02/10/2012	765.99	2790200
01/10/2012	765	3168000
25/09/2012	764.89	6058500

Under the hood

Microsoft and Hortonworks have re-implemented the key binaries (namenode, jobtracker, secondarynamenode, datanode, tasktracker) as executables (exe files), and they run as services in the background. The key ‘hadoop’ command – traditionally a bash script – has also been re-implemented as hadoop.cmd.

The distribution consists of Hadoop 1.1.0, Pig-0.9.3, Hive 0.9.0, Mahout 0.5 and Sqoop 1.4.2.

Cloudera Impala – Fast, Interactive Queries with Hadoop


Introduction

As discussed in the previous post about Twitter’s Storm, Hadoop is a batch-oriented solution that lacks support for ad-hoc, real-time queries. Many of the players in Big Data have realised the need for fast, interactive queries besides the traditional Hadoop approach. Cloudera, one of the key solution vendors in the Big Data/Hadoop domain, has just recently launched Cloudera Impala to address this gap.

As the Cloudera engineering team described in their blog, their work was inspired by the Google Dremel paper, which is also the basis for Google BigQuery. Cloudera Impala provides a HiveQL-like query language for a wide variety of SELECT statements with WHERE, GROUP BY and HAVING clauses, with ORDER BY – though currently LIMIT is mandatory with ORDER BY – joins (LEFT, RIGHT, FULL, OUTER, INNER), UNION ALL, external tables, etc. It also supports arithmetic and logical operators and Hive built-in functions such as COUNT, SUM, LIKE, IN or BETWEEN. It can access data stored on HDFS but it does not use MapReduce; instead it is based on its own distributed query engine.

The current Impala release (Impala 1.0 beta) does not support DDL statements (CREATE, ALTER, DROP TABLE); all table creation/modification/deletion operations have to be executed via Hive and then refreshed in the Impala shell.

Cloudera Impala is open source under the Apache Licence; the code can be retrieved from GitHub. Its components are written in C++, Java and Python.

Cloudera Impala Architecture

Cloudera Impala has 3 key components: impalad, impala-state-store and impala-shell.

The Impala shell is essentially a short shell script which starts the impala_shell.py Python program to run the queries.

SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
SHELL_HOME=${IMPALA_SHELL_HOME:-${SCRIPT_DIR}}

PYTHONPATH="${SHELL_HOME}/gen-py:${SHELL_HOME}/lib:${PYTHONPATH}" \
  python ${SHELL_HOME}/impala_shell.py "$@"

Impalad runs on each Hadoop datanode and it plans and executes the queries sent from impala-shell.

Impala-state-store stores information (location and status) about all the running impalad instances.

Installing Cloudera Impala

As of writing this article, Cloudera Impala requires 64-bit RHEL/CentOS 6.2 or higher. I was running the tests on RHEL 6.3 (64-bit) on AWS. You need to have Cloudera CDH4.1, Hive and MySQL installed (the latter is used to store the Hive metastore).

Note: AWS t1.micro instances are not suitable for CDH4.1, which requires more memory than t1.micro provides.

Cloudera recommends using Cloudera Manager to install Impala, but I used manual steps, just to ensure that I have a complete understanding of what is going on during the installation.

Step 1: Install CDH4.1

To install CDH4.1 you need to run the following commands. (These steps describe how to install Hadoop MRv1 – if you want to have YARN instead, a different set of MapReduce rpms needs to be installed. However, Cloudera stated in the install instructions that they do not consider MapReduce 2.0 (YARN) production-ready yet, thus I decided to stick with MRv1 for these tests.) CDH4.1 MRv1 can be installed as a pseudo-distributed or a full cluster solution; for these tests we will use the pseudo-distributed mode:

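If the Cloudera repository rpm is not yet present on the machine, you can fetch it first – a minimal sketch, assuming the CDH4 one-click-install package for RHEL/CentOS 6 (the exact URL may change over time):

$ wget http://archive.cloudera.com/cdh4/one-click-install/redhat/6/x86_64/cloudera-cdh-4-0.noarch.rpm
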
$ sudo yum --nogpgcheck localinstall cloudera-cdh-4-0.noarch.rpm
$ sudo rpm --import http://archive.cloudera.com/cdh4/redhat/6/x86_64/cdh/RPM-GPG-KEY-cloudera
$ sudo yum install hadoop-0.20-conf-pseudo

This will install Hadoop-0.20-MapReduce (jobtracker, tasktracker) and Hadoop HDFS (namenode, secondarynamenode, datanode) as dependencies, too.

After the packages are installed, you need to execute the following commands in order to set up the Hadoop pseudo-distributed mode properly:

# Format namenode
$ sudo -u hdfs hdfs namenode -format

# Start HDFS daemons
$ for service in /etc/init.d/hadoop-hdfs-*
$ do
$  sudo $service start
$ done

# Create /tmp dir
$ sudo -u hdfs hadoop fs -mkdir /tmp
$ sudo -u hdfs hadoop fs -chmod -R 1777 /tmp

# Create system dirs
$ sudo -u hdfs hadoop fs -mkdir /var
$ sudo -u hdfs hadoop fs -mkdir /var/lib
$ sudo -u hdfs hadoop fs -mkdir /var/lib/hadoop-hdfs
$ sudo -u hdfs hadoop fs -mkdir /var/lib/hadoop-hdfs/cache
$ sudo -u hdfs hadoop fs -mkdir /var/lib/hadoop-hdfs/cache/mapred
$ sudo -u hdfs hadoop fs -mkdir /var/lib/hadoop-hdfs/cache/mapred/mapred
$ sudo -u hdfs hadoop fs -mkdir /var/lib/hadoop-hdfs/cache/mapred/mapred/staging
$ sudo -u hdfs hadoop fs -chmod 1777 /var/lib/hadoop-hdfs/cache/mapred/mapred/staging
$ sudo -u hdfs hadoop fs -chown -R mapred /var/lib/hadoop-hdfs/cache/mapred

# Create user dirs
$ sudo -u hdfs hadoop fs -mkdir  /user/istvan
$ sudo -u hdfs hadoop fs -chown istvan /user/istvan

# Start MapReduce
$ for service in /etc/init.d/hadoop-0.20-mapreduce-*
$ do
$  sudo $service start
$ done

Step 2: Install MySQL (used by Hive as metastore)

$ sudo yum install mysql-server
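
Before creating the metastore, make sure the MySQL server is running and starts on boot – a minimal sketch for RHEL/CentOS 6 (service and package names may differ on other platforms):

$ sudo service mysqld start
$ sudo chkconfig mysqld on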

Step 3: Install Hive

$ sudo yum install hive

Step 4: Configure Hive to use MySQL as metastore
To configure Hive and MySQL to work together, you need to install the MySQL JDBC connector:

$ curl -L 'http://www.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.22.tar.gz/from/http://mysql.he.net/' | tar xz
$ sudo cp mysql-connector-java-5.1.22/mysql-connector-java-5.1.22-bin.jar /usr/lib/hive/lib/

Then create the Hive metastore database:

$ mysql -u root -p
mysql> CREATE DATABASE metastore;
mysql> USE metastore;
mysql> SOURCE /usr/lib/hive/scripts/metastore/upgrade/mysql/hive-schema-0.9.0.mysql.sql;

and the hive user:

mysql> CREATE USER 'hive'@'localhost' IDENTIFIED BY 'hive';
mysql> GRANT ALL PRIVILEGES ON metastore.* TO 'hive'@'localhost' WITH GRANT OPTION;
mysql> FLUSH PRIVILEGES;
mysql> quit;

To configure Hive to use MySQL as the local metastore, you need to modify hive-site.xml as follows:

<property>
  <name>javax.jdo.option.ConnectionURL</name>
  <value>jdbc:mysql://myhost/metastore</value>
</property>

<property>
  <name>javax.jdo.option.ConnectionDriverName</name>
  <value>com.mysql.jdbc.Driver</value>
</property>

<property>
  <name>javax.jdo.option.ConnectionUserName</name>
  <value>hive</value>
</property>

<property>
  <name>javax.jdo.option.ConnectionPassword</name>
  <value>hive</value>
</property>

<property>
  <name>hive.metastore.local</name>
  <value>true</value>
</property>

Step 5: Install Impala and Impala Shell

$ sudo yum install impala
$ sudo yum install impala-shell

If you follow the manual installation procedure, no Impala configuration files are created automatically. You need to create the /usr/lib/impala/conf directory and copy the following files into it: core-site.xml, hdfs-site.xml, hive-site.xml and log4j.properties.
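
A minimal sketch of this step, assuming the cluster configuration files live under /etc/hadoop/conf and /etc/hive/conf (adjust the source paths to your own setup):

$ sudo mkdir -p /usr/lib/impala/conf
$ sudo cp /etc/hadoop/conf/core-site.xml /etc/hadoop/conf/hdfs-site.xml /usr/lib/impala/conf/
$ sudo cp /etc/hive/conf/hive-site.xml /usr/lib/impala/conf/
$ sudo cp /etc/hadoop/conf/log4j.properties /usr/lib/impala/conf/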

$ ls -ltr
total 16
-rw-r--r--. 1 root root 2094 Nov  4 16:37 hive-site.xml
-rw-r--r--. 1 root root 2988 Nov  4 16:43 impala-log4j.properties
-rwxr-xr-x. 1 root root 2052 Nov  4 16:58 hdfs-site.xml
-rwxr-xr-x. 1 root root 1701 Nov  9 16:53 core-site.xml

Configure Impala

In order to support direct reads for Impala you need to configure the following Hadoop properties:

core-site.xml

<property>
  <name>dfs.client.read.shortcircuit</name>
  <value>true</value>
</property>

<property>
  <name>dfs.client.read.shortcircuit.skip.checksum</name>
  <value>false</value>
</property>

hdfs-site.xml:

<property>
  <name>dfs.datanode.data.dir.perm</name>
  <value>755</value>
</property>

<property>
  <name>dfs.block.local-path-access.user</name>
  <value>impala,mapred,istvan</value>
</property>

You can also enable data locality tracking:

<property>
  <name>dfs.datanode.hdfs-blocks-metadata.enabled</name>
  <value>true</value>
</property>

Step 6: Start up the Impala daemons
Log in as the impala user and execute the following commands:

$ GLOG_v=1 nohup /usr/bin/impala-state-store -state_store_port=24000 &
$ GLOG_v=1 nohup /usr/bin/impalad -state_store_host=localhost -nn=localhost -nn_port=8020 -hostname=localhost -ipaddress=127.0.0.1 &
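
To check that both daemons came up properly, a simple sanity check is to look for the running processes (and to inspect the nohup output if something is missing):

$ ps -ef | egrep 'impalad|impala-state-store' | grep -v egrep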

Run Impala queries

In order to run Impala interactive queries from the Impala shell, we need to create the tables via Hive (remember, the current Impala beta version does not support DDL statements). I used Google stock prices in this example (retrieved from http://finance.yahoo.com in csv format):

hive> CREATE EXTERNAL TABLE stockprice
    > (yyyymmdd STRING, open_price FLOAT, high_price FLOAT, low_price FLOAT,
    >  close_price FLOAT, stock_volume INT, adjclose_price FLOAT)
    >  ROW FORMAT DELIMITED
    > FIELDS TERMINATED BY ','
    > LINES TERMINATED BY '\n'
    > LOCATION '/user/istvan/input/';
OK
Time taken: 14.499 seconds
hive> show tables;
OK
stockprice
Time taken: 16.04 seconds
hive> DESCRIBE stockprice;
OK
yyyymmdd        string
open_price      float
high_price      float
low_price       float
close_price     float
stock_volume    int
adjclose_price  float
Time taken: 0.583 seconds

A similar table – not an external one this time – can be created and loaded from Hive, too:

hive> CREATE TABLE imp_test
    > (yyyymmdd STRING, open_price FLOAT, high_price FLOAT, low_price FLOAT,
    > close_price FLOAT, stock_volume INT, adjclose_price FLOAT)
    > ;
OK
Time taken: 0.542 seconds
hive> LOAD DATA LOCAL INPATH '/home/istvan/goog_stock.csv' OVERWRITE INTO TABLE imp_test;
Copying data from file:/home/istvan/goog_stock.csv
Copying file: file:/home/istvan/goog_stock.csv
Loading data to table default.imp_test
rmr: DEPRECATED: Please use 'rm -r' instead.
Deleted /user/hive/warehouse/imp_test
OK
Time taken: 1.903 seconds

Then you can run a Hive query to retrieve the rows ordered by the highest price. As you can see, Hive will initiate a MapReduce job under the hood:

hive> select * from stockprice order by high_price desc;
Total MapReduce jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=
In order to set a constant number of reducers:
  set mapred.reduce.tasks=
Starting Job = job_201211110837_0006, Tracking URL = http://localhost:50030/jobdetails.jsp?jobid=job_201211110837_0006
Kill Command = /usr/lib/hadoop/bin/hadoop job  -Dmapred.job.tracker=localhost:8021 -kill job_201211110837_0006
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2012-11-11 13:16:30,895 Stage-1 map = 0%,  reduce = 0%
2012-11-11 13:16:46,554 Stage-1 map = 100%,  reduce = 0%
2012-11-11 13:17:05,918 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 6.2 sec
2012-11-11 13:17:07,061 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 6.2 sec
2012-11-11 13:17:08,243 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 6.2 sec
....
2012-11-11 13:17:17,274 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 6.2 sec
2012-11-11 13:17:18,334 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 6.2 sec
MapReduce Total cumulative CPU time: 6 seconds 200 msec
Ended Job = job_201211110837_0006
MapReduce Jobs Launched: 
Job 0: Map: 1  Reduce: 1   Cumulative CPU: 6.2 sec   HDFS Read: 0 HDFS Write: 0 SUCCESS
Total MapReduce CPU Time Spent: 6 seconds 200 msec
OK
05/10/2012  770.71  774.38  765.01  767.65  2735900  767.65
04/10/2012  762.75  769.89  759.4   768.05  2454200  768.05
02/10/2012  765.2   765.99  750.27  756.99  2790200  756.99
01/10/2012  759.05  765.0   756.21  761.78  3168000  761.78
25/09/2012  753.05  764.89  747.66  749.16  6058500  749.16
03/10/2012  755.72  763.92  752.2   762.5   2208300  762.5
08/10/2012  761.0   763.58  754.15  757.84  1958600  757.84
27/09/2012  759.95  762.84  751.65  756.5   3931100  756.5
09/10/2012  759.67  761.32  742.53  744.09  3003200  744.09
26/09/2012  749.85  761.24  741.0   753.46  5672900  753.46
...
Time taken: 82.09 seconds

Running the very same query from impala-shell is significantly faster. Cloudera claim that it can be an order of magnitude faster or even more, depending on the query. I can confirm from my experience that impala-shell returned the result in around one second on average, compared to the Hive version which took roughly 82 seconds.

[istvan@ip-10-227-137-76 ~]$ impala-shell
Welcome to the Impala shell. Press TAB twice to see a list of available commands.

Copyright (c) 2012 Cloudera, Inc. All rights reserved.
(Build version: Impala v0.1 (1fafe67) built on Mon Oct 22 13:06:45 PDT 2012)
[Not connected] > connect localhost:21000
[localhost:21000] > select * from stockprice order by high_price desc limit 20000;
05/10/2012  770.7100219726562  774.3800048828125  765.010009765625   767.6500244140625  2735900   767.6500244140625
04/10/2012  762.75             769.8900146484375  759.4000244140625  768.0499877929688  2454200   768.0499877929688
02/10/2012  765.2000122070312  765.989990234375   750.27001953125    756.989990234375   2790200   756.989990234375
01/10/2012  759.0499877929688  765                756.2100219726562  761.780029296875   3168000   761.780029296875
25/09/2012  753.0499877929688  764.8900146484375  747.6599731445312  749.1599731445312  6058500   749.1599731445312
03/10/2012  755.719970703125   763.9199829101562  752.2000122070312  762.5              2208300   762.5
08/10/2012  761                763.5800170898438  754.1500244140625  757.8400268554688  1958600   757.8400268554688
27/09/2012  759.9500122070312  762.8400268554688  751.6500244140625  756.5              3931100   756.5
09/10/2012  759.6699829101562  761.3200073242188  742.530029296875   744.0900268554688  3003200   744.0900268554688
26/09/2012  749.8499755859375  761.239990234375   741                753.4600219726562  5672900   753.4600219726562
18/10/2012  755.5399780273438  759.4199829101562  676                695                12430200  695
...

If you use DDL commands from Hive (e.g. CREATE TABLE), then you need to run the refresh command in impala-shell to reflect the changes:

[localhost:21000] > refresh
Successfully refreshed catalog
[localhost:21000] > describe stockprice
yyyymmdd        string
open_price      float
high_price      float
low_price       float
close_price     float
stock_volume    int
adjclose_price  float
[localhost:21000] > show tables;
imp_test
stockprice
[localhost:21000] >

Conclusion

There are more and more efforts in the Big Data world to support fast, ad-hoc queries and real-time data processing for large datasets. Cloudera Impala is certainly an exciting solution that utilises the same concept as Google BigQuery, but it promises to support a wider range of input formats, and by making it available as open-source technology Cloudera can attract external developers to improve the software and take it to the next stage.

If you are interested in learning more about Impala, please check out our book, Impala in Action, at Manning Publications.