NewSQL at Cloud Scale – Spring, Hibernate and Amazon Web Services with NuoDB


Introduction

In the previous articles we presented the NuoDB NewSQL architecture and its key components, and showed how to scale it easily at the transaction and storage tiers. We also demonstrated JDBC and Hibernate with NuoDB. In this closing post of the three-article series, we demonstrate Spring and Hibernate with NuoDB at cloud scale using AWS capabilities (AWS EC2 and CloudFormation).

Spring and Hibernate with NuoDB

Spring PetClinic is a sample application that used to be distributed with the Spring Framework. It is designed to show how the Spring application frameworks can be used to build simple but powerful database-oriented applications. This year it was refactored onto a new architecture, and the source code can be downloaded from GitHub. Out of the box, Spring PetClinic supports the HSQL and MySQL databases; in this post we port it to use NuoDB.

To get the code we need to run:

$ git clone https://github.com/SpringSource/spring-petclinic.git

The application can be built and run with a Maven command, but first we need to implement the NuoDB-related changes, all of which are around configuring the database and updating the Hibernate and Spring configurations. In other words, no application sources needed modification.

The scripts for the database layer can be found under the ~/spring/spring-petclinic/src/main/resources/db directory. We created a new nuodb directory there with two SQL scripts, initDB.sql and populateDB.sql.

$ cd ~/spring/spring-petclinic/src/main/resources/db
$ mkdir nuodb
$ vi initDB.sql  # edit initDB.sql
DROP TABLE vet_specialties IF EXISTS;
DROP TABLE vets IF EXISTS;
DROP TABLE specialties IF EXISTS;
DROP TABLE visits IF EXISTS;
DROP TABLE pets IF EXISTS;
DROP TABLE types IF EXISTS;
DROP TABLE owners IF EXISTS;

CREATE TABLE vets (
  id         INTEGER primary key  generated always as identity,
  first_name VARCHAR(30),
  last_name  VARCHAR(30)
);
CREATE INDEX vets_last_name ON vets (last_name);

CREATE TABLE specialties (
  id   INTEGER primary key  generated always as identity,
  name VARCHAR(80)
);
CREATE INDEX specialties_name ON specialties (name);

CREATE TABLE vet_specialties (
  vet_id       INTEGER NOT NULL,
  specialty_id INTEGER NOT NULL
);
ALTER TABLE vet_specialties ADD CONSTRAINT fk_vet_specialties_vets FOREIGN KEY (vet_id) REFERENCES vets (id);
ALTER TABLE vet_specialties ADD CONSTRAINT fk_vet_specialties_specialties FOREIGN KEY (specialty_id) REFERENCES specialties (id);

CREATE TABLE types (
  id   INTEGER primary key  generated always as identity,
  name VARCHAR(80)
);
CREATE INDEX types_name ON types (name);

CREATE TABLE owners (
  id         INTEGER primary key  generated always as identity,
  first_name VARCHAR(30),
  last_name  VARCHAR(30),
  address    VARCHAR(255),
  city       VARCHAR(80),
  telephone  VARCHAR(20)
);
CREATE INDEX owners_last_name ON owners (last_name);

CREATE TABLE pets (
  id         INTEGER primary key  generated always as identity,
  name       VARCHAR(30),
  birth_date DATE,
  type_id    INTEGER NOT NULL,
  owner_id   INTEGER NOT NULL
);
ALTER TABLE pets ADD CONSTRAINT fk_pets_owners FOREIGN KEY (owner_id) REFERENCES owners (id);
ALTER TABLE pets ADD CONSTRAINT fk_pets_types FOREIGN KEY (type_id) REFERENCES types (id);
CREATE INDEX pets_name ON pets (name);

CREATE TABLE visits (
  id          INTEGER primary key  generated always as identity,
  pet_id      INTEGER NOT NULL,
  visit_date  DATE,
  description VARCHAR(255)
);
ALTER TABLE visits ADD CONSTRAINT fk_visits_pets FOREIGN KEY (pet_id) REFERENCES pets (id);
CREATE INDEX visits_pet_id ON visits (pet_id);
$ vi populateDB.sql  # edit populateDB.sql
INSERT INTO vets VALUES (NULL, 'James', 'Carter');
INSERT INTO vets VALUES (NULL, 'Helen', 'Leary');
INSERT INTO vets VALUES (NULL, 'Linda', 'Douglas');
INSERT INTO vets VALUES (NULL, 'Rafael', 'Ortega');
INSERT INTO vets VALUES (NULL, 'Henry', 'Stevens');
INSERT INTO vets VALUES (NULL, 'Sharon', 'Jenkins');

INSERT INTO specialties VALUES (NULL, 'radiology');
INSERT INTO specialties VALUES (NULL, 'surgery');
INSERT INTO specialties VALUES (NULL, 'dentistry');

INSERT INTO vet_specialties VALUES (2, 1);
INSERT INTO vet_specialties VALUES (3, 2);
INSERT INTO vet_specialties VALUES (3, 3);
INSERT INTO vet_specialties VALUES (4, 2);
INSERT INTO vet_specialties VALUES (5, 1);

INSERT INTO types VALUES (NULL, 'cat');
INSERT INTO types VALUES (NULL, 'dog');
INSERT INTO types VALUES (NULL, 'lizard');
INSERT INTO types VALUES (NULL, 'snake');
INSERT INTO types VALUES (NULL, 'bird');
INSERT INTO types VALUES (NULL, 'hamster');

INSERT INTO owners VALUES (NULL, 'George', 'Franklin', '110 W. Liberty St.', 'Madison', '6085551023');
INSERT INTO owners VALUES (NULL, 'Betty', 'Davis', '638 Cardinal Ave.', 'Sun Prairie', '6085551749');
INSERT INTO owners VALUES (NULL, 'Eduardo', 'Rodriquez', '2693 Commerce St.', 'McFarland', '6085558763');
INSERT INTO owners VALUES (NULL, 'Harold', 'Davis', '563 Friendly St.', 'Windsor', '6085553198');
INSERT INTO owners VALUES (NULL, 'Peter', 'McTavish', '2387 S. Fair Way', 'Madison', '6085552765');
INSERT INTO owners VALUES (NULL, 'Jean', 'Coleman', '105 N. Lake St.', 'Monona', '6085552654');
INSERT INTO owners VALUES (NULL, 'Jeff', 'Black', '1450 Oak Blvd.', 'Monona', '6085555387');
INSERT INTO owners VALUES (NULL, 'Maria', 'Escobito', '345 Maple St.', 'Madison', '6085557683');
INSERT INTO owners VALUES (NULL, 'David', 'Schroeder', '2749 Blackhawk Trail', 'Madison', '6085559435');
INSERT INTO owners VALUES (NULL, 'Carlos', 'Estaban', '2335 Independence La.', 'Waunakee', '6085555487');

INSERT INTO pets VALUES (NULL, 'Leo', '2010-09-07', 1, 1);
INSERT INTO pets VALUES (NULL, 'Basil', '2012-08-06', 6, 2);
INSERT INTO pets VALUES (NULL, 'Rosy', '2011-04-17', 2, 3);
INSERT INTO pets VALUES (NULL, 'Jewel', '2010-03-07', 2, 3);
INSERT INTO pets VALUES (NULL, 'Iggy', '2010-11-30', 3, 4);
INSERT INTO pets VALUES (NULL, 'George', '2010-01-20', 4, 5);
INSERT INTO pets VALUES (NULL, 'Samantha', '2012-09-04', 1, 6);
INSERT INTO pets VALUES (NULL, 'Max', '2012-09-04', 1, 6);
INSERT INTO pets VALUES (NULL, 'Lucky', '2011-08-06', 5, 7);
INSERT INTO pets VALUES (NULL, 'Mulligan', '2007-02-24', 2, 8);
INSERT INTO pets VALUES (NULL, 'Freddy', '2010-03-09', 5, 9);
INSERT INTO pets VALUES (NULL, 'Lucky', '2010-06-24', 2, 10);
INSERT INTO pets VALUES (NULL, 'Sly', '2012-06-08', 1, 10);

INSERT INTO visits VALUES (NULL, 7, '2013-01-01', 'rabies shot');
INSERT INTO visits VALUES (NULL, 8, '2013-01-02', 'rabies shot');
INSERT INTO visits VALUES (NULL, 8, '2013-01-03', 'neutered');
INSERT INTO visits VALUES (NULL, 7, '2013-01-04', 'spayed');

Then we had to modify business-config.xml under the ~/spring/spring-petclinic/src/main/resources/spring directory to refer to the NuoDB dialect via the hibernate.dialect property:

  # business-config.xml

   
        <bean id="entityManagerFactory" class="org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean"
              p:dataSource-ref="dataSource">
            <property name="jpaVendorAdapter">
                <bean class="org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter" />
            </property>
            <property name="jpaPropertyMap">
              <map>
                <entry key="hibernate.show_sql" value="${jpa.showSql}" />
                <entry key="hibernate.dialect" value="${hibernate.dialect}" />
                <entry key="hibernate.temp.use_jdbc_metadata_defaults" value="false" />
              </map>
           </property>
           <property name="persistenceUnitName" value="petclinic"/>
           <property name="packagesToScan" value="org.springframework.samples.petclinic"/>
        </bean>

In order to support Hibernate 4, we also set hibernate.temp.use_jdbc_metadata_defaults to false.

The database properties are configured in the data-access.properties file – this file contains the appropriate JDBC parameters (used in datasource-config.xml) as well as the Hibernate dialect. The sample below refers to a local NuoDB instance running on an Ubuntu virtual machine and to an AWS EC2 instance (commented out) that will be configured later in this article:

jdbc.driverClassName=com.nuodb.jdbc.Driver
jdbc.url=jdbc:com.nuodb://192.168.80.128/spring?schema=user
#jdbc.url=jdbc:com.nuodb://ec2-46-51-162-14.eu-west-1.compute.amazonaws.com/spring?schema=user
jdbc.username=spring
jdbc.password=spring

# Properties that control the population of schema and data for a new data source
jdbc.initLocation=classpath:db/nuodb/initDB.sql
jdbc.dataLocation=classpath:db/nuodb/populateDB.sql

# Property that determines which Hibernate dialect to use
# (only applied with "applicationContext-hibernate.xml")
hibernate.dialect=com.nuodb.hibernate.NuoDBDialect

# Property that determines which database to use with an AbstractJpaVendorAdapter
jpa.showSql=true

The datasource-config.xml file defines the data source bean using JDBC:

    
    <bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"
          p:driverClassName="${jdbc.driverClassName}" p:url="${jdbc.url}"
          p:username="${jdbc.username}" p:password="${jdbc.password}"/>

The original pom.xml on GitHub refers to Hibernate 4, which NuoDB supports, so our pom.xml looked like this:

# pom.xml              
          <properties>
                
                <hibernate.version>4.1.0.Final</hibernate.version>
                
                <hibernate-validator.version>4.2.0.Final</hibernate-validator.version>
                <nuodb.version>1.1</nuodb.version>
           </properties>

           <dependencies>
                
                <dependency>
                        <groupId>com.nuodb</groupId>
                        <artifactId>nuodb-hibernate</artifactId>
                        <version>${nuodb.version}</version>
                </dependency>
                <dependency>
                        <groupId>com.nuodb</groupId>
                        <artifactId>nuodb-jdbc</artifactId>
                        <version>${nuodb.version}</version>
                </dependency>
           </dependencies>

Finally we need to change web.xml in the ~/spring/spring-petclinic/src/main/webapp/WEB-INF directory to use jpa – the out-of-the-box configuration uses jdbc.

   <context-param>
        <param-name>spring.profiles.active</param-name>
        <param-value>jpa</param-value>
    </context-param>

Now we can run the Maven command to compile the code, initialize and populate the NuoDB tables and then listen on port 9966 for requests:

 $ mvn tomcat7:run

and we can connect to the application at http://localhost:9966/petclinic/.

Spring-PetClinic-1

Spring PetClinic using NuoDB on AWS

Until now we have used a local NuoDB instance; it is time to migrate the database layer to the AWS cloud. To run NuoDB on AWS EC2 instances, one of the simplest options is to use AWS CloudFormation. There are CloudFormation templates available on GitHub.

To download them, we can run

$ git clone https://github.com/nuodb/cloudformation.git

Then we need to open the AWS Management Console, go to CloudFormation, select Create Stack and upload the NuoDB template (e.g. NuoDB-1.1.template in our case). Then we can define the number of agents (in addition to the broker), the domain name, the domain admin username and password, and the EC2 instance type.

NuoDB-CF-1

Once we click Continue, the EC2 instances are created; the status is first CREATE_IN_PROGRESS and then CREATE_COMPLETE.

As a result, we get 3 EC2 instances (since we selected 2 agents): one dedicated to the NuoDB broker and the other two to the NuoDB agents. We also get 3 EBS volumes, one for each EC2 instance.

CAUTION: The NuoDB CloudFormation script uses an EC2 auto-scaling group which ensures that the number of agents you selected will always be running. As a result, if you just stop the EC2 instances at the end of your test, AWS will restart them. In order to stop everything properly and clean up your test environment, open the AWS management console, go to CloudFormation, select the NuoDB stack and click “Delete Stack.” This will terminate all instances that were started by the CloudFormation template. More on AWS auto-scaling and how to use the command line tools can be found on the AWS website.

Now we can connect to the EC2 host that runs the NuoDB broker:

NuoDB-Console-1

Once we have logged in, we can start a database:

NuoDB-Console-2

In the next steps we can define the database name (spring), leave the “allow non-durable database” option unselected, and set the archive and journal directories for the storage manager running on one of the hosts (/home/ec2-user/nuodb/data and /home/ec2-user/nuodb/journal, respectively). Please note that the /home/ec2-user directory has to have the appropriate permissions to allow the creation of the data and journal directories (e.g. -rwxrwxrwx in our test).

NuoDB-Console-3

After that we can define the transaction engine running on the other EC2 host:

NuoDB-Console-4

Now we have 3 EC2 hosts: one dedicated to the NuoDB broker to serve client connections, one for the storage manager with filesystem storage and one for the transaction engine.

Now, if we change the JDBC connection string in Spring PetClinic (remember, it is defined in data-access.properties), we can connect our application to NuoDB running on the AWS servers. The connection is possible because the AWS security group allows connections from any server.

$ vi data-access.properties
#jdbc.url=jdbc:com.nuodb://192.168.80.128/spring?schema=user
jdbc.url=jdbc:com.nuodb://ec2-46-51-162-14.eu-west-1.compute.amazonaws.com/spring?schema=user

We can then rerun the Spring PetClinic command, this time with the database in the AWS cloud:

$ mvn tomcat7:run

Once the Tomcat server is up and running, we can go to http://ec2-46-51-162-14.eu-west-1.compute.amazonaws.com:9966/petclinic/ and search for vets, add new owners, search for them, etc.

PetClinic-3

PetClinic-1

PetClinic-2

Scale out and Resilience

Scale-out is a method of adding computing capacity by adding more computers to the system, rather than increasing the resources of the computers already in the system. Resilience is the ability to provide and maintain an acceptable level of service in the presence of faults.

If we want to scale out our database and also make it resilient, we can simply go back to the NuoDB console and add a new process using the Add Process menu. For instance, we can add a transaction engine to the EC2 server that was originally running the storage manager, and we can also add a storage manager to the EC2 server previously running the transaction engine. This way, both EC2 servers run one instance of the transaction engine and one of the storage manager.

Scaling out the database is a seamless process for the Spring PetClinic application; we just start up another EC2 server with an agent and add a transaction engine or a storage manager to it.

As can be seen in the NuoDB Performance Report, NuoDB scales almost linearly. The diagram below shows how the number of transactions per second (TPS) increases as a new node is added:

NuoDBPerf

Conclusion

As we have seen in this series, NuoDB combines standard SQL and ACID properties with elastic scalability, which makes it well suited to be a robust cloud data management system for the 21st century. Its unique architectural approach provides high-performance reads and writes and geo-distributed 24/7 operations with built-in resilience. Moreover, applications using well-known frameworks such as Spring and Hibernate can be easily ported to or developed with NuoDB as a NewSQL database that meets cloud-scale demands.


Large Scale Data Analytics with XtremeData Parallel SQL Database Engine


Introduction

A few months ago I posted an article about Amazon Web Services Redshift – Amazon’s data warehouse solution in the cloud. XtremeData dbX is a similar massively parallel SQL database engine that can run on-premise as well as in the cloud. It is a purpose-built, high-performance and scalable solution for data warehouse applications and large-scale analytics, using innovative technologies such as a vector execution model, dynamic data distribution and automatic load balancing.

XtremeData dbX Architecture

XtremeData dbX is a fully SQL- and ACID-compliant database engine based on shared-nothing, massively parallel query execution. The underlying technology relies on PostgreSQL (similarly to AWS Redshift). In essence, the key architecture components are the head node and multiple data nodes – a fairly common pattern in massively parallel execution scenarios.

The head node manages the client connections, parses and plans the queries and sends the result back to the clients. The data nodes manage data storage and execute queries.

dbx-query-exec

XtremeData dbX in AWS cloud

Besides the on-premise deployment option, XtremeData dbX is available in the AWS cloud. We need to go to the AWS Marketplace to register for it. There are different editions: one for dbX Head and another for dbX Data nodes.

dbx-awsmarketplace

XtremeData provides a document on how to set up dbX head and data nodes in the AWS cloud, but I decided to implement AWS CloudFormation templates to make the deployment easier. Since there are two different AMIs for head and data nodes, I created two templates.

The AWS CloudFormation dbX-Head template:

{
  "AWSTemplateFormatVersion": "2010-09-09",
  "Description": "XtremeData dbX-Head CloudFormation",
  "Parameters" : {
    "ClusterName" : {
      "Description" : "Name of the XtremeData Cluster",
      "Type" : "String",
      "MinLength": "1",
      "MaxLength": "64",
      "AllowedPattern" : "[-_ a-zA-Z0-9]*",
      "ConstraintDescription" : "can contain only alphanumeric characters, spaces, dashes and underscores."
    },
    "KeyName" : {
      "Description" : "Name of an existing EC2 KeyPair to enable SSH access to the instances",
      "Type" : "String",
      "MinLength": "1",
      "MaxLength": "64",
      "AllowedPattern" : "[-_ a-zA-Z0-9]*",
      "ConstraintDescription" : "can contain only alphanumeric characters, spaces, dashes and underscores."
    },
    "InstanceType" : {
        "Description" : "XtremeData dbX-Head EC2 instance type",
        "Type" : "String",
        "Default" : "m1.medium",
        "AllowedValues" : [ "m1.medium","m1.large","m1.xlarge","m2.xlarge","m2.2xlarge","m2.4xlarge", "c1.xlarge","hi1.4xlarge"],
        "ConstraintDescription" : "must be a valid EC2 instance type."
      }
  },
  "Mappings" : {
    "AWSRegion2AMI" : {
      "us-east-1"      : {"AMI" : "ami-4c2f5d25"},
      "us-west-2"      : {"AMI" : "ami-c127b7f1"},
      "us-west-1"      : {"AMI" : "ami-84cde4c1"}
    }
  },
  "Resources": {
    "XtremeDataSecurityGroup": {
      "Type": "AWS::EC2::SecurityGroup",
      "Properties": {
        "GroupDescription": "Enable XTremedata Access",
        "SecurityGroupIngress": [
          {
            "IpProtocol": "tcp",
            "FromPort": "22",
            "ToPort": "22",
            "CidrIp": "0.0.0.0/0"
          }
        ]
      }
    },
	"XtremeDataSecurityGroupIngress" : {
	   "Type": "AWS::EC2::SecurityGroupIngress",
	   "Properties": {
            "GroupName": { "Ref": "XtremeDataSecurityGroup" },
            "IpProtocol": "tcp",
            "FromPort": "0",
            "ToPort": "65535",
            "SourceSecurityGroupName": { "Ref": "XtremeDataSecurityGroup" }
       }
	},
    "XtremeDataHeadInstance": {
      "Type": "AWS::EC2::Instance",
      "Properties": {
		"UserData" : { "Fn::Base64" : { "Ref" : "ClusterName" }},
        "SecurityGroups": [ { "Ref": "XtremeDataSecurityGroup" } ],
		"ImageId" : { "Fn::FindInMap" : [ "AWSRegion2AMI", { "Ref" : "AWS::Region" }, "AMI" ]},
        "InstanceType": {"Ref" : "InstanceType"},
        "KeyName": { "Ref" : "KeyName" },
		"BlockDeviceMappings" : [
               {
                  "DeviceName" : "/dev/sdf",
                  "Ebs" : {
                     "VolumeType" : "standard",
                     "DeleteOnTermination" : "true",
                     "VolumeSize" : "100"
                  }
               },
               {
                  "DeviceName" : "/dev/sdg",
                  "Ebs" : {
                     "VolumeType" : "standard",
                     "DeleteOnTermination" : "true",
                     "VolumeSize" : "100"
                  }
               },
			   {
                  "DeviceName" : "/dev/sdh",
                  "Ebs" : {
                     "VolumeType" : "standard",
                     "DeleteOnTermination" : "true",
                     "VolumeSize" : "100"
                  }
               },
               {
                  "DeviceName" : "/dev/sdi",
                  "Ebs" : {
                     "VolumeType" : "standard",
                     "DeleteOnTermination" : "true",
                     "VolumeSize" : "100"
                  }
               }
            ],
        "Tags" : [
          { "Key" : "Name", "Value" : "XtremeData dxX-Head"}
        ]
      }
    }
  }
}

This will create the security group for dbX node communication, define the user data (the cluster name) and allocate 4 EBS volumes (100 GB each) to the dbX Head node.

The AWS CloudFormation dbX-Data template looks as follows:

{
  "AWSTemplateFormatVersion": "2010-09-09",
  "Description": "XtremeData dbX-Data CloudFormation",
  "Parameters" : {
    "ClusterNameAnddbXHead" : {
      "Description" : "Name of the XtremeData Cluster and the dbX-Head EC2 hostname concatenated with semi-colon",
      "Type" : "String",
      "MinLength": "1",
      "MaxLength": "64"
    },
	"XtremeSecurityGroup" : {
      "Description" : "Name of the XtremeData SecurityGroup",
      "Type" : "String",
      "MinLength": "1",
      "MaxLength": "64",
      "AllowedPattern" : "[-_ a-zA-Z0-9]*",
      "ConstraintDescription" : "can contain only alphanumeric characters, spaces, dashes and underscores."
    },
	"NumberOfDataNodes" : {
      "Description" : "Max. Number of Xtreme Data Instances to start (in addition to the Head Instance)",
      "Type" : "Number",
      "Default" : "2"
    },
    "HeadAvailZone" : {
      "Description" : "Name of the Availability Node where dbX-Head instance is running",
      "Type" : "String",
	  "MinLength": "1",
      "MaxLength": "64",
      "AllowedPattern" : "[-_ a-zA-Z0-9]*",
      "ConstraintDescription" : "can contain only alphanumeric characters, spaces, dashes and underscores."
    }, 
    "KeyName" : {
      "Description" : "Name of an existing EC2 KeyPair to enable SSH access to the instances",
      "Type" : "String",
      "MinLength": "1",
      "MaxLength": "64",
      "AllowedPattern" : "[-_ a-zA-Z0-9]*",
      "ConstraintDescription" : "can contain only alphanumeric characters, spaces, dashes and underscores."
    },
    "InstanceType" : {
        "Description" : "XtremeData dbX-Data EC2 instance type",
        "Type" : "String",
        "Default" : "m1.large",
        "AllowedValues" : [ "m1.large","m1.xlarge","m2.xlarge","m2.2xlarge","m2.4xlarge", "c1.xlarge","hi1.4xlarge"],
        "ConstraintDescription" : "must be a valid EC2 instance type."
      }
  },
  "Mappings" : {
    "AWSRegion2AMI" : {
      "us-east-1"      : {"AMI" : "ami-442c5e2d"},
      "us-west-2"      : {"AMI" : "ami-1d28b82d"},
      "us-west-1"      : {"AMI" : "ami-66cce523"}
    }
  },
  "Resources": {
    "XtremeDataNodeGroup" : {
      "Type" : "AWS::AutoScaling::AutoScalingGroup",
      "Properties" : {
        "AvailabilityZones" : [ { "Ref" : "HeadAvailZone" } ],
        "LaunchConfigurationName" : { "Ref" : "DataNodeLaunchConfig" },
        "MinSize" : "0",
        "MaxSize" : {"Ref" : "NumberOfDataNodes"},
		"DesiredCapacity" : {"Ref" : "NumberOfDataNodes"},
        "Tags" : [
          { "Key" : "Name", "Value" : "XtremeData dbX-Data", "PropagateAtLaunch" : "true" }
        ]
      }
    },

	"DataNodeLaunchConfig" : {
       "Type" : "AWS::AutoScaling::LaunchConfiguration",
	   "Properties": {
	      "UserData" : { "Fn::Base64" : { "Ref" : "ClusterNameAnddbXHead" }},
          "KeyName" : { "Ref" : "KeyName" },
          "SecurityGroups" : [ { "Ref" : "XtremeSecurityGroup" } ],
          "InstanceType" : { "Ref" : "InstanceType" },
		  "ImageId" : { "Fn::FindInMap" : [ "AWSRegion2AMI", { "Ref" : "AWS::Region" }, "AMI" ]},
		  "BlockDeviceMappings" : [
               {
                  "DeviceName" : "/dev/sdf",
                  "Ebs" : {
                     "VolumeType" : "standard",
                     "DeleteOnTermination" : "true",
                     "VolumeSize" : "100"
                  }
               },
               {
                  "DeviceName" : "/dev/sdg",
                  "Ebs" : {
                     "VolumeType" : "standard",
                     "DeleteOnTermination" : "true",
                     "VolumeSize" : "100"
                  }
               },
			   {
                  "DeviceName" : "/dev/sdh",
                  "Ebs" : {
                     "VolumeType" : "standard",
                     "DeleteOnTermination" : "true",
                     "VolumeSize" : "100"
                  }
               },
               {
                  "DeviceName" : "/dev/sdi",
                  "Ebs" : {
                     "VolumeType" : "standard",
                     "DeleteOnTermination" : "true",
                     "VolumeSize" : "100"
                  }
               }
          ]
		}
    }
  }
}

This template uses the cluster name and the dbX-Head EC2 hostname concatenated with a semicolon as user data; it also uses the availability zone of the dbX-Head node and the security group that we just defined for the dbX-Head node. In addition, it allows us to define the number of data nodes – this is used to configure an auto-scaling group that ensures the requested number of data instances is running.

CAUTION: Please note that it is not enough just to terminate the nodes when auto-scaling is defined, since AWS will launch new ones. We need to terminate the instances using the as-terminate-instance-in-auto-scaling-group command and then delete the auto-scaling configuration with as-delete-auto-scaling-group and as-delete-launch-config. More information about AWS auto-scaling can be found here.

To start an XtremeData dbX Head node, we need to go to the CloudFormation section of the AWS console and select Create Stack:

dbx-2

Once we submit the configuration, the status will become CREATE_IN_PROGRESS and then after a while CREATE_COMPLETE.

Then we can create another stack for the data nodes using the second CloudFormation template:

dbx-8

After a while we should see one head node and the requested number of data nodes running in the AWS EC2 console:

dbx-10

Creating a database in XtremeData dbX

Once we have fired up the nodes, we can log in to the head node as ec2-user using ssh (or e.g. PuTTY on Windows).

First we need to initialize the cluster (please note that this is a destructive operation and no data will be preserved), then start the cluster and the dbX database engine:

[ec2-user@ip-10-224-122-90 home]$ cluster_init -i 2
init started
examining nodes
initializing head ebs disks: xvdf xvdg xvdh xvdi
mdadm: Defaulting to version 1.2 metadata
mdadm: array /dev/md0 started.
mkfs.xfs: Specified data stripe unit 512 is not the same as the volume stripe unit 128
initializing node ebs disks: xvdf xvdg xvdh xvdi
clearing head ephemeral disks
clearing nodes ephemeral disks
cluster_init done

[ec2-user@ip-10-224-122-90 home]$ cluster_start
initializing head ephemeral disks
  head ephemeral disks:  xvdb
    raid-ing
mdadm: /dev/xvdb appears to contain an ext2fs file system
    size=419395584K  mtime=Sat Jul  6 21:52:00 2013
mdadm: Defaulting to version 1.2 metadata
mdadm: array /dev/md1 started.
mkfs.xfs: Specified data stripe unit 512 is not the same as the volume stripe unit 128
mkfs.xfs: Specified data stripe width 512 is not the same as the volume stripe width 128
examining nodes
assembling node EBSs: xvdf xvdg xvdh xvdi
initializing node tmp disks: xvdb xvdc
  raid-ing
cluster_start done

[ec2-user@ip-10-224-122-90 home]$ dbx_start
new dbx installation initialized
Initializing kernel nfsd:                                  [  OK  ]
Starting NFS services:                                     [  OK  ]
Starting NFS mountd:                                       [  OK  ]
Stopping RPC idmapd:                                       [  OK  ]
Starting RPC idmapd:                                       [  OK  ]
Starting NFS daemon:                                       [  OK  ]
rpc.svcgssd is stopped
rpc.mountd (pid 3402) is running...
nfsd (pid 3467 3466 3465 3464 3463 3462 3461 3460) is running...
Generating binary repository file ...
file 'xdapp.conf2.txt.conf0' generated.
starting dbx nodes
Starting xdu daemon...                                     [  OK  ]
dbx startup done
[ec2-user@ip-10-224-122-90 home]$
[ec2-user@ip-10-224-122-90 home]$

Now we can log in as the dbxdba user (again with ssh or PuTTY, since the key is copied over from ec2-user). Then we need to change the dbxdba password (using the passwd Linux command) in order to be able to log in using the dbX tools.

Now we are ready to create a server, and once the server is created we can create a database. The simplest way to manage the database and run SQL queries is to use the command line tool called xdudb:

$ xdudb create aws_server 32145
||==================||

[dbxdba@ip-10-224-122-90 ~]$ xdudb list
||==================||
aws_server
[dbxdba@ip-10-224-122-90 ~]$ xdudb status
||==================||
ip-10-224-122-90 configuration: 1 head node and 2 data nodes
running dbx services          : 1 head node and 2 data nodes

[dbxdba@ip-10-224-122-90 ~]$ xdudb start aws_server
||==================||

[dbxdba@ip-10-224-122-90 ~]$ xdudb info aws_server
||==================||
Name        : aws_server
NodeSet     : Default_NS
Owner       : dbxdba
Port        : 32145
H|0   |head    |10.224.122.90  |head |/volumes/data/dbstore/dbxdba/aws_server/head
D|1   |node00  |10.154.137.31  |n0   |/hd/dbxdba/aws_server/n0
D|2   |node01  |10.165.7.225   |n1   |/hd/dbxdba/aws_server/n1

[dbxdba@ip-10-224-122-90 ~]$ xdudb dbcreate aws_server aws_db
CREATE DATABASE

Now we can create a table, insert some rows and run select statements.

[dbxdba@ip-10-224-122-90 ~]$ xdudb sql aws_server aws_db
Welcome to dbX psql 3.2.5, the interactive SQL terminal.

Type:  \copyright for distribution terms
       \h for help with SQL commands
       \? for help with psql commands
       \g or terminate with semicolon to execute query
       \q to quit

aws_db=# CREATE TABLE products (
aws_db(# product_no INTEGER,
aws_db(# name VARCHAR(30),
aws_db(# price NUMERIC(10,2)
aws_db(# );
CREATE TABLE
aws_db=# INSERT INTO products VALUES(1, 'Product1', 100.00);
INSERT 0 1
aws_db=# SELECT * FROM products;
 product_no |   name   | price
------------+----------+--------
          1 | Product1 | 100.00
(1 row, Query Total: 1)

With the help of the xdudb command line tool we can check the dbX version, disk usage, mounted filesystems, etc.

[dbxdba@ip-10-224-122-90 ~]$ xdudb version
ip-10-224-122-90        2013-07-07 00:03:10
  DB           : dbX SQL 3.2.5
               : fpga.run r4347
  command line : xdudb 1.4
  GUI          : xdadm 3.0.2
  daemon       : xdutils 4.5.8
  OS           : Linux 3.4.43-43.43.amzn1.x86_64

[dbxdba@ip-10-224-122-90 ~]$ xdudb  du aws_server
||==================||
H|0   |head    |10.224.122.90  |head |60M  |/volumes/data/dbstore/dbxdba/aws_server/head
D|1   |node00  |10.154.137.31  |n0   |58M  |/hd/dbxdba/aws_server/n0
D|2   |node01  |10.165.7.225   |n1   |58M  |/hd/dbxdba/aws

[dbxdba@ip-10-224-122-90 ~]$ xdudb  df
head : 10.224.122.90
Filesystem            Size  Used Avail Use% Mounted on
/dev/md0              400G   95M  400G   1% /volumes/data
node00 : 10.154.137.31
Filesystem            Size  Used Avail Use% Mounted on
/dev/md0              400G   93M  400G   1% /volumes/data
node01 : 10.165.7.225
Filesystem            Size  Used Avail Use% Mounted on
/dev/md0              400G   93M  400G   1% /volumes/data

xdAdm web-based management GUI

In addition to the command line tool that we described in the AWS cloud deployment scenario, there is also a web-based GUI to manage the appliance – it is based on Apache Tomcat. We just go to the http://dbx-headnode:8080/xdadm URL and log in. Then we can manage the servers and databases, check nodesets and monitor the system:

dbx-15

XtremeData dbX virtual machine

If you just want to try out the dbX database engine, you can download a virtual machine from the XtremeData web site. The instance can be run e.g. using VMware Player. Once logged in to the virtual machine as dbxdba, you can start up a server:

$ xdudb start dbxtest

Splunk Storm – Machine Data Processing in the Cloud


Introduction

Splunk is a platform to process machine data from various sources such as web logs, syslogs and log4j logs; it can also work with JSON and CSV file formats, so any application that produces JSON or CSV output can be a source for Splunk. As the volume and variety of machine data are increasing, Splunk is becoming a more and more interesting player in the big data world, too.

Splunk can be considered a search engine for IT data. Splunk collects data from multiple sources, indexes it, and users can search it using Splunk's proprietary language called SPL (Search Processing Language). The search results can then be used to create reports and dashboards to visualize the data.

Splunk Architecture

Under the hood, the Splunk architecture has the following key components:
- forwarders are used to forward data to Splunk receiver instances. Receiver instances are normally indexers.
- indexers are Splunk instances that index data. Indexes are stored in files. There are two types of files: raw data files, which store the data in compressed format, and index files, which contain metadata for search queries. During indexing, Splunk extracts default fields and identifies events based on timestamps, or creates them if no timestamp is found.
- search head and search peers. In a distributed environment the search head manages the search requests, directs them to the search peers and then merges the results back to the users.
- Splunk Web is a graphical user interface based on a Python application server.
SplunkArchitecture

Splunk Storm

Splunk Storm is a cloud service version of Splunk. Splunk Storm runs in the Amazon cloud and uses both Elastic Block Storage (EBS) and the Simple Storage Service (S3).

The price plan is based on a monthly fee that depends on the volume of data you want to store. As of writing this article, there is a free tier with 1 GB of storage, while for example a 100 GB storage volume costs 400 USD and the maximum 1 TB storage volume costs 3,000 USD per month.

To get started, we need to sign up and create a project.
Splunk-1

Then we can define the data inputs. There are four options: upload a file, use forwarders, use the API (still in beta) or use network data sent directly from the servers.

As a first test, we will use data files uploaded from a local directory. We used a sample Apache web access log and a syslog available from http://www.monitorware.com/en/logsamples/

It takes some time to index the files; then they become available for search queries.

Splunk-11

We can run a search query to identify all HTTP client side error codes:

"source="access_log.txt" status>="400" AND status <="500"

Splunk-15

If we want to identify all the access log entries with HTTP POST method, we can run the following search query:

source="access_log.txt" method="POST"

In a similar way, if we want to find all the messages from the uploaded syslog file that were generated by the kernel process then we can run the following query:

source="syslog-messages.txt" process="kernel"

Splunk-16

Splunk forwarder and Twitter API

As a next example, we want to index the output generated by our own program using the Twitter API. The program generates JSON output in a file using a Python-based Twitter API. The directory is monitored by a Splunk forwarder, and once the file is created in the predefined directory, the forwarder sends it to Splunk Storm.

First we need to create an application in Twitter via the https://dev.twitter.com portal. The application has its consumer_key, consumer_secret, access_token_key and access_token_secret, which are required by the Twitter API.

Twitter-6

The Twitter API that we are going to use for the Python application can be downloaded from GitHub: https://github.com/bear/python-twitter.git .

This API depends on oauth2, simplejson and httplib2, so we need to install them first. Then we can get the code from GitHub and build and install the package.

$ git clone https://github.com/bear/python-twitter.git

# Build and Install:
$ python setup.py build
$ python setup.py install

The Twitter application code – twtr.py –  is as follows:

# twtr.py
import sys
import twitter

if len(sys.argv) < 3:
    print "Usage: " + sys.argv[0] + " keyword count"
    sys.exit(1)

keyword = sys.argv[1]
count = sys.argv[2]
# Twitter API 1.1. Count - up to a maximum of 100
# https://dev.twitter.com/docs/api/1.1/get/search/tweets
if int(count) > 100:
    count = 100

api = twitter.Api(consumer_key="CONSUMER_KEY", consumer_secret="CONSUMER_SECRET", access_token_key="ACCESS_TOKEN_KEY", access_token_secret="ACCESS_TOKEN_SECRET")

search_result = api.GetSearch(term=keyword, count=count)

for s in search_result:
    print s.AsJsonString()

The Python program can be run as follows:

$ python twtr.py "big data" 100

Installing Splunk forwarder

Then we need to install the Splunk forwarder, see http://www.splunk.com/download/universalforwarder . We also need to download the Splunk credentials that allow the forwarder to send data to our project. Once the forwarder and the credentials are installed, we can log in and add a directory (twitter_status) to be monitored by the forwarder. We defined the sourcetype as json_no_timestamp.

# Download splunk forwarder
$ wget -O splunkforwarder-5.0.3-163460-Linux-x86_64.tgz 'http://www.splunk.com/page/download_track?file=5.0.3/universalforwarder/linux/splunkforwarder-5.0.3-163460-Linux-x86_64.tgz&ac=&wget=true&name=wget&typed=releases&elq=8ccba442-db76-4fc8-b36b-36252bb61257'

# Install and start splunk forwarder
$ tar xvzf splunkforwarder-5.0.3-163460-Linux-x86_64.tgz
$ export SPLUNK_HOME=/home/ec2-user/splunkforwarder
$ $SPLUNK_HOME/bin/splunk start
# Install project credentials
$ $SPLUNK_HOME/bin/splunk install app ./stormforwarder_2628fbc8d76811e2b09622000a1cdcf0.spl -auth admin:changeme
App '/home/ec2-user/stormforwarder_2628fbc8d76811e2b09622000a1cdcf0.spl' installed

# Login
$SPLUNK_HOME/bin/splunk login -auth admin:changeme

# Add monitor (directory or file)
$SPLUNK_HOME/bin/splunk add monitor /home/ec2-user/splunk_blog/twitter_status -sourcetype json_no_timestamp
Added monitor of '/home/ec2-user/splunk_blog/twitter_status'.

Now we are ready to run the Python code using Twitter API:

$ python twtr.py "big data" 100 | tee twitter_status/twitter_status.txt

The program creates a twitter_status.txt file under the twitter_status directory, which is monitored by the Splunk forwarder. The forwarder sends the output file to Splunk Storm. After some time it appears under the inputs section as an authenticated forwarder. The file is shown as a source together with the previously uploaded Apache access log and syslog.
SPlunk-17

Splunk-18

If we want to search for users with location London, the search query looks like this:

source="/home/ec2-user/splunk_blog/twitter_status/twitter_status.txt" user.location="London, UK"

We can also define a search query to show the top 10 time zones from the Twitter results, and from the search result it is easy to create a report with just a few clicks on the web user interface. The report allows us to choose from multiple visualization options such as column, area or pie chart types.

source="/home/ec2-user/splunk_blog/twitter_status/twitter_status.txt" | top limit=10 user.time_zone

SPlunk-22

Splunk-25

Conclusion

As mentioned in the beginning of this article, the variety and volume of data generated by machines are increasing dramatically; sensor data, application logs, web access logs, syslogs, database and filesystem audit logs are just a few examples of the potential data sources that require attention but can be difficult to process and analyse in a timely manner. Splunk is a great tool to deal with the ever-increasing data volume, and with Splunk Storm users can start analysing their data in the cloud without hassle.

Hadoop REST API – WebHDFS


Introduction

Hadoop provides a native Java API to support file system operations such as creating, renaming or deleting files and directories, opening, reading or writing files, setting permissions, etc. A very basic example of how to read and write files from Hadoop can be found on the Apache wiki.
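
To give a flavour of the native API, the sketch below writes a small file to HDFS and reads it back. This is only a minimal illustration, not the wiki example itself: it assumes the Hadoop client libraries and a core-site.xml pointing at the cluster are on the classpath, and the path /tmp/native-api-test.txt is made up for the example.

// HdfsNativeExample.java - minimal write/read with the native FileSystem API
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsNativeExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();        // reads core-site.xml from the classpath
        FileSystem fs = FileSystem.get(conf);
        Path file = new Path("/tmp/native-api-test.txt");

        // create (overwrite) the file and write a line into it
        FSDataOutputStream out = fs.create(file, true);
        out.writeBytes("Hadoop native API test\n");
        out.close();

        // open the file and print its content
        FSDataInputStream in = fs.open(file);
        byte[] buffer = new byte[(int) fs.getFileStatus(file).getLen()];
        in.readFully(buffer);
        in.close();
        System.out.print(new String(buffer, "UTF-8"));

        fs.close();
    }
}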

This is great for applications running within the Hadoop cluster, but there may be use cases where an external application needs to manipulate HDFS, for example to create directories and write files to them, or to read the content of files stored on HDFS. Hortonworks developed an additional API to support these requirements based on standard REST functionality.

WebHDFS REST API

The WebHDFS concept is based on HTTP operations like GET, PUT, POST and DELETE. Operations such as OPEN, GETFILESTATUS and LISTSTATUS use HTTP GET; others like CREATE, MKDIRS, RENAME and SETPERMISSION rely on HTTP PUT. The APPEND operation is based on HTTP POST, while DELETE uses HTTP DELETE.

Authentication can be based on the user.name query parameter (as part of the HTTP query string) or, if security is turned on, on Kerberos.

The standard URL format is as follows: http://host:port/webhdfs/v1/<path>?op=operation&user.name=username

In some cases the namenode returns a URL using the HTTP 307 Temporary Redirect mechanism, with a Location URL referring to the appropriate datanode. The client then needs to follow that URL to execute the file operation on that particular datanode.

By default the namenode and datanode ports are 50070 and 50075, respectively; see more details about the default HDFS ports on the Cloudera blog.

In order to enable WebHDFS, we need to add the following property to hdfs-site.xml:

        <property>
           <name>dfs.webhdfs.enabled</name>
           <value>true</value>
        </property>

WebHDFS examples

As the simplest approach, we can use curl to invoke the WebHDFS REST API.

1./ Check directory status

$ curl -i "http://localhost:50070/webhdfs/v1/tmp?user.name=istvan&op=GETFILESTATUS"
HTTP/1.1 200 OK
Content-Type: application/json
Expires: Thu, 01-Jan-1970 00:00:00 GMT
Set-Cookie: hadoop.auth="u=istvan&p=istvan&t=simple&e=1370210454798&s=zKjRgOMQ1Q3NB1kXqHJ6GPa6TlY=";Path=/
Transfer-Encoding: chunked
Server: Jetty(6.1.26)

{"FileStatus":{"accessTime":0,"blockSize":0,"group":"supergroup","length":0,"modificationTime":1370174432465,"owner":"istvan","pathSuffix":"","permission":"755","replication":0,"type":"DIRECTORY"}}

This is similar to executing the Hadoop ls filesystem command:

$ bin/hadoop fs -ls /
Warning: $HADOOP_HOME is deprecated.

Found 1 items
drwxr-xr-x - istvan supergroup 0 2013-06-02 13:00 /tmp
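
The same GETFILESTATUS call can also be issued from a plain Java program, which is closer to the external-client use case mentioned in the introduction. The following is only a sketch using java.net.HttpURLConnection – the host, port, path and user name are copied from the curl example above and error handling is omitted:

// WebHdfsStatus.java - directory status via the WebHDFS REST API
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class WebHdfsStatus {
    public static void main(String[] args) throws Exception {
        // GETFILESTATUS maps to HTTP GET against the namenode (default port 50070)
        URL url = new URL("http://localhost:50070/webhdfs/v1/tmp?user.name=istvan&op=GETFILESTATUS");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");

        System.out.println("HTTP " + conn.getResponseCode());
        BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream(), "UTF-8"));
        String line;
        while ((line = reader.readLine()) != null) {
            System.out.println(line);   // the JSON FileStatus document shown above
        }
        reader.close();
        conn.disconnect();
    }
}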

2./ Create a directory

$ curl -i -X PUT "http://localhost:50070/webhdfs/v1/tmp/webhdfs?user.name=istvan&op=MKDIRS"
HTTP/1.1 200 OK
Content-Type: application/json
Expires: Thu, 01-Jan-1970 00:00:00 GMT
Set-Cookie: hadoop.auth="u=istvan&p=istvan&t=simple&e=1370210530831&s=YGwbkw0xRVpEAgbZpX7wlo56RMI=";Path=/
Transfer-Encoding: chunked
Server: Jetty(6.1.26)

The equivalent Hadoop filesystem command is as follows:

$ bin/hadoop fs -ls /tmp
Warning: $HADOOP_HOME is deprecated.

Found 2 items
drwxr-xr-x   - istvan supergroup          0 2013-06-02 12:17 /tmp/hadoop-istvan
drwxr-xr-x   - istvan supergroup          0 2013-06-02 13:02 /tmp/webhdfs

3./ Create a file

Creating a file requires two steps. First we need to run the command against the namenode, then follow the redirect and execute the WebHDFS API call against the appropriate datanode.

Step 1:

curl -i -X PUT "http://localhost:50070/webhdfs/v1/tmp/webhdfs/webhdfs-test.txt?user.name=istvan&op=CREATE"
HTTP/1.1 307 TEMPORARY_REDIRECT
Content-Type: application/octet-stream
Expires: Thu, 01-Jan-1970 00:00:00 GMT
Set-Cookie: hadoop.auth="u=istvan&p=istvan&t=simple&e=1370210936666&s=BLAIjTpNwurdsgvFxNL3Zf4bzpg=";Path=/
Location: http://istvan-pc:50075/webhdfs/v1/tmp/webhdfs/webhdfs-test.txt?op=CREATE&user.name=istvan&overwrite=false
Content-Length: 0
Server: Jetty(6.1.26)

Step 2:

$ curl -i -T webhdfs-test.txt "http://istvan-pc:50075/webhdfs/v1/tmp/webhdfs/webhdfs-test.txt?op=CREATE&user.name=istvan&overwrite=false"
HTTP/1.1 100 Continue

HTTP/1.1 201 Created
Content-Type: application/octet-stream
Location: webhdfs://0.0.0.0:50070/tmp/webhdfs/webhdfs-test.txt
Content-Length: 0
Server: Jetty(6.1.26)

To validate the result of the WebHDFS API we can run the following Hadoop filesystem command:

$ bin/hadoop fs -ls /tmp/webhdfs
Warning: $HADOOP_HOME is deprecated.

Found 1 items
-rw-r--r--   1 istvan supergroup         20 2013-06-02 13:09 /tmp/webhdfs/webhdfs-test.txt
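
The two-step create can be driven from Java in a similar way. The sketch below is again only illustrative (same host names, path and user as in the curl examples, no error handling): it disables automatic redirect following so that the Location header returned by the namenode can be read and then used for the second PUT against the datanode.

// WebHdfsCreate.java - two-step file creation via the WebHDFS REST API
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;

public class WebHdfsCreate {
    public static void main(String[] args) throws Exception {
        // Step 1: ask the namenode where to write; it answers with a 307 redirect
        URL nameNode = new URL(
            "http://localhost:50070/webhdfs/v1/tmp/webhdfs/webhdfs-test.txt?user.name=istvan&op=CREATE");
        HttpURLConnection step1 = (HttpURLConnection) nameNode.openConnection();
        step1.setRequestMethod("PUT");
        step1.setInstanceFollowRedirects(false);         // read the Location header ourselves
        String dataNodeUrl = step1.getHeaderField("Location");
        step1.disconnect();

        // Step 2: send the file content to the datanode URL returned above
        HttpURLConnection step2 = (HttpURLConnection) new URL(dataNodeUrl).openConnection();
        step2.setRequestMethod("PUT");
        step2.setDoOutput(true);
        OutputStream out = step2.getOutputStream();
        out.write("Hadoop WebHDFS test\n".getBytes("UTF-8"));
        out.close();
        System.out.println("HTTP " + step2.getResponseCode());   // 201 Created on success
        step2.disconnect();
    }
}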

4./ Open and read a file

In this case we run curl with the -L option to follow the HTTP temporary redirect URL.

$ curl -i -L "http://localhost:50070/webhdfs/v1/tmp/webhdfs/webhdfs-test.txt?op=OPEN&user.name=istvan"
HTTP/1.1 307 TEMPORARY_REDIRECT
Content-Type: application/octet-stream
Expires: Thu, 01-Jan-1970 00:00:00 GMT
Set-Cookie: hadoop.auth="u=istvan&p=istvan&t=simple&e=1370211032526&s=suBorvpvTUs6z/sw5n5PiZWsUnU=";Path=/
Location: http://istvan-pc:50075/webhdfs/v1/tmp/webhdfs/webhdfs-test.txt?op=OPEN&user.name=istvan&offset=0
Content-Length: 0
Server: Jetty(6.1.26)

HTTP/1.1 200 OK
Content-Type: application/octet-stream
Content-Length: 20
Server: Jetty(6.1.26)

Hadoop WebHDFS test

The corresponding Hadoop filesystem command is as follows:

$ bin/hadoop fs -cat /tmp/webhdfs/webhdfs-test.txt
Warning: $HADOOP_HOME is deprecated.

Hadoop WebHDFS test

5./ Rename a directory

$ curl -i -X PUT "http://localhost:50070/webhdfs/v1/tmp/webhdfs?op=RENAME&user.name=istvan&destination=/tmp/webhdfs-new"
HTTP/1.1 200 OK
Content-Type: application/json
Expires: Thu, 01-Jan-1970 00:00:00 GMT
Set-Cookie: hadoop.auth="u=istvan&p=istvan&t=simple&e=1370211103159&s=Gq/EBWZTBaoMk0tkGoodV+gU6jc=";Path=/
Transfer-Encoding: chunked
Server: Jetty(6.1.26)

To validate the result we can run the following Hadoop filesystem command:

$ bin/hadoop fs -ls /tmp
Warning: $HADOOP_HOME is deprecated.

Found 2 items
drwxr-xr-x   - istvan supergroup          0 2013-06-02 12:17 /tmp/hadoop-istvan
drwxr-xr-x   - istvan supergroup          0 2013-06-02 13:09 /tmp/webhdfs-new

6./ Delete a directory

This scenario results in an exception if the directory is not empty since a non-empty directory cannot be deleted.

$ curl -i -X DELETE "http://localhost:50070/webhdfs/v1/tmp/webhdfs-new?op=DELETE&user.name=istvan"
HTTP/1.1 403 Forbidden
Content-Type: application/json
Expires: Thu, 01-Jan-1970 00:00:00 GMT
Set-Cookie: hadoop.auth="u=istvan&p=istvan&t=simple&e=1370211266383&s=QFIJMWsy61vygFExl91Sgg5ME/Q=";Path=/
Transfer-Encoding: chunked
Server: Jetty(6.1.26)

{"RemoteException":{"exception":"IOException","javaClassName":"java.io.IOException","message":"/tmp/webhdfs-new is non empty"}}

First the file in the directory needs to be deleted and then the empty directory can be deleted, too.

$ curl -i -X DELETE "http://localhost:50070/webhdfs/v1/tmp/webhdfs-new/webhdfs-test.txt?op=DELETE&user.name=istvan"
HTTP/1.1 200 OK
Content-Type: application/json
Expires: Thu, 01-Jan-1970 00:00:00 GMT
Set-Cookie: hadoop.auth="u=istvan&p=istvan&t=simple&e=1370211375617&s=cG6727hbqGkrk/GO4yNRiZw4QxQ=";Path=/
Transfer-Encoding: chunked
Server: Jetty(6.1.26)

$ bin/hadoop fs -ls /tmp/webhdfs-new
Warning: $HADOOP_HOME is deprecated.

$ curl -i -X DELETE "http://localhost:50070/webhdfs/v1/tmp/webhdfs-new?op=DELETE&user.name=istvan&destination=/tmp/webhdfs-new"
HTTP/1.1 200 OK
Content-Type: application/json
Expires: Thu, 01-Jan-1970 00:00:00 GMT
Set-Cookie: hadoop.auth="u=istvan&p=istvan&t=simple&e=1370211495893&s=hZcZFDOL0x7exEhn14RlMgF4a/c=";Path=/
Transfer-Encoding: chunked
Server: Jetty(6.1.26)

$ bin/hadoop fs -ls /tmp
Warning: $HADOOP_HOME is deprecated.

Found 1 items
drwxr-xr-x   - istvan supergroup          0 2013-06-02 12:17 /tmp/hadoop-istvan

Conclusion

WebHDFS provides a simple, standard way to execute Hadoop filesystem operations from an external client that does not necessarily run on the Hadoop cluster itself. The requirement for WebHDFS is that the client needs a direct connection to the namenode and datanodes via the predefined ports. Hadoop HDFS over HTTP – which was inspired by HDFS Proxy – addresses these limitations by providing a proxy layer based on a preconfigured Tomcat bundle; it is interoperable with the WebHDFS API but does not require the firewall ports to be open for the client.

Introduction to NuoDB – An Elastically Scalable Cloud Database


Introduction

Traditional relational databases are built upon a synchronous, client-server architecture that often cannot meet the scalability requirements posed by distributed computing systems. As a result, various sharding, caching, and replication techniques emerged to cope with these demands. On the other hand, NoSQL solutions have emerged on the grounds of the CAP theorem. Data management systems like BigTable, HBase, MongoDB, Cassandra, and Dynamo offer different capabilities depending on how they balance consistency, availability, and partition tolerance. However, they gave up supporting SQL and ACID properties, which are critical in the relational database world.

NuoDB is a complete rethink of relational databases, built on a new foundation: partial, on-demand replication. Under the hood, NuoDB is an asynchronous, decentralized, peer-to-peer database. It uses the concept of atoms – objects that are being replicated. In NuoDB everything is an atom: database, schema, sequence, table, index, records, blobs and data are all atoms. NuoDB holds a patent on this peer-to-peer object replication.

NuoDB Architecture

The NuoDB architecture has three layers: a management layer, an SQL layer and a data layer. The management layer comprises an agent that manages the NuoDB processes running on a particular computer; it starts and stops them and also collects statistics from the transaction and storage engines. Certain agents are configured to be brokers – brokers communicate with the client initially and then introduce the client to a transaction engine, after which the client communicates directly with the transaction engines. The NuoDB management layer also offers a command line and a web-based management tool to manage the databases, as well as a command line loader for exporting and importing data.

At the SQL layer NuoDB has transaction engines that provide access to a single database. The transaction engine parses, compiles, optimizes and executes the SQL statements on behalf of the clients.

At the data layer NuoDB has storage managers that provide persistence of the data. A storage manager uses key/value pairs to store the information, but it can also use more sophisticated stores, e.g. HDFS.

In a minimal configuration we can run every component (broker, transaction engine and storage manager) on the same machine. NuoDB can easily be scaled out and made redundant by adding multiple brokers, transaction engines and storage managers. In more complex scenarios we can run NuoDB in the AWS cloud or across multiple corporate datacenters, providing geo-redundancy. Below is an example of a redundant architecture with two brokers, two transaction engines and two storage managers.
NuoDBRedundantArchitecture

Getting Started

NuoDB is available on multiple platforms such as Windows 32- and 64-bit, Linux 64-bit (Ubuntu, RHEL, SUSE), Mac OS X 10.7 and Solaris 11 (Intel 64-bit). For this article we used an Ubuntu 12.04 LTS virtual machine.

First we need to start up the components discussed above: the broker/agent, and then from the command line management tool the transaction engine and the storage manager. We also need to configure the properties file to contain the settings for the domain.

$ vi ./etc/stock.properties
# A flag specifying whether this agent should be run as a connection broker
broker = true

# The name used to identify the domain that this agent is a part of 
domain = stock

# The default administrative password, and the secret used by agents to
# setup and maintain the domain securely
domainPassword = stock
# Start agent
$ java -DpropertiesUrl=file:///home/notroot/nuodb/etc/stock.properties -jar jar/nuoagent.jar --verbose &>/tmp/stock.log &

# Start command line manager
$ java -jar jar/nuodbmanager.jar --broker localhost --password stock
nuodb [stock] > show domain summary

Hosts:
[broker] localhost/127.0.0.1:48004

## Create a new domain administrator user
nuodb [stock] > create domain administrator user istvan password istvan

## Start Storage Manager
nuodb [stock] > start process sm
Database: stock
Host: localhost
Process command-line options: --dba-user stock --dba-password stock
Archive directory: /home/notroot/nuodb/data
Initialize archive: true

Started: [SM] ubuntu/127.0.1.1:59914 [ pid = 3467 ] ACTIVE

## ps -ef | grep nuodb
## notroot   3467  3396  0 12:01 pts/0    00:00:00 /home/notroot/nuodb-1.0.1.128.linux.x86_64/bin/nuodb --connect-key 7124934669079864995

## Start Transaction Engine
nuodb [stock/stock] > start process te
Host: localhost
Process command-line options: --dba-user stock --dba-password stock

Started: [TE] ubuntu/127.0.1.1:60545 [ pid = 3494 ] ACTIVE

## ps -ef| grep nuodb
## notroot   3494  3396  0 12:06 pts/0    00:00:00 /home/notroot/nuodb-1.0.1.128.linux.x86_64/bin/nuodb --connect-key 8587006928753483386

Note that we started the storage manager with the initialize flag set to true. This is only needed the first time; any subsequent startup should set initialize to false, otherwise the data will be overwritten.

Then we can connect to the database using the nuosql client – the first argument is the name of the database (stock), and we need to specify the database admin username and password. After login we can set the schema to stock with the use command:

$ bin/nuosql stock --user stock --password stock
SQL> use stock
SQL> show
	autocommit state is on
	semicolon completion is required
	current schema is STOCK
SQL> show tables

	No tables found in schema STOCK

SQL> create table Stock
   > (
   >    Id             Integer not NULL generated always as identity primary key,
   >    StockDate      Date,
   >    StockOpen      Decimal(8,2),
   >    StockHigh      Decimal(8,2),
   >    StockLow       Decimal(8,2),
   >    StockClose     Decimal(8,2),
   >    StockVolume    Integer,
   >    StockAdjClose  Decimal(8,2)
   > );
SQL> show tables

	Tables in schema STOCK

		STOCK

We can then load data stored in CSV format into the database table. The CSV file with stock information – google.csv – was downloaded from http://finance.yahoo.com.

$ bin/nuoloader --schema stock --user stock --password stock --import "/home/notroot/nuodb/samples/stock/google.csv",skip --to "insert into Stock values(default,?,?,?,?,?,?,?)" stock &> /tmp/nuoloader.log

Imported 2163 rows, failed 0 rows, size 101897 bytes from /home/notroot/nuodb/sa
mples/stock/google.csv

Then we can log in again using nuosql and run a regular SQL query to retrieve the top 10 stock values and the corresponding dates (ordered by adj close value):

notroot@ubuntu:~/nuodb$ bin/nuosql stock --user stock --password stock
SQL> use stock
SQL> select count(*) from stock;
 COUNT  
 ------ 
  2163  

SQL> select StockDate, StockOpen,StockClose, StockVolume, StockAdjClose from stock order by StockAdjClose desc limit 10;

 STOCKDATE  STOCKOPEN  STOCKCLOSE  STOCKVOLUME  STOCKADJCLOSE  
 ---------- ---------- ----------- ------------ -------------- 

 2013-03-05   828.93     838.60      4044100        838.60     
 2013-03-11   831.69     834.82      1594700        834.82     
 2013-03-07   834.06     832.60      2052700        832.60     
 2013-03-08   834.50     831.52      2911900        831.52     
 2013-03-06   841.03     831.38      2873000        831.38     
 2013-03-12   830.71     827.61      2008300        827.61     
 2013-03-13   827.90     825.31      1641300        825.31     
 2013-03-14   826.99     821.54      1651200        821.54     
 2013-03-04   805.30     821.50      2775600        821.50     
 2013-03-20   816.83     814.71      1463800        814.71

Java Client – JDBC for NuoDB

NuoDB supports various programming languages for client applications, such as Java, .NET, PHP, Ruby and Node.js. In this section we demonstrate that NuoDB supports JDBC in the same way as traditional relational databases do. The Java program only needs nuodbjdbc.jar on its classpath.

Below is an example Java program (StockDB.java) that retrieves the highest adjusted close value ever recorded and the related date:

$ cat StockDB.java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;

public class StockDB {

    /** The driver class provided by NuoDB. */
    public static final String DRIVER_CLASS =
	"com.nuodb.jdbc.Driver";

    /** The base URL for connecting to a local database server. */
    public static final String DATABASE_URL =
	"jdbc:com.nuodb://localhost/";

    // the established connection to a local server
    private final Connection dbConnection;

    /**
     * Creates an instance of DB and connects to a local server,
     * as the given user, to work with the given named database
     *
     * @param user the user name for the connection
     * @param password the password for the given user
     * @param dbName the name of the database at the server to use
     */
    public StockDB(String user, String password, String dbName)
	throws SQLException
    {
	Properties properties = new Properties();
	properties.put("user", user);
	properties.put("password", password);
	properties.put("schema", "stock");

	dbConnection =
	    DriverManager.getConnection(DATABASE_URL + dbName, properties);
    }

    /** Closes the connection to the server. */
    public void close() throws SQLException {
	dbConnection.close();
    }

    /**
     * Gets the date and adjusted close of the highest stock value
     * (ordered by adjusted close), or null if the table is empty.
     *
     * @return the date and adjusted close as a comma-separated string, or null
     */
    public String getDateAndAdjClose() throws SQLException {
	Statement stmt = dbConnection.createStatement();
	ResultSet rs = stmt.
	    executeQuery("select stockdate, stockadjclose from stock order by stockadjclose desc limit 1");
	try {
	    if (rs.next())
		return rs.getString(1) + ", " + rs.getString(2);
	    return null;
	} finally {
	    rs.close();
	    stmt.close();
	}
    }

    /** Main-line for this example. */
    public static void main(String [] args) throws Exception {
	Class.forName(DRIVER_CLASS);

	StockDB stockDB = new StockDB("stock", "stock", "stock");
	System.out.println("Date and AdjClose: "  + stockDB.getDateAndAdjClose());

	stockDB.close();
    }

}

Then we can run the Java program as follows:

notroot@ubuntu:~/nuodb/samples/java$ javac StockDB.java 
notroot@ubuntu:~/nuodb/samples/java$ java -classpath .:../../jar/nuodbjdbc.jar StockDB
Date and AdjClose: 2013-03-05, 838.60
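
Note that StockDB imports PreparedStatement but never uses it. For illustration, a parameterized variant of the query – a hypothetical method, not part of the original sample – could look like this:

    /** Returns the adjusted close for the given date, or null if there is no row for it. */
    public String getAdjCloseForDate(java.sql.Date date) throws SQLException {
        PreparedStatement ps = dbConnection.prepareStatement(
            "select stockadjclose from stock where stockdate = ?");
        try {
            ps.setDate(1, date);
            ResultSet rs = ps.executeQuery();
            try {
                return rs.next() ? rs.getString(1) : null;
            } finally {
                rs.close();
            }
        } finally {
            ps.close();
        }
    }

Such a method could be added to the StockDB class and called from main in the same way as getDateAndAdjClose.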

Amazon Web Services Redshift – Data Warehouse in the Cloud


Introduction

Amazon Web Services made its fully managed, petabyte-scale data warehouse cloud service publicly available in February 2013. It promises a high-performance, secure, easily scalable data warehouse solution that costs a tenth of a traditional data warehouse (less than 1,000 USD/TB/year, according to the AWS Introduction to Redshift presentation: http://aws.amazon.com/redshift/), is compatible with traditional BI tools and is ready to run within minutes. As of writing this article the service is available in the US East region only, but it is expected to be rolled out to other regions, too. The service can be managed via the regular AWS tools: the AWS Management Console, command line tools (aws commands based on Python) and an API based on HTTP requests/responses.

Under the hood

Under the hood, AWS Redshift is based on PostgreSQL 8.0.2. The architecture consists of one leader node – responsible for managing communication with the clients, developing the execution plan and distributing the compiled code to the compute nodes – and one or more compute nodes that execute the code and send the results back to the leader node for aggregation. A compute node is either a 2-core machine with 15 GB RAM and 2 TB of storage (dubbed an XL node) or a 16-core machine with 120 GB RAM and 16 TB of storage (dubbed an 8XL node). More details about the Redshift architecture can be found at http://docs.aws.amazon.com/redshift/latest/dg/c_internal_arch_system_operation.html

[Figure: AWS Redshift architecture (AWS-Redshift-Arch)]

Launching a cluster

The easiest way to launch a cluster is via AWS console.

We need to define the basic attributes like cluster identifier, database name, database port, master username and password:

[Screenshot: cluster details – identifier, database name, port, master username and password (AWS-RedShift1)]

Then we need to select the node type (XL or 8XL) and the number of compute nodes. A cluster can be single-node or multi-node: the minimum configuration is a single XL node, while the maximum is sixteen 8XL nodes – you can do the math in terms of cores, memory and storage.
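For the maximum configuration this works out to 16 × 16 = 256 cores, 16 × 120 GB = 1,920 GB of RAM and 16 × 16 TB = 256 TB of storage, based on the node specifications above.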

[Screenshot: node type and number of compute nodes (AWS-Redshift2)]

Then we can configure additional parameters (like database encryption or security groups):

[Screenshot: additional configuration parameters (AWS-Redshift4)]

We can then review the configuration and are ready to launch the service:

[Screenshot: review and launch (AWS-RedShift5)]

The status will be “creating” for a while, then it becomes “available”. At that point the JDBC URL is shown and can be used to configure the clients.

[Screenshot: cluster status and JDBC URL (AWS-RedShift8)]

In order to make the service accessible, we need to configure the security options: either a security group (if Redshift is going to be accessed from EC2) or a CIDR/IP range (Classless Inter-Domain Routing, if Redshift is to be accessed from the public Internet). The console automatically recognises the IP address of the client connected to it.
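For example, a CIDR/IP entry of 203.0.113.25/32 (a documentation address used here purely for illustration) would allow only that single client IP, while 203.0.113.0/24 would allow the whole /24 range.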

[Screenshot: security configuration (AWS-RedShift6)]

And that is it! From then on clients can connect to the Redshift cluster.

We used SQLWorkbench – a Java-based, open source SQL tool – to test the service, as suggested by the AWS Redshift documentation. The connection parameters are the standard JDBC attributes:

[Screenshot: SQLWorkbench connection parameters (AWS-Redshift9)]
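
The cluster can also be reached programmatically over JDBC. Below is a minimal Java sketch, assuming the PostgreSQL JDBC driver is on the classpath (Redshift is accessed via the PostgreSQL protocol); the endpoint, database name and credentials are hypothetical placeholders that must be replaced with the values shown on the cluster detail page. It runs the same select version() query used below.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;

public class RedshiftVersion {

    public static void main(String[] args) throws Exception {
        // Load the PostgreSQL JDBC driver (Redshift speaks the PostgreSQL wire protocol)
        Class.forName("org.postgresql.Driver");

        // Hypothetical endpoint and database name - copy the real JDBC URL from the cluster detail page
        String url = "jdbc:postgresql://examplecluster.abc123xyz789.us-east-1.redshift.amazonaws.com:5439/mydb";

        Properties props = new Properties();
        props.put("user", "masteruser");   // master username chosen at cluster creation
        props.put("password", "XXXXXXXX"); // master password chosen at cluster creation

        Connection conn = DriverManager.getConnection(url, props);
        try {
            Statement stmt = conn.createStatement();
            ResultSet rs = stmt.executeQuery("select version()");
            if (rs.next()) {
                System.out.println(rs.getString(1));
            }
            rs.close();
            stmt.close();
        } finally {
            conn.close();
        }
    }
}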

The PostgreSQL version can be checked using

select version();

version
PostgreSQL 8.0.2 on i686-pc-linux-gnu, compiled by GCC gcc (GCC) 3.4.2 20041017 (Red Hat 3.4.2-6.fc3), Redshift 1.0.546

We tested the service with Amazon stock prices downloaded from Yahoo Finance.

The content was uploaded to an S3 bucket called stockprice (s3://stockprice/amzn.csv), which we had to make readable for everyone (open/download).

Then we could create the appropriate table using a standard SQL command:

CREATE TABLE stockprice (
     stockdate date not null,
     stockopen decimal(6,2),
     stockhigh decimal(6,2),
     stocklow decimal(6,2),
     stockclose decimal(6,2),
     stockvolume integer,
     stockadjclose decimal(6,2)
     );

Table 'stockprice' created

Execution time: 0.15s

desc stockprice

COLUMN_NAME     DATA_TYPE      PK   NULLABLE   DEFAULT   AUTOINCREMENT   REMARKS   POSITION
stockdate       date           NO   YES                  NO                        1
stockopen       numeric(6,2)   NO   YES                  NO                        2
stockhigh       numeric(6,2)   NO   YES                  NO                        3
stocklow        numeric(6,2)   NO   YES                  NO                        4
stockclose      numeric(6,2)   NO   YES                  NO                        5
stockvolume     integer        NO   YES                  NO                        6
stockadjclose   numeric(6,2)   NO   YES                  NO                        7

To load the data into the stockprice table, we used the COPY command with the S3 source file (the source could also be an Amazon DynamoDB table).

copy stockprice from 's3://stockprice/amzn.csv' CREDENTIALS 'aws_access_key_id=XXXXXXX;aws_secret_access_key=XXXXXXX' delimiter ',';

If any error occurs during the load operation (e.g. an incorrect data format), it can be investigated by querying the stl_load_errors system table.
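
A quick check might look like the following (a sketch; stl_load_errors records, among other things, the source file name, line number and error reason):

select * from stl_load_errors order by starttime desc limit 10;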

And then we can run our SQL statements to analyse the data.

select * from stockprice order by stockadjclose desc limit 100;

stockdate	stockopen	stockhigh	stocklow	stockclose	stockvolume	stockadjclose
2013-01-25	275.00	284.72	274.40	283.99	4968100	283.99
2013-01-28	283.78	284.48	274.40	276.04	4321400	276.04
2013-03-05	274.00	276.68	269.99	275.59	3686200	275.59
2013-03-13	275.24	276.50	272.64	275.10	1884200	275.10
2013-03-08	275.00	275.44	271.50	274.19	1879800	274.19
2013-03-12	271.00	277.40	270.36	274.13	3246200	274.13
2013-03-07	274.10	274.80	271.85	273.88	1939000	273.88
2013-03-06	275.76	276.49	271.83	273.79	2050700	273.79
2013-01-24	269.37	276.65	269.37	273.46	3417000	273.46
2013-03-04	265.36	273.30	264.14	273.11	3453000	273.11
2013-01-30	283.00	284.20	267.11	272.76	13075400	272.76
2013-01-14	268.00	274.26	267.54	272.73	4275000	272.73
2013-01-18	270.83	274.50	269.60	272.12	2942000	272.12
2013-01-15	270.68	272.73	269.30	271.90	2326900	271.90
2013-03-11	273.43	273.99	270.40	271.24	1904900	271.24

The AWS console supports various management functions for the cluster: we can reboot it, modify parameters, and resize it by choosing a different node type (XL -> 8XL) or by increasing/decreasing the number of nodes. We can also delete the cluster via the console.
[Screenshot: cluster management options in the AWS console (AWS-Redshift11)]

Conclusion

Amazon Web Services Redshift is another big step toward making cloud services available for enterprise computing. It offers a data warehouse capability that takes minimal effort to start up and scales as operations demand. It is a great complement to other database services such as DynamoDB for NoSQL requirements and RDS for relational databases.

R and Hadoop Data Analysis – RHadoop


Introduction

R is a programming language and software suite used for data analysis, statistical computing and data visualization. It is highly extensible and has object-oriented features and strong graphical capabilities. At its heart R is an interpreted language and comes with a command line interpreter – available for Linux, Windows and Mac machines – but there are also IDEs such as RStudio and JGR to support development.

R and Hadoop can complement each other very well; they are a natural match in big data analytics and visualization. One of the best-known R packages supporting Hadoop functionality is RHadoop, developed by Revolution Analytics.

Installing RHadoop

RHadoop is a collection of three R packages: rmr, rhdfs and rhbase. rmr package provides Hadoop MapReduce functionality in R, rhdfs provides HDFS file management in R and rhbase provides HBase database management from within R.

To install these R packages, we first need to install the R base package. On Ubuntu 12.04 LTS we can do so by running:

$ sudo apt-get install r-base

Then we need to install the RHadoop packages together with their dependencies: rmr requires Rcpp, RJSONIO, digest, functional, stringr and plyr, while rhdfs requires rJava.

As part of the installation, we need to reconfigure Java for the rJava package and set the HADOOP_CMD variable for the rhdfs package. The installation requires the corresponding tar.gz archives to be downloaded; then we can run the R CMD INSTALL command with sudo privileges:

sudo R CMD INSTALL Rcpp_0.10.2.tar.gz
sudo R CMD INSTALL RJSONIO_1.0-1.tar.gz
sudo R CMD INSTALL digest_0.6.2.tar.gz
sudo R CMD INSTALL functional_0.1.tar.gz
sudo R CMD INSTALL stringr_0.6.2.tar.gz
sudo R CMD INSTALL plyr_1.8.tar.gz
sudo R CMD INSTALL rmr2_2.0.2.tar.gz

sudo JAVA_HOME=/home/istvan/jdk1.6.0_38/jre R CMD javareconf
sudo R CMD INSTALL rJava_0.9-3.tar.gz
sudo HADOOP_CMD=/home/istvan/hadoop/bin/hadoop R CMD INSTALL rhdfs_1.0.5.tar.gz

Getting started with RHadoop

In principle, the mapreduce function in RHadoop is similar to R's lapply or sapply functions, which apply a function over a list or vector.

Without the mapreduce function we could write a few lines of plain R to double all the numbers from 1 to 100:

> ints = 1:100
> doubleInts = sapply(ints, function(x) 2*x)
> head(doubleInts)
[1]  2  4  6  8 10 12

With the RHadoop rmr package we can use the mapreduce function to implement the same calculation – see the doubleInts.R script:

Sys.setenv(HADOOP_HOME="/home/istvan/hadoop")
Sys.setenv(HADOOP_CMD="/home/istvan/hadoop/bin/hadoop")

library(rmr2)
library(rhdfs)

ints = to.dfs(1:100)
calc = mapreduce(input = ints,
                   map = function(k, v) cbind(v, 2*v))

from.dfs(calc)

$val
         v    
  [1,]   1   2
  [2,]   2   4
  [3,]   3   6
  [4,]   4   8
  [5,]   5  10
.....

If we want to run HDFS filesystem commands from R, we first need to initialize rhdfs using the hdfs.init() function; then we can run the well-known ls, rm, mkdir, stat, etc. commands:

> hdfs.init()
> hdfs.ls("/tmp")
  permission  owner      group size          modtime               file
1 drwxr-xr-x istvan supergroup    0 2013-02-25 21:59    /tmp/RtmpC94L4R
2 drwxr-xr-x istvan supergroup    0 2013-02-25 21:49 /tmp/hadoop-istvan
> hdfs.stat("/tmp")
      perms isDir block replication  owner      group size              modtime path
1 rwxr-xr-x  TRUE     0           0 istvan supergroup    0 45124-08-29 23:58:48 /tmp

Data analysis with RHadoop

The following example demonstrates how to use RHadoop for data analysis. Let us assume that we need to determine how many countries have a greater GDP than Apple Inc.’s revenue in 2012 (156,508 million USD; see http://www.google.com/finance?q=NASDAQ%3AAAPL&fstype=ii&ei=5eErUcCpB8KOwAOkQQ for more details).

GDP data can be downloaded from the World Bank data catalog site. The data needs some adjustment to be suitable for the MapReduce algorithm; the final format that we used for the analysis is as follows (the last column is the GDP of the given country in millions of USD):

Country Code,Number,Country Name,GDP,
USA,1,United States,14991300
CHN,2,China,7318499
JPN,3,Japan,5867154
DEU,4,Germany,3600833
FRA,5,France,2773032
....

The gdp.R script looks like this:

Sys.setenv(HADOOP_HOME="/home/istvan/hadoop")
Sys.setenv(HADOOP_CMD="/home/istvan/hadoop/bin/hadoop")

library(rmr2)
library(rhdfs)

setwd("/home/istvan/rhadoop/blogs/")
gdp <- read.csv("GDP_converted.csv")
head(gdp)

hdfs.init()
gdp.values <- to.dfs(gdp)

# AAPL revenue in 2012 in millions USD
aaplRevenue = 156508

gdp.map.fn <- function(k, v) {
    key <- ifelse(v[4] < aaplRevenue, "less", "greater")
    keyval(key, 1)
}

count.reduce.fn <- function(k, v) {
    keyval(k, length(v))
}

count <- mapreduce(input=gdp.values,
                   map = gdp.map.fn,
                   reduce = count.reduce.fn)

from.dfs(count)

R will initiate a Hadoop Streaming job to process the data using the MapReduce algorithm:

packageJobJar: [/tmp/Rtmp4llUjl/rmr-local-env1e025ac0444f, /tmp/Rtmp4llUjl/rmr-global-env1e027a86f559, /tmp/Rtmp4llUjl/rmr-streaming-map1e0214a61fa5, /tmp/Rtmp4llUjl/rmr-streaming-reduce1e026da4f6c9, /tmp/hadoop-istvan/hadoop-unjar1158187086349476064/] [] /tmp/streamjob430878637581358129.jar tmpDir=null
13/02/25 22:28:12 INFO mapred.FileInputFormat: Total input paths to process : 1
13/02/25 22:28:12 INFO streaming.StreamJob: getLocalDirs(): [/tmp/hadoop-istvan/mapred/local]
13/02/25 22:28:12 INFO streaming.StreamJob: Running job: job_201302252148_0006
13/02/25 22:28:12 INFO streaming.StreamJob: To kill this job, run:
13/02/25 22:28:12 INFO streaming.StreamJob: /home/istvan/hadoop-1.0.4/libexec/../bin/hadoop job  -Dmapred.job.tracker=localhost:9001 -kill job_201302252148_0006
13/02/25 22:28:12 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201302252148_0006
13/02/25 22:28:13 INFO streaming.StreamJob:  map 0%  reduce 0%
13/02/25 22:28:25 INFO streaming.StreamJob:  map 100%  reduce 0%
13/02/25 22:28:37 INFO streaming.StreamJob:  map 100%  reduce 100%
13/02/25 22:28:43 INFO streaming.StreamJob: Job complete: job_201302252148_0006
13/02/25 22:28:43 INFO streaming.StreamJob: Output: /tmp/Rtmp4llUjl/file1e025f146f8f

Then we get the result telling us how many countries have a greater and how many have a smaller GDP than Apple Inc.’s revenue in 2012: 55 countries had a greater GDP than Apple and 138 countries had a smaller one.

$key
   GDP      
1  "greater"
56 "less"   

$val
[1]  55 138

The following screenshot from RStudio shows the histogram of GDPs – 15 countries have a GDP above 1,000 billion USD; one country is in the 14,000–15,000 billion USD range, one in the 7,000–8,000 billion USD range and one in the 5,000–6,000 billion USD range.

[Screenshot: GDP histogram in RStudio (GDP-histogram)]
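
A histogram along these lines can be produced with base R – a minimal sketch, assuming the gdp data frame and aaplRevenue variable from gdp.R above (column names follow the CSV header shown earlier); the exact binning in the screenshot may differ:

# GDP is in millions of USD in the source file; divide by 1,000 to plot in billions
hist(gdp$GDP / 1000,
     breaks = 100,
     xlab   = "GDP (billion USD)",
     main   = "GDP of countries")
abline(v = aaplRevenue / 1000, col = "red")   # marks Apple's 2012 revenue for comparison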

Conclusion

If someone needs to combine strong data analytics and visualization features with the big data capabilities of Hadoop, it is certainly worth having a closer look at RHadoop. It provides packages to integrate R with MapReduce, HDFS and HBase, the key components of the Hadoop ecosystem. For more details, please read the R and Hadoop Big Data Analytics whitepaper.