Skip to content

How to connect to several HBASE databases from one SPL application

Ahmad Nouri edited this page Jun 26, 2019 · 8 revisions

How to connect to several HBASE databases from one SPL application

With the new version of streamsx.hbase toolkit (version 3.6.0 or higher ) it is possible to connect to several HBASE databases from one IBM stream SPL application.

https://github.com/IBMStreams/streamsx.hbase/releases

It is possible to read data from one HBASE database and write them into other HBASE database.

Or it is possible to split the incoming streams in two or n streams and write every stream in a different HBASE database.

Preparation

The following SPL sample demonstrates how to connect from one IBM stream application to two different HDFS servers.

Before you start with the SPL application, please perform the following steps:

  1. Add the IP Address and the host names of both HAdoop clusters into /etc/hosts file of stream server.

  2. Copy the hbase-site-xml file from first Hadoop Cluster 1 into etc directory of your SPL project. etc/hbase-site-1.xml

  3. Copy the hbase-site-xml file from first Hadoop Cluster 2 into etc directory of your SPL project etc/hbase-site-2.xml

  4. Copy the hdfs-user keytab from first HDFS Cluster 1 into etc/ directory of your SPL project etc/hbase.headless-1.keytab

  5. Copy the hdfs-user keytab from secund HDFS Cluster 2 into etc directory of your SPL project etc/hbase.headless-2.keytab

  6. Change the name of principals with your HBase principals

  7. Create a test table on both HBASE databases:

    hbase shell create 'test_table' , all'

In some configuration the Kerberos key distribution centre is installed on the same Hadoop server.

Make sure that the both Hadoop server have the same realm as kerberos configuration.

The ambari creates for two Hadoop servers separate keytab files with the same name.

For example

/etc/security/keytabs/hbase.headless.keytab

But they are different keytab files and have different principals.

More details in : https://github.com/IBMStreams/streamsx.hbase/wiki/How-to-use-kerberos-authentication-in-streamsx.hbase-toolkit

SPL sample

Here is the SPL sample:

/*
This SPL application demonstrates how to put the input stream in two or several HBase databases.
It is also possible to put one row at the same time in two different HBase databases.
The custom operator splits the input stream in two streams.
The HBaseSink_1 operator writes the input1 stream into HBase database 1 (hbaseSite :  "etc/hbase-site-1.xml";)
The HBaseSink_2 operator writes the input2 stream into HBase database 2 (hbaseSite :  "etc/hbase-site-2.xml";)

*/
namespace application ;

use com.ibm.streamsx.hbase::HBASEPut ;

composite HbaseTest {
param
	expression<rstring> $authKeytab1 : getSubmissionTimeValue("authKeytab1", "etc/hbase.headless-1.keytab");
	expression<rstring> $authPrincipal1 : getSubmissionTimeValue("authPrincipal1", "[email protected]");
	expression<rstring> $authKeytab2 : getSubmissionTimeValue("authKeytab2", "etc/hbase.headless-2.keytab");
	expression<rstring> $authPrincipal2 : getSubmissionTimeValue("authPrincipal2", "[email protected]");
type
	HbasePut = rstring table, rstring key, rstring value, rstring DR ;
graph

// read input files
// The csv file (data/input.csv) has in the last column the name of HBase cluster 
// #table, key, value, DR
// test_table, key1, value1, hbase1
// test_table, key2, value2, hbase2
// test_table, key3, value2, Both

	stream<HbasePut> readInputs = FileSource(){
		param
			file : "input.csv" ;
			format : csv ;
	}

	// This custom operator splits the input streams in two separate streams (conditional to the value of "DR")
	(stream<HbasePut> Input_1 ; stream<HbasePut> Input_2)= Custom(readInputs as I){
		logic
		onTuple readInputs : {
			printStringLn((rstring)readInputs);
			if(DR == "hbase1"){
				submit(I, Input_1);
			}

			else if(DR == "hbase2"){
				submit(I, Input_2);
			}

			else if(DR == "Both"){
				submit(I, Input_1);
				submit(I, Input_2);
			}

			else {
				printStringLn("DR parameter should be hbase1/hbase2/Both");
			}

		}

	}

	()as HBaseSink_1 = HBaseSink(Input_1){
		param
			authKeytab : $authKeytab1 ;
			authPrincipal : $authPrincipal1 ;
			hbaseSite : "etc/hbase-site-1.xml" ;
	}

	()as HBaseSink_2 = HBaseSink(Input_2){
		param
			authKeytab : $authKeytab2 ;
			authPrincipal : $authPrincipal2 ;
			hbaseSite : "etc/hbase-site-2.xml" ;
	}

	/*

	// It is also possibe to write the data into 3. or 4. HBase cluster 

       () as ANOTHER_sndHBaseSin = HBaseSink(input3){
	    param
	        authKeytab 	: $authKeytab3;
	        authPrincipal 	: $authPrincipal3; 
	        hbaseSite 	: "etc/hbase-site-3.xml";
        }
	*/
}

The SPL code of HBaseSink composite

 /*
 //  This composite is wrapper for HBASEPut operator.
 //  The HBASEPut needs 3 parameters to connect to a HBse server. 
 //  authKeytab : A keytab is a file containing pairs of Kerberos principals 
 //  and encrypted keys that are derived from the Kerberos password.
 //  authPrincipal: A Kerberos principal for keytab .
 //  hbaseSite: The HBase configuration file hbase-site.xml.   
 */
 namespace application ;

 use com.ibm.streamsx.hbase::HBASEPut ;

 public composite HBaseSink(input Data ) {
param
	expression<rstring> $authKeytab ;
	expression<rstring> $authPrincipal ;
	expression<rstring> $hbaseSite ;
graph
	()as printData = Custom(Data){
		logic
			onTuple Data : printStringLn((rstring)Data);
	}

	(stream<boolean success> putToHbase ; stream<rstring errorText> Error)= HBASEPut(Data){
		param
			authPrincipal : $authPrincipal ;
			authKeytab : $authKeytab ;
			hbaseSite : $hbaseSite ;
			tableNameAttribute : table ;
			rowAttrName : "key" ;
			valueAttrName : "value" ;
			staticColumnFamily : "all" ;
			staticColumnQualifier : "all" ;
			successAttr : "success" ;
	}

	()as printPutToHbase = Custom(putToHbase){
		logic
			onTuple putToHbase : printStringLn((rstring)putToHbase);
	}

	()as printError = Custom(Error){
		logic
			onTuple Error : printStringLn((rstring)Error);
	}

}