Skip to content

Commit

Permalink
[apache#1744] improvement:(core): Refine interface KvBackend (apach…
Browse files Browse the repository at this point in the history
…e#1747)

### What changes were proposed in this pull request?

Optimize interface `KvBackend` 
- Change method signature
- Optimize Java docs
- Add UT for `RocksDBKvBackend`

### Why are the changes needed?

For a better user experience.

Fix: apache#1744 

### Does this PR introduce _any_ user-facing change?

N/A.

### How was this patch tested?

Add a new UT `TestRocksDBKvBackend`
  • Loading branch information
yuqi1129 authored and ch3yne committed Jan 29, 2024
1 parent fd4e0b8 commit 9467aec
Show file tree
Hide file tree
Showing 9 changed files with 296 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
package com.datastrato.gravitino.storage.kv;

import com.datastrato.gravitino.Config;
import com.datastrato.gravitino.EntityAlreadyExistsException;
import com.datastrato.gravitino.exceptions.AlreadyExistsException;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
Expand All @@ -29,10 +29,9 @@ public interface KvBackend extends Closeable {
* @param value The value of the pair.
* @param overwrite If true, overwrites the existing value.
* @throws IOException If an I/O exception occurs during the operation.
* @throws EntityAlreadyExistsException If the key already exists and overwrite is false.
* @throws AlreadyExistsException If the key already exists and overwrite is false.
*/
void put(byte[] key, byte[] value, boolean overwrite)
throws IOException, EntityAlreadyExistsException;
void put(byte[] key, byte[] value, boolean overwrite) throws IOException, AlreadyExistsException;

/**
* Retrieves the value associated with a given key.
Expand All @@ -47,19 +46,19 @@ void put(byte[] key, byte[] value, boolean overwrite)
* Deletes the key-value pair associated with the given key.
*
* @param key The key to delete.
* @return True if the key-value pair was successfully deleted, false if the key was not found.
* @throws IOException If an I/O exception occurs during deletion.
* @return True if the key-value pair was successfully deleted; otherwise an exception is thrown.
* @throws IOException If an exception occurs during deletion.
*/
boolean delete(byte[] key) throws IOException;

/**
* Delete the key-value pair associated with the given {@link KvRangeScan}
* Deletes the key-value pairs within the given {@link KvRange}.
*
* @param kvRangeScan kv range to to delete
* @return True if the key-value pair was successfully deleted.
* @param kvRange kv range to delete
* @return True if the key-value pairs were successfully deleted; otherwise an exception is thrown.
* @throws IOException If an I/O exception occurs during deletion.
*/
boolean deleteRange(KvRangeScan kvRangeScan) throws IOException;
boolean deleteRange(KvRange kvRange) throws IOException;

/**
* Scans the specified range using the provided KvRange and returns a list of key-value pairs.
Expand All @@ -68,5 +67,5 @@ void put(byte[] key, byte[] value, boolean overwrite)
* @return A list of key-value pairs within the specified range.
* @throws IOException If an I/O exception occurs during scanning.
*/
List<Pair<byte[], byte[]>> scan(KvRangeScan scanRange) throws IOException;
List<Pair<byte[], byte[]>> scan(KvRange scanRange) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public <E extends Entity & HasIdentifier> List<E> list(
executeInTransaction(
() ->
transactionalKvBackend.scan(
new KvRangeScan.KvRangeScanBuilder()
new KvRange.KvRangeBuilder()
.start(startKey)
.end(endKey)
.startInclusive(true)
Expand Down Expand Up @@ -372,7 +372,7 @@ public boolean delete(NameIdentifier ident, EntityType entityType, boolean casca
byte[] endKey = Bytes.increment(Bytes.wrap(directChild)).get();
List<Pair<byte[], byte[]>> kvs =
transactionalKvBackend.scan(
new KvRangeScan.KvRangeScanBuilder()
new KvRange.KvRangeBuilder()
.start(directChild)
.end(endKey)
.startInclusive(true)
Expand All @@ -394,7 +394,7 @@ public boolean delete(NameIdentifier ident, EntityType entityType, boolean casca

for (byte[] prefix : subEntityPrefix) {
transactionalKvBackend.deleteRange(
new KvRangeScan.KvRangeScanBuilder()
new KvRange.KvRangeBuilder()
.start(prefix)
.startInclusive(true)
.end(Bytes.increment(Bytes.wrap(prefix)).get())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ void collectAndClean() {
private void collectAndRemoveUncommittedData() throws IOException {
List<Pair<byte[], byte[]>> kvs =
kvBackend.scan(
new KvRangeScan.KvRangeScanBuilder()
new KvRange.KvRangeBuilder()
.start(new byte[] {0x20}) // below 0x20 is control character
.end(new byte[] {0x7F}) // above 0x7F is control character
.startInclusive(true)
Expand Down Expand Up @@ -146,7 +146,7 @@ private void collectAndRemoveOldVersionData() throws IOException {
// TODO(yuqi), Use multiple threads to scan the data in case the data is too large.
List<Pair<byte[], byte[]>> kvs =
kvBackend.scan(
new KvRangeScan.KvRangeScanBuilder()
new KvRange.KvRangeBuilder()
.start(startKey)
.end(endKey)
.startInclusive(true)
Expand Down Expand Up @@ -193,7 +193,7 @@ private void collectAndRemoveOldVersionData() throws IOException {
// directly.
List<Pair<byte[], byte[]>> newVersionOfKey =
kvBackend.scan(
new KvRangeScan.KvRangeScanBuilder()
new KvRange.KvRangeBuilder()
.start(key)
.end(generateKey(key, transactionId))
.startInclusive(false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
import lombok.Builder;
import lombok.Data;

/** Represents a range scan configuration for the key-value store. */
/** Represents a range scan/delete configuration for the key-value store. */
@Builder
@Data
public class KvRangeScan {
public class KvRange {
private byte[] start;
private byte[] end;
private boolean startInclusive;
Expand All @@ -33,7 +33,7 @@ public class KvRangeScan {
* @param limit The maximum number of results to retrieve.
* @param predicate The predicate to use to filter key-value pairs.
*/
public KvRangeScan(
public KvRange(
byte[] start,
byte[] end,
boolean startInclusive,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@

import com.datastrato.gravitino.Config;
import com.datastrato.gravitino.Configs;
import com.datastrato.gravitino.EntityAlreadyExistsException;
import com.datastrato.gravitino.exceptions.AlreadyExistsException;
import com.datastrato.gravitino.utils.ByteUtils;
import com.datastrato.gravitino.utils.Bytes;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -74,21 +75,22 @@ public void initialize(Config config) throws IOException {
public void put(byte[] key, byte[] value, boolean overwrite) throws IOException {
try {
handlePut(key, value, overwrite);
} catch (EntityAlreadyExistsException e) {
} catch (AlreadyExistsException e) {
throw e;
} catch (Exception e) {
throw new IOException(e);
}
}

private void handlePut(byte[] key, byte[] value, boolean overwrite) throws RocksDBException {
@VisibleForTesting
void handlePut(byte[] key, byte[] value, boolean overwrite) throws RocksDBException {
if (overwrite) {
db.put(key, value);
return;
}
byte[] existKey = db.get(key);
if (existKey != null) {
throw new EntityAlreadyExistsException(
throw new AlreadyExistsException(
String.format(
"Key %s already exists in the database, please use overwrite option to overwrite it",
ByteUtils.formatByteArray(key)));
Expand All @@ -106,7 +108,7 @@ public byte[] get(byte[] key) throws IOException {
}

@Override
public List<Pair<byte[], byte[]>> scan(KvRangeScan scanRange) throws IOException {
public List<Pair<byte[], byte[]>> scan(KvRange scanRange) throws IOException {
RocksIterator rocksIterator = db.newIterator();
rocksIterator.seek(scanRange.getStart());

Expand Down Expand Up @@ -158,7 +160,7 @@ public boolean delete(byte[] key) throws IOException {
}

@Override
public boolean deleteRange(KvRangeScan deleteRange) throws IOException {
public boolean deleteRange(KvRange deleteRange) throws IOException {
RocksIterator rocksIterator = db.newIterator();
rocksIterator.seek(deleteRange.getStart());

Expand Down Expand Up @@ -193,4 +195,14 @@ public boolean deleteRange(KvRangeScan deleteRange) throws IOException {
public void close() throws IOException {
db.close();
}

/**
 * Returns the RocksDB instance currently held by this backend.
 *
 * <p>Marked {@code @VisibleForTesting}: intended for use by tests rather than production callers.
 *
 * @return The value of the {@code db} field.
 */
@VisibleForTesting
public RocksDB getDb() {
return db;
}

/**
 * Replaces the RocksDB instance held by this backend.
 *
 * <p>Marked {@code @VisibleForTesting}: intended for use by tests rather than production callers.
 * The previously held instance is not closed by this method.
 *
 * @param db The RocksDB instance to assign to the {@code db} field.
 */
@VisibleForTesting
public void setDb(RocksDB db) {
this.db = db;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ public boolean delete(byte[] key) throws IOException {
}

@Override
public boolean deleteRange(KvRangeScan kvRangeScan) throws IOException {
List<Pair<byte[], byte[]>> pairs = scan(kvRangeScan);
public boolean deleteRange(KvRange kvRange) throws IOException {
List<Pair<byte[], byte[]>> pairs = scan(kvRange);
pairs.forEach(
p ->
putPairs
Expand All @@ -188,7 +188,7 @@ public boolean deleteRange(KvRangeScan kvRangeScan) throws IOException {
}

@Override
public List<Pair<byte[], byte[]>> scan(KvRangeScan scanRange) throws IOException {
public List<Pair<byte[], byte[]>> scan(KvRange scanRange) throws IOException {
// Why we need to change the end key? Because we use the transaction id to construct a row key
// Assuming the end key is 'a' and the value of endInclusive is true, if we want to scan the
// value of key a, then we need to change the end key to 'b' and set the value of endInclusive
Expand All @@ -200,8 +200,8 @@ public List<Pair<byte[], byte[]>> scan(KvRangeScan scanRange) throws IOException
endInclude = false;
}

KvRangeScan kvRangeScan =
new KvRangeScan.KvRangeScanBuilder()
KvRange kvRange =
new KvRange.KvRangeBuilder()
.start(scanRange.getStart())
.end(end)
.startInclusive(scanRange.isStartInclusive())
Expand All @@ -214,7 +214,7 @@ public List<Pair<byte[], byte[]>> scan(KvRangeScan scanRange) throws IOException
.limit(Integer.MAX_VALUE)
.build();

List<Pair<byte[], byte[]>> rawPairs = kvBackend.scan(kvRangeScan);
List<Pair<byte[], byte[]>> rawPairs = kvBackend.scan(kvRange);
List<Pair<byte[], byte[]>> result = Lists.newArrayList();
int i = 0, j = 0;
while (i < scanRange.getLimit() && j < rawPairs.size()) {
Expand Down Expand Up @@ -286,7 +286,7 @@ byte[] constructKey(byte[] key) {
private byte[] getNextReadableValue(byte[] key) throws IOException {
List<Pair<byte[], byte[]>> pairs =
kvBackend.scan(
new KvRangeScan.KvRangeScanBuilder()
new KvRange.KvRangeBuilder()
.start(key)
.startInclusive(false)
.end(endOfKey(key))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ void testCollectGarbage() throws IOException, InterruptedException {
Assertions.assertNull(transactionalKvBackend.get("testC".getBytes()));
List<Pair<byte[], byte[]>> allData =
kvBackend.scan(
new KvRangeScan.KvRangeScanBuilder()
new KvRange.KvRangeBuilder()
.start("_".getBytes())
.end("z".getBytes())
.startInclusive(false)
Expand All @@ -131,7 +131,7 @@ void testCollectGarbage() throws IOException, InterruptedException {

allData =
kvBackend.scan(
new KvRangeScan.KvRangeScanBuilder()
new KvRange.KvRangeBuilder()
.start("_".getBytes())
.end("z".getBytes())
.startInclusive(false)
Expand Down Expand Up @@ -185,7 +185,7 @@ void testRemoveWithGCCollector() throws IOException, InterruptedException {
KvBackend kvBackend = kvEntityStore.backend;
List<Pair<byte[], byte[]>> data =
kvBackend.scan(
new KvRangeScan.KvRangeScanBuilder()
new KvRange.KvRangeBuilder()
.start("_".getBytes())
.end("z".getBytes())
.startInclusive(false)
Expand Down
Loading

0 comments on commit 9467aec

Please sign in to comment.