feat: add HBase SDK for serving #127
HBaseOnlineRetriever.java (new file)
@@ -0,0 +1,173 @@
package dev.caraml.serving.store.bigtable;

import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import dev.caraml.serving.store.AvroFeature;
import dev.caraml.serving.store.Feature;
import dev.caraml.store.protobuf.serving.ServingServiceProto;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;

public class HBaseOnlineRetriever implements SSTableOnlineRetriever<ByteString, Result> {
  private final Connection client;
  private final HBaseSchemaRegistry schemaRegistry;

  public HBaseOnlineRetriever(Connection client) {
    this.client = client;
    this.schemaRegistry = new HBaseSchemaRegistry(client);
  }

  @Override
  public ByteString convertEntityValueToKey(
      ServingServiceProto.GetOnlineFeaturesRequest.EntityRow entityRow, List<String> entityNames) {
    return ByteString.copyFrom(
        entityNames.stream()
            .sorted()
            .map(entity -> entityRow.getFieldsMap().get(entity))
            .map(this::valueToString)
            .collect(Collectors.joining("#"))
            .getBytes());
  }

  @Override
  public List<List<Feature>> convertRowToFeature(
      String tableName,
      List<ByteString> rowKeys,
      Map<ByteString, Result> rows,
      List<ServingServiceProto.FeatureReference> featureReferences) {
    BinaryDecoder reusedDecoder = DecoderFactory.get().binaryDecoder(new byte[0], null);

    return rowKeys.stream()
        .map(
            rowKey -> {
              if (!rows.containsKey(rowKey)) {
                return Collections.<Feature>emptyList();
              } else {
                Result row = rows.get(rowKey);
                return featureReferences.stream()
Review comment: the chain is very long, is it possible to break the chain?
Reply: done in 4355a1a, thank you 🙏
                    .map(ServingServiceProto.FeatureReference::getFeatureTable)
                    .distinct()
                    .map(cf -> row.getColumnCells(cf.getBytes(), null))
                    .filter(ls -> !ls.isEmpty())
                    .flatMap(
                        rowCells -> {
                          Cell rowCell = rowCells.get(0); // Latest cell
                          ByteBuffer valueBuffer =
                              ByteBuffer.wrap(rowCell.getValueArray())
                                  .position(rowCell.getValueOffset())
                                  .limit(rowCell.getValueOffset() + rowCell.getValueLength())
                                  .slice();
                          ByteBuffer familyBuffer =
                              ByteBuffer.wrap(rowCell.getFamilyArray())
                                  .position(rowCell.getFamilyOffset())
                                  .limit(rowCell.getFamilyOffset() + rowCell.getFamilyLength())
                                  .slice();
                          String family = ByteString.copyFrom(familyBuffer).toStringUtf8();
                          ByteString value = ByteString.copyFrom(valueBuffer);

                          List<Feature> features;
                          List<ServingServiceProto.FeatureReference> localFeatureReferences =
                              featureReferences.stream()
                                  .filter(
                                      featureReference ->
                                          featureReference.getFeatureTable().equals(family))
                                  .collect(Collectors.toList());

                          try {
                            features =
                                decodeFeatures(
                                    tableName,
                                    value,
                                    localFeatureReferences,
                                    reusedDecoder,
                                    rowCell.getTimestamp());
                          } catch (IOException e) {
                            throw new RuntimeException("Failed to decode features from HBase", e);
                          }

                          return features.stream();
                        })
                    .collect(Collectors.toList());
              }
            })
        .collect(Collectors.toList());
  }

  @Override
  public Map<ByteString, Result> getFeaturesFromSSTable(
      String tableName, List<ByteString> rowKeys, List<String> columnFamilies) {
    try {
      Table table = this.client.getTable(TableName.valueOf(tableName));

      // construct query get list
      List<Get> queryGetList = new ArrayList<>();
      rowKeys.forEach(
          rowKey -> {
            Get get = new Get(rowKey.toByteArray());
            columnFamilies.forEach(cf -> get.addFamily(cf.getBytes()));

            queryGetList.add(get);
          });

      // fetch data from table
      Result[] rows = table.get(queryGetList);

      // construct result
      Map<ByteString, Result> result = new HashMap<>();
      Arrays.stream(rows)
          .filter(row -> !row.isEmpty())
          .forEach(row -> result.put(ByteString.copyFrom(row.getRow()), row));

      return result;
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  private List<Feature> decodeFeatures(
      String tableName,
      ByteString value,
      List<ServingServiceProto.FeatureReference> featureReferences,
      BinaryDecoder reusedDecoder,
      long timestamp)
      throws IOException {
    ByteString schemaReferenceBytes = value.substring(0, 4);
    byte[] featureValueBytes = value.substring(4).toByteArray();

    HBaseSchemaRegistry.SchemaReference schemaReference =
        new HBaseSchemaRegistry.SchemaReference(tableName, schemaReferenceBytes);

    GenericDatumReader<GenericRecord> reader = this.schemaRegistry.getReader(schemaReference);

    reusedDecoder = DecoderFactory.get().binaryDecoder(featureValueBytes, reusedDecoder);
    GenericRecord record = reader.read(null, reusedDecoder);

    return featureReferences.stream()
        .map(
            featureReference -> {
              Object featureValue;
              try {
                featureValue = record.get(featureReference.getName());
              } catch (AvroRuntimeException e) {
                // Feature is not found in schema
                return null;
              }
              return new AvroFeature(
                  featureReference,
                  Timestamp.newBuilder().setSeconds(timestamp / 1000).build(),
                  Objects.requireNonNullElseGet(featureValue, Object::new));
            })
        .filter(Objects::nonNull)
        .collect(Collectors.toList());
  }
}
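The review thread above asks whether the long stream chain in convertRowToFeature can be broken up (addressed in 4355a1a). Below is a minimal sketch of one way to do it by extracting the per-column-family work into a private helper. It is illustrative only: the helper name decodeFamilyCells and its exact signature are assumptions, not necessarily what the follow-up commit does, and it would need an additional import of java.util.stream.Stream.

  // Sketch only: a hypothetical helper that would shorten the chain in convertRowToFeature.
  private Stream<Feature> decodeFamilyCells(
      String tableName,
      List<Cell> rowCells,
      List<ServingServiceProto.FeatureReference> featureReferences,
      BinaryDecoder reusedDecoder) {
    Cell rowCell = rowCells.get(0); // latest cell for this column family
    String family =
        ByteString.copyFrom(
                rowCell.getFamilyArray(), rowCell.getFamilyOffset(), rowCell.getFamilyLength())
            .toStringUtf8();
    ByteString value =
        ByteString.copyFrom(
            rowCell.getValueArray(), rowCell.getValueOffset(), rowCell.getValueLength());

    // Keep only the feature references that belong to this column family (feature table).
    List<ServingServiceProto.FeatureReference> localReferences =
        featureReferences.stream()
            .filter(ref -> ref.getFeatureTable().equals(family))
            .collect(Collectors.toList());

    try {
      return decodeFeatures(
              tableName, value, localReferences, reusedDecoder, rowCell.getTimestamp())
          .stream();
    } catch (IOException e) {
      throw new RuntimeException("Failed to decode features from HBase", e);
    }
  }

With such a helper, the flatMap step in convertRowToFeature reduces to a single call, for example .flatMap(rowCells -> decodeFamilyCells(tableName, rowCells, featureReferences, reusedDecoder)).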
HBaseSchemaRegistry.java (new file)
@@ -0,0 +1,112 @@
package dev.caraml.serving.store.bigtable;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutionException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;

public class HBaseSchemaRegistry {
  private final Connection hbaseClient;
  private final LoadingCache<SchemaReference, GenericDatumReader<GenericRecord>> cache;

  private static final String COLUMN_FAMILY = "metadata";
  private static final String QUALIFIER = "avro";
  private static final String KEY_PREFIX = "schema#";

  public static class SchemaReference {
    private final String tableName;
    private final ByteString schemaHash;

    public SchemaReference(String tableName, ByteString schemaHash) {
      this.tableName = tableName;
      this.schemaHash = schemaHash;
    }

    public String getTableName() {
      return tableName;
    }

    public ByteString getSchemaHash() {
      return schemaHash;
    }

    @Override
    public int hashCode() {
      int result = tableName.hashCode();
      result = 31 * result + schemaHash.hashCode();
Review comment: what does the 31 represent?
Reply: I have no idea 😆
      return result;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) return true;
      if (o == null || getClass() != o.getClass()) return false;

      SchemaReference that = (SchemaReference) o;

      if (!tableName.equals(that.tableName)) return false;
      return schemaHash.equals(that.schemaHash);
    }
  }

  public HBaseSchemaRegistry(Connection hbaseClient) {
    this.hbaseClient = hbaseClient;

    CacheLoader<SchemaReference, GenericDatumReader<GenericRecord>> schemaCacheLoader =
        CacheLoader.from(this::loadReader);

    cache = CacheBuilder.newBuilder().build(schemaCacheLoader);
  }

  public GenericDatumReader<GenericRecord> getReader(SchemaReference reference) {
    GenericDatumReader<GenericRecord> reader;
    try {
      reader = this.cache.get(reference);
    } catch (ExecutionException | CacheLoader.InvalidCacheLoadException e) {
      throw new RuntimeException("Unable to find schema", e);
    }
    return reader;
  }

  private GenericDatumReader<GenericRecord> loadReader(SchemaReference reference) {
    try {
      Table table = this.hbaseClient.getTable(TableName.valueOf(reference.getTableName()));

      byte[] rowKey =
          ByteString.copyFrom(KEY_PREFIX.getBytes())
              .concat(reference.getSchemaHash())
              .toByteArray();
      Get query = new Get(rowKey);
      query.addColumn(COLUMN_FAMILY.getBytes(), QUALIFIER.getBytes());

      Result result = table.get(query);

      Cell last = result.getColumnLatestCell(COLUMN_FAMILY.getBytes(), QUALIFIER.getBytes());
      if (last == null) {
        // NOTE: this should never happen
        throw new RuntimeException("Schema not found");
      }
      ByteBuffer schemaBuffer =
          ByteBuffer.wrap(last.getValueArray())
              .position(last.getValueOffset())
              .limit(last.getValueOffset() + last.getValueLength())
              .slice();
      Schema schema = new Schema.Parser().parse(ByteString.copyFrom(schemaBuffer).toStringUtf8());
      return new GenericDatumReader<>(schema);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }
}
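On the review question above about the 31: it is the conventional odd-prime multiplier used in hand-written Java hashCode() implementations, the pattern recommended in Effective Java and generated by most IDEs. An odd prime spreads the combined hash across buckets, and 31 * i can be optimised to (i << 5) - i. A more self-documenting alternative is to delegate to java.util.Objects; the sketch below is an alternative shape, not the code in this PR, is equivalent for map lookups (though the exact hash values differ), and assumes an extra import of java.util.Objects.

  // Sketch: equivalent hashCode/equals pair for SchemaReference using java.util.Objects.
  @Override
  public int hashCode() {
    // Objects.hash combines the fields with the same 31-based polynomial scheme internally.
    return Objects.hash(tableName, schemaHash);
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;
    SchemaReference that = (SchemaReference) o;
    return tableName.equals(that.tableName) && schemaHash.equals(that.schemaHash);
  }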
HBaseStoreConfig.java (new file)
@@ -0,0 +1,39 @@
package dev.caraml.serving.store.bigtable;

import dev.caraml.serving.store.OnlineRetriever;
import java.io.IOException;
import lombok.Getter;
import lombok.Setter;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConfigurationProperties(prefix = "caraml.store.hbase")
@ConditionalOnProperty(prefix = "caraml.store", name = "active", havingValue = "hbase")
@Getter
@Setter
public class HBaseStoreConfig {
  private String zookeeperQuorum;
  private String zookeeperClientPort;

  @Bean
  public OnlineRetriever getRetriever() {
    org.apache.hadoop.conf.Configuration conf;
    conf = HBaseConfiguration.create();
    conf.set("hbase.zookeeper.quorum", zookeeperQuorum);
    conf.set("hbase.zookeeper.property.clientPort", zookeeperClientPort);
    Connection connection;
    try {
      connection = ConnectionFactory.createConnection(conf);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }

    return new HBaseOnlineRetriever(connection);
  }
}
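For context on how this configuration is activated: @ConditionalOnProperty creates the bean only when caraml.store.active is set to hbase, and @ConfigurationProperties binds zookeeperQuorum and zookeeperClientPort from the caraml.store.hbase prefix (with Spring's relaxed binding, keys such as caraml.store.hbase.zookeeper-quorum also work). The sketch below is not part of this PR; it shows the equivalent manual wiring, for example in a local test, and the localhost quorum and port 2181 are assumed values.

  // Sketch (assumed usage, not part of this diff): constructing the retriever by hand,
  // mirroring what getRetriever() does under Spring.
  public static HBaseOnlineRetriever localRetriever() throws IOException {
    org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.zookeeper.quorum", "localhost"); // assumed local quorum
    conf.set("hbase.zookeeper.property.clientPort", "2181"); // default ZooKeeper client port
    Connection connection = ConnectionFactory.createConnection(conf);
    return new HBaseOnlineRetriever(connection);
  }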
Review comment: should this block be wrapped in a try-catch block? I saw that the connect method can throw IllegalStateException.
Reply: @bayu-aditya could you take a look at this?
Reply: Yup sure
Reply: Done in here 455e253. Thank you for your finding.
Reply: should we use one try-catch, with the condition inside the try-catch block?
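A minimal sketch of what the last suggestion could look like, assuming the same fields and imports as HBaseStoreConfig; this is one possible shape, not necessarily the change made in 455e253:

  @Bean
  public OnlineRetriever getRetriever() {
    try {
      // Keep configuration, connection creation and retriever construction in one try block,
      // so the IOException from createConnection and the IllegalStateException mentioned in
      // the review both surface through a single handler.
      org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
      conf.set("hbase.zookeeper.quorum", zookeeperQuorum);
      conf.set("hbase.zookeeper.property.clientPort", zookeeperClientPort);
      Connection connection = ConnectionFactory.createConnection(conf);
      return new HBaseOnlineRetriever(connection);
    } catch (IOException | IllegalStateException e) {
      throw new RuntimeException("Failed to create HBase connection", e);
    }
  }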