How to use the parameter tableNameAttribute in streamsx.hbase toolkit

Ahmad Nouri edited this page May 2, 2019 · 3 revisions

This SPL sample application demonstrates:

How to use the parameter tableNameAttribute in all operators.

How to delete rows from an HBASE table.

How to put data into an HBASE table.

How to scan rows from an HBASE table.

It shows seven ways to scan data from an HBASE table: four scans driven by input tuples and three scans without an input port.

Prerequisite software for this sample:

IBM Streams 4.3.0.0 or higher

https://www.ibm.com/support/knowledgecenter/en/SSCRJU_4.3.0/com.ibm.streams.welcome.doc/doc/kc-homepage.html

streamsx.hbase toolkit Version 3.6.0 or higher

https://github.com/IBMStreams/streamsx.hbase/releases/tag/v3.6.0

Apache HBase Version 1.2 or higher

https://hbase.apache.org/

Preparation

1- Create an HBASE table

 hbase shell
 create 'hbase-test-table','appearance','location'

2- Copy the HBASE configuration file from your HBASE server into the etc directory of your SPL project, and set the hbaseSite parameter of every HBASE operator to "etc/hbase-site.xml".

3- If you use Kerberos authentication, add the authKeytab and authPrincipal parameters to all HBASE operators.

Copy the HBASE keytab from your HBASE server to your Streams server, into the etc directory of your SPL project.

Set the authKeytab and authPrincipal parameters to your Kerberos keytab and principal.

For more details, see https://github.com/IBMStreams/streamsx.hbase/wiki/How-to-use-kerberos-authentication-in-streamsx.hbase-toolkit
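The preparation steps can be sketched as one small application: a single HBASEPut that sets hbaseSite, authKeytab and authPrincipal together. The composite name KerberosPut is hypothetical, the keytab path is only a default taken from the sample, and the principal has no default here, so it would have to be supplied at submission time. Treat this as an illustration under those assumptions, not as a tested configuration.

```spl
namespace application ;

use com.ibm.streamsx.hbase::HBASEPut ;

// Hypothetical composite: puts one row into 'hbase-test-table' with Kerberos enabled.
composite KerberosPut {
	param
		// keytab copied into the etc directory of the SPL project (assumed path)
		expression<rstring> $authKeytab : getSubmissionTimeValue("authKeytab", "etc/hbase.service.keytab");
		// no default: your own Kerberos principal must be passed at submission time
		expression<rstring> $authPrincipal : getSubmissionTimeValue("authPrincipal");
		expression<rstring> $hbaseSite : getSubmissionTimeValue("hbaseSite", "etc/hbase-site.xml");
	graph
		// generates a single row for the put
		stream<rstring tbName, rstring row, rstring colF, rstring colQ, rstring value> oneRow = Beacon(){
			param
				iterations : 1u ;
			output
				oneRow : tbName = "hbase-test-table", row = "row0", colF = "location",
				    colQ = "location0", value = "value-0" ;
		}

		// same parameters as in the sample, with the two Kerberos parameters set
		stream<boolean success> putData = HBASEPut(oneRow){
			param
				tableNameAttribute : tbName ;
				rowAttrName : "row" ;
				columnFamilyAttrName : "colF" ;
				columnQualifierAttrName : "colQ" ;
				valueAttrName : "value" ;
				authKeytab : $authKeytab ;
				authPrincipal : $authPrincipal ;
				hbaseSite : $hbaseSite ;
				successAttr : "success" ;
		}
}
```

The same two parameters would be added to every HBASEDelete and HBASEScan invocation in the application.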

SPL sample

/* Copyright (C) 2019, International Business Machines Corporation */
/* All Rights Reserved	*/
namespace application ;

use com.ibm.streamsx.hbase::HBASEDelete ;
use com.ibm.streamsx.hbase::HBASEPut ;
use com.ibm.streamsx.hbase::HBASEScan ;

/**  
 * This SPL application demonstrates: 
 * How to use the parameter **tableNameAttribute** in all operators.
 * How to delete rows from an HBASE table. 
 * How to put data into an HBASE table. 
 * How to scan rows from an HBASE table.
 * It shows seven ways to scan data from an HBASE table.
 *  
 * 
 * In HBASE, a column name is divided into two parts, the columnFamily
 * and the columnQualifier. All columns in a columnFamily are grouped
 * together on disk, which affects the efficiency of access. 
 * The table has a fixed set of column families, and you may not add a 
 * tuple to any other family. ColumnQualifiers, on the other hand, may be added
 * at runtime. 
 * 
 * 
 * To run this example, initialize a table with name 'hbase-test-table' and column families appearance and location.
 * In hbase shell, this is 
 * create 'hbase-test-table','appearance','location'
 * 
 * After running, scanning the table with
 * scan 'hbase-test-table'
 * returns:
 ROW                                      COLUMN+CELL                                                                                                        
 row0   column=location:location0, timestamp=1555512212660, value=value-0                                                  
 row1   column=location:location1, timestamp=1555512213338, value=value-1                                                  
 row10  column=location:location10, timestamp=1555512217965, value=value-10                                                
 row11  column=location:location11, timestamp=1555512218481, value=value-11                                                
 row12  column=location:location12, timestamp=1555512218986, value=value-12                                                
 row13  column=location:location13, timestamp=1555512219493, value=value-13                                                
 row14  column=location:location14, timestamp=1555512219997, value=value-14                                                
 row15  column=location:location15, timestamp=1555512220506, value=value-15                                                
 row16  column=location:location16, timestamp=1555512221036, value=value-16                                                
 row17  column=location:location17, timestamp=1555512221575, value=value-17                                                
 row18  column=location:location18, timestamp=1555512222593, value=value-18                                                
 row19  column=location:location19, timestamp=1555512223097, value=value-19                                                
 row2   column=location:location2, timestamp=1555512213842, value=value-2                                                  
 row3   column=location:location3, timestamp=1555512214348, value=value-3                                                  
 row4   column=location:location4, timestamp=1555512214853, value=value-4                                                  
 row5   column=location:location5, timestamp=1555512215357, value=value-5                                                  
 row6   column=location:location6, timestamp=1555512215859, value=value-6                                                  
 row7   column=location:location7, timestamp=1555512216364, value=value-7                                                  
 row8   column=location:location8, timestamp=1555512216885, value=value-8                                                  
 row9   column=location:location9, timestamp=1555512217416, value=value-9                                                  
 20 row(s) in 1.5760 seconds
 
 * 
 * Copy the HBASE configuration file from your HBASE server into the etc directory of your SPL project,
 * and set the hbaseSite parameter of every HBASE operator to "etc/hbase-site.xml".
 * If you use Kerberos authentication, add the authKeytab and authPrincipal parameters to all HBASE operators.
 */
 composite HbaseScan {
	param
		expression<rstring> $authKeytab : getSubmissionTimeValue("authKeytab", "etc/hbase.service.keytab");
		expression<rstring> $authPrincipal : getSubmissionTimeValue("authPrincipal", "hbase/[email protected]");
		expression<rstring> $hbaseSite : getSubmissionTimeValue("hbaseSite", "etc/hbase-site.xml");
		expression<rstring> $tableName : "hbase-test-table" ;
	type
		rowSchema = rstring tbName, rstring row, rstring colF, rstring colQ, rstring value ;
		resultSchema = rstring row, rstring columnFamily, rstring columnQualifier, int32 numResults, list<tuple<rstring value, int64 timeStamp>> value ;
	graph
		
		// generates data to delete rows from a HBASE table
		stream<rowSchema> genDeleteData = Beacon(){
			param
				initDelay : 1.0 ;
				iterations : 20u ;
			output
				genDeleteData : tbName = $tableName, row = "row" +(rstring)IterationCount(), colF = "location", 
				    colQ = "location" +(rstring)IterationCount(), value = "value-" + (rstring)IterationCount();
		}

		// Delete all versions explicitly
		stream<boolean success> deleteRows = HBASEDelete(genDeleteData){
			param
				tableNameAttribute : tbName ;
				rowAttrName : "row" ;
				columnFamilyAttrName : "colF" ;
				columnQualifierAttrName : "colQ" ;
				deleteAllVersions : true ;
				successAttr : "success" ;
				hbaseSite : $hbaseSite;
		}
 
		// generates rows to put into a HBASE table.
		stream<rowSchema> genPutData = Custom(deleteRows){
			logic
				state : {
					mutable int32 i2 = 0 ;
				}

				onPunct deleteRows : {
					if(currentPunct()== Sys.FinalMarker){
						for(int32 i in range(20)){
							submit({ tbName = $tableName, row = "row" +(rstring)i, colF = "location", 
							         colQ = "location" +(rstring)i, value = "value-" +(rstring)i }, genPutData);
						}

					}

				}

		}
 
		// put rows into a HBASE table.
		// The input tuple contains the table name, row, column family, column qualifier and value.
		(stream<boolean success> putData ; stream<rstring errorText, tuple<rowSchema> inTuple> errorPutData)= HBASEPut(genPutData){
        	// (stream<boolean success> putData ; stream<rstring errorText, rstring inTuple> errorPutData)= HBASEPut(genPutData){
			param
				tableNameAttribute : tbName ;
				//tableName : $tableName ;
				rowAttrName : "row" ;
				columnFamilyAttrName : "colF" ;
				columnQualifierAttrName : "colQ" ;
				valueAttrName : "value" ;
				// authPrincipal : $authPrincipal; 
				// authKeytab : $authKeytab;
				hbaseSite : $hbaseSite;
				successAttr : "success" ;
		}

		// pass the table name to the next stream
		stream<rstring tbName> passTableName = Custom(putData){
			logic
				onPunct putData : {
					if(currentPunct()== Sys.FinalMarker){
						submit({ tbName = $tableName }, passTableName);
					}

				}

		}

		// scans an HBASE table and returns all rows of the table
		// The input tuple includes only the table name.
		stream<resultSchema> inputFullScan = HBASEScan(passTableName){
			param
				tableNameAttribute : tbName ;
				outputCountAttr : "numResults" ;
				hbaseSite : $hbaseSite;
		}

		stream<rstring tbName, rstring startRow> tableNameStartRow = Custom(inputFullScan){
			logic
				onPunct inputFullScan : {
					if(currentPunct()== Sys.FinalMarker){
						submit({ tbName = $tableName, startRow = "row2" }, tableNameStartRow);
					}

				}

		}

		// scans an HBASE table and returns all rows from startRow to the end of the table
		// The input tuple includes the table name and the start row.
		stream<resultSchema> inputStartRowScan = HBASEScan(tableNameStartRow){
			param
				tableNameAttribute : tbName ;
				outputCountAttr : "numResults" ;
				hbaseSite : $hbaseSite;
		}

		stream<rstring tbName, rstring endRow> tableNameEndRow = Custom(inputStartRowScan){
			logic
				onPunct inputStartRowScan : {
					if(currentPunct()== Sys.FinalMarker){
						submit({ tbName = $tableName, endRow = "row7" }, tableNameEndRow);
					}

				}

		}

		// scans an HBASE table and returns all rows from the beginning of the table up to endRow
		// The input tuple includes the table name and the end row.
		stream<resultSchema> inputEndRowScan = HBASEScan(tableNameEndRow){
			param
				tableNameAttribute : tbName ;
				outputCountAttr : "numResults" ;
				hbaseSite : $hbaseSite;
		}

		stream<rstring tbName, rstring startRow, rstring endRow> tableNameStartEndRow = Custom(inputEndRowScan){
			logic
				onPunct inputEndRowScan : {
					if(currentPunct()== Sys.FinalMarker){
						submit({ tbName = $tableName, startRow = "row1", endRow = "row13" }, tableNameStartEndRow);
					}

				}

		}

		// scans an HBASE table and returns all rows between startRow and endRow
		// The input tuple includes the table name, the start row and the end row.
		stream<resultSchema> inputStartEndRowScan = HBASEScan(tableNameStartEndRow){
			param
				tableNameAttribute : tbName ;
				outputCountAttr : "numResults" ;
				hbaseSite : $hbaseSite;
		}

		// full scan without any input tuple
		stream<resultSchema> fullScan = HBASEScan(){
			param
				initDelay : 40.0 ;
				tableName : $tableName ;
				outputCountAttr : "numResults" ;
				hbaseSite : $hbaseSite;
		}

		// scans from the start row to the end of the table without any input tuple
		stream<resultSchema> startRowScan = HBASEScan(){
			param
				initDelay : 50.0 ;
				tableName : $tableName ;
				outputCountAttr : "numResults" ;
				startRow : "row2" ;
				hbaseSite : $hbaseSite;
		}

		// scans from the beginning of the table up to the end row without any input tuple
		stream<resultSchema> endRowScan = HBASEScan(){
			param
				initDelay : 60.0 ;
				tableName : $tableName ;
				outputCountAttr : "numResults" ;
				endRow : "row9" ;
				hbaseSite : $hbaseSite;
				
		}

		// converts the tuples of all streams to strings and forwards them to the print operator
		stream<rstring streamsName, rstring result> returnTuplesAsString = Custom(putData; errorPutData; deleteRows ; inputFullScan ; 
			inputStartRowScan ; inputEndRowScan ; inputStartEndRowScan ; fullScan ; startRowScan ; endRowScan){
			logic
				onTuple deleteRows : submit({ streamsName = "deleteRows", result =(rstring)deleteRows }, returnTuplesAsString);
				onTuple errorPutData : submit({ streamsName = "errorPutData", result =(rstring)errorPutData }, returnTuplesAsString);
				onTuple putData : submit({ streamsName = "putData", result =(rstring)putData }, returnTuplesAsString);
				onTuple inputFullScan : submit({ streamsName = "inputFullScan", result =(rstring)inputFullScan }, returnTuplesAsString);
				onTuple inputStartRowScan : submit({ streamsName = "inputStartRowScan", result =(rstring)inputStartRowScan }, returnTuplesAsString);
				onTuple inputEndRowScan : submit({ streamsName = "inputEndRowScan", result =(rstring)inputEndRowScan }, returnTuplesAsString);
				onTuple inputStartEndRowScan : submit({ streamsName = "inputStartEndRowScan", result =(rstring)inputStartEndRowScan }, returnTuplesAsString);
				onTuple fullScan : submit({ streamsName = "fullScan", result =(rstring)fullScan }, returnTuplesAsString);
				onTuple startRowScan : submit({ streamsName = "startRowScan", result =(rstring)startRowScan }, returnTuplesAsString);
				onTuple endRowScan : submit({ streamsName = "endRowScan", result =(rstring)endRowScan }, returnTuplesAsString);
		}

		// print results on console
		()as printResult = Custom(returnTuplesAsString){
			logic
				state : {
					mutable int32 tupleCnt = 0 ;
					mutable rstring streamsName1 = "" ;
					mutable rstring streamsName2 = "" ;
				}

				onTuple returnTuplesAsString : {
					streamsName1 = streamsName ;
					if(streamsName1 != streamsName2){
						println("");
						println("************************   " + streamsName + "  **************************");
						streamsName2 = streamsName1 ;
						tupleCnt = 1 ;
					}

					println("Result of " + streamsName + " : " +(rstring)tupleCnt + "  " + result);
					tupleCnt ++ ;
				}

		}

}
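
The scan listing in the sample comment shows row10 to row19 sorted before row2, because HBASE orders row keys as byte strings rather than as numbers. This matters for the startRow and endRow values used in the scans. The following sketch reproduces that ordering in plain SPL, without any HBASE connection. It assumes that SPL rstring comparison matches the HBASE row key order for these ASCII keys; the composite name RowKeyOrder is hypothetical.

```spl
namespace application ;

// Hypothetical helper application: shows which of the 20 sample row keys
// sort at or after "row2" when compared as strings.
composite RowKeyOrder {
	graph
		() as printOrder = Custom(){
			logic
				onProcess : {
					// build the same keys as the sample: row0 .. row19
					mutable list<rstring> keys ;
					for(int32 i in range(20)){
						appendM(keys, "row" +(rstring)i);
					}

					// sort() orders rstring values lexicographically,
					// so "row10" comes before "row2"
					mutable list<rstring> fromRow2 ;
					for(rstring key in sort(keys)){
						if(key >= "row2"){
							appendM(fromRow2, key);
						}
					}

					// prints the eight keys row2 .. row9; row10 .. row19 are not included
					println(fromRow2);
				}
		}
}
```

By the same reasoning, a scan that starts at "row2" should return row2 to row9 but none of row10 to row19. In the HBase client API the stop row of a scan is exclusive; whether the endRow parameter of HBASEScan behaves the same way should be checked against the toolkit documentation.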