Consuming a live stream of tweets in real time is a common task for big data applications that power social analytics. In this guide, you will learn how to accomplish it with the Cask Data Application Platform (CDAP).
You will build a CDAP application that consumes data from the public Twitter feed and computes the average tweet size. You will:
- Build a Flow to process tweets in real time;
- Use a Flowlet from the cdap-packs-twitter library, which uses the Twitter4j library to connect the Flow to the Twitter stream;
- Use a Dataset to persist the results of the analysis; and
- Build a Service to serve the analysis results via a RESTful endpoint.
The following sections will guide you through building an application from scratch. If you are interested in deploying and running the application right away, you can clone its source code from this GitHub repository. In that case, feel free to skip the next two sections and jump right to the Configuring TweetCollectorFlowlet section.
Real-time processing capability within CDAP is supported by a Flow. The application we are building in this guide uses a Flow to process the tweets consumed from the Twitter feed. The processing results are persisted in a Dataset and are made available via a RESTful endpoint using a Service.
The Flow consists of two processing nodes called Flowlets:
- A collector Flowlet that consumes data from the Twitter feed and outputs a synthesized Tweet object; and
- An analyzer Flowlet that consumes the tweets emitted by the collector and updates the basic tweet statistics: total tweet size and count.
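The two-stage structure described above can be sketched in plain Java, without any CDAP dependency. This is only an illustration of the data flow: `SimpleTweet`, `Analyzer`, and `collect` are hypothetical stand-ins, not classes from CDAP or the Twitter pack, and a real Flow connects its Flowlets through CDAP-managed queues rather than direct method calls.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Plain-Java stand-in for the two-stage Flow; none of these types are CDAP classes. */
final class TwoStageFlowSketch {

  /** Hypothetical minimal tweet type (assumption: only the text matters here). */
  static final class SimpleTweet {
    final String text;

    SimpleTweet(String text) {
      this.text = text;
    }
  }

  /** Stage 2: keeps the two running totals that the analyzer stage maintains. */
  static final class Analyzer implements Consumer<SimpleTweet> {
    long totalCount;
    long totalSize;

    @Override
    public void accept(SimpleTweet tweet) {
      totalCount += 1;
      totalSize += tweet.text.length();
    }
  }

  /** Stage 1: wraps each raw text in a tweet object and emits it downstream. */
  static void collect(List<String> rawTexts, Consumer<SimpleTweet> downstream) {
    for (String text : rawTexts) {
      downstream.accept(new SimpleTweet(text));
    }
  }

  public static void main(String[] args) {
    Analyzer analyzer = new Analyzer();
    collect(Arrays.asList("hello", "big data"), analyzer);
    // Prints "2 tweets, 13 characters"
    System.out.println(analyzer.totalCount + " tweets, " + analyzer.totalSize + " characters");
  }
}
```

Keeping only a count and a size total is what lets the average be computed later without storing any individual tweet.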
The recommended way to build a CDAP application from scratch is to use a Maven project. Use the following directory structure (the contents of these files are described below):
./pom.xml
./src/main/java/co/cask/cdap/guides/twitter/TwitterAnalysisApp.java
./src/main/java/co/cask/cdap/guides/twitter/TweetAnalysisFlow.java
./src/main/java/co/cask/cdap/guides/twitter/StatsRecorderFlowlet.java
./src/main/java/co/cask/cdap/guides/twitter/TweetStatsHandler.java
./src/main/resources/twitter4j.properties
The application will use the cdap-packs-twitter library, which includes an implementation of TweetCollectorFlowlet. You'll need to add this library as a dependency to your project's pom.xml:
...
<dependencies>
  ...
  <dependency>
    <groupId>co.cask.cdap.packs</groupId>
    <artifactId>cdap-twitter-pack</artifactId>
    <version>0.1.0</version>
  </dependency>
</dependencies>
Create the TwitterAnalysisApp class, which declares that the application has a Flow and a Service, and creates a Dataset:
public class TwitterAnalysisApp extends AbstractApplication {
  static final String NAME = "TwitterAnalysis";
  static final String TABLE_NAME = "tweetStats";
  static final String SERVICE_NAME = "TweetStatsService";

  @Override
  public void configure() {
    setName(NAME);
    // Dataset that stores the running tweet count and total size.
    createDataset(TABLE_NAME, KeyValueTable.class);
    // Flow that collects and analyzes tweets.
    addFlow(new TweetAnalysisFlow());
    // Service that exposes the results over HTTP.
    addService(SERVICE_NAME, new TweetStatsHandler());
  }
}
The TweetAnalysisFlow makes use of the TweetCollectorFlowlet that is available in the cdap-packs-twitter library:
public class TweetAnalysisFlow extends AbstractFlow {
  static final String NAME = "TweetAnalysisFlow";

  @Override
  public void configure() {
    setName(NAME);
    setDescription("Collects simple tweet stats");
    // "collect" pulls tweets from Twitter; "recordStats" updates the totals.
    addFlowlet("collect", new TweetCollectorFlowlet());
    addFlowlet("recordStats", new StatsRecorderFlowlet());
    // Route every tweet emitted by "collect" to "recordStats".
    connect("collect", "recordStats");
  }
}
Tweets pulled by the TweetCollectorFlowlet are consumed by the StatsRecorderFlowlet, which updates the total number of tweets and their total body size in a Dataset:
public class StatsRecorderFlowlet extends AbstractFlowlet {
  @UseDataSet(TwitterAnalysisApp.TABLE_NAME)
  private KeyValueTable statsTable;

  // Called once for each Tweet emitted by the collector Flowlet.
  @ProcessInput
  public void process(Tweet tweet) {
    statsTable.increment(Bytes.toBytes("totalCount"), 1);
    // The size is the length of the tweet text in Java chars.
    statsTable.increment(Bytes.toBytes("totalSize"), tweet.getText().length());
  }
}
In a real-world scenario, the Flowlet could perform more sophisticated processing on tweets.
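One detail worth keeping in mind when interpreting the "size" recorded above: Java's `String.length()` counts UTF-16 code units, which differs from both the number of Unicode code points and the number of UTF-8 bytes once a tweet contains accented letters or emoji. The following standalone sketch compares the three measures; the class and method names are illustrative only and are not part of the guide's application.

```java
import java.nio.charset.StandardCharsets;

/** Compares three ways of measuring the size of a tweet's text. */
final class TweetSizeMeasures {

  /** What String.length() reports: UTF-16 code units. */
  static int utf16Units(String text) {
    return text.length();
  }

  /** Number of Unicode code points (an emoji outside the BMP counts once). */
  static int codePoints(String text) {
    return text.codePointCount(0, text.length());
  }

  /** Number of bytes when the text is encoded as UTF-8. */
  static int utf8Bytes(String text) {
    return text.getBytes(StandardCharsets.UTF_8).length;
  }

  public static void main(String[] args) {
    // "cafe" with an accented e, a space, and one emoji written as a surrogate pair.
    String text = "caf\u00e9 \uD83D\uDE00";
    System.out.println("UTF-16 units: " + utf16Units(text)); // 7
    System.out.println("Code points:  " + codePoints(text)); // 6
    System.out.println("UTF-8 bytes:  " + utf8Bytes(text));  // 10
  }
}
```

Any of the three is a reasonable definition of size, as long as the same one is used consistently for every tweet.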
Finally, the TweetStatsHandler uses the tweetStats Dataset to compute the average tweet size and serve it over HTTP:
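@Path("/v1")
public class TweetStatsHandler extends AbstractHttpServiceHandler {
  // The same Dataset that StatsRecorderFlowlet writes to.
  @UseDataSet(TwitterAnalysisApp.TABLE_NAME)
  private KeyValueTable statsTable;

  // Handles GET v1/avgSize; despite its name, this method returns the average tweet size.
  @Path("avgSize")
  @GET
  public void sentimentAggregates(HttpServiceRequest request, HttpServiceResponder responder) throws Exception {
    // Incrementing by 0 returns the current counter value without changing it.
    long totalCount = statsTable.incrementAndGet(Bytes.toBytes("totalCount"), 0);
    long totalSize = statsTable.incrementAndGet(Bytes.toBytes("totalSize"), 0);
    // Integer division: the average is truncated to a whole number.
    responder.sendJson(totalCount > 0 ? totalSize / totalCount : 0);
  }
}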
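Because both totals are `long` values, the division in the handler truncates the result to a whole number. The standalone sketch below contrasts that behavior with a floating-point average; `AverageTweetSize` and its methods are hypothetical helpers for illustration, not part of the guide's application.

```java
/** Contrasts truncating and floating-point averages over the two stored totals. */
final class AverageTweetSize {

  /** Same arithmetic as the handler: long division, guarded against a zero count. */
  static long truncatedAverage(long totalSize, long totalCount) {
    return totalCount > 0 ? totalSize / totalCount : 0;
  }

  /** Alternative that keeps the fractional part. */
  static double exactAverage(long totalSize, long totalCount) {
    return totalCount > 0 ? (double) totalSize / totalCount : 0.0;
  }

  public static void main(String[] args) {
    // Two tweets with a combined size of 265 characters.
    System.out.println(truncatedAverage(265, 2)); // 132
    System.out.println(exactAverage(265, 2));     // 132.5
    // No tweets processed yet: both variants return zero instead of dividing by zero.
    System.out.println(truncatedAverage(0, 0));   // 0
  }
}
```

The zero-count guard matters right after the Flow starts, when the endpoint may be queried before any tweet has been processed.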
To use the TweetCollectorFlowlet, you must obtain and configure a Twitter API key and access token. Follow the steps provided by Twitter to obtain OAuth access tokens.
You can provide these to the TweetCollectorFlowlet as runtime arguments of the Flow, or put them in twitter4j.properties in the src/main/resources/ directory and package it with the application. The format of the twitter4j.properties file is:
oauth.consumerKey=***************************
oauth.consumerSecret=***************************
oauth.accessToken=***************************
oauth.accessTokenSecret=***************************
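Before packaging the application, it can be handy to confirm that all four keys shown above are present and non-empty. The following is a minimal sketch using only `java.util.Properties`; the class `TwitterCredentialsCheck` and its methods are hypothetical helpers, not part of CDAP or Twitter4j, and the check says nothing about whether the credentials are actually valid.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/** Checks that a twitter4j.properties file defines the four OAuth keys. */
final class TwitterCredentialsCheck {

  /** The four keys listed in the properties format above. */
  static final List<String> REQUIRED_KEYS = Arrays.asList(
      "oauth.consumerKey",
      "oauth.consumerSecret",
      "oauth.accessToken",
      "oauth.accessTokenSecret");

  /** Parses properties-file content from a string. */
  static Properties parse(String content) {
    Properties props = new Properties();
    try {
      props.load(new StringReader(content));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return props;
  }

  /** Returns the required keys that are absent or blank, in declaration order. */
  static List<String> missingKeys(Properties props) {
    List<String> missing = new ArrayList<>();
    for (String key : REQUIRED_KEYS) {
      String value = props.getProperty(key);
      if (value == null || value.trim().isEmpty()) {
        missing.add(key);
      }
    }
    return missing;
  }

  public static void main(String[] args) {
    // Placeholder values only; real credentials come from Twitter.
    Properties props = parse("oauth.consumerKey=abc\noauth.consumerSecret=def\n");
    // Prints "[oauth.accessToken, oauth.accessTokenSecret]"
    System.out.println(missingKeys(props));
  }
}
```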
The TwitterAnalysisApp application can be built and packaged using the Apache Maven command:
$ mvn clean package
Note that the remaining commands assume that the cdap-cli.sh script is available on your PATH. If this is not the case, please add it:
$ export PATH=$PATH:<CDAP home>/bin
If you haven't already started a standalone CDAP installation, start it with the command:
$ cdap.sh start
We can then deploy the application to a standalone CDAP installation and start its components:
$ cdap-cli.sh load artifact target/cdap-twitter-ingest-guide-<version>.jar
$ cdap-cli.sh create app TwitterAnalysis cdap-twitter-ingest-guide <version> user
$ cdap-cli.sh start flow TwitterAnalysis.TweetAnalysisFlow
$ cdap-cli.sh start service TwitterAnalysis.TweetStatsService
Once the Flow is started, tweets are pulled and processed. You can query for the average tweet size:
$ curl -w'\n' http://localhost:10000/v3/namespaces/default/apps/TwitterAnalysis/services/TweetStatsService/methods/v1/avgSize
or using the CDAP CLI:
$ cdap-cli.sh call service TwitterAnalysis.TweetStatsService GET 'v1/avgSize'
Example output:
132
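The same query can also be issued from Java. The sketch below assembles the URL from its parts, following the pattern of the curl command above, and fetches it with the JDK's `HttpURLConnection`. The class `AvgSizeClient` and its method names are hypothetical, and the host and port are assumed to be those of a local standalone CDAP; the request is only sent when the program is run with the `--fetch` argument, so that it does not fail when CDAP is not running.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

/** Builds and queries the URL of a CDAP Service method. */
final class AvgSizeClient {

  /** Assembles the Service method URL, mirroring the structure of the curl example. */
  static String serviceMethodUrl(String host, int port, String namespace,
                                 String app, String service, String method) {
    return "http://" + host + ":" + port
        + "/v3/namespaces/" + namespace
        + "/apps/" + app
        + "/services/" + service
        + "/methods/" + method;
  }

  /** Sends a GET request and returns the response body as a string. */
  static String fetch(String url) throws IOException {
    HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
    conn.setRequestMethod("GET");
    try (BufferedReader reader = new BufferedReader(
        new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
      StringBuilder body = new StringBuilder();
      String line;
      while ((line = reader.readLine()) != null) {
        body.append(line);
      }
      return body.toString();
    } finally {
      conn.disconnect();
    }
  }

  public static void main(String[] args) throws IOException {
    String url = serviceMethodUrl("localhost", 10000, "default",
        "TwitterAnalysis", "TweetStatsService", "v1/avgSize");
    System.out.println(url);
    // Only contact CDAP when explicitly requested.
    if (args.length > 0 && "--fetch".equals(args[0])) {
      System.out.println(fetch(url));
    }
  }
}
```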
For a related example, see the TwitterSentiment tutorial.
Have a question? Discuss at the CDAP User Mailing List.
Copyright © 2014-2015 Cask Data, Inc.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.