Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#2591] refactor: refactor garbage collector config and check its value validity #2592

Merged
merged 24 commits into from
Mar 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import static com.datastrato.gravitino.Configs.ENTITY_KV_STORE;
import static com.datastrato.gravitino.Configs.ENTITY_STORE;
import static com.datastrato.gravitino.Configs.ENTRY_KV_ROCKSDB_BACKEND_PATH;
import static com.datastrato.gravitino.Configs.KV_DELETE_AFTER_TIME;
import static com.datastrato.gravitino.Configs.STORE_DELETE_AFTER_TIME;
import static com.datastrato.gravitino.Configs.STORE_TRANSACTION_MAX_SKEW_TIME;
import static com.datastrato.gravitino.connector.BaseCatalog.CATALOG_BYPASS_PREFIX;

Expand Down Expand Up @@ -78,7 +78,7 @@ public static void setUp() {

Assertions.assertEquals(ROCKS_DB_STORE_PATH, config.get(ENTRY_KV_ROCKSDB_BACKEND_PATH));
Mockito.when(config.get(STORE_TRANSACTION_MAX_SKEW_TIME)).thenReturn(1000L);
Mockito.when(config.get(KV_DELETE_AFTER_TIME)).thenReturn(20 * 60 * 1000L);
Mockito.when(config.get(STORE_DELETE_AFTER_TIME)).thenReturn(20 * 60 * 1000L);

store = EntityStoreFactory.createEntityStore(config);
store.initialize(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import static com.datastrato.gravitino.Configs.ENTITY_KV_STORE;
import static com.datastrato.gravitino.Configs.ENTITY_STORE;
import static com.datastrato.gravitino.Configs.ENTRY_KV_ROCKSDB_BACKEND_PATH;
import static com.datastrato.gravitino.Configs.KV_DELETE_AFTER_TIME;
import static com.datastrato.gravitino.Configs.STORE_DELETE_AFTER_TIME;
import static com.datastrato.gravitino.Configs.STORE_TRANSACTION_MAX_SKEW_TIME;
import static com.datastrato.gravitino.StringIdentifier.ID_KEY;
import static com.datastrato.gravitino.catalog.kafka.KafkaCatalogPropertiesMetadata.BOOTSTRAP_SERVERS;
Expand Down Expand Up @@ -64,7 +64,7 @@ public static void setUp() {

Assertions.assertEquals(ROCKS_DB_STORE_PATH, config.get(ENTRY_KV_ROCKSDB_BACKEND_PATH));
Mockito.when(config.get(STORE_TRANSACTION_MAX_SKEW_TIME)).thenReturn(1000L);
Mockito.when(config.get(KV_DELETE_AFTER_TIME)).thenReturn(20 * 60 * 1000L);
Mockito.when(config.get(STORE_DELETE_AFTER_TIME)).thenReturn(20 * 60 * 1000L);

store = EntityStoreFactory.createEntityStore(config);
store.initialize(config);
Expand Down
12 changes: 6 additions & 6 deletions common/src/main/java/com/datastrato/gravitino/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ public abstract class Config {
private final ConcurrentMap<String, String> configMap;

private final Map<String, DeprecatedConfig> deprecatedConfigMap;
// Constant Array to hold all deprecated configurations
// Constant array holding all deprecated configuration keys. When a configuration is deprecated,
// it should be added here.
private final DeprecatedConfig[] deprecatedConfigs = {
// Example deprecated configuration
// new DeprecatedConfig(
// "gravitino.test.string",
// "1.0",
// "Please use gravitino.test.string1 instead."),
new DeprecatedConfig(
"gravitino.entity.store.kv.deleteAfterTimeMs",
"0.5.0",
"Please use gravitino.entity.store.deleteAfterTimeMs instead."),
};

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public void testConfWithAlternatives() {
ConfigEntry<String> testConf =
new ConfigBuilder("gravitino.test.string")
.alternatives(
Lists.newArrayList("gravitino.test.string.alt1", "gravitino.test.string.alt1"))
Lists.newArrayList("gravitino.test.string.alt1", "gravitino.test.string.alt2"))
.stringConf()
.createWithDefault("test");

Expand All @@ -101,7 +101,7 @@ public void testConfWithAlternatives() {
ConfigEntry<String> testConf1 =
new ConfigBuilder("gravitino.test.string.no-exist")
.alternatives(
Lists.newArrayList("gravitino.test.string.alt1", "gravitino.test.string.alt1"))
Lists.newArrayList("gravitino.test.string.alt1", "gravitino.test.string.alt2"))
.stringConf()
.createWithDefault("test");

Expand Down
70 changes: 68 additions & 2 deletions core/src/main/java/com/datastrato/gravitino/Configs.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.datastrato.gravitino.config.ConfigBuilder;
import com.datastrato.gravitino.config.ConfigConstants;
import com.datastrato.gravitino.config.ConfigEntry;
import com.google.common.collect.Lists;
import java.io.File;
import org.apache.commons.lang3.StringUtils;

Expand All @@ -33,6 +34,24 @@ public interface Configs {
Long DEFAULT_KV_DELETE_AFTER_TIME = 604800000L; // 7 days
String KV_DELETE_AFTER_TIME_KEY = "gravitino.entity.store.kv.deleteAfterTimeMs";

// Config for data keep time after soft deletion, in milliseconds.
String STORE_DELETE_AFTER_TIME_KEY = "gravitino.entity.store.deleteAfterTimeMs";
// Defaults to the same value as the deprecated KV-specific setting (7 days).
Long DEFAULT_STORE_DELETE_AFTER_TIME = DEFAULT_KV_DELETE_AFTER_TIME;
// The maximum allowed keep time for data deletion, in milliseconds. Equivalent to 30 days.
Long MAX_DELETE_TIME_ALLOW = 1000 * 60 * 60 * 24 * 30L;
// The minimum allowed keep time for data deletion, in milliseconds. Equivalent to 10 minutes.
Long MIN_DELETE_TIME_ALLOW = 1000 * 60 * 10L;

// Count of versions allowed to be retained, including the current version, used to delete old
// versions data.
String VERSION_RETENTION_COUNT_KEY = "gravitino.entity.store.versionRetentionCount";
Long DEFAULT_VERSION_RETENTION_COUNT = 1L;
// The maximum allowed count of versions to be retained
Long MAX_VERSION_RETENTION_COUNT = 10L;
// The minimum allowed count of versions to be retained
Long MIN_VERSION_RETENTION_COUNT = 1L;

// Default path for RocksDB backend is "${GRAVITINO_HOME}/data/rocksdb"
String DEFAULT_KV_ROCKSDB_BACKEND_PATH =
String.join(File.separator, System.getenv("GRAVITINO_HOME"), "data", "rocksdb");
Expand Down Expand Up @@ -146,11 +165,58 @@ public interface Configs {
ConfigEntry<Long> KV_DELETE_AFTER_TIME =
new ConfigBuilder(KV_DELETE_AFTER_TIME_KEY)
.doc(
"The maximum time in milliseconds that the deleted data and old version data is kept")
.version(ConfigConstants.VERSION_0_3_0)
String.format(
"The maximum time in milliseconds that the deleted data and old version data is kept, "
+ "max delete time allow is %s ms(30 days), "
+ "min delete time allow is %s ms(10 minutes)",
MAX_DELETE_TIME_ALLOW, MIN_DELETE_TIME_ALLOW))
.version(ConfigConstants.VERSION_0_5_0)
.deprecated()
.longConf()
.checkValue(
v -> v >= MIN_DELETE_TIME_ALLOW && v <= MAX_DELETE_TIME_ALLOW,
String.format(
"The value of %s is out of range, which must be between %s and %s",
KV_DELETE_AFTER_TIME_KEY, MIN_DELETE_TIME_ALLOW, MAX_DELETE_TIME_ALLOW))
.createWithDefault(DEFAULT_KV_DELETE_AFTER_TIME);

// Keep time, in milliseconds, for data after soft deletion and for old version data.
// Registered under STORE_DELETE_AFTER_TIME_KEY, with the deprecated KV_DELETE_AFTER_TIME_KEY
// listed as an alternative key.
// NOTE(review): presumably the alternative lets a value set under the old key still be picked
// up; confirm against the ConfigEntry alternatives handling.
// Defaults to DEFAULT_STORE_DELETE_AFTER_TIME.
ConfigEntry<Long> STORE_DELETE_AFTER_TIME =
new ConfigBuilder(STORE_DELETE_AFTER_TIME_KEY)
.doc(
String.format(
"The maximum time in milliseconds that the deleted data and old version data is kept, "
+ "max delete time allow is %s ms(30 days), "
+ "min delete time allow is %s ms(10 minutes)",
MAX_DELETE_TIME_ALLOW, MIN_DELETE_TIME_ALLOW))
.version(ConfigConstants.VERSION_0_5_0)
.alternatives(Lists.newArrayList(KV_DELETE_AFTER_TIME_KEY))
.longConf()
.checkValue(
// Accepts values in the inclusive range [MIN_DELETE_TIME_ALLOW, MAX_DELETE_TIME_ALLOW].
v -> v >= MIN_DELETE_TIME_ALLOW && v <= MAX_DELETE_TIME_ALLOW,
String.format(
"The value of %s is out of range, which must be between %s and %s",
STORE_DELETE_AFTER_TIME_KEY, MIN_DELETE_TIME_ALLOW, MAX_DELETE_TIME_ALLOW))
.createWithDefault(DEFAULT_STORE_DELETE_AFTER_TIME);

// Count of versions allowed to be retained, including the current version.
// Registered under VERSION_RETENTION_COUNT_KEY and defaults to DEFAULT_VERSION_RETENTION_COUNT.
// NOTE(review): when the range check below is actually evaluated depends on ConfigEntry —
// confirm.
ConfigEntry<Long> VERSION_RETENTION_COUNT =
new ConfigBuilder(VERSION_RETENTION_COUNT_KEY)
.doc(
String.format(
"The count of versions allowed to be retained, including the current version, "
+ "max version retention count is %s, "
+ "min version retention count is %s",
MAX_VERSION_RETENTION_COUNT, MIN_VERSION_RETENTION_COUNT))
.version(ConfigConstants.VERSION_0_5_0)
.longConf()
.checkValue(
// Accepts values in the inclusive range
// [MIN_VERSION_RETENTION_COUNT, MAX_VERSION_RETENTION_COUNT].
v -> v >= MIN_VERSION_RETENTION_COUNT && v <= MAX_VERSION_RETENTION_COUNT,
String.format(
"The value of %s is out of range, which must be between %s and %s",
VERSION_RETENTION_COUNT_KEY,
MIN_VERSION_RETENTION_COUNT,
MAX_VERSION_RETENTION_COUNT))
.createWithDefault(DEFAULT_VERSION_RETENTION_COUNT);

// The followings are configurations for tree lock

ConfigEntry<Long> TREE_LOCK_MAX_NODE_IN_MEMORY =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

package com.datastrato.gravitino.storage.kv;

import static com.datastrato.gravitino.Configs.KV_DELETE_AFTER_TIME;
import static com.datastrato.gravitino.Configs.STORE_DELETE_AFTER_TIME;
import static com.datastrato.gravitino.storage.kv.KvNameMappingService.GENERAL_NAME_MAPPING_PREFIX;
import static com.datastrato.gravitino.storage.kv.TransactionalKvBackendImpl.endOfTransactionId;
import static com.datastrato.gravitino.storage.kv.TransactionalKvBackendImpl.generateCommitKey;
Expand Down Expand Up @@ -44,9 +44,6 @@ public final class KvGarbageCollector implements Closeable {
private final Config config;
private final EntityKeyEncoder<byte[]> entityKeyEncoder;

private static final long MAX_DELETE_TIME_ALLOW = 1000 * 60 * 60 * 24 * 30L; // 30 days
private static final long MIN_DELETE_TIME_ALLOW = 1000 * 60 * 10L; // 10 minutes

private static final String TIME_STAMP_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS";

@VisibleForTesting
Expand All @@ -65,21 +62,10 @@ public KvGarbageCollector(
this.kvBackend = kvBackend;
this.config = config;
this.entityKeyEncoder = entityKeyEncoder;

long deleteTimeLine = config.get(KV_DELETE_AFTER_TIME);
if (deleteTimeLine > MAX_DELETE_TIME_ALLOW || deleteTimeLine < MIN_DELETE_TIME_ALLOW) {
throw new IllegalArgumentException(
String.format(
"The delete time line is too long or too short, "
+ "please check it. The delete time line is %s ms,"
+ "max delete time allow is %s ms(30 days),"
+ "min delete time allow is %s ms(10 minutes)",
deleteTimeLine, MAX_DELETE_TIME_ALLOW, MIN_DELETE_TIME_ALLOW));
}
}

public void start() {
long dateTimeLineMinute = config.get(KV_DELETE_AFTER_TIME) / 1000 / 60;
long dateTimeLineMinute = config.get(STORE_DELETE_AFTER_TIME) / 1000 / 60;

// We will collect garbage every 10 minutes at least. If the dateTimeLineMinute is larger than
// 100 minutes, we would collect garbage every dateTimeLineMinute/10 minutes.
Expand Down Expand Up @@ -133,7 +119,7 @@ private void collectAndRemoveUncommittedData() throws IOException {
}

private void collectAndRemoveOldVersionData() throws IOException {
long deleteTimeLine = System.currentTimeMillis() - config.get(KV_DELETE_AFTER_TIME);
long deleteTimeLine = System.currentTimeMillis() - config.get(STORE_DELETE_AFTER_TIME);
// Why do we left-shift by 18 bits? Please refer to TransactionIdGeneratorImpl#nextId.
// We can delete the data which is older than deleteTimeLine.(old data with transaction id that
// is smaller than transactionIdToDelete)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
import static com.datastrato.gravitino.Configs.ENTITY_RELATIONAL_STORE;
import static com.datastrato.gravitino.Configs.ENTITY_STORE;
import static com.datastrato.gravitino.Configs.ENTRY_KV_ROCKSDB_BACKEND_PATH;
import static com.datastrato.gravitino.Configs.KV_DELETE_AFTER_TIME;
import static com.datastrato.gravitino.Configs.RELATIONAL_ENTITY_STORE;
import static com.datastrato.gravitino.Configs.STORE_DELETE_AFTER_TIME;
import static com.datastrato.gravitino.Configs.STORE_TRANSACTION_MAX_SKEW_TIME;

import com.datastrato.gravitino.Catalog;
Expand Down Expand Up @@ -92,7 +92,7 @@ private void init(String type, Config config) {

Assertions.assertEquals(KV_STORE_PATH, config.get(ENTRY_KV_ROCKSDB_BACKEND_PATH));
Mockito.when(config.get(STORE_TRANSACTION_MAX_SKEW_TIME)).thenReturn(1000L);
Mockito.when(config.get(KV_DELETE_AFTER_TIME)).thenReturn(20 * 60 * 1000L);
Mockito.when(config.get(STORE_DELETE_AFTER_TIME)).thenReturn(20 * 60 * 1000L);
} else if (type.equals(Configs.RELATIONAL_ENTITY_STORE)) {
File dir = new File(DB_DIR);
if (dir.exists() || !dir.isDirectory()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import static com.datastrato.gravitino.Configs.ENTITY_KV_STORE;
import static com.datastrato.gravitino.Configs.ENTITY_STORE;
import static com.datastrato.gravitino.Configs.ENTRY_KV_ROCKSDB_BACKEND_PATH;
import static com.datastrato.gravitino.Configs.KV_DELETE_AFTER_TIME;
import static com.datastrato.gravitino.Configs.STORE_DELETE_AFTER_TIME;
import static com.datastrato.gravitino.Configs.STORE_TRANSACTION_MAX_SKEW_TIME;
import static com.datastrato.gravitino.storage.kv.BinaryEntityKeyEncoder.BYTABLE_NAMESPACE_SEPARATOR;
import static com.datastrato.gravitino.storage.kv.BinaryEntityKeyEncoder.WILD_CARD;
Expand Down Expand Up @@ -52,7 +52,7 @@ private Config getConfig() throws IOException {
Mockito.when(config.get(STORE_TRANSACTION_MAX_SKEW_TIME)).thenReturn(3000L);
Mockito.when(config.get(ENTITY_STORE)).thenReturn("kv");
Mockito.when(config.get(ENTITY_KV_STORE)).thenReturn(DEFAULT_ENTITY_KV_STORE);
Mockito.when(config.get(KV_DELETE_AFTER_TIME)).thenReturn(20 * 60 * 1000L);
Mockito.when(config.get(STORE_DELETE_AFTER_TIME)).thenReturn(20 * 60 * 1000L);
return config;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import static com.datastrato.gravitino.Configs.ENTITY_KV_STORE;
import static com.datastrato.gravitino.Configs.ENTITY_STORE;
import static com.datastrato.gravitino.Configs.ENTRY_KV_ROCKSDB_BACKEND_PATH;
import static com.datastrato.gravitino.Configs.KV_DELETE_AFTER_TIME;
import static com.datastrato.gravitino.Configs.STORE_DELETE_AFTER_TIME;
import static com.datastrato.gravitino.Configs.STORE_TRANSACTION_MAX_SKEW_TIME;

import com.datastrato.gravitino.Config;
Expand Down Expand Up @@ -60,16 +60,20 @@ public void cleanEnv() {
}
}

@Test
void testCreateKvEntityStore() throws IOException {
public Config getConfig() throws IOException {
Config config = Mockito.mock(Config.class);
Mockito.when(config.get(ENTITY_STORE)).thenReturn("kv");
Mockito.when(config.get(ENTITY_KV_STORE)).thenReturn(DEFAULT_ENTITY_KV_STORE);
Mockito.when(config.get(Configs.ENTITY_SERDE)).thenReturn("proto");
Mockito.when(config.get(ENTRY_KV_ROCKSDB_BACKEND_PATH)).thenReturn("/tmp/gravitino");
Mockito.when(config.get(STORE_TRANSACTION_MAX_SKEW_TIME)).thenReturn(1000L);
Mockito.when(config.get(KV_DELETE_AFTER_TIME)).thenReturn(20 * 60 * 1000L);
Mockito.when(config.get(STORE_DELETE_AFTER_TIME)).thenReturn(20 * 60 * 1000L);
return config;
}

@Test
void testCreateKvEntityStore() throws IOException {
Config config = getConfig();
FileUtils.deleteDirectory(FileUtils.getFile("/tmp/gravitino"));

try (EntityStore store = EntityStoreFactory.createEntityStore(config)) {
Expand Down Expand Up @@ -173,16 +177,11 @@ void testCreateKvEntityStore() throws IOException {
@Test
@Disabled("KvEntityStore is not thread safe after issue #780")
void testConcurrentIssues() throws IOException, ExecutionException, InterruptedException {
Config config = Mockito.mock(Config.class);
Config config = getConfig();
File baseDir = new File(System.getProperty("java.io.tmpdir"));
File file = Files.createTempDirectory(baseDir.toPath(), "test").toFile();
file.deleteOnExit();
Mockito.when(config.get(ENTITY_STORE)).thenReturn("kv");
Mockito.when(config.get(ENTITY_KV_STORE)).thenReturn(DEFAULT_ENTITY_KV_STORE);
Mockito.when(config.get(Configs.ENTITY_SERDE)).thenReturn("proto");
Mockito.when(config.get(ENTRY_KV_ROCKSDB_BACKEND_PATH)).thenReturn(file.getAbsolutePath());
Mockito.when(config.get(STORE_TRANSACTION_MAX_SKEW_TIME)).thenReturn(1000L);
Mockito.when(config.get(KV_DELETE_AFTER_TIME)).thenReturn(20 * 60 * 1000L);

ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(
Expand Down
Loading
Loading