Real-time Big Data Analytics Engine – Twitter’s Storm


Introduction

Hadoop is at its heart a batch-oriented big data solution, and it leaves gaps in ad-hoc and real-time data processing at massive scale – so much so that some people have already started counting the days of Hadoop as we know it. As one of the alternatives, we have already seen Google BigQuery for ad-hoc analytics; this time the post is about Twitter’s Storm, a real-time computation engine that aims to fill the gap in the real-time data analytics world. Storm was originally developed by BackType and now runs under Twitter’s name, after Twitter acquired BackType. Nathan Marz explained the need for a dedicated real-time analytics solution as follows: “There’s no hack that will turn Hadoop into a realtime system; realtime data processing has a fundamentally different set of requirements than batch processing…. The lack of a “Hadoop of realtime” has become the biggest hole in the data processing ecosystem. Storm fills that hole.”

Storm Architecture

Storm’s architecture very much resembles Hadoop’s: it has two types of nodes, a master node and worker nodes. The master node runs Nimbus, which distributes code to the cluster nodes and assigns tasks to the workers – a role similar to the JobTracker in Hadoop. The worker nodes run the Supervisor, which starts and stops worker processes – a role similar to the TaskTrackers in Hadoop. Coordination and all state between Nimbus and the Supervisors are managed by ZooKeeper, so the architecture looks as follows:

Storm is written in Clojure and Java.

One of the key concepts in Storm is the topology; in essence, a Storm cluster executes a topology, which defines the data sources, the processing tasks and the data flow between the nodes. A topology in Storm is roughly analogous to a MapReduce job in Hadoop.

Storm also has a concept of streams, which are basically sequences of tuples; they represent the data being passed around between the Storm nodes. There are two main components that manipulate stream data: spouts, which read data from a source (e.g. a queue or an API) and emit a list of fields, and bolts, which consume tuples from input streams, process them and then emit a new stream or store the data in a database.
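To make the spout and bolt roles a bit more concrete before diving into the real example, here is a minimal, illustrative spout sketch – the class name HelloWordSpout and the emitted value are made up for this post, but the API calls are the standard backtype.storm ones used throughout the rest of the article:

import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

// Illustrative only: emits the same single-field tuple roughly once a second.
public class HelloWordSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        // called once when the spout task starts; keep a reference to the collector
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        // Storm calls this method repeatedly; each call may emit zero or more tuples
        Utils.sleep(1000);
        collector.emit(new Values("hello"));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // the emitted stream has a single field named "word"
        declarer.declare(new Fields("word"));
    }
}

A bolt consuming this stream would implement a similar set of callbacks, as the WordCount bolt later in this post does via BaseBasicBolt.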

One important decision when you define a topology is how data will be passed around between the nodes. As discussed above, a node (running either spouts or bolts) emits a stream, and the stream grouping determines which node(s) will receive the emitted tuples. Storm has a number of grouping types, such as shuffle grouping (tuples are sent to a randomly chosen bolt task), fields grouping (a given set of fields is guaranteed to always be sent to the same bolt task), all grouping (tuples are sent to all tasks of the bolt) and direct grouping (the source determines which bolt task receives the tuples), and you can implement your own custom grouping method, too.
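Groupings are declared when the topology is wired together. As a sketch – the bolt classes MyBolt and MyOtherBolt below are hypothetical placeholders, only the grouping calls matter – the different groupings look like this:

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentences", new RandomSentenceSpout(), 5);

// shuffle grouping: tuples are distributed randomly across the bolt's tasks
builder.setBolt("bolt-a", new MyBolt(), 4).shuffleGrouping("sentences");

// fields grouping: tuples with the same "word" value always reach the same task
builder.setBolt("bolt-b", new MyOtherBolt(), 4).fieldsGrouping("bolt-a", new Fields("word"));

// all grouping: every task of the bolt receives a copy of each tuple
builder.setBolt("bolt-c", new MyOtherBolt(), 2).allGrouping("bolt-a");

// direct grouping: the emitting task decides which consuming task gets the tuple
builder.setBolt("bolt-d", new MyOtherBolt(), 2).directGrouping("bolt-a");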

If you want to know more about Storm internals, you can download the code and find a great tutorial on GitHub.

A Storm application

The best way to get started with Storm is to download the storm-starter package from GitHub. It contains a variety of examples, from a basic WordCount to more complex implementations. In this post we will have a closer look at WordCountTopology.java. The project ships with a Maven m2-pom.xml file, so you can compile and execute it using the mvn command:

mvn -f m2-pom.xml compile exec:java -Dexec.classpathScope=compile -Dexec.mainClass=storm.starter.WordCountTopology

Alternatively, you can import the code as an existing Maven project into Eclipse and run it from there (Import… -> Maven -> Existing Maven Projects).

The code looks like this:

package storm.starter;

import storm.starter.spout.RandomSentenceSpout;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.task.ShellBolt;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.HashMap;
import java.util.Map;

/**
 * This topology demonstrates Storm's stream groupings and multilang capabilities.
 */
public class WordCountTopology {
    public static class SplitSentence extends ShellBolt implements IRichBolt {

        public SplitSentence() {
            super("python", "splitsentence.py");
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }

        @Override
        public Map getComponentConfiguration() {
            return null;
        }
    }  

    public static class WordCount extends BaseBasicBolt {
        Map counts = new HashMap();

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String word = tuple.getString(0);
            Integer count = counts.get(word);
            if(count==null) count = 0;
            count++;
            counts.put(word, count);
            collector.emit(new Values(word, count));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    public static void main(String[] args) throws Exception {

        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("spout", new RandomSentenceSpout(), 5);

        builder.setBolt("split", new SplitSentence(), 8)
                 .shuffleGrouping("spout");
        builder.setBolt("count", new WordCount(), 12)
                 .fieldsGrouping("split", new Fields("word"));

        Config conf = new Config();
        conf.setDebug(true);

        if(args!=null && args.length > 0) {
            conf.setNumWorkers(3);

            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {        
            conf.setMaxTaskParallelism(3);

            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", conf, builder.createTopology());

            Thread.sleep(10000);

            cluster.shutdown();
        }
    }
}

As a first step, the Storm topology defines a RandomSentenceSpout as its data source.

        builder.setSpout("spout", new RandomSentenceSpout(), 5);

RandomSentenceSpout has a method called nextTuple(), declared in the ISpout interface. When this method is called, Storm is requesting that the spout emit tuples to the output collector. In this case, the tuples are randomly selected sentences from a predefined String array.

    @Override
    public void nextTuple() {
        Utils.sleep(100);
        String[] sentences = new String[] {
            "the cow jumped over the moon",
            "an apple a day keeps the doctor away",
            "four score and seven years ago",
            "snow white and the seven dwarfs",
            "i am at two with nature"};
        String sentence = sentences[_rand.nextInt(sentences.length)];
        _collector.emit(new Values(sentence));
    }

The next step in the topology definition is the SplitSentence bolt. The SplitSentence bolt actually invokes Python code – splitsentence.py – that splits the sentences into words using Python’s split() method:

       public SplitSentence() {
            super("python", "splitsentence.py");
        }

The Python code (splitsentence.py):

import storm

class SplitSentenceBolt(storm.BasicBolt):
    def process(self, tup):
        words = tup.values[0].split(" ")
        for word in words:
          storm.emit([word])

SplitSentenceBolt().run()

The topology uses the shuffleGrouping() method to send the sentences from the spout to a randomly chosen task of the bolt referred to as “split”.

The final step in the topology definition is the WordCount bolt. The WordCount bolt has an execute() method, defined in the IBasicBolt interface:

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String word = tuple.getString(0);
            Integer count = counts.get(word);
            if(count==null) count = 0;
            count++;
            counts.put(word, count);
            collector.emit(new Values(word, count));
        }

This method receives the words as tuples and uses a Map to count the occurrences of each word; it then emits the word together with its updated count.

You can run a Storm topology in two modes (again, similar to Hadoop’s standalone and distributed modes). One mode is based on the LocalCluster class, which lets you run the topology on your own machine, debug it, etc. When you are ready to run it on a Storm cluster, you use the StormSubmitter class to submit the topology to the cluster:

        if(args!=null && args.length > 0) {
            conf.setNumWorkers(3);

            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {        
            conf.setMaxTaskParallelism(3);

            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", conf, builder.createTopology());

Parallelism can be controlled by various methods and arguments, such as setNumWorkers(), setMaxTaskParallelism() and the parallelism_hint argument used when building the topology – see e.g. the value 5 in the builder.setSpout() call below. The parallelism_hint defines the number of tasks that should be assigned to execute the given spout; each task will run on a thread in a worker process somewhere in the cluster.

builder.setSpout("spout", new RandomSentenceSpout(), 5);

When we run the application, we can see multiple threads running in parallel: some are emitting the original random sentences, other threads are splitting them into words, and yet other threads are counting the words.

9722 [Thread-38] INFO  backtype.storm.daemon.task  - Emitting: spout default [snow white and the seven dwarfs]
9722 [Thread-36] INFO  backtype.storm.daemon.task  - Emitting: spout default [i am at two with nature]
9723 [Thread-32] INFO  backtype.storm.daemon.executor  - Processing received message source: spout:10, stream: default, id: {}, [snow white and the seven dwarfs]
9723 [Thread-24] INFO  backtype.storm.daemon.executor  - Processing received message source: spout:9, stream: default, id: {}, [i am at two with nature]
9723 [Thread-22] INFO  backtype.storm.daemon.task  - Emitting: split default ["i"]
9723 [Thread-30] INFO  backtype.storm.daemon.task  - Emitting: split default ["snow"]
9724 [Thread-18] INFO  backtype.storm.daemon.executor  - Processing received message source: split:5, stream: default, id: {}, ["i"]
9724 [Thread-16] INFO  backtype.storm.daemon.executor  - Processing received message source: split:7, stream: default, id: {}, ["snow"]
9724 [Thread-18] INFO  backtype.storm.daemon.task  - Emitting: count default [i, 38]
9724 [Thread-22] INFO  backtype.storm.daemon.task  - Emitting: split default ["am"]
9724 [Thread-30] INFO  backtype.storm.daemon.task  - Emitting: split default ["white"]
9724 [Thread-16] INFO  backtype.storm.daemon.task  - Emitting: count default [snow, 57]

Conclusion

Big Data analytics comes in many flavours, from batch processing through ad-hoc analytics to real-time processing. Hadoop, the granddaddy of big data, is focused on batch-oriented solutions – should you need to support real-time analytics, Storm can offer an interesting alternative.