Big Data on Heroku – Hadoop from Treasure Data

This time I write about Heroku and the Treasure Data Hadoop solution – I found it to be a real gem in the Big Data world.

Heroku is a cloud platform as a service (PaaS) owned by Salesforce.com. It originally started with Ruby as its main programming language, but it has since been extended to Java, Scala, Node.js, Python and Clojure, too. It also supports a long list of add-ons including – among others – RDBMS and NoSQL capabilities and a Hadoop-based data warehouse developed by Treasure Data.

Treasure Data Hadoop Architecture

The architecture of the Treasure Data Hadoop solution is as follows:

Heroku Toolbelt

The Heroku toolbelt is a command-line toolset that consists of the heroku, foreman and git packages. As described on the Heroku toolbelt website, it is “everything you need to get started using heroku”. (The heroku CLI is based on Ruby, so you need Ruby under the hood, too.) Once you have signed up for Heroku (you need a verified account, meaning that you have provided your bank details for potential service charges) and you have installed the Heroku toolbelt, you can start right away.

Depending on your environment – I am using Ubuntu 12.04 LTS – you can use an alternative installation method like:

$ sudo apt-get install git
$ gem install heroku
$ gem install foreman
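
A quick sanity check that the tools are on your PATH before moving on (the version numbers will of course depend on your setup):

$ heroku --version
$ foreman --version
$ git --version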

Heroku and Treasure Data add-on

If you want to use Treasure Data on Heroku, you need to add the Treasure Data Hadoop add-on – you need to log in, create an application (Heroku will generate a fancy name like boiling-tundra for you) and then add the add-on to the application you just created:

$ heroku login
Enter your Heroku credentials.
Email: xxx@mail.com
Password (typing will be hidden): 
Found existing public key: /home/istvan/.ssh/id_dsa.pub
Uploading SSH public key /home/istvan/.ssh/id_dsa.pub... done
Authentication successful.

$ heroku create
Creating boiling-tundra-1234... done, stack is cedar
http://boiling-tundra-1234.herokuapp.com/ | git@heroku.com:boiling-tundra-1234.git

$ heroku addons:add treasure-data:nano --app boiling-tundra-1234
Adding treasure-data:nano on boiling-tundra-1234... done, v2 (free)
Use `heroku addons:docs treasure-data:nano` to view documentation.
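
If you want to double-check that the add-on is attached to your application – and see whatever configuration variables it sets – the standard heroku commands can be used (their output is omitted here):

$ heroku addons --app boiling-tundra-1234
$ heroku config --app boiling-tundra-1234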

I just love the color scheme and the graphics used in the Heroku console – it is simply brilliant.

Treasure Data toolbelt

To manage Treasure Data Hadoop on Heroku you need to install the Treasure Data toolbelt – it fits nicely with the heroku CLI and is also based on Ruby:

$ gem install td

Then you need to install the heroku plugin that makes the td commands available through heroku:

$ heroku plugins:install https://github.com/treasure-data/heroku-td.git
Installing heroku-td... done

To verify that everything is fine, just run:

$ heroku plugins
=== Installed Plugins
heroku-td

and

$ heroku td
usage: heroku td [options] COMMAND [args]

options:
  -c, --config PATH                path to config file (~/.td/td.conf)
  -k, --apikey KEY                 use this API key instead of reading the config file
  -v, --verbose                    verbose mode
  -h, --help                       show help
...

Treasure Data Hadoop – td commands

Now we are ready to execute td commands from heroku. The td commands are used to create databases and tables, import data, run queries, drop tables, etc. Under the hood the td queries are basically HiveQL. (According to their website, Treasure Data plans to support Pig as well in the future.)

By default the Treasure Data td-agent prefers JSON-formatted data, though it can process various other formats (Apache logs, syslog, etc.) and you can write your own parser to process the uploaded data.

Thus I converted my AAPL stock data (again, thanks to http://finance.yahoo.com) into JSON format:

{"time":"2012-08-20", "open":"650.01", "high":"665.15", "low":"649.90", "close":"665.15", "volume":"21876300", "adjclose":"665.15"}
{"time":"2012-08-17", "open":"640.00", "high":"648.19", "low":"638.81", "close":"648.11", "volume":"15812900", "adjclose":"648.11"}
{"time":"2012-08-16", "open":"631.21", "high":"636.76", "low":"630.50", "close":"636.34", "volume":"9090500", "adjclose":"634.64"}
{"time":"2012-08-15", "open":"631.30", "high":"634.00", "low":"625.75", "close":"630.83", "volume":"9190800", "adjclose":"630.83"}
{"time":"2012-08-14", "open":"631.87", "high":"638.61", "low":"630.21", "close":"631.69", "volume":"12148900", "adjclose":"631.69"}
{"time":"2012-08-13", "open":"623.39", "high":"630.00", "low":"623.25", "close":"630.00", "volume":"9958300", "adjclose":"630.00"}
{"time":"2012-08-10", "open":"618.71", "high":"621.76", "low":"618.70", "close":"621.70", "volume":"6962100", "adjclose":"621.70"}
{"time":"2012-08-09", "open":"617.85", "high":"621.73", "low":"617.80", "close":"620.73", "volume":"7915800", "adjclose":"620.73"}
{"time":"2012-08-08", "open":"619.39", "high":"623.88", "low":"617.10", "close":"619.86", "volume":"8739500", "adjclose":"617.21"}
{"time":"2012-08-07", "open":"622.77", "high":"625.00", "low":"618.04", "close":"620.91", "volume":"10373100", "adjclose":"618.26"}

The first step is to create the database called aapl:

$ heroku td db:create aapl --app boiling-tundra-1234
 !    DEPRECATED: Heroku::Client#deprecate is deprecated, please use the heroku-api gem.
 !    DEPRECATED: More information available at https://github.com/heroku/heroku.rb
 !    DEPRECATED: Deprecated method called from /usr/local/heroku/lib/heroku/client.rb:129.
Database 'aapl' is created.

Then create the table called marketdata:

$ heroku td table:create aapl marketdata --app boiling-tundra-1234
 !    DEPRECATED: Heroku::Client#deprecate is deprecated, please use the heroku-api gem.
 !    DEPRECATED: More information available at https://github.com/heroku/heroku.rb
 !    DEPRECATED: Deprecated method called from /usr/local/heroku/lib/heroku/client.rb:129.
Table 'aapl.marketdata' is created.

Check if the table has been created successfully:

$ heroku td tables --app boiling-tundra-1234
 !    DEPRECATED: Heroku::Client#deprecate is deprecated, please use the heroku-api gem.
 !    DEPRECATED: More information available at https://github.com/heroku/heroku.rb
 !    DEPRECATED: Deprecated method called from /usr/local/heroku/lib/heroku/client.rb:129.
+----------+------------+------+-------+--------+
| Database | Table      | Type | Count | Schema |
+----------+------------+------+-------+--------+
| aapl     | marketdata | log  | 0     |        |
+----------+------------+------+-------+--------+
1 row in set

Import data:

$ heroku td table:import aapl marketdata --format json --time-key time aapl.json --app boiling-tundra-1234
 !    DEPRECATED: Heroku::Client#deprecate is deprecated, please use the heroku-api gem.
 !    DEPRECATED: More information available at https://github.com/heroku/heroku.rb
 !    DEPRECATED: Deprecated method called from /home/istvan/.rvm/gems/ruby-1.9.2-p320/gems/heroku-2.30.3/lib/heroku/client.rb:129.
importing aapl.json...
  uploading 364 bytes...
  imported 10 entries from aapl.json.
done.

Check if the data import was successful – you should see the Count column indicating the number of rows loaded into the table:

$ heroku td tables --app boiling-tundra-1234
 !    DEPRECATED: Heroku::Client#deprecate is deprecated, please use the heroku-api gem.
 !    DEPRECATED: More information available at https://github.com/heroku/heroku.rb
 !    DEPRECATED: Deprecated method called from /home/istvan/.rvm/gems/ruby-1.9.2-p320/gems/heroku-2.30.3/lib/heroku/client.rb:129.
+----------+------------+------+-------+--------+
| Database | Table      | Type | Count | Schema |
+----------+------------+------+-------+--------+
| aapl     | marketdata | log  | 10    |        |
+----------+------------+------+-------+--------+
1 row in set

Now we are ready to run HiveQL (td query) against the dataset – this particular query lists the AAPL prices sorted by the daily high in descending order, so the highest price is on top. (The time value is based on the UNIX epoch.)

$ heroku td query -d aapl -w "SELECT v['time'] as time, v['high'] as high, v['low'] as low FROM marketdata ORDER BY high DESC" --app boiling-tundra-1234
 !    DEPRECATED: Heroku::Client#deprecate is deprecated, please use the heroku-api gem.
 !    DEPRECATED: More information available at https://github.com/heroku/heroku.rb
 !    DEPRECATED: Deprecated method called from /home/istvan/.rvm/gems/ruby-1.9.2-p320/gems/heroku-2.30.3/lib/heroku/client.rb:129.
Job 757853 is queued.
Use 'heroku td job:show 757853' to show the status.
queued...
  started at 2012-08-21T21:06:54Z
  Hive history file=/mnt/hive/tmp/617/hive_job_log_617_201208212106_269570447.txt
  Total MapReduce jobs = 1
  Launching Job 1 out of 1
  Number of reduce tasks determined at compile time: 1
  In order to change the average load for a reducer (in bytes):
    set hive.exec.reducers.bytes.per.reducer=<number>
  In order to limit the maximum number of reducers:
    set hive.exec.reducers.max=<number>
  In order to set a constant number of reducers:
    set mapred.reduce.tasks=<number>
  Starting Job = job_201207250829_556135, Tracking URL = http://domU-12-31-39-0A-56-11.compute-1.internal:50030/jobdetails.jsp?jobid=job_201207250829_556135
  Kill Command = /usr/lib/hadoop/bin/hadoop job  -Dmapred.job.tracker=10.211.85.219:8021 -kill job_201207250829_556135
  2012-08-21 21:07:21,455 Stage-1 map = 0%,  reduce = 0%
  2012-08-21 21:07:28,480 Stage-1 map = 100%,  reduce = 0%
  2012-08-21 21:07:37,965 Stage-1 map = 100%,  reduce = 100%
  Ended Job = job_201207250829_556135
  OK
  MapReduce time taken: 42.536 seconds
  finished at 2012-08-21T21:07:53Z
  Time taken: 53.781 seconds
Status     : success
Result     :
+------------+--------+--------+
| time       | high   | low    |
+------------+--------+--------+
| 1345417200 | 665.15 | 649.90 |
| 1345158000 | 648.19 | 638.81 |
| 1344898800 | 638.61 | 630.21 |
| 1345071600 | 636.76 | 630.50 |
| 1344985200 | 634.00 | 625.75 |
| 1344812400 | 630.00 | 623.25 |
| 1344294000 | 625.00 | 618.04 |
| 1344380400 | 623.88 | 617.10 |
| 1344553200 | 621.76 | 618.70 |
| 1344466800 | 621.73 | 617.80 |
+------------+--------+--------+
10 rows in set
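
If you would rather see readable dates instead of raw epoch values, a variant of the query can let Hive do the formatting – just a sketch using the standard from_unixtime() UDF (the CAST is there because v['time'] comes back as a string):

$ heroku td query -d aapl -w "SELECT from_unixtime(CAST(v['time'] AS BIGINT)) AS day, v['high'] AS high, v['low'] AS low FROM marketdata ORDER BY high DESC" --app boiling-tundra-1234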

Finally you can delete the marketdata table:

$ heroku td table:delete aapl marketdata --app boiling-tundra-1234
 !    DEPRECATED: Heroku::Client#deprecate is deprecated, please use the heroku-api gem.
 !    DEPRECATED: More information available at https://github.com/heroku/heroku.rb
 !    DEPRECATED: Deprecated method called from /home/istvan/.rvm/gems/ruby-1.9.2-p320/gems/heroku-2.30.3/lib/heroku/client.rb:129.
Do you really delete 'marketdata' in 'aapl'? [y/N]: y
Table 'aapl.marketdata' is deleted.

More details on how to use Treasure Data Hadoop can be found at http://docs.treasure-data.com/articles/quickstart

Cassandra and OpsCenter from Datastax


Cassandra – originally developed at Facebook – is another popular NoSQL database that combines Amazon’s Dynamo distributed systems technologies with Google’s Bigtable data model based on column families. It is designed for distributed data at large scale. Its key components are as follows:

Keyspace: it acts as a container for data, similar to a schema in the RDBMS world. It determines the replication parameters such as the replication factor and the replica placement strategy, as we will see later in this post. More details on replica placement strategies can be read here.

Column Family: within a keyspace you can have one or more column families. This is similar to a table in the RDBMS world. Column families contain multiple columns which are referenced by row keys.

Column: it is the smallest increment of data. It is a tuple consisting of a name, a value and a timestamp.

Installing Cassandra from binaries

Datastax is the commercial leader in Apache Cassandra; they offer a complete big data platform (Enterprise Edition) built on Apache Cassandra as well as a free Community Edition. This post is based on the latter. In 2012 they were listed among the Top 10 Big Data startups.

Besides the Cassandra package, they also offer a web-based management center (Datastax OpsCenter), which can make Cassandra cluster management much easier than the command-line based alternatives (e.g. cassandra-cli).

To download the Datastax Community Edition, go to this link. Both the Datastax Community Server and the OpsCenter Community Edition are available there. As of this writing, the Cassandra Community Server version is 1.1.2 (dsc-cassandra-1.1.2-bin.tar.gz) and the OpsCenter version is 2.1.1 (opscenter-2.1.1-free.tar.gz).

The installation is as simple as unzipping and untarring the tarballs. Then you need to configure the Cassandra instance by editing the <Cassandra install directory>/conf/cassandra.yaml file.

A few parameters that need to be edited:

cluster_name: 'BigHadoop Cluster'
initial_token: 0
listen_address: 10.229.30.238
seed_provider:
    # Addresses of hosts that are deemed contact points.
    # Cassandra nodes use this list of hosts to find each other and learn
    # the topology of the ring.  You must change this if you are running
    # multiple nodes!
    - class_name: org.apache.cassandra.locator.SimpleSeedProvider
      parameters:
          # seeds is actually a comma-delimited list of addresses.
          # Ex: "<ip1>,<ip2>,<ip3>"
          - seeds: "10.229.30.238"
rpc_address: 0.0.0.0

My configuration had two nodes; the second node has a similar cassandra.yaml file except for the listen_address and the token.

Token generation is explained in the Datastax documentation: http://www.datastax.com/docs/1.1/initialize/token_generation.
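
For the RandomPartitioner the math behind it is simple: node i out of N gets the token i * 2^127 / N. The tokens used in this post can be reproduced with a quick Python one-liner (just a sketch of the formula described in the Datastax docs):

$ python -c '
nodes = 2   # number of nodes in the ring
for i in range(nodes):
    print i * (2 ** 127) / nodes
'
0
85070591730234615865843651857942052864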

The second node configuration looks like:

listen_address: 10.226.42.81
initial_token: 85070591730234615865843651857942052864

Since my nodes were running on AWS EC2, I also modified the endpoint_snitch, which is used to map IP addresses to data centers and racks; see more details here.

endpoint_snitch: Ec2Snitch

Once these configuration changes have been applied, you can start up the Cassandra server – in my case on both nodes:

$ cd <Cassandra install directory>
$ bin/cassandra

Once the servers are up, you can validate if they formed a cluster using nodetool:

$  bin/nodetool -h localhost ring
Note: Ownership information does not include topology, please specify a keyspace.
Address         DC          Rack        Status State   Load            Owns                Token
                                                                                           85070591730234615865843651857942052864
10.229.30.238   eu-west     1c          Up     Normal  15.89 KB        50.00%              0
10.226.42.81    eu-west     1a          Up     Normal  20.22 KB        50.00%              85070591730234615865843651857942052864

Installing OpsCenter and OpsCenter agents

The next step is to install the OpsCenter (on one designated node) and the agents on all the nodes. This is again as simple as unzipping and untarring the tarball that we just downloaded from the Datastax site and then editing opscenterd.conf:

[webserver]
port = 8888
interface = 0.0.0.0

[agents]
use_ssl = false

Note: I did not want to use SSL between the agents and the OpsCenter, so I disabled it.

To start up the OpsCenter:

$ cd <OpsCenter install directory>
$ bin/opscenter

In fact, OpsCenter is a Python twistd-based web server, so you need to have Python installed as well. The Amazon AMI had Python 2.6.7 preinstalled.

$ python -V
Python 2.6.7

OpsCenter also uses iostat, which was not preinstalled on my instance, so I had to install the sysstat package, too:

$ sudo yum install sysstat

You can also install the agents manually – that is what I did – or automatically, but you have to ensure that they are installed on every node that is a member of the cluster. The agent is part of the OpsCenter tarball; it can be found under the OpsCenter/agent directory.

To configure the agent you need to edit the conf/address.yaml file:

$ cat address.yaml
stomp_interface: "10.229.30.238"
use_ssl: 0

stomp_interface is the address of the OpsCenter host that the agent connects to, while use_ssl: 0 indicates that we do not use SSL for agent communications.

Note: Cassandra and OpsCenter use TCP ports that are not open by default on an AWS EC2 instance. You need to define a special security group that opens the following ports: 7000/tcp, 9160/tcp, 8888/tcp, 61210/tcp and 61621/tcp. More details about how these ports are used can be found here.
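
If you prefer the command line over the AWS console, a sketch using the classic EC2 API tools might look like this – the cassandra group name and the 10.0.0.0/8 source CIDR are placeholders, adjust them to your own setup:

$ for port in 7000 9160 8888 61210 61621; do ec2-authorize cassandra -P tcp -p $port -s 10.0.0.0/8; done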

Using Cassandra

The simplest way to start using Cassandra is its command line tool called cassandra-cli.

[ec2-user@ip-10-229-30-238 bin]$ ./cassandra-cli -h localhost -p 9160
Connected to: "BigHadoop Cluster" on localhost/9160
Welcome to Cassandra CLI version 1.1.2

Type 'help;' or '?' for help.
Type 'quit;' or 'exit;' to quit.

[default@unknown] create keyspace AAPL;
f23e7e5e-22d2-3416-91dc-1cba8276f57d
Waiting for schema agreement...
... schemas agree across the cluster

[default@unknown] use AAPL;
Authenticated to keyspace: AAPL

[default@AAPL] update keyspace AAPL with placement_strategy = 'org.apache.cassandra.locator.SimpleStrategy' and strategy_options = {replication_factor:1};
f56204f3-89a7-3bd2-8dd0-695e43444b36
Waiting for schema agreement...
... schemas agree across the cluster

[default@AAPL] create column family Marketdata;
c6bb29df-dc38-392e-8cd8-524e4c0ae026
Waiting for schema agreement...
... schemas agree across the cluster

These steps create a keyspace called AAPL, modify the replication parameters mentioned above (replication factor and placement strategy) and create a column family called Marketdata. Then we can use the set command to insert data and get to retrieve it.

[default@AAPL] set Marketdata[utf8('18/05/2012')][utf8('Open')] = utf8('533.96');
[default@AAPL] set Marketdata[utf8('18/05/2012')][utf8('High')] = utf8('543.41');
[default@AAPL] set Marketdata[utf8('18/05/2012')][utf8('Low')] = utf8('522.18');
[default@AAPL] set Marketdata[utf8('18/05/2012')][utf8('Close')] = utf8('530.38');
[default@AAPL] set Marketdata[utf8('18/05/2012')][utf8('Volume')] = utf8('26125200');
[default@AAPL] set Marketdata[utf8('18/05/2012')][utf8('AdjClose')] = utf8('530.12');

[default@AAPL] get Marketdata[utf8('18/05/2012')];
=> (column=41646a436c6f7365, value=3533302e3132, timestamp=1344110379812000)      // This is AdjClose   :  530.12
=> (column=436c6f7365, value=3533302e3338, timestamp=1344110378828000)            // This is Close      :  530.38
=> (column=48696768, value=3534332e3431, timestamp=1344110364687000)              // This is High       :  543.41
=> (column=4c6f77, value=3532322e3138, timestamp=1344110373422000)                // This is Low        :  522.18
=> (column=4f70656e, value=3533332e3936, timestamp=1344110350410000)              // This is Open       :  533.96
=> (column=566f6c756d65, value=3236313235323030, timestamp=1344110378832000)      // This is Volume     :  26125200
Returned 6 results.
Elapsed time: 22 msec(s).
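
The column names and values come back hex-encoded because they were written with utf8(); if you want to decode one by hand, the Python 2 that is already on the box will do, for example:

$ python -c "print '41646a436c6f7365'.decode('hex')"
AdjClose
$ python -c "print '3533302e3132'.decode('hex')"
530.12

The same decoding applies to the hex row keys and column names you will see in the cqlsh output below.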

Besides the ‘traditional’ command line interface, there is also an SQL*Plus-like utility known as the Cassandra Query Language Shell (cqlsh). This is a utility written in Python that supports SQL-like queries (a kind of Hive analogue from the Hadoop world).

It supports DDL and DML type commands, so you can run SELECT and INSERT statements as well as CREATE KEYSPACE, CREATE TABLE, ALTER TABLE and DROP TABLE.

$ bin/cqlsh
Connected to BigHadoop Cluster at localhost:9160.
[cqlsh 2.2.0 | Cassandra 1.1.2 | CQL spec 2.0.0 | Thrift protocol 19.32.0]
Use HELP for help.
cqlsh> use AAPL;
cqlsh:AAPL> select * from Marketdata;
 KEY                  | 41646a436c6f7365 | 436c6f7365   | 48696768     | 4c6f77       | 4f70656e     | 566f6c756d65
----------------------+------------------+--------------+--------------+--------------+--------------+------------------
 31382f30352f32303132 |     3533302e3132 | 3533302e3338 | 3534332e3431 | 3532322e3138 | 3533332e3936 | 3236313235323030

cqlsh:AAPL>insert into Marketdata (KEY, '41646a436c6f7365', '436c6f7365', '48696768', '4c6f77', '4f70656e', '566f6c756d65') values ('31372f30352f32313132', '3533302e3132', '3533302e3132', '3534372e35', '3533302e3132','3534352e3331', '3235353832323030') using ttl 86440;

cqlsh:AAPL> select * from Marketdata;
 KEY                  | 41646a436c6f7365 | 436c6f7365   | 48696768     | 4c6f77       | 4f70656e     | 566f6c756d65
----------------------+------------------+--------------+--------------+--------------+--------------+------------------
 31382f30352f32303132 |     3533302e3132 | 3533302e3338 | 3534332e3431 | 3532322e3138 | 3533332e3936 | 3236313235323030
 31372f30352f32313132 |     3533302e3132 | 3533302e3132 |   3534372e35 | 3533302e3132 | 3534352e3331 | 3235353832323030

Monitoring Cassandra Cluster using OpsCenter

Datastax OpsCenter provides a web-based management tool to configure and monitor Cassandra clusters.

To open OpsCenter, just go to http://hostname:8888 in a web browser and then enter the IP addresses/hostnames of the Cassandra nodes.

OpsCenter shows the dashboard.

It can also visualize the cluster ring.

You can create a keyspace via OpsCenter, or if it was created using the command line utility, OpsCenter can retrieve the data model.

Using the Data Explorer menu you can retrieve the row keys and the data stored in the keyspace.