feat: add HBase SDK for serving #127

Merged: 33 commits, Oct 2, 2024

Changes from 21 commits

Commits (33 total)
fd4470c
feat: add HBase SDK for serving
bayu-aditya Sep 3, 2024
ad454e1
Add spark changes to use hbase api
shydefoo Aug 30, 2024
2528ffe
Fix connection issues
shydefoo Sep 2, 2024
773eac1
Fix linting
shydefoo Sep 2, 2024
fd3030a
Add configuration for hbase api
shydefoo Sep 5, 2024
cdfad3d
Set platform to linux/amd64
shydefoo Sep 5, 2024
73dfd8e
Update application.yaml to include hbase
shydefoo Sep 5, 2024
acd54c2
Refactor BigTableSinkRelation to use updated classes
shydefoo Sep 5, 2024
f685315
Fix issue due to difference in bigtable and hbase response
shydefoo Sep 6, 2024
0054a14
Fix linting
shydefoo Sep 6, 2024
e1840ef
Fix issue due to difference in bigtable and hbase response
shydefoo Sep 6, 2024
0eb1b13
Fix linting
shydefoo Sep 6, 2024
350783e
Remove commented code
shydefoo Sep 9, 2024
38f7bc7
Clean up comments
shydefoo Sep 17, 2024
691c8d1
Fix application yaml
shydefoo Sep 17, 2024
8855d0a
Merge branch 'bayu/hbase' into hbase-poc
shydefoo Sep 17, 2024
a0e44a3
Add option for hbase for stream ingestion jobs
shydefoo Sep 17, 2024
fa28a59
Fix linting
shydefoo Sep 18, 2024
7292b48
Merge pull request #126 from caraml-dev/hbase-poc
shydefoo Sep 20, 2024
f699815
Add region split policy for hbase
shydefoo Sep 20, 2024
a8a861b
Fix linting
shydefoo Sep 23, 2024
77052b5
Refactor Bigtable schema registry classes
shydefoo Sep 25, 2024
2c64047
Add tests to query bigtable (w hbase sdk) and hbase
shydefoo Sep 25, 2024
fbc5c3f
Fix linting
shydefoo Sep 25, 2024
536deb9
Make compressionAlgo and region split policy type configurable
shydefoo Sep 26, 2024
58c09fc
Test using network host mode
shydefoo Sep 26, 2024
a04f6f7
Fix linting
shydefoo Sep 26, 2024
efc1862
Fix hbase unit test
shydefoo Sep 26, 2024
abcfb3e
Refactor functions
shydefoo Sep 26, 2024
7e7e417
Move avro schema length to BaseSchemRegistry
shydefoo Sep 26, 2024
455e253
add try catch block while create bigtable connection
bayu-aditya Sep 27, 2024
4355a1a
move into convertRowCellsToFeatures function
bayu-aditya Oct 2, 2024
3b4f01a
using single try catch block for connection
bayu-aditya Oct 2, 2024
caraml-store-serving/build.gradle (3 changes: 2 additions & 1 deletion)

@@ -8,7 +8,8 @@ dependencies {
 implementation 'org.apache.commons:commons-lang3:3.10'
 implementation 'org.apache.avro:avro:1.10.2'
 implementation platform('com.google.cloud:libraries-bom:26.43.0')
-implementation 'com.google.cloud:google-cloud-bigtable:2.40.0'
+implementation 'com.google.cloud:google-cloud-bigtable:2.39.2'
+implementation 'com.google.cloud.bigtable:bigtable-hbase-2.x:2.14.3'
 implementation 'commons-codec:commons-codec:1.17.1'
 implementation 'io.lettuce:lettuce-core:6.2.0.RELEASE'
 implementation 'io.netty:netty-transport-native-epoll:4.1.52.Final:linux-x86_64'
BigTableStoreConfig.java

@@ -2,10 +2,13 @@

import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
import com.google.cloud.bigtable.hbase.BigtableConfiguration;
import com.google.cloud.bigtable.hbase.BigtableOptionsFactory;
import dev.caraml.serving.store.OnlineRetriever;
import java.io.IOException;
import lombok.Getter;
import lombok.Setter;
import org.apache.hadoop.hbase.client.Connection;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
@@ -23,9 +26,21 @@ public class BigTableStoreConfig {
private String appProfileId;
private Boolean enableClientSideMetrics;
private Long timeoutMs;
private Boolean isUsingHBaseSDK;

@Bean
public OnlineRetriever getRetriever() {
// Using HBase SDK
if (isUsingHBaseSDK) {
org.apache.hadoop.conf.Configuration config =
Collaborator: Should this block be wrapped in a try-catch block? I saw that the connect method can throw an IllegalStateException.

Contributor (@shydefoo, Sep 26, 2024): @bayu-aditya could you take a look at this?

Collaborator Author: Yup, sure.

Collaborator Author: Done here: 455e253. Thank you for the finding.

Collaborator: Should we use one try-catch, with the condition inside the try-catch block?

BigtableConfiguration.configure(projectId, instanceId);
config.set(BigtableOptionsFactory.APP_PROFILE_ID_KEY, appProfileId);

Connection connection = BigtableConfiguration.connect(config);
return new HBaseOnlineRetriever(connection);
}

// Using BigTable SDK
try {
BigtableDataSettings.Builder builder =
BigtableDataSettings.newBuilder()
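As the thread above notes, the connection setup was later wrapped in a try-catch (455e253), and 3b4f01a consolidated it into a single block. A minimal sketch of the wrapped HBase path, assuming only the field names visible in this diff; the committed code in those commits is authoritative, and the BigTable SDK path below stays as shown:

    if (isUsingHBaseSDK) {
      try {
        org.apache.hadoop.conf.Configuration config =
            BigtableConfiguration.configure(projectId, instanceId);
        config.set(BigtableOptionsFactory.APP_PROFILE_ID_KEY, appProfileId);
        // connect() can fail at startup (e.g. with an IllegalStateException),
        // so surface the failure from the bean factory rather than letting it propagate raw.
        Connection connection = BigtableConfiguration.connect(config);
        return new HBaseOnlineRetriever(connection);
      } catch (IllegalStateException e) {
        throw new RuntimeException("Failed to create HBase connection to BigTable", e);
      }
    }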
HBaseOnlineRetriever.java (new file)

@@ -0,0 +1,173 @@
package dev.caraml.serving.store.bigtable;

import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import dev.caraml.serving.store.AvroFeature;
import dev.caraml.serving.store.Feature;
import dev.caraml.store.protobuf.serving.ServingServiceProto;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;

public class HBaseOnlineRetriever implements SSTableOnlineRetriever<ByteString, Result> {
private final Connection client;
private final HBaseSchemaRegistry schemaRegistry;

public HBaseOnlineRetriever(Connection client) {
this.client = client;
this.schemaRegistry = new HBaseSchemaRegistry(client);
}

@Override
public ByteString convertEntityValueToKey(
ServingServiceProto.GetOnlineFeaturesRequest.EntityRow entityRow, List<String> entityNames) {
return ByteString.copyFrom(
entityNames.stream()
.sorted()
.map(entity -> entityRow.getFieldsMap().get(entity))
.map(this::valueToString)
.collect(Collectors.joining("#"))
.getBytes());
}

@Override
public List<List<Feature>> convertRowToFeature(
String tableName,
List<ByteString> rowKeys,
Map<ByteString, Result> rows,
List<ServingServiceProto.FeatureReference> featureReferences) {
BinaryDecoder reusedDecoder = DecoderFactory.get().binaryDecoder(new byte[0], null);

return rowKeys.stream()
.map(
rowKey -> {
if (!rows.containsKey(rowKey)) {
return Collections.<Feature>emptyList();
} else {
Result row = rows.get(rowKey);
return featureReferences.stream()
Collaborator: The chain is very long; is it possible to break the chain?

Collaborator Author (@bayu-aditya, Oct 2, 2024): Done in 4355a1a, thank you 🙏

.map(ServingServiceProto.FeatureReference::getFeatureTable)
.distinct()
.map(cf -> row.getColumnCells(cf.getBytes(), null))
.filter(ls -> !ls.isEmpty())
.flatMap(
rowCells -> {
Cell rowCell = rowCells.get(0); // Latest cell
ByteBuffer valueBuffer =
ByteBuffer.wrap(rowCell.getValueArray())
.position(rowCell.getValueOffset())
.limit(rowCell.getValueOffset() + rowCell.getValueLength())
.slice();
ByteBuffer familyBuffer =
ByteBuffer.wrap(rowCell.getFamilyArray())
.position(rowCell.getFamilyOffset())
.limit(rowCell.getFamilyOffset() + rowCell.getFamilyLength())
.slice();
String family = ByteString.copyFrom(familyBuffer).toStringUtf8();
ByteString value = ByteString.copyFrom(valueBuffer);

List<Feature> features;
List<ServingServiceProto.FeatureReference> localFeatureReferences =
featureReferences.stream()
.filter(
featureReference ->
featureReference.getFeatureTable().equals(family))
.collect(Collectors.toList());

try {
features =
decodeFeatures(
tableName,
value,
localFeatureReferences,
reusedDecoder,
rowCell.getTimestamp());
} catch (IOException e) {
throw new RuntimeException("Failed to decode features from BigTable");
}

return features.stream();
})
.collect(Collectors.toList());
}
})
.collect(Collectors.toList());
}

@Override
public Map<ByteString, Result> getFeaturesFromSSTable(
String tableName, List<ByteString> rowKeys, List<String> columnFamilies) {
try {
Table table = this.client.getTable(TableName.valueOf(tableName));

// construct query get list
List<Get> queryGetList = new ArrayList<>();
rowKeys.forEach(
rowKey -> {
Get get = new Get(rowKey.toByteArray());
columnFamilies.forEach(cf -> get.addFamily(cf.getBytes()));

queryGetList.add(get);
});

// fetch data from table
Result[] rows = table.get(queryGetList);

// construct result
Map<ByteString, Result> result = new HashMap<>();
Arrays.stream(rows)
.filter(row -> !row.isEmpty())
.forEach(row -> result.put(ByteString.copyFrom(row.getRow()), row));

return result;
} catch (IOException e) {
throw new RuntimeException(e);
}
}

private List<Feature> decodeFeatures(
String tableName,
ByteString value,
List<ServingServiceProto.FeatureReference> featureReferences,
BinaryDecoder reusedDecoder,
long timestamp)
throws IOException {
ByteString schemaReferenceBytes = value.substring(0, 4);
byte[] featureValueBytes = value.substring(4).toByteArray();

HBaseSchemaRegistry.SchemaReference schemaReference =
new HBaseSchemaRegistry.SchemaReference(tableName, schemaReferenceBytes);

GenericDatumReader<GenericRecord> reader = this.schemaRegistry.getReader(schemaReference);

reusedDecoder = DecoderFactory.get().binaryDecoder(featureValueBytes, reusedDecoder);
GenericRecord record = reader.read(null, reusedDecoder);

return featureReferences.stream()
.map(
featureReference -> {
Object featureValue;
try {
featureValue = record.get(featureReference.getName());
} catch (AvroRuntimeException e) {
// Feature is not found in schema
return null;
}
return new AvroFeature(
featureReference,
Timestamp.newBuilder().setSeconds(timestamp / 1000).build(),
Objects.requireNonNullElseGet(featureValue, Object::new));
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
}
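Following up on the "long chain" comment above, commit 4355a1a moves the per-column-family handling into a convertRowCellsToFeatures function. A rough sketch of what that extraction could look like, using only names visible in this file plus an added java.util.stream.Stream import; the committed version may differ in signature and details:

    // Handles the cells of one column family; called from the flatMap in convertRowToFeature.
    private Stream<Feature> convertRowCellsToFeatures(
        String tableName,
        List<Cell> rowCells,
        List<ServingServiceProto.FeatureReference> featureReferences,
        BinaryDecoder reusedDecoder) {
      Cell rowCell = rowCells.get(0); // latest cell for this column family
      ByteBuffer valueBuffer =
          ByteBuffer.wrap(rowCell.getValueArray())
              .position(rowCell.getValueOffset())
              .limit(rowCell.getValueOffset() + rowCell.getValueLength())
              .slice();
      ByteBuffer familyBuffer =
          ByteBuffer.wrap(rowCell.getFamilyArray())
              .position(rowCell.getFamilyOffset())
              .limit(rowCell.getFamilyOffset() + rowCell.getFamilyLength())
              .slice();
      String family = ByteString.copyFrom(familyBuffer).toStringUtf8();
      ByteString value = ByteString.copyFrom(valueBuffer);

      // Only the feature references belonging to this column family are decoded here.
      List<ServingServiceProto.FeatureReference> localFeatureReferences =
          featureReferences.stream()
              .filter(ref -> ref.getFeatureTable().equals(family))
              .collect(Collectors.toList());

      try {
        return decodeFeatures(
                tableName, value, localFeatureReferences, reusedDecoder, rowCell.getTimestamp())
            .stream();
      } catch (IOException e) {
        throw new RuntimeException("Failed to decode features from BigTable", e);
      }
    }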
HBaseSchemaRegistry.java (new file)

@@ -0,0 +1,112 @@
package dev.caraml.serving.store.bigtable;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutionException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;

public class HBaseSchemaRegistry {
private final Connection hbaseClient;
private final LoadingCache<SchemaReference, GenericDatumReader<GenericRecord>> cache;

private static String COLUMN_FAMILY = "metadata";
private static String QUALIFIER = "avro";
private static String KEY_PREFIX = "schema#";

public static class SchemaReference {
private final String tableName;
private final ByteString schemaHash;

public SchemaReference(String tableName, ByteString schemaHash) {
this.tableName = tableName;
this.schemaHash = schemaHash;
}

public String getTableName() {
return tableName;
}

public ByteString getSchemaHash() {
return schemaHash;
}

@Override
public int hashCode() {
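// Note: 31 here is just the conventional odd-prime multiplier used by
// textbook (Effective Java style) and IDE-generated hashCode() implementations;
// it has no domain-specific meaning.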
int result = tableName.hashCode();
result = 31 * result + schemaHash.hashCode();
Collaborator: What does the 31 represent?

Contributor: I have no idea 😆

return result;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

SchemaReference that = (SchemaReference) o;

if (!tableName.equals(that.tableName)) return false;
return schemaHash.equals(that.schemaHash);
}
}

public HBaseSchemaRegistry(Connection hbaseClient) {
this.hbaseClient = hbaseClient;

CacheLoader<SchemaReference, GenericDatumReader<GenericRecord>> schemaCacheLoader =
CacheLoader.from(this::loadReader);

cache = CacheBuilder.newBuilder().build(schemaCacheLoader);
}

public GenericDatumReader<GenericRecord> getReader(SchemaReference reference) {
GenericDatumReader<GenericRecord> reader;
try {
reader = this.cache.get(reference);
} catch (ExecutionException | CacheLoader.InvalidCacheLoadException e) {
throw new RuntimeException(String.format("Unable to find Schema"), e);
}
return reader;
}

private GenericDatumReader<GenericRecord> loadReader(SchemaReference reference) {
try {
Table table = this.hbaseClient.getTable(TableName.valueOf(reference.getTableName()));

byte[] rowKey =
ByteString.copyFrom(KEY_PREFIX.getBytes())
.concat(reference.getSchemaHash())
.toByteArray();
Get query = new Get(rowKey);
query.addColumn(COLUMN_FAMILY.getBytes(), QUALIFIER.getBytes());

Result result = table.get(query);

Cell last = result.getColumnLatestCell(COLUMN_FAMILY.getBytes(), QUALIFIER.getBytes());
if (last == null) {
// NOTE: this should never happen
throw new RuntimeException("Schema not found");
}
ByteBuffer schemaBuffer =
ByteBuffer.wrap(last.getValueArray())
.position(last.getValueOffset())
.limit(last.getValueOffset() + last.getValueLength())
.slice();
Schema schema = new Schema.Parser().parse(ByteString.copyFrom(schemaBuffer).toStringUtf8());
return new GenericDatumReader<>(schema);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
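For context on the convention the registry relies on (visible in loadReader above and in HBaseOnlineRetriever.decodeFeatures): feature values are prefixed with a 4-byte schema hash, and the matching Avro schema lives in the same table under a "schema#" + hash row key, in the metadata column family with the avro qualifier. A purely illustrative sketch of a write that this read path would find, assuming an org.apache.hadoop.hbase.client.Put import in addition to those above; the table name, hash bytes, and schema string are placeholders, not values from this PR:

    // Illustrative only: writing a schema row in the layout HBaseSchemaRegistry reads.
    String avroSchemaJson = "...";                            // placeholder Avro schema JSON
    byte[] schemaHash = new byte[] {0x01, 0x02, 0x03, 0x04};  // placeholder 4-byte schema reference
    byte[] rowKey =
        ByteString.copyFrom("schema#".getBytes())
            .concat(ByteString.copyFrom(schemaHash))
            .toByteArray();
    Put put = new Put(rowKey);
    put.addColumn("metadata".getBytes(), "avro".getBytes(), avroSchemaJson.getBytes());
    try (Table table = connection.getTable(TableName.valueOf("my_feature_table"))) { // placeholder table name, existing Connection
      table.put(put);
    }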
HBaseStoreConfig.java (new file)

@@ -0,0 +1,39 @@
package dev.caraml.serving.store.bigtable;

import dev.caraml.serving.store.OnlineRetriever;
import java.io.IOException;
import lombok.Getter;
import lombok.Setter;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConfigurationProperties(prefix = "caraml.store.hbase")
@ConditionalOnProperty(prefix = "caraml.store", name = "active", havingValue = "hbase")
@Getter
@Setter
public class HBaseStoreConfig {
private String zookeeperQuorum;
private String zookeeperClientPort;

@Bean
public OnlineRetriever getRetriever() {
org.apache.hadoop.conf.Configuration conf;
conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", zookeeperQuorum);
conf.set("hbase.zookeeper.property.clientPort", zookeeperClientPort);
Connection connection;
try {
connection = ConnectionFactory.createConnection(conf);
} catch (IOException e) {
throw new RuntimeException(e);
}

return new HBaseOnlineRetriever(connection);
}
}