Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#3968] improvement(core): Disable KV entity store and optimize CI #3975

Merged
merged 15 commits into from
Jul 8, 2024
Merged
2 changes: 1 addition & 1 deletion .github/workflows/backend-integration-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ jobs:
architecture: [linux/amd64]
java-version: [ 8, 11, 17 ]
test-mode: [ embedded, deploy ]
backend: [ jdbcBackend, kvBackend]
backend: [ mysql, h2]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think maybe we can reduce the pipeline to combine embedded with h2, and deploy with msyql, WDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general, test-mode and backend are different things, I would rather to remove embedded mode in the test-mode to reduce pipelines as backend h2 and MySQL will be run by the user in the real environment, however embedded mode will never exists in real environment.

env:
PLATFORM: ${{ matrix.architecture }}
steps:
Expand Down
5 changes: 2 additions & 3 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,8 @@ allprojects {

// Change poll image pause time from 30s to 60s
param.environment("TESTCONTAINERS_PULL_PAUSE_TIMEOUT", "60")
if (project.hasProperty("jdbcBackend")) {
param.environment("jdbcBackend", "true")
}
val jdbcDatabase = project.properties["jdbcBackend"] as? String ?: "h2"
param.environment("jdbcBackend", jdbcDatabase)

val testMode = project.properties["testMode"] as? String ?: "embedded"
param.systemProperty("gravitino.log.path", project.buildDir.path + "/${project.name}-integration-test.log")
Expand Down
10 changes: 10 additions & 0 deletions catalogs/catalog-hadoop/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ dependencies {

testImplementation(libs.bundles.log4j)
testImplementation(libs.mockito.core)
testImplementation(libs.mockito.inline)
testImplementation(libs.mysql.driver)
testImplementation(libs.junit.jupiter.api)
testImplementation(libs.junit.jupiter.params)
Expand Down Expand Up @@ -101,6 +102,15 @@ tasks {
}

tasks.test {
doFirst {
val testMode = project.properties["testMode"] as? String ?: "embedded"
if (testMode == "deploy") {
environment("GRAVITINO_HOME", project.rootDir.path + "/distribution/package")
} else if (testMode == "embedded") {
environment("GRAVITINO_HOME", project.rootDir.path)
}
}

val skipUTs = project.hasProperty("skipTests")
if (skipUTs) {
// Only run integration tests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,27 @@
*/
package com.datastrato.gravitino.catalog.hadoop;

import static com.datastrato.gravitino.Configs.DEFAULT_ENTITY_KV_STORE;
import static com.datastrato.gravitino.Configs.ENTITY_KV_ROCKSDB_BACKEND_PATH;
import static com.datastrato.gravitino.Configs.ENTITY_KV_STORE;
import static com.datastrato.gravitino.Configs.DEFAULT_ENTITY_RELATIONAL_STORE;
import static com.datastrato.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_DRIVER;
import static com.datastrato.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_PASSWORD;
import static com.datastrato.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_PATH;
import static com.datastrato.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_URL;
import static com.datastrato.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_USER;
import static com.datastrato.gravitino.Configs.ENTITY_RELATIONAL_STORE;
import static com.datastrato.gravitino.Configs.ENTITY_SERDE;
import static com.datastrato.gravitino.Configs.ENTITY_STORE;
import static com.datastrato.gravitino.Configs.RELATIONAL_ENTITY_STORE;
import static com.datastrato.gravitino.Configs.STORE_DELETE_AFTER_TIME;
import static com.datastrato.gravitino.Configs.STORE_TRANSACTION_MAX_SKEW_TIME;
import static com.datastrato.gravitino.Configs.VERSION_RETENTION_COUNT;
import static com.datastrato.gravitino.catalog.hadoop.HadoopCatalog.CATALOG_PROPERTIES_META;
import static com.datastrato.gravitino.catalog.hadoop.HadoopCatalog.FILESET_PROPERTIES_META;
import static com.datastrato.gravitino.catalog.hadoop.HadoopCatalog.SCHEMA_PROPERTIES_META;
import static com.datastrato.gravitino.connector.BaseCatalog.CATALOG_BYPASS_PREFIX;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.when;

import com.datastrato.gravitino.Config;
import com.datastrato.gravitino.Configs;
import com.datastrato.gravitino.EntitySerDeFactory;
import com.datastrato.gravitino.EntityStore;
import com.datastrato.gravitino.EntityStoreFactory;
import com.datastrato.gravitino.NameIdentifier;
Expand All @@ -49,9 +56,12 @@
import com.datastrato.gravitino.file.FilesetChange;
import com.datastrato.gravitino.storage.IdGenerator;
import com.datastrato.gravitino.storage.RandomIdGenerator;
import com.datastrato.gravitino.storage.relational.service.CatalogMetaService;
import com.datastrato.gravitino.storage.relational.service.MetalakeMetaService;
import com.datastrato.gravitino.utils.NameIdentifierUtil;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.Arrays;
Expand All @@ -71,13 +81,15 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.MockedStatic;
import org.mockito.Mockito;

public class TestHadoopCatalogOperations {

private static final String ROCKS_DB_STORE_PATH =
private static final String STORE_PATH =
"/tmp/gravitino_test_entityStore_" + UUID.randomUUID().toString().replace("-", "");

private static final String H2_file = STORE_PATH + ".mv.db";
private static final String UNFORMALIZED_TEST_ROOT_PATH =
"/tmp/gravitino_test_catalog_" + UUID.randomUUID().toString().replace("-", "");

Expand Down Expand Up @@ -118,28 +130,65 @@ public PropertiesMetadata topicPropertiesMetadata() throws UnsupportedOperationE
@BeforeAll
public static void setUp() {
Config config = Mockito.mock(Config.class);
Mockito.when(config.get(ENTITY_STORE)).thenReturn("kv");
Mockito.when(config.get(ENTITY_KV_STORE)).thenReturn(DEFAULT_ENTITY_KV_STORE);
Mockito.when(config.get(Configs.ENTITY_SERDE)).thenReturn("proto");
Mockito.when(config.get(ENTITY_KV_ROCKSDB_BACKEND_PATH)).thenReturn(ROCKS_DB_STORE_PATH);

Assertions.assertEquals(ROCKS_DB_STORE_PATH, config.get(ENTITY_KV_ROCKSDB_BACKEND_PATH));
Mockito.when(config.get(STORE_TRANSACTION_MAX_SKEW_TIME)).thenReturn(1000L);
Mockito.when(config.get(STORE_DELETE_AFTER_TIME)).thenReturn(20 * 60 * 1000L);
when(config.get(ENTITY_STORE)).thenReturn(RELATIONAL_ENTITY_STORE);
when(config.get(ENTITY_RELATIONAL_STORE)).thenReturn(DEFAULT_ENTITY_RELATIONAL_STORE);
when(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_PATH)).thenReturn(STORE_PATH);

// The following properties are used to create the JDBC connection; they are just for test, in
// the real world,
// they will be set automatically by the configuration file if you set ENTITY_RELATIONAL_STORE
// as EMBEDDED_ENTITY_RELATIONAL_STORE.
when(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_URL))
.thenReturn(String.format("jdbc:h2:%s;DB_CLOSE_DELAY=-1;MODE=MYSQL", STORE_PATH));
when(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_USER)).thenReturn("gravitino");
when(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_PASSWORD)).thenReturn("gravitino");
when(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_DRIVER)).thenReturn("org.h2.Driver");

File f = FileUtils.getFile(STORE_PATH);
f.deleteOnExit();

when(config.get(VERSION_RETENTION_COUNT)).thenReturn(1L);
when(config.get(STORE_TRANSACTION_MAX_SKEW_TIME)).thenReturn(1000L);
when(config.get(STORE_DELETE_AFTER_TIME)).thenReturn(20 * 60 * 1000L);
when(config.get(ENTITY_SERDE)).thenReturn("proto");

store = EntityStoreFactory.createEntityStore(config);
store.initialize(config);
store.setSerDe(EntitySerDeFactory.createEntitySerDe(config));
idGenerator = new RandomIdGenerator();

// Mock
MetalakeMetaService metalakeMetaService = MetalakeMetaService.getInstance();
MetalakeMetaService spyMetaservice = Mockito.spy(metalakeMetaService);
doReturn(1L).when(spyMetaservice).getMetalakeIdByName(Mockito.anyString());

CatalogMetaService catalogMetaService = CatalogMetaService.getInstance();
CatalogMetaService spyCatalogMetaService = Mockito.spy(catalogMetaService);
doReturn(1L)
.when(spyCatalogMetaService)
.getCatalogIdByMetalakeIdAndName(Mockito.anyLong(), Mockito.anyString());

MockedStatic<MetalakeMetaService> metalakeMetaServiceMockedStatic =
Mockito.mockStatic(MetalakeMetaService.class);
MockedStatic<CatalogMetaService> catalogMetaServiceMockedStatic =
Mockito.mockStatic(CatalogMetaService.class);

metalakeMetaServiceMockedStatic
.when(MetalakeMetaService::getInstance)
.thenReturn(spyMetaservice);
catalogMetaServiceMockedStatic
.when(CatalogMetaService::getInstance)
.thenReturn(spyCatalogMetaService);
}

@AfterAll
public static void tearDown() throws IOException {
store.close();
FileUtils.deleteDirectory(FileUtils.getFile(ROCKS_DB_STORE_PATH));
new Path(TEST_ROOT_PATH)
.getFileSystem(new Configuration())
.delete(new Path(TEST_ROOT_PATH), true);

File f = FileUtils.getFile(H2_file);
f.delete();
}

@Test
Expand Down
10 changes: 10 additions & 0 deletions catalogs/catalog-kafka/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ dependencies {
testImplementation(libs.junit.jupiter.api)
testImplementation(libs.kafka)
testImplementation(libs.mockito.core)
testImplementation(libs.mockito.inline)
testImplementation(libs.mysql.driver)
testImplementation(libs.testcontainers)
testImplementation(libs.testcontainers.mysql)
Expand Down Expand Up @@ -83,6 +84,15 @@ tasks.getByName("generateMetadataFileForMavenJavaPublication") {
}

tasks.test {
doFirst {
val testMode = project.properties["testMode"] as? String ?: "embedded"
if (testMode == "deploy") {
environment("GRAVITINO_HOME", project.rootDir.path + "/distribution/package")
} else if (testMode == "embedded") {
environment("GRAVITINO_HOME", project.rootDir.path)
}
}

val skipUTs = project.hasProperty("skipTests")
if (skipUTs) {
// Only run integration tests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,21 @@

import static com.datastrato.gravitino.Catalog.Type.MESSAGING;
import static com.datastrato.gravitino.Configs.DEFAULT_ENTITY_KV_STORE;
import static com.datastrato.gravitino.Configs.DEFAULT_ENTITY_RELATIONAL_STORE;
import static com.datastrato.gravitino.Configs.ENTITY_KV_ROCKSDB_BACKEND_PATH;
import static com.datastrato.gravitino.Configs.ENTITY_KV_STORE;
import static com.datastrato.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_DRIVER;
import static com.datastrato.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_PASSWORD;
import static com.datastrato.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_PATH;
import static com.datastrato.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_URL;
import static com.datastrato.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_USER;
import static com.datastrato.gravitino.Configs.ENTITY_RELATIONAL_STORE;
import static com.datastrato.gravitino.Configs.ENTITY_SERDE;
import static com.datastrato.gravitino.Configs.ENTITY_STORE;
import static com.datastrato.gravitino.Configs.RELATIONAL_ENTITY_STORE;
import static com.datastrato.gravitino.Configs.STORE_DELETE_AFTER_TIME;
import static com.datastrato.gravitino.Configs.STORE_TRANSACTION_MAX_SKEW_TIME;
import static com.datastrato.gravitino.Configs.VERSION_RETENTION_COUNT;
import static com.datastrato.gravitino.StringIdentifier.ID_KEY;
import static com.datastrato.gravitino.catalog.kafka.KafkaCatalog.CATALOG_PROPERTIES_METADATA;
import static com.datastrato.gravitino.catalog.kafka.KafkaCatalog.SCHEMA_PROPERTIES_METADATA;
Expand All @@ -33,10 +43,11 @@
import static com.datastrato.gravitino.catalog.kafka.KafkaCatalogPropertiesMetadata.BOOTSTRAP_SERVERS;
import static com.datastrato.gravitino.catalog.kafka.KafkaTopicPropertiesMetadata.PARTITION_COUNT;
import static com.datastrato.gravitino.catalog.kafka.KafkaTopicPropertiesMetadata.REPLICATION_FACTOR;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.when;

import com.datastrato.gravitino.Config;
import com.datastrato.gravitino.Configs;
import com.datastrato.gravitino.EntitySerDeFactory;
import com.datastrato.gravitino.EntityStore;
import com.datastrato.gravitino.EntityStoreFactory;
import com.datastrato.gravitino.NameIdentifier;
Expand All @@ -55,7 +66,10 @@
import com.datastrato.gravitino.meta.CatalogEntity;
import com.datastrato.gravitino.storage.IdGenerator;
import com.datastrato.gravitino.storage.RandomIdGenerator;
import com.datastrato.gravitino.storage.relational.service.CatalogMetaService;
import com.datastrato.gravitino.storage.relational.service.MetalakeMetaService;
import com.google.common.collect.ImmutableMap;
import java.io.File;
import java.io.IOException;
import java.time.Instant;
import java.util.Map;
Expand All @@ -65,12 +79,13 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;

public class TestKafkaCatalogOperations extends KafkaClusterEmbedded {

private static final String ROCKS_DB_STORE_PATH =
"/tmp/gravitino_test_entityStore_" + genRandomString();
private static final String STORE_PATH = "/tmp/gravitino_test_entityStore_" + genRandomString();
private static final String H2_FILE = STORE_PATH + ".mv.db";
private static final String METALAKE_NAME = "metalake";
private static final String CATALOG_NAME = "test_kafka_catalog";
private static final String DEFAULT_SCHEMA_NAME = "default";
Expand Down Expand Up @@ -114,15 +129,55 @@ public static void setUp() {
Mockito.when(config.get(ENTITY_STORE)).thenReturn("kv");
Mockito.when(config.get(ENTITY_KV_STORE)).thenReturn(DEFAULT_ENTITY_KV_STORE);
Mockito.when(config.get(Configs.ENTITY_SERDE)).thenReturn("proto");
Mockito.when(config.get(ENTITY_KV_ROCKSDB_BACKEND_PATH)).thenReturn(ROCKS_DB_STORE_PATH);
Mockito.when(config.get(ENTITY_KV_ROCKSDB_BACKEND_PATH)).thenReturn(STORE_PATH);

Assertions.assertEquals(ROCKS_DB_STORE_PATH, config.get(ENTITY_KV_ROCKSDB_BACKEND_PATH));
Assertions.assertEquals(STORE_PATH, config.get(ENTITY_KV_ROCKSDB_BACKEND_PATH));
Mockito.when(config.get(STORE_TRANSACTION_MAX_SKEW_TIME)).thenReturn(1000L);
Mockito.when(config.get(STORE_DELETE_AFTER_TIME)).thenReturn(20 * 60 * 1000L);

when(config.get(ENTITY_STORE)).thenReturn(RELATIONAL_ENTITY_STORE);
when(config.get(ENTITY_RELATIONAL_STORE)).thenReturn(DEFAULT_ENTITY_RELATIONAL_STORE);
when(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_PATH)).thenReturn(STORE_PATH);

when(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_URL))
.thenReturn(String.format("jdbc:h2:%s;DB_CLOSE_DELAY=-1;MODE=MYSQL", STORE_PATH));
when(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_USER)).thenReturn("gravitino");
when(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_PASSWORD)).thenReturn("gravitino");
when(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_DRIVER)).thenReturn("org.h2.Driver");

File f = FileUtils.getFile(STORE_PATH);
f.deleteOnExit();

when(config.get(VERSION_RETENTION_COUNT)).thenReturn(1L);
when(config.get(STORE_TRANSACTION_MAX_SKEW_TIME)).thenReturn(1000L);
when(config.get(STORE_DELETE_AFTER_TIME)).thenReturn(20 * 60 * 1000L);
when(config.get(ENTITY_SERDE)).thenReturn("proto");

// Mock
MetalakeMetaService metalakeMetaService = MetalakeMetaService.getInstance();
MetalakeMetaService spyMetaservice = Mockito.spy(metalakeMetaService);
doReturn(1L).when(spyMetaservice).getMetalakeIdByName(Mockito.anyString());

CatalogMetaService catalogMetaService = CatalogMetaService.getInstance();
CatalogMetaService spyCatalogMetaService = Mockito.spy(catalogMetaService);
doReturn(1L)
.when(spyCatalogMetaService)
.getCatalogIdByMetalakeIdAndName(Mockito.anyLong(), Mockito.anyString());

MockedStatic<MetalakeMetaService> metalakeMetaServiceMockedStatic =
Mockito.mockStatic(MetalakeMetaService.class);
MockedStatic<CatalogMetaService> catalogMetaServiceMockedStatic =
Mockito.mockStatic(CatalogMetaService.class);

metalakeMetaServiceMockedStatic
.when(MetalakeMetaService::getInstance)
.thenReturn(spyMetaservice);
catalogMetaServiceMockedStatic
.when(CatalogMetaService::getInstance)
.thenReturn(spyCatalogMetaService);

store = EntityStoreFactory.createEntityStore(config);
store.initialize(config);
store.setSerDe(EntitySerDeFactory.createEntitySerDe(config));
idGenerator = new RandomIdGenerator();
kafkaCatalogEntity =
CatalogEntity.builder()
Expand All @@ -147,7 +202,7 @@ public static void setUp() {
public static void tearDown() throws IOException {
if (store != null) {
store.close();
FileUtils.deleteDirectory(FileUtils.getFile(ROCKS_DB_STORE_PATH));
FileUtils.deleteQuietly(FileUtils.getFile(H2_FILE));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,15 @@
import com.datastrato.gravitino.rel.expressions.transforms.Transform;
import com.datastrato.gravitino.rel.types.Types;
import com.google.common.collect.Maps;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataField;
Expand Down Expand Up @@ -98,6 +102,13 @@ void resetSchema() {
@AfterAll
static void cleanUp() {
paimonCatalogOperations.dropSchema(schemaIdent, true);
String warehousePath = "/tmp/paimon_catalog_warehouse";
try {
FileUtils.deleteDirectory(new File(warehousePath));
Files.delete(Paths.get(warehousePath));
} catch (Exception e) {
// Ignore
}
}

private static CatalogEntity createDefaultCatalogEntity() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import java.io.IOException;
import java.time.Instant;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -69,9 +71,18 @@ public PropertiesMetadata topicPropertiesMetadata() throws UnsupportedOperationE
}
};

private String tempDir =
private static String tempDir =
String.join(File.separator, System.getProperty("java.io.tmpdir"), "paimon_catalog_warehouse");

@AfterAll
public static void clean() {
try {
FileUtils.deleteDirectory(new File(tempDir));
} catch (Exception e) {
// Ignore
}
}

@Test
public void testCatalogOperation() {
AuditInfo auditInfo =
Expand Down
Loading
Loading