Database.com – Salesforce.com’s Cloud Database


Introduction

Database.com is Salesforce.com’s multitenant Database as a Service platform that aims to be the cloud database engine for application developers. As opposed to Force.com, Salesforce.com’s Platform as a Service offering, it does not support user interface elements such as page layouts or custom views, has no support for Visualforce, has no Visual Workflow capabilities, and offers no reports and dashboards. Database.com focuses on advanced relational database functionality and supports the Salesforce Object Query Language (SOQL) and Salesforce Object Search Language (SOSL), which have proved popular in enterprise cloud application development.

Database.com offers a REST API, which makes it ideal for mobile and social applications that require data storage with a state-of-the-art security model and identity and access management.

Creating objects in Database.com

If you are familiar with Force.com, using Database.com will feel natural. First, the user has to sign up at database.com. Once the registration is complete, the user can log in to the platform. The first webpage is a System Overview providing details about the number of objects and data records, API usage, etc.

dbcom-1

Then we can create our custom objects. In our example we will create a StockPrice object (essentially a table in traditional RDBMS terms) that will store stock price information such as open and close price, volume, etc. We need to navigate to Create->Objects and click either the New Custom Object or the Schema Builder button. In our example we are going to show how to use Schema Builder.
dbcom-2-1

After clicking the button, Schema Builder opens; that is where we can define the object name and a few other parameters:
dbcom-3

Once the object is created, we can then start defining the fields. We can use the palette on the left hand side of the Schema Builder and just drag and drop the appropriate data types such as number or date onto the canvas.

dbcom-5-1

We can also define validation rules to ensure that the values in the fields (columns in the RDBMS world) fulfill the requirements.

dbcom-7

Once we are done, we can check our object; it has seven custom fields: AdjClose, Close, Date, High, Low, Open and Volume.

dbcom-6

Loading data into Database.com objects

Now that we have created the object, the next step is to load data into it. In principle we could insert data using the Workbench tool, but in this example we are going to use Salesforce.com’s bulk tool called Data Loader, which can be very helpful for uploading massive amounts of data from our computer. Data Loader is a Windows application that can be downloaded under the Data Management->Data Loader menu:

dbcom-8-1

Once it is installed, we can use it to load data into our StockPrice object. The financial data was retrieved from http://finance.yahoo.com.

When we start up the Data Loader, we need to log in first. The username is the same that we used to log in to Database.com, whilst the password is the concatenation of the Database.com password and the security token that can be generated under the My Personal Information menu within Database.com.

From the Data Loader we can then select which object we want to use and also specify the file to be uploaded – it has to be in CSV format with a header. In our case the file format was as follows:

Date,Open,High,Low,Close,Volume,Adj Close
2013-10-09,856.28,862.65,842.98,855.86,2651300,855.86
2013-10-08,865.32,865.98,851.63,853.67,1943700,853.67
2013-10-07,867.45,873.99,864.11,865.74,1293600,865.74
2013-10-04,875,877.51,870,872.35,1358000,872.35

We can also define a mapping between the file column headers and the field names if we want to.

dbcom-13
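If we prefer to skip the mapping dialog, we can rewrite the CSV header up front so that the column names already match the API field names. Below is a minimal Python sketch of that idea; the file names and the exact API names are assumptions based on the object defined above, not something prescribed by Salesforce.com.

import csv

# Assumed mapping from the Yahoo Finance headers to the custom field API names
header_map = {
    "Date": "Date__c", "Open": "Open__c", "High": "High__c", "Low": "Low__c",
    "Close": "Close__c", "Volume": "Volume__c", "Adj Close": "AdjClose__c",
}

with open("table.csv", newline="") as src, \
     open("stockprice_upload.csv", "w", newline="") as dst:
    reader, writer = csv.reader(src), csv.writer(dst)
    header = next(reader)
    # Rewrite only the header row; the data rows are copied unchanged
    writer.writerow([header_map.get(col, col) for col in header])
    writer.writerows(reader)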

When the data has been uploaded, we can open the Developer Console from Database.com to validate whether all the data was successfully inserted. We need to go to the Query Editor, enter our SOQL query, such as SELECT Close__c, Volume__c from StockPrice__c, and click on the Execute button:

dbcom-16

Please note that the API names for the custom fields and the custom table are Close__c, Volume__c and StockPrice__c; the __c suffix indicates that they are custom entities.

Remote access for Database.com

Now that we have our data loaded into our custom object, the last step is to configure remote access for the remote applications (e.g. our imaginary mobile applications) that wish to run SOQL queries against our object using the REST API. The authentication is based on the OAuth standard. More details about the Database.com authentication concepts can be read here.

In order to enable remote access we need to go to the Develop->Remote Access menu and configure the required parameters. In the Integration section the callback URL is mandatory; in our example we set it to http://localhost:5000/_auth. It is needed for the web server flow, which is the standard authentication method used by the Java template provided by Salesforce.com as a boilerplate application for remote Database.com access.

dbcom-18-1

Accessing Database.com objects from a remote application

Database.com uses OAuth 2.0 authentication to allow users to securely access data. Various authentication flows are supported, such as the web server flow, the user-agent flow and the username-password flow. Depending on the actual authentication flow, there are different Database.com endpoints to use: for authorization it is https://login.database.com/services/oauth2/authorize, while for token requests the endpoint is https://login.database.com/services/oauth2/token.

Our first example is based on the username-password authentication flow. In this case the user already has credentials (username/password), and these are sent as part of the request, together with the consumer key and consumer secret. The consumer key and consumer secret can be retrieved under Remote Access: we need to navigate to Develop->Remote Access and select the client.

dbcom-20

The username is the same that we used to log in to Database.com, whilst the password is the concatenation of the Database.com password and the security token (as you may remember, this is the very same notion that we used to log in to Data Loader).

The first step is to request the token from Database.com; we demonstrate the REST call using the curl command line tool:

$ curl --data "grant_type=password&client_id=CLIENT_ID&client_secret=CLIENT_SECRET&username=USERNAME&password=PASSWORD" https://login.database.com/services/oauth2/token

This returns a JSON output containing the access_token:

{"id":"https://login.salesforce.com/id/00000001234567/0000000ABCDEFG","issued_at":"12345678","instance_url":"https://computing-business-8114.database.com","signature":"ABCDEFGH","access_token":"12345678abcdefgh"

Then we can submit our SOQL query together with this access token (12345678abcdefgh in this example):

$ curl  https://computing-business-8114.database.com/services/data/v22.0/query/?q=SELECT+Date__c,+Volume__c,+Close__c+from+StockPrice__c+where+Date__c=2010-02-04 --header "Content-Type:application/json" --header "X-PrettyPrint:1" --header "Authorization: OAuth 12345678abcdefgh"

The query will return the Date, Volume and Close fields from StockPrice object in JSON format where the date is February 4th, 2010.

{
  "totalSize" : 1,
  "done" : true,
  "records" : [ {
    "attributes" : {
      "type" : "StockPrice__c",
      "url" : "/services/data/v22.0/sobjects/StockPrice__c/0000000aaaaabbbb"
    },
    "Date__c" : "2010-02-04",
    "Volume__c" : 3377700.0,
    "Close__c" : 526.78
  } ]
}
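The same two steps can be scripted; here is a minimal Python sketch using the requests library that mirrors the curl calls above (the client credentials, username and password are placeholders):

import requests

# Step 1: request an access token via the username-password flow
token = requests.post(
    "https://login.database.com/services/oauth2/token",
    data={
        "grant_type": "password",
        "client_id": "CLIENT_ID",
        "client_secret": "CLIENT_SECRET",
        "username": "USERNAME",
        # Database.com password concatenated with the security token
        "password": "PASSWORD",
    },
).json()

# Step 2: run the SOQL query against the instance returned with the token
soql = ("SELECT Date__c, Volume__c, Close__c FROM StockPrice__c "
        "WHERE Date__c = 2010-02-04")
result = requests.get(
    token["instance_url"] + "/services/data/v22.0/query/",
    params={"q": soql},
    headers={"Authorization": "OAuth " + token["access_token"],
             "X-PrettyPrint": "1"},
)
print(result.json())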

Salesforce.com also provides a Java template that can be downloaded from the Database.com website. This is a sample application running on Jetty, and it uses the web server flow based on the AuthFilter class for OAuth authentication. When we enter http://localhost:5000/, an authentication page is presented to the user:

dbcom-21

If the user clicks on the Allow button, they are redirected to the main home page where a SOQL query can be entered:

dbcom-22

We can then enter the query:

select Date__c, Close__c, Volume__c from StockPrice__c where Date__c = 2010-02-04

The result page looks like this:

dbcom-23
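To make the web server flow more tangible, here is a minimal Python/Flask sketch of the same redirect-and-exchange sequence that the Java template's AuthFilter performs; Flask and the route names are our own choices for illustration, and the client credentials are placeholders.

from urllib.parse import urlencode
from flask import Flask, redirect, request
import requests

AUTHORIZE_URL = "https://login.database.com/services/oauth2/authorize"
TOKEN_URL = "https://login.database.com/services/oauth2/token"
CLIENT_ID, CLIENT_SECRET = "CLIENT_ID", "CLIENT_SECRET"
CALLBACK_URL = "http://localhost:5000/_auth"  # must match the Remote Access settings

app = Flask(__name__)

@app.route("/")
def login():
    # Step 1: send the user to the Database.com authorization page
    params = urlencode({"response_type": "code", "client_id": CLIENT_ID,
                        "redirect_uri": CALLBACK_URL})
    return redirect(AUTHORIZE_URL + "?" + params)

@app.route("/_auth")
def callback():
    # Step 2: exchange the authorization code for an access token
    resp = requests.post(TOKEN_URL, data={
        "grant_type": "authorization_code",
        "code": request.args.get("code"),
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "redirect_uri": CALLBACK_URL,
    })
    return resp.json().get("access_token", "no token returned")

if __name__ == "__main__":
    app.run(port=5000)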

Conclusion

As we have seen, Database.com is an ideal cloud database engine for mobile and social applications. It offers the same enterprise security and identity model that is used by other Salesforce.com platforms, making it a robust database platform choice for cloud developers. Since it exposes a REST API, it can be accessed from any programming language, such as Java, C, C#, Ruby, Python, PHP, etc. Salesforce.com has also created a Java SDK for Database.com and Force.com that can be used to create Spring MVC applications quickly from a template.


Large Scale Data Analytics with XtremeData Parallel SQL Database Engine


Introduction

A few months ago I posted an article about Amazon Web Services Redshift – Amazon’s data warehouse solution in the cloud. XtremeData dbX is a similar massively parallel SQL database engine that can run on-premise as well as in the cloud. It is a purpose-built, high-performance and scalable solution for data warehouse applications and large-scale analytics, using innovative technologies such as a vector execution model, dynamic data distribution and automatic load balancing.

XtremeData dbX Architecture

XtremeData dbX is a fully SQL- and ACID-compliant database engine based on shared-nothing, massively parallel query execution. The underlying technology relies on PostgreSQL (similarly to AWS Redshift). In essence, the key architecture components are the head node and multiple data nodes – a fairly common pattern in massively parallel execution scenarios.

The head node manages the client connections, parses and plans the queries and sends the result back to the clients. The data nodes manage data storage and execute queries.

dbx-query-exec

XtremeData dbX in AWS cloud

Besides the on-premise deployment option, XtremeData dbX is available in the AWS cloud. We need to go to the AWS Marketplace to register for it. There are different editions: one for dbX Head and another for dbX Data nodes.

dbx-awsmarketplace

XtremeData provides a document describing how to set up dbX head and data nodes in the AWS cloud, but I decided to implement AWS CloudFormation templates to make the deployment easier. Since there are two different AMIs for head and data nodes, I created two templates.

The AWS CloudFormation dbX-Head template:

{
  "AWSTemplateFormatVersion": "2010-09-09",
  "Description": "XtremeData dbX-Head CloudFormation",
  "Parameters" : {
    "ClusterName" : {
      "Description" : "Name of the XtremeData Cluster",
      "Type" : "String",
      "MinLength": "1",
      "MaxLength": "64",
      "AllowedPattern" : "[-_ a-zA-Z0-9]*",
      "ConstraintDescription" : "can contain only alphanumeric characters, spaces, dashes and underscores."
    },
    "KeyName" : {
      "Description" : "Name of an existing EC2 KeyPair to enable SSH access to the instances",
      "Type" : "String",
      "MinLength": "1",
      "MaxLength": "64",
      "AllowedPattern" : "[-_ a-zA-Z0-9]*",
      "ConstraintDescription" : "can contain only alphanumeric characters, spaces, dashes and underscores."
    },
    "InstanceType" : {
        "Description" : "XtremeData dbX-Head EC2 instance type",
        "Type" : "String",
        "Default" : "m1.medium",
        "AllowedValues" : [ "m1.medium","m1.large","m1.xlarge","m2.xlarge","m2.2xlarge","m2.4xlarge", "c1.xlarge","hi1.4xlarge"],
        "ConstraintDescription" : "must be a valid EC2 instance type."
      }
  },
  "Mappings" : {
    "AWSRegion2AMI" : {
      "us-east-1"      : {"AMI" : "ami-4c2f5d25"},
      "us-west-2"      : {"AMI" : "ami-c127b7f1"},
      "us-west-1"      : {"AMI" : "ami-84cde4c1"}
    }
  },
  "Resources": {
    "XtremeDataSecurityGroup": {
      "Type": "AWS::EC2::SecurityGroup",
      "Properties": {
        "GroupDescription": "Enable XTremedata Access",
        "SecurityGroupIngress": [
          {
            "IpProtocol": "tcp",
            "FromPort": "22",
            "ToPort": "22",
            "CidrIp": "0.0.0.0/0"
          }
        ]
      }
    },
	"XtremeDataSecurityGroupIngress" : {
	   "Type": "AWS::EC2::SecurityGroupIngress",
	   "Properties": {
            "GroupName": { "Ref": "XtremeDataSecurityGroup" },
            "IpProtocol": "tcp",
            "FromPort": "0",
            "ToPort": "65535",
            "SourceSecurityGroupName": { "Ref": "XtremeDataSecurityGroup" }
       }
	},
    "XtremeDataHeadInstance": {
      "Type": "AWS::EC2::Instance",
      "Properties": {
		"UserData" : { "Fn::Base64" : { "Ref" : "ClusterName" }},
        "SecurityGroups": [ { "Ref": "XtremeDataSecurityGroup" } ],
		"ImageId" : { "Fn::FindInMap" : [ "AWSRegion2AMI", { "Ref" : "AWS::Region" }, "AMI" ]},
        "InstanceType": {"Ref" : "InstanceType"},
        "KeyName": { "Ref" : "KeyName" },
		"BlockDeviceMappings" : [
               {
                  "DeviceName" : "/dev/sdf",
                  "Ebs" : {
                     "VolumeType" : "standard",
                     "DeleteOnTermination" : "true",
                     "VolumeSize" : "100"
                  }
               },
               {
                  "DeviceName" : "/dev/sdg",
                  "Ebs" : {
                     "VolumeType" : "standard",
                     "DeleteOnTermination" : "true",
                     "VolumeSize" : "100"
                  }
               },
			   {
                  "DeviceName" : "/dev/sdh",
                  "Ebs" : {
                     "VolumeType" : "standard",
                     "DeleteOnTermination" : "true",
                     "VolumeSize" : "100"
                  }
               },
               {
                  "DeviceName" : "/dev/sdi",
                  "Ebs" : {
                     "VolumeType" : "standard",
                     "DeleteOnTermination" : "true",
                     "VolumeSize" : "100"
                  }
               }
            ],
        "Tags" : [
          { "Key" : "Name", "Value" : "XtremeData dxX-Head"}
        ]
      }
    }
  }
}

This will create the security group for dbX node communication, define the userdata (cluster name) and allocate 4 EBS volumes (100 GB each) to the dbX-Head node.

The AWS CloudFormation dbX-Data template looks as follows:

{
  "AWSTemplateFormatVersion": "2010-09-09",
  "Description": "XtremeData dbX-Data CloudFormation",
  "Parameters" : {
    "ClusterNameAnddbXHead" : {
      "Description" : "Name of the XtremeData Cluster and the dbX-Head EC2 hostname concatenated with semi-colon",
      "Type" : "String",
      "MinLength": "1",
      "MaxLength": "64"
    },
	"XtremeSecurityGroup" : {
      "Description" : "Name of the XtremeData SecurityGroup",
      "Type" : "String",
      "MinLength": "1",
      "MaxLength": "64",
      "AllowedPattern" : "[-_ a-zA-Z0-9]*",
      "ConstraintDescription" : "can contain only alphanumeric characters, spaces, dashes and underscores."
    },
	"NumberOfDataNodes" : {
      "Description" : "Max. Number of Xtreme Data Instances to start (in addition to the Head Instance)",
      "Type" : "Number",
      "Default" : "2"
    },
    "HeadAvailZone" : {
      "Description" : "Name of the Availability Node where dbX-Head instance is running",
      "Type" : "String",
	  "MinLength": "1",
      "MaxLength": "64",
      "AllowedPattern" : "[-_ a-zA-Z0-9]*",
      "ConstraintDescription" : "can contain only alphanumeric characters, spaces, dashes and underscores."
    }, 
    "KeyName" : {
      "Description" : "Name of an existing EC2 KeyPair to enable SSH access to the instances",
      "Type" : "String",
      "MinLength": "1",
      "MaxLength": "64",
      "AllowedPattern" : "[-_ a-zA-Z0-9]*",
      "ConstraintDescription" : "can contain only alphanumeric characters, spaces, dashes and underscores."
    },
    "InstanceType" : {
        "Description" : "XtremeData dbX-Data EC2 instance type",
        "Type" : "String",
        "Default" : "m1.large",
        "AllowedValues" : [ "m1.large","m1.xlarge","m2.xlarge","m2.2xlarge","m2.4xlarge", "c1.xlarge","hi1.4xlarge"],
        "ConstraintDescription" : "must be a valid EC2 instance type."
      }
  },
  "Mappings" : {
    "AWSRegion2AMI" : {
      "us-east-1"      : {"AMI" : "ami-442c5e2d"},
      "us-west-2"      : {"AMI" : "ami-1d28b82d"},
      "us-west-1"      : {"AMI" : "ami-66cce523"}
    }
  },
  "Resources": {
    "XtremeDataNodeGroup" : {
      "Type" : "AWS::AutoScaling::AutoScalingGroup",
      "Properties" : {
        "AvailabilityZones" : [ { "Ref" : "HeadAvailZone" } ],
        "LaunchConfigurationName" : { "Ref" : "DataNodeLaunchConfig" },
        "MinSize" : "0",
        "MaxSize" : {"Ref" : "NumberOfDataNodes"},
		"DesiredCapacity" : {"Ref" : "NumberOfDataNodes"},
        "Tags" : [
          { "Key" : "Name", "Value" : "XtremeData dbX-Data", "PropagateAtLaunch" : "true" }
        ]
      }
    },

	"DataNodeLaunchConfig" : {
       "Type" : "AWS::AutoScaling::LaunchConfiguration",
	   "Properties": {
	      "UserData" : { "Fn::Base64" : { "Ref" : "ClusterNameAnddbXHead" }},
          "KeyName" : { "Ref" : "KeyName" },
          "SecurityGroups" : [ { "Ref" : "XtremeSecurityGroup" } ],
          "InstanceType" : { "Ref" : "InstanceType" },
		  "ImageId" : { "Fn::FindInMap" : [ "AWSRegion2AMI", { "Ref" : "AWS::Region" }, "AMI" ]},
		  "BlockDeviceMappings" : [
               {
                  "DeviceName" : "/dev/sdf",
                  "Ebs" : {
                     "VolumeType" : "standard",
                     "DeleteOnTermination" : "true",
                     "VolumeSize" : "100"
                  }
               },
               {
                  "DeviceName" : "/dev/sdg",
                  "Ebs" : {
                     "VolumeType" : "standard",
                     "DeleteOnTermination" : "true",
                     "VolumeSize" : "100"
                  }
               },
			   {
                  "DeviceName" : "/dev/sdh",
                  "Ebs" : {
                     "VolumeType" : "standard",
                     "DeleteOnTermination" : "true",
                     "VolumeSize" : "100"
                  }
               },
               {
                  "DeviceName" : "/dev/sdi",
                  "Ebs" : {
                     "VolumeType" : "standard",
                     "DeleteOnTermination" : "true",
                     "VolumeSize" : "100"
                  }
               }
          ]
		}
    }
  }
}

This template uses the cluster name and the dbX-Head EC2 hostname concatenated with a semicolon as userdata; it also uses the availability zone of the dbX-Head node and the security group that we just defined for the dbX-Head stack. In addition, it allows us to define the number of data nodes – this is used to configure an auto-scaling group that ensures the requested number of data instances is running.

CAUTION: Please note that it is not enough to simply terminate the nodes if auto-scaling is defined, since AWS will launch new instances to replace them. We need to terminate the instances using the as-terminate-instance-in-auto-scaling-group command and then delete the auto-scaling configuration with as-delete-auto-scaling-group and as-delete-launch-config. More information about AWS auto-scaling can be found here.
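The as-* commands above belong to the legacy Auto Scaling command line tools; for reference, the same cleanup could be sketched today with the AWS SDK for Python (boto3) – this is an assumption on our side, and the group and launch configuration names below are hypothetical:

import boto3

autoscaling = boto3.client("autoscaling", region_name="us-east-1")

# Scale the group down to zero so terminated instances are not replaced
autoscaling.update_auto_scaling_group(
    AutoScalingGroupName="xtremedata-data-nodes",
    MinSize=0, MaxSize=0, DesiredCapacity=0)

# Then remove the group and its launch configuration
autoscaling.delete_auto_scaling_group(
    AutoScalingGroupName="xtremedata-data-nodes", ForceDelete=True)
autoscaling.delete_launch_configuration(
    LaunchConfigurationName="xtremedata-data-launch-config")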

To start an XtremeData dbX Head node, we need to go to the CloudFormation section of the AWS console and select Create Stack:

dbx-2

Once we submit the configuration, the status will become CREATE_IN_PROGRESS and then after a while CREATE_COMPLETE.

Then we can create another stack for the data nodes using the second CloudFormation template:

dbx-8

After a while we should see one head node and the requested number of data nodes running in the AWS EC2 console:

dbx-10
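Instead of clicking through the console, the stacks can also be created programmatically. Below is a minimal sketch using the AWS SDK for Python (boto3) – again an assumption rather than what the original deployment used; the stack name, parameter values and template file path are placeholders:

import boto3

cloudformation = boto3.client("cloudformation", region_name="us-east-1")

with open("dbx-head-template.json") as f:
    template_body = f.read()

# Create the dbX-Head stack with the parameters defined in the template
cloudformation.create_stack(
    StackName="xtremedata-dbx-head",
    TemplateBody=template_body,
    Parameters=[
        {"ParameterKey": "ClusterName", "ParameterValue": "awscluster"},
        {"ParameterKey": "KeyName", "ParameterValue": "my-ec2-keypair"},
        {"ParameterKey": "InstanceType", "ParameterValue": "m1.medium"},
    ])

# Block until the stack reaches CREATE_COMPLETE
cloudformation.get_waiter("stack_create_complete").wait(
    StackName="xtremedata-dbx-head")

The dbX-Data stack can be created the same way with the second template and its own parameters (cluster name and head hostname, security group, availability zone and the number of data nodes).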

Creating a database in XtremeData dbX

Once we have fired up the nodes, we can log in to the head node as ec2-user using ssh (or e.g. putty on Windows).

First we need to initialize the cluster (please note that this is a destructive operation – no data will be preserved), then start the cluster and the dbX database engine:

[ec2-user@ip-10-224-122-90 home]$ cluster_init -i 2
init started
examining nodes
initializing head ebs disks: xvdf xvdg xvdh xvdi
mdadm: Defaulting to version 1.2 metadata
mdadm: array /dev/md0 started.
mkfs.xfs: Specified data stripe unit 512 is not the same as the volume stripe unit 128
initializing node ebs disks: xvdf xvdg xvdh xvdi
clearing head ephemeral disks
clearing nodes ephemeral disks
cluster_init done

[ec2-user@ip-10-224-122-90 home]$ cluster_start
initializing head ephemeral disks
  head ephemeral disks:  xvdb
    raid-ing
mdadm: /dev/xvdb appears to contain an ext2fs file system
    size=419395584K  mtime=Sat Jul  6 21:52:00 2013
mdadm: Defaulting to version 1.2 metadata
mdadm: array /dev/md1 started.
mkfs.xfs: Specified data stripe unit 512 is not the same as the volume stripe unit 128
mkfs.xfs: Specified data stripe width 512 is not the same as the volume stripe width 128
examining nodes
assembling node EBSs: xvdf xvdg xvdh xvdi
initializing node tmp disks: xvdb xvdc
  raid-ing
cluster_start done

[ec2-user@ip-10-224-122-90 home]$ dbx_start
new dbx installation initialized
Initializing kernel nfsd:                                  [  OK  ]
Starting NFS services:                                     [  OK  ]
Starting NFS mountd:                                       [  OK  ]
Stopping RPC idmapd:                                       [  OK  ]
Starting RPC idmapd:                                       [  OK  ]
Starting NFS daemon:                                       [  OK  ]
rpc.svcgssd is stopped
rpc.mountd (pid 3402) is running...
nfsd (pid 3467 3466 3465 3464 3463 3462 3461 3460) is running...
Generating binary repository file ...
file 'xdapp.conf2.txt.conf0' generated.
starting dbx nodes
Starting xdu daemon...                                     [  OK  ]
dbx startup done
[ec2-user@ip-10-224-122-90 home]$
[ec2-user@ip-10-224-122-90 home]$

Now we can log in as the dbxdba user (again with ssh or putty, since the key is copied over from ec2-user). Then we need to change the dbxdba password (using the passwd Linux command) in order to be able to log in using the dbX tools.

Now we are ready to create a server, and once the server is created, we can create a database. The simplest way to manage the database and run SQL queries is to use a command line tool called xdudb:

$ xdudb create aws_server 32145
||==================||

[dbxdba@ip-10-224-122-90 ~]$ xdudb list
||==================||
aws_server
[dbxdba@ip-10-224-122-90 ~]$ xdudb status
||==================||
ip-10-224-122-90 configuration: 1 head node and 2 data nodes
running dbx services          : 1 head node and 2 data nodes

[dbxdba@ip-10-224-122-90 ~]$ xdudb start aws_server
||==================||

[dbxdba@ip-10-224-122-90 ~]$ xdudb info aws_server
||==================||
Name        : aws_server
NodeSet     : Default_NS
Owner       : dbxdba
Port        : 32145
H|0   |head    |10.224.122.90  |head |/volumes/data/dbstore/dbxdba/aws_server/head
D|1   |node00  |10.154.137.31  |n0   |/hd/dbxdba/aws_server/n0
D|2   |node01  |10.165.7.225   |n1   |/hd/dbxdba/aws_server/n1

[dbxdba@ip-10-224-122-90 ~]$ xdudb dbcreate aws_server aws_db
CREATE DATABASE

Now we can create a table, insert some rows and run select statements.

[dbxdba@ip-10-224-122-90 ~]$ xdudb sql aws_server aws_db
Welcome to dbX psql 3.2.5, the interactive SQL terminal.

Type:  \copyright for distribution terms
       \h for help with SQL commands
       \? for help with psql commands
       \g or terminate with semicolon to execute query
       \q to quit

aws_db=# CREATE TABLE products (
aws_db(# product_no INTEGER,
aws_db(# name VARCHAR(30),
aws_db(# price NUMERIC(10,2)
aws_db(# );
CREATE TABLE
aws_db=# INSERT INTO products VALUES(1, 'Product1', 100.00);
INSERT 0 1
aws_db=# SELECT * FROM products;
 product_no |   name   | price
------------+----------+--------
          1 | Product1 | 100.00
(1 row, Query Total: 1)

With the help of the xdudb command line tool we can check the dbX version, check disk usage and mounted filesystems, etc.

[dbxdba@ip-10-224-122-90 ~]$ xdudb version
ip-10-224-122-90        2013-07-07 00:03:10
  DB           : dbX SQL 3.2.5
               : fpga.run r4347
  command line : xdudb 1.4
  GUI          : xdadm 3.0.2
  daemon       : xdutils 4.5.8
  OS           : Linux 3.4.43-43.43.amzn1.x86_64

[dbxdba@ip-10-224-122-90 ~]$ xdudb  du aws_server
||==================||
H|0   |head    |10.224.122.90  |head |60M  |/volumes/data/dbstore/dbxdba/aws_server/head
D|1   |node00  |10.154.137.31  |n0   |58M  |/hd/dbxdba/aws_server/n0
D|2   |node01  |10.165.7.225   |n1   |58M  |/hd/dbxdba/aws

[dbxdba@ip-10-224-122-90 ~]$ xdudb  df
head : 10.224.122.90
Filesystem            Size  Used Avail Use% Mounted on
/dev/md0              400G   95M  400G   1% /volumes/data
node00 : 10.154.137.31
Filesystem            Size  Used Avail Use% Mounted on
/dev/md0              400G   93M  400G   1% /volumes/data
node01 : 10.165.7.225
Filesystem            Size  Used Avail Use% Mounted on
/dev/md0              400G   93M  400G   1% /volumes/data

xdAdm web-based management GUI

In addition to the command line tool that we described in the AWS cloud deployment scenario, there is also a web-based GUI to manage the appliance – it is based on Apache Tomcat. We just go to the http://dbx-headnode:8080/xdadm URL and log in. Then we can manage the servers and databases, check nodesets and monitor the system:

dbx-15

XtremeData dbX virtual machine

If someone just wants to try out the dbX database engine, they can download a virtual machine from the XtremeData web site. The instance can be run e.g. using vmplayer. Once the user has logged in to the virtual machine as dbxdba, they can start up a server:

$ xdudb start dbxtest

Splunk Storm – Machine Data Processing in the Cloud


Introduction

Splunk is a platform to process machine data from various sources such as weblogs, syslogs and log4j logs. It can also work with JSON and CSV file formats, so any application that produces JSON or CSV output can be seen as a source for Splunk. As the volume and variety of machine data are increasing, Splunk is becoming a more and more interesting player in the big data world, too.

Splunk can be considered a search engine for IT data. Splunk collects data from multiple sources, indexes it, and users can search it using Splunk's proprietary language called SPL (Search Processing Language). The search results can then be used to create reports and dashboards to visualize the data.

Splunk Architecture

Under the hood the Splunk architecture has the following key components:
- Forwarders are used to forward data to Splunk receiver instances. Receiver instances are normally indexers.
- Indexers are Splunk instances that index data. Indexes are stored in files. There are two types of files: raw data files, which store the data in compressed format, and index files, which contain metadata for search queries. During indexing, Splunk extracts default fields and identifies events based on timestamps, or creates timestamps if none are found (a toy sketch of the indexing idea follows below the diagram).
- Search head and search peers: in a distributed environment the search head manages the search requests, directs them to the search peers and then merges the results back for the users.
- Splunk Web is a graphical user interface based on a Python application server.
SplunkArchitecture
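As promised above, here is a toy Python sketch of the indexing idea – by no means Splunk's actual on-disk format, just an illustration of keeping the raw events alongside a term-to-event index that serves search queries:

from collections import defaultdict

raw_events = []           # plays the role of the compressed raw data files
index = defaultdict(set)  # plays the role of the index (metadata) files

def add_event(line):
    """Store the raw event and index every term it contains."""
    event_id = len(raw_events)
    raw_events.append(line)
    for term in line.lower().split():
        index[term].add(event_id)

def search(term):
    """Return all raw events containing the given term."""
    return [raw_events[i] for i in sorted(index[term.lower()])]

add_event('127.0.0.1 - - [07/Mar/2004] "GET /index.html HTTP/1.1" 200 3985')
add_event("Mar  7 04:02:01 kernel: eth0: link up")
print(search("kernel"))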

Splunk Storm

Splunk Storm is the cloud service version of Splunk. Splunk Storm runs in the Amazon cloud and uses both Elastic Block Storage (EBS) and the Simple Storage Service (S3).

The pricing plan is based on a monthly fee that depends on the volume of data that you want to store. As of writing this article, there is a free tier with 1 GB of storage, while for example a 100 GB storage volume costs 400 USD and the maximum 1 TB storage volume costs 3,000 USD per month.

To get started, we need to sign up and create a project.
Splunk-1

Then we can define the data inputs. There are four options: upload a file, use forwarders, use the API (it is still in beta) or use network data sent directly from the servers.

As a first test, we will use data files uploaded from a local directory. We used a sample Apache web access log and a syslog file available from http://www.monitorware.com/en/logsamples/.

It takes some time to index the files and then they become available for search queries.

Splunk-11

We can run a search query to identify all HTTP client side error codes:

"source="access_log.txt" status>="400" AND status <="500"

Splunk-15

If we want to identify all the access log entries with HTTP POST method, we can run the following search query:

source="access_log.txt" method="POST"

In a similar way, if we want to find all the messages from the uploaded syslog file that were generated by the kernel process then we can run the following query:

source="syslog-messages.txt" process="kernel"

Splunk-16

Splunk forwarder and Twitter API

As a next example, we want to test output generated by our program using the Twitter API. The program will generate JSON output in a file using the Python-based Twitter API. The directory is monitored by a Splunk forwarder, and once the file is created in the predefined directory, the forwarder will send it to Splunk Storm.

First we need to create an application in Twitter via the https://dev.twitter.com portal. The application will have its consumer_key, consumer_secret, access_token_key and access_token_secret, which are required by the Twitter API.

Twitter-6

The Twitter API that we are going to use for the Python application can be downloaded from GitHub: https://github.com/bear/python-twitter.git

This API depends on oauth2, simplejson and httplib2, so we need to install them first. Then we can get the code from GitHub and build and install the package.

$ git clone https://github.com/bear/python-twitter.git

# Build and Install:
$ python setup.py build
$ python setup.py install

The Twitter application code – twtr.py –  is as follows:

# twtr.py
import sys
import twitter

if len(sys.argv) < 3:
    print "Usage: " + sys.argv[0] + " keyword count"
    sys.exit(1)

keyword = sys.argv[1]
count = sys.argv[2]
# Twitter API 1.1. Count - up to a maximum of 100
# https://dev.twitter.com/docs/api/1.1/get/search/tweets
if int(count) > 100:
    count = 100

api = twitter.Api(consumer_key="CONSUMER_KEY", consumer_secret="CONSUMER_SECRET", access_token_key="ACCESS_TOKEN_KEY", access_token_secret="4PXvz7QIiwtwhFrFXFEkc9wY7iBOdgusD8ZQLvUhabM" )

search_result = api.GetSearch(term=keyword, count=count)

for s in search_result:
    print s.AsJsonString()

The Python program can be run as follows:

$ python twtr.py "big data" 100

Installing Splunk forwarder

Then we need to install the Splunk forwarder, see http://www.splunk.com/download/universalforwarder. We also need to download the Splunk credentials that will allow the forwarder to send data to our project. Once the forwarder and the credentials are installed, we can log in and add a directory (twitter_status) to be monitored by our forwarder. We defined the sourcetype as json_no_timestamp.

# Download splunk forwarder
$ wget -O splunkforwarder-5.0.3-163460-Linux-x86_64.tgz 'http://www.splunk.com/page/download_track?file=5.0.3/universalforwarder/linux/splunkforwarder-5.0.3-163460-Linux-x86_64.tgz&ac=&wget=true&name=wget&typed=releases&elq=8ccba442-db76-4fc8-b36b-36252bb61257'

# Install and start splunk forwarder
$ tar xvzf splunkforwarder-5.0.3-163460-Linux-x86_64.tgz
$ export SPLUNK_HOME=/home/ec2-user/splunkforwarder
$ $SPLUNK_HOME/bin/splunk start
# Install project credentials
$ $SPLUNK_HOME/bin/splunk install app ./stormforwarder_2628fbc8d76811e2b09622000a1cdcf0.spl -auth admin:changeme
App '/home/ec2-user/stormforwarder_2628fbc8d76811e2b09622000a1cdcf0.spl' installed

# Login
$SPLUNK_HOME/bin/splunk login -auth admin:changeme

# Add monitor (directory or file)
 $SPLUNK_HOME/bin/splunk add monitor /home/ec2-user/splunk_blog/twitter_status -sourcetype json_no_timestamp
Added monitor of '/home/ec2-user/splunk_blog/twitter_status'.

Now we are ready to run the Python code using Twitter API:

$ python twtr.py "big data" 100 | tee twitter_status/twitter_status.txt

The program creates a twitter_status.txt file under the twitter_status directory, which is monitored by the Splunk forwarder. The forwarder sends the output file to Splunk Storm. After some time it will appear under the inputs section as an authenticated forwarder. The file will be shown as a source together with the previously uploaded Apache access log and syslog.
SPlunk-17

Splunk-18

If we want to search for users with location London, the search query looks like this:

source="/home/ec2-user/splunk_blog/twitter_status/twitter_status.txt" user.location="London, UK"

We can also define a search query to show the top 10 timezones from the Twitter results, and from the search result it is easy to create a report with just a few clicks on the web user interface. The report allows us to choose from multiple visualization options like column, area or pie chart types, etc.

source="/home/ec2-user/splunk_blog/twitter_status/twitter_status.txt" | top limit=10 user.time_zone

SPlunk-22

Splunk-25

Conclusion

As mentioned in the beginning of this article, the variety and volume of data generated by machines are increasing dramatically; sensor data, application logs, web access logs, syslogs, database and filesystem audit logs are just a few examples of the potential data sources that require attention but can be difficult to process and analyse in a timely manner. Splunk is a great tool to deal with the ever increasing data volume, and with Splunk Storm users can start analysing their data in the cloud without hassle.

Introduction to NuoDB – An Elastically Scalable Cloud Database


Introduction

Traditional relational databases are built upon a synchronous, client-server architecture that often cannot meet the scalability requirements posed by distributed computing systems. As a result, various sharding, caching, and replication techniques emerged to cope with these demands. On the other hand, NoSQL solutions have emerged on the ground of the CAP theorem. Data management systems like BigTable, HBase, MongoDB, Cassandra, and Dynamo offer different capabilities depending on how they balance consistency, availability, and partition tolerance. However, they gave up supporting SQL and ACID properties, which are critical in the relational database world.

NuoDB is a complete rethink of relational databases, built on a new foundation: partial, on-demand replication. Under the hood, NuoDB is an asynchronous, decentralized, peer-to-peer database. It uses the concept of Atoms – objects that are being replicated. In NuoDB everything is an Atom: database, schema, sequence, table, index, records, blobs and data are all Atoms. NuoDB holds a patent on this peer-to-peer object replication.

NuoDB Architecture

The NuoDB architecture has three layers: a management layer, an SQL layer and a data layer. The management layer comprises an agent that manages the NuoDB processes running on a particular computer: it starts and stops them and also collects statistics from the transaction and storage engines. Certain agents are configured to be brokers – brokers communicate with the client initially and then introduce the client to a transaction engine. From then on the client can communicate directly with the transaction engines. The NuoDB management layer also offers a command line and a web-based management tool to manage the databases, as well as a command line loader for exporting and importing data.

At the SQL layer NuoDB has transaction engines that provide access to a single database. The transaction engine parses, compiles, optimizes and executes the SQL statements on behalf of the clients.

At the data layer NuoDB has storage managers that provide persistence of the data. A storage manager uses key/value pairs to store the information but it can also use more sophisticated stores e.g. HDFS.

In a minimal configuration we can run every component (broker, transaction engine and storage manager) on the same machine. NuoDB can be easily scaled out and made redundant by adding multiple brokers, transaction engines and storage managers. In more complex scenarios we can run NuoDB in the AWS cloud or across multiple corporate datacenters providing geo-redundancy. Below is an example of a redundant architecture with two brokers, two transaction engines and two storage managers.
NuoDBRedundantArchitecture

Getting Started

NuoDB is available on multiple platforms: Windows 32- and 64-bit, Linux 64-bit (Ubuntu, RHEL, SUSE), Mac OS X 10.7 and Solaris 11 (Intel 64-bit). For this article we used an Ubuntu 12.04 LTS virtual machine.

First we need to start up the components discussed above: the broker/agent first, then the transaction engine and the storage manager from the command line management tool. We also need to configure the properties file to contain the settings for the domain.

$ vi ./etc/stock.properties
# A flag specifying whether this agent should be run as a connection broker
broker = true

# The name used to identify the domain that this agent is a part of 
domain = stock

# The default administrative password, and the secret used by agents to
# setup and maintain the domain securely
domainPassword = stock
# Start agent
$ java -DpropertiesUrl=file:///home/notroot/nuodb/etc/stock.properties -jar jar/nuoagent.jar --verbose &>/tmp/stock.log &

# Start command line manager
$ java -jar jar/nuodbmanager.jar --broker localhost --password stock
nuodb [stock] > show domain summary

Hosts:
[broker] localhost/127.0.0.1:48004

## Create a new domain administrator user
nuodb [stock] > create domain administrator user istvan password istvan

## Start Storage Manager
nuodb [stock] > start process sm
Database: stock
Host: localhost
Process command-line options: --dba-user stock --dba-password stock
Archive directory: /home/notroot/nuodb/data
Initialize archive: true

Started: [SM] ubuntu/127.0.1.1:59914 [ pid = 3467 ] ACTIVE

## ps -ef | grep nuodb
## notroot   3467  3396  0 12:01 pts/0    00:00:00 /home/notroot/nuodb-1.0.1.128.linux.x86_64/bin/nuodb --connect-key 7124934669079864995

## Start Transaction Engine
nuodb [stock/stock] > start process te
Host: localhost
Process command-line options: --dba-user stock --dba-password stock

Started: [TE] ubuntu/127.0.1.1:60545 [ pid = 3494 ] ACTIVE

## ps -ef| grep nuodb
## notroot   3494  3396  0 12:06 pts/0    00:00:00 /home/notroot/nuodb-1.0.1.128.linux.x86_64/bin/nuodb --connect-key 8587006928753483386

Note that we started the storage manager with the initialize yes option. This is only for the first time; any subsequent startup should use the initialize no option, otherwise the data will be overwritten.

Then we can connect to the database using the nuosql client – the first argument is the name of the database (stock), and we need to specify the database admin username/password. After login we can set the schema to stock with the use command:

$ bin/nuosql stock --user stock --password stock
SQL> use stock
SQL> show
	autocommit state is on
	semicolon completion is required
	current schema is STOCK
SQL> show tables

	No tables found in schema STOCK

SQL> create table Stock
   > (
   >    Id             Integer not NULL generated always as identity primary key,
   >    StockDate      Date,
   >    StockOpen      Decimal(8,2),
   >    StockHigh      Decimal(8,2),
   >    StockLow       Decimal(8,2),
   >    StockClose     Decimal(8,2),
   >    StockVolume    Integer,
   >    StockAdjClose  Decimal(8,2)
   > );
SQL> show tables

	Tables in schema STOCK

		STOCK

We can then load the data stored in CSV file format into the database table. The CSV file – google.csv, containing stock information – was downloaded from http://finance.yahoo.com.

$ bin/nuoloader --schema stock --user stock --password stock --import "/home/notroot/nuodb/samples/stock/google.csv",skip --to "insert into Stock values(default,?,?,?,?,?,?,?)" stock &> /tmp/nuoloader.log

Imported 2163 rows, failed 0 rows, size 101897 bytes from /home/notroot/nuodb/samples/stock/google.csv

Then we can login again using nuosql and run a regular SQL query to retrieve the top 10 stock values and the corresponding date (ordered by adj close value):

notroot@ubuntu:~/nuodb$ bin/nuosql stock --user stock --password stock
SQL> use stock
SQL> select count(*) from stock;
 COUNT  
 ------ 
  2163  

SQL> select StockDate, StockOpen,StockClose, StockVolume, StockAdjClose from stock order by StockAdjClose desc limit 10;

 STOCKDATE  STOCKOPEN  STOCKCLOSE  STOCKVOLUME  STOCKADJCLOSE  
 ---------- ---------- ----------- ------------ -------------- 

 2013-03-05   828.93     838.60      4044100        838.60     
 2013-03-11   831.69     834.82      1594700        834.82     
 2013-03-07   834.06     832.60      2052700        832.60     
 2013-03-08   834.50     831.52      2911900        831.52     
 2013-03-06   841.03     831.38      2873000        831.38     
 2013-03-12   830.71     827.61      2008300        827.61     
 2013-03-13   827.90     825.31      1641300        825.31     
 2013-03-14   826.99     821.54      1651200        821.54     
 2013-03-04   805.30     821.50      2775600        821.50     
 2013-03-20   816.83     814.71      1463800        814.71

Java Client – JDBC for NuoDB

NuoDB supports various programming languages for client applications such as Java, .NET, PHP, Ruby and Node.js. In this section we demonstrate that NuoDB supports JDBC in the same way that it is available for traditional relational databases. The Java program needs to add nuodbjdbc.jar to its classpath.

Below is an example Java code (StockDB.java) to retrieve the highest stock value ever (ordered by adj close) and the related date:

$ cat StockDB.java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;

public class StockDB {

    /** The driver class provided by NimbusDB. */
    public static final String DRIVER_CLASS =
	"com.nuodb.jdbc.Driver";

    /** The base URL for connecting to a local database server. */
    public static final String DATABASE_URL =
	"jdbc:com.nuodb://localhost/";

    // the established connection to a local server
    private final Connection dbConnection;

    /**
     * Creates an instance of DB and connects to a local server,
     * as the given user, to work with the given named database
     *
     * @param user the user name for the connection
     * @param password the password for the given user
     * @param dbName the name of the database at the server to use
     */
    public StockDB(String user, String password, String dbName)
	throws SQLException
    {
	Properties properties = new Properties();
	properties.put("user", user);
	properties.put("password", password);
	properties.put("schema", "stock");

	dbConnection =
	    DriverManager.getConnection(DATABASE_URL + dbName, properties);
    }

    /** Closes the connection to the server. */
    public void close() throws SQLException {
	dbConnection.close();
    }

    /**
     * Gets the name for the given id, or null if no name exists.
     *
     * @param an identifier
     * @return the name associate with the identifier, or null
     */
    public String getDateAndAdjClose() throws SQLException {
	Statement stmt = dbConnection.createStatement();
	ResultSet rs = stmt.
	    executeQuery("select stockdate, stockadjclose from stock order by stockadjclose desc limit 1");
	try {
	    if (rs.next())
		return rs.getString(1) + ", " + rs.getString(2);
	    return null;
	} finally {
	    rs.close();
	    stmt.close();
	}
    }

    /** Main-line for this example. */
    public static void main(String [] args) throws Exception {
	Class.forName(DRIVER_CLASS);

	StockDB stockDB = new StockDB("stock", "stock", "stock");
	System.out.println("Date and AdjClose: "  + stockDB.getDateAndAdjClose());

	stockDB.close();
    }

}

Then we can run the Java program as follows:

notroot@ubuntu:~/nuodb/samples/java$ javac StockDB.java 
notroot@ubuntu:~/nuodb/samples/java$ java -classpath .:../../jar/nuodbjdbc.jar StockDB
Date and AdjClose: 2013-03-05, 838.60

Amazon Web Services Redshift – Data Warehouse in the Cloud


Introduction

Amazon Web Services made its fully managed, petabyte-scale data warehouse cloud service publicly available in February 2013. It promises a high-performance, secure, easily scalable data warehouse solution that costs one tenth of a traditional data warehouse (less than 1,000 USD/TB/year, according to the AWS Introduction to Redshift presentation: http://aws.amazon.com/redshift/), is compatible with traditional BI tools and is ready to run within minutes. As of writing this article the service is available in the US East region only, but it is expected to be rolled out to other regions, too. The service is manageable via the regular AWS tools: the AWS management console, command line tools (aws commands based on Python) and an API based on HTTP requests/responses.

Under the hood

Under the hood, AWS Redshift is based on PostgreSQL 8.0.2. The architecture consists of one leader node – responsible for managing the communications with the clients, developing the execution plan and distributing the compiled code to the compute nodes – and one or more compute nodes that execute the code and send the results back to the leader node for aggregation. A compute node is either an XL node (2 cores, 15 GB RAM and 2 TB of storage) or an 8XL node (16 cores, 120 GB RAM and 16 TB of storage). More details about the Redshift architecture can be found at http://docs.aws.amazon.com/redshift/latest/dg/c_internal_arch_system_operation.html

AWS-Redshift-Arch

Launching a cluster

The easiest way to launch a cluster is via AWS console.

We need to define the basic attributes like cluster identifier, database name, database port, master username and password:

AWS-RedShift1

Then we need to select the node type (XL or 8XL) and the number of compute nodes. A cluster can be single or multi-node, the minimum config is a one XL node cluster, while the maximum config is sixteen 8XL nodes – you can do the math in terms of cores, memory and storage.

AWS-Redshift2

Then we can configure additional parameters (like database encryption or security groups):

AWS-Redshift4

We can then review the configuration and are ready to launch the service:

AWS-RedShift5

The status will be “creating” for a while, then it becomes “available”. This is when the JDBC URL becomes known and can be used for configuring the clients.

AWS-RedShift8

In order to make the service accessible, we need to configure the security options: either a security group (if Redshift is going to be accessed from EC2) or a CIDR/IP (Classless Inter-Domain Routing IP range) if Redshift is to be accessed from the public Internet. The system will automatically recognise the IP address of the client connected to the AWS console.

AWS-RedShift6

And that is it! From then on the client can be connected to the Redshift cluster.

We used SQLWorkbench to test the service, the same way as suggested by AWS Redshift documentation. It is a Java based open source SQL tool. The connection parameters are the standard JDBC attributes:

AWS-Redshift9

The PostgreSQL version can be checked using

select version();

version
PostgreSQL 8.0.2 on i686-pc-linux-gnu, compiled by GCC gcc (GCC) 3.4.2 20041017 (Red Hat 3.4.2-6.fc3), Redshift 1.0.546

We tested the service with Amazon stock prices downloaded from Yahoo Finance.

The content has been uploaded to an S3 bucket called stockprice (s3://stockprice/amzn.csv). We had to make it accessible for everyone to read (open/download).
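The upload can be scripted as well; a short sketch with the AWS SDK for Python (boto3) – the SDK choice is ours, the bucket and key names are the ones mentioned above:

import boto3

s3 = boto3.client("s3")

# Upload the CSV and make it world-readable (open/download), as described above
s3.upload_file("amzn.csv", "stockprice", "amzn.csv",
               ExtraArgs={"ACL": "public-read"})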

Then we could create the appropriate table using standard SQL command:

CREATE TABLE stockprice (
     stockdate date not null,
     stockopen decimal(6,2),
     stockhigh decimal(6,2),
     stocklow decimal(6,2),
     stockclose decimal(6,2),
     stockvolume integer,
     stockadjclose decimal(6,2)
     );

Table 'stockprice' created

Execution time: 0.15s

desc stockprice
COLUMN_NAME	DATA_TYPE	PK	NULLABLE	DEFAULT	AUTOINCREMENT	REMARKS	POSITION
stockdate	date	NO	YES		NO		1
stockopen	numeric(6,2)	NO	YES		NO		2
stockhigh	numeric(6,2)	NO	YES		NO		3
stocklow	numeric(6,2)	NO	YES		NO		4
stockclose	numeric(6,2)	NO	YES		NO		5
stockvolume	integer	NO	YES		NO		6
stockadjclose	numeric(6,2)	NO	YES		NO		7

To load the data into stockprice table, we had to use copy command with the S3 source file (it could also be an Amazon DynamoDB source).

copy stockprice from 's3://stockprice/amzn.csv' CREDENTIALS 'aws_access_key_id=XXXXXXX;aws_secret_access_key=XXXXXXX' delimiter ',';

If there is any error during the load operation, it can be inspected by running a select statement on the stl_load_errors table (e.g. for an incorrect data format).
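Since Redshift speaks the PostgreSQL wire protocol, the load and the error check can also be driven from Python with psycopg2; a sketch, with the cluster endpoint, credentials and access keys as placeholders:

import psycopg2

conn = psycopg2.connect(
    host="mycluster.xxxxxxxx.us-east-1.redshift.amazonaws.com",
    port=5439, dbname="mydb", user="masteruser", password="...")
cur = conn.cursor()

# Run the same COPY command as above
cur.execute("copy stockprice from 's3://stockprice/amzn.csv' "
            "CREDENTIALS 'aws_access_key_id=XXXXXXX;aws_secret_access_key=XXXXXXX' "
            "delimiter ',';")
conn.commit()

# Inspect the most recent load errors, if any (e.g. incorrect data format)
cur.execute("select starttime, filename, err_reason from stl_load_errors "
            "order by starttime desc limit 5;")
for row in cur.fetchall():
    print(row)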

And then we can run our SQL statements to analyse the data.

select * from stockprice order by stockadjclose desc limit 100;

stockdate	stockopen	stockhigh	stocklow	stockclose	stockvolume	stockadjclose
2013-01-25	275.00	284.72	274.40	283.99	4968100	283.99
2013-01-28	283.78	284.48	274.40	276.04	4321400	276.04
2013-03-05	274.00	276.68	269.99	275.59	3686200	275.59
2013-03-13	275.24	276.50	272.64	275.10	1884200	275.10
2013-03-08	275.00	275.44	271.50	274.19	1879800	274.19
2013-03-12	271.00	277.40	270.36	274.13	3246200	274.13
2013-03-07	274.10	274.80	271.85	273.88	1939000	273.88
2013-03-06	275.76	276.49	271.83	273.79	2050700	273.79
2013-01-24	269.37	276.65	269.37	273.46	3417000	273.46
2013-03-04	265.36	273.30	264.14	273.11	3453000	273.11
2013-01-30	283.00	284.20	267.11	272.76	13075400	272.76
2013-01-14	268.00	274.26	267.54	272.73	4275000	272.73
2013-01-18	270.83	274.50	269.60	272.12	2942000	272.12
2013-01-15	270.68	272.73	269.30	271.90	2326900	271.90
2013-03-11	273.43	273.99	270.40	271.24	1904900	271.24

The AWS console supports various management functions for the cluster: we can reboot the cluster, modify parameters, and resize it by defining a different node type (XL->8XL) or decreasing/increasing the number of nodes. We can also delete the cluster via the AWS console.
AWS-Redshift11

Conclusion

Amazon Web Services Redshift is another big step to make cloud services available for enterprise computing. It offers a data warehouse capability with minimal effort to start up and scale as operations demand. It is a great complement to other database services such as DynamoDB for NoSQL requirements and RDS for relational database services.

Microsoft and Hadoop – Windows Azure HDInsight


Introduction

Traditionally Microsoft Windows used to be a sort of stepchild in the Hadoop world – the ‘hadoop’ command to manage actions from the command line and the startup/shutdown scripts were written with Linux/*nix in mind, assuming bash. Thus if you wanted to run Hadoop on Windows, you had to install cygwin. The Apache Hadoop documentation also states the following (quotes from the Hadoop r1.1.0 documentation):
“•GNU/Linux is supported as a development and production platform. Hadoop has been demonstrated on GNU/Linux clusters with 2000 nodes
•Win32 is supported as a development platform. Distributed operation has not been well tested on Win32, so it is not supported as a production platform.”

Microsoft and Hortonworks joined their forces to make Hadoop available on Windows Server for on-premise deployments as well as on Windows Azure to support big data in the cloud, too.

This post covers Windows Azure HDInsight (Hadoop on Azure, see https://www.hadooponazure.com). As of writing, the service requires an invitation to participate in the CTP (Community Technology Preview), but the invitation process is very efficiently managed – after filling in the survey, I received the service access code within a couple of days.

New Cluster Request

The first step is to request a new cluster: you need to define the cluster name and the credentials to log in to the headnode. By default the cluster consists of 3 nodes.

After a few minutes you will have a running cluster; then click on the “Go to Cluster” link to navigate to the main page.

WordCount with HDInsight on Azure

No Hadoop test is complete without the standard WordCount application – Microsoft Azure HDInsight provides an example file (davinci.txt) and the Java jar file to run wordcount – the Hello World of Hadoop.

First you need to go to the JavaScript console to upload the text file using fs.put():

js> fs.put()

Choose File ->  Browse
Destination: /user/istvan/example/data/davinci

Create a Job:

CreateJob

The actual command that Microsoft Azure HDInsight executes is as follows:

c:\apps\dist\hadoop-1.1.0-SNAPSHOT\bin\hadoop.cmd jar c:\apps\Jobs\templates\634898986181212311.hadoop-examples-1.1.0-SNAPSHOT.jar wordcount /user/istvan/example/data/davinci davinci-output

You can validate the output from JavaScript console:

js> result = fs.read("davinci-output")
"(Lo)cra"	1
"1490	1
"1498,"	1
"35"	1
"40,"	1
"AS-IS".	1
"A_	1
"Absoluti	1
"Alack!	1

Microsoft HDInsight Streaming – Hadoop job in C#

Hadoop Streaming is a utility to support running external map and reduce jobs. These external jobs can be written in various programming languages such as Python or Ruby – but since we are talking about Microsoft HDInsight, the main example is based on .NET C#.
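To illustrate what such external jobs look like in a scripting language, here is a minimal word-count mapper and reducer in Python – a generic Hadoop Streaming sketch, not one of the HDInsight samples:

# mapper.py - emit "word<TAB>1" for every word read from stdin
import sys

for line in sys.stdin:
    for word in line.split():
        print("%s\t%d" % (word, 1))

# reducer.py - sum the counts per word (streaming sorts the mapper output by key)
import sys

current_word, current_count = None, 0
for line in sys.stdin:
    line = line.strip()
    if not line:
        continue
    word, count = line.rsplit("\t", 1)
    if word != current_word:
        if current_word is not None:
            print("%s\t%d" % (current_word, current_count))
        current_word, current_count = word, 0
    current_count += int(count)
if current_word is not None:
    print("%s\t%d" % (current_word, current_count))

These would be submitted with the same streaming jar, e.g. with -mapper "python mapper.py" -reducer "python reducer.py" -file mapper.py -file reducer.py, assuming Python is available on the nodes.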

The demo application for C# streaming is again a wordcount example using the imitation of Unix cat and wc commands. You could run the demo from the “Samples” tile but I prefer to demonstrate Hadoop Streaming from the command line to have a closer look at what is going on under the hood.

In order to run the Hadoop command line from the Windows cmd prompt, you need to log in to the HDInsight headnode using Remote Desktop. First you need to click on the “Remote Desktop” tile, then log in to the remote node using the credentials you defined at cluster creation time. Once you are logged in, click on the Hadoop Command Line shortcut.

In Hadoop Command Line, go to the Hadoop distribution directory (As of writing this post, Microsoft Azure HDInsight is based on Hadoop 1.1.0):

c:> cd \apps\dist
c:> hadoop fs -get /example/apps/wc.exe .
c:> hadoop fs -get /example/apps/cat.exe .
c:> cd \apps\dist\hadoop-1.1.0-SNAPSHOT
c:\apps\dist\hadoop-1.1.0-SNAPSHOT> hadoop jar lib\hadoop-streaming.jar -input "/user/istvan/example/data/davinci" -output "/user/istvan/example/dataoutput" -mapper "..\..\jars\cat.exe" -reducer "..\..\jars\wc.exe" -file "c:\Apps\dist\wc.exe" -file "c:\Apps\dist\cat.exe"

The C# code for wc.exe is as follows:

using System;
using System.IO;
using System.Linq;

namespace wc
{
    class wc
    {
        static void Main(string[] args)
        {
            string line;
            var count = 0;

            if (args.Length > 0){
                Console.SetIn(new StreamReader(args[0]));
            }

            while ((line = Console.ReadLine()) != null) {
                count += line.Count(cr => (cr == ' ' || cr == '\n'));
            }
            Console.WriteLine(count);
        }
    }
}

And the code for cat.exe is:

using System;
using System.IO;

namespace cat
{
    class cat
    {
        static void Main(string[] args)
        {
            if (args.Length > 0)
            {
                Console.SetIn(new StreamReader(args[0])); 
            }

            string line;
            while ((line = Console.ReadLine()) != null) 
            {
                Console.WriteLine(line);
            }

        }

    }
}

Interactive console

Microsoft Azure HDInsight comes with two types of interactive console: one is the standard Hadoop Hive console, while the other is unique in the Hadoop world – it is based on JavaScript.

Let us start with Hive. You need to upload your data using the JavaScript fs.put() method as described above. Then you can create your Hive table and run a select query as follows:

CREATE TABLE stockprice (yyyymmdd STRING, open_price FLOAT, high_price FLOAT, low_price FLOAT, close_price FLOAT, stock_volume INT, adjclose_price FLOAT)
row format delimited fields terminated by ',' lines terminated by '\n' location '/user/istvan/input/';

select yyyymmdd, high_price, stock_volume from stockprice order by high_price desc;

InteractiveHive

InteractiveHive-Select

The other flavor of HDInsight interactive console is based on JavaScript – as said before, this is a unique offering from Microsoft – in fact, the JavaScript commands are converted to Pig statements.

JavascriptConsole

The syntax resembles a LINQ-style query, though it is not the same:

js> pig.from("/user/istvan/input/goog_stock.csv", "date,open,high,low,close,volume,adjclose", ",").select("date, high, volume").orderBy("high DESC").to("result")

js> result = fs.read("result")
05/10/2012	774.38	2735900
04/10/2012	769.89	2454200
02/10/2012	765.99	2790200
01/10/2012	765	3168000
25/09/2012	764.89	6058500

Under the hood

Microsoft and Hortonworks have re-implemented the key Hadoop binaries (namenode, jobtracker, secondarynamenode, datanode, tasktracker) as Windows executables (exe files) that run as services in the background. The core ‘hadoop’ command – traditionally a bash script – has also been re-implemented as hadoop.cmd.

The distribution consists of Hadoop 1.1.0, Pig 0.9.3, Hive 0.9.0, Mahout 0.5 and Sqoop 1.4.2.

Real-time Big Data Analytics Engine – Twitter’s Storm


Introduction

Hadoop is a batch-oriented big data solution at its heart and leaves gaps in ad-hoc and real-time data processing at massive scale, so some people have already started counting the days of Hadoop as we know it now. As one of the alternatives, we have already seen how Google BigQuery supports ad-hoc analytics; this time the post is about Twitter’s Storm, a real-time computation engine that aims to fill the gap in real-time data analytics. Storm was originally developed by BackType and now runs under Twitter’s name, after BackType was acquired by them. The need for a dedicated real-time analytics solution was explained by Nathan Marz as follows: “There’s no hack that will turn Hadoop into a realtime system; realtime data processing has a fundamentally different set of requirements than batch processing…. The lack of a “Hadoop of realtime” has become the biggest hole in the data processing ecosystem. Storm fills that hole.”

Storm Architecture

The Storm architecture very much resembles the Hadoop architecture; it has two types of nodes: a master node and the worker nodes. The master node runs Nimbus, which distributes the code across the cluster and assigns tasks to the workers – a role similar to the JobTracker in Hadoop. The worker nodes run the Supervisor, which starts and stops worker processes – a role similar to the TaskTrackers in Hadoop. The coordination and all state between Nimbus and the Supervisors are managed by ZooKeeper.

Storm is written in Clojure and Java.

One of the key concepts in Storm is the topology; in essence, a Storm cluster executes a topology – the topology defines the data sources, the processing tasks and the data flow between the nodes. A topology is roughly analogous to a MapReduce job in Hadoop, with the difference that a topology runs continuously until it is killed.

Storm also has a concept of streams, which are essentially sequences of tuples; they represent the data that is passed around between the Storm nodes. There are two main components that manipulate stream data: spouts, which read data from a source (e.g. a queue or an API) and emit tuples with a declared list of fields, and bolts, which consume input streams, process the tuples and then either emit a new stream or store the data, for instance in a database.

One important thing to decide when you define a topology is how data will be passed around between the nodes. As discussed above, a node (running either spouts or bolts) emits a stream. The stream grouping functionality lets you decide which task(s) will receive the emitted tuples. Storm offers a number of built-in groupings, such as shuffle grouping (tuples are sent to a randomly chosen task of the bolt), fields grouping (tuples with the same values for a given set of fields are always sent to the same task), all grouping (tuples are replicated to every task of the bolt) and direct grouping (the emitting component decides which task receives the tuple), and you can implement your own custom grouping method, too.
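
To make these grouping options a bit more concrete, here is a minimal sketch – not part of the original post or of storm-starter – that wires the same trivial bolt up with three different groupings. It reuses the RandomSentenceSpout from the storm-starter project discussed in the next section (assuming, as in storm-starter, that it declares its single output field as “word”), while the PrinterBolt class is hypothetical and simply logs whatever it receives:

package storm.starter;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import storm.starter.spout.RandomSentenceSpout;

public class GroupingSketch {

    // A hypothetical bolt that prints every tuple it receives and emits nothing.
    public static class PrinterBolt extends BaseBasicBolt {
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            System.out.println(Thread.currentThread().getName() + " got " + tuple.getValues());
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // this bolt does not emit any stream
        }
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RandomSentenceSpout(), 2);

        // shuffle grouping: each tuple goes to a randomly chosen task of "random"
        builder.setBolt("random", new PrinterBolt(), 4).shuffleGrouping("spout");

        // fields grouping: tuples with the same value of the "word" field always hit the same task
        builder.setBolt("sticky", new PrinterBolt(), 4).fieldsGrouping("spout", new Fields("word"));

        // all grouping: every task of "broadcast" receives a copy of every tuple
        builder.setBolt("broadcast", new PrinterBolt(), 2).allGrouping("spout");

        // run it locally for a few seconds, just like the WordCountTopology example below
        Config conf = new Config();
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("grouping-sketch", conf, builder.createTopology());
        Thread.sleep(10000);
        cluster.shutdown();
    }
}

Watching the console output (each PrinterBolt prints its thread name) makes it easy to see how the three groupings distribute the very same sentences differently.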

If you want to know more about Storm internals, you can download the code and find a great tutorial on GitHub.

A Storm application

The best way to start with Storm is to download the storm-starter package from GitHub. It contains a variety of examples, from a basic WordCount to more complex implementations. In this post we will have a closer look at WordCountTopology.java. The project ships with a Maven m2-pom.xml file, so you can compile and execute the example using the mvn command:

mvn -f m2-pom.xml compile exec:java -Dexec.classpathScope=compile -Dexec.mainClass=storm.starter.WordCountTopology

Alternatively, you can import the code into Eclipse as an existing Maven project (Import… -> Maven -> Existing Maven Projects) and run it from there.

The code looks like this:

package storm.starter;

import storm.starter.spout.RandomSentenceSpout;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.task.ShellBolt;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.HashMap;
import java.util.Map;

/**
 * This topology demonstrates Storm's stream groupings and multilang capabilities.
 */
public class WordCountTopology {
    public static class SplitSentence extends ShellBolt implements IRichBolt {

        public SplitSentence() {
            super("python", "splitsentence.py");
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }  

    public static class WordCount extends BaseBasicBolt {
        Map<String, Integer> counts = new HashMap<String, Integer>();

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String word = tuple.getString(0);
            Integer count = counts.get(word);
            if(count==null) count = 0;
            count++;
            counts.put(word, count);
            collector.emit(new Values(word, count));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    public static void main(String[] args) throws Exception {

        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("spout", new RandomSentenceSpout(), 5);

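        // shuffle grouping: sentences are distributed to randomly chosen SplitSentence tasks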
        builder.setBolt("split", new SplitSentence(), 8)
                 .shuffleGrouping("spout");
        builder.setBolt("count", new WordCount(), 12)
                 .fieldsGrouping("split", new Fields("word"));

        Config conf = new Config();
        conf.setDebug(true);

        if(args!=null && args.length > 0) {
            conf.setNumWorkers(3);

            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {        
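            // no arguments: run the topology in-process on a LocalCluster for testing and debugging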
            conf.setMaxTaskParallelism(3);

            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", conf, builder.createTopology());

            Thread.sleep(10000);

            cluster.shutdown();
        }
    }
}

As a first step, the topology defines a spout based on the RandomSentenceSpout class.

        builder.setSpout("spout", new RandomSentenceSpout(), 5);

RandomSentenceSpout has a method called nextTuple() that comes from the ISpout interface. When this method is called, Storm is requesting that the spout emit tuples to the output collector. In this case, the tuples are randomly selected sentences from a predefined String array.

    @Override
    public void nextTuple() {
        Utils.sleep(100);
        String[] sentences = new String[] {
            "the cow jumped over the moon",
            "an apple a day keeps the doctor away",
            "four score and seven years ago",
            "snow white and the seven dwarfs",
            "i am at two with nature"};
        String sentence = sentences[_rand.nextInt(sentences.length)];
        _collector.emit(new Values(sentence));
    }

The next step in the topology definition is the SplitSentence bolt. The SplitSentence bolt actually invokes a Python script – splitsentence.py – that splits the sentences into words using Python’s split() method:

       public SplitSentence() {
            super("python", "splitsentence.py");
        }

The Python code (splitsentence.py):

import storm

class SplitSentenceBolt(storm.BasicBolt):
    def process(self, tup):
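        # tup.values[0] holds the sentence emitted by the spout; emit one tuple per word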
        words = tup.values[0].split(" ")
        for word in words:
          storm.emit([word])

SplitSentenceBolt().run()

The topology uses the shuffleGrouping() method to send each sentence to a randomly chosen task of the bolt referred to as “split”.

The final step in the topology definition is the WordCount bolt. The WordCount bolt has an execute() method which comes from the IBasicBolt interface:

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String word = tuple.getString(0);
            Integer count = counts.get(word);
            if(count==null) count = 0;
            count++;
            counts.put(word, count);
            collector.emit(new Values(word, count));
        }

This method receives the words as tuples and uses a Map to keep a running count per word. It then emits the word together with its updated count.

You can run a Storm topology in two modes (again, similar to Hadoop’s stand-alone and distributed modes). One mode is based on the LocalCluster class, which lets you run the topology on your own machine, debug it, etc. When you are ready to run it on a Storm cluster, you use the StormSubmitter class to submit the topology to the cluster:

        if(args!=null && args.length > 0) {
            conf.setNumWorkers(3);

            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {        
            conf.setMaxTaskParallelism(3);

            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", conf, builder.createTopology());

The parallelism can be controlled by various methods and arguments, such as setNumWorkers(), setMaxTaskParallelism() and the parallelism_hint argument used when building the topology – see e.g. the value 5 in the builder.setSpout() call below. The parallelism_hint defines the number of tasks that should be assigned to execute the given spout; each task runs on a thread in a worker process somewhere in the cluster.

builder.setSpout("spout", new RandomSentenceSpout(), 5);

When we run the application, we can see multiple threads running in parallel: some emit the random sentences, other threads split them into words, and yet other threads count the words.

9722 [Thread-38] INFO  backtype.storm.daemon.task  - Emitting: spout default [snow white and the seven dwarfs]
9722 [Thread-36] INFO  backtype.storm.daemon.task  - Emitting: spout default [i am at two with nature]
9723 [Thread-32] INFO  backtype.storm.daemon.executor  - Processing received message source: spout:10, stream: default, id: {}, [snow white and the seven dwarfs]
9723 [Thread-24] INFO  backtype.storm.daemon.executor  - Processing received message source: spout:9, stream: default, id: {}, [i am at two with nature]
9723 [Thread-22] INFO  backtype.storm.daemon.task  - Emitting: split default ["i"]
9723 [Thread-30] INFO  backtype.storm.daemon.task  - Emitting: split default ["snow"]
9724 [Thread-18] INFO  backtype.storm.daemon.executor  - Processing received message source: split:5, stream: default, id: {}, ["i"]
9724 [Thread-16] INFO  backtype.storm.daemon.executor  - Processing received message source: split:7, stream: default, id: {}, ["snow"]
9724 [Thread-18] INFO  backtype.storm.daemon.task  - Emitting: count default [i, 38]
9724 [Thread-22] INFO  backtype.storm.daemon.task  - Emitting: split default ["am"]
9724 [Thread-30] INFO  backtype.storm.daemon.task  - Emitting: split default ["white"]
9724 [Thread-16] INFO  backtype.storm.daemon.task  - Emitting: count default [snow, 57]

Conclusion

Big Data analytics comes in many flavours, from batch processing to ad-hoc analytics to real-time processing. Hadoop, the granddad of big data, is focused on batch-oriented processing – should you need to support real-time analytics, Storm offers an interesting alternative.