How to connect to several HBASE databases from one SPL application
With the new version of the streamsx.hbase toolkit (version 3.6.0 or higher) it is possible to connect to several HBASE databases from one IBM Streams SPL application.
https://github.com/IBMStreams/streamsx.hbase/releases
It is possible to read data from one HBASE database and write it into another HBASE database.
Or it is possible to split the incoming stream into two or more streams and write every stream into a different HBASE database.
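As a minimal sketch of the first scenario (this is not part of the sample below; the table name, file names, and principals are assumptions, and the two invocations would go into the graph clause of a composite), an HBASEScan on cluster 1 can feed an HBASEPut on cluster 2:

// Read all rows of column family "all" from HBase cluster 1 ...
stream<rstring row, rstring value> FromCluster1 = com.ibm.streamsx.hbase::HBASEScan(){
param
tableName : "test_table" ;
staticColumnFamily : "all" ;
staticColumnQualifier : "all" ;
hbaseSite : "etc/hbase-site-1.xml" ;
authKeytab : "etc/hbase.headless-1.keytab" ;
authPrincipal : "[email protected]" ;
}
// ... and write every row/value pair into HBase cluster 2.
() as ToCluster2 = com.ibm.streamsx.hbase::HBASEPut(FromCluster1){
param
tableName : "test_table" ;
rowAttrName : "row" ;
valueAttrName : "value" ;
staticColumnFamily : "all" ;
staticColumnQualifier : "all" ;
hbaseSite : "etc/hbase-site-2.xml" ;
authKeytab : "etc/hbase.headless-2.keytab" ;
authPrincipal : "[email protected]" ;
}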
The following SPL sample demonstrates how to connect from one IBM Streams application to two different HBase servers.
Before you start with the SPL application, please perform the following steps:
- Add the IP addresses and host names of both Hadoop clusters to the /etc/hosts file of the Streams server (see the /etc/hosts sketch after this list).
- Copy the hbase-site.xml file from Hadoop cluster 1 into the etc directory of your SPL project: etc/hbase-site-1.xml
- Copy the hbase-site.xml file from Hadoop cluster 2 into the etc directory of your SPL project: etc/hbase-site-2.xml (the snippet after this list shows which property to check in these files).
- Copy the HBase user keytab from Hadoop cluster 1 into the etc directory of your SPL project: etc/hbase.headless-1.keytab
- Copy the HBase user keytab from Hadoop cluster 2 into the etc directory of your SPL project: etc/hbase.headless-2.keytab
- Change the names of the principals to your own HBase principals.
- Create a test table on both HBASE databases (the column family 'all' is used by the SPL sample below):

hbase shell
create 'test_table', 'all'
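For example, the /etc/hosts file of the Streams server could contain entries like these (the host names and IP addresses are placeholders):

192.168.1.10   hdp1-master.mydomain.com   hdp1-master
192.168.1.20   hdp2-master.mydomain.com   hdp2-master

It is also a good idea to open the two copied configuration files and check that the hbase.zookeeper.quorum property in each file really points to the hosts of its own cluster, for example in etc/hbase-site-2.xml:

<property>
<name>hbase.zookeeper.quorum</name>
<value>hdp2-master.mydomain.com</value>
</property>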
In some configurations the Kerberos Key Distribution Center (KDC) is installed on the same Hadoop server.
Make sure that both Hadoop servers use the same realm in their Kerberos configuration.
Ambari creates separate keytab files with the same name for the two Hadoop servers, for example:
/etc/security/keytabs/hbase.headless.keytab
But they are different keytab files and contain different principals.
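Before you submit the application, you can verify that each keytab really contains the principal that you configured (a sketch; the file names are the ones used in this article):

klist -k -t etc/hbase.headless-1.keytab
klist -k -t etc/hbase.headless-2.keytab
# test a manual login with the first keytab and its principal
kinit -k -t etc/hbase.headless-1.keytab [email protected]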
For more details, see: https://github.com/IBMStreams/streamsx.hbase/wiki/How-to-use-kerberos-authentication-in-streamsx.hbase-toolkit
Here is the SPL sample:
/*
This SPL application demonstrates how to put the input stream into two or more HBase databases.
It is also possible to put one row into two different HBase databases at the same time.
The Custom operator splits the input stream into two streams.
The HBaseSink_1 operator writes the Input_1 stream into HBase database 1 (hbaseSite : "etc/hbase-site-1.xml";)
The HBaseSink_2 operator writes the Input_2 stream into HBase database 2 (hbaseSite : "etc/hbase-site-2.xml";)
*/
namespace application ;
use com.ibm.streamsx.hbase::HBASEPut ;
composite HbaseTest {
param
expression<rstring> $authKeytab1 : getSubmissionTimeValue("authKeytab1", "etc/hbase.headless-1.keytab");
expression<rstring> $authPrincipal1 : getSubmissionTimeValue("authPrincipal1", "[email protected]");
expression<rstring> $authKeytab2 : getSubmissionTimeValue("authKeytab2", "etc/hbase.headless-2.keytab");
expression<rstring> $authPrincipal2 : getSubmissionTimeValue("authPrincipal2", "[email protected]");
type
HbasePut = rstring table, rstring key, rstring value, rstring DR ;
graph
// read input files
// The csv file (data/input.csv) has in the last column the name of HBase cluster
// #table, key, value, DR
// test_table, key1, value1, hbase1
// test_table, key2, value2, hbase2
// test_table, key3, value2, Both
stream<HbasePut> readInputs = FileSource(){
param
file : "input.csv" ;
format : csv ;
}
// This Custom operator splits the input stream into two separate streams (depending on the value of "DR")
(stream<HbasePut> Input_1 ; stream<HbasePut> Input_2)= Custom(readInputs as I){
logic
onTuple I : {
printStringLn((rstring)I);
if(DR == "hbase1"){
submit(I, Input_1);
}
else if(DR == "hbase2"){
submit(I, Input_2);
}
else if(DR == "Both"){
submit(I, Input_1);
submit(I, Input_2);
}
else {
printStringLn("DR parameter should be hbase1/hbase2/Both");
}
}
}
()as HBaseSink_1 = HBaseSink(Input_1){
param
authKeytab : $authKeytab1 ;
authPrincipal : $authPrincipal1 ;
hbaseSite : "etc/hbase-site-1.xml" ;
}
()as HBaseSink_2 = HBaseSink(Input_2){
param
authKeytab : $authKeytab2 ;
authPrincipal : $authPrincipal2 ;
hbaseSite : "etc/hbase-site-2.xml" ;
}
/*
// It is also possible to write the data into a third or fourth HBase cluster:
() as HBaseSink_3 = HBaseSink(Input_3){
param
authKeytab : $authKeytab3;
authPrincipal : $authPrincipal3;
hbaseSite : "etc/hbase-site-3.xml";
}
*/
}
The SPL code of the HBaseSink composite:
/*
// This composite is a wrapper for the HBASEPut operator.
// The HBASEPut operator needs 3 parameters to connect to an HBase server:
// authKeytab : A keytab is a file containing pairs of Kerberos principals
// and encrypted keys that are derived from the Kerberos password.
// authPrincipal : A Kerberos principal for the keytab.
// hbaseSite : The HBase configuration file hbase-site.xml.
*/
namespace application ;
use com.ibm.streamsx.hbase::HBASEPut ;
public composite HBaseSink(input Data ) {
param
expression<rstring> $authKeytab ;
expression<rstring> $authPrincipal ;
expression<rstring> $hbaseSite ;
graph
()as printData = Custom(Data){
logic
onTuple Data : printStringLn((rstring)Data);
}
(stream<boolean success> putToHbase ; stream<rstring errorText> Error)= HBASEPut(Data){
param
authPrincipal : $authPrincipal ;
authKeytab : $authKeytab ;
hbaseSite : $hbaseSite ;
tableNameAttribute : table ;
rowAttrName : "key" ;
valueAttrName : "value" ;
staticColumnFamily : "all" ;
staticColumnQualifier : "all" ;
successAttr : "success" ;
}
()as printPutToHbase = Custom(putToHbase){
logic
onTuple putToHbase : printStringLn((rstring)putToHbase);
}
()as printError = Custom(Error){
logic
onTuple Error : printStringLn((rstring)Error);
}
}
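To compile the sample and pass the keytabs and principals as submission-time parameters, you can for example use sc and streamtool (a sketch; the toolkit location and the parameter values are assumptions):

sc -M application::HbaseTest -t $STREAMS_INSTALL/toolkits/com.ibm.streamsx.hbase
streamtool submitjob output/application.HbaseTest.sab -P authKeytab1=etc/hbase.headless-1.keytab -P authPrincipal1=<principal of cluster 1> -P authKeytab2=etc/hbase.headless-2.keytab -P authPrincipal2=<principal of cluster 2>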