From 8723ed7f505331a4fd2e177743ff4f61a0fd740d Mon Sep 17 00:00:00 2001 From: mlanhenke Date: Wed, 6 Mar 2024 06:12:39 +0100 Subject: [PATCH 01/24] fmt members --- Cargo.toml | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index dccc6bdf1..22bc8f90f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,12 @@ [workspace] resolver = "2" -members = ["crates/catalog/*", "crates/examples", "crates/iceberg", "crates/test_utils"] +members = [ + "crates/catalog/*", + "crates/examples", + "crates/iceberg", + "crates/test_utils", +] [workspace.package] version = "0.2.0" From 342b6862cdb1ee1acd5eab354ccb882d6d0a8c81 Mon Sep 17 00:00:00 2001 From: mlanhenke Date: Wed, 6 Mar 2024 06:43:08 +0100 Subject: [PATCH 02/24] setup basic test-infra for hms-catalog --- crates/catalog/hms/Cargo.toml | 6 ++ .../hms/testdata/hms_catalog/Dockerfile | 36 +++++++++ .../testdata/hms_catalog/docker-compose.yaml | 48 ++++++++++++ .../hms/testdata/hms_catalog/entrypoint.sh | 15 ++++ .../hms/testdata/hms_catalog/hive-site.xml | 51 ++++++++++++ crates/catalog/hms/tests/hms_catalog_test.rs | 78 +++++++++++++++++++ 6 files changed, 234 insertions(+) create mode 100644 crates/catalog/hms/testdata/hms_catalog/Dockerfile create mode 100644 crates/catalog/hms/testdata/hms_catalog/docker-compose.yaml create mode 100755 crates/catalog/hms/testdata/hms_catalog/entrypoint.sh create mode 100644 crates/catalog/hms/testdata/hms_catalog/hive-site.xml create mode 100644 crates/catalog/hms/tests/hms_catalog_test.rs diff --git a/crates/catalog/hms/Cargo.toml b/crates/catalog/hms/Cargo.toml index f44125c5a..22b399a78 100644 --- a/crates/catalog/hms/Cargo.toml +++ b/crates/catalog/hms/Cargo.toml @@ -35,3 +35,9 @@ hive_metastore = { workspace = true } iceberg = { workspace = true } typed-builder = { workspace = true } volo-thrift = { workspace = true } +log = "0.4.20" + +[dev-dependencies] +iceberg_test_utils = { path = "../../test_utils", features = ["tests"] } +port_scanner = { workspace = true } +tokio = { workspace = true } diff --git a/crates/catalog/hms/testdata/hms_catalog/Dockerfile b/crates/catalog/hms/testdata/hms_catalog/Dockerfile new file mode 100644 index 000000000..7e0f8a75e --- /dev/null +++ b/crates/catalog/hms/testdata/hms_catalog/Dockerfile @@ -0,0 +1,36 @@ +FROM openjdk:8u342-jre + +ENV HADOOP_VERSION=3.3.5 +ENV HADOOP_HOME=/opt/hadoop-${HADOOP_VERSION} +ENV PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin + +ENV HIVE_VERSION=3.1.3 +ENV HIVE_HOME=/opt/apache-hive-${HIVE_VERSION}-bin +ENV PATH=$HIVE_HOME/bin:$PATH + +# Set classpath for S3 Access +ENV HADOOP_CLASSPATH=${HADOOP_HOME}/share/hadoop/tools/lib/aws-java-sdk-bundle-1.12.316.jar:${HADOOP_HOME}/share/hadoop/tools/lib/hadoop-aws-3.3.5.jar + +WORKDIR /opt + +RUN apt-get update && apt-get install -y procps fastjar + +RUN wget https://downloads.apache.org/hadoop/common/hadoop-${HADOOP_VERSION}/hadoop-${HADOOP_VERSION}.tar.gz && \ + tar -xzf hadoop-${HADOOP_VERSION}.tar.gz && \ + rm hadoop-${HADOOP_VERSION}.tar.gz + +RUN wget https://downloads.apache.org/hive/hive-${HIVE_VERSION}/apache-hive-${HIVE_VERSION}-bin.tar.gz && \ + tar -xzf apache-hive-${HIVE_VERSION}-bin.tar.gz && \ + rm apache-hive-${HIVE_VERSION}-bin.tar.gz + +RUN cd ${HIVE_HOME}/lib && \ + wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar + +COPY ./hive-site.xml ${HIVE_HOME}/conf/hive-site.xml +COPY ./entrypoint.sh /entrypoint.sh + +RUN chmod +x /entrypoint.sh + +EXPOSE 9083 + +ENTRYPOINT ["sh", "-c", "/entrypoint.sh"] \ No newline at end of file diff --git a/crates/catalog/hms/testdata/hms_catalog/docker-compose.yaml b/crates/catalog/hms/testdata/hms_catalog/docker-compose.yaml new file mode 100644 index 000000000..28f84c1a7 --- /dev/null +++ b/crates/catalog/hms/testdata/hms_catalog/docker-compose.yaml @@ -0,0 +1,48 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +version: '3.8' + +services: + minio: + image: minio/minio + ports: + - "9000:9000" + - "9001:9001" + environment: + - MINIO_ROOT_USER=admin + - MINIO_ROOT_PASSWORD=password + - MINIO_DOMAIN=minio + command: [ "server", "/data", "--console-address", ":9001" ] + + hive-mysql: + image: mysql:5.7 + ports: + - "3306:3306" + environment: + - MYSQL_ROOT_PASSWORD=admin + - MYSQL_DATABASE=metastore + - MYSQL_USER=hive + - MYSQL_PASSWORD=hive + + hive-metastore: + image: iceberg-hms + build: ./ + depends_on: + - hive-mysql + ports: + - "9083:9083" diff --git a/crates/catalog/hms/testdata/hms_catalog/entrypoint.sh b/crates/catalog/hms/testdata/hms_catalog/entrypoint.sh new file mode 100755 index 000000000..6235dd2aa --- /dev/null +++ b/crates/catalog/hms/testdata/hms_catalog/entrypoint.sh @@ -0,0 +1,15 @@ +#!/bin/sh + +HIVE_VERSION=3.1.3 +HIVE_HOME=/opt/apache-hive-${HIVE_VERSION}-bin + +# Check if schema exists +${HIVE_HOME}/bin/schematool -dbType mysql -info + +if [ $? -eq 1 ]; then + echo "Getting schema info failed. Probably not initialized. Initializing...in 5s" + sleep 5 + ${HIVE_HOME}/bin/schematool -initSchema -dbType mysql +fi + +${HIVE_HOME}/bin/hive --service metastore diff --git a/crates/catalog/hms/testdata/hms_catalog/hive-site.xml b/crates/catalog/hms/testdata/hms_catalog/hive-site.xml new file mode 100644 index 000000000..cb72ee830 --- /dev/null +++ b/crates/catalog/hms/testdata/hms_catalog/hive-site.xml @@ -0,0 +1,51 @@ + + + metastore.thrift.uris + thrift://localhost:9083 + Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore. + + + metastore.task.threads.always + org.apache.hadoop.hive.metastore.events.EventCleanerTask + + + metastore.expression.proxy + org.apache.hadoop.hive.metastore.DefaultPartitionExpressionProxy + + + javax.jdo.option.ConnectionDriverName + com.mysql.cj.jdbc.Driver + + + javax.jdo.option.ConnectionURL + jdbc:mysql://hive-mysql:3306/metastore + + + javax.jdo.option.ConnectionUserName + hive + + + javax.jdo.option.ConnectionPassword + hive + + + fs.s3a.impl + org.apache.hadoop.fs.s3a.S3AFileSystem + + + fs.s3a.access.key + admin + + + fs.s3a.secret.key + password + + + fs.s3a.endpoint + http://minio:9000 + + + fs.s3a.path.style.access + true + + diff --git a/crates/catalog/hms/tests/hms_catalog_test.rs b/crates/catalog/hms/tests/hms_catalog_test.rs new file mode 100644 index 000000000..af56c4675 --- /dev/null +++ b/crates/catalog/hms/tests/hms_catalog_test.rs @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Integration tests for hms catalog. + +use iceberg::{Catalog, NamespaceIdent}; +use iceberg_catalog_hms::{HmsCatalog, HmsCatalogConfig, HmsThriftTransport}; +use iceberg_test_utils::docker::DockerCompose; +use iceberg_test_utils::{normalize_test_name, set_up}; +use port_scanner::scan_port_addr; +use tokio::time::sleep; + +const HMS_CATALOG_PORT: u16 = 9083; + +struct TestFixture { + _docker_compose: DockerCompose, + hms_catalog: HmsCatalog, +} + +async fn set_test_fixture(func: &str) -> TestFixture { + set_up(); + + let docker_compose = DockerCompose::new( + normalize_test_name(format!("{}_{func}", module_path!())), + format!("{}/testdata/hms_catalog", env!("CARGO_MANIFEST_DIR")), + ); + + docker_compose.run(); + + let hms_catalog_ip = docker_compose.get_container_ip("hive-metastore"); + + let read_port = format!("{}:{}", hms_catalog_ip, HMS_CATALOG_PORT); + loop { + if !scan_port_addr(&read_port) { + log::info!("Waiting for 1s hms catalog to ready..."); + sleep(std::time::Duration::from_millis(1000)).await; + } else { + break; + } + } + + let config = HmsCatalogConfig::builder() + .address(format!("{}:{}", hms_catalog_ip, HMS_CATALOG_PORT)) + .thrift_transport(HmsThriftTransport::Buffered) + .build(); + + let hms_catalog = HmsCatalog::new(config).unwrap(); + + TestFixture { + _docker_compose: docker_compose, + hms_catalog, + } +} + +#[tokio::test] +async fn test_list_namespace() { + let fixture = set_test_fixture("test_list_namespace").await; + + let expected = vec![NamespaceIdent::from_strs(["default"]).unwrap()]; + + let result = fixture.hms_catalog.list_namespaces(None).await.unwrap(); + + assert_eq!(expected, result) +} From d8ccce1dda638901986990aa755b44b18f760a7b Mon Sep 17 00:00:00 2001 From: mlanhenke Date: Thu, 7 Mar 2024 15:01:47 +0100 Subject: [PATCH 03/24] add license --- .../hms/testdata/hms_catalog/Dockerfile | 17 +++++++++++++++++ .../hms/testdata/hms_catalog/entrypoint.sh | 17 +++++++++++++++++ .../hms/testdata/hms_catalog/hive-site.xml | 19 +++++++++++++++++++ 3 files changed, 53 insertions(+) diff --git a/crates/catalog/hms/testdata/hms_catalog/Dockerfile b/crates/catalog/hms/testdata/hms_catalog/Dockerfile index 7e0f8a75e..7c1f86266 100644 --- a/crates/catalog/hms/testdata/hms_catalog/Dockerfile +++ b/crates/catalog/hms/testdata/hms_catalog/Dockerfile @@ -1,3 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + FROM openjdk:8u342-jre ENV HADOOP_VERSION=3.3.5 diff --git a/crates/catalog/hms/testdata/hms_catalog/entrypoint.sh b/crates/catalog/hms/testdata/hms_catalog/entrypoint.sh index 6235dd2aa..f73863781 100755 --- a/crates/catalog/hms/testdata/hms_catalog/entrypoint.sh +++ b/crates/catalog/hms/testdata/hms_catalog/entrypoint.sh @@ -1,5 +1,22 @@ #!/bin/sh +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + HIVE_VERSION=3.1.3 HIVE_HOME=/opt/apache-hive-${HIVE_VERSION}-bin diff --git a/crates/catalog/hms/testdata/hms_catalog/hive-site.xml b/crates/catalog/hms/testdata/hms_catalog/hive-site.xml index cb72ee830..c2df65cdd 100644 --- a/crates/catalog/hms/testdata/hms_catalog/hive-site.xml +++ b/crates/catalog/hms/testdata/hms_catalog/hive-site.xml @@ -1,3 +1,22 @@ + + metastore.thrift.uris From 910d848dcc7c3f46e89fc8d24853f65bfb73bddf Mon Sep 17 00:00:00 2001 From: mlanhenke Date: Thu, 7 Mar 2024 19:57:21 +0100 Subject: [PATCH 04/24] add hms create_namespace --- crates/catalog/hms/Cargo.toml | 1 + crates/catalog/hms/src/catalog.rs | 29 ++- crates/catalog/hms/src/utils.rs | 207 ++++++++++++++++++- crates/catalog/hms/tests/hms_catalog_test.rs | 49 ++++- 4 files changed, 277 insertions(+), 9 deletions(-) diff --git a/crates/catalog/hms/Cargo.toml b/crates/catalog/hms/Cargo.toml index 22b399a78..68b3aed54 100644 --- a/crates/catalog/hms/Cargo.toml +++ b/crates/catalog/hms/Cargo.toml @@ -36,6 +36,7 @@ iceberg = { workspace = true } typed-builder = { workspace = true } volo-thrift = { workspace = true } log = "0.4.20" +pilota = "0.10.0" [dev-dependencies] iceberg_test_utils = { path = "../../test_utils", features = ["tests"] } diff --git a/crates/catalog/hms/src/catalog.rs b/crates/catalog/hms/src/catalog.rs index a00aaf337..bed3c7045 100644 --- a/crates/catalog/hms/src/catalog.rs +++ b/crates/catalog/hms/src/catalog.rs @@ -125,12 +125,35 @@ impl Catalog for HmsCatalog { .collect()) } + /// Creates a new namespace with the given identifier and properties. + /// + /// Attempts to create a namespace defined by the `namespace` + /// parameter and configured with the specified `properties`. + /// + /// This function can return an error in the following situations: + /// + /// - If `hive.metastore.database.owner-type` is specified without + /// `hive.metastore.database.owner`, + /// - Errors from `validate_namespace` if the namespace identifier does not + /// meet validation criteria. + /// - Errors from `convert_to_database` if the properties cannot be + /// successfully converted into a database configuration. + /// - Errors from the underlying database creation process, converted using + /// `from_thrift_error`. async fn create_namespace( &self, - _namespace: &NamespaceIdent, - _properties: HashMap, + namespace: &NamespaceIdent, + properties: HashMap, ) -> Result { - todo!() + let database = convert_to_database(namespace, &properties)?; + + self.client + .0 + .create_database(database) + .await + .map_err(from_thrift_error)?; + + Ok(Namespace::new(namespace.clone())) } async fn get_namespace(&self, _namespace: &NamespaceIdent) -> Result { diff --git a/crates/catalog/hms/src/utils.rs b/crates/catalog/hms/src/utils.rs index 1b0ff33df..c8b180436 100644 --- a/crates/catalog/hms/src/utils.rs +++ b/crates/catalog/hms/src/utils.rs @@ -16,10 +16,16 @@ // under the License. use anyhow::anyhow; -use iceberg::{Error, ErrorKind}; +use hive_metastore::{Database, PrincipalType}; +use iceberg::{Error, ErrorKind, NamespaceIdent, Result}; +use pilota::{AHashMap, FastStr}; +use std::collections::HashMap; use std::fmt::Debug; use std::io; +pub const HMS_DB_OWNER: &str = "hive.metastore.database.owner"; +pub const HMS_DB_OWNER_TYPE: &str = "hive.metastore.database.owner-type"; + /// Format a thrift error into iceberg error. pub fn from_thrift_error(error: volo_thrift::error::ResponseError) -> Error where @@ -40,3 +46,202 @@ pub fn from_io_error(error: io::Error) -> Error { ) .with_source(error) } + +/// Converts name and properties into `hive_metastore::hms::Database` +/// after validating the `namespace` and `owner-settings`. +pub fn convert_to_database( + namespace: &NamespaceIdent, + properties: &HashMap, +) -> Result { + let name = validate_namespace(namespace)?; + validate_owner_settings(properties)?; + + let mut db = Database::default(); + let mut parameters = AHashMap::new(); + + db.name = Some(name.into()); + + for (k, v) in properties { + match k.as_str() { + "comment" => db.description = Some(v.clone().into()), + "location" => db.location_uri = Some(format_location_uri(v.clone()).into()), + HMS_DB_OWNER => db.owner_name = Some(v.clone().into()), + HMS_DB_OWNER_TYPE => { + let owner_type = match v.to_lowercase().as_str() { + "user" => PrincipalType::User, + "group" => PrincipalType::Group, + "role" => PrincipalType::Role, + _ => { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Invalid value for setting 'owner_type': {}", v), + )) + } + }; + db.owner_type = Some(owner_type); + } + _ => { + parameters.insert( + FastStr::from_string(k.clone()), + FastStr::from_string(v.clone()), + ); + } + } + } + + db.parameters = Some(parameters); + + // Set default user, if none provided + // https://github.com/apache/iceberg/blob/main/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveHadoopUtil.java#L44 + if db.owner_name.is_none() { + db.owner_name = Some("user.name".into()); + db.owner_type = Some(PrincipalType::User); + } + + Ok(db) +} + +/// Checks if provided `NamespaceIdent` is valid. +fn validate_namespace(namespace: &NamespaceIdent) -> Result { + let name = namespace.as_ref(); + + if name.len() != 1 { + return Err(Error::new( + ErrorKind::DataInvalid, + "Invalid database, hierarchical namespaces are not supported", + )); + } + + let name = name[0].clone(); + + if name.is_empty() { + return Err(Error::new( + ErrorKind::DataInvalid, + "Cannot create namespace with empty name", + )); + } + + Ok(name) +} + +/// Formats location_uri by e.g. removing trailing slashes. +fn format_location_uri(location: String) -> String { + let mut location = location; + + if !location.starts_with('/') { + location = format!("/{}", location); + } + + if location.ends_with('/') && location.len() > 1 { + location.pop(); + } + + location +} + +/// Checks if `owner-settings` are valid. +/// If `owner_type` is set, then `owner` must also be set. +fn validate_owner_settings(properties: &HashMap) -> Result<()> { + let owner_is_set = properties.get(HMS_DB_OWNER).is_some(); + let owner_type_is_set = properties.get(HMS_DB_OWNER_TYPE).is_some(); + + if owner_type_is_set && !owner_is_set { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Setting '{}' without setting '{}' is not allowed", + HMS_DB_OWNER_TYPE, HMS_DB_OWNER + ), + )); + } + + Ok(()) +} +#[cfg(test)] +mod tests { + use iceberg::{Namespace, NamespaceIdent}; + + use super::*; + + #[test] + fn test_validate_owner_settings() { + let valid = HashMap::from([ + (HMS_DB_OWNER.to_string(), "apache".to_string()), + (HMS_DB_OWNER_TYPE.to_string(), "user".to_string()), + ]); + let invalid = HashMap::from([(HMS_DB_OWNER_TYPE.to_string(), "user".to_string())]); + + assert!(validate_owner_settings(&valid).is_ok()); + assert!(validate_owner_settings(&invalid).is_err()); + } + + #[test] + fn test_convert_to_database() -> Result<()> { + let ns = NamespaceIdent::new("my_namespace".into()); + let properties = HashMap::from([ + ("comment".to_string(), "my_description".to_string()), + ("location".to_string(), "my_location".to_string()), + (HMS_DB_OWNER.to_string(), "apache".to_string()), + (HMS_DB_OWNER_TYPE.to_string(), "user".to_string()), + ("key1".to_string(), "value1".to_string()), + ]); + + let db = convert_to_database(&ns, &properties)?; + + assert_eq!(db.name, Some(FastStr::from("my_namespace"))); + assert_eq!(db.description, Some(FastStr::from("my_description"))); + assert_eq!(db.owner_name, Some(FastStr::from("apache"))); + assert_eq!(db.owner_type, Some(PrincipalType::User)); + + if let Some(params) = db.parameters { + assert_eq!(params.get("key1".into()), Some(&FastStr::from("value1"))); + } + + Ok(()) + } + + #[test] + fn test_convert_to_database_with_default_user() -> Result<()> { + let ns = NamespaceIdent::new("my_namespace".into()); + let properties = HashMap::new(); + + let db = convert_to_database(&ns, &properties)?; + + assert_eq!(db.name, Some(FastStr::from("my_namespace"))); + assert_eq!(db.owner_name, Some(FastStr::from("user.name"))); + assert_eq!(db.owner_type, Some(PrincipalType::User)); + + Ok(()) + } + + #[test] + fn test_validate_namespace() { + let valid_ns = Namespace::new(NamespaceIdent::new("ns".to_string())); + let empty_ns = Namespace::new(NamespaceIdent::new("".to_string())); + let hierarchical_ns = Namespace::new( + NamespaceIdent::from_vec(vec!["level1".to_string(), "level2".to_string()]).unwrap(), + ); + + let valid = validate_namespace(valid_ns.name()); + let empty = validate_namespace(empty_ns.name()); + let hierarchical = validate_namespace(hierarchical_ns.name()); + + assert!(valid.is_ok()); + assert!(empty.is_err()); + assert!(hierarchical.is_err()); + } + + #[test] + fn test_format_location_uri() { + let inputs = vec!["iceberg", "is/", "/nice/", "really/nice/", "/"]; + let outputs = vec!["/iceberg", "/is", "/nice", "/really/nice", "/"]; + + inputs + .into_iter() + .zip(outputs.into_iter()) + .for_each(|(inp, out)| { + let location = format_location_uri(inp.to_string()); + assert_eq!(location, out); + }) + } +} diff --git a/crates/catalog/hms/tests/hms_catalog_test.rs b/crates/catalog/hms/tests/hms_catalog_test.rs index af56c4675..4c4cfa918 100644 --- a/crates/catalog/hms/tests/hms_catalog_test.rs +++ b/crates/catalog/hms/tests/hms_catalog_test.rs @@ -17,7 +17,9 @@ //! Integration tests for hms catalog. -use iceberg::{Catalog, NamespaceIdent}; +use std::collections::HashMap; + +use iceberg::{Catalog, Namespace, NamespaceIdent}; use iceberg_catalog_hms::{HmsCatalog, HmsCatalogConfig, HmsThriftTransport}; use iceberg_test_utils::docker::DockerCompose; use iceberg_test_utils::{normalize_test_name, set_up}; @@ -25,6 +27,7 @@ use port_scanner::scan_port_addr; use tokio::time::sleep; const HMS_CATALOG_PORT: u16 = 9083; +type Result = std::result::Result; struct TestFixture { _docker_compose: DockerCompose, @@ -67,12 +70,48 @@ async fn set_test_fixture(func: &str) -> TestFixture { } #[tokio::test] -async fn test_list_namespace() { +async fn test_list_namespace() -> Result<()> { let fixture = set_test_fixture("test_list_namespace").await; - let expected = vec![NamespaceIdent::from_strs(["default"]).unwrap()]; + let expected_no_parent = vec![NamespaceIdent::new("default".into())]; + let result_no_parent = fixture.hms_catalog.list_namespaces(None).await?; + + let result_with_parent = fixture + .hms_catalog + .list_namespaces(Some(&NamespaceIdent::new("parent".into()))) + .await?; - let result = fixture.hms_catalog.list_namespaces(None).await.unwrap(); + assert_eq!(expected_no_parent, result_no_parent); + assert!(result_with_parent.len() == 0); - assert_eq!(expected, result) + Ok(()) +} + +#[tokio::test] +async fn test_create_namespace() -> Result<()> { + let fixture = set_test_fixture("test_create_namespace").await; + + let ns = Namespace::new(NamespaceIdent::new("my_namespace".to_string())); + let properties = HashMap::from([ + ("comment".to_string(), "my_description".to_string()), + ("location".to_string(), "my_location".to_string()), + ( + "hive.metastore.database.owner".to_string(), + "apache".to_string(), + ), + ( + "hive.metastore.database.owner-type".to_string(), + "user".to_string(), + ), + ("key1".to_string(), "value1".to_string()), + ]); + + let result = fixture + .hms_catalog + .create_namespace(ns.name(), properties) + .await?; + + assert_eq!(result, ns); + + Ok(()) } From 315ddde71b65cd0866be581a3ccb4335a71a34b1 Mon Sep 17 00:00:00 2001 From: mlanhenke Date: Fri, 8 Mar 2024 06:01:51 +0100 Subject: [PATCH 05/24] add hms get_namespace --- crates/catalog/hms/src/catalog.rs | 26 ++++++++- crates/catalog/hms/src/utils.rs | 59 +++++++++++++++++++- crates/catalog/hms/tests/hms_catalog_test.rs | 32 ++++++++++- 3 files changed, 112 insertions(+), 5 deletions(-) diff --git a/crates/catalog/hms/src/catalog.rs b/crates/catalog/hms/src/catalog.rs index bed3c7045..56fdc6a23 100644 --- a/crates/catalog/hms/src/catalog.rs +++ b/crates/catalog/hms/src/catalog.rs @@ -156,8 +156,30 @@ impl Catalog for HmsCatalog { Ok(Namespace::new(namespace.clone())) } - async fn get_namespace(&self, _namespace: &NamespaceIdent) -> Result { - todo!() + /// Retrieves a namespace by its identifier. + /// + /// Validates the given namespace identifier and then queries the + /// underlying database client to fetch the corresponding namespace data. + /// Constructs a `Namespace` object with the retrieved data and returns it. + /// + /// This function can return an error in any of the following situations: + /// - If the provided namespace identifier fails validation checks + /// - If there is an error querying the database, returned by + /// `from_thrift_error`. + async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result { + let name = validate_namespace(namespace)?; + + let db = self + .client + .0 + .get_database(name.clone().into()) + .await + .map_err(from_thrift_error)?; + + let properties = properties_from_database(&db); + let ns = Namespace::with_properties(NamespaceIdent::new(name.into()), properties); + + Ok(ns) } async fn namespace_exists(&self, _namespace: &NamespaceIdent) -> Result { diff --git a/crates/catalog/hms/src/utils.rs b/crates/catalog/hms/src/utils.rs index c8b180436..8d55e6eae 100644 --- a/crates/catalog/hms/src/utils.rs +++ b/crates/catalog/hms/src/utils.rs @@ -47,6 +47,41 @@ pub fn from_io_error(error: io::Error) -> Error { .with_source(error) } +/// Create and extract properties from `hive_metastore::hms::Database`. +pub fn properties_from_database(database: &Database) -> HashMap { + let mut properties = HashMap::new(); + + if let Some(description) = &database.description { + properties.insert("comment".to_string(), description.to_string()); + }; + + if let Some(location) = &database.location_uri { + properties.insert("location".to_string(), location.to_string()); + }; + + if let Some(owner) = &database.owner_name { + properties.insert(HMS_DB_OWNER.to_string(), owner.to_string()); + }; + + if let Some(owner_type) = &database.owner_type { + let value = match owner_type { + PrincipalType::User => "User", + PrincipalType::Group => "Group", + PrincipalType::Role => "Role", + }; + + properties.insert(HMS_DB_OWNER_TYPE.to_string(), value.to_string()); + }; + + if let Some(params) = &database.parameters { + params.iter().for_each(|(k, v)| { + properties.insert(k.clone().into(), v.clone().into()); + }); + }; + + properties +} + /// Converts name and properties into `hive_metastore::hms::Database` /// after validating the `namespace` and `owner-settings`. pub fn convert_to_database( @@ -102,7 +137,7 @@ pub fn convert_to_database( } /// Checks if provided `NamespaceIdent` is valid. -fn validate_namespace(namespace: &NamespaceIdent) -> Result { +pub fn validate_namespace(namespace: &NamespaceIdent) -> Result { let name = namespace.as_ref(); if name.len() != 1 { @@ -117,7 +152,7 @@ fn validate_namespace(namespace: &NamespaceIdent) -> Result { if name.is_empty() { return Err(Error::new( ErrorKind::DataInvalid, - "Cannot create namespace with empty name", + "Invalid database, provided namespace is empty.", )); } @@ -163,6 +198,26 @@ mod tests { use super::*; + #[test] + fn test_properties_from_database() -> Result<()> { + let ns = NamespaceIdent::new("my_namespace".into()); + let properties = HashMap::from([ + ("comment".to_string(), "my_description".to_string()), + ("location".to_string(), "/my_location".to_string()), + (HMS_DB_OWNER.to_string(), "apache".to_string()), + (HMS_DB_OWNER_TYPE.to_string(), "User".to_string()), + ("key1".to_string(), "value1".to_string()), + ]); + + let db = convert_to_database(&ns, &properties)?; + + let expected = properties_from_database(&db); + + assert_eq!(expected, properties); + + Ok(()) + } + #[test] fn test_validate_owner_settings() { let valid = HashMap::from([ diff --git a/crates/catalog/hms/tests/hms_catalog_test.rs b/crates/catalog/hms/tests/hms_catalog_test.rs index 4c4cfa918..6c0f205f2 100644 --- a/crates/catalog/hms/tests/hms_catalog_test.rs +++ b/crates/catalog/hms/tests/hms_catalog_test.rs @@ -91,7 +91,7 @@ async fn test_list_namespace() -> Result<()> { async fn test_create_namespace() -> Result<()> { let fixture = set_test_fixture("test_create_namespace").await; - let ns = Namespace::new(NamespaceIdent::new("my_namespace".to_string())); + let ns = Namespace::new(NamespaceIdent::new("my_namespace".into())); let properties = HashMap::from([ ("comment".to_string(), "my_description".to_string()), ("location".to_string(), "my_location".to_string()), @@ -115,3 +115,33 @@ async fn test_create_namespace() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn test_get_namespace() -> Result<()> { + let fixture = set_test_fixture("test_get_namespace").await; + + let ns = Namespace::new(NamespaceIdent::new("default".into())); + let properties = HashMap::from([ + ( + "location".to_string(), + "file:/user/hive/warehouse".to_string(), + ), + ( + "hive.metastore.database.owner-type".to_string(), + "Role".to_string(), + ), + ("comment".to_string(), "Default Hive database".to_string()), + ( + "hive.metastore.database.owner".to_string(), + "public".to_string(), + ), + ]); + + let expected = Namespace::with_properties(NamespaceIdent::new("default".into()), properties); + + let result = fixture.hms_catalog.get_namespace(ns.name()).await?; + + assert_eq!(expected, result); + + Ok(()) +} From 05886bd6f865a5305a81b2c039d17c0c6eb68a7e Mon Sep 17 00:00:00 2001 From: mlanhenke Date: Fri, 8 Mar 2024 06:21:10 +0100 Subject: [PATCH 06/24] fix: typo --- crates/iceberg/src/catalog/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index bd579d598..708e6bf3c 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -50,7 +50,7 @@ pub trait Catalog: Debug + Sync + Send { async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result; /// Check if namespace exists in catalog. - async fn namespace_exists(&self, namesace: &NamespaceIdent) -> Result; + async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result; /// Update a namespace inside the catalog. /// From ada8193380d832d1446945e7f0b780a83359d9a6 Mon Sep 17 00:00:00 2001 From: mlanhenke Date: Fri, 8 Mar 2024 11:29:34 +0100 Subject: [PATCH 07/24] add hms namespace_exists and drop_namespace --- crates/catalog/hms/src/catalog.rs | 54 ++++++++++++++++++-- crates/catalog/hms/tests/hms_catalog_test.rs | 44 ++++++++++++++++ 2 files changed, 93 insertions(+), 5 deletions(-) diff --git a/crates/catalog/hms/src/catalog.rs b/crates/catalog/hms/src/catalog.rs index 56fdc6a23..f6031d2c2 100644 --- a/crates/catalog/hms/src/catalog.rs +++ b/crates/catalog/hms/src/catalog.rs @@ -19,6 +19,7 @@ use super::utils::*; use async_trait::async_trait; use hive_metastore::ThriftHiveMetastoreClient; use hive_metastore::ThriftHiveMetastoreClientBuilder; +use hive_metastore::ThriftHiveMetastoreGetDatabaseException; use iceberg::table::Table; use iceberg::{ Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, @@ -28,6 +29,7 @@ use std::collections::HashMap; use std::fmt::{Debug, Formatter}; use std::net::ToSocketAddrs; use typed_builder::TypedBuilder; +use volo_thrift::ResponseError; /// Which variant of the thrift transport to communicate with HMS /// See: @@ -97,7 +99,6 @@ impl HmsCatalog { } } -/// Refer to for implementation details. #[async_trait] impl Catalog for HmsCatalog { /// HMS doesn't support nested namespaces. @@ -182,8 +183,36 @@ impl Catalog for HmsCatalog { Ok(ns) } - async fn namespace_exists(&self, _namespace: &NamespaceIdent) -> Result { - todo!() + /// Checks if a namespace exists within the Hive Metastore. + /// + /// Validates the namespace identifier by querying the Hive Metastore + /// to determine if the specified namespace (database) exists. + /// + /// # Returns + /// A `Result` indicating the outcome of the check: + /// - `Ok(true)` if the namespace exists. + /// - `Ok(false)` if the namespace does not exist, identified by a specific + /// `UserException` variant. + /// - `Err(...)` if an error occurs during validation or the Hive Metastore + /// query, with the error encapsulating the issue. + async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result { + let name = validate_namespace(namespace)?; + + let resp = self.client.0.get_database(name.clone().into()).await; + + match resp { + Ok(_) => Ok(true), + Err(err) => { + if let ResponseError::UserException(ThriftHiveMetastoreGetDatabaseException::O1( + _, + )) = &err + { + Ok(false) + } else { + Err(from_thrift_error(err)) + } + } + } } async fn update_namespace( @@ -194,8 +223,23 @@ impl Catalog for HmsCatalog { todo!() } - async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> Result<()> { - todo!() + /// Asynchronously drops a namespace from the Hive Metastore. + /// + /// # Returns + /// A `Result<()>` indicating the outcome: + /// - `Ok(())` signifies successful namespace deletion. + /// - `Err(...)` signifies failure to drop the namespace due to validation + /// errors, connectivity issues, or Hive Metastore constraints. + async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> { + let name = validate_namespace(namespace)?; + + self.client + .0 + .drop_database(name.into(), false, false) + .await + .map_err(from_thrift_error)?; + + Ok(()) } async fn list_tables(&self, _namespace: &NamespaceIdent) -> Result> { diff --git a/crates/catalog/hms/tests/hms_catalog_test.rs b/crates/catalog/hms/tests/hms_catalog_test.rs index 6c0f205f2..1d0bf97cd 100644 --- a/crates/catalog/hms/tests/hms_catalog_test.rs +++ b/crates/catalog/hms/tests/hms_catalog_test.rs @@ -145,3 +145,47 @@ async fn test_get_namespace() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn test_namespace_exists() -> Result<()> { + let fixture = set_test_fixture("test_namespace_exists").await; + + let ns_exists = Namespace::new(NamespaceIdent::new("default".into())); + let ns_not_exists = Namespace::new(NamespaceIdent::new("not_here".into())); + + let result_exists = fixture + .hms_catalog + .namespace_exists(ns_exists.name()) + .await?; + let result_not_exists = fixture + .hms_catalog + .namespace_exists(ns_not_exists.name()) + .await?; + + assert!(result_exists); + assert!(!result_not_exists); + + Ok(()) +} + +#[tokio::test] +async fn test_drop_namespace() -> Result<()> { + let fixture = set_test_fixture("test_drop_namespace").await; + + let ns = Namespace::new(NamespaceIdent::new("delete_me".into())); + + fixture + .hms_catalog + .create_namespace(ns.name(), HashMap::new()) + .await?; + + let result = fixture.hms_catalog.namespace_exists(ns.name()).await?; + assert!(result); + + fixture.hms_catalog.drop_namespace(ns.name()).await?; + + let result = fixture.hms_catalog.namespace_exists(ns.name()).await?; + assert!(!result); + + Ok(()) +} From 00c720be9d3ae4afdd4d400088072b6d3d79b912 Mon Sep 17 00:00:00 2001 From: mlanhenke Date: Fri, 8 Mar 2024 14:41:31 +0100 Subject: [PATCH 08/24] add hms update_namespace --- crates/catalog/hms/src/catalog.rs | 34 ++++++++++++++++++-- crates/catalog/hms/tests/hms_catalog_test.rs | 22 +++++++++++++ 2 files changed, 53 insertions(+), 3 deletions(-) diff --git a/crates/catalog/hms/src/catalog.rs b/crates/catalog/hms/src/catalog.rs index f6031d2c2..2c3d3def9 100644 --- a/crates/catalog/hms/src/catalog.rs +++ b/crates/catalog/hms/src/catalog.rs @@ -215,12 +215,40 @@ impl Catalog for HmsCatalog { } } + /// Asynchronously updates properties of an existing namespace. + /// + /// Converts the given namespace identifier and properties into a database + /// representation and then attempts to update the corresponding namespace + /// in the Hive Metastore. + /// + /// # Returns + /// Returns `Ok(())` if the namespace update is successful. If the + /// namespace cannot be updated due to missing information or an error + /// during the update process, an `Err(...)` is returned. async fn update_namespace( &self, - _namespace: &NamespaceIdent, - _properties: HashMap, + namespace: &NamespaceIdent, + properties: HashMap, ) -> Result<()> { - todo!() + let db = convert_to_database(namespace, &properties)?; + + let name = match &db.name { + Some(name) => name, + None => { + return Err(Error::new( + ErrorKind::DataInvalid, + "Database name must be specified", + )) + } + }; + + self.client + .0 + .alter_database(name.clone(), db) + .await + .map_err(from_thrift_error)?; + + Ok(()) } /// Asynchronously drops a namespace from the Hive Metastore. diff --git a/crates/catalog/hms/tests/hms_catalog_test.rs b/crates/catalog/hms/tests/hms_catalog_test.rs index 1d0bf97cd..4d5f7c359 100644 --- a/crates/catalog/hms/tests/hms_catalog_test.rs +++ b/crates/catalog/hms/tests/hms_catalog_test.rs @@ -168,6 +168,28 @@ async fn test_namespace_exists() -> Result<()> { Ok(()) } +#[tokio::test] +async fn test_update_namespace() -> Result<()> { + let fixture = set_test_fixture("test_update_namespace").await; + + let ns = Namespace::new(NamespaceIdent::new("default".into())); + let properties = HashMap::from([("comment".to_string(), "my_update".to_string())]); + + fixture + .hms_catalog + .update_namespace(ns.name(), properties) + .await?; + + let db = fixture.hms_catalog.get_namespace(ns.name()).await?; + + assert_eq!( + db.properties().get("comment"), + Some(&"my_update".to_string()) + ); + + Ok(()) +} + #[tokio::test] async fn test_drop_namespace() -> Result<()> { let fixture = set_test_fixture("test_drop_namespace").await; From 1b98f880808e7dbcae217d0ef1cd64cf7338aa7c Mon Sep 17 00:00:00 2001 From: mlanhenke Date: Fri, 8 Mar 2024 14:54:31 +0100 Subject: [PATCH 09/24] move fns into HmsCatalog --- crates/catalog/hms/src/catalog.rs | 278 +++++++++++++++++++++++++++++- crates/catalog/hms/src/utils.rs | 262 +--------------------------- 2 files changed, 273 insertions(+), 267 deletions(-) diff --git a/crates/catalog/hms/src/catalog.rs b/crates/catalog/hms/src/catalog.rs index 2c3d3def9..1f7bf2393 100644 --- a/crates/catalog/hms/src/catalog.rs +++ b/crates/catalog/hms/src/catalog.rs @@ -17,6 +17,8 @@ use super::utils::*; use async_trait::async_trait; +use hive_metastore::Database; +use hive_metastore::PrincipalType; use hive_metastore::ThriftHiveMetastoreClient; use hive_metastore::ThriftHiveMetastoreClientBuilder; use hive_metastore::ThriftHiveMetastoreGetDatabaseException; @@ -25,12 +27,19 @@ use iceberg::{ Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent, }; +use pilota::AHashMap; +use pilota::FastStr; use std::collections::HashMap; use std::fmt::{Debug, Formatter}; use std::net::ToSocketAddrs; use typed_builder::TypedBuilder; use volo_thrift::ResponseError; +/// hive.metastore.database.owner setting +pub const HMS_DB_OWNER: &str = "hive.metastore.database.owner"; +/// hive.metastore.database.owner-type setting +pub const HMS_DB_OWNER_TYPE: &str = "hive.metastore.database.owner-type"; + /// Which variant of the thrift transport to communicate with HMS /// See: #[derive(Debug, Default)] @@ -97,6 +106,154 @@ impl HmsCatalog { client: HmsClient(client), }) } + + /// Create and extract properties from `hive_metastore::hms::Database`. + pub fn properties_from_database(database: &Database) -> HashMap { + let mut properties = HashMap::new(); + + if let Some(description) = &database.description { + properties.insert("comment".to_string(), description.to_string()); + }; + + if let Some(location) = &database.location_uri { + properties.insert("location".to_string(), location.to_string()); + }; + + if let Some(owner) = &database.owner_name { + properties.insert(HMS_DB_OWNER.to_string(), owner.to_string()); + }; + + if let Some(owner_type) = &database.owner_type { + let value = match owner_type { + PrincipalType::User => "User", + PrincipalType::Group => "Group", + PrincipalType::Role => "Role", + }; + + properties.insert(HMS_DB_OWNER_TYPE.to_string(), value.to_string()); + }; + + if let Some(params) = &database.parameters { + params.iter().for_each(|(k, v)| { + properties.insert(k.clone().into(), v.clone().into()); + }); + }; + + properties + } + + /// Converts name and properties into `hive_metastore::hms::Database` + /// after validating the `namespace` and `owner-settings`. + pub fn convert_to_database( + namespace: &NamespaceIdent, + properties: &HashMap, + ) -> Result { + let name = HmsCatalog::validate_namespace(namespace)?; + HmsCatalog::validate_owner_settings(properties)?; + + let mut db = Database::default(); + let mut parameters = AHashMap::new(); + + db.name = Some(name.into()); + + for (k, v) in properties { + match k.as_str() { + "comment" => db.description = Some(v.clone().into()), + "location" => { + db.location_uri = Some(HmsCatalog::format_location_uri(v.clone()).into()) + } + HMS_DB_OWNER => db.owner_name = Some(v.clone().into()), + HMS_DB_OWNER_TYPE => { + let owner_type = match v.to_lowercase().as_str() { + "user" => PrincipalType::User, + "group" => PrincipalType::Group, + "role" => PrincipalType::Role, + _ => { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Invalid value for setting 'owner_type': {}", v), + )) + } + }; + db.owner_type = Some(owner_type); + } + _ => { + parameters.insert( + FastStr::from_string(k.clone()), + FastStr::from_string(v.clone()), + ); + } + } + } + + db.parameters = Some(parameters); + + // Set default user, if none provided + // https://github.com/apache/iceberg/blob/main/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveHadoopUtil.java#L44 + if db.owner_name.is_none() { + db.owner_name = Some("user.name".into()); + db.owner_type = Some(PrincipalType::User); + } + + Ok(db) + } + + /// Checks if provided `NamespaceIdent` is valid. + pub fn validate_namespace(namespace: &NamespaceIdent) -> Result { + let name = namespace.as_ref(); + + if name.len() != 1 { + return Err(Error::new( + ErrorKind::DataInvalid, + "Invalid database, hierarchical namespaces are not supported", + )); + } + + let name = name[0].clone(); + + if name.is_empty() { + return Err(Error::new( + ErrorKind::DataInvalid, + "Invalid database, provided namespace is empty.", + )); + } + + Ok(name) + } + + /// Formats location_uri by e.g. removing trailing slashes. + fn format_location_uri(location: String) -> String { + let mut location = location; + + if !location.starts_with('/') { + location = format!("/{}", location); + } + + if location.ends_with('/') && location.len() > 1 { + location.pop(); + } + + location + } + + /// Checks if `owner-settings` are valid. + /// If `owner_type` is set, then `owner` must also be set. + fn validate_owner_settings(properties: &HashMap) -> Result<()> { + let owner_is_set = properties.get(HMS_DB_OWNER).is_some(); + let owner_type_is_set = properties.get(HMS_DB_OWNER_TYPE).is_some(); + + if owner_type_is_set && !owner_is_set { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Setting '{}' without setting '{}' is not allowed", + HMS_DB_OWNER_TYPE, HMS_DB_OWNER + ), + )); + } + + Ok(()) + } } #[async_trait] @@ -146,7 +303,7 @@ impl Catalog for HmsCatalog { namespace: &NamespaceIdent, properties: HashMap, ) -> Result { - let database = convert_to_database(namespace, &properties)?; + let database = HmsCatalog::convert_to_database(namespace, &properties)?; self.client .0 @@ -168,7 +325,7 @@ impl Catalog for HmsCatalog { /// - If there is an error querying the database, returned by /// `from_thrift_error`. async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result { - let name = validate_namespace(namespace)?; + let name = HmsCatalog::validate_namespace(namespace)?; let db = self .client @@ -177,7 +334,7 @@ impl Catalog for HmsCatalog { .await .map_err(from_thrift_error)?; - let properties = properties_from_database(&db); + let properties = HmsCatalog::properties_from_database(&db); let ns = Namespace::with_properties(NamespaceIdent::new(name.into()), properties); Ok(ns) @@ -196,7 +353,7 @@ impl Catalog for HmsCatalog { /// - `Err(...)` if an error occurs during validation or the Hive Metastore /// query, with the error encapsulating the issue. async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result { - let name = validate_namespace(namespace)?; + let name = HmsCatalog::validate_namespace(namespace)?; let resp = self.client.0.get_database(name.clone().into()).await; @@ -230,7 +387,7 @@ impl Catalog for HmsCatalog { namespace: &NamespaceIdent, properties: HashMap, ) -> Result<()> { - let db = convert_to_database(namespace, &properties)?; + let db = HmsCatalog::convert_to_database(namespace, &properties)?; let name = match &db.name { Some(name) => name, @@ -259,7 +416,7 @@ impl Catalog for HmsCatalog { /// - `Err(...)` signifies failure to drop the namespace due to validation /// errors, connectivity issues, or Hive Metastore constraints. async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> { - let name = validate_namespace(namespace)?; + let name = HmsCatalog::validate_namespace(namespace)?; self.client .0 @@ -302,3 +459,112 @@ impl Catalog for HmsCatalog { todo!() } } + +#[cfg(test)] +mod tests { + use iceberg::{Namespace, NamespaceIdent}; + + use super::*; + + #[test] + fn test_properties_from_database() -> Result<()> { + let ns = NamespaceIdent::new("my_namespace".into()); + let properties = HashMap::from([ + ("comment".to_string(), "my_description".to_string()), + ("location".to_string(), "/my_location".to_string()), + (HMS_DB_OWNER.to_string(), "apache".to_string()), + (HMS_DB_OWNER_TYPE.to_string(), "User".to_string()), + ("key1".to_string(), "value1".to_string()), + ]); + + let db = HmsCatalog::convert_to_database(&ns, &properties)?; + + let expected = HmsCatalog::properties_from_database(&db); + + assert_eq!(expected, properties); + + Ok(()) + } + + #[test] + fn test_validate_owner_settings() { + let valid = HashMap::from([ + (HMS_DB_OWNER.to_string(), "apache".to_string()), + (HMS_DB_OWNER_TYPE.to_string(), "user".to_string()), + ]); + let invalid = HashMap::from([(HMS_DB_OWNER_TYPE.to_string(), "user".to_string())]); + + assert!(HmsCatalog::validate_owner_settings(&valid).is_ok()); + assert!(HmsCatalog::validate_owner_settings(&invalid).is_err()); + } + + #[test] + fn test_convert_to_database() -> Result<()> { + let ns = NamespaceIdent::new("my_namespace".into()); + let properties = HashMap::from([ + ("comment".to_string(), "my_description".to_string()), + ("location".to_string(), "my_location".to_string()), + (HMS_DB_OWNER.to_string(), "apache".to_string()), + (HMS_DB_OWNER_TYPE.to_string(), "user".to_string()), + ("key1".to_string(), "value1".to_string()), + ]); + + let db = HmsCatalog::convert_to_database(&ns, &properties)?; + + assert_eq!(db.name, Some(FastStr::from("my_namespace"))); + assert_eq!(db.description, Some(FastStr::from("my_description"))); + assert_eq!(db.owner_name, Some(FastStr::from("apache"))); + assert_eq!(db.owner_type, Some(PrincipalType::User)); + + if let Some(params) = db.parameters { + assert_eq!(params.get("key1".into()), Some(&FastStr::from("value1"))); + } + + Ok(()) + } + + #[test] + fn test_convert_to_database_with_default_user() -> Result<()> { + let ns = NamespaceIdent::new("my_namespace".into()); + let properties = HashMap::new(); + + let db = HmsCatalog::convert_to_database(&ns, &properties)?; + + assert_eq!(db.name, Some(FastStr::from("my_namespace"))); + assert_eq!(db.owner_name, Some(FastStr::from("user.name"))); + assert_eq!(db.owner_type, Some(PrincipalType::User)); + + Ok(()) + } + + #[test] + fn test_validate_namespace() { + let valid_ns = Namespace::new(NamespaceIdent::new("ns".to_string())); + let empty_ns = Namespace::new(NamespaceIdent::new("".to_string())); + let hierarchical_ns = Namespace::new( + NamespaceIdent::from_vec(vec!["level1".to_string(), "level2".to_string()]).unwrap(), + ); + + let valid = HmsCatalog::validate_namespace(valid_ns.name()); + let empty = HmsCatalog::validate_namespace(empty_ns.name()); + let hierarchical = HmsCatalog::validate_namespace(hierarchical_ns.name()); + + assert!(valid.is_ok()); + assert!(empty.is_err()); + assert!(hierarchical.is_err()); + } + + #[test] + fn test_format_location_uri() { + let inputs = vec!["iceberg", "is/", "/nice/", "really/nice/", "/"]; + let outputs = vec!["/iceberg", "/is", "/nice", "/really/nice", "/"]; + + inputs + .into_iter() + .zip(outputs.into_iter()) + .for_each(|(inp, out)| { + let location = HmsCatalog::format_location_uri(inp.to_string()); + assert_eq!(location, out); + }) + } +} diff --git a/crates/catalog/hms/src/utils.rs b/crates/catalog/hms/src/utils.rs index 8d55e6eae..1b0ff33df 100644 --- a/crates/catalog/hms/src/utils.rs +++ b/crates/catalog/hms/src/utils.rs @@ -16,16 +16,10 @@ // under the License. use anyhow::anyhow; -use hive_metastore::{Database, PrincipalType}; -use iceberg::{Error, ErrorKind, NamespaceIdent, Result}; -use pilota::{AHashMap, FastStr}; -use std::collections::HashMap; +use iceberg::{Error, ErrorKind}; use std::fmt::Debug; use std::io; -pub const HMS_DB_OWNER: &str = "hive.metastore.database.owner"; -pub const HMS_DB_OWNER_TYPE: &str = "hive.metastore.database.owner-type"; - /// Format a thrift error into iceberg error. pub fn from_thrift_error(error: volo_thrift::error::ResponseError) -> Error where @@ -46,257 +40,3 @@ pub fn from_io_error(error: io::Error) -> Error { ) .with_source(error) } - -/// Create and extract properties from `hive_metastore::hms::Database`. -pub fn properties_from_database(database: &Database) -> HashMap { - let mut properties = HashMap::new(); - - if let Some(description) = &database.description { - properties.insert("comment".to_string(), description.to_string()); - }; - - if let Some(location) = &database.location_uri { - properties.insert("location".to_string(), location.to_string()); - }; - - if let Some(owner) = &database.owner_name { - properties.insert(HMS_DB_OWNER.to_string(), owner.to_string()); - }; - - if let Some(owner_type) = &database.owner_type { - let value = match owner_type { - PrincipalType::User => "User", - PrincipalType::Group => "Group", - PrincipalType::Role => "Role", - }; - - properties.insert(HMS_DB_OWNER_TYPE.to_string(), value.to_string()); - }; - - if let Some(params) = &database.parameters { - params.iter().for_each(|(k, v)| { - properties.insert(k.clone().into(), v.clone().into()); - }); - }; - - properties -} - -/// Converts name and properties into `hive_metastore::hms::Database` -/// after validating the `namespace` and `owner-settings`. -pub fn convert_to_database( - namespace: &NamespaceIdent, - properties: &HashMap, -) -> Result { - let name = validate_namespace(namespace)?; - validate_owner_settings(properties)?; - - let mut db = Database::default(); - let mut parameters = AHashMap::new(); - - db.name = Some(name.into()); - - for (k, v) in properties { - match k.as_str() { - "comment" => db.description = Some(v.clone().into()), - "location" => db.location_uri = Some(format_location_uri(v.clone()).into()), - HMS_DB_OWNER => db.owner_name = Some(v.clone().into()), - HMS_DB_OWNER_TYPE => { - let owner_type = match v.to_lowercase().as_str() { - "user" => PrincipalType::User, - "group" => PrincipalType::Group, - "role" => PrincipalType::Role, - _ => { - return Err(Error::new( - ErrorKind::DataInvalid, - format!("Invalid value for setting 'owner_type': {}", v), - )) - } - }; - db.owner_type = Some(owner_type); - } - _ => { - parameters.insert( - FastStr::from_string(k.clone()), - FastStr::from_string(v.clone()), - ); - } - } - } - - db.parameters = Some(parameters); - - // Set default user, if none provided - // https://github.com/apache/iceberg/blob/main/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveHadoopUtil.java#L44 - if db.owner_name.is_none() { - db.owner_name = Some("user.name".into()); - db.owner_type = Some(PrincipalType::User); - } - - Ok(db) -} - -/// Checks if provided `NamespaceIdent` is valid. -pub fn validate_namespace(namespace: &NamespaceIdent) -> Result { - let name = namespace.as_ref(); - - if name.len() != 1 { - return Err(Error::new( - ErrorKind::DataInvalid, - "Invalid database, hierarchical namespaces are not supported", - )); - } - - let name = name[0].clone(); - - if name.is_empty() { - return Err(Error::new( - ErrorKind::DataInvalid, - "Invalid database, provided namespace is empty.", - )); - } - - Ok(name) -} - -/// Formats location_uri by e.g. removing trailing slashes. -fn format_location_uri(location: String) -> String { - let mut location = location; - - if !location.starts_with('/') { - location = format!("/{}", location); - } - - if location.ends_with('/') && location.len() > 1 { - location.pop(); - } - - location -} - -/// Checks if `owner-settings` are valid. -/// If `owner_type` is set, then `owner` must also be set. -fn validate_owner_settings(properties: &HashMap) -> Result<()> { - let owner_is_set = properties.get(HMS_DB_OWNER).is_some(); - let owner_type_is_set = properties.get(HMS_DB_OWNER_TYPE).is_some(); - - if owner_type_is_set && !owner_is_set { - return Err(Error::new( - ErrorKind::DataInvalid, - format!( - "Setting '{}' without setting '{}' is not allowed", - HMS_DB_OWNER_TYPE, HMS_DB_OWNER - ), - )); - } - - Ok(()) -} -#[cfg(test)] -mod tests { - use iceberg::{Namespace, NamespaceIdent}; - - use super::*; - - #[test] - fn test_properties_from_database() -> Result<()> { - let ns = NamespaceIdent::new("my_namespace".into()); - let properties = HashMap::from([ - ("comment".to_string(), "my_description".to_string()), - ("location".to_string(), "/my_location".to_string()), - (HMS_DB_OWNER.to_string(), "apache".to_string()), - (HMS_DB_OWNER_TYPE.to_string(), "User".to_string()), - ("key1".to_string(), "value1".to_string()), - ]); - - let db = convert_to_database(&ns, &properties)?; - - let expected = properties_from_database(&db); - - assert_eq!(expected, properties); - - Ok(()) - } - - #[test] - fn test_validate_owner_settings() { - let valid = HashMap::from([ - (HMS_DB_OWNER.to_string(), "apache".to_string()), - (HMS_DB_OWNER_TYPE.to_string(), "user".to_string()), - ]); - let invalid = HashMap::from([(HMS_DB_OWNER_TYPE.to_string(), "user".to_string())]); - - assert!(validate_owner_settings(&valid).is_ok()); - assert!(validate_owner_settings(&invalid).is_err()); - } - - #[test] - fn test_convert_to_database() -> Result<()> { - let ns = NamespaceIdent::new("my_namespace".into()); - let properties = HashMap::from([ - ("comment".to_string(), "my_description".to_string()), - ("location".to_string(), "my_location".to_string()), - (HMS_DB_OWNER.to_string(), "apache".to_string()), - (HMS_DB_OWNER_TYPE.to_string(), "user".to_string()), - ("key1".to_string(), "value1".to_string()), - ]); - - let db = convert_to_database(&ns, &properties)?; - - assert_eq!(db.name, Some(FastStr::from("my_namespace"))); - assert_eq!(db.description, Some(FastStr::from("my_description"))); - assert_eq!(db.owner_name, Some(FastStr::from("apache"))); - assert_eq!(db.owner_type, Some(PrincipalType::User)); - - if let Some(params) = db.parameters { - assert_eq!(params.get("key1".into()), Some(&FastStr::from("value1"))); - } - - Ok(()) - } - - #[test] - fn test_convert_to_database_with_default_user() -> Result<()> { - let ns = NamespaceIdent::new("my_namespace".into()); - let properties = HashMap::new(); - - let db = convert_to_database(&ns, &properties)?; - - assert_eq!(db.name, Some(FastStr::from("my_namespace"))); - assert_eq!(db.owner_name, Some(FastStr::from("user.name"))); - assert_eq!(db.owner_type, Some(PrincipalType::User)); - - Ok(()) - } - - #[test] - fn test_validate_namespace() { - let valid_ns = Namespace::new(NamespaceIdent::new("ns".to_string())); - let empty_ns = Namespace::new(NamespaceIdent::new("".to_string())); - let hierarchical_ns = Namespace::new( - NamespaceIdent::from_vec(vec!["level1".to_string(), "level2".to_string()]).unwrap(), - ); - - let valid = validate_namespace(valid_ns.name()); - let empty = validate_namespace(empty_ns.name()); - let hierarchical = validate_namespace(hierarchical_ns.name()); - - assert!(valid.is_ok()); - assert!(empty.is_err()); - assert!(hierarchical.is_err()); - } - - #[test] - fn test_format_location_uri() { - let inputs = vec!["iceberg", "is/", "/nice/", "really/nice/", "/"]; - let outputs = vec!["/iceberg", "/is", "/nice", "/really/nice", "/"]; - - inputs - .into_iter() - .zip(outputs.into_iter()) - .for_each(|(inp, out)| { - let location = format_location_uri(inp.to_string()); - assert_eq!(location, out); - }) - } -} From f82fa5d0cdc2a308006f6fea3226898369c71cfb Mon Sep 17 00:00:00 2001 From: mlanhenke Date: Fri, 8 Mar 2024 14:58:00 +0100 Subject: [PATCH 10/24] use `expose` in docker-compose --- .../hms/testdata/hms_catalog/docker-compose.yaml | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/crates/catalog/hms/testdata/hms_catalog/docker-compose.yaml b/crates/catalog/hms/testdata/hms_catalog/docker-compose.yaml index 28f84c1a7..ec65baf32 100644 --- a/crates/catalog/hms/testdata/hms_catalog/docker-compose.yaml +++ b/crates/catalog/hms/testdata/hms_catalog/docker-compose.yaml @@ -20,9 +20,9 @@ version: '3.8' services: minio: image: minio/minio - ports: - - "9000:9000" - - "9001:9001" + expose: + - 9000 + - 9001 environment: - MINIO_ROOT_USER=admin - MINIO_ROOT_PASSWORD=password @@ -31,8 +31,8 @@ services: hive-mysql: image: mysql:5.7 - ports: - - "3306:3306" + expose: + - 3306 environment: - MYSQL_ROOT_PASSWORD=admin - MYSQL_DATABASE=metastore @@ -44,5 +44,5 @@ services: build: ./ depends_on: - hive-mysql - ports: - - "9083:9083" + expose: + - 9083 From 10b53ba84c8152be67aaafb7a77640b96cbe34ba Mon Sep 17 00:00:00 2001 From: mlanhenke Date: Fri, 8 Mar 2024 17:00:31 +0100 Subject: [PATCH 11/24] add hms list_tables --- crates/catalog/hms/src/catalog.rs | 18 ++++++++++++++++-- crates/catalog/hms/tests/hms_catalog_test.rs | 13 +++++++++++++ 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/crates/catalog/hms/src/catalog.rs b/crates/catalog/hms/src/catalog.rs index 1f7bf2393..b7630ce0a 100644 --- a/crates/catalog/hms/src/catalog.rs +++ b/crates/catalog/hms/src/catalog.rs @@ -427,8 +427,22 @@ impl Catalog for HmsCatalog { Ok(()) } - async fn list_tables(&self, _namespace: &NamespaceIdent) -> Result> { - todo!() + async fn list_tables(&self, namespace: &NamespaceIdent) -> Result> { + let name = HmsCatalog::validate_namespace(namespace)?; + + let tables = self + .client + .0 + .get_all_tables(name.clone().into()) + .await + .map_err(from_thrift_error)?; + + let tables = tables + .iter() + .map(|table| TableIdent::new(namespace.clone(), table.to_string())) + .collect(); + + Ok(tables) } async fn create_table( diff --git a/crates/catalog/hms/tests/hms_catalog_test.rs b/crates/catalog/hms/tests/hms_catalog_test.rs index 4d5f7c359..9d192c24f 100644 --- a/crates/catalog/hms/tests/hms_catalog_test.rs +++ b/crates/catalog/hms/tests/hms_catalog_test.rs @@ -211,3 +211,16 @@ async fn test_drop_namespace() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn test_list_tables() -> Result<()> { + let fixture = set_test_fixture("test_list_tables").await; + + let ns = Namespace::new(NamespaceIdent::new("default".into())); + + let result = fixture.hms_catalog.list_tables(ns.name()).await?; + + assert_eq!(result, vec![]); + + Ok(()) +} From b35916c4810232cce796a320098dab49a43af1b0 Mon Sep 17 00:00:00 2001 From: mlanhenke Date: Fri, 8 Mar 2024 17:03:14 +0100 Subject: [PATCH 12/24] fix: clippy --- crates/catalog/hms/src/catalog.rs | 15 ++++++--------- crates/catalog/hms/tests/hms_catalog_test.rs | 2 +- 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/crates/catalog/hms/src/catalog.rs b/crates/catalog/hms/src/catalog.rs index b7630ce0a..5ed072d81 100644 --- a/crates/catalog/hms/src/catalog.rs +++ b/crates/catalog/hms/src/catalog.rs @@ -335,7 +335,7 @@ impl Catalog for HmsCatalog { .map_err(from_thrift_error)?; let properties = HmsCatalog::properties_from_database(&db); - let ns = Namespace::with_properties(NamespaceIdent::new(name.into()), properties); + let ns = Namespace::with_properties(NamespaceIdent::new(name), properties); Ok(ns) } @@ -531,7 +531,7 @@ mod tests { assert_eq!(db.owner_type, Some(PrincipalType::User)); if let Some(params) = db.parameters { - assert_eq!(params.get("key1".into()), Some(&FastStr::from("value1"))); + assert_eq!(params.get("key1"), Some(&FastStr::from("value1"))); } Ok(()) @@ -573,12 +573,9 @@ mod tests { let inputs = vec!["iceberg", "is/", "/nice/", "really/nice/", "/"]; let outputs = vec!["/iceberg", "/is", "/nice", "/really/nice", "/"]; - inputs - .into_iter() - .zip(outputs.into_iter()) - .for_each(|(inp, out)| { - let location = HmsCatalog::format_location_uri(inp.to_string()); - assert_eq!(location, out); - }) + inputs.into_iter().zip(outputs).for_each(|(inp, out)| { + let location = HmsCatalog::format_location_uri(inp.to_string()); + assert_eq!(location, out); + }) } } diff --git a/crates/catalog/hms/tests/hms_catalog_test.rs b/crates/catalog/hms/tests/hms_catalog_test.rs index 9d192c24f..5628c094a 100644 --- a/crates/catalog/hms/tests/hms_catalog_test.rs +++ b/crates/catalog/hms/tests/hms_catalog_test.rs @@ -82,7 +82,7 @@ async fn test_list_namespace() -> Result<()> { .await?; assert_eq!(expected_no_parent, result_no_parent); - assert!(result_with_parent.len() == 0); + assert!(result_with_parent.is_empty()); Ok(()) } From cfcbc17e7846506c27793def87e0fd3465ec100d Mon Sep 17 00:00:00 2001 From: mlanhenke Date: Fri, 8 Mar 2024 17:24:31 +0100 Subject: [PATCH 13/24] fix: cargo sort --- .cargo/audit.toml | 8 ++++---- crates/catalog/hms/Cargo.toml | 4 ++-- deny.toml | 26 ++++++++++++++++---------- 3 files changed, 22 insertions(+), 16 deletions(-) diff --git a/.cargo/audit.toml b/.cargo/audit.toml index 1d73f83b8..5db5a9d81 100644 --- a/.cargo/audit.toml +++ b/.cargo/audit.toml @@ -17,8 +17,8 @@ [advisories] ignore = [ - # rsa - # Marvin Attack: potential key recovery through timing sidechannels - # Issues: https://github.com/apache/iceberg-rust/issues/221 - "RUSTSEC-2023-0071", + # rsa + # Marvin Attack: potential key recovery through timing sidechannels + # Issues: https://github.com/apache/iceberg-rust/issues/221 + "RUSTSEC-2023-0071", ] diff --git a/crates/catalog/hms/Cargo.toml b/crates/catalog/hms/Cargo.toml index 68b3aed54..b15e2610d 100644 --- a/crates/catalog/hms/Cargo.toml +++ b/crates/catalog/hms/Cargo.toml @@ -33,10 +33,10 @@ anyhow = { workspace = true } async-trait = { workspace = true } hive_metastore = { workspace = true } iceberg = { workspace = true } -typed-builder = { workspace = true } -volo-thrift = { workspace = true } log = "0.4.20" pilota = "0.10.0" +typed-builder = { workspace = true } +volo-thrift = { workspace = true } [dev-dependencies] iceberg_test_utils = { path = "../../test_utils", features = ["tests"] } diff --git a/deny.toml b/deny.toml index e32367948..1d30b5f3f 100644 --- a/deny.toml +++ b/deny.toml @@ -19,17 +19,23 @@ unlicensed = "deny" copyleft = "deny" allow = [ - "Apache-2.0", - "Apache-2.0 WITH LLVM-exception", - "MIT", - "BSD-3-Clause", - "ISC", - "CC0-1.0", + "Apache-2.0", + "Apache-2.0 WITH LLVM-exception", + "MIT", + "BSD-3-Clause", + "ISC", + "CC0-1.0", ] exceptions = [ - { allow = ["OpenSSL"], name = "ring" }, - { allow = ["Unicode-DFS-2016"], name = "unicode-ident" }, - { allow = ["Zlib"], name = "adler32" } + { allow = [ + "OpenSSL", + ], name = "ring" }, + { allow = [ + "Unicode-DFS-2016", + ], name = "unicode-ident" }, + { allow = [ + "Zlib", + ], name = "adler32" }, ] [[licenses.clarify]] @@ -42,4 +48,4 @@ name = "ring" # compiled into non-test libraries, is included below." # OpenSSL - Obviously expression = "ISC AND MIT AND OpenSSL" -license-files = [{ path = "LICENSE", hash = 0xbd0eed23 }] \ No newline at end of file +license-files = [{ path = "LICENSE", hash = 0xbd0eed23 }] From 2e320fe9f2d4e1ed052780cddb710daf4cc98343 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Sat, 9 Mar 2024 12:07:23 +0100 Subject: [PATCH 14/24] fix: cargo workspace --- crates/catalog/hms/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/catalog/hms/Cargo.toml b/crates/catalog/hms/Cargo.toml index b15e2610d..ca3177ac3 100644 --- a/crates/catalog/hms/Cargo.toml +++ b/crates/catalog/hms/Cargo.toml @@ -33,7 +33,7 @@ anyhow = { workspace = true } async-trait = { workspace = true } hive_metastore = { workspace = true } iceberg = { workspace = true } -log = "0.4.20" +log = { workspace = true } pilota = "0.10.0" typed-builder = { workspace = true } volo-thrift = { workspace = true } From ae03c852d38aac8a866a2f0ffc14163fe1be6386 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Sat, 9 Mar 2024 12:19:33 +0100 Subject: [PATCH 15/24] move fns into utils + add constants --- crates/catalog/hms/src/catalog.rs | 277 +----------------------------- crates/catalog/hms/src/utils.rs | 268 ++++++++++++++++++++++++++++- 2 files changed, 274 insertions(+), 271 deletions(-) diff --git a/crates/catalog/hms/src/catalog.rs b/crates/catalog/hms/src/catalog.rs index 5ed072d81..de9ca57f8 100644 --- a/crates/catalog/hms/src/catalog.rs +++ b/crates/catalog/hms/src/catalog.rs @@ -17,8 +17,6 @@ use super::utils::*; use async_trait::async_trait; -use hive_metastore::Database; -use hive_metastore::PrincipalType; use hive_metastore::ThriftHiveMetastoreClient; use hive_metastore::ThriftHiveMetastoreClientBuilder; use hive_metastore::ThriftHiveMetastoreGetDatabaseException; @@ -27,19 +25,12 @@ use iceberg::{ Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent, }; -use pilota::AHashMap; -use pilota::FastStr; use std::collections::HashMap; use std::fmt::{Debug, Formatter}; use std::net::ToSocketAddrs; use typed_builder::TypedBuilder; use volo_thrift::ResponseError; -/// hive.metastore.database.owner setting -pub const HMS_DB_OWNER: &str = "hive.metastore.database.owner"; -/// hive.metastore.database.owner-type setting -pub const HMS_DB_OWNER_TYPE: &str = "hive.metastore.database.owner-type"; - /// Which variant of the thrift transport to communicate with HMS /// See: #[derive(Debug, Default)] @@ -106,154 +97,6 @@ impl HmsCatalog { client: HmsClient(client), }) } - - /// Create and extract properties from `hive_metastore::hms::Database`. - pub fn properties_from_database(database: &Database) -> HashMap { - let mut properties = HashMap::new(); - - if let Some(description) = &database.description { - properties.insert("comment".to_string(), description.to_string()); - }; - - if let Some(location) = &database.location_uri { - properties.insert("location".to_string(), location.to_string()); - }; - - if let Some(owner) = &database.owner_name { - properties.insert(HMS_DB_OWNER.to_string(), owner.to_string()); - }; - - if let Some(owner_type) = &database.owner_type { - let value = match owner_type { - PrincipalType::User => "User", - PrincipalType::Group => "Group", - PrincipalType::Role => "Role", - }; - - properties.insert(HMS_DB_OWNER_TYPE.to_string(), value.to_string()); - }; - - if let Some(params) = &database.parameters { - params.iter().for_each(|(k, v)| { - properties.insert(k.clone().into(), v.clone().into()); - }); - }; - - properties - } - - /// Converts name and properties into `hive_metastore::hms::Database` - /// after validating the `namespace` and `owner-settings`. - pub fn convert_to_database( - namespace: &NamespaceIdent, - properties: &HashMap, - ) -> Result { - let name = HmsCatalog::validate_namespace(namespace)?; - HmsCatalog::validate_owner_settings(properties)?; - - let mut db = Database::default(); - let mut parameters = AHashMap::new(); - - db.name = Some(name.into()); - - for (k, v) in properties { - match k.as_str() { - "comment" => db.description = Some(v.clone().into()), - "location" => { - db.location_uri = Some(HmsCatalog::format_location_uri(v.clone()).into()) - } - HMS_DB_OWNER => db.owner_name = Some(v.clone().into()), - HMS_DB_OWNER_TYPE => { - let owner_type = match v.to_lowercase().as_str() { - "user" => PrincipalType::User, - "group" => PrincipalType::Group, - "role" => PrincipalType::Role, - _ => { - return Err(Error::new( - ErrorKind::DataInvalid, - format!("Invalid value for setting 'owner_type': {}", v), - )) - } - }; - db.owner_type = Some(owner_type); - } - _ => { - parameters.insert( - FastStr::from_string(k.clone()), - FastStr::from_string(v.clone()), - ); - } - } - } - - db.parameters = Some(parameters); - - // Set default user, if none provided - // https://github.com/apache/iceberg/blob/main/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveHadoopUtil.java#L44 - if db.owner_name.is_none() { - db.owner_name = Some("user.name".into()); - db.owner_type = Some(PrincipalType::User); - } - - Ok(db) - } - - /// Checks if provided `NamespaceIdent` is valid. - pub fn validate_namespace(namespace: &NamespaceIdent) -> Result { - let name = namespace.as_ref(); - - if name.len() != 1 { - return Err(Error::new( - ErrorKind::DataInvalid, - "Invalid database, hierarchical namespaces are not supported", - )); - } - - let name = name[0].clone(); - - if name.is_empty() { - return Err(Error::new( - ErrorKind::DataInvalid, - "Invalid database, provided namespace is empty.", - )); - } - - Ok(name) - } - - /// Formats location_uri by e.g. removing trailing slashes. - fn format_location_uri(location: String) -> String { - let mut location = location; - - if !location.starts_with('/') { - location = format!("/{}", location); - } - - if location.ends_with('/') && location.len() > 1 { - location.pop(); - } - - location - } - - /// Checks if `owner-settings` are valid. - /// If `owner_type` is set, then `owner` must also be set. - fn validate_owner_settings(properties: &HashMap) -> Result<()> { - let owner_is_set = properties.get(HMS_DB_OWNER).is_some(); - let owner_type_is_set = properties.get(HMS_DB_OWNER_TYPE).is_some(); - - if owner_type_is_set && !owner_is_set { - return Err(Error::new( - ErrorKind::DataInvalid, - format!( - "Setting '{}' without setting '{}' is not allowed", - HMS_DB_OWNER_TYPE, HMS_DB_OWNER - ), - )); - } - - Ok(()) - } } #[async_trait] @@ -303,7 +146,7 @@ impl Catalog for HmsCatalog { namespace: &NamespaceIdent, properties: HashMap, ) -> Result { - let database = HmsCatalog::convert_to_database(namespace, &properties)?; + let database = convert_to_database(namespace, &properties)?; self.client .0 @@ -325,7 +168,7 @@ impl Catalog for HmsCatalog { /// - If there is an error querying the database, returned by /// `from_thrift_error`. async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result { - let name = HmsCatalog::validate_namespace(namespace)?; + let name = validate_namespace(namespace)?; let db = self .client @@ -334,7 +177,7 @@ impl Catalog for HmsCatalog { .await .map_err(from_thrift_error)?; - let properties = HmsCatalog::properties_from_database(&db); + let properties = properties_from_database(&db); let ns = Namespace::with_properties(NamespaceIdent::new(name), properties); Ok(ns) @@ -353,7 +196,7 @@ impl Catalog for HmsCatalog { /// - `Err(...)` if an error occurs during validation or the Hive Metastore /// query, with the error encapsulating the issue. async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result { - let name = HmsCatalog::validate_namespace(namespace)?; + let name = validate_namespace(namespace)?; let resp = self.client.0.get_database(name.clone().into()).await; @@ -387,7 +230,7 @@ impl Catalog for HmsCatalog { namespace: &NamespaceIdent, properties: HashMap, ) -> Result<()> { - let db = HmsCatalog::convert_to_database(namespace, &properties)?; + let db = convert_to_database(namespace, &properties)?; let name = match &db.name { Some(name) => name, @@ -416,7 +259,7 @@ impl Catalog for HmsCatalog { /// - `Err(...)` signifies failure to drop the namespace due to validation /// errors, connectivity issues, or Hive Metastore constraints. async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> { - let name = HmsCatalog::validate_namespace(namespace)?; + let name = validate_namespace(namespace)?; self.client .0 @@ -428,7 +271,7 @@ impl Catalog for HmsCatalog { } async fn list_tables(&self, namespace: &NamespaceIdent) -> Result> { - let name = HmsCatalog::validate_namespace(namespace)?; + let name = validate_namespace(namespace)?; let tables = self .client @@ -473,109 +316,3 @@ impl Catalog for HmsCatalog { todo!() } } - -#[cfg(test)] -mod tests { - use iceberg::{Namespace, NamespaceIdent}; - - use super::*; - - #[test] - fn test_properties_from_database() -> Result<()> { - let ns = NamespaceIdent::new("my_namespace".into()); - let properties = HashMap::from([ - ("comment".to_string(), "my_description".to_string()), - ("location".to_string(), "/my_location".to_string()), - (HMS_DB_OWNER.to_string(), "apache".to_string()), - (HMS_DB_OWNER_TYPE.to_string(), "User".to_string()), - ("key1".to_string(), "value1".to_string()), - ]); - - let db = HmsCatalog::convert_to_database(&ns, &properties)?; - - let expected = HmsCatalog::properties_from_database(&db); - - assert_eq!(expected, properties); - - Ok(()) - } - - #[test] - fn test_validate_owner_settings() { - let valid = HashMap::from([ - (HMS_DB_OWNER.to_string(), "apache".to_string()), - (HMS_DB_OWNER_TYPE.to_string(), "user".to_string()), - ]); - let invalid = HashMap::from([(HMS_DB_OWNER_TYPE.to_string(), "user".to_string())]); - - assert!(HmsCatalog::validate_owner_settings(&valid).is_ok()); - assert!(HmsCatalog::validate_owner_settings(&invalid).is_err()); - } - - #[test] - fn test_convert_to_database() -> Result<()> { - let ns = NamespaceIdent::new("my_namespace".into()); - let properties = HashMap::from([ - ("comment".to_string(), "my_description".to_string()), - ("location".to_string(), "my_location".to_string()), - (HMS_DB_OWNER.to_string(), "apache".to_string()), - (HMS_DB_OWNER_TYPE.to_string(), "user".to_string()), - ("key1".to_string(), "value1".to_string()), - ]); - - let db = HmsCatalog::convert_to_database(&ns, &properties)?; - - assert_eq!(db.name, Some(FastStr::from("my_namespace"))); - assert_eq!(db.description, Some(FastStr::from("my_description"))); - assert_eq!(db.owner_name, Some(FastStr::from("apache"))); - assert_eq!(db.owner_type, Some(PrincipalType::User)); - - if let Some(params) = db.parameters { - assert_eq!(params.get("key1"), Some(&FastStr::from("value1"))); - } - - Ok(()) - } - - #[test] - fn test_convert_to_database_with_default_user() -> Result<()> { - let ns = NamespaceIdent::new("my_namespace".into()); - let properties = HashMap::new(); - - let db = HmsCatalog::convert_to_database(&ns, &properties)?; - - assert_eq!(db.name, Some(FastStr::from("my_namespace"))); - assert_eq!(db.owner_name, Some(FastStr::from("user.name"))); - assert_eq!(db.owner_type, Some(PrincipalType::User)); - - Ok(()) - } - - #[test] - fn test_validate_namespace() { - let valid_ns = Namespace::new(NamespaceIdent::new("ns".to_string())); - let empty_ns = Namespace::new(NamespaceIdent::new("".to_string())); - let hierarchical_ns = Namespace::new( - NamespaceIdent::from_vec(vec!["level1".to_string(), "level2".to_string()]).unwrap(), - ); - - let valid = HmsCatalog::validate_namespace(valid_ns.name()); - let empty = HmsCatalog::validate_namespace(empty_ns.name()); - let hierarchical = HmsCatalog::validate_namespace(hierarchical_ns.name()); - - assert!(valid.is_ok()); - assert!(empty.is_err()); - assert!(hierarchical.is_err()); - } - - #[test] - fn test_format_location_uri() { - let inputs = vec!["iceberg", "is/", "/nice/", "really/nice/", "/"]; - let outputs = vec!["/iceberg", "/is", "/nice", "/really/nice", "/"]; - - inputs.into_iter().zip(outputs).for_each(|(inp, out)| { - let location = HmsCatalog::format_location_uri(inp.to_string()); - assert_eq!(location, out); - }) - } -} diff --git a/crates/catalog/hms/src/utils.rs b/crates/catalog/hms/src/utils.rs index 1b0ff33df..a8d8f64fa 100644 --- a/crates/catalog/hms/src/utils.rs +++ b/crates/catalog/hms/src/utils.rs @@ -16,10 +16,24 @@ // under the License. use anyhow::anyhow; -use iceberg::{Error, ErrorKind}; +use hive_metastore::{Database, PrincipalType}; +use iceberg::{Error, ErrorKind, NamespaceIdent, Result}; +use pilota::{AHashMap, FastStr}; +use std::collections::HashMap; use std::fmt::Debug; use std::io; +/// hive.metastore.database.owner setting +pub const HMS_DB_OWNER: &str = "hive.metastore.database.owner"; +/// hive.metastore.database.owner default setting +pub const HMS_DEFAULT_DB_OWNER: &str = "user.name"; +/// hive.metastore.database.owner-type setting +pub const HMS_DB_OWNER_TYPE: &str = "hive.metastore.database.owner-type"; +/// hive metatore `description` property +pub const COMMENT: &str = "comment"; +/// hive metatore `location` property +pub const LOCATION: &str = "location"; + /// Format a thrift error into iceberg error. pub fn from_thrift_error(error: volo_thrift::error::ResponseError) -> Error where @@ -40,3 +54,255 @@ pub fn from_io_error(error: io::Error) -> Error { ) .with_source(error) } + +/// Create and extract properties from `hive_metastore::hms::Database`. +pub fn properties_from_database(database: &Database) -> HashMap { + let mut properties = HashMap::new(); + + if let Some(description) = &database.description { + properties.insert(COMMENT.to_string(), description.to_string()); + }; + + if let Some(location) = &database.location_uri { + properties.insert(LOCATION.to_string(), location.to_string()); + }; + + if let Some(owner) = &database.owner_name { + properties.insert(HMS_DB_OWNER.to_string(), owner.to_string()); + }; + + if let Some(owner_type) = &database.owner_type { + let value = match owner_type { + PrincipalType::User => "User", + PrincipalType::Group => "Group", + PrincipalType::Role => "Role", + }; + + properties.insert(HMS_DB_OWNER_TYPE.to_string(), value.to_string()); + }; + + if let Some(params) = &database.parameters { + params.iter().for_each(|(k, v)| { + properties.insert(k.clone().into(), v.clone().into()); + }); + }; + + properties +} + +/// Converts name and properties into `hive_metastore::hms::Database` +/// after validating the `namespace` and `owner-settings`. +pub fn convert_to_database( + namespace: &NamespaceIdent, + properties: &HashMap, +) -> Result { + let name = validate_namespace(namespace)?; + validate_owner_settings(properties)?; + + let mut db = Database::default(); + let mut parameters = AHashMap::new(); + + db.name = Some(name.into()); + + for (k, v) in properties { + match k.as_str() { + COMMENT => db.description = Some(v.clone().into()), + LOCATION => db.location_uri = Some(format_location_uri(v.clone()).into()), + HMS_DB_OWNER => db.owner_name = Some(v.clone().into()), + HMS_DB_OWNER_TYPE => { + let owner_type = match v.to_lowercase().as_str() { + "user" => PrincipalType::User, + "group" => PrincipalType::Group, + "role" => PrincipalType::Role, + _ => { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Invalid value for setting 'owner_type': {}", v), + )) + } + }; + db.owner_type = Some(owner_type); + } + _ => { + parameters.insert( + FastStr::from_string(k.clone()), + FastStr::from_string(v.clone()), + ); + } + } + } + + db.parameters = Some(parameters); + + // Set default owner, if none provided + // https://github.com/apache/iceberg/blob/main/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveHadoopUtil.java#L44 + if db.owner_name.is_none() { + db.owner_name = Some(HMS_DEFAULT_DB_OWNER.into()); + db.owner_type = Some(PrincipalType::User); + } + + Ok(db) +} + +/// Checks if provided `NamespaceIdent` is valid. +pub fn validate_namespace(namespace: &NamespaceIdent) -> Result { + let name = namespace.as_ref(); + + if name.len() != 1 { + return Err(Error::new( + ErrorKind::DataInvalid, + "Invalid database, hierarchical namespaces are not supported", + )); + } + + let name = name[0].clone(); + + if name.is_empty() { + return Err(Error::new( + ErrorKind::DataInvalid, + "Invalid database, provided namespace is empty.", + )); + } + + Ok(name) +} + +/// Formats location_uri by e.g. removing trailing slashes. +fn format_location_uri(location: String) -> String { + let mut location = location; + + if !location.starts_with('/') { + location = format!("/{}", location); + } + + if location.ends_with('/') && location.len() > 1 { + location.pop(); + } + + location +} + +/// Checks if `owner-settings` are valid. +/// If `owner_type` is set, then `owner` must also be set. +fn validate_owner_settings(properties: &HashMap) -> Result<()> { + let owner_is_set = properties.get(HMS_DB_OWNER).is_some(); + let owner_type_is_set = properties.get(HMS_DB_OWNER_TYPE).is_some(); + + if owner_type_is_set && !owner_is_set { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Setting '{}' without setting '{}' is not allowed", + HMS_DB_OWNER_TYPE, HMS_DB_OWNER + ), + )); + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use iceberg::{Namespace, NamespaceIdent}; + + use super::*; + + #[test] + fn test_properties_from_database() -> Result<()> { + let ns = NamespaceIdent::new("my_namespace".into()); + let properties = HashMap::from([ + (COMMENT.to_string(), "my_description".to_string()), + (LOCATION.to_string(), "/my_location".to_string()), + (HMS_DB_OWNER.to_string(), "apache".to_string()), + (HMS_DB_OWNER_TYPE.to_string(), "User".to_string()), + ("key1".to_string(), "value1".to_string()), + ]); + + let db = convert_to_database(&ns, &properties)?; + + let expected = properties_from_database(&db); + + assert_eq!(expected, properties); + + Ok(()) + } + + #[test] + fn test_validate_owner_settings() { + let valid = HashMap::from([ + (HMS_DB_OWNER.to_string(), "apache".to_string()), + (HMS_DB_OWNER_TYPE.to_string(), "user".to_string()), + ]); + let invalid = HashMap::from([(HMS_DB_OWNER_TYPE.to_string(), "user".to_string())]); + + assert!(validate_owner_settings(&valid).is_ok()); + assert!(validate_owner_settings(&invalid).is_err()); + } + + #[test] + fn test_convert_to_database() -> Result<()> { + let ns = NamespaceIdent::new("my_namespace".into()); + let properties = HashMap::from([ + (COMMENT.to_string(), "my_description".to_string()), + (LOCATION.to_string(), "my_location".to_string()), + (HMS_DB_OWNER.to_string(), "apache".to_string()), + (HMS_DB_OWNER_TYPE.to_string(), "user".to_string()), + ("key1".to_string(), "value1".to_string()), + ]); + + let db = convert_to_database(&ns, &properties)?; + + assert_eq!(db.name, Some(FastStr::from("my_namespace"))); + assert_eq!(db.description, Some(FastStr::from("my_description"))); + assert_eq!(db.owner_name, Some(FastStr::from("apache"))); + assert_eq!(db.owner_type, Some(PrincipalType::User)); + + if let Some(params) = db.parameters { + assert_eq!(params.get("key1"), Some(&FastStr::from("value1"))); + } + + Ok(()) + } + + #[test] + fn test_convert_to_database_with_default_user() -> Result<()> { + let ns = NamespaceIdent::new("my_namespace".into()); + let properties = HashMap::new(); + + let db = convert_to_database(&ns, &properties)?; + + assert_eq!(db.name, Some(FastStr::from("my_namespace"))); + assert_eq!(db.owner_name, Some(FastStr::from(HMS_DEFAULT_DB_OWNER))); + assert_eq!(db.owner_type, Some(PrincipalType::User)); + + Ok(()) + } + + #[test] + fn test_validate_namespace() { + let valid_ns = Namespace::new(NamespaceIdent::new("ns".to_string())); + let empty_ns = Namespace::new(NamespaceIdent::new("".to_string())); + let hierarchical_ns = Namespace::new( + NamespaceIdent::from_vec(vec!["level1".to_string(), "level2".to_string()]).unwrap(), + ); + + let valid = validate_namespace(valid_ns.name()); + let empty = validate_namespace(empty_ns.name()); + let hierarchical = validate_namespace(hierarchical_ns.name()); + + assert!(valid.is_ok()); + assert!(empty.is_err()); + assert!(hierarchical.is_err()); + } + + #[test] + fn test_format_location_uri() { + let inputs = vec!["iceberg", "is/", "/nice/", "really/nice/", "/"]; + let outputs = vec!["/iceberg", "/is", "/nice", "/really/nice", "/"]; + + inputs.into_iter().zip(outputs).for_each(|(inp, out)| { + let location = format_location_uri(inp.to_string()); + assert_eq!(location, out); + }) + } +} From 17cb90467bccf5ef0a42ba9e44e16642b3fbb703 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Sat, 9 Mar 2024 12:24:17 +0100 Subject: [PATCH 16/24] include database name in error msg --- crates/catalog/hms/src/utils.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/crates/catalog/hms/src/utils.rs b/crates/catalog/hms/src/utils.rs index a8d8f64fa..b7c934ea5 100644 --- a/crates/catalog/hms/src/utils.rs +++ b/crates/catalog/hms/src/utils.rs @@ -151,7 +151,10 @@ pub fn validate_namespace(namespace: &NamespaceIdent) -> Result { if name.len() != 1 { return Err(Error::new( ErrorKind::DataInvalid, - "Invalid database, hierarchical namespaces are not supported", + format!( + "Invalid database name: {:?}, hierarchical namespaces are not supported", + namespace + ), )); } From 3aef54a94637c014b15ae895dc16a0b9d609c693 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Sat, 9 Mar 2024 12:26:19 +0100 Subject: [PATCH 17/24] add pilota to cargo workspace --- Cargo.toml | 1 + crates/catalog/hms/Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 22bc8f90f..c978abc86 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -57,6 +57,7 @@ murmur3 = "0.5.2" once_cell = "1" opendal = "0.45" ordered-float = "4.0.0" +pilota = "0.10.0" pretty_assertions = "1.4.0" port_scanner = "0.1.5" reqwest = { version = "^0.11", features = ["json"] } diff --git a/crates/catalog/hms/Cargo.toml b/crates/catalog/hms/Cargo.toml index ca3177ac3..475da7be6 100644 --- a/crates/catalog/hms/Cargo.toml +++ b/crates/catalog/hms/Cargo.toml @@ -34,7 +34,7 @@ async-trait = { workspace = true } hive_metastore = { workspace = true } iceberg = { workspace = true } log = { workspace = true } -pilota = "0.10.0" +pilota = { workspace = true } typed-builder = { workspace = true } volo-thrift = { workspace = true } From 1660f3bffc7abde6aebe36e6ee4a73b865b2d826 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Sat, 9 Mar 2024 12:29:44 +0100 Subject: [PATCH 18/24] add minio version --- crates/catalog/hms/testdata/hms_catalog/docker-compose.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/catalog/hms/testdata/hms_catalog/docker-compose.yaml b/crates/catalog/hms/testdata/hms_catalog/docker-compose.yaml index ec65baf32..85413a8ab 100644 --- a/crates/catalog/hms/testdata/hms_catalog/docker-compose.yaml +++ b/crates/catalog/hms/testdata/hms_catalog/docker-compose.yaml @@ -19,7 +19,7 @@ version: '3.8' services: minio: - image: minio/minio + image: minio/minio:RELEASE.2024-03-07T00-43-48Z expose: - 9000 - 9001 From a33132e2c031f34acc8692b5a5b23388adf6eef9 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Sat, 9 Mar 2024 16:30:47 +0100 Subject: [PATCH 19/24] change visibility to pub(crate); return namespace from conversion fn --- crates/catalog/hms/src/catalog.rs | 3 +-- crates/catalog/hms/src/utils.rs | 36 +++++++++++++++++++++---------- 2 files changed, 26 insertions(+), 13 deletions(-) diff --git a/crates/catalog/hms/src/catalog.rs b/crates/catalog/hms/src/catalog.rs index de9ca57f8..57e3824ce 100644 --- a/crates/catalog/hms/src/catalog.rs +++ b/crates/catalog/hms/src/catalog.rs @@ -177,8 +177,7 @@ impl Catalog for HmsCatalog { .await .map_err(from_thrift_error)?; - let properties = properties_from_database(&db); - let ns = Namespace::with_properties(NamespaceIdent::new(name), properties); + let ns = convert_to_namespace(&db)?; Ok(ns) } diff --git a/crates/catalog/hms/src/utils.rs b/crates/catalog/hms/src/utils.rs index b7c934ea5..02f32c658 100644 --- a/crates/catalog/hms/src/utils.rs +++ b/crates/catalog/hms/src/utils.rs @@ -17,7 +17,7 @@ use anyhow::anyhow; use hive_metastore::{Database, PrincipalType}; -use iceberg::{Error, ErrorKind, NamespaceIdent, Result}; +use iceberg::{Error, ErrorKind, Namespace, NamespaceIdent, Result}; use pilota::{AHashMap, FastStr}; use std::collections::HashMap; use std::fmt::Debug; @@ -55,10 +55,20 @@ pub fn from_io_error(error: io::Error) -> Error { .with_source(error) } -/// Create and extract properties from `hive_metastore::hms::Database`. -pub fn properties_from_database(database: &Database) -> HashMap { +/// Returns a `Namespace` by extracting database name and properties +/// from `hive_metastore::hms::Database` +pub(crate) fn convert_to_namespace(database: &Database) -> Result { let mut properties = HashMap::new(); + let name = if let Some(name) = &database.name { + name.to_string() + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + "Database name must be specified", + )); + }; + if let Some(description) = &database.description { properties.insert(COMMENT.to_string(), description.to_string()); }; @@ -87,12 +97,15 @@ pub fn properties_from_database(database: &Database) -> HashMap }); }; - properties + Ok(Namespace::with_properties( + NamespaceIdent::new(name), + properties, + )) } /// Converts name and properties into `hive_metastore::hms::Database` /// after validating the `namespace` and `owner-settings`. -pub fn convert_to_database( +pub(crate) fn convert_to_database( namespace: &NamespaceIdent, properties: &HashMap, ) -> Result { @@ -145,7 +158,7 @@ pub fn convert_to_database( } /// Checks if provided `NamespaceIdent` is valid. -pub fn validate_namespace(namespace: &NamespaceIdent) -> Result { +pub(crate) fn validate_namespace(namespace: &NamespaceIdent) -> Result { let name = namespace.as_ref(); if name.len() != 1 { @@ -211,8 +224,7 @@ mod tests { use super::*; #[test] - fn test_properties_from_database() -> Result<()> { - let ns = NamespaceIdent::new("my_namespace".into()); + fn test_convert_to_namespace() -> Result<()> { let properties = HashMap::from([ (COMMENT.to_string(), "my_description".to_string()), (LOCATION.to_string(), "/my_location".to_string()), @@ -221,11 +233,13 @@ mod tests { ("key1".to_string(), "value1".to_string()), ]); - let db = convert_to_database(&ns, &properties)?; + let ident = NamespaceIdent::new("my_namespace".into()); + let db = convert_to_database(&ident, &properties)?; - let expected = properties_from_database(&db); + let expected_ns = Namespace::with_properties(ident, properties); + let result_ns = convert_to_namespace(&db)?; - assert_eq!(expected, properties); + assert_eq!(expected_ns, result_ns); Ok(()) } From 153bb6f43467ed6692a31f4c98b777d5515671a1 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Sat, 9 Mar 2024 16:35:24 +0100 Subject: [PATCH 20/24] add minio version in rest-catalog docker-compose --- .../rest/testdata/rest_catalog/docker-compose.yaml | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/crates/catalog/rest/testdata/rest_catalog/docker-compose.yaml b/crates/catalog/rest/testdata/rest_catalog/docker-compose.yaml index 5c101463f..27c2368b9 100644 --- a/crates/catalog/rest/testdata/rest_catalog/docker-compose.yaml +++ b/crates/catalog/rest/testdata/rest_catalog/docker-compose.yaml @@ -37,7 +37,7 @@ services: - 8181 minio: - image: minio/minio + image: minio/minio:RELEASE.2024-03-07T00-43-48Z environment: - MINIO_ROOT_USER=admin - MINIO_ROOT_PASSWORD=password @@ -56,10 +56,4 @@ services: - AWS_SECRET_ACCESS_KEY=password - AWS_REGION=us-east-1 entrypoint: > - /bin/sh -c " - until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done; - /usr/bin/mc rm -r --force minio/icebergdata; - /usr/bin/mc mb minio/icebergdata; - /usr/bin/mc policy set public minio/icebergdata; - tail -f /dev/null - " \ No newline at end of file + /bin/sh -c " until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done; /usr/bin/mc rm -r --force minio/icebergdata; /usr/bin/mc mb minio/icebergdata; /usr/bin/mc policy set public minio/icebergdata; tail -f /dev/null " From 2b80a4bcdd470f650401a9d2aca8bed47441e937 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Sat, 9 Mar 2024 16:54:29 +0100 Subject: [PATCH 21/24] fix: hms test docker infrastructure --- .../hms/testdata/hms_catalog/Dockerfile | 67 +++++++----------- .../hms/testdata/hms_catalog/core-site.xml | 53 ++++++++++++++ .../testdata/hms_catalog/docker-compose.yaml | 24 ++++--- .../hms/testdata/hms_catalog/entrypoint.sh | 32 --------- .../hms/testdata/hms_catalog/hive-site.xml | 70 ------------------- crates/catalog/hms/tests/hms_catalog_test.rs | 5 +- 6 files changed, 91 insertions(+), 160 deletions(-) create mode 100644 crates/catalog/hms/testdata/hms_catalog/core-site.xml delete mode 100755 crates/catalog/hms/testdata/hms_catalog/entrypoint.sh delete mode 100644 crates/catalog/hms/testdata/hms_catalog/hive-site.xml diff --git a/crates/catalog/hms/testdata/hms_catalog/Dockerfile b/crates/catalog/hms/testdata/hms_catalog/Dockerfile index 7c1f86266..ff8c9fae6 100644 --- a/crates/catalog/hms/testdata/hms_catalog/Dockerfile +++ b/crates/catalog/hms/testdata/hms_catalog/Dockerfile @@ -1,53 +1,34 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. -FROM openjdk:8u342-jre +FROM openjdk:8-jre-slim AS build -ENV HADOOP_VERSION=3.3.5 -ENV HADOOP_HOME=/opt/hadoop-${HADOOP_VERSION} -ENV PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin +RUN apt-get update -qq && apt-get -qq -y install curl -ENV HIVE_VERSION=3.1.3 -ENV HIVE_HOME=/opt/apache-hive-${HIVE_VERSION}-bin -ENV PATH=$HIVE_HOME/bin:$PATH +ENV AWSSDK_VERSION=2.20.18 +ENV HADOOP_VERSION=3.1.0 -# Set classpath for S3 Access -ENV HADOOP_CLASSPATH=${HADOOP_HOME}/share/hadoop/tools/lib/aws-java-sdk-bundle-1.12.316.jar:${HADOOP_HOME}/share/hadoop/tools/lib/hadoop-aws-3.3.5.jar +RUN curl https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.271/aws-java-sdk-bundle-1.11.271.jar -Lo /tmp/aws-java-sdk-bundle-1.11.271.jar +RUN curl https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/${HADOOP_VERSION}/hadoop-aws-${HADOOP_VERSION}.jar -Lo /tmp/hadoop-aws-${HADOOP_VERSION}.jar -WORKDIR /opt -RUN apt-get update && apt-get install -y procps fastjar +FROM apache/hive:3.1.3 -RUN wget https://downloads.apache.org/hadoop/common/hadoop-${HADOOP_VERSION}/hadoop-${HADOOP_VERSION}.tar.gz && \ - tar -xzf hadoop-${HADOOP_VERSION}.tar.gz && \ - rm hadoop-${HADOOP_VERSION}.tar.gz +ENV AWSSDK_VERSION=2.20.18 +ENV HADOOP_VERSION=3.1.0 -RUN wget https://downloads.apache.org/hive/hive-${HIVE_VERSION}/apache-hive-${HIVE_VERSION}-bin.tar.gz && \ - tar -xzf apache-hive-${HIVE_VERSION}-bin.tar.gz && \ - rm apache-hive-${HIVE_VERSION}-bin.tar.gz - -RUN cd ${HIVE_HOME}/lib && \ - wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar - -COPY ./hive-site.xml ${HIVE_HOME}/conf/hive-site.xml -COPY ./entrypoint.sh /entrypoint.sh - -RUN chmod +x /entrypoint.sh - -EXPOSE 9083 - -ENTRYPOINT ["sh", "-c", "/entrypoint.sh"] \ No newline at end of file +COPY --from=build /tmp/hadoop-aws-${HADOOP_VERSION}.jar /opt/hive/lib/hadoop-aws-${HADOOP_VERSION}.jar +COPY --from=build /tmp/aws-java-sdk-bundle-1.11.271.jar /opt/hive/lib/aws-java-sdk-bundle-1.11.271.jar +COPY core-site.xml /opt/hadoop/etc/hadoop/core-site.xml \ No newline at end of file diff --git a/crates/catalog/hms/testdata/hms_catalog/core-site.xml b/crates/catalog/hms/testdata/hms_catalog/core-site.xml new file mode 100644 index 000000000..53789f0f0 --- /dev/null +++ b/crates/catalog/hms/testdata/hms_catalog/core-site.xml @@ -0,0 +1,53 @@ + + + + + + + fs.defaultFS + s3a://warehouse/hive + + + fs.s3a.impl + org.apache.hadoop.fs.s3a.S3AFileSystem + + + fs.s3a.fast.upload + true + + + fs.s3a.endpoint + http://minio:9000 + + + fs.s3a.access.key + admin + + + fs.s3a.secret.key + password + + + fs.s3a.connection.ssl.enabled + false + + + fs.s3a.path.style.access + true + + \ No newline at end of file diff --git a/crates/catalog/hms/testdata/hms_catalog/docker-compose.yaml b/crates/catalog/hms/testdata/hms_catalog/docker-compose.yaml index 85413a8ab..c9605868b 100644 --- a/crates/catalog/hms/testdata/hms_catalog/docker-compose.yaml +++ b/crates/catalog/hms/testdata/hms_catalog/docker-compose.yaml @@ -29,20 +29,22 @@ services: - MINIO_DOMAIN=minio command: [ "server", "/data", "--console-address", ":9001" ] - hive-mysql: - image: mysql:5.7 - expose: - - 3306 + mc: + depends_on: + - minio + image: minio/mc:RELEASE.2024-03-07T00-31-49Z environment: - - MYSQL_ROOT_PASSWORD=admin - - MYSQL_DATABASE=metastore - - MYSQL_USER=hive - - MYSQL_PASSWORD=hive + - AWS_ACCESS_KEY_ID=admin + - AWS_SECRET_ACCESS_KEY=password + - AWS_REGION=us-east-1 + entrypoint: > + /bin/sh -c " until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done; /usr/bin/mc mb minio/warehouse; /usr/bin/mc policy set public minio/warehouse; tail -f /dev/null " hive-metastore: - image: iceberg-hms + image: iceberg-hive-metastore build: ./ - depends_on: - - hive-mysql expose: - 9083 + environment: + SERVICE_NAME: "metastore" + SERVICE_OPTS: "-Dmetastore.warehouse.dir=s3a://warehouse/hive/" diff --git a/crates/catalog/hms/testdata/hms_catalog/entrypoint.sh b/crates/catalog/hms/testdata/hms_catalog/entrypoint.sh deleted file mode 100755 index f73863781..000000000 --- a/crates/catalog/hms/testdata/hms_catalog/entrypoint.sh +++ /dev/null @@ -1,32 +0,0 @@ -#!/bin/sh - -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -HIVE_VERSION=3.1.3 -HIVE_HOME=/opt/apache-hive-${HIVE_VERSION}-bin - -# Check if schema exists -${HIVE_HOME}/bin/schematool -dbType mysql -info - -if [ $? -eq 1 ]; then - echo "Getting schema info failed. Probably not initialized. Initializing...in 5s" - sleep 5 - ${HIVE_HOME}/bin/schematool -initSchema -dbType mysql -fi - -${HIVE_HOME}/bin/hive --service metastore diff --git a/crates/catalog/hms/testdata/hms_catalog/hive-site.xml b/crates/catalog/hms/testdata/hms_catalog/hive-site.xml deleted file mode 100644 index c2df65cdd..000000000 --- a/crates/catalog/hms/testdata/hms_catalog/hive-site.xml +++ /dev/null @@ -1,70 +0,0 @@ - - - - - metastore.thrift.uris - thrift://localhost:9083 - Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore. - - - metastore.task.threads.always - org.apache.hadoop.hive.metastore.events.EventCleanerTask - - - metastore.expression.proxy - org.apache.hadoop.hive.metastore.DefaultPartitionExpressionProxy - - - javax.jdo.option.ConnectionDriverName - com.mysql.cj.jdbc.Driver - - - javax.jdo.option.ConnectionURL - jdbc:mysql://hive-mysql:3306/metastore - - - javax.jdo.option.ConnectionUserName - hive - - - javax.jdo.option.ConnectionPassword - hive - - - fs.s3a.impl - org.apache.hadoop.fs.s3a.S3AFileSystem - - - fs.s3a.access.key - admin - - - fs.s3a.secret.key - password - - - fs.s3a.endpoint - http://minio:9000 - - - fs.s3a.path.style.access - true - - diff --git a/crates/catalog/hms/tests/hms_catalog_test.rs b/crates/catalog/hms/tests/hms_catalog_test.rs index 5628c094a..bab83a955 100644 --- a/crates/catalog/hms/tests/hms_catalog_test.rs +++ b/crates/catalog/hms/tests/hms_catalog_test.rs @@ -122,10 +122,7 @@ async fn test_get_namespace() -> Result<()> { let ns = Namespace::new(NamespaceIdent::new("default".into())); let properties = HashMap::from([ - ( - "location".to_string(), - "file:/user/hive/warehouse".to_string(), - ), + ("location".to_string(), "s3a://warehouse/hive".to_string()), ( "hive.metastore.database.owner-type".to_string(), "Role".to_string(), From bde3c985e4eedd2ee677cee9d8ed3a850cb2a4ee Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Sat, 9 Mar 2024 16:54:39 +0100 Subject: [PATCH 22/24] add version to minio/mc --- crates/catalog/rest/testdata/rest_catalog/docker-compose.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/catalog/rest/testdata/rest_catalog/docker-compose.yaml b/crates/catalog/rest/testdata/rest_catalog/docker-compose.yaml index 27c2368b9..0152a22ca 100644 --- a/crates/catalog/rest/testdata/rest_catalog/docker-compose.yaml +++ b/crates/catalog/rest/testdata/rest_catalog/docker-compose.yaml @@ -50,7 +50,7 @@ services: mc: depends_on: - minio - image: minio/mc + image: minio/mc:RELEASE.2024-03-07T00-31-49Z environment: - AWS_ACCESS_KEY_ID=admin - AWS_SECRET_ACCESS_KEY=password From 8375c750efef19d5986bc2e33a9d28c5ebbb693c Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Sat, 9 Mar 2024 16:59:57 +0100 Subject: [PATCH 23/24] fix: license header --- crates/catalog/hms/testdata/hms_catalog/core-site.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/catalog/hms/testdata/hms_catalog/core-site.xml b/crates/catalog/hms/testdata/hms_catalog/core-site.xml index 53789f0f0..12220704e 100644 --- a/crates/catalog/hms/testdata/hms_catalog/core-site.xml +++ b/crates/catalog/hms/testdata/hms_catalog/core-site.xml @@ -1,5 +1,3 @@ - - + + fs.defaultFS From 7dc7adbc4095dcee9ceab9081678e4aa98c8a15c Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Sat, 9 Mar 2024 18:56:21 +0100 Subject: [PATCH 24/24] fix: core-site --- crates/catalog/hms/testdata/hms_catalog/core-site.xml | 2 -- 1 file changed, 2 deletions(-) diff --git a/crates/catalog/hms/testdata/hms_catalog/core-site.xml b/crates/catalog/hms/testdata/hms_catalog/core-site.xml index 12220704e..f0583a0bc 100644 --- a/crates/catalog/hms/testdata/hms_catalog/core-site.xml +++ b/crates/catalog/hms/testdata/hms_catalog/core-site.xml @@ -15,8 +15,6 @@ limitations under the License. --> - - fs.defaultFS