Heroku and Cassandra – Cassandra.io RESTful APIs


Introduction

Last time I wrote about Hadoop on Heroku, which is an add-on from Treasure Data; this time I am going to cover NoSQL on Heroku.
There are various datastore services (add-ons, in Heroku terms) available, from MongoDB (MongoHQ) to CouchDB (Cloudant) to Cassandra (Cassandra.io). This post is devoted to Cassandra.io.

Cassandra.io

Cassandra.io is a hosted and managed Cassandra ring based on Apache Cassandra, accessible via a RESTful API. As of this writing, the Cassandra.io client helper libraries are available in Java, Ruby and PHP, and there is also an Objective-C version in private beta. The libraries can be downloaded from GitHub. I use the Java library in my tests.

Heroku, and the Cassandra.io add-on with it, is built on Amazon Elastic Compute Cloud (EC2), and the add-on is supported in all of Amazon’s locations. Note: the Cassandra.io add-on is in public beta now, which means that only one plan, called Test, is available; it is free.

Installing Cassandra.io add-on

To install the Cassandra.io add-on, you just need to follow the standard way of adding an add-on to an application:

$ heroku login
Enter your Heroku credentials.
Email: address@example.com
Password (typing will be hidden): 
Authentication successful.

$ heroku create
Creating glacial-badlands-1234... done, stack is cedar
http://glacial-badlands-1234.herokuapp.com/ | git@heroku.com:glacial-badlands-1234.git

$ heroku addons:add cassandraio:test --app glacial-badlands-1234
Adding cassandraio:test on glacial-badlands-1234... done, v2 (free)
Use `heroku addons:docs cassandraio:test` to view documentation.

You can check the configuration via the Heroku admin console:

Then you need to clone the client helper libraries from GitHub:

$ git clone https://github.com/m2mIO/cassandraio-client-libraries.git

For the Java client library, you also need the Google Gson library (gson-2.0.jar).

Writing Cassandra.io application

The Java RESTful API library has one simple configuration file called sdk.properties. It stores very few parameters: the API URL and the version. The original sdk.properties file cloned from GitHub has the wrong version (v0.1); it needs to be changed to 1. You can verify the required configuration parameters using the heroku config command.

$ heroku config --app glacial-badlands-1234
=== glacial-badlands-1234 Config Vars
CASSANDRAIO_URL: https://Token:AccountId@api.cassandra.io/1/welcome
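
The token and account id are embedded in the user-info part of CASSANDRAIO_URL. As a minimal sketch (the class below is a hypothetical helper, not part of the Cassandra.io client library), they can be split out with java.net.URI instead of being hard-coded:

```java
import java.net.URI;

// Hypothetical helper, not part of the Cassandra.io SDK: splits a URL of the
// form https://Token:AccountId@api.cassandra.io/1/welcome into its parts.
class CassandraIoCredentials {
    final String token;
    final String accountId;
    final String apiUrl;

    CassandraIoCredentials(String token, String accountId, String apiUrl) {
        this.token = token;
        this.accountId = accountId;
        this.apiUrl = apiUrl;
    }

    static CassandraIoCredentials parse(String cassandraIoUrl) {
        URI uri = URI.create(cassandraIoUrl);
        String userInfo = uri.getUserInfo();          // "Token:AccountId"
        if (userInfo == null || userInfo.indexOf(':') < 0) {
            throw new IllegalArgumentException(
                "Expected a URL of the form https://Token:AccountId@host/...");
        }
        int sep = userInfo.indexOf(':');
        return new CassandraIoCredentials(
            userInfo.substring(0, sep),               // token (username part)
            userInfo.substring(sep + 1),              // account id (password part)
            uri.getScheme() + "://" + uri.getHost()); // e.g. https://api.cassandra.io
    }

    public static void main(String[] args) {
        // On a Heroku dyno the config var is exposed as an environment variable;
        // the fallback value is only the placeholder from the output above.
        String url = System.getenv("CASSANDRAIO_URL");
        if (url == null) {
            url = "https://Token:AccountId@api.cassandra.io/1/welcome";
        }
        CassandraIoCredentials c = parse(url);
        System.out.println("API URL: " + c.apiUrl);
        System.out.println("Token length: " + c.token.length());
    }
}
```

The resulting token and accountId values are what the test program further down passes as TOKEN and ACCOUNTID.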

You can check the same config parameters from the Heroku admin console, though the URL is misleading:

The sdk.properties file should look like this:

apiUrl = https://api.cassandra.io
version = 1
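
To illustrate how these two values fit together, here is a small sketch that reads them with java.util.Properties and joins them into the base URL that appears in the log output later in this post. The class and method names are hypothetical; this is not how the SDK itself is necessarily implemented.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical illustration: reads apiUrl and version and joins them
// into "<apiUrl>/<version>/".
class SdkPropertiesSketch {

    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    static String baseUrl(Properties props) {
        String apiUrl = props.getProperty("apiUrl").trim();
        String version = props.getProperty("version").trim();
        return apiUrl + "/" + version + "/";
    }

    public static void main(String[] args) {
        // Same content as the sdk.properties file shown above.
        Properties props = load("apiUrl = https://api.cassandra.io\nversion = 1\n");
        System.out.println(baseUrl(props)); // prints https://api.cassandra.io/1/
    }
}
```

With the original version value (v0.1) the same join would produce a different path, which is why the property has to be corrected.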

The Java code – CassandraIOTest.java – is like this:

package io.cassandra.tests;

import java.util.ArrayList;
import java.util.List;

import io.cassandra.sdk.StatusMessageModel;
import io.cassandra.sdk.column.ColumnAPI;
import io.cassandra.sdk.columnfamily.ColumnFamilyAPI;
import io.cassandra.sdk.constants.APIConstants;
import io.cassandra.sdk.data.DataAPI;
import io.cassandra.sdk.data.DataBulkModel;
import io.cassandra.sdk.data.DataColumn;
import io.cassandra.sdk.data.DataMapModel;
import io.cassandra.sdk.data.DataRowkey;
import io.cassandra.sdk.keyspace.KeyspaceAPI;

public class CassandraIOTest {
	// credentials
	private static String TOKEN = "<Token>";
	private static String ACCOUNTID = "<AccountId>";

	// data 
	private static String KS = "AAPL";
	private static String CF = "MarketData";
	private static String COL1 = "Open";
	private static String COL2 = "Close";
	private static String COL3 = "High";
	private static String COL4 = "Low";
	private static String COL5 = "Volume";
	private static String COL6 = "AdjClose";
	private static String RK = "18-05-2012";

	public static void main(String[] args) {	
		try {
			StatusMessageModel sm;

			// Create Keyspace
			KeyspaceAPI keyspaceAPI = new KeyspaceAPI(APIConstants.API_URL, TOKEN, ACCOUNTID);
			sm = keyspaceAPI.createKeyspace(KS);
			System.out.println(sm.getMessage() + " | " + sm.getDetail() + " | "
				+ sm.getError());

			// Create ColumnFamily
			ColumnFamilyAPI columnFamilyAPI = new ColumnFamilyAPI(APIConstants.API_URL, TOKEN,
					ACCOUNTID);
			sm = columnFamilyAPI.createColumnFamily(KS, CF,
					APIConstants.COMPARATOR_UTF8);
			System.out.println(sm.getMessage() + " | " + sm.getDetail() + " | "
					+ sm.getError());

			// Add Columns (Open, Close, High, Low, Volume, AdjClose)
			ColumnAPI columnAPI = new ColumnAPI(APIConstants.API_URL, TOKEN, ACCOUNTID);
			sm = columnAPI.upsertColumn(KS, CF, COL1,
					APIConstants.COMPARATOR_UTF8, true);
			System.out.println(sm.getMessage() + " | " + sm.getDetail() + " | "
					+ sm.getError());
			sm = columnAPI.upsertColumn(KS, CF, COL2,
					APIConstants.COMPARATOR_UTF8, true);
			System.out.println(sm.getMessage() + " | " + sm.getDetail() + " | "
					+ sm.getError());
			sm = columnAPI.upsertColumn(KS, CF, COL3,
					APIConstants.COMPARATOR_UTF8, true);
			System.out.println(sm.getMessage() + " | " + sm.getDetail() + " | "
					+ sm.getError());
			sm = columnAPI.upsertColumn(KS, CF, COL4,
					APIConstants.COMPARATOR_UTF8, true);
			System.out.println(sm.getMessage() + " | " + sm.getDetail() + " | "
					+ sm.getError());
			sm = columnAPI.upsertColumn(KS, CF, COL5,
					APIConstants.COMPARATOR_UTF8, true);
			System.out.println(sm.getMessage() + " | " + sm.getDetail() + " | "
					+ sm.getError());
			sm = columnAPI.upsertColumn(KS, CF, COL6,
					APIConstants.COMPARATOR_UTF8, true);
			System.out.println(sm.getMessage() + " | " + sm.getDetail() + " | "
					+ sm.getError());

			// Add Bulk Data
			DataAPI dataAPI = new DataAPI(APIConstants.API_URL, TOKEN, ACCOUNTID);

			// The first column has no TTL; the others expire after 12000 seconds
			List<DataColumn> columns = new ArrayList<DataColumn>();
			DataColumn dc = new DataColumn(COL1, "533.96");
			columns.add(dc);
			dc = new DataColumn(COL2, "530.38", 12000);
			columns.add(dc);
			dc = new DataColumn(COL3, "543.41", 12000);
			columns.add(dc);
			dc = new DataColumn(COL4, "522.18", 12000);
			columns.add(dc);
			dc = new DataColumn(COL5, "26125200", 12000);
			columns.add(dc);
			dc = new DataColumn(COL6, "530.12", 12000);
			columns.add(dc);

			List<DataRowkey> rows = new ArrayList<DataRowkey>();
			DataRowkey row = new DataRowkey(RK, columns);
			rows.add(row);

			DataBulkModel dataBulk = new DataBulkModel(rows);

			sm = dataAPI.postBulkData(KS, CF, dataBulk);
			System.out.println(sm.getMessage() + " | " + sm.getDetail() + " | "
					+ sm.getError());

			// Get Data
			DataMapModel dm = dataAPI.getData(KS, CF, RK, 0, null);
			System.out.println(dm.toString());			

			// Delete Keyspace
			sm = keyspaceAPI.deleteKeyspace(KS);
			System.out.println(sm.getMessage() + " | " + sm.getDetail() + " | "
					+ sm.getError());
		}
		catch(Exception e) {
			System.out.println(e.getMessage());
		}

	}
}

The runtime result is:

09-Sep-2012 22:59:18 io.cassandra.sdk.CassandraIoSDK constructAPIUrl
INFO: API URL: https://api.cassandra.io/1/keyspace/AAPL/
Success | Keyspace added successfully. | null
09-Sep-2012 22:59:21 io.cassandra.sdk.CassandraIoSDK constructAPIUrl
INFO: API URL: https://api.cassandra.io/1/columnfamily/AAPL/MarketData/UTF8Type/
Success | MarketData ColumnFamily created successfully | null
09-Sep-2012 22:59:24 io.cassandra.sdk.CassandraIoSDK constructAPIUrl
INFO: API URL: https://api.cassandra.io/1/column/AAPL/MarketData/Open/UTF8Type/?isIndex=true
Failed | Unable to create Column: Open | Cassandra encountered an internal error processing this request: TApplicationError type: 6 message:Internal error processing system_update_column_family
09-Sep-2012 22:59:24 io.cassandra.sdk.CassandraIoSDK constructAPIUrl
INFO: API URL: https://api.cassandra.io/1/column/AAPL/MarketData/Close/UTF8Type/?isIndex=true
Success | Close Column upserted successfully | null
09-Sep-2012 22:59:26 io.cassandra.sdk.CassandraIoSDK constructAPIUrl
INFO: API URL: https://api.cassandra.io/1/column/AAPL/MarketData/High/UTF8Type/?isIndex=true
Success | High Column upserted successfully | null
09-Sep-2012 22:59:27 io.cassandra.sdk.CassandraIoSDK constructAPIUrl
INFO: API URL: https://api.cassandra.io/1/column/AAPL/MarketData/Low/UTF8Type/?isIndex=true
Success | Low Column upserted successfully | null
09-Sep-2012 22:59:29 io.cassandra.sdk.CassandraIoSDK constructAPIUrl
INFO: API URL: https://api.cassandra.io/1/column/AAPL/MarketData/Volume/UTF8Type/?isIndex=true
Success | Volume Column upserted successfully | null
09-Sep-2012 22:59:30 io.cassandra.sdk.CassandraIoSDK constructAPIUrl
INFO: API URL: https://api.cassandra.io/1/column/AAPL/MarketData/AdjClose/UTF8Type/?isIndex=true
Success | AdjClose Column upserted successfully | null
Posting JSON: {"rowkeys":[{"rowkey":"18-05-2012","columns":[{"columnname":"Open","columnvalue":"533.96","ttl":0},{"columnname":"Close","columnvalue":"530.38","ttl":12000},{"columnname":"High","columnvalue":"543.41","ttl":12000},{"columnname":"Low","columnvalue":"522.18","ttl":12000},{"columnname":"Volume","columnvalue":"26125200","ttl":12000},{"columnname":"AdjClose","columnvalue":"530.12","ttl":12000}]}]}
09-Sep-2012 22:59:32 io.cassandra.sdk.CassandraIoSDK constructAPIUrl
INFO: API URL: https://api.cassandra.io/1/data/AAPL/MarketData/
Success | Bulk upload successfull. | null
09-Sep-2012 22:59:32 io.cassandra.sdk.CassandraIoSDK constructAPIUrl
INFO: API URL: https://api.cassandra.io/1/data/AAPL/MarketData/18-05-2012/
{Volume=26125200, Open=533.96, Low=522.18, High=543.41, Close=530.38, AdjClose=530.12}
09-Sep-2012 22:59:32 io.cassandra.sdk.CassandraIoSDK constructAPIUrl
INFO: API URL: https://api.cassandra.io/1/keyspace/AAPL/
Success | Keyspace dropped successfully. | null

Analysis

Step 1./ The code creates a keyspace named AAPL using HTTP POST, url: https://api.cassandra.io/1/keyspace/AAPL/
It uses the KeyspaceAPI class with Token and AccountId as parameters for the constructor. Token is used as the username, while AccountId is the password. (Remember: these attributes can be retrieved using the heroku config command or via the Heroku admin console.)
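
As a rough sketch of what such a call might look like without the client library, the request below is built with the JDK's java.net.http classes. Two things here are assumptions rather than documented behaviour: that the username/password pair is sent as a standard HTTP Basic Authorization header, and that the POST carries no body. The class name is hypothetical.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical sketch: builds (but does not send) a "create keyspace" request.
// Assumption: the API accepts HTTP Basic auth with Token as the username
// and AccountId as the password.
class CreateKeyspaceRequestSketch {

    static String basicAuth(String token, String accountId) {
        String pair = token + ":" + accountId;
        return "Basic " + Base64.getEncoder()
                .encodeToString(pair.getBytes(StandardCharsets.UTF_8));
    }

    static HttpRequest createKeyspace(String keyspace, String token, String accountId) {
        URI uri = URI.create("https://api.cassandra.io/1/keyspace/" + keyspace + "/");
        return HttpRequest.newBuilder(uri)
                .header("Authorization", basicAuth(token, accountId))
                .POST(HttpRequest.BodyPublishers.noBody()) // assumption: empty body
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = createKeyspace("AAPL", "<Token>", "<AccountId>");
        // prints POST https://api.cassandra.io/1/keyspace/AAPL/
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Sending the request would be a matter of passing it to an HttpClient; the client library hides all of this behind KeyspaceAPI.createKeyspace.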

Step 2./ Then the code creates a column family called MarketData. It uses ColumnFamilyAPI, with the credentials mentioned above, and the REST url is https://api.cassandra.io/1/columnfamily/AAPL/MarketData/UTF8Type/.

Step 3./ Then the code upserts the columns called Open, Close, High, Low, Volume and AdjClose. It uses ColumnAPI, with the same credentials, and the REST url is https://api.cassandra.io/1/column/AAPL/MarketData/Open/UTF8Type/?isIndex=true where AAPL is the keyspace, MarketData is the column family and Open is the column. Note that in the run shown above the first upsert (Open) failed with an internal Cassandra error, while the other five succeeded.
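
The URL patterns visible in the log can be summarised in a few lines of code. This is only a sketch that reproduces the logged URLs; the class and method names are made up for illustration and are not the SDK's own.

```java
// Hypothetical helper that reproduces the REST URLs seen in the log output.
class CassandraIoUrls {
    static final String BASE = "https://api.cassandra.io/1/";

    static String keyspace(String ks) {
        return BASE + "keyspace/" + ks + "/";
    }

    static String columnFamily(String ks, String cf, String comparator) {
        return BASE + "columnfamily/" + ks + "/" + cf + "/" + comparator + "/";
    }

    static String column(String ks, String cf, String col,
                         String comparator, boolean isIndex) {
        return BASE + "column/" + ks + "/" + cf + "/" + col + "/"
                + comparator + "/?isIndex=" + isIndex;
    }

    static String bulkData(String ks, String cf) {
        return BASE + "data/" + ks + "/" + cf + "/";
    }

    static String row(String ks, String cf, String rowKey) {
        return bulkData(ks, cf) + rowKey + "/";
    }

    public static void main(String[] args) {
        System.out.println(keyspace("AAPL"));
        System.out.println(columnFamily("AAPL", "MarketData", "UTF8Type"));
        System.out.println(column("AAPL", "MarketData", "Open", "UTF8Type", true));
        System.out.println(row("AAPL", "MarketData", "18-05-2012"));
    }
}
```

The sketch does no URL encoding, so a row key such as 18/05/2012 would be read as extra path segments; the dashed form used in the test program avoids that.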

Step 4./ Then the code prepares the data as name/value pairs (Open = “533.96”, Close = “530.38”, etc.), defines a rowkey (“18-05-2012”) and then uses the DataAPI postBulkData method to upload the data into Cassandra.io. The DataAPI credentials are the same as above.
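
The "Posting JSON" line in the log shows the shape of the bulk payload: a list of row keys, each with a list of columns carrying a name, a value and a TTL. The following sketch builds the same structure by hand, without Gson, purely to make the format explicit; the class names are hypothetical and the real library uses its own model classes.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the bulk payload format seen in the log output.
class BulkPayloadSketch {

    static final class Col {
        final String name;
        final String value;
        final int ttl; // seconds; 0 means no expiry in the logged payload

        Col(String name, String value, int ttl) {
            this.name = name;
            this.value = value;
            this.ttl = ttl;
        }
    }

    // Minimal JSON string quoting (backslash and double quote only).
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    static String toJson(Map<String, List<Col>> rows) {
        StringBuilder sb = new StringBuilder("{\"rowkeys\":[");
        boolean firstRow = true;
        for (Map.Entry<String, List<Col>> row : rows.entrySet()) {
            if (!firstRow) sb.append(',');
            firstRow = false;
            sb.append("{\"rowkey\":").append(quote(row.getKey()))
              .append(",\"columns\":[");
            boolean firstCol = true;
            for (Col c : row.getValue()) {
                if (!firstCol) sb.append(',');
                firstCol = false;
                sb.append("{\"columnname\":").append(quote(c.name))
                  .append(",\"columnvalue\":").append(quote(c.value))
                  .append(",\"ttl\":").append(c.ttl).append('}');
            }
            sb.append("]}");
        }
        return sb.append("]}").toString();
    }

    public static void main(String[] args) {
        Map<String, List<Col>> rows = new LinkedHashMap<>();
        rows.put("18-05-2012", Arrays.asList(
                new Col("Open", "533.96", 0),
                new Col("Close", "530.38", 12000)));
        System.out.println(toJson(rows));
    }
}
```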

Step 5./ The code then fetches the data using HTTP GET with url: https://api.cassandra.io/1/data/AAPL/MarketData/18-05-2012/. The response is in JSON format; the library maps it into a DataMapModel, which prints as: {Volume=26125200, Open=533.96, Low=522.18, High=543.41, Close=530.38, AdjClose=530.12}

Step 6./ Finally the code destroys the keyspace using HTTP DELETE, url: https://api.cassandra.io/1/keyspace/AAPL/.

Summary

If you want to try out a robust, highly available Cassandra datastore without any upfront infrastructure investment and with an easy-to-use API, you can certainly have a closer look at Cassandra.io on Heroku. It takes only a few minutes to start up, and the APIs offer simple REST-based data management for Java, Ruby and PHP developers.

Cassandra and OpsCenter from Datastax


Cassandra, originally developed at Facebook, is another popular NoSQL database that combines Amazon’s Dynamo distributed systems technologies and Google’s Bigtable data model based on column families. It is designed for distributed data at large scale. Its key components are as follows:

Keyspace: it acts as a container for data, similar to an RDBMS schema. It determines the replication parameters such as replication factor and replication placement strategy, as we will see later in this post. More details on replication placement strategy can be read here.

Column Family: within a keyspace you can have one or more column families. These are similar to tables in the RDBMS world. They contain multiple columns which are referenced by row keys.

Column: it is the smallest increment of data. It is a tuple having a name, a value and a timestamp.
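
One way to picture these three components is as nested maps. The sketch below is only a mental model in plain Java, with hypothetical class names; it is not how Cassandra stores data internally. It keeps columns sorted by name and lets the write with the newer timestamp win, which is how Cassandra resolves conflicting writes to the same column.

```java
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Mental model only: keyspace -> column families -> rows -> sorted columns.
class DataModelSketch {

    // A column is a (name, value, timestamp) tuple.
    static final class Column {
        final String name;
        final String value;
        final long timestamp;

        Column(String name, String value, long timestamp) {
            this.name = name;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // A column family maps row keys to columns sorted by column name.
    static final class ColumnFamily {
        private final Map<String, SortedMap<String, Column>> rows = new TreeMap<>();

        void set(String rowKey, String name, String value, long timestamp) {
            SortedMap<String, Column> row =
                    rows.computeIfAbsent(rowKey, k -> new TreeMap<>());
            Column old = row.get(name);
            // The write with the newer timestamp wins.
            if (old == null || timestamp >= old.timestamp) {
                row.put(name, new Column(name, value, timestamp));
            }
        }

        SortedMap<String, Column> get(String rowKey) {
            return rows.getOrDefault(rowKey, new TreeMap<>());
        }
    }

    // A keyspace holds column families plus the replication settings.
    static final class Keyspace {
        final String name;
        final int replicationFactor;
        final Map<String, ColumnFamily> columnFamilies = new TreeMap<>();

        Keyspace(String name, int replicationFactor) {
            this.name = name;
            this.replicationFactor = replicationFactor;
        }
    }

    public static void main(String[] args) {
        Keyspace aapl = new Keyspace("AAPL", 1);
        ColumnFamily marketData = new ColumnFamily();
        aapl.columnFamilies.put("Marketdata", marketData);
        marketData.set("18/05/2012", "Open", "533.96", 1L);
        marketData.set("18/05/2012", "Close", "530.38", 2L);
        for (Column c : marketData.get("18/05/2012").values()) {
            System.out.println(c.name + " = " + c.value);
        }
    }
}
```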

Installing Cassandra from binaries

Datastax is the commercial leader in Apache Cassandra; they offer a complete big data platform (Enterprise Edition) built on Apache Cassandra as well as a free Community Edition. This post is based on the latter. In 2012 they were listed among the Top10 Big Data startups.

Besides the Cassandra package, they also offer a web-based management center (Datastax OpsCenter), which can make Cassandra cluster management much easier than the command line based alternatives (e.g. cassandra-cli).

To download Datastax Community Edition, go to this link. Both the Datastax Community Server and the OpsCenter Community Edition are available there. As of this writing, the Cassandra Community Server version is 1.1.2 (dsc-cassandra-1.1.2-bin.tar.gz) and the OpsCenter version is 2.1.1 (opscenter-2.1.1-free.tar.gz).

The installation is as simple as unzipping and untarring the tarballs. Then you need to configure the Cassandra instance by editing the <Cassandra install directory>/conf/cassandra.yaml file.

A few parameters need to be edited:

cluster_name: 'BigHadoop Cluster'
initial_token: 0
listen_address: 10.229.30.238
seed_provider:
    # Addresses of hosts that are deemed contact points.
    # Cassandra nodes use this list of hosts to find each other and learn
    # the topology of the ring.  You must change this if you are running
    # multiple nodes!
    - class_name: org.apache.cassandra.locator.SimpleSeedProvider
      parameters:
          # seeds is actually a comma-delimited list of addresses.
          # Ex: "<ip1>,<ip2>,<ip3>"
          - seeds: "10.229.30.238"
rpc_address: 0.0.0.0

My configuration had two nodes; the second node has a similar cassandra.yaml file except for the listen_address and the initial_token.

Token generation is explained in the Datastax documentation: http://www.datastax.com/docs/1.1/initialize/token_generation.

The second node configuration looks like:

listen_address: 10.226.42.81
initial_token: 85070591730234615865843651857942052864
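
The token of the second node is not arbitrary. Assuming the RandomPartitioner (the default in Cassandra 1.1), whose token range is 0 to 2^127, evenly spaced tokens are i * 2^127 / N for node i of N. A small sketch of that calculation (the class name is hypothetical):

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

// Sketch: evenly spaced initial tokens for the RandomPartitioner,
// whose token range is 0 .. 2^127.
class TokenGenerator {
    static final BigInteger RING_SIZE = BigInteger.ONE.shiftLeft(127); // 2^127

    static List<BigInteger> tokens(int nodeCount) {
        if (nodeCount < 1) {
            throw new IllegalArgumentException("nodeCount must be at least 1");
        }
        List<BigInteger> result = new ArrayList<>();
        for (int i = 0; i < nodeCount; i++) {
            // token_i = i * 2^127 / nodeCount
            result.add(RING_SIZE.multiply(BigInteger.valueOf(i))
                                .divide(BigInteger.valueOf(nodeCount)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<BigInteger> tokens = tokens(2);
        for (int i = 0; i < tokens.size(); i++) {
            System.out.println("node " + i + ": " + tokens.get(i));
        }
        // node 0: 0
        // node 1: 85070591730234615865843651857942052864
    }
}
```

For two nodes this yields exactly the two values used in the configuration files above.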

Since my nodes were running on AWS EC2, I also modified the endpoint_snitch, which is used to map IP addresses to datacenters and racks; more details can be found here.

endpoint_snitch: Ec2Snitch

Once these configuration changes have been applied, you can start up the Cassandra server, in my case on both nodes:

$ cd 
$ bin/cassandra

Once the servers are up, you can validate if they formed a cluster using nodetool:

$  bin/nodetool -h localhost ring
Note: Ownership information does not include topology, please specify a keyspace.
Address         DC          Rack        Status State   Load            Owns                Token
                                                                                           85070591730234615865843651857942052864
10.229.30.238   eu-west     1c          Up     Normal  15.89 KB        50.00%              0
10.226.42.81    eu-west     1a          Up     Normal  20.22 KB        50.00%              85070591730234615865843651857942052864
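
To make the ring output more concrete: each node is responsible for the token range that ends at its own token, so a row belongs to the first node whose token is greater than or equal to the row's token, wrapping around at the end of the ring. The sketch below models that lookup with a TreeMap. The md5Token method is an assumption meant to mirror how the RandomPartitioner derives a token from a row key (absolute value of the MD5 digest); the class itself is hypothetical and ignores replication.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

// Sketch of token-ring ownership with a single replica per row.
class RingSketch {
    private final TreeMap<BigInteger, String> ring = new TreeMap<>();

    void addNode(String token, String address) {
        ring.put(new BigInteger(token), address);
    }

    // First node whose token is >= the key token; wrap to the lowest token.
    String ownerOf(BigInteger keyToken) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("ring has no nodes");
        }
        Map.Entry<BigInteger, String> entry = ring.ceilingEntry(keyToken);
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    // Assumption: token = |MD5(row key)|, in the style of the RandomPartitioner.
    static BigInteger md5Token(String rowKey) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(rowKey.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(digest).abs();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        RingSketch ring = new RingSketch();
        ring.addNode("0", "10.229.30.238");
        ring.addNode("85070591730234615865843651857942052864", "10.226.42.81");
        String key = "18/05/2012";
        System.out.println(key + " -> " + ring.ownerOf(md5Token(key)));
    }
}
```

With the two tokens above, each node covers half of the token range, which matches the 50.00% shown in the Owns column.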

Installing OpsCenter and OpsCenter agents

The next step is to install OpsCenter (on one designated node) and the agents on all the nodes. This is again as simple as unzipping and untarring the tarball that we just downloaded from the Datastax site and then editing opscenterd.conf:

[webserver]
port = 8888
interface = 0.0.0.0

[agents]
use_ssl = false

Note: I did not want to use SSL between the agents and the OpsCenter so I disabled it.

To start up the OpsCenter:

$ cd 
$ bin/opscenter

In fact, OpsCenter is a web server based on Python twistd, so you need to have Python installed as well. The Amazon AMI had Python 2.6.7 preinstalled.

$ python -V
Python 2.6.7

OpsCenter also uses iostat, which was not preinstalled on my instance, so I had to install the sysstat package, too:

$ sudo yum install sysstat

You can install the agents manually (that is what I did) or automatically, but you have to ensure that they are installed on every node that is a member of the cluster. The agent is part of the OpsCenter tarball; it can be found under the OpsCenter/agent directory.

To configure the agent you need to edit the conf/address.yaml file:

$ cat address.yaml
stomp_interface: "10.229.30.238"
use_ssl: 0

stomp_interface is the OpsCenter interface, while use_ssl: 0 indicates that we do not use SSL for agent communications.

Note: Cassandra and OpsCenter use TCP ports that are not open by default on an AWS EC2 instance. You need to define a special security group that opens the following ports: 7000/tcp, 9160/tcp, 8888/tcp, 61210/tcp and 61621/tcp. More details about how these ports are used can be found here.

Using Cassandra

The simplest way to start using Cassandra is its command line tool called cassandra-cli.

[ec2-user@ip-10-229-30-238 bin]$ ./cassandra-cli -h localhost -p 9160
Connected to: "BigHadoop Cluster" on localhost/9160
Welcome to Cassandra CLI version 1.1.2

Type 'help;' or '?' for help.
Type 'quit;' or 'exit;' to quit.

[default@unknown] create keyspace AAPL;
f23e7e5e-22d2-3416-91dc-1cba8276f57d
Waiting for schema agreement...
... schemas agree across the cluster

[default@unknown] use AAPL;
Authenticated 

[default@AAPL] update keyspace AAPL with placement_strategy = 'org.apache.cassandra.locator.SimpleStrategy' and strategy_options = {replication_factor:1};
f56204f3-89a7-3bd2-8dd0-695e43444b36
Waiting for schema agreement...
... schemas agree across the cluster

[default@AAPL] create column family Marketdata;
c6bb29df-dc38-392e-8cd8-524e4c0ae026
Waiting for schema agreement...
... schemas agree across the cluster

These steps create a keyspace called AAPL, modify the replication parameters mentioned above (replication factor and placement strategy) and create a column family called Marketdata. Then we can use the set command to insert data and the get command to retrieve it.

[default@AAPL] set Marketdata[utf8('18/05/2012')][utf8('Open')] = utf8('533.96');
[default@AAPL] set Marketdata[utf8('18/05/2012')][utf8('High')] = utf8('543.41');
[default@AAPL] set Marketdata[utf8('18/05/2012')][utf8('Low')] = utf8('522.18');
[default@AAPL] set Marketdata[utf8('18/05/2012')][utf8('Close')] = utf8('530.38');
[default@AAPL] set Marketdata[utf8('18/05/2012')][utf8('Volume')] = utf8('26125200');
[default@AAPL] set Marketdata[utf8('18/05/2012')][utf8('AdjClose')] = utf8('530.12');

[default@AAPL] get Marketdata[utf8('18/05/2012')];
=> (column=41646a436c6f7365, value=3533302e3132, timestamp=1344110379812000)      // This is AdjClose   :  530.12
=> (column=436c6f7365, value=3533302e3338, timestamp=1344110378828000)            // This is Close      :  530.38
=> (column=48696768, value=3534332e3431, timestamp=1344110364687000)              // This is High       :  543.41
=> (column=4c6f77, value=3532322e3138, timestamp=1344110373422000)                // This is Low        :  522.18
=> (column=4f70656e, value=3533332e3936, timestamp=1344110350410000)              // This is Open       :  533.96
=> (column=566f6c756d65, value=3236313235323030, timestamp=1344110378832000)      // This is Volume     :  26125200
Returned 6 results.
Elapsed time: 22 msec(s).
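
The column names and values come back hex-encoded because the column family was created without a comparator or validation class, so the CLI prints the raw bytes. A small sketch for turning such output back into text (the class name is hypothetical):

```java
import java.nio.charset.StandardCharsets;

// Decodes the hex strings printed by cassandra-cli back into UTF-8 text.
class HexDecode {

    static String hexToUtf8(String hex) {
        if (hex.length() % 2 != 0) {
            throw new IllegalArgumentException("hex string must have even length");
        }
        byte[] bytes = new byte[hex.length() / 2];
        for (int i = 0; i < bytes.length; i++) {
            // Two hex digits make one byte.
            bytes[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(hexToUtf8("41646a436c6f7365")); // prints AdjClose
        System.out.println(hexToUtf8("3533302e3132"));     // prints 530.12
    }
}
```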

Besides the ‘traditional command line interface’, there is also a SQLPlus-like utility known as the Cassandra Query Language Shell (cqlsh). This is a utility written in Python that supports SQL-like queries (somewhat analogous to Hive in the Hadoop world).

It supports both DDL and DML commands, so you can run SELECT and INSERT statements as well as CREATE KEYSPACE, CREATE TABLE, ALTER TABLE and DROP TABLE.

$ bin/cqlsh
Connected to BigHadoop Cluster at localhost:9160.
[cqlsh 2.2.0 | Cassandra 1.1.2 | CQL spec 2.0.0 | Thrift protocol 19.32.0]
Use HELP for help.
cqlsh> use AAPL;
cqlsh:AAPL> select * from Marketdata;
 KEY                  | 41646a436c6f7365 | 436c6f7365   | 48696768     | 4c6f77       | 4f70656e     | 566f6c756d65
----------------------+------------------+--------------+--------------+--------------+--------------+------------------
 31382f30352f32303132 |     3533302e3132 | 3533302e3338 | 3534332e3431 | 3532322e3138 | 3533332e3936 | 3236313235323030

cqlsh:AAPL> insert into Marketdata (KEY, '41646a436c6f7365', '436c6f7365', '48696768', '4c6f77', '4f70656e', '566f6c756d65') values ('31372f30352f32313132', '3533302e3132', '3533302e3132', '3534372e35', '3533302e3132','3534352e3331', '3235353832323030') using ttl 86440;

cqlsh:AAPL> select * from Marketdata;
 KEY                  | 41646a436c6f7365 | 436c6f7365   | 48696768     | 4c6f77       | 4f70656e     | 566f6c756d65
----------------------+------------------+--------------+--------------+--------------+--------------+------------------
 31382f30352f32303132 |     3533302e3132 | 3533302e3338 | 3534332e3431 | 3532322e3138 | 3533332e3936 | 3236313235323030
 31372f30352f32313132 |     3533302e3132 | 3533302e3132 |   3534372e35 | 3533302e3132 | 3534352e3331 | 3235353832323030
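
Since the insert statement above takes hex-encoded keys, column names and values, it helps to have the opposite conversion at hand when preparing such statements. A minimal sketch (hypothetical class name):

```java
import java.nio.charset.StandardCharsets;

// Encodes UTF-8 text as the lowercase hex strings used in the cqlsh session.
class HexEncode {

    static String utf8ToHex(String text) {
        StringBuilder sb = new StringBuilder();
        for (byte b : text.getBytes(StandardCharsets.UTF_8)) {
            // Mask to 0..255 so every byte becomes exactly two hex digits.
            sb.append(String.format("%02x", b & 0xff));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(utf8ToHex("Open"));       // prints 4f70656e
        System.out.println(utf8ToHex("18/05/2012")); // prints 31382f30352f32303132
    }
}
```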

Monitoring Cassandra Cluster using OpsCenter

Datastax OpsCenter provides a web-based management tool to configure and monitor Cassandra clusters.

To start OpsCenter in a web browser, just go to http://hostname:8888 and then enter the IP address/hostname of the Cassandra nodes.

OpsCenter shows the dashboard:

It can also visualize the cluster ring:

You can create a keyspace via OpsCenter or, if it was created using the command line utility, OpsCenter can retrieve the data model:

Using the Data Explorer menu you can retrieve the row keys and the data stored in the keyspace: