Apache Phoenix – an SQL Driver for HBase


Introduction

HBase is one of the most popular NoSQL databases. It is available in all major Hadoop distributions and is also part of AWS Elastic MapReduce as an additional application. Out of the box it has its own data model and operations such as Get, Put, Scan and Delete, and it does not offer SQL-like capabilities, as opposed to, for instance, the Cassandra query language, CQL.
Apache Phoenix is a SQL layer on top of HBase that supports the most common SQL operations such as CREATE TABLE, SELECT, UPSERT, DELETE, etc. It was originally developed by Salesforce.com engineers for internal use and was later open sourced. In 2013 it became an Apache incubator project.

Architecture

We have covered HBase in more detail in this article. Just a quick recap: the HBase architecture is based on three key components: the HBase Master server, the HBase Region Servers and ZooKeeper.

HBase-Architecture

The client needs to find the Region Servers in order to work with the data stored in HBase. In essence, regions are the basic elements for distributing tables across the cluster. In order to find the Region Servers, the client first has to talk to ZooKeeper.

HBase-Lookup
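
From a Java client's perspective this simply means pointing the HBase configuration at the ZooKeeper quorum; the region lookup happens behind the scenes. Here is a minimal sketch, assuming ZooKeeper runs on localhost and using a made-up table name:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;

public class RegionLookupSketch {
    public static void main(String[] args) throws Exception {
        // The client only needs the ZooKeeper quorum address;
        // the Region Servers hosting the table are resolved from there.
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost");
        HTable table = new HTable(conf, "mytable");  // hypothetical table name
        table.close();
    }
}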

The key elements of the HBase data model are tables, column families, columns and rowkeys. Tables are made of columns and rows. The individual elements at the column and row intersections (cells in HBase terms) are versioned based on timestamps. The rows are identified by rowkeys, which are sorted; these rowkeys can be considered primary keys, and all the data in the table can be accessed via them.

The columns are grouped into column families; at table creation time you do not have to specify all the columns, only the column families. A column name is made up of the column family prefix and its own qualifier, so a full column name looks like this: 'contents:html'.
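
To make the addressing concrete, here is a minimal sketch that reads a single cell with the classic HBase Java client (the 'webpages' table and the row key are made up for illustration; 'contents:html' is the column mentioned above):

import java.io.IOException;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class CellReadSketch {
    public static void main(String[] args) throws IOException {
        HTable table = new HTable(HBaseConfiguration.create(), "webpages");
        Get get = new Get(Bytes.toBytes("com.example/index"));   // hypothetical rowkey
        // A cell is addressed by column family ('contents') and qualifier ('html')
        get.addColumn(Bytes.toBytes("contents"), Bytes.toBytes("html"));
        Result result = table.get(get);
        byte[] html = result.getValue(Bytes.toBytes("contents"), Bytes.toBytes("html"));
        System.out.println(html == null ? "(no value)" : Bytes.toString(html));
        table.close();
    }
}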

As we have seen, the classic HBase data model is not designed with SQL in mind; under the hood it is a sorted, multidimensional map. That is where Phoenix comes to the rescue: it offers a SQL skin on HBase. Phoenix is implemented as a JDBC driver, so from an architecture perspective a Java client using JDBC can be configured to work with the Phoenix driver and connect to HBase using SQL-like statements. We will also demonstrate how to use SQuirreL, a popular Java-based graphical SQL client, together with Phoenix.
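
Since Phoenix looks like any other JDBC driver to the client, a plain Java program can issue SQL against HBase using the standard java.sql API. Below is a minimal sketch, assuming the Phoenix client jar is on the classpath and HBase/ZooKeeper run on localhost; the DEMO table is made up purely for illustration:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class PhoenixJdbcSketch {
    public static void main(String[] args) throws Exception {
        // Register the Phoenix driver (often not needed with JDBC 4 auto-loading)
        Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
        // The JDBC URL points at the ZooKeeper quorum of the HBase cluster
        Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
        conn.setAutoCommit(true);  // flush each UPSERT to HBase immediately
        Statement stmt = conn.createStatement();
        stmt.execute("CREATE TABLE IF NOT EXISTS DEMO (ID BIGINT NOT NULL PRIMARY KEY, NAME VARCHAR)");
        stmt.executeUpdate("UPSERT INTO DEMO VALUES (1, 'phoenix')");
        ResultSet rs = stmt.executeQuery("SELECT ID, NAME FROM DEMO");
        while (rs.next()) {
            System.out.println(rs.getLong("ID") + " " + rs.getString("NAME"));
        }
        conn.close();
    }
}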

Getting Started with Phoenix

You can download Phoenix from the Apache download site. Different Phoenix versions are compatible with different HBase versions, so please read the Phoenix documentation to ensure you have the correct setup. In our tests we used Phoenix 3.0.0 with HBase 0.94; the Hadoop distribution was Cloudera CDH4.4 with Hadoop v1. The Phoenix package contains both Hadoop version 1 and version 2 drivers for the clients, so we had to use the appropriate Hadoop-1 files; see the details later on when talking about the SQuirreL client.

Once you have unzipped the downloaded Phoenix package, you need to copy the relevant Phoenix jar files to the HBase region servers in order to ensure that the Phoenix client can communicate with them; otherwise you may get an error message saying that the client and server jars are not compatible.

$ cd ~/phoenix/phoenix-3.0.0-incubating/common
$ cp phoenix-3.0.0-incubating-client-minimal.jar  /usr/lib/hbase/lib
$ cp phoenix-core-3.0.0-incubating.jar /usr/lib/hbase/lib

After copying the jar files, the region servers have to be restarted.

Phoenix provides a command line tool called sqlline – it is launched via a Python script (sqlline.py). Its functionality is similar to Oracle SQL*Plus or the MySQL command line tool; not too sophisticated, but it does the job for simple use cases.

Before you start using sqlline, you can create a sample database table, populate it and run some simple queries as follows:

$ cd ~/phoenix/phoenix-3.0.0-incubating/bin
$ ./psql.py localhost ../examples/web_stat.sql ../examples/web_stat.csv ../examples/web_stat_queries.sql

This will run a CREATE TABLE statement (the USAGE.CORE, USAGE.DB and STATS.ACTIVE_VISITOR definitions place those columns into the USAGE and STATS column families):

CREATE TABLE IF NOT EXISTS WEB_STAT (
     HOST CHAR(2) NOT NULL,
     DOMAIN VARCHAR NOT NULL,
     FEATURE VARCHAR NOT NULL,
     DATE DATE NOT NULL,
     USAGE.CORE BIGINT,
     USAGE.DB BIGINT,
     STATS.ACTIVE_VISITOR INTEGER
     CONSTRAINT PK PRIMARY KEY (HOST, DOMAIN, FEATURE, DATE)
);

Then load the data stored in the web_stat CSV file:

NA,Salesforce.com,Login,2013-01-01 01:01:01,35,42,10
EU,Salesforce.com,Reports,2013-01-02 12:02:01,25,11,2
EU,Salesforce.com,Reports,2013-01-02 14:32:01,125,131,42
NA,Apple.com,Login,2013-01-01 01:01:01,35,22,40
NA,Salesforce.com,Dashboard,2013-01-03 11:01:01,88,66,44
...

And then run a few sample queries on the table, e.g.:

-- Average CPU and DB usage by Domain
SELECT DOMAIN, AVG(CORE) Average_CPU_Usage, AVG(DB) Average_DB_Usage 
FROM WEB_STAT 
GROUP BY DOMAIN 
ORDER BY DOMAIN DESC;

Now you can connect to HBase using sqlline:

[cloudera@localhost bin]$ ./sqlline.py localhost
..
Connecting to jdbc:phoenix:localhost
Driver: org.apache.phoenix.jdbc.PhoenixDriver (version 3.0)
Autocommit status: true
Transaction isolation: TRANSACTION_READ_COMMITTED
..
Done
sqlline version 1.1.2
0: jdbc:phoenix:localhost> select count(*) from web_stat;
+------------+
|  COUNT(1)  |
+------------+
| 39         |
+------------+
1 row selected (0.112 seconds)
0: jdbc:phoenix:localhost> select host, sum(active_visitor) from web_stat group by host;
+------+---------------------------+
| HOST | SUM(STATS.ACTIVE_VISITOR) |
+------+---------------------------+
| EU   | 698                       |
| NA   | 1639                      |
+------+---------------------------+
2 rows selected (0.294 seconds)
0: jdbc:phoenix:localhost>

Using SQuirreL with Phoenix

If you prefer to use a graphical SQL client with Phoenix, you can download, for example, SQuirreL from here. After that, the first step is to copy the appropriate Phoenix driver jar file to the SQuirreL lib directory:

$ cd ~/phoenix
$ cp phoenix-3.0.0-incubating/hadoop-1/phoenix-3.0.0-incubating-client.jar ~/squirrel/lib

Now you are ready to configure the JDBC driver in the SQuirreL client (the driver class is org.apache.phoenix.jdbc.PhoenixDriver), as shown in the picture below:

Squirrel-1

Then you can connect to Phoenix using the appropriate connect string (jdbc:phoenix:localhost in our test scenario):

Squirrel-2

Once connected, you can start executing your SQL queries:
Squirrel-3

Phoenix on Amazon Web Services – AWS Elastic MapReduce with Phoenix

You can also use Phoenix with AWS Elastic MapReduce. When you create a cluster, you need to specify the Apache Hadoop version, then configure HBase as an additional application and define the bootstrap action that loads Phoenix onto your AWS EMR cluster. See the details below in the pictures:

AWS-EMR-3

AWS-EMR-5

Once the cluster is running, you can log in to the master node using ssh and check your Phoenix configuration.
AWS-EMR-9

Conclusion

SQL is one of the most popular languages used by data scientists and it is likely to remain so. With the advent of Big Data and NoSQL databases, the volume, variety and velocity of data have significantly increased, but the demand for traditional, well-known languages to process that data has not changed much. SQL on Hadoop solutions are gaining momentum, and Apache Phoenix is an interesting open source player offering a SQL layer on top of HBase.

HBase and Hadoop


HBase is a NoSQL database. It is based on Google’s Bigtable distributed storage system, as described in the Google research paper: “A Bigtable is a sparse, distributed, persistent multi-dimensional sorted map. The map is indexed by a row key, column key, and a timestamp; each value in the map is an uninterpreted array of bytes.” If you want a detailed explanation of what each word means in this scary definition, I suggest checking this post out.

HBase supports scaling far beyond traditional RDBMS capabilities; it supports automatic sharding and massively parallel processing via MapReduce. HBase is built on top of HDFS and provides fast lookups for large records. See more details about the HBase architecture here.

HBase can be used as a data source as well as a data sink for MapReduce jobs. Our example in this post will use HBase as a data sink. If you are interested in other examples, have a look at the Hadoop wiki: HBase as MapReduce job data source and data sink.

HBase distributed storage for stock price information

The example is going to process Apple stock prices downloaded from the Yahoo Finance website; this is the same dataset – Apple stock prices – that we used previously to demonstrate Hive capabilities on Amazon Elastic MapReduce. It is stored in an AWS S3 bucket called stockprice. The MapReduce job will retrieve the file from there using an s3n://<AWS Access Key ID>:<AWS Secret Access Key>@bucket/object URL and will store the output in an HBase table called aapl_marketdata. The test environment was based on Hadoop-0.20.2 and HBase-0.90.6.

Before the MapReduce job can be run, the table needs to be created:

$ bin/hbase shell
HBase Shell; enter 'help' for list of supported commands.
Type "exit" to leave the HBase Shell
Version 0.90.6, r1295128, Wed Feb 29 14:29:21 UTC 2012

hbase(main):005:0> create 'aapl_marketdata', 'marketdata'
0 row(s) in 1.4290 seconds

hbase(main):001:0> list
TABLE
aapl_marketdata
1 row(s) in 0.3950 seconds

Now we are ready to run the MapReduce job. It is advisable to have a driver script to run your job and set all the required arguments there for easier configuration, but in essence it is just plain old Java code.

My script looks like this:

$ cat hb.sh
java -classpath /home/ec2-user/hadoop/hadoop-0.20.2-ant.jar:/home/ec2-user/hadoop/hadoop-0.20.2-core.jar:/home/ec2-user/hadoop/hadoop-0.20.2-tools.jar:/home/ec2-user/hadoop/lib/jets3t-0.6.1.jar:/home/ec2-user/aws-java-sdk-1.3.11/aws-java-sdk-1.3.11.jar:/home/ec2-user/hbase/hbase-0.90.6.jar:/home/ec2-user/hbase/lib/commons-codec-1.4.jar:/home/ec2-user/hbase/lib/commons-httpclient-3.1.jar:/home/ec2-user/hbase/lib/commons-cli-1.2.jar:/home/ec2-user/hbase/lib/commons-logging-1.1.1.jar:/home/ec2-user/hbase/lib/zookeeper-3.3.2.jar:/home/ec2-user/hbase/lib/log4j-1.2.16.jar:json_io_1.0.4.jar:awsdemo-hbase.jar:/home/ec2-user/core-site.xml org.awsdemo.hbase.MarketDataApplication s3n://AWSAccessKeyId:AWSSecretAccessKey@stockprice/apple/input/APPL_StockPrices.csv s3n://AWSAccessKeyId:AWSSecretAccessKey@stockprice/apple/output/

Once the MapReduce job has finished successfully, we can check the result in the HBase table using bin/hbase shell.

hbase(main):001:0> get 'table_name', 'rowkey'

e.g. hbase(main):001:0> get 'aapl_marketdata', 'AAPL-1984-10-25'

hbase(main):001:0> get 'aapl_marketdata', 'AAPL-1984-10-25'
COLUMN                                  CELL
 marketdata:daily                       timestamp=1341590928097, value={"@type":"org.apache.hadoop.io.MapWritable","@keys":[{"@type":"org.apache.hadoop.io
                                        .Text","bytes":[115,116,111,99,107,83,121,109,98,111,108],"length":11},{"@type":"org.apache.hadoop.io.Text","bytes
                                        ":[115,116,111,99,107,80,114,105,99,101,76,111,119],"length":13},{"@type":"org.apache.hadoop.io.Text","bytes":[115
                                        ,116,111,99,107,80,114,105,99,101,79,112,101,110],"length":14},{"@type":"org.apache.hadoop.io.Text","bytes":[100,9
                                        7,116,101],"length":4},{"@type":"org.apache.hadoop.io.Text","bytes":[115,116,111,99,107,80,114,105,99,101,67,108,1
                                        11,115,101],"length":15},{"@type":"org.apache.hadoop.io.Text","bytes":[115,116,111,99,107,80,114,105,99,101,65,100
                                        ,106,67,108,111,115,101],"length":18},{"@type":"org.apache.hadoop.io.Text","bytes":[115,116,111,99,107,86,111,108,
                                        117,109,101],"length":11},{"@type":"org.apache.hadoop.io.Text","bytes":[115,116,111,99,107,80,114,105,99,101,72,10
                                        5,103,104],"length":14}],"@items":[{"@type":"org.apache.hadoop.io.Text","bytes":[65,65,80,76],"length":4},{"@type"
                                        :"org.apache.hadoop.io.Text","bytes":[50,53,46,50,53],"length":5},{"@type":"org.apache.hadoop.io.Text","bytes":[50
                                        ,54,46,50,53],"length":5},{"@type":"org.apache.hadoop.io.Text","bytes":[49,57,56,52,45,49,48,45,50,53],"length":10
                                        },{"@type":"org.apache.hadoop.io.Text","bytes":[50,53,46,50,53],"length":5},{"@type":"org.apache.hadoop.io.Text","
                                        bytes":[50,46,56,56],"length":4},{"@type":"org.apache.hadoop.io.Text","bytes":[53,54,55,54,48,48,48],"length":7},{
                                        "@type":"org.apache.hadoop.io.Text","bytes":[50,54,46,50,53],"length":5}]}
1 row(s) in 0.4140 seconds

The output was generated by JsonWriter and then serialized and stored in HBase, so it requires some ASCII skills to decode the values. E.g. "115,116,111,99,107,83,121,109,98,111,108" means stockSymbol, "115,116,111,99,107,80,114,105,99,101,76,111,119" means stockPriceLow, "115,116,111,99,107,80,114,105,99,101,79,112,101,110" means stockPriceOpen, etc. "65,65,80,76" means AAPL, "50,53,46,50,53" means 25.25, you know the rest.
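
If you prefer to let the JVM do the decoding, a few lines of Java reproduce the translation (the byte values below are copied from the output above):

public class DecodeSketch {
    public static void main(String[] args) {
        // The byte sequences are plain ASCII codes taken from the HBase cell value
        System.out.println(new String(new byte[] {115, 116, 111, 99, 107, 83, 121, 109, 98, 111, 108}));  // stockSymbol
        System.out.println(new String(new byte[] {65, 65, 80, 76}));      // AAPL
        System.out.println(new String(new byte[] {50, 53, 46, 50, 53}));  // 25.25
    }
}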

You can also scan the entire table with the HBase shell using the

hbase(main):001:0> scan 'aapl_marketdata'

command. If you are done and want to get rid of the data, you need to disable the table and then you can drop it.

hbase(main):002:0> disable 'aapl_marketdata'
0 row(s) in 2.1490 seconds

hbase(main):004:0> drop 'aapl_marketdata'
0 row(s) in 1.1790 seconds

The MapReduce code

The code consists of four files.

MarketDataApplication.java:

package org.awsdemo.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

public class MarketDataApplication {
    public static void main(String[] args) throws Exception {
        System.out.println("MarketDataApplication invoked");
        int m_rc = 0;
        m_rc = ToolRunner.run(new Configuration(), new MarketDataDriver(), args);
        System.exit(m_rc);
    }
}

MarketDataDriver.java

package org.awsdemo.hbase;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.conf.Configured;

public class MarketDataDriver extends Configured implements Tool {
	   @Override
	    public int run(String[] args) throws Exception { 

	        // use the configuration injected by ToolRunner instead of creating a new one
	        Configuration conf = getConf();
	        Job job = new Job(conf, "AAPL Market Data Application");
	        job.setJarByClass(MarketDataApplication.class);
	        job.setInputFormatClass(TextInputFormat.class);
	        job.setMapperClass(MarketDataMapper.class);
	        job.setReducerClass(MarketDataReducer.class);
	        job.setMapOutputKeyClass(Text.class);
	        job.setMapOutputValueClass(MapWritable.class);

	        FileInputFormat.addInputPath(job, new Path(args[0]));
	        FileOutputFormat.setOutputPath(job, new Path(args[1]));

	        TableMapReduceUtil.initTableReducerJob("aapl_marketdata",
	                MarketDataReducer.class, job);

	        boolean jobSucceeded = job.waitForCompletion(true);
	        if (jobSucceeded) {
	            return 0;
	        } else {
	            return -1;
	        }
	    }

	}

MarketDataMapper.java

package org.awsdemo.hbase;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MarketDataMapper extends
    Mapper<LongWritable, Text, Text, MapWritable> {

	public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {

	final String APPL_STOCK_SYMBOL = "AAPL";

    final Text STOCK_SYMBOL = new Text("stockSymbol");
    final Text DATE = new Text("date");
    final Text STOCK_PRICE_OPEN = new Text("stockPriceOpen");
    final Text STOCK_PRICE_HIGH = new Text("stockPriceHigh");
    final Text STOCK_PRICE_LOW = new Text("stockPriceLow");
    final Text STOCK_PRICE_CLOSE = new Text("stockPriceClose");
    final Text STOCK_VOLUME = new Text("stockVolume");
    final Text STOCK_PRICE_ADJ_CLOSE = new Text("stockPriceAdjClose");

    String strLine = "";

    strLine = value.toString();
    String[] data_values = strLine.split(",");
    MapWritable marketData = new MapWritable();
    marketData.put(STOCK_SYMBOL, new Text(APPL_STOCK_SYMBOL));
    marketData.put(DATE, new Text(data_values[0]));
    marketData.put(STOCK_PRICE_OPEN, new Text(data_values[1]));
    marketData.put(STOCK_PRICE_HIGH, new Text(data_values[2]));
    marketData.put(STOCK_PRICE_LOW, new Text(data_values[3]));
    marketData.put(STOCK_PRICE_CLOSE, new Text(data_values[4]));
    marketData.put(STOCK_VOLUME, new Text(data_values[5]));
	marketData.put(STOCK_PRICE_ADJ_CLOSE, new Text(data_values[6]));

	context.write(new Text(String.format("%s-%s", APPL_STOCK_SYMBOL, data_values[0])), marketData);	    
  }
}

MarketDataReducer.java

package org.awsdemo.hbase;

import java.io.IOException;

import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import com.cedarsoftware.util.io.JsonWriter;

public class MarketDataReducer extends TableReducer<Text, MapWritable, ImmutableBytesWritable> {
   public void reduce(Text arg0, Iterable<MapWritable> arg1, Context context) {
   // Since the complex key made up of stock symbol and date is unique
   // one value comes for a key.

   MapWritable marketData = null;
   for (MapWritable value : arg1) {
       marketData = value;
       break;
   }

   ImmutableBytesWritable key = new ImmutableBytesWritable(Bytes.toBytes(arg0.toString()));
   Put put = new Put(Bytes.toBytes(arg0.toString()));

   put.add(Bytes.toBytes("marketdata"), Bytes.toBytes("daily"), Bytes.toBytes(JsonWriter.toJson(marketData)));
   try {
       context.write(key, put);
   } catch (IOException e) {
       // log the failure; a single bad record should not kill the job
       e.printStackTrace();
   } catch (InterruptedException e) {
       // restore the interrupt status
       Thread.currentThread().interrupt();
   }
  }
}

Amazon EMR HBase

Amazon Web Services recently launched HBase on its Elastic MapReduce. It runs on the Amazon distribution of Hadoop 0.20.205 (as of writing this post, it is not yet available on the MapR M3 or M5 distributions).

You can configure it using the Create a New Job Flow menu:

Then select the EC2 instance types (they need to be Large or bigger). If you like, you can also add Hive or Pig:

Then you can define the EC2 keys (if you want to log in to the instances using ssh, you need to add your key):

Check the summary page and then launch HBase by clicking on Create Job Flow:

The instance will appear with WAITING status in the AWS EMR console:

Now you can log in using ssh (and your ssh key) and start the hbase shell, just as we discussed before:

$ bin/hbase shell
HBase Shell; enter 'help' for list of supported commands.
Type "exit" to leave the HBase Shell

You can also check the running HBase instances on the AWS EC2 console: