Cloudera Impala – Fast, Interactive Queries with Hadoop


As discussed in the previous post about Twitter’s Storm, Hadoop is a batch-oriented solution that lacks support for ad-hoc, real-time queries. Many of the players in Big Data have realised the need for fast, interactive queries alongside the traditional Hadoop approach. Cloudera, one of the key solution vendors in the Big Data/Hadoop domain, has recently launched Cloudera Impala, which addresses this gap.

As the Cloudera engineering team described in their blog, their work was inspired by the Google Dremel paper, which is also the basis for Google BigQuery. Cloudera Impala provides a HiveQL-like query language for a wide variety of SELECT statements with WHERE, GROUP BY and HAVING clauses, ORDER BY (though currently LIMIT is mandatory with ORDER BY), joins (LEFT, RIGHT, FULL, OUTER, INNER), UNION ALL, external tables, etc. It also supports arithmetic and logical operators and Hive built-in functions and operators such as COUNT, SUM, LIKE, IN or BETWEEN. It can access data stored on HDFS, but it does not use MapReduce; instead it is based on its own distributed query engine.

The current Impala release (Impala 1.0beta) does not support DDL statements (CREATE, ALTER, DROP TABLE); all table creation, modification and deletion has to be executed via Hive and then refreshed in the Impala shell.

Cloudera Impala is open source under the Apache Licence, and the code can be retrieved from GitHub. Its components are written in C++, Java and Python.

Cloudera Impala Architecture

Cloudera Impala has 3 key components: impalad, impala-state-store and impala-shell.

Impala shell is essentially a short shell script that starts the Python program which runs the queries.

SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"

  python ${SHELL_HOME}/ "$@"

Impalad runs on each Hadoop datanode; it plans and executes the queries sent from impala-shell.

Impala-state-store stores information (location and status) about all the running impalad instances.

Installing Cloudera Impala

As of writing this article, Cloudera Impala requires 64-bit RHEL/CentOS 6.2 or higher. I ran the tests on RHEL 6.3 (64-bit) on AWS. You need to have Cloudera CDH4.1, Hive and MySQL installed (the latter is used to store the Hive metastore).

Note: AWS t1.micro instances are not suitable for CDH4.1, which requires more memory than t1.micro provides.

Cloudera recommends using Cloudera Manager to install Impala, but I used manual steps to make sure that I had a complete understanding of what is going on during the installation.

Step 1: Install CDH4.1

To install CDH4.1 you need to run the following commands. These steps describe how to install Hadoop MRv1; if you want YARN instead, different MapReduce RPMs have to be installed. However, Cloudera stated in the install instructions that they do not yet consider MapReduce 2.0 (YARN) production-ready, so I decided to stick with MRv1 for these tests. CDH4.1 MRv1 can be installed in pseudo-distributed mode or as a full cluster; for these tests we use pseudo-distributed mode:

$ sudo yum --nogpgcheck localinstall cloudera-cdh-4-0.noarch.rpm
$ sudo rpm --import
$ sudo yum install hadoop-0.20-conf-pseudo

This will also install Hadoop-0.20-MapReduce (jobtracker, tasktracker) and Hadoop HDFS (namenode, secondarynamenode, datanode) as dependencies.

After the packages are installed, you need to execute the following commands in order to set up Hadoop pseudo-distributed mode properly:

# Format namenode
$ sudo -u hdfs hdfs namenode -format

# Start HDFS daemons
$ for service in /etc/init.d/hadoop-hdfs-*
> do
>   sudo $service start
> done

# Create /tmp dir
$ sudo -u hdfs hadoop fs -mkdir /tmp
$ sudo -u hdfs hadoop fs -chmod -R 1777 /tmp

# Create system dirs
$ sudo -u hdfs hadoop fs -mkdir /var
$ sudo -u hdfs hadoop fs -mkdir /var/lib
$ sudo -u hdfs hadoop fs -mkdir /var/lib/hadoop-hdfs
$ sudo -u hdfs hadoop fs -mkdir /var/lib/hadoop-hdfs/cache
$ sudo -u hdfs hadoop fs -mkdir /var/lib/hadoop-hdfs/cache/mapred
$ sudo -u hdfs hadoop fs -mkdir /var/lib/hadoop-hdfs/cache/mapred/mapred
$ sudo -u hdfs hadoop fs -mkdir /var/lib/hadoop-hdfs/cache/mapred/mapred/staging
$ sudo -u hdfs hadoop fs -chmod 1777 /var/lib/hadoop-hdfs/cache/mapred/mapred/staging
$ sudo -u hdfs hadoop fs -chown -R mapred /var/lib/hadoop-hdfs/cache/mapred

# Create user dirs
$ sudo -u hdfs hadoop fs -mkdir /user/istvan
$ sudo -u hdfs hadoop fs -chown istvan /user/istvan

# Start MapReduce
$ for service in /etc/init.d/hadoop-0.20-mapreduce-*
> do
>   sudo $service start
> done
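The chain of mkdir calls above creates every parent directory before its child. As a minimal sketch of that idea (the helper names parent_chain and mkdir_commands are made up for illustration and are not part of Hadoop), the same command sequence can be derived from the deepest path:

```python
from pathlib import PurePosixPath


def parent_chain(path):
    """Return every ancestor of an absolute POSIX path, shallowest first,
    followed by the path itself. The root directory "/" is skipped."""
    p = PurePosixPath(path)
    ancestors = [str(a) for a in list(p.parents)[::-1] if str(a) != "/"]
    return ancestors + [str(p)]


def mkdir_commands(path):
    """Build one 'hadoop fs -mkdir' command per directory level."""
    return ["sudo -u hdfs hadoop fs -mkdir " + d for d in parent_chain(path)]


if __name__ == "__main__":
    # Prints the seven mkdir commands for the MapReduce staging directory.
    for cmd in mkdir_commands("/var/lib/hadoop-hdfs/cache/mapred/mapred/staging"):
        print(cmd)
```

The script only prints the commands; it does not touch HDFS, so the output can be reviewed before anything is executed.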

Step 2: Install MySQL (used by Hive as metastore)

$ sudo yum install mysql-server

Step 3: Install Hive

$ sudo yum install hive

Step 4: Configure Hive to use MySQL as metastore
To configure Hive and MySQL to work together you need to install the MySQL JDBC connector:

$ curl -L '' | tar xz
$ sudo cp mysql-connector-java-5.1.22/mysql-connector-java-5.1.22-bin.jar /usr/lib/hive/lib/

Then create the Hive metastore database:

$ mysql -u root -p
mysql> CREATE DATABASE metastore;
mysql> USE metastore;
mysql> SOURCE /usr/lib/hive/scripts/metastore/upgrade/mysql/hive-schema-0.9.0.mysql.sql;

and the hive user:

mysql> CREATE USER 'hive'@'localhost' IDENTIFIED BY 'hive';
mysql> GRANT ALL PRIVILEGES ON metastore.* TO 'hive'@'localhost' WITH GRANT OPTION;
mysql> quit;

To configure Hive to use MySQL as a local metastore you need to modify hive-site.xml as follows:
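As a rough sketch of the kind of settings involved, the Python snippet below generates a hive-site.xml fragment with the four standard javax.jdo.option.* connection properties that Hive uses for a JDBC metastore. The values are assumptions derived from the database (metastore) and the user (hive/hive) created above, and the helper name build_hive_site is made up for illustration; a real hive-site.xml may need further properties.

```python
import xml.etree.ElementTree as ET

# Standard Hive metastore connection properties. The values are assumptions
# based on the MySQL database and user created in the previous step.
METASTORE_PROPERTIES = {
    "javax.jdo.option.ConnectionURL": "jdbc:mysql://localhost/metastore",
    "javax.jdo.option.ConnectionDriverName": "com.mysql.jdbc.Driver",
    "javax.jdo.option.ConnectionUserName": "hive",
    "javax.jdo.option.ConnectionPassword": "hive",
}


def build_hive_site(properties):
    """Render a dict of name/value pairs as a Hadoop-style <configuration> document."""
    root = ET.Element("configuration")
    for name, value in properties.items():
        prop = ET.SubElement(root, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = value
    return ET.tostring(root, encoding="unicode")


if __name__ == "__main__":
    print(build_hive_site(METASTORE_PROPERTIES))
```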






Step 5: Install Impala and Impala Shell

$ sudo yum install impala
$ sudo yum install impala-shell

If you follow the manual installation procedure, no Impala config files are created automatically. You need to create the /usr/lib/impala/conf directory and copy the following files into it: core-site.xml, hdfs-site.xml, hive-site.xml and

$ ls -ltr
total 16
-rw-r--r--. 1 root root 2094 Nov  4 16:37 hive-site.xml
-rw-r--r--. 1 root root 2988 Nov  4 16:43
-rwxr-xr-x. 1 root root 2052 Nov  4 16:58 hdfs-site.xml
-rwxr-xr-x. 1 root root 1701 Nov  9 16:53 core-site.xml

Configure Impala

In order to support direct reads for Impala you need to configure the following Hadoop properties:







You can also enable data locality tracking:


Step 6: Start up Impala daemons
Log in as the impala user and execute the following commands:

$ GLOG_v=1 nohup /usr/bin/impala-state-store -state_store_port=24000 &
$ GLOG_v=1 nohup /usr/bin/impalad -state_store_host=localhost -nn=localhost -nn_port=8020 -hostname=localhost -ipaddress= &

Run Impala queries

In order to run interactive Impala queries from impala-shell, we need to create the tables via Hive (remember, the current Impala beta version does not support DDL). I used Google stock prices in this example (retrieved in CSV format):

hive> CREATE EXTERNAL TABLE stockprice
    > (yyyymmdd STRING, open_price FLOAT, high_price FLOAT, low_price FLOAT,
    >  close_price FLOAT, stock_volume INT, adjclose_price FLOAT)
    > LOCATION '/user/istvan/input/';
Time taken: 14.499 seconds
hive> show tables;
Time taken: 16.04 seconds
hive> DESCRIBE stockprice;
yyyymmdd        string
open_price      float
high_price      float
low_price       float
close_price     float
stock_volume    int
adjclose_price  float
Time taken: 0.583 seconds

A similar table – not an external one this time – can be created and loaded from Hive, too:

hive> CREATE TABLE imp_test
    > (yyyymmdd STRING, open_price FLOAT, high_price FLOAT, low_price FLOAT,
    > close_price FLOAT, stock_volume INT, adjclose_price FLOAT)
    > ;
Time taken: 0.542 seconds
hive> LOAD DATA LOCAL INPATH '/home/istvan/goog_stock.csv' OVERWRITE INTO TABLE imp_test;
Copying data from file:/home/istvan/goog_stock.csv
Copying file: file:/home/istvan/goog_stock.csv
Loading data to table default.imp_test
rmr: DEPRECATED: Please use 'rm -r' instead.
Deleted /user/hive/warehouse/imp_test
Time taken: 1.903 seconds

Then you can run a Hive query to list the rows ordered by the highest price. As you can see, Hive initiates a MapReduce job under the hood:

hive> select * from stockprice order by high_price desc;
Total MapReduce jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=
In order to set a constant number of reducers:
  set mapred.reduce.tasks=
Starting Job = job_201211110837_0006, Tracking URL = http://localhost:50030/jobdetails.jsp?jobid=job_201211110837_0006
Kill Command = /usr/lib/hadoop/bin/hadoop job  -Dmapred.job.tracker=localhost:8021 -kill job_201211110837_0006
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2012-11-11 13:16:30,895 Stage-1 map = 0%,  reduce = 0%
2012-11-11 13:16:46,554 Stage-1 map = 100%,  reduce = 0%
2012-11-11 13:17:05,918 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 6.2 sec
2012-11-11 13:17:07,061 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 6.2 sec
2012-11-11 13:17:08,243 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 6.2 sec
2012-11-11 13:17:17,274 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 6.2 sec
2012-11-11 13:17:18,334 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 6.2 sec
MapReduce Total cumulative CPU time: 6 seconds 200 msec
Ended Job = job_201211110837_0006
MapReduce Jobs Launched: 
Job 0: Map: 1  Reduce: 1   Cumulative CPU: 6.2 sec   HDFS Read: 0 HDFS Write: 0 SUCCESS
Total MapReduce CPU Time Spent: 6 seconds 200 msec
Time taken: 82.09 seconds

Running the very same query from impala-shell is significantly faster. Cloudera claims that queries can run an order of magnitude faster or more, depending on the query. In my tests impala-shell returned the result in about one second on average, compared to the Hive version, which took roughly 82 seconds.

[istvan@ip-10-227-137-76 ~]$ impala-shell
Welcome to the Impala shell. Press TAB twice to see a list of available commands.

Copyright (c) 2012 Cloudera, Inc. All rights reserved.
(Build version: Impala v0.1 (1fafe67) built on Mon Oct 22 13:06:45 PDT 2012)
[Not connected] > connect localhost:21000
[localhost:21000] > select * from stockprice order by high_price desc limit 20000;
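To make the semantics of the query explicit, here is a minimal in-memory sketch of what ORDER BY high_price DESC with a LIMIT computes. The function name order_by_desc_limit and the sample rows are made up for illustration; they are not taken from the stock price file used in the tests.

```python
def order_by_desc_limit(rows, column, limit):
    """Mimic SELECT * ... ORDER BY <column> DESC LIMIT <limit> on a list of dicts."""
    return sorted(rows, key=lambda row: row[column], reverse=True)[:limit]


# Hypothetical sample rows, only for demonstration.
SAMPLE_ROWS = [
    {"yyyymmdd": "day-1", "high_price": 10.5},
    {"yyyymmdd": "day-2", "high_price": 12.0},
    {"yyyymmdd": "day-3", "high_price": 11.25},
]

if __name__ == "__main__":
    for row in order_by_desc_limit(SAMPLE_ROWS, "high_price", 2):
        print(row["yyyymmdd"], row["high_price"])
```

A distributed engine has to bound the result of a global sort somehow, which is one plausible reason why this Impala release insists on LIMIT together with ORDER BY.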

If you run DDL commands from Hive (e.g. CREATE TABLE), you need to run the refresh command in impala-shell to pick up the changes:

[localhost:21000] > refresh
Successfully refreshed catalog
[localhost:21000] > describe stockprice
yyyymmdd        string
open_price      float
high_price      float
low_price       float
close_price     float
stock_volume    int
adjclose_price  float
[localhost:21000] > show tables;
[localhost:21000] >


There are more and more efforts in the Big Data world to support ad-hoc, fast queries and real-time data processing for large datasets. Cloudera Impala is certainly an exciting solution: it utilises the same concept as Google BigQuery but promises to support a wider range of input formats, and because it is available as an open source technology it can attract external developers to improve the software and take it to the next stage.

If you are interested in learning more about Impala, please check out our book, Impala in Action, from Manning Publications.

Heroku and Cassandra – RESTful APIs


Last time I wrote about Hadoop on Heroku, which is an add-on from Treasure Data – this time I am going to cover NoSQL on Heroku.
There are various datastore services – add-ons in Heroku terms – available, from MongoDB (MongoHQ) to CouchDB (Cloudant) to Cassandra (Cassandra.io). This post is devoted to Cassandra.io, a hosted and managed Cassandra ring based on Apache Cassandra and accessible via a RESTful API. As of writing this article, the client helper libraries are available in Java, Ruby and PHP, and there is also an Objective-C version in private beta. The libraries can be downloaded from GitHub. I use the Java library in my tests.

Heroku – and the Cassandra.io add-on, too – is built on Amazon Elastic Compute Cloud (EC2) and is supported in all of Amazon’s locations. Note: the add-on is currently in public beta, which means only one plan, called Test, is available; it is free.

Installing the add-on

To install the add-on you just need to follow the standard way of adding an add-on to an application:

$ heroku login
Enter your Heroku credentials.
Password (typing will be hidden): 
Authentication successful.

$ heroku create
Creating glacial-badlands-1234... done, stack is cedar |

$ heroku addons:add cassandraio:test --app glacial-badlands-1234
Adding cassandraio:test on glacial-badlands-1234... done, v2 (free)
Use `heroku addons:docs cassandraio:test` to view documentation.

You can check the configuration via Heroku admin console:

Then you need to clone the client helper libraries from github:

$ git clone

For the Java client library, you also need the Google gson library (gson-2.0.jar).

Writing the application

The Java RESTful API library has one simple configuration file. It stores very few parameters: the API URL and the version. The original file cloned from GitHub has the wrong version (v0.1); it needs to be changed to 1. You can verify the required configuration parameters using the heroku config command.

$ heroku config --app glacial-badlands-1234
=== glacial-badlands-1234 Config Vars

You can check the same config parameters from the Heroku admin console, though the URL is misleading:

The file should look like this:

apiUrl =
version = 1

The Java code – CassandraIOTest.java – is like this:

package io.cassandra.tests;

import java.util.ArrayList;
import java.util.List;

import io.cassandra.sdk.StatusMessageModel;
import io.cassandra.sdk.column.ColumnAPI;
import io.cassandra.sdk.columnfamily.ColumnFamilyAPI;
import io.cassandra.sdk.constants.APIConstants;
// The imports for the data classes are missing from the listing; the package
// name below is an assumption based on the naming of the other SDK packages.
import io.cassandra.sdk.data.DataAPI;
import io.cassandra.sdk.data.DataBulkModel;
import io.cassandra.sdk.data.DataColumn;
import io.cassandra.sdk.data.DataMapModel;
import io.cassandra.sdk.data.DataRowkey;
import io.cassandra.sdk.keyspace.KeyspaceAPI;

public class CassandraIOTest {
    // credentials
    private static String TOKEN = "<Token>";
    private static String ACCOUNTID = "<AccountId>";

    // data
    private static String KS = "AAPL";
    private static String CF = "MarketData";
    private static String COL1 = "Open";
    private static String COL2 = "Close";
    private static String COL3 = "High";
    private static String COL4 = "Low";
    private static String COL5 = "Volume";
    private static String COL6 = "AdjClose";
    private static String RK = "18-05-2012";

    public static void main(String[] args) {
        try {
            StatusMessageModel sm;

            // Create Keyspace
            KeyspaceAPI keyspaceAPI = new KeyspaceAPI(APIConstants.API_URL, TOKEN, ACCOUNTID);
            sm = keyspaceAPI.createKeyspace(KS);
            System.out.println(sm.getMessage() + " | " + sm.getDetail() + " | "
                    + sm.getError());

            // Create ColumnFamily
            ColumnFamilyAPI columnFamilyAPI = new ColumnFamilyAPI(APIConstants.API_URL, TOKEN,
                    ACCOUNTID);
            // The last argument is truncated in the original listing;
            // the UTF8 comparator used here is an assumption.
            sm = columnFamilyAPI.createColumnFamily(KS, CF,
                    APIConstants.COMPARATOR_UTF8);
            System.out.println(sm.getMessage() + " | " + sm.getDetail() + " | "
                    + sm.getError());

            // Add Columns (Open, Close, High, Low, Volume, AdjClose)
            ColumnAPI columnAPI = new ColumnAPI(APIConstants.API_URL, TOKEN, ACCOUNTID);
            sm = columnAPI.upsertColumn(KS, CF, COL1,
                    APIConstants.COMPARATOR_UTF8, true);
            System.out.println(sm.getMessage() + " | " + sm.getDetail() + " | "
                    + sm.getError());
            sm = columnAPI.upsertColumn(KS, CF, COL2,
                    APIConstants.COMPARATOR_UTF8, true);
            System.out.println(sm.getMessage() + " | " + sm.getDetail() + " | "
                    + sm.getError());
            sm = columnAPI.upsertColumn(KS, CF, COL3,
                    APIConstants.COMPARATOR_UTF8, true);
            System.out.println(sm.getMessage() + " | " + sm.getDetail() + " | "
                    + sm.getError());
            sm = columnAPI.upsertColumn(KS, CF, COL4,
                    APIConstants.COMPARATOR_UTF8, true);
            System.out.println(sm.getMessage() + " | " + sm.getDetail() + " | "
                    + sm.getError());
            sm = columnAPI.upsertColumn(KS, CF, COL5,
                    APIConstants.COMPARATOR_UTF8, true);
            System.out.println(sm.getMessage() + " | " + sm.getDetail() + " | "
                    + sm.getError());
            sm = columnAPI.upsertColumn(KS, CF, COL6,
                    APIConstants.COMPARATOR_UTF8, true);
            System.out.println(sm.getMessage() + " | " + sm.getDetail() + " | "
                    + sm.getError());

            // Add Bulk Data
            DataAPI dataAPI = new DataAPI(APIConstants.API_URL, TOKEN, ACCOUNTID);

            // Collect the six columns of the row; all but the first carry a TTL of 12000
            List<DataColumn> columns = new ArrayList<DataColumn>();
            DataColumn dc = new DataColumn(COL1, "533.96");
            columns.add(dc);
            dc = new DataColumn(COL2, "530.38", 12000);
            columns.add(dc);
            dc = new DataColumn(COL3, "543.41", 12000);
            columns.add(dc);
            dc = new DataColumn(COL4, "522.18", 12000);
            columns.add(dc);
            dc = new DataColumn(COL5, "26125200", 12000);
            columns.add(dc);
            dc = new DataColumn(COL6, "530.12", 12000);
            columns.add(dc);

            // One row, identified by the row key
            List<DataRowkey> rows = new ArrayList<DataRowkey>();
            DataRowkey row = new DataRowkey(RK, columns);
            rows.add(row);

            DataBulkModel dataBulk = new DataBulkModel(rows);

            sm = dataAPI.postBulkData(KS, CF, dataBulk);
            System.out.println(sm.getMessage() + " | " + sm.getDetail() + " | "
                    + sm.getError());

            // Get Data
            DataMapModel dm = dataAPI.getData(KS, CF, RK, 0, null);

            // Delete Keyspace
            sm = keyspaceAPI.deleteKeyspace(KS);
            System.out.println(sm.getMessage() + " | " + sm.getDetail() + " | "
                    + sm.getError());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

The runtime result is:

09-Sep-2012 22:59:18 io.cassandra.sdk.CassandraIoSDK constructAPIUrl
Success | Keyspace added successfully. | null
09-Sep-2012 22:59:21 io.cassandra.sdk.CassandraIoSDK constructAPIUrl
Success | MarketData ColumnFamily created successfully | null
09-Sep-2012 22:59:24 io.cassandra.sdk.CassandraIoSDK constructAPIUrl
Failed | Unable to create Column: Open | Cassandra encountered an internal error processing this request: TApplicationError type: 6 message:Internal error processing system_update_column_family
09-Sep-2012 22:59:24 io.cassandra.sdk.CassandraIoSDK constructAPIUrl
Success | Close Column upserted successfully | null
09-Sep-2012 22:59:26 io.cassandra.sdk.CassandraIoSDK constructAPIUrl
Success | High Column upserted successfully | null
09-Sep-2012 22:59:27 io.cassandra.sdk.CassandraIoSDK constructAPIUrl
Success | Low Column upserted successfully | null
09-Sep-2012 22:59:29 io.cassandra.sdk.CassandraIoSDK constructAPIUrl
Success | Volume Column upserted successfully | null
09-Sep-2012 22:59:30 io.cassandra.sdk.CassandraIoSDK constructAPIUrl
Success | AdjClose Column upserted successfully | null
Posting JSON: {"rowkeys":[{"rowkey":"18-05-2012","columns":[{"columnname":"Open","columnvalue":"533.96","ttl":0},{"columnname":"Close","columnvalue":"530.38","ttl":12000},{"columnname":"High","columnvalue":"543.41","ttl":12000},{"columnname":"Low","columnvalue":"522.18","ttl":12000},{"columnname":"Volume","columnvalue":"26125200","ttl":12000},{"columnname":"AdjClose","columnvalue":"530.12","ttl":12000}]}]}
09-Sep-2012 22:59:32 io.cassandra.sdk.CassandraIoSDK constructAPIUrl
Success | Bulk upload successfull. | null
09-Sep-2012 22:59:32 io.cassandra.sdk.CassandraIoSDK constructAPIUrl
{Volume=26125200, Open=533.96, Low=522.18, High=543.41, Close=530.38, AdjClose=530.12}
09-Sep-2012 22:59:32 io.cassandra.sdk.CassandraIoSDK constructAPIUrl
Success | Keyspace dropped successfully. | null


Step 1./ The code creates a keyspace named AAPL using HTTP POST, url:
It uses the KeyspaceAPI class with Token and AccountId as constructor parameters. Token is used as the username, while AccountId is the password. (Remember: these attributes can be retrieved using the heroku config command or via the Heroku admin console.)
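A username/password pair like this is commonly sent as an HTTP Basic authentication header. Whether the service uses exactly this scheme is an assumption on my side, and the helper name basic_auth_header is made up for illustration, but the encoding itself can be sketched as follows:

```python
import base64


def basic_auth_header(token, account_id):
    """Build an HTTP Basic Authorization header.

    Assumption: the token plays the role of the username and the account id
    the role of the password, as described in the text.
    """
    raw = "{}:{}".format(token, account_id).encode("utf-8")
    return {"Authorization": "Basic " + base64.b64encode(raw).decode("ascii")}


if __name__ == "__main__":
    # Placeholder credentials, not real ones.
    print(basic_auth_header("<Token>", "<AccountId>"))
```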

Step 2./ Then the code creates a column family called MarketData. It uses ColumnFamilyAPI – with the credentials mentioned above – and the REST url is

Step 3./ Then the code upserts the columns called Open, Close, High, Low, Volume and AdjClose. It uses ColumnAPI – with the same credentials – and the REST URL contains the keyspace (AAPL), the column family (MarketData) and the column (e.g. Open).

Step 4./ Then the code prepares the data as name/value pairs (Open = “533.96”, Close = “530.38”, etc.), defines a rowkey (“18-05-2012”) and then uses the DataAPI postBulkData method to upload the data into Cassandra.io. The DataAPI credentials are the same as above.
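The shape of the bulk upload body can be read off the "Posting JSON" line in the runtime output. As a small sketch (the function name bulk_payload is made up; the field names rowkeys, rowkey, columns, columnname, columnvalue and ttl are copied from that log line), the same document can be assembled in Python:

```python
import json


def bulk_payload(rowkey, columns):
    """Build the bulk upload document for one row.

    columns is a list of (name, value, ttl) tuples.
    """
    return {
        "rowkeys": [
            {
                "rowkey": rowkey,
                "columns": [
                    {"columnname": name, "columnvalue": value, "ttl": ttl}
                    for name, value, ttl in columns
                ],
            }
        ]
    }


# Values taken from the Java example: the first column has no TTL (0).
COLUMNS = [
    ("Open", "533.96", 0),
    ("Close", "530.38", 12000),
    ("High", "543.41", 12000),
    ("Low", "522.18", 12000),
    ("Volume", "26125200", 12000),
    ("AdjClose", "530.12", 12000),
]

if __name__ == "__main__":
    # Compact separators reproduce the layout of the logged JSON.
    print(json.dumps(bulk_payload("18-05-2012", COLUMNS), separators=(",", ":")))
```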

Step 5./ The code then fetches the data using HTTP GET. The response is in JSON format and is printed as: {Volume=26125200, Open=533.96, Low=522.18, High=543.41, Close=530.38, AdjClose=530.12}

Step 6./ Finally the code destroys the keyspace using HTTP DELETE, url:


If you want to try out a robust, highly available Cassandra datastore without any upfront infrastructure investment and with an easy-to-use API, you can certainly have a closer look at Cassandra.io on Heroku. It takes only a few minutes to start up, and the APIs offer simple REST-based data management for Java, Ruby and PHP developers.

Cassandra and OpsCenter from Datastax

Cassandra – originally developed at Facebook – is another popular NoSQL database that combines Amazon’s Dynamo distributed systems technologies and Google’s Bigtable data model based on column families. It is designed for distributed data at large scale. Its key components are as follows:

Keyspace: it acts as a container for data, similar to an RDBMS schema. It determines the replication parameters such as the replication factor and the replication placement strategy, as we will see later in this post. More details on replication placement strategy can be read here.

Column Family: within a keyspace you can have one or more column families. This is similar to tables in the RDBMS world. They contain multiple columns which are referenced by row keys.

Column: it is the smallest increment of data. It is a tuple having a name, a value and a timestamp.
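The three components nest inside each other: a keyspace holds column families, a column family maps row keys to columns, and each column is a (name, value, timestamp) tuple. The toy in-memory model below is only a sketch of that structure; the class and method names are made up and it does not attempt to reflect how Cassandra stores data internally. The one behavioural detail it borrows is that the write with the newest timestamp wins.

```python
from collections import namedtuple

# A column is a (name, value, timestamp) tuple.
Column = namedtuple("Column", ["name", "value", "timestamp"])


class ColumnFamily:
    """Toy column family: row key -> {column name -> Column}."""

    def __init__(self, name):
        self.name = name
        self.rows = {}

    def set(self, row_key, column_name, value, timestamp):
        row = self.rows.setdefault(row_key, {})
        current = row.get(column_name)
        # Keep the write that carries the newest timestamp.
        if current is None or timestamp >= current.timestamp:
            row[column_name] = Column(column_name, value, timestamp)

    def get(self, row_key):
        """Return the columns of a row, sorted by column name."""
        row = self.rows.get(row_key, {})
        return [row[name] for name in sorted(row)]


class Keyspace:
    """Toy keyspace: a named container of column families."""

    def __init__(self, name, replication_factor=1):
        self.name = name
        self.replication_factor = replication_factor
        self.column_families = {}

    def create_column_family(self, name):
        return self.column_families.setdefault(name, ColumnFamily(name))


if __name__ == "__main__":
    aapl = Keyspace("AAPL")
    market = aapl.create_column_family("Marketdata")
    market.set("18/05/2012", "Open", "533.96", timestamp=1)
    market.set("18/05/2012", "Close", "530.38", timestamp=2)
    for column in market.get("18/05/2012"):
        print(column)
```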

Installing Cassandra from binaries
Datastax is the commercial leader in Apache Cassandra; they offer a complete big data platform (Enterprise Edition) built on Apache Cassandra as well as a free Community Edition. This post is based on the latter. In 2012 they were listed among the Top 10 Big Data startups.

Besides the Cassandra package they also offer a web-based management center (Datastax OpsCenter), which can make Cassandra cluster management much easier than the command-line-based alternatives (e.g. cassandra-cli).

To download Datastax Community Edition, go to this link. Both the Datastax Community Server and the OpsCenter Community Edition are available there. As of this writing, the Cassandra Community Server version is 1.1.2 (dsc-cassandra-1.1.2-bin.tar.gz) and the OpsCenter version is 2.1.1 (opscenter-2.1.1-free.tar.gz).

The installation is as simple as unzipping and untarring the tarballs. Then you need to configure the Cassandra instance by editing the <Cassandra install directory>/conf/cassandra.yaml file.

A few parameters that need to be edited:

cluster_name: 'BigHadoop Cluster'
initial_token: 0
seed_provider:
    # Addresses of hosts that are deemed contact points.
    # Cassandra nodes use this list of hosts to find each other and learn
    # the topology of the ring.  You must change this if you are running
    # multiple nodes!
    - class_name: org.apache.cassandra.locator.SimpleSeedProvider
      parameters:
          # seeds is actually a comma-delimited list of addresses.
          # Ex: ",,"
          - seeds: ""

My configuration had two nodes; the second node has a similar cassandra.yaml file except for the listen_address and the initial_token.

Token generation is explained in the Datastax documentation:

The second node configuration looks like:

initial_token: 85070591730234615865843651857942052864
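The two token values are not arbitrary. Assuming the RandomPartitioner (the default in Cassandra 1.1), the token space runs from 0 to 2^127, and evenly spaced tokens are obtained by dividing that range by the number of nodes. A minimal sketch, with a made-up function name:

```python
def initial_tokens(node_count):
    """Evenly spaced initial tokens for a ring of node_count nodes.

    Assumes the RandomPartitioner, whose token range is 0 .. 2**127.
    """
    return [i * (2 ** 127) // node_count for i in range(node_count)]


if __name__ == "__main__":
    for node, token in enumerate(initial_tokens(2)):
        print("node {}: initial_token: {}".format(node, token))
```

For two nodes this yields 0 and 85070591730234615865843651857942052864, the values used in this setup.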

Since my nodes were running on AWS EC2, I also modified the endpoint_snitch, which is used to map IP addresses to datacenters and racks (see more details here).

endpoint_snitch: Ec2Snitch

Once these configuration changes have been applied, you can start up the Cassandra server – in my case on both nodes:

$ cd 
$ bin/cassandra

Once the servers are up, you can validate if they formed a cluster using nodetool:

$  bin/nodetool -h localhost ring
Note: Ownership information does not include topology, please specify a keyspace.
Address         DC          Rack        Status State   Load            Owns                Token
                                                                                           85070591730234615865843651857942052864
                eu-west     1c          Up     Normal  15.89 KB        50.00%              0
                eu-west     1a          Up     Normal  20.22 KB        50.00%              85070591730234615865843651857942052864

Installing OpsCenter and OpsCenter agents

The next step is to install OpsCenter (on one designated node) and the agents on all the nodes. This is again as simple as unzipping and untarring the tarball that we just downloaded from the Datastax site and then editing opscenterd.conf

port = 8888
interface =

use_ssl = false

Note: I did not want to use SSL between the agents and the OpsCenter so I disabled it.

To start up the OpsCenter:

$ cd 
$ bin/opscenter

In fact, OpsCenter is a Python twistd-based web server, so you need to have Python installed as well. The Amazon AMI had Python 2.6.7 preinstalled.

$ python -V
Python 2.6.7

OpsCenter also uses iostat, which was not preinstalled on my instance, so I had to install the sysstat package, too:

$ sudo yum install sysstat

You can install the agents manually – that is what I did – or automatically, but you have to ensure that they are installed on every node that is a member of the cluster. The agent is part of the OpsCenter tarball; it can be found under the OpsCenter/agent directory.

To configure the agent you need to edit the conf/address.yaml file:

$ cat address.yaml
stomp_interface: ""
use_ssl: 0

stomp_interface is the OpsCenter interface, while use_ssl: 0 indicates that we do not use SSL for agent communications.

Note: Cassandra and OpsCenter use TCP ports that are not open by default on an AWS EC2 instance. You need to define a special security group that opens the following ports: 7000/tcp, 9160/tcp, 8888/tcp, 61620/tcp and 61621/tcp. More details about how these ports are used can be found here.
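Before debugging Cassandra or OpsCenter themselves, it can help to verify that the ports are actually reachable through the security group. The following is a small sketch using only the Python standard library; the function names are made up, and the example list covers only the Cassandra inter-node port, the Thrift client port and the OpsCenter web port, so add the OpsCenter agent ports as needed.

```python
import socket


def is_port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def check_ports(host, ports):
    """Map each port to its reachability on the given host."""
    return {port: is_port_open(host, port) for port in ports}


if __name__ == "__main__":
    # 7000: Cassandra inter-node, 9160: Thrift clients, 8888: OpsCenter web UI.
    for port, reachable in check_ports("localhost", [7000, 9160, 8888]).items():
        print("{}/tcp {}".format(port, "open" if reachable else "closed or filtered"))
```

Run it from another node against the node's hostname rather than localhost to test the security group rather than the local firewall.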

Using Cassandra

The simplest way to start using Cassandra is its command line tool called cassandra-cli.

[ec2-user@ip-10-229-30-238 bin]$ ./cassandra-cli -h localhost -p 9160
Connected to: "BigHadoop Cluster" on localhost/9160
Welcome to Cassandra CLI version 1.1.2

Type 'help;' or '?' for help.
Type 'quit;' or 'exit;' to quit.

[default@unknown] create keyspace AAPL;
Waiting for schema agreement...
... schemas agree across the cluster

[default@unknown] use AAPL;

[default@AAPL] update keyspace AAPL with placement_strategy = 'org.apache.cassandra.locator.SimpleStrategy' and strategy_options = {replication_factor:1};
Waiting for schema agreement...
... schemas agree across the cluster

[default@AAPL] create column family Marketdata;
Waiting for schema agreement...
... schemas agree across the cluster

These steps create a keyspace called AAPL, modify the replication parameters mentioned above (replication factor and placement strategy) and create a column family called Marketdata. Then we can use the set command to insert data and the get command to retrieve it.

[default@AAPL] set Marketdata[utf8('18/05/2012')][utf8('Open')] = utf8('533.96');
[default@AAPL] set Marketdata[utf8('18/05/2012')][utf8('High')] = utf8('543.41');
[default@AAPL] set Marketdata[utf8('18/05/2012')][utf8('Low')] = utf8('522.18');
[default@AAPL] set Marketdata[utf8('18/05/2012')][utf8('Close')] = utf8('530.38');
[default@AAPL] set Marketdata[utf8('18/05/2012')][utf8('Volume')] = utf8('26125200');
[default@AAPL] set Marketdata[utf8('18/05/2012')][utf8('AdjClose')] = utf8('530.12');

[default@AAPL] get Marketdata[utf8('18/05/2012')];
=> (column=41646a436c6f7365, value=3533302e3132, timestamp=1344110379812000)      // This is AdjClose   :  530.12
=> (column=436c6f7365, value=3533302e3338, timestamp=1344110378828000)            // This is Close      :  530.38
=> (column=48696768, value=3534332e3431, timestamp=1344110364687000)              // This is High       :  543.41
=> (column=4c6f77, value=3532322e3138, timestamp=1344110373422000)                // This is Low        :  522.18
=> (column=4f70656e, value=3533332e3936, timestamp=1344110350410000)              // This is Open       :  533.96
=> (column=566f6c756d65, value=3236313235323030, timestamp=1344110378832000)      // This is Volume     :  26125200
Returned 6 results.
Elapsed time: 22 msec(s).
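The column names and values come back hex-encoded because the column family was created without a comparator or validation class, so the CLI treats them as raw bytes. They are simply the UTF-8 bytes of the original strings, and the timestamps are microseconds since the Unix epoch. A minimal sketch for decoding them (the function names are made up):

```python
from datetime import datetime, timezone


def decode_hex(hex_text):
    """Turn a hex string such as '4f70656e' back into UTF-8 text."""
    return bytes.fromhex(hex_text).decode("utf-8")


def decode_timestamp(microseconds):
    """Convert a Cassandra write timestamp (microseconds since epoch) to UTC."""
    return datetime.fromtimestamp(microseconds / 1000000.0, tz=timezone.utc)


if __name__ == "__main__":
    print(decode_hex("41646a436c6f7365"), decode_hex("3533302e3132"))
    print(decode_timestamp(1344110379812000))
```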

Besides the ‘traditional command line interface’, there is also a SQLPlus-like utility known as the Cassandra Query Language Shell (cqlsh). This is a utility written in Python that supports SQL-like queries (analogous to Hive in the Hadoop world).

It supports DDL and DML type of commands so you can run SELECT and INSERT statements as well as CREATE KEYSPACE, CREATE TABLE, ALTER TABLE and DROP TABLE.

$ bin/cqlsh
Connected to BigHadoop Cluster at localhost:9160.
[cqlsh 2.2.0 | Cassandra 1.1.2 | CQL spec 2.0.0 | Thrift protocol 19.32.0]
Use HELP for help.
cqlsh> use AAPL;
cqlsh:AAPL> select * from Marketdata;
 KEY                  | 41646a436c6f7365 | 436c6f7365   | 48696768     | 4c6f77       | 4f70656e     | 566f6c756d65
 31382f30352f32303132 |     3533302e3132 | 3533302e3338 | 3534332e3431 | 3532322e3138 | 3533332e3936 | 3236313235323030

cqlsh:AAPL> insert into Marketdata (KEY, '41646a436c6f7365', '436c6f7365', '48696768', '4c6f77', '4f70656e', '566f6c756d65') values ('31372f30352f32313132', '3533302e3132', '3533302e3132', '3534372e35', '3533302e3132','3534352e3331', '3235353832323030') using ttl 86440;

cqlsh:AAPL> select * from Marketdata;
 KEY                  | 41646a436c6f7365 | 436c6f7365   | 48696768     | 4c6f77       | 4f70656e     | 566f6c756d65
 31382f30352f32303132 |     3533302e3132 | 3533302e3338 | 3534332e3431 | 3532322e3138 | 3533332e3936 | 3236313235323030
 31372f30352f32313132 |     3533302e3132 | 3533302e3132 |   3534372e35 | 3533302e3132 | 3534352e3331 | 3235353832323030
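Writing the hex strings for such an INSERT by hand is error-prone. As a small sketch (the helper names are made up), the literals can be produced and checked with a few lines of Python:

```python
def encode_hex(text):
    """Encode text as the hex string of its UTF-8 bytes."""
    return text.encode("utf-8").hex()


def decode_hex(hex_text):
    """Inverse of encode_hex."""
    return bytes.fromhex(hex_text).decode("utf-8")


if __name__ == "__main__":
    # Row key of the existing row and of the row inserted above.
    print(encode_hex("18/05/2012"))
    print(decode_hex("31372f30352f32313132"))
```

Decoding the key used in the INSERT statement gives 17/05/2112, so the year in that row key looks like a typing slip for 2012; encode_hex("17/05/2012") would produce the intended literal.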

Monitoring Cassandra Cluster using OpsCenter

Datastax OpsCenter provides a web-based management tool to configure and monitor Cassandra clusters.

To open OpsCenter in a web browser, just go to http://hostname:8888 and then enter the IP address/hostname of the Cassandra nodes.

OpsCenter shows the dashboard:

It can also visualize the cluster ring:

You can create a keyspace via the OpsCenter or if it is created using the command line utility, it can retrieve the data model:

Using the Data Explorer menu you can retrieve the row keys and the data stored in the keyspace: