-
Notifications
You must be signed in to change notification settings - Fork 12
How to use the parameter tableNameAttribute in streamsx.hbase toolkit
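This SPL sample application demonstrates how to use the tableNameAttribute parameter, which takes the HBASE table name from an attribute of the input tuple, when deleting, putting and scanning rows.
It shows seven ways to scan data from an HBASE table.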
IBM Streams 4.3.0.0 or higher
streamsx.hbase toolkit Version 3.6.0 or higher
https://github.com/IBMStreams/streamsx.hbase/releases/tag/v3.6.0
Apache HBase Version 1.2 or higher
1- Create an HBASE table
hbase shell
create 'hbase-test-table','appearance','location'
2- Copy the HBASE configuration file from your HBASE server into the etc directory of your SPL project, and set the value of the hbaseSite parameter in all HBASE operators to "etc/hbase-site.xml".
3- If you use kerberos authentication, add the authKeytab and authPrincipal parameters to all HBASE operators.
Copy the HBASE keytab from your HBASE server to your Streams server, into the etc directory of your SPL project.
Replace the values of the authKeytab and authPrincipal parameters with your kerberos keytab and principal.
Set both parameters in all operators.
More details in https://github.com/IBMStreams/streamsx.hbase/wiki/How-to-use-kerberos-authentication-in-streamsx.hbase-toolkit
/* Copyright (C) 2019, International Business Machines Corporation */
/* All Rights Reserved */
namespace application ;
use com.ibm.streamsx.hbase::HBASEDelete ;
use com.ibm.streamsx.hbase::HBASEPut ;
use com.ibm.streamsx.hbase::HBASEScan ;
/**
* This SPL application demonstrates:
* How to use the parameter **tableNameAttribute** in all HBASE operators.
* How to delete rows from an HBASE table.
* How to put data into an HBASE table.
* How to scan rows from an HBASE table.
* We show seven ways to scan data from an HBASE table.
*
*
* In HBASE, a column name is divided into two parts, the columnFamily
* and the columnQualifier. All columns in a columnFamily are stored
* together on disk, which affects the efficiency of access.
* A table has a fixed set of column families, and you may not write a
* cell to any other family. ColumnQualifiers, on the other hand, may be added
* at runtime.
*
*
* To run this example, initialize a table with name 'hbase-test-table' and column families appearance and location.
* In hbase shell, this is
* create 'hbase-test-table','appearance','location'
*
* After running, if you scan the table
* scan 'hbase-test-table'
ROW COLUMN+CELL
row0 column=location:location0, timestamp=1555512212660, value=value-0
row1 column=location:location1, timestamp=1555512213338, value=value-1
row10 column=location:location10, timestamp=1555512217965, value=value-10
row11 column=location:location11, timestamp=1555512218481, value=value-11
row12 column=location:location12, timestamp=1555512218986, value=value-12
row13 column=location:location13, timestamp=1555512219493, value=value-13
row14 column=location:location14, timestamp=1555512219997, value=value-14
row15 column=location:location15, timestamp=1555512220506, value=value-15
row16 column=location:location16, timestamp=1555512221036, value=value-16
row17 column=location:location17, timestamp=1555512221575, value=value-17
row18 column=location:location18, timestamp=1555512222593, value=value-18
row19 column=location:location19, timestamp=1555512223097, value=value-19
row2 column=location:location2, timestamp=1555512213842, value=value-2
row3 column=location:location3, timestamp=1555512214348, value=value-3
row4 column=location:location4, timestamp=1555512214853, value=value-4
row5 column=location:location5, timestamp=1555512215357, value=value-5
row6 column=location:location6, timestamp=1555512215859, value=value-6
row7 column=location:location7, timestamp=1555512216364, value=value-7
row8 column=location:location8, timestamp=1555512216885, value=value-8
row9 column=location:location9, timestamp=1555512217416, value=value-9
20 row(s) in 1.5760 seconds
*
* Copy the HBASE configuration file from your HBASE server into the etc directory of your SPL project,
* and set the value of the hbaseSite parameter in all HBASE operators to "etc/hbase-site.xml".
* If you use kerberos authentication, add the authKeytab and authPrincipal parameters to all HBASE operators.
*/
composite HbaseScan {
	param
		// Kerberos credentials. They are only referenced by the commented-out
		// authKeytab/authPrincipal lines in the HBASEPut operator below; enable
		// them (and add them to every other HBASE operator) when using kerberos.
		expression<rstring> $authKeytab : getSubmissionTimeValue("authKeytab", "etc/hbase.service.keytab");
		// Placeholder principal (service/host@REALM); replace it with your own.
		expression<rstring> $authPrincipal : getSubmissionTimeValue("authPrincipal", "hbase/[email protected]");
		expression<rstring> $hbaseSite : getSubmissionTimeValue("hbaseSite", "etc/hbase-site.xml");
		expression<rstring> $tableName : "hbase-test-table" ;
		// Number of rows that are first deleted and then written again.
		// Shared by genDeleteData and genPutData so both work on the same row set.
		expression<int32> $numRows : 20 ;
	type
		// One cell to delete/put: table name, row key, column family, column qualifier, value.
		rowSchema = rstring tbName, rstring row, rstring colF, rstring colQ, rstring value ;
		// Scan output; numResults is filled via the outputCountAttr parameter of HBASEScan.
		resultSchema = rstring row, rstring columnFamily, rstring columnQualifier, int32 numResults, list<tuple<rstring value, int64 timeStamp>> value ;
	graph
		// Generates one delete request per row: row<i>, column location:location<i>, for i in [0, $numRows).
		stream<rowSchema> genDeleteData = Beacon() {
			param
				initDelay : 1.0 ;
				iterations : (uint32)$numRows ;
			output
				genDeleteData : tbName = $tableName, row = "row" + (rstring)IterationCount(), colF = "location",
					colQ = "location" + (rstring)IterationCount(), value = "value-" + (rstring)IterationCount() ;
		}

		// Deletes all versions of each requested cell; the table name is read from the tbName attribute.
		stream<boolean success> deleteRows = HBASEDelete(genDeleteData) {
			param
				tableNameAttribute : tbName ;
				rowAttrName : "row" ;
				columnFamilyAttrName : "colF" ;
				columnQualifierAttrName : "colQ" ;
				deleteAllVersions : true ;
				successAttr : "success" ;
				hbaseSite : $hbaseSite ;
		}

		// Once the delete stream is finished (final marker), generates the rows to put into the table.
		stream<rowSchema> genPutData = Custom(deleteRows) {
			logic
				onPunct deleteRows : {
					if (currentPunct() == Sys.FinalMarker) {
						for (int32 i in range($numRows)) {
							submit({ tbName = $tableName, row = "row" + (rstring)i, colF = "location",
								colQ = "location" + (rstring)i, value = "value-" + (rstring)i }, genPutData) ;
						}
					}
				}
		}

		// Puts rows into an HBASE table.
		// Each input tuple contains the table name, row, column family, column qualifier and value.
		// Failed puts are reported on errorPutData together with the offending input tuple.
		// NOTE(review): an earlier variant declared the error port as
		//   stream<rstring errorText, rstring inTuple> errorPutData
		// kept here for reference only.
		(stream<boolean success> putData ; stream<rstring errorText, tuple<rowSchema> inTuple> errorPutData) = HBASEPut(genPutData) {
			param
				tableNameAttribute : tbName ;
				//tableName : $tableName ;
				rowAttrName : "row" ;
				columnFamilyAttrName : "colF" ;
				columnQualifierAttrName : "colQ" ;
				valueAttrName : "value" ;
				// authPrincipal : $authPrincipal;
				// authKeytab : $authKeytab;
				hbaseSite : $hbaseSite ;
				successAttr : "success" ;
		}

		// After all puts are done (final marker), passes the table name to the first input-driven scan.
		stream<rstring tbName> passTableName = Custom(putData) {
			logic
				onPunct putData : {
					if (currentPunct() == Sys.FinalMarker) {
						submit({ tbName = $tableName }, passTableName) ;
					}
				}
		}

		// Scans an HBASE table and returns all rows from the table.
		// The input tuple includes only the table name.
		stream<resultSchema> inputFullScan = HBASEScan(passTableName) {
			param
				tableNameAttribute : tbName ;
				outputCountAttr : "numResults" ;
				hbaseSite : $hbaseSite ;
		}

		// After the full scan completes, requests a scan starting at "row2".
		stream<rstring tbName, rstring startRow> tableNameStartRow = Custom(inputFullScan) {
			logic
				onPunct inputFullScan : {
					if (currentPunct() == Sys.FinalMarker) {
						submit({ tbName = $tableName, startRow = "row2" }, tableNameStartRow) ;
					}
				}
		}

		// Scans an HBASE table and returns all rows from startRow till the end of the table.
		// The input tuple includes the table name and the start row.
		// NOTE(review): confirm that HBASEScan picks up the startRow input attribute
		// without an explicit startRowAttrName parameter in the toolkit version used.
		stream<resultSchema> inputStartRowScan = HBASEScan(tableNameStartRow) {
			param
				tableNameAttribute : tbName ;
				outputCountAttr : "numResults" ;
				hbaseSite : $hbaseSite ;
		}

		// After the start-row scan completes, requests a scan ending at "row7".
		stream<rstring tbName, rstring endRow> tableNameEndRow = Custom(inputStartRowScan) {
			logic
				onPunct inputStartRowScan : {
					if (currentPunct() == Sys.FinalMarker) {
						submit({ tbName = $tableName, endRow = "row7" }, tableNameEndRow) ;
					}
				}
		}

		// Scans an HBASE table and returns all rows from the beginning till endRow.
		// The input tuple includes the table name and the end row.
		// NOTE(review): confirm that HBASEScan picks up the endRow input attribute
		// without an explicit endRowAttrName parameter in the toolkit version used.
		stream<resultSchema> inputEndRowScan = HBASEScan(tableNameEndRow) {
			param
				tableNameAttribute : tbName ;
				outputCountAttr : "numResults" ;
				hbaseSite : $hbaseSite ;
		}

		// After the end-row scan completes, requests a scan between "row1" and "row13".
		stream<rstring tbName, rstring startRow, rstring endRow> tableNameStartEndRow = Custom(inputEndRowScan) {
			logic
				onPunct inputEndRowScan : {
					if (currentPunct() == Sys.FinalMarker) {
						submit({ tbName = $tableName, startRow = "row1", endRow = "row13" }, tableNameStartEndRow) ;
					}
				}
		}

		// Scans an HBASE table and returns all rows between startRow and endRow.
		// The input tuple includes the table name, the start row and the end row.
		stream<resultSchema> inputStartEndRowScan = HBASEScan(tableNameStartEndRow) {
			param
				tableNameAttribute : tbName ;
				outputCountAttr : "numResults" ;
				hbaseSite : $hbaseSite ;
		}

		// The scans below have no input port, so they are not sequenced after the puts.
		// Their initDelay values presumably leave enough time for the delete/put phase
		// to finish; increase them on a slow cluster.

		// Full scan without any input tuple.
		stream<resultSchema> fullScan = HBASEScan() {
			param
				initDelay : 40.0 ;
				tableName : $tableName ;
				outputCountAttr : "numResults" ;
				hbaseSite : $hbaseSite ;
		}

		// Scans from the start row till the end of the table, without any input tuple.
		stream<resultSchema> startRowScan = HBASEScan() {
			param
				initDelay : 50.0 ;
				tableName : $tableName ;
				outputCountAttr : "numResults" ;
				startRow : "row2" ;
				hbaseSite : $hbaseSite ;
		}

		// Scans from the beginning of the table till the end row, without any input tuple.
		stream<resultSchema> endRowScan = HBASEScan() {
			param
				initDelay : 60.0 ;
				tableName : $tableName ;
				outputCountAttr : "numResults" ;
				endRow : "row9" ;
				hbaseSite : $hbaseSite ;
		}

		// Converts every result tuple to a string, tagged with the name of its source stream.
		stream<rstring streamsName, rstring result> returnTuplesAsString = Custom(putData ; errorPutData ; deleteRows ; inputFullScan ;
			inputStartRowScan ; inputEndRowScan ; inputStartEndRowScan ; fullScan ; startRowScan ; endRowScan) {
			logic
				onTuple deleteRows : submit({ streamsName = "deleteRows", result = (rstring)deleteRows }, returnTuplesAsString) ;
				onTuple errorPutData : submit({ streamsName = "errorPutData", result = (rstring)errorPutData }, returnTuplesAsString) ;
				onTuple putData : submit({ streamsName = "putData", result = (rstring)putData }, returnTuplesAsString) ;
				onTuple inputFullScan : submit({ streamsName = "inputFullScan", result = (rstring)inputFullScan }, returnTuplesAsString) ;
				onTuple inputStartRowScan : submit({ streamsName = "inputStartRowScan", result = (rstring)inputStartRowScan }, returnTuplesAsString) ;
				onTuple inputEndRowScan : submit({ streamsName = "inputEndRowScan", result = (rstring)inputEndRowScan }, returnTuplesAsString) ;
				onTuple inputStartEndRowScan : submit({ streamsName = "inputStartEndRowScan", result = (rstring)inputStartEndRowScan }, returnTuplesAsString) ;
				onTuple fullScan : submit({ streamsName = "fullScan", result = (rstring)fullScan }, returnTuplesAsString) ;
				onTuple startRowScan : submit({ streamsName = "startRowScan", result = (rstring)startRowScan }, returnTuplesAsString) ;
				onTuple endRowScan : submit({ streamsName = "endRowScan", result = (rstring)endRowScan }, returnTuplesAsString) ;
		}

		// Prints the results on the console. A header is printed whenever the source
		// stream changes, and tuples are numbered per consecutive run of one stream.
		() as printResult = Custom(returnTuplesAsString) {
			logic
				state : {
					mutable int32 tupleCnt = 0 ;
					mutable rstring lastStreamsName = "" ;
				}
				onTuple returnTuplesAsString : {
					if (streamsName != lastStreamsName) {
						println("") ;
						println("************************ " + streamsName + " **************************") ;
						lastStreamsName = streamsName ;
						tupleCnt = 1 ;
					}
					println("Result of " + streamsName + " : " + (rstring)tupleCnt + " " + result) ;
					tupleCnt++ ;
				}
		}
}