Cloudera Impala – Fast, Interactive Queries with Hadoop


Introduction

As discussed in the previous post about Twitter’s Storm, Hadoop is a batch-oriented solution that lacks support for ad-hoc, real-time queries. Many of the players in Big Data have realised the need for fast, interactive queries besides the traditional Hadoop approach. Cloudera, one of the key solution vendors in the Big Data/Hadoop domain, has recently launched Cloudera Impala to address this gap.

As the Cloudera Engineering team described in their blog, their work was inspired by the Google Dremel paper, which is also the basis for Google BigQuery. Cloudera Impala provides a HiveQL-like query language covering a wide variety of SELECT statements with WHERE, GROUP BY and HAVING clauses, ORDER BY (though currently LIMIT is mandatory with ORDER BY), joins (LEFT, RIGHT, FULL OUTER, INNER), UNION ALL, external tables, etc. It also supports arithmetic and logical operators and Hive built-in functions such as COUNT, SUM, LIKE, IN or BETWEEN. It can access data stored on HDFS, but it does not use MapReduce; instead it relies on its own distributed query engine.
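
Purely as an illustration of the supported syntax (written against the stockprice table that is created later in this post, with an arbitrary volume threshold and date pattern), such statements look like this:

SELECT yyyymmdd, high_price, stock_volume
FROM stockprice
WHERE stock_volume > 5000000
ORDER BY high_price DESC
LIMIT 10;

SELECT COUNT(*), MAX(high_price)
FROM stockprice
WHERE yyyymmdd LIKE '%/10/2012';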

The current Impala release (Impala 1.0 beta) does not support DDL statements (CREATE, ALTER, DROP TABLE); all table creation/modification/deletion has to be executed via Hive and the changes then refreshed in the Impala shell.

Cloudera Impala is open source under the Apache License; the code can be retrieved from GitHub. Its components are written in C++, Java and Python.

Cloudera Impala Architecture

Cloudera Impala has 3 key components: impalad, impala-state-store and impala-shell.

Impala shell is essentially a short shell script which starts the impala_shell.py Python program to run the queries.

SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
SHELL_HOME=${IMPALA_SHELL_HOME:-${SCRIPT_DIR}}

PYTHONPATH="${SHELL_HOME}/gen-py:${SHELL_HOME}/lib:${PYTHONPATH}" \
  python ${SHELL_HOME}/impala_shell.py "$@"

Impalad runs on each Hadoop datanode and it plans and executes the queries sent from impala-shell.

Impala-state-store stores information (location and status) about all the running impalad instances.

Installing Cloudera Impala

As of writing this article, Cloudera Impala requires 64-bit RHEL/CentOS 6.2 or higher. I was running the tests on RHEL 6.3 (64-bit) on AWS. You need to have Cloudera CDH4.1, Hive and MySQL installed (the latter is used to store the Hive metastore).

Note: AWS t1.micro instances are not suitable for CDH4.1, which requires more memory than t1.micro provides.

Cloudera recommends using Cloudera Manager to install Impala, but I followed the manual steps, just to ensure that I have a complete understanding of what is going on during the installation.

Step 1: Install CDH4.1

To install CDH4.1 you need to run the following commands. These steps describe how to install Hadoop MRv1; if you want YARN instead, that requires different MapReduce rpms to be installed. However, Cloudera stated in the install instructions that they do not consider MapReduce 2.0 (YARN) production-ready yet, so I decided to stick with MRv1 for these tests. CDH4.1 MRv1 can be installed as a pseudo-distributed configuration or as a full cluster; for these tests we will use the pseudo-distributed mode:

$ sudo yum --nogpgcheck localinstall cloudera-cdh-4-0.noarch.rpm
$ sudo rpm --import http://archive.cloudera.com/cdh4/redhat/6/x86_64/cdh/RPM-GPG-KEY-cloudera
$ sudo yum install hadoop-0.20-conf-pseudo

This will install Hadoop-0.20-MapReduce (jobtracker, tasktracker) and Hadoop HDFS (namenode, secondarynamenode, datanode) as dependencies, too.

After the packages are installed, you need to execute the following commands in order to set up Hadoop pseudo-distributed mode properly:

# Format namenode
$ sudo -u hdfs hdfs namenode -format

# Start HDFS daemons
$ for service in /etc/init.d/hadoop-hdfs-*
$ do
$  sudo $service start
$ done

# Create /tmp dir
$ sudo -u hdfs hadoop fs -mkdir /tmp
$ sudo -u hdfs hadoop fs -chmod -R 1777 /tmp

# Create system dirs
$ sudo -u hdfs hadoop fs -mkdir /var
$ sudo -u hdfs hadoop fs -mkdir /var/lib
$ sudo -u hdfs hadoop fs -mkdir /var/lib/hadoop-hdfs
$ sudo -u hdfs hadoop fs -mkdir /var/lib/hadoop-hdfs/cache
$ sudo -u hdfs hadoop fs -mkdir /var/lib/hadoop-hdfs/cache/mapred
$ sudo -u hdfs hadoop fs -mkdir /var/lib/hadoop-hdfs/cache/mapred/mapred
$ sudo -u hdfs hadoop fs -mkdir /var/lib/hadoop-hdfs/cache/mapred/mapred/staging
$ sudo -u hdfs hadoop fs -chmod 1777 /var/lib/hadoop-hdfs/cache/mapred/mapred/staging
$ sudo -u hdfs hadoop fs -chown -R mapred /var/lib/hadoop-hdfs/cache/mapred

# Create user dirs
$ sudo -u hdfs hadoop fs -mkdir  /user/istvan
$ sudo -u hdfs hadoop fs -chown istvan /user/istvan

# Start MapReduce
$ for service in /etc/init.d/hadoop-0.20-mapreduce-*
$ do
$  sudo $service start
$ done

Step 2: Install MySQL (used by Hive as metastore)

$ sudo yum install mysql-server

Step 3: Install Hive

$ sudo yum install hive

Step 4: Configure Hive to use MySQL as metastore
To configure Hive and MySQL to work together you need to install the MySQL JDBC connector:

$ curl -L 'http://www.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.22.tar.gz/from/http://mysql.he.net/' | tar xz
$ sudo cp mysql-connector-java-5.1.22/mysql-connector-java-5.1.22-bin.jar /usr/lib/hive/lib/

Then create the Hive metastore database

$ mysql -u root -p
mysql> CREATE DATABASE metastore;
mysql> USE metastore;
mysql> SOURCE /usr/lib/hive/scripts/metastore/upgrade/mysql/hive-schema-0.9.0.mysql.sql;

and hive user:

mysql> CREATE USER 'hive'@'localhost' IDENTIFIED BY 'hive';
mysql> GRANT ALL PRIVILEGES ON metastore.* TO 'hive'@'localhost' WITH GRANT OPTION;
mysql> FLUSH PRIVILEGES;
mysql> quit;

To configure Hive to use MySQL as a local metastore you need to modify hive-site.xml as follows:

  
<property>
  <name>javax.jdo.option.ConnectionURL</name>
  <value>jdbc:mysql://myhost/metastore</value>
</property>

<property>
  <name>javax.jdo.option.ConnectionDriverName</name>
  <value>com.mysql.jdbc.Driver</value>
</property>

<property>
  <name>javax.jdo.option.ConnectionUserName</name>
  <value>hive</value>
</property>

<property>
  <name>javax.jdo.option.ConnectionPassword</name>
  <value>hive</value>
</property>

<property>
  <name>hive.metastore.local</name>
  <value>true</value>
</property>

Step 5: Install Impala and Impala Shell

$ sudo yum install impala
$ sudo yum install impala-shell

If you follow the manual installation procedure, no Impala config files are created automatically. You need to create the /usr/lib/impala/conf directory and copy the following files into it: core-site.xml, hdfs-site.xml, hive-site.xml and log4j.properties.
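
A minimal sketch of that step, assuming the client configurations live in the default CDH locations under /etc/hadoop/conf and /etc/hive/conf (the log4j.properties file can be copied the same way):

$ sudo mkdir -p /usr/lib/impala/conf
$ sudo cp /etc/hadoop/conf/core-site.xml /etc/hadoop/conf/hdfs-site.xml /usr/lib/impala/conf/
$ sudo cp /etc/hive/conf/hive-site.xml /usr/lib/impala/conf/

The directory content should then look something like this: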

$ ls -ltr
total 16
-rw-r--r--. 1 root root 2094 Nov  4 16:37 hive-site.xml
-rw-r--r--. 1 root root 2988 Nov  4 16:43 impala-log4j.properties
-rwxr-xr-x. 1 root root 2052 Nov  4 16:58 hdfs-site.xml
-rwxr-xr-x. 1 root root 1701 Nov  9 16:53 core-site.xml

Configure Impala

In order to support direct reads for Impala you need to configure the following Hadoop properties:

core-site.xml

<property>
  <name>dfs.client.read.shortcircuit</name>
  <value>true</value>
</property>

<property>
  <name>dfs.client.read.shortcircuit.skip.checksum</name>
  <value>false</value>
</property>

hdfs-site.xml:

<property>
  <name>dfs.datanode.data.dir.perm</name>
  <value>755</value>
</property>

<property>
  <name>dfs.block.local-path-access.user</name>
  <value>impala,mapred,istvan</value>
</property>

You can also enable data locality tracking:

<property>
  <name>dfs.datanode.hdfs-blocks-metadata.enabled</name>
  <value>true</value>
</property>

Step 6: Start up the Impala daemons
Log in as the impala user and execute the following commands:

$ GLOG_v=1 nohup /usr/bin/impala-state-store -state_store_port=24000 &
$ GLOG_v=1 nohup /usr/bin/impalad -state_store_host=localhost -nn=localhost -nn_port=8020 -hostname=localhost -ipaddress=127.0.0.1 &

Run Impala queries

In order to run Impala interactive queries from the impala shell, we need to create the tables via Hive (remember, the current Impala beta version does not support DDL). I used Google stock prices in this example (retrieved from http://finance.yahoo.com in CSV format):

hive> CREATE EXTERNAL TABLE stockprice
    > (yyyymmdd STRING, open_price FLOAT, high_price FLOAT, low_price FLOAT,
    >  close_price FLOAT, stock_volume INT, adjclose_price FLOAT)
    >  ROW FORMAT DELIMITED
    > FIELDS TERMINATED BY ','
    > LINES TERMINATED BY '\n'
    > LOCATION '/user/istvan/input/';
OK
Time taken: 14.499 seconds
hive> show tables;
OK
stockprice
Time taken: 16.04 seconds
hive> DESCRIBE stockprice;
OK
yyyymmdd        string
open_price      float
high_price      float
low_price       float
close_price     float
stock_volume    int
adjclose_price  float
Time taken: 0.583 seconds

A similar table – not an external one this time – can be created and loaded from Hive, too:

hive> CREATE TABLE imp_test
    > (yyyymmdd STRING, open_price FLOAT, high_price FLOAT, low_price FLOAT,
    > close_price FLOAT, stock_volume INT, adjclose_price FLOAT)
    > ;
OK
Time taken: 0.542 seconds
hive> LOAD DATA LOCAL INPATH '/home/istvan/goog_stock.csv' OVERWRITE INTO TABLE imp_test;
Copying data from file:/home/istvan/goog_stock.csv
Copying file: file:/home/istvan/goog_stock.csv
Loading data to table default.imp_test
rmr: DEPRECATED: Please use 'rm -r' instead.
Deleted /user/hive/warehouse/imp_test
OK
Time taken: 1.903 seconds

Then you can run a Hive query that sorts the stock data by the daily high price in descending order. As you can see, Hive initiates a MapReduce job under the hood:

hive> select * from stockprice order by high_price desc;
Total MapReduce jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=
In order to set a constant number of reducers:
  set mapred.reduce.tasks=
Starting Job = job_201211110837_0006, Tracking URL = http://localhost:50030/jobdetails.jsp?jobid=job_201211110837_0006
Kill Command = /usr/lib/hadoop/bin/hadoop job  -Dmapred.job.tracker=localhost:8021 -kill job_201211110837_0006
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2012-11-11 13:16:30,895 Stage-1 map = 0%,  reduce = 0%
2012-11-11 13:16:46,554 Stage-1 map = 100%,  reduce = 0%
2012-11-11 13:17:05,918 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 6.2 sec
2012-11-11 13:17:07,061 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 6.2 sec
2012-11-11 13:17:08,243 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 6.2 sec
....
2012-11-11 13:17:17,274 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 6.2 sec
2012-11-11 13:17:18,334 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 6.2 sec
MapReduce Total cumulative CPU time: 6 seconds 200 msec
Ended Job = job_201211110837_0006
MapReduce Jobs Launched: 
Job 0: Map: 1  Reduce: 1   Cumulative CPU: 6.2 sec   HDFS Read: 0 HDFS Write: 0 SUCCESS
Total MapReduce CPU Time Spent: 6 seconds 200 msec
OK
05/10/2012  770.71  774.38  765.01  767.65  2735900  767.65
04/10/2012  762.75  769.89  759.4   768.05  2454200  768.05
02/10/2012  765.2   765.99  750.27  756.99  2790200  756.99
01/10/2012  759.05  765.0   756.21  761.78  3168000  761.78
25/09/2012  753.05  764.89  747.66  749.16  6058500  749.16
03/10/2012  755.72  763.92  752.2   762.5   2208300  762.5
08/10/2012  761.0   763.58  754.15  757.84  1958600  757.84
27/09/2012  759.95  762.84  751.65  756.5   3931100  756.5
09/10/2012  759.67  761.32  742.53  744.09  3003200  744.09
26/09/2012  749.85  761.24  741.0   753.46  5672900  753.46
...
Time taken: 82.09 seconds

Running essentially the same query from impala-shell (with the LIMIT clause that Impala requires for ORDER BY) is significantly faster. Cloudera claims that it can be an order of magnitude or more faster, depending on the query. I can confirm from my experience that impala-shell returned the result in around one second on average, compared to the Hive version which took roughly 82 seconds.

[istvan@ip-10-227-137-76 ~]$ impala-shell
Welcome to the Impala shell. Press TAB twice to see a list of available commands.

Copyright (c) 2012 Cloudera, Inc. All rights reserved.
(Build version: Impala v0.1 (1fafe67) built on Mon Oct 22 13:06:45 PDT 2012)
[Not connected] > connect localhost:21000
[localhost:21000] > select * from stockprice order by high_price desc limit 20000;
05/10/2012  770.7100219726562  774.3800048828125  765.010009765625   767.6500244140625  2735900   767.6500244140625
04/10/2012  762.75             769.8900146484375  759.4000244140625  768.0499877929688  2454200   768.0499877929688
02/10/2012  765.2000122070312  765.989990234375   750.27001953125    756.989990234375   2790200   756.989990234375
01/10/2012  759.0499877929688  765                756.2100219726562  761.780029296875   3168000   761.780029296875
25/09/2012  753.0499877929688  764.8900146484375  747.6599731445312  749.1599731445312  6058500   749.1599731445312
03/10/2012  755.719970703125   763.9199829101562  752.2000122070312  762.5              2208300   762.5
08/10/2012  761                763.5800170898438  754.1500244140625  757.8400268554688  1958600   757.8400268554688
27/09/2012  759.9500122070312  762.8400268554688  751.6500244140625  756.5              3931100   756.5
09/10/2012  759.6699829101562  761.3200073242188  742.530029296875   744.0900268554688  3003200   744.0900268554688
26/09/2012  749.8499755859375  761.239990234375   741                753.4600219726562  5672900   753.4600219726562
18/10/2012  755.5399780273438  759.4199829101562  676                695                12430200  695
...

If you use DDL commands from Hive (e.g. create table) then you need to run the refresh command in impala-shell to reflect the changes:

[localhost:21000] > refresh
Successfully refreshed catalog
[localhost:21000] > describe stockprice
yyyymmdd        string
open_price      float
high_price      float
low_price       float
close_price     float
stock_volume    int
adjclose_price  float
[localhost:21000] > show tables;
imp_test
stockprice
[localhost:21000] >

Conclusion

There are more and more efforts in the Big Data world to support ad-hoc, fast queries and real-time data processing for large datasets. Cloudera Impala is certainly an exciting solution that utilises the same concept as Google BigQuery but promises to support a wider range of input formats, and by making it available as open source technology Cloudera can attract external developers to improve the software and take it to the next stage.

If you are interested in learning more about Impala, please check out our book, Impala in Action, at Manning Publishing.

Heroku and Cassandra – Cassandra.io RESTful APIs


Introduction

Last time I wrote about Hadoop on Heroku, which is an add-on from Treasure Data – this time I am going to cover NoSQL on Heroku.
There are various datastore services – add-ons in Heroku terms – available from MongoDB (MongoHQ) to CouchDB (Cloudant) to Cassandra (Cassandra.io). This post is devoted to Cassandra.io.

Cassandra.io

Cassandra.io is a hosted and managed Cassandra ring based on Apache Cassandra that is accessible via a RESTful API. As of writing this article, the Cassandra.io client helper libraries are available in Java, Ruby and PHP, and there is also an Objective-C version in private beta. The libraries can be downloaded from GitHub. I used the Java library in my tests.

Heroku – and the Cassandra.io add-on, too – is built on Amazon Elastic Compute Cloud (EC2) and it is supported in all of Amazon’s locations. Note: the Cassandra.io add-on is in public beta now, which means that only one plan, called Test, is available – this one is free.

Installing Cassandra.io add-on

To install the Cassandra.io add-on you just need to follow the standard way of adding an add-on to an application:

$ heroku login
Enter your Heroku credentials.
Email: address@example.com
Password (typing will be hidden): 
Authentication successful.

$ heroku create
Creating glacial-badlands-1234... done, stack is cedar
http://glacial-badlands-1234.herokuapp.com/ | git@heroku.com:glacial-badlands-1234.git

$ heroku addons:add cassandraio:test --app glacial-badlands-1234
Adding cassandraio:test on glacial-badlands-1234... done, v2 (free)
Use `heroku addons:docs cassandraio:test` to view documentation.

You can check the configuration via Heroku admin console:

Then you need to clone the client helper libraries from GitHub:

$ git clone https://github.com/m2mIO/cassandraio-client-libraries.git

In case of the Java client library, you need the Google gson library (gson-2.0.jar), too.

Writing Cassandra.io application

The Java RESTful API library has one simple configuration file called sdk.properties. It has very few parameters stored in it – the API URL and the version. The original sdk.properties file cloned from GitHub has the wrong version (v0.1); it needs to be changed to 1. You can verify the required configuration parameters using the heroku config command.

$ heroku config --app glacial-badlands-1234
=== glacial-badlands-1234 Config Vars
CASSANDRAIO_URL: https://Token:AccountId@api.cassandra.io/1/welcome

You can check the same config parameters from the Heroku admin console, though the URL is misleading:

The sdk.properties file should look like this:

apiUrl = https://api.cassandra.io
version = 1

The Java code – CassandraIOTest.java – looks like this:

package io.cassandra.tests;

import java.util.ArrayList;
import java.util.List;

import io.cassandra.sdk.StatusMessageModel;
import io.cassandra.sdk.column.ColumnAPI;
import io.cassandra.sdk.columnfamily.ColumnFamilyAPI;
import io.cassandra.sdk.constants.APIConstants;
import io.cassandra.sdk.data.DataAPI;
import io.cassandra.sdk.data.DataBulkModel;
import io.cassandra.sdk.data.DataColumn;
import io.cassandra.sdk.data.DataMapModel;
import io.cassandra.sdk.data.DataRowkey;
import io.cassandra.sdk.keyspace.KeyspaceAPI;

public class CassandraIOTest {
    // credentials
	private static String TOKEN = "<Token>";
	private static String ACCOUNTID = "<AccountId>";

	// data 
	private static String KS = "AAPL";
	private static String CF = "MarketData";
	private static String COL1 = "Open";
	private static String COL2 = "Close";
	private static String COL3 = "High";
	private static String COL4 = "Low";
	private static String COL5 = "Volume";
	private static String COL6 = "AdjClose";
	private static String RK = "18-05-2012";

	public static void main(String[] args) {	
		try {
			StatusMessageModel sm;

			// Create Keyspace
			KeyspaceAPI keyspaceAPI = new KeyspaceAPI(APIConstants.API_URL, TOKEN, ACCOUNTID);
			sm = keyspaceAPI.createKeyspace(KS);
			System.out.println(sm.getMessage() + " | " + sm.getDetail() + " | "
				+ sm.getError());

		        // Create ColumnFamily
			ColumnFamilyAPI columnFamilyAPI = new ColumnFamilyAPI(APIConstants.API_URL, TOKEN,
					ACCOUNTID);
			sm = columnFamilyAPI.createColumnFamily(KS, CF,
					APIConstants.COMPARATOR_UTF8);
			System.out.println(sm.getMessage() + " | " + sm.getDetail() + " | "
					+ sm.getError());

			 // Add Columns (High, Low, Open, Close, Volume, AdjClose)
			ColumnAPI columnAPI = new ColumnAPI(APIConstants.API_URL, TOKEN, ACCOUNTID);
			sm = columnAPI.upsertColumn(KS, CF, COL1,
					APIConstants.COMPARATOR_UTF8, true);
			System.out.println(sm.getMessage() + " | " + sm.getDetail() + " | "
					+ sm.getError());
			sm = columnAPI.upsertColumn(KS, CF, COL2,
					APIConstants.COMPARATOR_UTF8, true);
			System.out.println(sm.getMessage() + " | " + sm.getDetail() + " | "
					+ sm.getError());
			sm = columnAPI.upsertColumn(KS, CF, COL3,
					APIConstants.COMPARATOR_UTF8, true);
			System.out.println(sm.getMessage() + " | " + sm.getDetail() + " | "
					+ sm.getError());
			sm = columnAPI.upsertColumn(KS, CF, COL4,
					APIConstants.COMPARATOR_UTF8, true);
			System.out.println(sm.getMessage() + " | " + sm.getDetail() + " | "
					+ sm.getError());
			sm = columnAPI.upsertColumn(KS, CF, COL5,
					APIConstants.COMPARATOR_UTF8, true);
			System.out.println(sm.getMessage() + " | " + sm.getDetail() + " | "
					+ sm.getError());
			sm = columnAPI.upsertColumn(KS, CF, COL6,
					APIConstants.COMPARATOR_UTF8, true);
			System.out.println(sm.getMessage() + " | " + sm.getDetail() + " | "
					+ sm.getError());

			//Add Bulk Data
			DataAPI dataAPI = new DataAPI(APIConstants.API_URL, TOKEN, ACCOUNTID);

			List<DataColumn> columns = new ArrayList<DataColumn>();
			DataColumn dc = new DataColumn(COL1, "533.96");
			columns.add(dc);
			dc = new DataColumn(COL2, "530.38", 12000);
			columns.add(dc);
			dc = new DataColumn(COL3, "543.41", 12000);
			columns.add(dc);
			dc = new DataColumn(COL4, "522.18", 12000);
			columns.add(dc);
			dc = new DataColumn(COL5, "26125200", 12000);
			columns.add(dc);
			dc = new DataColumn(COL6, "530.12", 12000);
			columns.add(dc);

			List<DataRowkey> rows = new ArrayList<DataRowkey>();
			DataRowkey row = new DataRowkey(RK, columns);
			rows.add(row);

			DataBulkModel dataBulk = new DataBulkModel(rows);

			sm = dataAPI.postBulkData(KS, CF, dataBulk);
			System.out.println(sm.getMessage() + " | " + sm.getDetail() + " | "
					+ sm.getError());

			// Get Data
			DataMapModel dm = dataAPI.getData(KS, CF, RK, 0, null);
			System.out.println(dm.toString());			

			// Delete Keyspace
			sm = keyspaceAPI.deleteKeyspace(KS);
			System.out.println(sm.getMessage() + " | " + sm.getDetail() + " | "
					+ sm.getError());
		}
		catch(Exception e) {
			System.out.println(e.getMessage());
		}

	}
}
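
To compile and run this class, the Cassandra.io Java SDK classes and gson-2.0.jar need to be on the classpath. A rough sketch (the classpath entries are placeholders; adjust them to wherever the cloned library and gson live on your machine):

$ javac -cp '<path-to-cassandraio-java-sdk>:<path-to>/gson-2.0.jar' -d . CassandraIOTest.java
$ java -cp '<path-to-cassandraio-java-sdk>:<path-to>/gson-2.0.jar:.' io.cassandra.tests.CassandraIOTest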

The runtime result is:

09-Sep-2012 22:59:18 io.cassandra.sdk.CassandraIoSDK constructAPIUrl
INFO: API URL: https://api.cassandra.io/1/keyspace/AAPL/
Success | Keyspace added successfully. | null
09-Sep-2012 22:59:21 io.cassandra.sdk.CassandraIoSDK constructAPIUrl
INFO: API URL: https://api.cassandra.io/1/columnfamily/AAPL/MarketData/UTF8Type/
Success | MarketData ColumnFamily created successfully | null
09-Sep-2012 22:59:24 io.cassandra.sdk.CassandraIoSDK constructAPIUrl
INFO: API URL: https://api.cassandra.io/1/column/AAPL/MarketData/Open/UTF8Type/?isIndex=true
Failed | Unable to create Column: Open | Cassandra encountered an internal error processing this request: TApplicationError type: 6 message:Internal error processing system_update_column_family
09-Sep-2012 22:59:24 io.cassandra.sdk.CassandraIoSDK constructAPIUrl
INFO: API URL: https://api.cassandra.io/1/column/AAPL/MarketData/Close/UTF8Type/?isIndex=true
Success | Close Column upserted successfully | null
09-Sep-2012 22:59:26 io.cassandra.sdk.CassandraIoSDK constructAPIUrl
INFO: API URL: https://api.cassandra.io/1/column/AAPL/MarketData/High/UTF8Type/?isIndex=true
Success | High Column upserted successfully | null
09-Sep-2012 22:59:27 io.cassandra.sdk.CassandraIoSDK constructAPIUrl
INFO: API URL: https://api.cassandra.io/1/column/AAPL/MarketData/Low/UTF8Type/?isIndex=true
Success | Low Column upserted successfully | null
09-Sep-2012 22:59:29 io.cassandra.sdk.CassandraIoSDK constructAPIUrl
INFO: API URL: https://api.cassandra.io/1/column/AAPL/MarketData/Volume/UTF8Type/?isIndex=true
Success | Volume Column upserted successfully | null
09-Sep-2012 22:59:30 io.cassandra.sdk.CassandraIoSDK constructAPIUrl
INFO: API URL: https://api.cassandra.io/1/column/AAPL/MarketData/AdjClose/UTF8Type/?isIndex=true
Success | AdjClose Column upserted successfully | null
Posting JSON: {"rowkeys":[{"rowkey":"18-05-2012","columns":[{"columnname":"Open","columnvalue":"533.96","ttl":0},{"columnname":"Close","columnvalue":"530.38","ttl":12000},{"columnname":"High","columnvalue":"543.41","ttl":12000},{"columnname":"Low","columnvalue":"522.18","ttl":12000},{"columnname":"Volume","columnvalue":"26125200","ttl":12000},{"columnname":"AdjClose","columnvalue":"530.12","ttl":12000}]}]}
09-Sep-2012 22:59:32 io.cassandra.sdk.CassandraIoSDK constructAPIUrl
INFO: API URL: https://api.cassandra.io/1/data/AAPL/MarketData/
Success | Bulk upload successfull. | null
09-Sep-2012 22:59:32 io.cassandra.sdk.CassandraIoSDK constructAPIUrl
INFO: API URL: https://api.cassandra.io/1/data/AAPL/MarketData/18-05-2012/
{Volume=26125200, Open=533.96, Low=522.18, High=543.41, Close=530.38, AdjClose=530.12}
09-Sep-2012 22:59:32 io.cassandra.sdk.CassandraIoSDK constructAPIUrl
INFO: API URL: https://api.cassandra.io/1/keyspace/AAPL/
Success | Keyspace dropped successfully. | null

Analysis

Step 1./ The code creates a keyspace named AAPL using HTTP POST, url: https://api.cassandra.io/1/keyspace/AAPL/
It uses the KeyspaceAPI class with the Token and AccountId as parameters for the constructor. The Token is used as the username, while the AccountId is the password. (Remember: these attributes can be retrieved using the heroku config command or via the Heroku admin console.)

Step 2./ Then the code creates a column family called MarketData. It uses ColumnFamilyAPI – with the credentials mentioned above – and the REST url is https://api.cassandra.io/1/columnfamily/AAPL/MarketData/UTF8Type/.

Step 3./ Then the code upserts the columns called Open, Close, High, Low, Volume and AdjClose. It uses ColumnAPI – same credentials as we already know – and the REST url is https://api.cassandra.io/1/column/AAPL/MarketData/Open/UTF8Type/?isIndex=true where AAPL is the keyspace, MarketData is the column family and Open is the column.

Step 4./ Then the code prepares the data as name/value pairs (Open = “533.96”, Close = “530.38”, etc), defines a rowkey (“18-05-2012”) and then uses the DataAPI postBulkData method to upload the data into Cassandra.io. The DataAPI credentials are the same as above.

Step 5./ The code then fetches the data using HTTP GET with url: https://api.cassandra.io/1/data/AAPL/MarketData/18-05-2012/. The response is in JSON format: {Volume=26125200, Open=533.96, Low=522.18, High=543.41, Close=530.38, AdjClose=530.12}

Step 6./ Finally the code destroys the keyspace using HTTP DELETE, url: https://api.cassandra.io/1/keyspace/AAPL/.
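
Since this is plain REST over HTTP basic authentication, the same call pattern can also be sketched with curl, using the URLs from the log output above and the Token/AccountId pair as the credentials (the placeholders need to be replaced with your own values, and the exact request/response details are defined by the Cassandra.io API docs):

# create the AAPL keyspace
$ curl -X POST -u '<Token>:<AccountId>' https://api.cassandra.io/1/keyspace/AAPL/

# read the row stored under the 18-05-2012 rowkey
$ curl -u '<Token>:<AccountId>' https://api.cassandra.io/1/data/AAPL/MarketData/18-05-2012/

# drop the keyspace
$ curl -X DELETE -u '<Token>:<AccountId>' https://api.cassandra.io/1/keyspace/AAPL/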

Summary

If you want to try out a robust, highly available Cassandra datastore without any upfront infrastructure investment and with an easy-to-use API, you should certainly have a closer look at Cassandra.io on Heroku. It takes only a few minutes to get started and the APIs offer simple, REST-based data management for Java, Ruby and PHP developers.

Big Data on Heroku – Hadoop from Treasure Data


       

This time I write about Heroku and the Treasure Data Hadoop solution – I found it to be a real gem in the Big Data world.

Heroku is a cloud platform as a service (PaaS) owned by Salesforce.com. It originally started with Ruby as its main programming language, but support has been extended to Java, Scala, Node.js, Python and Clojure, too. It also supports a long list of add-ons including – among others – RDBMS and NoSQL capabilities and a Hadoop-based data warehouse developed by Treasure Data.

Treasure Data Hadoop Architecture

The architecture of the Treasure Data Hadoop solution is as follows:

Heroku Toolbelt

The Heroku toolbelt is a command line tool set that consists of the heroku, foreman and git packages. As described on the Heroku toolbelt website, it is “everything you need to get started using heroku” (the heroku CLI is based on Ruby, so you need Ruby under the hood, too). Once you have signed up for Heroku (you need a verified account, meaning that you have provided your bank details for potential service charges) and you have installed the Heroku toolbelt, you can start right away.

Depending on your environment – I am using Ubuntu 12.04 LTS – you can use an alternative installation method like:

$ sudo apt-get install git
$ gem install heroku
$ gem install foreman

Heroku and Treasure Data add-on

If you want to use Treasure Data on Heroku, you need to add the Treasure Data Hadoop add-on – you need to log in, create an application (Heroku will generate a fancy name like boiling-tundra for you) and then add the particular add-on to the application you just created:

$ heroku login
Enter your Heroku credentials.
Email: xxx@mail.com
Password (typing will be hidden): 
Found existing public key: /home/istvan/.ssh/id_dsa.pub
Uploading SSH public key /home/istvan/.ssh/id_dsa.pub... done
Authentication successful.

$ heroku create
Creating boiling-tundra-1234... done, stack is cedar
http://boiling-tundra-1234.herokuapp.com/ | git@heroku.com:boiling-tundra-1234.git

$ heroku addons:add treasure-data:nano --app boiling-tundra-1234
Adding treasure-data:nano on boiling-tundra-1234... done, v2 (free)
Use `heroku addons:docs treasure-data:nano` to view documentation.

I just love the coloring scheme and the graphics used in the Heroku console; it is simply brilliant.

Treasure Data toolbelt

To manage Treasure Data Hadoop on Heroku you need to install the Treasure Data toolbelt – it fits in well with the heroku CLI and it is also based on Ruby:

$ gem install td

Then you need to install the Heroku plugin to support the heroku td commands:

$ heroku plugins:install https://github.com/treasure-data/heroku-td.git
Installing heroku-td... done

To verify that everything is fine, just run:

$ heroku plugins
=== Installed Plugins
heroku-td

and

$ heroku td
usage: heroku td [options] COMMAND [args]

options:
  -c, --config PATH                path to config file (~/.td/td.conf)
  -k, --apikey KEY                 use this API key instead of reading the config file
  -v, --verbose                    verbose mode
  -h, --help                       show help
...

Treasure Data Hadoop – td commands

Now we are ready to execute td commands from Heroku. td commands are used to create databases and tables, import data, run queries, drop tables, etc. Under the hood, td queries are basically HiveQL queries. (According to their website, Treasure Data plans to support Pig as well in the future.)

By default the Treasure Data td-agent prefers JSON-formatted data, though it can process various other formats (Apache log, syslog, etc.) and you can write your own parser to process the uploaded data.

Thus I converted my AAPL stock data (again thanks to http://finance.yahoo.com) into JSON format:

{"time":"2012-08-20", "open":"650.01", "high":"665.15", "low":"649.90", "close":"665.15", "volume":"21876300", "adjclose":"665.15"}
{"time":"2012-08-17", "open":"640.00", "high":"648.19", "low":"638.81", "close":"648.11", "volume":"15812900", "adjclose":"648.11"}
{"time":"2012-08-16", "open":"631.21", "high":"636.76", "low":"630.50", "close":"636.34", "volume":"9090500", "adjclose":"634.64"}
{"time":"2012-08-15", "open":"631.30", "high":"634.00", "low":"625.75", "close":"630.83", "volume":"9190800", "adjclose":"630.83"}
{"time":"2012-08-14", "open":"631.87", "high":"638.61", "low":"630.21", "close":"631.69", "volume":"12148900", "adjclose":"631.69"}
{"time":"2012-08-13", "open":"623.39", "high":"630.00", "low":"623.25", "close":"630.00", "volume":"9958300", "adjclose":"630.00"}
{"time":"2012-08-10", "open":"618.71", "high":"621.76", "low":"618.70", "close":"621.70", "volume":"6962100", "adjclose":"621.70"}
{"time":"2012-08-09", "open":"617.85", "high":"621.73", "low":"617.80", "close":"620.73", "volume":"7915800", "adjclose":"620.73"}
{"time":"2012-08-08", "open":"619.39", "high":"623.88", "low":"617.10", "close":"619.86", "volume":"8739500", "adjclose":"617.21"}
{"time":"2012-08-07", "open":"622.77", "high":"625.00", "low":"618.04", "close":"620.91", "volume":"10373100", "adjclose":"618.26"}

The first step is to create the database called aapl:

$ heroku td db:create aapl --app boiling-tundra-1234
 !    DEPRECATED: Heroku::Client#deprecate is deprecated, please use the heroku-api gem.
 !    DEPRECATED: More information available at https://github.com/heroku/heroku.rb
 !    DEPRECATED: Deprecated method called from /usr/local/heroku/lib/heroku/client.rb:129.
Database 'aapl' is created.

Then create the table called marketdata:

$ heroku td table:create aapl marketdata --app boiling-tundra-1234
 !    DEPRECATED: Heroku::Client#deprecate is deprecated, please use the heroku-api gem.
 !    DEPRECATED: More information available at https://github.com/heroku/heroku.rb
 !    DEPRECATED: Deprecated method called from /usr/local/heroku/lib/heroku/client.rb:129.
Table 'aapl.marketdata' is created.

Check if the table has been created successfully:

$ heroku td tables --app boiling-tundra-1234
 !    DEPRECATED: Heroku::Client#deprecate is deprecated, please use the heroku-api gem.
 !    DEPRECATED: More information available at https://github.com/heroku/heroku.rb
 !    DEPRECATED: Deprecated method called from /usr/local/heroku/lib/heroku/client.rb:129.
+----------+------------+------+-------+--------+
| Database | Table      | Type | Count | Schema |
+----------+------------+------+-------+--------+
| aapl     | marketdata | log  | 0     |        |
+----------+------------+------+-------+--------+
1 row in set

Import data:

$ heroku td table:import aapl marketdata --format json --time-key time aapl.json --app boiling-tundra-1234
 !    DEPRECATED: Heroku::Client#deprecate is deprecated, please use the heroku-api gem.
 !    DEPRECATED: More information available at https://github.com/heroku/heroku.rb
 !    DEPRECATED: Deprecated method called from /home/istvan/.rvm/gems/ruby-1.9.2-p320/gems/heroku-2.30.3/lib/heroku/client.rb:129.
importing aapl.json...
  uploading 364 bytes...
  imported 10 entries from aapl.json.
done.

Check if the data import was successful – you should see the Count column indicating the number of rows loaded into the table:

$ heroku td tables --app boiling-tundra-1234
 !    DEPRECATED: Heroku::Client#deprecate is deprecated, please use the heroku-api gem.
 !    DEPRECATED: More information available at https://github.com/heroku/heroku.rb
 !    DEPRECATED: Deprecated method called from /home/istvan/.rvm/gems/ruby-1.9.2-p320/gems/heroku-2.30.3/lib/heroku/client.rb:129.
+----------+------------+------+-------+--------+
| Database | Table      | Type | Count | Schema |
+----------+------------+------+-------+--------+
| aapl     | marketdata | log  | 10    |        |
+----------+------------+------+-------+--------+
1 row in set

Now we are ready to run HiveQL (td query) against the dataset – this particular query lists the AAPL stock prices with the highest daily high on top, i.e. in descending order (the time value is based on the UNIX epoch):

$ heroku td query -d aapl -w "SELECT v['time'] as time, v['high'] as high, v['low'] as low FROM marketdata ORDER BY high DESC" --app boiling-tundra-1234
 !    DEPRECATED: Heroku::Client#deprecate is deprecated, please use the heroku-api gem.
 !    DEPRECATED: More information available at https://github.com/heroku/heroku.rb
 !    DEPRECATED: Deprecated method called from /home/istvan/.rvm/gems/ruby-1.9.2-p320/gems/heroku-2.30.3/lib/heroku/client.rb:129.
Job 757853 is queued.
Use 'heroku td job:show 757853' to show the status.
queued...
  started at 2012-08-21T21:06:54Z
  Hive history file=/mnt/hive/tmp/617/hive_job_log_617_201208212106_269570447.txt
  Total MapReduce jobs = 1
  Launching Job 1 out of 1
  Number of reduce tasks determined at compile time: 1
  In order to change the average load for a reducer (in bytes):
    set hive.exec.reducers.bytes.per.reducer=
  In order to limit the maximum number of reducers:
    set hive.exec.reducers.max=
  In order to set a constant number of reducers:
    set mapred.reduce.tasks=
  Starting Job = job_201207250829_556135, Tracking URL = http://domU-12-31-39-0A-56-11.compute-1.internal:50030/jobdetails.jsp?jobid=job_201207250829_556135
  Kill Command = /usr/lib/hadoop/bin/hadoop job  -Dmapred.job.tracker=10.211.85.219:8021 -kill job_201207250829_556135
  2012-08-21 21:07:21,455 Stage-1 map = 0%,  reduce = 0%
  2012-08-21 21:07:28,480 Stage-1 map = 100%,  reduce = 0%
  2012-08-21 21:07:37,965 Stage-1 map = 100%,  reduce = 100%
  Ended Job = job_201207250829_556135
  OK
  MapReduce time taken: 42.536 seconds
  finished at 2012-08-21T21:07:53Z
  Time taken: 53.781 seconds
Status     : success
Result     :
+------------+--------+--------+
| time       | high   | low    |
+------------+--------+--------+
| 1345417200 | 665.15 | 649.90 |
| 1345158000 | 648.19 | 638.81 |
| 1344898800 | 638.61 | 630.21 |
| 1345071600 | 636.76 | 630.50 |
| 1344985200 | 634.00 | 625.75 |
| 1344812400 | 630.00 | 623.25 |
| 1344294000 | 625.00 | 618.04 |
| 1344380400 | 623.88 | 617.10 |
| 1344553200 | 621.76 | 618.70 |
| 1344466800 | 621.73 | 617.80 |
+------------+--------+--------+
10 rows in set
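
As noted above, the values in the time column are UNIX epoch seconds. A quick way to sanity-check one of them is GNU date (the rendered date and time depend on the timezone in which td parsed the original date strings at import):

$ date -u -d @1345417200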

Finally you can delete the marketdata table:

$ heroku td table:delete aapl marketdata --app boiling-tundra-1234
 !    DEPRECATED: Heroku::Client#deprecate is deprecated, please use the heroku-api gem.
 !    DEPRECATED: More information available at https://github.com/heroku/heroku.rb
 !    DEPRECATED: Deprecated method called from /home/istvan/.rvm/gems/ruby-1.9.2-p320/gems/heroku-2.30.3/lib/heroku/client.rb:129.
Do you really delete 'marketdata' in 'aapl'? [y/N]: y
Table 'aapl.marketdata' is deleted.

More details on how to use Treasure Data Hadoop can be found at http://docs.treasure-data.com/articles/quickstart

Cassandra and OpsCenter from Datastax


Cassandra – originally developed at Facebook – is another popular NoSQL database that combines Amazon’s Dynamo distributed systems technologies and Google’s Bigtable data model based on Column Families. It is designed for distributed data at large scale. Its key components are as follows:

Keyspace: it acts as a container for data, similar to a schema in an RDBMS. It determines the replication parameters such as the replication factor and the replica placement strategy, as we will see later in this post. More details on replica placement strategies can be read here.

Column Family: within a keyspace you can have one or more column families. This is similar to tables in the RDBMS world. They contain multiple columns which are referenced by row keys.

Column: it is the smallest increment of data. It is a tuple having a name, a value and a timestamp.

Installing Cassandra from binaries
Datastax is the commercial leader in Apache Cassandra; they offer a complete big data platform (Enterprise Edition) built on Apache Cassandra as well as a free Community Edition. This post is based on the latter. In 2012 they were listed among the Top 10 Big Data startups.

Besides the Cassandra package, they also offer a web-based management center (Datastax OpsCenter), which can make Cassandra cluster management much easier than the command line based alternatives (e.g. cassandra-cli).

To download Datastax Community Edition, go to this link. Both the Datastax Community Server and the OpsCenter Community Edition are available there. As of this writing, the Cassandra Community Server version is 1.1.2 (dsc-cassandra-1.1.2-bin.tar.gz) and the OpsCenter version is 2.1.1 (opscenter-2.1.1-free.tar.gz).

The installation is as simple as unpacking the tarballs. Then you need to configure the Cassandra instance by editing the <Cassandra install directory>/conf/cassandra.yaml file.
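
A quick sketch of the unpacking step with the tarballs mentioned above (the extracted directory names are assumptions derived from the tarball names):

$ tar xzf dsc-cassandra-1.1.2-bin.tar.gz
$ tar xzf opscenter-2.1.1-free.tar.gz
$ cd dsc-cassandra-1.1.2
$ vi conf/cassandra.yaml     # edit the parameters listed below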

A few parameters that needed to be edited:

cluster_name: 'BigHadoop Cluster'
initial_token: 0
listen_address: 10.229.30.238
seed_provider:
    # Addresses of hosts that are deemed contact points.
    # Cassandra nodes use this list of hosts to find each other and learn
    # the topology of the ring.  You must change this if you are running
    # multiple nodes!
    - class_name: org.apache.cassandra.locator.SimpleSeedProvider
      parameters:
          # seeds is actually a comma-delimited list of addresses.
          # Ex: "<ip1>,<ip2>,<ip3>"
          - seeds: "10.229.30.238"
rpc_address: 0.0.0.0

My configuration had two nodes; the second node has a similar cassandra.yaml file except for the listen_address and the token.

Token generation is explained in the Datastax documentation: http://www.datastax.com/docs/1.1/initialize/token_generation.
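
With the RandomPartitioner, the initial tokens simply split the 0..2^127 token range evenly across the nodes. A quick sketch for a two-node ring (this reproduces the token used for the second node below):

$ python -c 'for i in range(2): print i * (2**127 / 2)'
0
85070591730234615865843651857942052864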

The second node configuration looks like:

listen_address: 10.226.42.81
initial_token: 85070591730234615865843651857942052864

Since my nodes were running on AWS EC2, I also modified the endpoint_snitch, which is used to map IP addresses to datacenters and racks – see more details here.

endpoint_snitch: Ec2Snitch

Once these configuration changes have been applied, you can start up the Cassandra server – in my case on both nodes:

$ cd <Cassandra install directory>
$ bin/cassandra

Once the servers are up, you can validate if they formed a cluster using nodetool:

$  bin/nodetool -h localhost ring
Note: Ownership information does not include topology, please specify a keyspace.
Address         DC          Rack        Status State   Load            Owns                Token
                                                                                           85070591730234615865843651857942052864
10.229.30.238   eu-west     1c          Up     Normal  15.89 KB        50.00%              0
10.226.42.81    eu-west     1a          Up     Normal  20.22 KB        50.00%              85070591730234615865843651857942052864

Installing OpsCenter and OpsCenter agents

The next step is to install OpsCenter (on one designated node) and the agents on all the nodes. This is again as simple as unpacking the tarball that we just downloaded from the Datastax site and then editing opscenterd.conf:

[webserver]
port = 8888
interface = 0.0.0.0

[agents]
use_ssl = false

Note: I did not want to use SSL between the agents and the OpsCenter so I disabled it.

To start up the OpsCenter:

$ cd <OpsCenter install directory>
$ bin/opscenter

In fact, OpsCenter is a Python twistd-based webserver, so you need to have Python installed as well. The Amazon AMI had Python 2.6.7 preinstalled.

$ python -V
Python 2.6.7

OpsCenter also uses iostat, which was not preinstalled on my instance, so I had to install the sysstat package, too:

$ sudo yum install sysstat

You can install the agents manually – that is what I did – or automatically, but you have to ensure that they are installed on every node that is a member of the cluster. The agent is part of the OpsCenter tarball; it can be found under the OpsCenter/agent directory.

To configure the agent you need to edit the conf/address.yaml file:

$ cat address.yaml
stomp_interface: "10.229.30.238"
use_ssl: 0

stomp_interface is the OpsCenter interface, while use_ssl: 0 indicates that we do not use SSL for agent communications.

Note: Cassandra and OpsCenter use TCP ports that are not open by default on an AWS EC2 instance. You need to define a security group that opens the following ports: 7000/tcp, 9160/tcp, 8888/tcp, 61210/tcp and 61621/tcp. More details about how these ports are used can be found here.

Using Cassandra

The simplest way to start using Cassandra is its command line tool called cassandra-cli.

[ec2-user@ip-10-229-30-238 bin]$ ./cassandra-cli -h localhost -p 9160
Connected to: "BigHadoop Cluster" on localhost/9160
Welcome to Cassandra CLI version 1.1.2

Type 'help;' or '?' for help.
Type 'quit;' or 'exit;' to quit.

[default@unknown] create keyspace AAPL;
f23e7e5e-22d2-3416-91dc-1cba8276f57d
Waiting for schema agreement...
... schemas agree across the cluster

[default@unknown] use AAPL;
Authenticated 

[default@AAPL] update keyspace AAPL with placement_strategy = 'org.apache.cassandra.locator.SimpleStrategy' and strategy_options = {replication_factor:1};
f56204f3-89a7-3bd2-8dd0-695e43444b36
Waiting for schema agreement...
... schemas agree across the cluster

[default@AAPL] create column family Marketdata;
c6bb29df-dc38-392e-8cd8-524e4c0ae026
Waiting for schema agreement...
... schemas agree across the cluster

These steps create a keyspace called AAPL, modify the replication parameters mentioned above (replication factor and placement strategy) and create a column family called Marketdata. Then we can use the set command to insert data and get to retrieve it.

[default@AAPL] set Marketdata[utf8('18/05/2012')][utf8('Open')] = utf8('533.96');
[default@AAPL] set Marketdata[utf8('18/05/2012')][utf8('High')] = utf8('543.41');
[default@AAPL] set Marketdata[utf8('18/05/2012')][utf8('Low')] = utf8('522.18');
[default@AAPL] set Marketdata[utf8('18/05/2012')][utf8('Close')] = utf8('530.38');
[default@AAPL] set Marketdata[utf8('18/05/2012')][utf8('Volume')] = utf8('26125200');
[default@AAPL] set Marketdata[utf8('18/05/2012')][utf8('AdjClose')] = utf8('530.12');

[default@AAPL] get Marketdata[utf8('18/05/2012')];
=> (column=41646a436c6f7365, value=3533302e3132, timestamp=1344110379812000)      // This is AdjClose   :  530.12
=> (column=436c6f7365, value=3533302e3338, timestamp=1344110378828000)            // This is Close      :  530.38
=> (column=48696768, value=3534332e3431, timestamp=1344110364687000)              // This is High       :  543.41
=> (column=4c6f77, value=3532322e3138, timestamp=1344110373422000)                // This is Low        :  522.18
=> (column=4f70656e, value=3533332e3936, timestamp=1344110350410000)              // This is Open       :  533.96
=> (column=566f6c756d65, value=3236313235323030, timestamp=1344110378832000)      // This is Volume     :  26125200
Returned 6 results.
Elapsed time: 22 msec(s).

Besides the ‘traditional’ command line interface, there is also an SQL*Plus-like utility known as the Cassandra Query Language Shell (cqlsh). This is a utility written in Python that supports SQL-like queries (a kind of Hive analogue from the Hadoop world).

It supports DDL and DML types of commands, so you can run SELECT and INSERT statements as well as CREATE KEYSPACE, CREATE TABLE, ALTER TABLE and DROP TABLE.

$ bin/cqlsh
Connected to BigHadoop Cluster at localhost:9160.
[cqlsh 2.2.0 | Cassandra 1.1.2 | CQL spec 2.0.0 | Thrift protocol 19.32.0]
Use HELP for help.
cqlsh> use AAPL;
cqlsh:AAPL> select * from Marketdata;
 KEY                  | 41646a436c6f7365 | 436c6f7365   | 48696768     | 4c6f77       | 4f70656e     | 566f6c756d65
----------------------+------------------+--------------+--------------+--------------+--------------+------------------
 31382f30352f32303132 |     3533302e3132 | 3533302e3338 | 3534332e3431 | 3532322e3138 | 3533332e3936 | 3236313235323030

cqlsh:AAPL> insert into Marketdata (KEY, '41646a436c6f7365', '436c6f7365', '48696768', '4c6f77', '4f70656e', '566f6c756d65') values ('31372f30352f32313132', '3533302e3132', '3533302e3132', '3534372e35', '3533302e3132','3534352e3331', '3235353832323030') using ttl 86440;

cqlsh:AAPL> select * from Marketdata;
 KEY                  | 41646a436c6f7365 | 436c6f7365   | 48696768     | 4c6f77       | 4f70656e     | 566f6c756d65
----------------------+------------------+--------------+--------------+--------------+--------------+------------------
 31382f30352f32303132 |     3533302e3132 | 3533302e3338 | 3534332e3431 | 3532322e3138 | 3533332e3936 | 3236313235323030
 31372f30352f32313132 |     3533302e3132 | 3533302e3132 |   3534372e35 | 3533302e3132 | 3534352e3331 | 3235353832323030

Monitoring Cassandra Cluster using OpsCenter

Datastax OpsCenter provides a web-based management tool to configure and monitor Cassandra clusters.

To start OpsCenter in a web browser, just go to http://hostname:8888 and then enter the IP addresses/hostnames of the Cassandra nodes.

OpsCenter shows the dashboard:

It can also visualize the cluster ring:

You can create a keyspace via OpsCenter, or if it was created using the command line utility, OpsCenter can retrieve the data model:

Using the Data Explorer menu you can retrieve the row keys and the data stored in the keyspace: