diff --git a/.gitignore b/.gitignore index 7d98520a8de..5231df57658 100644 --- a/.gitignore +++ b/.gitignore @@ -37,4 +37,7 @@ distribution server/src/main/resources/project.properties dev/docker/hive/packages -docs/build \ No newline at end of file +docs/build + +dev/docker/tools/docker-connector +dev/docker/tools/docker-connector.conf \ No newline at end of file diff --git a/conf/gravitino.conf.template b/conf/gravitino.conf.template index 2da5ca1f30c..1bc918766ab 100644 --- a/conf/gravitino.conf.template +++ b/conf/gravitino.conf.template @@ -8,7 +8,7 @@ gravitino.server.shutdown.timeout = 3000 # THE CONFIGURATION FOR Gravitino WEB SERVER # The host name of the built-in web server -gravitino.server.webserver.host = 127.0.0.1 +gravitino.server.webserver.host = 0.0.0.0 # The http port number of the built-in web server gravitino.server.webserver.httpPort = 8090 # The min thread size of the built-in web server @@ -44,7 +44,7 @@ gravitino.auxService.names = iceberg-rest # Iceberg REST service classpath gravitino.auxService.iceberg-rest.classpath = catalogs/lakehouse-iceberg/libs, catalogs/lakehouse-iceberg/conf # Iceberg REST service host -gravitino.auxService.iceberg-rest.host = 127.0.0.1 +gravitino.auxService.iceberg-rest.host = 0.0.0.0 # Iceberg REST service http port gravitino.auxService.iceberg-rest.httpPort = 9001 diff --git a/dev/docker/hive/Dockerfile b/dev/docker/hive/Dockerfile index 811627999e9..4b0929f959b 100644 --- a/dev/docker/hive/Dockerfile +++ b/dev/docker/hive/Dockerfile @@ -4,7 +4,7 @@ # FROM ubuntu:16.04 -LABEL maintainer="dev@datastrato.com" +LABEL maintainer="support@datastrato.com" ARG HADOOP_PACKAGE_NAME ARG HIVE_PACKAGE_NAME diff --git a/dev/docker/hive/README.md b/dev/docker/hive/README.md index d37958c3506..ce918c202a0 100644 --- a/dev/docker/hive/README.md +++ b/dev/docker/hive/README.md @@ -59,3 +59,6 @@ ssh -p 8022 datastrato@localhost (password: ds123, this is a sudo user) - Config HDFS DataNode data transfer address to `0.0.0.0:50010` explicitly - Map container hostname to `127.0.0.1` before starting Hadoop - Expose `50010` port for the HDFS DataNode + +### 0.1.5 +- Roll back the `Map container hostname to 127.0.0.1 before starting Hadoop` change from `datastrato/gravitino-ci-hive:0.1.4` diff --git a/dev/docker/hive/start.sh b/dev/docker/hive/start.sh index a73d906b19f..8f9357c478a 100644 --- a/dev/docker/hive/start.sh +++ b/dev/docker/hive/start.sh @@ -9,12 +9,6 @@ service ssh start ssh-keyscan localhost > /root/.ssh/known_hosts ssh-keyscan 0.0.0.0 >> /root/.ssh/known_hosts -# Map the hostname to 127.0.0.1 for external access datanode -hostname=$(cat /etc/hostname) -new_content=$(cat /etc/hosts | sed "/$hostname/s/^/# /") -new_content="${new_content}\n127.0.0.1 ${hostname}" -echo -e "$new_content" > /etc/hosts - # start hadoop ${HADOOP_HOME}/sbin/start-all.sh diff --git a/dev/docker/tools/README.md b/dev/docker/tools/README.md new file mode 100644 index 00000000000..75e9cc0da88 --- /dev/null +++ b/dev/docker/tools/README.md @@ -0,0 +1,11 @@ + + +# Mac Docker Connector +Docker Desktop for Mac does not provide access to container IPs from the host (macOS). +As a result, the host (macOS) and the containers cannot directly reach each other's internal services by IP. +The [mac-docker-connector](https://github.com/wenjunxiao/mac-docker-connector) allows the macOS host to access Docker container IPs directly. +Before running the integration tests, make sure to execute the `dev/docker/tools/mac-docker-connector.sh` script.
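+ +For example, you can start the connector from the repository root and leave it running in a separate terminal (it stays in the foreground): + +```shell +./dev/docker/tools/mac-docker-connector.sh +```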
+> Developing Gravitino in a Linux environment does not have this limitation and does not require executing the `mac-docker-connector.sh` script ahead of time. diff --git a/dev/docker/tools/mac-docker-connector.sh b/dev/docker/tools/mac-docker-connector.sh new file mode 100755 index 00000000000..529b86baf1a --- /dev/null +++ b/dev/docker/tools/mac-docker-connector.sh @@ -0,0 +1,37 @@ +#!/bin/bash +# +# Copyright 2023 Datastrato. +# This software is licensed under the Apache License version 2. +# +#set -ex + +bin="$(dirname "${BASH_SOURCE-$0}")" +bin="$(cd "${bin}">/dev/null; pwd)" + +OS=$(uname -s) +if [ "${OS}" != "Darwin" ]; then + echo "Only macOS needs to run mac-docker-connector." + exit 1 +fi + +if pgrep -xq "docker-connector"; then + echo "docker-connector is already running." + exit 1 +fi + +# Download docker-connector +DOCKER_CONNECTOR_PACKAGE_NAME="docker-connector-darwin.tar.gz" +DOCKER_CONNECTOR_DOWNLOAD_URL="https://github.com/wenjunxiao/mac-docker-connector/releases/download/v3.2/${DOCKER_CONNECTOR_PACKAGE_NAME}" +if [ ! -f "${bin}/docker-connector" ]; then + wget -q -P "${bin}" "${DOCKER_CONNECTOR_DOWNLOAD_URL}" + tar -xzf "${bin}/${DOCKER_CONNECTOR_PACKAGE_NAME}" -C "${bin}" + rm -rf "${bin}/${DOCKER_CONNECTOR_PACKAGE_NAME}" +fi + +# Create a docker-connector.conf file with the routes to the docker networks +if [ ! -f "${bin}/docker-connector.conf" ]; then + docker network ls --filter driver=bridge --format "{{.ID}}" | xargs docker network inspect --format "route {{range .IPAM.Config}}{{.Subnet}}{{end}}" > "${bin}/docker-connector.conf" +fi + +echo "Starting docker-connector requires root privileges. Please enter your password." +sudo "${bin}/docker-connector" -config "${bin}/docker-connector.conf" \ No newline at end of file diff --git a/dev/docker/trino/conf/catalog/gravitino.properties.template b/dev/docker/trino/conf/catalog/gravitino.properties.template new file mode 100644 index 00000000000..a9b09b1a61c --- /dev/null +++ b/dev/docker/trino/conf/catalog/gravitino.properties.template @@ -0,0 +1,7 @@ +# +# Copyright 2023 Datastrato. +# This software is licensed under the Apache License version 2. +# +connector.name = gravitino +gravitino.uri = http://GRAVITINO_HOST_IP:GRAVITINO_HOST_PORT +gravitino.metalake = GRAVITINO_METALAKE_NAME \ No newline at end of file diff --git a/dev/docker/trino/conf/catalog/hive.properties.template b/dev/docker/trino/conf/catalog/hive.properties.template new file mode 100644 index 00000000000..e1d427fcecf --- /dev/null +++ b/dev/docker/trino/conf/catalog/hive.properties.template @@ -0,0 +1,7 @@ +# +# Copyright 2023 Datastrato. +# This software is licensed under the Apache License version 2. +# +connector.name = hive +hive.metastore.uri = thrift://HIVE_HOST_IP:9083 +hive.allow-drop-table = true \ No newline at end of file diff --git a/dev/docker/trino/conf/catalog/jmx.properties b/dev/docker/trino/conf/catalog/jmx.properties new file mode 100644 index 00000000000..95763ae17fa --- /dev/null +++ b/dev/docker/trino/conf/catalog/jmx.properties @@ -0,0 +1,5 @@ +# +# Copyright 2023 Datastrato. +# This software is licensed under the Apache License version 2. +# +connector.name = jmx \ No newline at end of file diff --git a/dev/docker/trino/conf/catalog/tpcds.properties b/dev/docker/trino/conf/catalog/tpcds.properties new file mode 100644 index 00000000000..19cb1244f6e --- /dev/null +++ b/dev/docker/trino/conf/catalog/tpcds.properties @@ -0,0 +1,6 @@ +# +# Copyright 2023 Datastrato. +# This software is licensed under the Apache License version 2.
+# +connector.name = tpcds +tpcds.splits-per-node = 4 \ No newline at end of file diff --git a/dev/docker/trino/conf/catalog/tpch.properties b/dev/docker/trino/conf/catalog/tpch.properties new file mode 100644 index 00000000000..d18f440da9a --- /dev/null +++ b/dev/docker/trino/conf/catalog/tpch.properties @@ -0,0 +1,6 @@ +# +# Copyright 2023 Datastrato. +# This software is licensed under the Apache License version 2. +# +connector.name = tpch +tpch.splits-per-node = 4 \ No newline at end of file diff --git a/dev/docker/trino/conf/config.properties b/dev/docker/trino/conf/config.properties new file mode 100644 index 00000000000..4b4909b327c --- /dev/null +++ b/dev/docker/trino/conf/config.properties @@ -0,0 +1,16 @@ +# +# Copyright 2023 Datastrato. +# This software is licensed under the Apache License version 2. +# +#single node install config +#coordinator = true +#node-scheduler.include-coordinator = true +#http-server.http.port = 8080 +#discovery-server.enabled = true +#discovery.uri = http://localhost:8080 +#protocol.v1.alternate-header-name = Presto +#hive.hdfs.impersonation.enabled = true +coordinator = true +node-scheduler.include-coordinator = true +http-server.http.port = 8080 +discovery.uri = http://0.0.0.0:8080 diff --git a/dev/docker/trino/conf/jvm.config b/dev/docker/trino/conf/jvm.config new file mode 100644 index 00000000000..4a6efada839 --- /dev/null +++ b/dev/docker/trino/conf/jvm.config @@ -0,0 +1,18 @@ +# +# Copyright 2023 Datastrato. +# This software is licensed under the Apache License version 2. +# +-server +-Xmx1G +-XX:-UseBiasedLocking +-XX:+UseG1GC +-XX:G1HeapRegionSize=32M +-XX:+ExplicitGCInvokesConcurrent +-XX:+HeapDumpOnOutOfMemoryError +-XX:+UseGCOverheadLimit +-XX:+ExitOnOutOfMemoryError +-XX:ReservedCodeCacheSize=256M +-Djdk.attach.allowAttachSelf=true +-Djdk.nio.maxCachedBufferSize=2000000 +-DHADOOP_USER_NAME=hive +-Dlog4j.configurationFile=/etc/trino/log4j2.properties \ No newline at end of file diff --git a/dev/docker/trino/conf/log.properties b/dev/docker/trino/conf/log.properties new file mode 100644 index 00000000000..dce33aedc4d --- /dev/null +++ b/dev/docker/trino/conf/log.properties @@ -0,0 +1,6 @@ +# +# Copyright 2023 Datastrato. +# This software is licensed under the Apache License version 2. +# +# Enable verbose logging from Trino +io.trino = INFO \ No newline at end of file diff --git a/dev/docker/trino/conf/log4j2.properties b/dev/docker/trino/conf/log4j2.properties new file mode 100644 index 00000000000..693be4c3081 --- /dev/null +++ b/dev/docker/trino/conf/log4j2.properties @@ -0,0 +1,30 @@ +# +# Copyright 2023 Datastrato. +# This software is licensed under the Apache License version 2. 
+# + +# Set to debug or trace if log4j initialization is failing +status = info + +# Name of the configuration +name = ConsoleLogConfig + +# Console appender configuration +appender.console.type = Console +appender.console.name = consoleLogger +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n + +# File appender configuration +appender.file.type = File +appender.file.name = fileLogger +appender.file.fileName = gravitino-trino-connector.log +appender.file.layout.type = PatternLayout +appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n + +# Root logger level +rootLogger.level = info + +# Root logger referring to console and file appenders +rootLogger.appenderRef.stdout.ref = consoleLogger +rootLogger.appenderRef.file.ref = fileLogger diff --git a/dev/docker/trino/conf/node.properties b/dev/docker/trino/conf/node.properties new file mode 100644 index 00000000000..d85e2f0048a --- /dev/null +++ b/dev/docker/trino/conf/node.properties @@ -0,0 +1,7 @@ +# +# Copyright 2023 Datastrato. +# This software is licensed under the Apache License version 2. +# +node.environment = docker +node.data-dir = /data/trino +plugin.dir = /usr/lib/trino/plugin diff --git a/docs/integration-test.md b/docs/integration-test.md index 53443654e26..7891d7f40a2 100644 --- a/docs/integration-test.md +++ b/docs/integration-test.md @@ -82,12 +82,30 @@ Run only test cases where tag is set `gravitino-docker-it`. [embbeded|deplo --------------------------------------------------------------- ``` +If Docker is not installed or `mac-docker-connector` is not running, the `./gradlew test -PtestMode=[embedded|deploy]` +command will skip the test cases that depend on `mac-docker-connector`. + +```text +------------------- Check Docker environment ------------------ +Docker server status .......................................... [running] +Gravitino IT Docker container is already running ............... [no] +Run test cases without `gravitino-trino-it` tag .............. [embedded|deploy test] +--------------------------------------------------------------- +``` + > Gravitino will run all integration test cases in the GitHub Actions environment. ### Running Gravitino CI Docker Environment Before running the tests, make sure Docker is installed. +#### Mac Docker connector +Docker Desktop for Mac does not provide access to container IPs from the host (macOS). +As a result, the host (macOS) and the containers cannot directly reach each other's internal services by IP. +The [mac-docker-connector](https://github.com/wenjunxiao/mac-docker-connector) allows the macOS host to access Docker container IPs directly. +Before running the integration tests, make sure to execute the `dev/docker/tools/mac-docker-connector.sh` script. +> Developing Gravitino in a Linux environment does not have this limitation and does not require executing the `mac-docker-connector.sh` script ahead of time. + #### Running Gravitino Hive CI Docker Environment 1. Run a hive docker test environment container in the local using the `docker run --rm -d -p 8022:22 -p 8088:8088 -p 9000:9000 -p 9083:9083 -p 10000:10000 -p 10002:10002 -p 50010:50010 -p 50070:50070 -p 50075:50075 datastrato/gravitino-ci-hive` command.
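+ + For example, to check that the container is up before running the tests (an illustrative command using the image name from step 1): + + ```shell + docker ps --filter ancestor=datastrato/gravitino-ci-hive + ```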
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index e712076e88e..ec3e1793740 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -31,7 +31,8 @@ spark = "3.4.1" scala-collection-compat = "2.7.0" sqlite-jdbc = "3.42.0.0" testng = "7.7.1" - +testcontainers = "1.19.0" +trino-jdbc = "426" protobuf-plugin = "0.9.2" spotless-plugin = '6.11.0' @@ -99,7 +100,9 @@ scala-collection-compat = { group = "org.scala-lang.modules", name = "scala-col sqlite-jdbc = { group = "org.xerial", name = "sqlite-jdbc", version.ref = "sqlite-jdbc" } testng = { group = "org.testng", name = "testng", version.ref = "testng" } spark-hive = { group = "org.apache.spark", name = "spark-hive_2.13", version.ref = "spark" } - +testcontainers = { group = "org.testcontainers", name = "testcontainers", version.ref = "testcontainers" } +testcontainers-junit-jupiter = { group = "org.testcontainers", name = "junit-jupiter", version.ref = "testcontainers" } +trino-jdbc = { group = "io.trino", name = "trino-jdbc", version.ref = "trino-jdbc" } [bundles] log4j = ["slf4j-api", "log4j-slf4j2-impl", "log4j-api", "log4j-core", "log4j-12-api"] diff --git a/integration-test/build.gradle.kts b/integration-test/build.gradle.kts index 06b26ea722c..75d07dc2ee5 100644 --- a/integration-test/build.gradle.kts +++ b/integration-test/build.gradle.kts @@ -5,6 +5,7 @@ import java.io.IOException import kotlin.io.* +import org.gradle.internal.os.OperatingSystem plugins { `maven-publish` @@ -111,6 +112,9 @@ dependencies { testImplementation(libs.scala.collection.compat) testImplementation(libs.sqlite.jdbc) testImplementation(libs.spark.hive) + testImplementation(libs.testcontainers) + testImplementation(libs.testcontainers.junit.jupiter) + testImplementation(libs.trino.jdbc) } /* Optimizing integration test execution conditions */ @@ -118,9 +122,11 @@ dependencies { var HIVE_IMAGE_NAME = "datastrato/gravitino-ci-hive" var HIVE_IMAGE_TAG_NAME = "${HIVE_IMAGE_NAME}:0.1.2" var EXCLUDE_DOCKER_TEST = true +var EXCLUDE_TRINO_TEST = true // Use these 3 variables allow for more detailed control in the future. project.extra["dockerRunning"] = false project.extra["hiveContainerRunning"] = false +project.extra["macDockerConnector"] = false fun printDockerCheckInfo() { val testMode = project.properties["testMode"] as? String ?: "embedded" @@ -129,13 +135,20 @@ fun printDockerCheckInfo() { } val dockerRunning = project.extra["dockerRunning"] as? Boolean ?: false val hiveContainerRunning = project.extra["hiveContainerRunning"] as? Boolean ?: false + val macDockerConnector = project.extra["macDockerConnector"] as? Boolean ?: false if (dockerRunning && hiveContainerRunning) { EXCLUDE_DOCKER_TEST = false } + if (dockerRunning && macDockerConnector) { + EXCLUDE_TRINO_TEST = false + } println("------------------ Check Docker environment -----------------") println("Docker server status ........................................ [${if (dockerRunning) "running" else "stop"}]") println("Gravitino IT Docker container is already running ............. [${if (hiveContainerRunning) "yes" else "no"}]") + if (OperatingSystem.current().isMacOsX() && !(dockerRunning && macDockerConnector)) { + println("Run test cases without `gravitino-trino-it` tag .............. [$testMode test]") + } if (dockerRunning && hiveContainerRunning) { println("Use Gravitino IT Docker container to run all integration test. 
[$testMode test]") @@ -146,6 +159,26 @@ fun printDockerCheckInfo() { } tasks { + register("isMacDockerConnectorRunning") { + doLast { + val processName = "docker-connector" + val command = "pgrep -x ${processName}" + + try { + val execResult = project.exec { + commandLine("bash", "-c", command) + } + if (execResult.exitValue == 0) { + project.extra["macDockerConnector"] = true + } else { + project.extra["macDockerConnector"] = false + } + } catch (e: Exception) { + project.extra["macDockerConnector"] = false + } + } + } + // Use this task to check if docker container is running register("checkContainerRunning") { doLast { @@ -170,7 +203,7 @@ tasks { // Use this task to check if docker is running register("checkDockerRunning") { - dependsOn("checkContainerRunning") + dependsOn("checkContainerRunning", "isMacDockerConnectorRunning") doLast { try { @@ -198,15 +231,23 @@ tasks.test { dependsOn("checkDockerRunning") doFirst { + copy { + from("${project.rootDir}/dev/docker/trino/conf") + into("build/trino-conf") + fileMode = 0b111101101 + } + // Default use MiniGravitino to run integration tests environment("GRAVITINO_ROOT_DIR", rootDir.path) // TODO: use hive user instead after we fix the permission issue #554 environment("HADOOP_USER_NAME", "root") environment("HADOOP_HOME", "/tmp") environment("PROJECT_VERSION", version) + environment("TRINO_CONF_DIR", buildDir.path + "/trino-conf") val testMode = project.properties["testMode"] as? String ?: "embedded" - systemProperty("gravitino.log.path", buildDir.path) + systemProperty("gravitino.log.path", buildDir.path + "/integration-test.log") + delete(buildDir.path + "/integration-test.log") if (testMode == "deploy") { environment("GRAVITINO_HOME", rootDir.path + "/distribution/package") systemProperty("testMode", "deploy") @@ -225,6 +266,12 @@ tasks.test { println("${redColor}Gravitino-docker is not running locally, all integration test cases that tagged 'gravitino-docker-it' will be excluded.${resetColor}") excludeTags("gravitino-docker-it") } + if (EXCLUDE_TRINO_TEST && OperatingSystem.current().isMacOsX()) { + val redColor = "\u001B[31m" + val resetColor = "\u001B[0m" + println("${redColor}mac-docker-connector is not running locally, all integration test cases that tagged 'gravitino-trino-it' will be excluded.${resetColor}") + excludeTags("gravitino-trino-it") + } } } } diff --git a/integration-test/src/main/java/com/datastrato/gravitino/integration/test/util/CloseableGroup.java b/integration-test/src/main/java/com/datastrato/gravitino/integration/test/util/CloseableGroup.java new file mode 100644 index 00000000000..22902ce2d7d --- /dev/null +++ b/integration-test/src/main/java/com/datastrato/gravitino/integration/test/util/CloseableGroup.java @@ -0,0 +1,51 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2. 
+ */ +package com.datastrato.gravitino.integration.test.util; + +import static com.google.common.base.Throwables.propagateIfPossible; +import static java.util.Objects.requireNonNull; + +import java.util.ArrayDeque; +import java.util.Deque; + +/** This class is inspired by com.google.common.io.Closer */ +public final class CloseableGroup implements AutoCloseable { + private final Deque<AutoCloseable> stack = new ArrayDeque<>(4); + + private CloseableGroup() {} + + public static CloseableGroup create() { + return new CloseableGroup(); + } + + public <C extends AutoCloseable> C register(C closeable) { + requireNonNull(closeable, "closeable is null"); + stack.addFirst(closeable); + return closeable; + } + + @Override + public void close() throws Exception { + Throwable rootCause = null; + while (!stack.isEmpty()) { + AutoCloseable closeable = stack.removeFirst(); + try { + closeable.close(); + } catch (Throwable t) { + if (rootCause == null) { + rootCause = t; + } else if (rootCause != t) { + // Self-suppression not permitted + rootCause.addSuppressed(t); + } + } + } + if (rootCause != null) { + propagateIfPossible(rootCause, Exception.class); + // not possible + throw new AssertionError(rootCause); + } + } +} diff --git a/integration-test/src/main/java/com/datastrato/gravitino/integration/test/util/ITUtils.java b/integration-test/src/main/java/com/datastrato/gravitino/integration/test/util/ITUtils.java index 44074037f53..b67c6651a8b 100644 --- a/integration-test/src/main/java/com/datastrato/gravitino/integration/test/util/ITUtils.java +++ b/integration-test/src/main/java/com/datastrato/gravitino/integration/test/util/ITUtils.java @@ -29,7 +29,11 @@ public static void rewriteConfigFile( OutputStream outputStream = Files.newOutputStream(Paths.get(configFileName))) { props.load(inputStream); props.putAll(configMap); - props.store(outputStream, null); + for (String key : props.stringPropertyNames()) { + String value = props.getProperty(key); + // Use a customized write function to avoid escaping `:` into `\:`. + outputStream.write((key + " = " + value + "\n").getBytes()); + } } } } diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/trino/CloseableGroupTest.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/trino/CloseableGroupTest.java new file mode 100644 index 00000000000..4d4766fa219 --- /dev/null +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/trino/CloseableGroupTest.java @@ -0,0 +1,43 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2.
+ */ +package com.datastrato.gravitino.integration.test.trino; + +import com.datastrato.gravitino.integration.test.util.CloseableGroup; +import java.io.Closeable; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +public class CloseableGroupTest { + @Test + public void callCloseableTest() throws Exception { + Closeable closeable1 = Mockito.mock(Closeable.class); + Closeable closeable2 = Mockito.mock(Closeable.class); + Closeable closeable3 = Mockito.mock(Closeable.class); + + CloseableGroup closeableGroup = CloseableGroup.create(); + closeableGroup.register(closeable1); + closeableGroup.register(closeable2); + closeableGroup.register(closeable3); + + closeableGroup.close(); + Mockito.verify(closeable1).close(); + Mockito.verify(closeable2).close(); + Mockito.verify(closeable3).close(); + } + + @Test + public void callAutoCloseableTest() throws Exception { + Closeable closeable1 = Mockito.mock(Closeable.class); + AutoCloseable closeable2 = Mockito.mock(AutoCloseable.class); + + CloseableGroup closeableGroup = CloseableGroup.create(); + closeableGroup.register(closeable1); + closeableGroup.register(closeable2); + + closeableGroup.close(); + Mockito.verify(closeable1).close(); + Mockito.verify(closeable2).close(); + } +} diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/trino/TrinoConnectorIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/trino/TrinoConnectorIT.java new file mode 100644 index 00000000000..0f94c1758cf --- /dev/null +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/trino/TrinoConnectorIT.java @@ -0,0 +1,443 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.integration.test.trino; + +import static com.datastrato.gravitino.catalog.hive.HiveCatalogPropertiesMeta.METASTORE_URIS; + +import com.datastrato.gravitino.Catalog; +import com.datastrato.gravitino.NameIdentifier; +import com.datastrato.gravitino.catalog.hive.HiveClientPool; +import com.datastrato.gravitino.client.GravitinoMetaLake; +import com.datastrato.gravitino.integration.test.util.AbstractIT; +import com.datastrato.gravitino.integration.test.util.CloseableGroup; +import com.datastrato.gravitino.integration.test.util.GravitinoITUtils; +import com.datastrato.gravitino.integration.test.util.HiveContainer; +import com.datastrato.gravitino.integration.test.util.ITUtils; +import com.datastrato.gravitino.integration.test.util.TrinoContainer; +import com.datastrato.gravitino.rel.Schema; +import com.datastrato.gravitino.rel.Table; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.thrift.TException; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestMethodOrder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Network; + +@Tag("gravitino-trino-it") 
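+// Note: these tests assume a running Docker daemon and, on macOS, a running mac-docker-connector +// (see dev/docker/tools/README.md); otherwise the `gravitino-trino-it` tag is excluded by the build.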
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +public class TrinoConnectorIT extends AbstractIT { + public static final Logger LOG = LoggerFactory.getLogger(TrinoConnectorIT.class); + private static final CloseableGroup closer = CloseableGroup.create(); + + // The subnet must match the configuration in `dev/docker/tools/docker-connector.conf` + private static String CONTAINER_NETWORK_SUBNET = "192.168.0.0/24"; + private static String CONTAINER_NETWORK_GATEWAY = "192.168.0.1"; + private static String CONTAINER_NETWORK_IPRANGE = "192.168.0.100/28"; + + public static TrinoContainer trinoContainer; + + public static HiveContainer hiveContainer; + private static HiveClientPool hiveClientPool; + + public static String metalakeName = + GravitinoITUtils.genRandomName("TrinoIT_metalake").toLowerCase(); + public static String catalogName = + GravitinoITUtils.genRandomName("TrinoIT_catalog").toLowerCase(); + public static String databaseName = + GravitinoITUtils.genRandomName("TrinoIT_database").toLowerCase(); + public static String scenarioTab1Name = + GravitinoITUtils.genRandomName("TrinoIT_table1").toLowerCase(); + public static String scenarioTab2Name = + GravitinoITUtils.genRandomName("TrinoIT_table2").toLowerCase(); + public static String tab1Name = GravitinoITUtils.genRandomName("TrinoIT_table3").toLowerCase(); + private static GravitinoMetaLake metalake; + private static Catalog catalog; + + static final String TRINO_CONNECTOR_LIB_DIR = + System.getenv("GRAVITINO_ROOT_DIR") + "/trino-connector/build/libs"; + + @BeforeAll + public static void startDockerContainer() throws IOException, TException, InterruptedException { + String trinoConfDir = System.getenv("TRINO_CONF_DIR"); + + // Let containers assign addresses in a fixed subnet to avoid `mac-docker-connector` needing to + // refresh the configuration + com.github.dockerjava.api.model.Network.Ipam.Config ipamConfig = + new com.github.dockerjava.api.model.Network.Ipam.Config(); + ipamConfig + .withSubnet(CONTAINER_NETWORK_SUBNET) + .withGateway(CONTAINER_NETWORK_GATEWAY) + .withIpRange(CONTAINER_NETWORK_IPRANGE); + + Network network = + closer.register( + Network.builder() + .createNetworkCmdModifier( + cmd -> + cmd.withIpam( + new com.github.dockerjava.api.model.Network.Ipam() + .withConfig(ipamConfig))) + .build()); + + // Start Hive container + HiveContainer.Builder hiveBuilder = + HiveContainer.builder() + .withHostName("gravitino-ci-hive") + .withEnvVars( + ImmutableMap.<String, String>builder().put("HADOOP_USER_NAME", "root").build()) + .withNetwork(network); + hiveContainer = closer.register(hiveBuilder.build()); + hiveContainer.start(); + + // Initialize the Hive client + HiveConf hiveConf = new HiveConf(); + String hiveMetastoreUris = + String.format( + "thrift://%s:%d", + getPrimaryNICIp(), hiveContainer.getMappedPort(HiveContainer.HIVE_METASTORE_PORT)); + hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, hiveMetastoreUris); + hiveClientPool = closer.register(new HiveClientPool(1, hiveConf)); + + // Currently we must create the metalake and catalog before starting the Trino container + createMetalake(); + createCatalog(); + + // Configure Trino config file + updateTrinoConfigFile(); + + // Start Trino container + String hiveContainerIp = hiveContainer.getContainerIpAddress(); + TrinoContainer.Builder trinoBuilder = + TrinoContainer.builder() + .withEnvVars( + ImmutableMap.<String, String>builder().put("HADOOP_USER_NAME", "root").build()) + .withNetwork(network) + .withExtraHosts( + ImmutableMap.<String, String>builder() + .put(hiveContainerIp, hiveContainerIp) + .build())
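+ // Mount the Trino configuration directory and the locally built Gravitino connector plugin into the container.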
+ .withFilesToMount( + ImmutableMap.<String, String>builder() + .put(TrinoContainer.TRINO_CONTAINER_CONF_DIR, trinoConfDir) + .put( + TrinoContainer.TRINO_CONTAINER_PLUGIN_GRAVITINO_DIR, + TRINO_CONNECTOR_LIB_DIR) + .build()) + .withExposePorts(ImmutableSet.of(TrinoContainer.TRINO_PORT)); + + trinoContainer = closer.register(trinoBuilder.build()); + trinoContainer.start(); + trinoContainer.checkSyncCatalogFromGravitino(5, metalakeName, catalogName); + + createSchema(); + } + + @AfterAll + public static void stopDockerContainer() { + try { + closer.close(); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + } + } + + public static void createSchema() throws TException, InterruptedException { + String sql1 = + String.format( + "CREATE SCHEMA \"%s.%s\".%s WITH (\n" + + " location = 'hdfs://%s:9000/user/hive/warehouse/%s.db'\n" + + ")", + metalakeName, + catalogName, + databaseName, + hiveContainer.getContainerIpAddress(), + databaseName); + trinoContainer.executeUpdateSQL(sql1); + NameIdentifier idSchema = NameIdentifier.of(metalakeName, catalogName, databaseName); + Schema schema = catalog.asSchemas().loadSchema(idSchema); + Assertions.assertEquals(schema.name(), databaseName); + Database database = hiveClientPool.run(client -> client.getDatabase(databaseName)); + Assertions.assertEquals(databaseName, database.getName()); + } + + @Test + @Order(1) + public void testShowSchemas() { + String sql = + String.format( + "SHOW SCHEMAS FROM \"%s.%s\" LIKE '%s'", metalakeName, catalogName, databaseName); + ArrayList<ArrayList<String>> queryData = trinoContainer.executeQuerySQL(sql); + Assertions.assertEquals(queryData.get(0).get(0), databaseName); + } + + @Test + @Order(2) + public void testCreateTable() throws TException, InterruptedException { + String sql3 = + String.format( + "CREATE TABLE \"%s.%s\".%s.%s (\n" + + " col1 varchar,\n" + + " col2 varchar,\n" + + " col3 varchar\n" + + ")\n" + + "WITH (\n" + + " format = 'TEXTFILE'\n" + + ")", + metalakeName, catalogName, databaseName, tab1Name); + trinoContainer.executeUpdateSQL(sql3); + + // Verify in Gravitino Server + NameIdentifier idTable1 = NameIdentifier.of(metalakeName, catalogName, databaseName, tab1Name); + Table table1 = catalog.asTableCatalog().loadTable(idTable1); + Assertions.assertEquals(table1.name(), tab1Name); + + // Verify in Hive Server + org.apache.hadoop.hive.metastore.api.Table hiveTab1 = + hiveClientPool.run(client -> client.getTable(databaseName, tab1Name)); + Assertions.assertEquals(databaseName, hiveTab1.getDbName()); + Assertions.assertEquals(tab1Name, hiveTab1.getTableName()); + } + + @Order(3) + @Test + public void testShowTable() { + String sql = + String.format( + "SHOW TABLES FROM \"%s.%s\".%s LIKE '%s'", + metalakeName, catalogName, databaseName, tab1Name); + ArrayList<ArrayList<String>> queryData = trinoContainer.executeQuerySQL(sql); + Assertions.assertEquals(queryData.get(0).get(0), tab1Name); + } + + @Test + @Order(4) + public void testScenarioTable1() throws TException, InterruptedException { + String sql3 = + String.format( + "CREATE TABLE \"%s.%s\".%s.%s (\n" + + " user_name varchar,\n" + + " gender varchar,\n" + + " age varchar,\n" + + " phone varchar,\n" + + " email varchar,\n" + + " address varchar,\n" + + " birthday varchar,\n" + + " create_time varchar,\n" + + " update_time varchar\n" + + ")\n" + + "WITH (\n" + + " format = 'TEXTFILE'\n" + + ")", + metalakeName, catalogName, databaseName, scenarioTab1Name); + trinoContainer.executeUpdateSQL(sql3); + + // Verify in Gravitino Server + NameIdentifier idTable1 = + NameIdentifier.of(metalakeName,
catalogName, databaseName, scenarioTab1Name); + Table table1 = catalog.asTableCatalog().loadTable(idTable1); + Assertions.assertEquals(table1.name(), scenarioTab1Name); + + // Verify in Hive Server + org.apache.hadoop.hive.metastore.api.Table hiveTab1 = + hiveClientPool.run(client -> client.getTable(databaseName, scenarioTab1Name)); + Assertions.assertEquals(databaseName, hiveTab1.getDbName()); + Assertions.assertEquals(scenarioTab1Name, hiveTab1.getTableName()); + + // Insert data into table1 + ArrayList<ArrayList<String>> table1Data = new ArrayList<>(); + table1Data.add(new ArrayList<>(Arrays.asList("jake", "male", "30", "+1 6125047154"))); + table1Data.add(new ArrayList<>(Arrays.asList("jeff", "male", "25", "+1 2673800457"))); + table1Data.add(new ArrayList<>(Arrays.asList("rose", "female", "28", "+1 7073958726"))); + table1Data.add(new ArrayList<>(Arrays.asList("sam", "man", "18", "+1 8157809623"))); + StringBuilder sql5 = + new StringBuilder( + String.format( + "INSERT INTO \"%s.%s\".%s.%s (user_name, gender, age, phone) VALUES", + metalakeName, catalogName, databaseName, scenarioTab1Name)); + int index = 0; + for (ArrayList<String> record : table1Data) { + sql5.append( + String.format( + " ('%s', '%s', '%s', '%s'),", + record.get(0), record.get(1), record.get(2), record.get(3))); + if (index++ == table1Data.size() - 1) { + // Delete the last comma + sql5.deleteCharAt(sql5.length() - 1); + } + } + trinoContainer.executeUpdateSQL(sql5.toString()); + + // Select data from table1 and verify it + String sql6 = + String.format( + "SELECT user_name, gender, age, phone FROM \"%s.%s\".%s.%s ORDER BY user_name", + metalakeName, catalogName, databaseName, scenarioTab1Name); + ArrayList<ArrayList<String>> table1QueryData = trinoContainer.executeQuerySQL(sql6); + Assertions.assertEquals(table1Data, table1QueryData); + } + + @Test + @Order(5) + public void testScenarioTable2() throws TException, InterruptedException { + String sql4 = + String.format( + "CREATE TABLE \"%s.%s\".%s.%s (\n" + + " user_name varchar,\n" + + " consumer varchar,\n" + + " recharge varchar,\n" + + " event_time varchar,\n" + + " create_time varchar,\n" + + " update_time varchar\n" + + ")\n" + + "WITH (\n" + + " format = 'TEXTFILE'\n" + + ")", + metalakeName, catalogName, databaseName, scenarioTab2Name); + trinoContainer.executeUpdateSQL(sql4); + + // Verify in Gravitino Server + NameIdentifier idTable2 = + NameIdentifier.of(metalakeName, catalogName, databaseName, scenarioTab2Name); + Table table2 = catalog.asTableCatalog().loadTable(idTable2); + Assertions.assertEquals(table2.name(), scenarioTab2Name); + + // Verify in Hive Server + org.apache.hadoop.hive.metastore.api.Table hiveTab2 = + hiveClientPool.run(client -> client.getTable(databaseName, scenarioTab2Name)); + Assertions.assertEquals(databaseName, hiveTab2.getDbName()); + Assertions.assertEquals(scenarioTab2Name, hiveTab2.getTableName()); + + // Insert data into table2 + ArrayList<ArrayList<String>> table2Data = new ArrayList<>(); + table2Data.add(new ArrayList<>(Arrays.asList("jake", "", "$250", "22nd,July,2009"))); + table2Data.add(new ArrayList<>(Arrays.asList("jeff", "$40.25", "", "18nd,July,2023"))); + table2Data.add(new ArrayList<>(Arrays.asList("rose", "$28.45", "", "22nd,July,2023"))); + table2Data.add(new ArrayList<>(Arrays.asList("sam", "$27.03", "", "22nd,July,2023"))); + int index = 0; + StringBuilder sql7 = + new StringBuilder( + String.format( + "INSERT INTO \"%s.%s\".%s.%s (user_name, consumer, recharge, event_time) VALUES", + metalakeName, catalogName, databaseName, scenarioTab2Name)); + for (ArrayList<String> record :
table2Data) { + sql7.append( + String.format( + " ('%s', '%s', '%s', '%s'),", + record.get(0), record.get(1), record.get(2), record.get(3))); + if (index++ == table2Data.size() - 1) { + // Delete the last comma + sql7.deleteCharAt(sql7.length() - 1); + } + } + trinoContainer.executeUpdateSQL(sql7.toString()); + + // Select data from table2 and verify it + String sql8 = + String.format( + "SELECT user_name, consumer, recharge, event_time FROM \"%s.%s\".%s.%s ORDER BY user_name", + metalakeName, catalogName, databaseName, scenarioTab2Name); + ArrayList<ArrayList<String>> table2QueryData = trinoContainer.executeQuerySQL(sql8); + Assertions.assertEquals(table2Data, table2QueryData); + } + + @Test + @Order(6) + public void testScenarioJoinTwoTable() { + String sql9 = + String.format( + "SELECT * FROM (SELECT t1.user_name as user_name, gender, age, phone, consumer, recharge, event_time FROM \"%1$s.%2$s\".%3$s.%4$s AS t1\n" + + "JOIN\n" + + " (SELECT user_name, consumer, recharge, event_time FROM \"%1$s.%2$s\".%3$s.%5$s) AS t2\n" + + " ON t1.user_name = t2.user_name) ORDER BY user_name", + metalakeName, catalogName, databaseName, scenarioTab1Name, scenarioTab2Name); + ArrayList<ArrayList<String>> joinQueryData = trinoContainer.executeQuerySQL(sql9); + ArrayList<ArrayList<String>> joinData = new ArrayList<>(); + joinData.add( + new ArrayList<>( + Arrays.asList("jake", "male", "30", "+1 6125047154", "", "$250", "22nd,July,2009"))); + joinData.add( + new ArrayList<>( + Arrays.asList("jeff", "male", "25", "+1 2673800457", "$40.25", "", "18nd,July,2023"))); + joinData.add( + new ArrayList<>( + Arrays.asList( + "rose", "female", "28", "+1 7073958726", "$28.45", "", "22nd,July,2023"))); + joinData.add( + new ArrayList<>( + Arrays.asList("sam", "man", "18", "+1 8157809623", "$27.03", "", "22nd,July,2023"))); + Assertions.assertEquals(joinData, joinQueryData); + } + + private static void createMetalake() { + GravitinoMetaLake[] gravitinoMetaLakes = client.listMetalakes(); + Assertions.assertEquals(0, gravitinoMetaLakes.length); + + GravitinoMetaLake createdMetalake = + client.createMetalake(NameIdentifier.of(metalakeName), "comment", Collections.emptyMap()); + GravitinoMetaLake loadMetalake = client.loadMetalake(NameIdentifier.of(metalakeName)); + Assertions.assertEquals(createdMetalake, loadMetalake); + + metalake = loadMetalake; + } + + private static void createCatalog() { + Map<String, String> properties = Maps.newHashMap(); + String hiveMetastoreUris = + String.format( + "thrift://%s:%d", + hiveContainer.getContainerIpAddress(), HiveContainer.HIVE_METASTORE_PORT); + LOG.debug("hiveMetastoreUris is {}", hiveMetastoreUris); + properties.put(METASTORE_URIS, hiveMetastoreUris); + + Catalog createdCatalog = + metalake.createCatalog( + NameIdentifier.of(metalakeName, catalogName), + Catalog.Type.RELATIONAL, + "hive", + "comment", + properties); + Catalog loadCatalog = metalake.loadCatalog(NameIdentifier.of(metalakeName, catalogName)); + + catalog = loadCatalog; + } + + private static void updateTrinoConfigFile() throws IOException { + String trinoConfDir = System.getenv("TRINO_CONF_DIR"); + ITUtils.rewriteConfigFile( + trinoConfDir + "/catalog/gravitino.properties.template", + trinoConfDir + "/catalog/gravitino.properties", + ImmutableMap.<String, String>builder() + .put( + TrinoContainer.TRINO_CONF_GRAVITINO_URI, + String.format( + "http://%s:%d", AbstractIT.getPrimaryNICIp(), getGravitinoServerPort())) + .put(TrinoContainer.TRINO_CONF_GRAVITINO_METALAKE, metalakeName) + .build()); + ITUtils.rewriteConfigFile( + trinoConfDir + "/catalog/hive.properties.template", + trinoConfDir + "/catalog/hive.properties", + ImmutableMap.of( + TrinoContainer.TRINO_CONF_HIVE_METASTORE_URI, + String.format( + "thrift://%s:%d", + hiveContainer.getContainerIpAddress(), HiveContainer.HIVE_METASTORE_PORT))); + } +} diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/AbstractIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/AbstractIT.java index c77d152b845..d6ca6a838dd 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/AbstractIT.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/AbstractIT.java @@ -15,9 +15,14 @@ import com.datastrato.gravitino.server.ServerConfig; import com.datastrato.gravitino.server.web.JettyServerConfig; import java.io.IOException; +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.net.SocketException; +import java.net.UnknownHostException; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.Enumeration; import java.util.HashMap; import java.util.Map; import org.apache.commons.io.FileUtils; @@ -40,6 +45,12 @@ public class AbstractIT { protected static Map<String, String> customConfigs = new HashMap<>(); + public static int getGravitinoServerPort() { + JettyServerConfig jettyServerConfig = + JettyServerConfig.fromConfig(serverConfig, WEBSERVER_CONF_PREFIX); + return jettyServerConfig.getHttpPort(); + } + public static void registerCustomConfigs(Map<String, String> configs) { customConfigs.putAll(configs); } @@ -106,4 +117,31 @@ public static void stopIntegrationTest() throws IOException, InterruptedExceptio } LOG.info("Tearing down Gravitino Server"); } + + // Get host IP from primary NIC + protected static String getPrimaryNICIp() { + String hostIP = "127.0.0.1"; + try { + NetworkInterface networkInterface = NetworkInterface.getByName("en0"); // macOS + if (networkInterface == null) { + networkInterface = NetworkInterface.getByName("eth0"); // Linux and Windows + } + if (networkInterface != null) { + Enumeration<InetAddress> addresses = networkInterface.getInetAddresses(); + while (addresses.hasMoreElements()) { + InetAddress address = addresses.nextElement(); + if (!address.isLoopbackAddress() && address.getHostAddress().indexOf(':') == -1) { + hostIP = address.getHostAddress().replace("/", ""); // remove the first char '/' + break; + } + } + } else { + InetAddress ip = InetAddress.getLocalHost(); + hostIP = ip.getHostAddress(); + } + } catch (SocketException | UnknownHostException e) { + LOG.error(e.getMessage(), e); + } + return hostIP; + } } diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/BaseContainer.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/BaseContainer.java new file mode 100644 index 00000000000..ec71bdc00b8 --- /dev/null +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/BaseContainer.java @@ -0,0 +1,199 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2.
+ */ +package com.datastrato.gravitino.integration.test.util; + +import static java.util.Objects.requireNonNull; +import static org.testcontainers.utility.MountableFile.forHostPath; + +import com.github.dockerjava.api.DockerClient; +import com.github.dockerjava.api.command.InspectContainerResponse; +import com.github.dockerjava.api.model.ContainerNetwork; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import java.io.IOException; +import java.time.Duration; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.function.Consumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.DockerClientFactory; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.output.OutputFrame; +import org.testcontainers.containers.startupcheck.IsRunningStartupCheckStrategy; +import org.testcontainers.containers.wait.strategy.Wait; + +/** + * BaseContainer is the base class for all containers. It contains the common methods and settings + * shared by all containers. You can extend this class to create your own container for integration + * tests. + */ +public abstract class BaseContainer implements AutoCloseable { + public static final Logger LOG = LoggerFactory.getLogger(BaseContainer.class); + // Host name of the container + private final String hostName; + // Exposed ports of the container + private final Set<Integer> ports; + // Files to mount in the container + private final Map<String, String> filesToMount; + // Environment variables of the container + private final Map<String, String> envVars; + // Additional host and IP address mapping + private final Map<String, String> extraHosts; + // Network of the container + private final Optional<Network> network; + + private final GenericContainer<?> container; + + protected BaseContainer( + String image, + String hostName, + Set<Integer> ports, + Map<String, String> extraHosts, + Map<String, String> filesToMount, + Map<String, String> envVars, + Optional<Network> network) { + this.container = new GenericContainer<>(requireNonNull(image, "image is null")); + this.ports = requireNonNull(ports, "ports is null"); + this.hostName = requireNonNull(hostName, "hostName is null"); + this.extraHosts = extraHosts; + this.filesToMount = requireNonNull(filesToMount, "filesToMount is null"); + this.envVars = requireNonNull(envVars, "envVars is null"); + this.network = requireNonNull(network, "network is null"); + + setupContainer(); + } + + protected void setupContainer() { + // Add exposed ports in the container + for (int port : this.ports) { + container.addExposedPort(port); + } + // Add files to mount in the container + filesToMount.forEach( + (dockerPath, filePath) -> + container.withCopyFileToContainer(forHostPath(filePath), dockerPath)); + // Set environment variables + container.withEnv(envVars); + // Set up an additional host and IP address mapping through which the container + // can look up the corresponding IP address by host name. + // This method fixes an error that occurs when HDFS looks up hostnames from DNS.
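+ // For example (illustrative): mapping the hostname "gravitino-ci-hive" to its container IP lets + // HDFS clients in other containers resolve the DataNode's advertised hostname.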
+ extraHosts.forEach((hostName, ipAddress) -> container.withExtraHost(hostName, ipAddress)); + container + .withCreateContainerCmdModifier(c -> c.withHostName(hostName)) + .withStartupCheckStrategy(new IsRunningStartupCheckStrategy()) + .waitingFor(Wait.forListeningPort()) + .withStartupTimeout(Duration.ofMinutes(5)); + network.ifPresent(net -> container.withNetwork(net).withNetworkAliases(hostName)); + } + + // This method is used to set the log output of the container. + protected void withLogConsumer(Consumer<OutputFrame> logConsumer) { + container.withLogConsumer(logConsumer); + } + + // This method is used to get the host port mapped to an exposed container port. + public Integer getMappedPort(int exposedPort) { + return container.getMappedPort(exposedPort); + } + + // This method is used to get the IP address of the container. + public String getContainerIpAddress() { + DockerClient dockerClient = DockerClientFactory.instance().client(); + InspectContainerResponse containerResponse = + dockerClient.inspectContainerCmd(container.getContainerId()).exec(); + + String ipAddress = containerResponse.getNetworkSettings().getIpAddress(); + Map<String, ContainerNetwork> containerNetworkMap = + containerResponse.getNetworkSettings().getNetworks(); + Preconditions.checkArgument( + (containerNetworkMap.size() == 1), + "Container \"NetworkMap\" size must be 1."); + for (Map.Entry<String, ContainerNetwork> entry : containerNetworkMap.entrySet()) { + return entry.getValue().getIpAddress(); + } + + return ipAddress; + } + + public void start() { + container.start(); + } + + protected abstract boolean checkContainerStatus(int retryLimit); + + // Execute the command in the container. + public Container.ExecResult executeInContainer(String... commandAndArgs) { + try { + return container.execInContainer(commandAndArgs); + } catch (IOException | InterruptedException e) { + throw new RuntimeException( + "Exception while running command: " + String.join(" ", commandAndArgs), e); + } + } + + @Override + public void close() { + container.stop(); + } + + protected abstract static class Builder< + SELF extends Builder<SELF, CONTAINER>, CONTAINER extends BaseContainer> { + protected String image; + protected String hostName; + protected Set<Integer> exposePorts = ImmutableSet.of(); + protected Map<String, String> extraHosts = ImmutableMap.of(); + protected Map<String, String> filesToMount = ImmutableMap.of(); + protected Map<String, String> envVars = ImmutableMap.of(); + protected Optional<Network> network = Optional.empty(); + + protected SELF self; + + @SuppressWarnings("unchecked") + public Builder() { + this.self = (SELF) this; + } + + public SELF withImage(String image) { + this.image = image; + return self; + } + + public SELF withHostName(String hostName) { + this.hostName = hostName; + return self; + } + + public SELF withExposePorts(Set<Integer> exposePorts) { + this.exposePorts = exposePorts; + return self; + } + + public SELF withExtraHosts(Map<String, String> extraHosts) { + this.extraHosts = extraHosts; + return self; + } + + public SELF withFilesToMount(Map<String, String> filesToMount) { + this.filesToMount = filesToMount; + return self; + } + + public SELF withEnvVars(Map<String, String> envVars) { + this.envVars = envVars; + return self; + } + + public SELF withNetwork(Network network) { + this.network = Optional.of(network); + return self; + } + + public abstract CONTAINER build(); + } +} diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/GravitinoITUtils.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/GravitinoITUtils.java index 2ed54386ecc..d3698066653 100644 ---
a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/GravitinoITUtils.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/GravitinoITUtils.java @@ -54,7 +54,7 @@ public static void sleep(long millis, boolean logOutput) { } public static String genRandomName(String prefix) { - return prefix + "_" + UUID.randomUUID().toString().replace("-", ""); + return prefix + "_" + UUID.randomUUID().toString().replace("-", "").substring(0, 8); } public static HiveConf hiveConfig() { diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/HiveContainer.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/HiveContainer.java new file mode 100644 index 00000000000..f464abdfba6 --- /dev/null +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/HiveContainer.java @@ -0,0 +1,115 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.integration.test.util; + +import static java.lang.String.format; + +import com.google.common.collect.ImmutableSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import org.rnorth.ducttape.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.Network; + +public class HiveContainer extends BaseContainer { + public static final Logger LOG = LoggerFactory.getLogger(HiveContainer.class); + + public static final String DEFAULT_IMAGE = "datastrato/gravitino-ci-hive:latest"; + public static final String HOST_NAME = "gravitino-ci-hive"; + private static final int MYSQL_PORT = 3306; + private static final int YARN_SERVICE_PORT = 8088; + private static final int HDFS_DEFAULTFS_PORT = 9000; + public static final int HIVE_METASTORE_PORT = 9083; + private static final int HIVESERVER2_PORT = 10000; + private static final int HIVESERVER2_HTTP_PORT = 10002; + private static final int HDFS_NAMENODE_PORT = 50070; + private static final int HDFS_DATANODE_HTTP_SERVER_PORT = 50075; + private static final int HDFS_DATANODE_DATA_TRANSFER_PORT = 50010; + + public static Builder builder() { + return new Builder(); + } + + protected HiveContainer( + String image, + String hostName, + Set<Integer> ports, + Map<String, String> extraHosts, + Map<String, String> filesToMount, + Map<String, String> envVars, + Optional<Network> network) { + super(image, hostName, ports, extraHosts, filesToMount, envVars, network); + } + + @Override + protected void setupContainer() { + super.setupContainer(); + withLogConsumer(new PrintingContainerLog(format("%-14s| ", "HiveContainer"))); + } + + @Override + public void start() { + super.start(); + Preconditions.check("Hive container startup failed!", checkContainerStatus(5)); + } + + @Override + protected boolean checkContainerStatus(int retryLimit) { + int nRetry = 0; + boolean isHiveContainerReady = false; + while (nRetry++ < retryLimit) { + try { + String[] commandAndArgs = new String[] {"bash", "/tmp/check-status.sh"}; + Container.ExecResult execResult = executeInContainer(commandAndArgs); + if (execResult.getExitCode() != 0) { + String message = + format( + "Command [%s] exited with %s", + String.join(" ", commandAndArgs), execResult.getExitCode()); + LOG.error("{}", message); + LOG.error("stderr: {}", execResult.getStderr()); + LOG.error("stdout: {}", execResult.getStdout()); + } else { + LOG.info("Hive container startup success!"); + isHiveContainerReady =
true; + break; + } + Thread.sleep(5000); + } catch (RuntimeException e) { + LOG.error(e.getMessage(), e); + } catch (InterruptedException e) { + // ignore + } + } + return isHiveContainerReady; + } + + public static class Builder extends BaseContainer.Builder<Builder, HiveContainer> { + private Builder() { + this.image = DEFAULT_IMAGE; + this.hostName = HOST_NAME; + this.exposePorts = + ImmutableSet.of( + MYSQL_PORT, + YARN_SERVICE_PORT, + HDFS_DEFAULTFS_PORT, + HIVE_METASTORE_PORT, + HIVESERVER2_PORT, + HIVESERVER2_HTTP_PORT, + HDFS_NAMENODE_PORT, + HDFS_DATANODE_HTTP_SERVER_PORT, + HDFS_DATANODE_DATA_TRANSFER_PORT); + } + + @Override + public HiveContainer build() { + return new HiveContainer( + image, hostName, exposePorts, extraHosts, filesToMount, envVars, network); + } + } +} diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/PrintingContainerLog.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/PrintingContainerLog.java new file mode 100644 index 00000000000..d18914b5117 --- /dev/null +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/PrintingContainerLog.java @@ -0,0 +1,35 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.integration.test.util; + +import static java.util.Objects.requireNonNull; +import static org.testcontainers.containers.output.OutputFrame.OutputType.END; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.output.BaseConsumer; +import org.testcontainers.containers.output.OutputFrame; + +// Printing Container Log +final class PrintingContainerLog extends BaseConsumer<PrintingContainerLog> { + public static final Logger LOG = LoggerFactory.getLogger(PrintingContainerLog.class); + private final String prefix; + + public PrintingContainerLog(String prefix) { + this.prefix = requireNonNull(prefix, "prefix is null"); + } + + @Override + public void accept(OutputFrame outputFrame) { + // remove new line characters + String message = outputFrame.getUtf8String().replaceAll("\\r?\\n?$", ""); + if (!message.isEmpty() || outputFrame.getType() != END) { + LOG.info("{}{}", prefix, message); + } + if (outputFrame.getType() == END) { + LOG.info("{}(exited)", prefix); + } + } +} diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/TrinoContainer.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/TrinoContainer.java new file mode 100644 index 00000000000..21fd9db367c --- /dev/null +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/TrinoContainer.java @@ -0,0 +1,206 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2.
+ */ +package com.datastrato.gravitino.integration.test.util; + +import static java.lang.String.format; + +import com.google.common.collect.ImmutableSet; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import org.rnorth.ducttape.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Network; + +public class TrinoContainer extends BaseContainer { + public static final Logger LOG = LoggerFactory.getLogger(TrinoContainer.class); + + public static final String DEFAULT_IMAGE = "datastrato/gravitino-ci-trino:latest"; + public static final String HOST_NAME = "gravitino-ci-trino"; + public static final int TRINO_PORT = 8080; + + static Connection trinoJdbcConnection = null; + + public static final String TRINO_CONF_GRAVITINO_URI = "gravitino.uri"; + public static final String TRINO_CONF_GRAVITINO_METALAKE = "gravitino.metalake"; + public static final String TRINO_CONF_HIVE_METASTORE_URI = "hive.metastore.uri"; + public static final String TRINO_CONTAINER_CONF_DIR = "/etc/trino"; + public static final String TRINO_CONTAINER_PLUGIN_GRAVITINO_DIR = + "/usr/lib/trino/plugin/gravitino"; + + public static Builder builder() { + return new Builder(); + } + + protected TrinoContainer( + String image, + String hostName, + Set<Integer> ports, + Map<String, String> extraHosts, + Map<String, String> filesToMount, + Map<String, String> envVars, + Optional<Network> network) { + super(image, hostName, ports, extraHosts, filesToMount, envVars, network); + } + + @Override + protected void setupContainer() { + super.setupContainer(); + withLogConsumer(new PrintingContainerLog(format("%-15s| ", "TrinoContainer"))); + } + + @Override + public void start() { + super.start(); + + Preconditions.check("Initializing the Trino JDBC connection failed!", initTrinoJdbcConnection()); + Preconditions.check("Trino container startup failed!", checkContainerStatus(5)); + } + + @Override + protected boolean checkContainerStatus(int retryLimit) { + int nRetry = 0; + boolean isTrinoJdbcConnectionReady = false; + int sleepTime = 5000; + while (nRetry++ < retryLimit && !isTrinoJdbcConnectionReady) { + isTrinoJdbcConnectionReady = testTrinoJdbcConnection(); + if (isTrinoJdbcConnectionReady) { + break; + } else { + try { + Thread.sleep(sleepTime); + LOG.warn("Waiting for Trino server to be ready...
({}ms)", nRetry * sleepTime); + } catch (InterruptedException e) { + // ignore + } + } + } + return isTrinoJdbcConnectionReady; + } + + @Override + public void close() { + if (trinoJdbcConnection != null) { + try { + trinoJdbcConnection.close(); + } catch (SQLException e) { + LOG.error(e.getMessage(), e); + } + } + + super.close(); + } + + public boolean initTrinoJdbcConnection() { + final String dbUrl = + String.format("jdbc:trino://127.0.0.1:%d", getMappedPort(TrinoContainer.TRINO_PORT)); + + try { + trinoJdbcConnection = DriverManager.getConnection(dbUrl, "admin", ""); + } catch (SQLException e) { + LOG.error(e.getMessage(), e); + return false; + } + return true; + } + + // Check that Trino has synchronized the catalog from Gravitino + public boolean checkSyncCatalogFromGravitino( + int retryLimit, String metalakeName, String catalogName) { + boolean catalogFoundInTrino = false; + int nRetry = 0; + int sleepTime = 5000; + while (!catalogFoundInTrino && nRetry++ < retryLimit) { + ArrayList<ArrayList<String>> queryData = + executeQuerySQL(format("SHOW CATALOGS LIKE '%s.%s'", metalakeName, catalogName)); + for (ArrayList<String> record : queryData) { + String columnValue = record.get(0); + if (columnValue.equals(String.format("%s.%s", metalakeName, catalogName))) { + catalogFoundInTrino = true; + break; + } + } + try { + Thread.sleep(sleepTime); + LOG.warn( + "Waiting for Trino to synchronize the catalog from Gravitino... ({}ms)", + nRetry * sleepTime); + } catch (InterruptedException e) { + // ignore + } + } + return catalogFoundInTrino; + } + + private boolean testTrinoJdbcConnection() { + try (Statement stmt = trinoJdbcConnection.createStatement(); + ResultSet rs = stmt.executeQuery("select 1")) { + while (rs.next()) { + int one = rs.getInt(1); + Preconditions.check("Trino JDBC connection test failed!", one == 1); + } + } catch (SQLException e) { + // Maybe the Trino server is still initializing + LOG.warn(e.getMessage(), e); + return false; + } + + return true; + } + + public ArrayList<ArrayList<String>> executeQuerySQL(String sql) { + LOG.info("executeQuerySQL: {}", sql); + ArrayList<ArrayList<String>> queryData = new ArrayList<>(); + try (Statement stmt = trinoJdbcConnection.createStatement(); + ResultSet rs = stmt.executeQuery(sql)) { + ResultSetMetaData metaData = rs.getMetaData(); + int columnCount = metaData.getColumnCount(); + + while (rs.next()) { + ArrayList<String> record = new ArrayList<>(); + for (int i = 1; i <= columnCount; i++) { + String columnValue = rs.getString(i); + record.add(columnValue); + } + queryData.add(record); + } + } catch (SQLException e) { + throw new RuntimeException(e); + } + return queryData; + } + + public void executeUpdateSQL(String sql) { + LOG.info("executeUpdateSQL: {}", sql); + try (Statement stmt = trinoJdbcConnection.createStatement()) { + stmt.executeUpdate(sql); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + public static class Builder + extends BaseContainer.Builder<Builder, TrinoContainer> { + private Builder() { + this.image = DEFAULT_IMAGE; + this.hostName = HOST_NAME; + this.exposePorts = ImmutableSet.of(); + } + + @Override + public TrinoContainer build() { + return new TrinoContainer( + image, hostName, exposePorts, extraHosts, filesToMount, envVars, network); + } + } +} diff --git a/integration-test/src/test/resources/log4j2.properties b/integration-test/src/test/resources/log4j2.properties index 0a7968eafc9..2d97f307db5 100644 --- a/integration-test/src/test/resources/log4j2.properties +++ b/integration-test/src/test/resources/log4j2.properties @@ -16,12 +16,12 @@ appender.console.layout.type =
PatternLayout appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n # Log files location -property.basePath = ${sys:gravitino.log.path} +property.logPath = ${sys:gravitino.log.path} # File appender configuration appender.file.type = File appender.file.name = fileLogger -appender.file.fileName = ${basePath}/integration-test.log +appender.file.fileName = ${logPath} appender.file.layout.type = PatternLayout appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n