diff --git a/.cargo/audit.toml b/.cargo/audit.toml index 1d73f83b8..5db5a9d81 100644 --- a/.cargo/audit.toml +++ b/.cargo/audit.toml @@ -17,8 +17,8 @@ [advisories] ignore = [ - # rsa - # Marvin Attack: potential key recovery through timing sidechannels - # Issues: https://github.com/apache/iceberg-rust/issues/221 - "RUSTSEC-2023-0071", + # rsa + # Marvin Attack: potential key recovery through timing sidechannels + # Issues: https://github.com/apache/iceberg-rust/issues/221 + "RUSTSEC-2023-0071", ] diff --git a/Cargo.toml b/Cargo.toml index 3234bd07a..c482859cd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,12 @@ [workspace] resolver = "2" -members = ["crates/catalog/*", "crates/examples", "crates/iceberg", "crates/test_utils"] +members = [ + "crates/catalog/*", + "crates/examples", + "crates/iceberg", + "crates/test_utils", +] [workspace.package] version = "0.2.0" @@ -55,6 +60,7 @@ once_cell = "1" opendal = "0.45" ordered-float = "4.0.0" parquet = "50" +pilota = "0.10.0" pretty_assertions = "1.4.0" port_scanner = "0.1.5" reqwest = { version = "^0.11", features = ["json"] } diff --git a/crates/catalog/hms/Cargo.toml b/crates/catalog/hms/Cargo.toml index f44125c5a..475da7be6 100644 --- a/crates/catalog/hms/Cargo.toml +++ b/crates/catalog/hms/Cargo.toml @@ -33,5 +33,12 @@ anyhow = { workspace = true } async-trait = { workspace = true } hive_metastore = { workspace = true } iceberg = { workspace = true } +log = { workspace = true } +pilota = { workspace = true } typed-builder = { workspace = true } volo-thrift = { workspace = true } + +[dev-dependencies] +iceberg_test_utils = { path = "../../test_utils", features = ["tests"] } +port_scanner = { workspace = true } +tokio = { workspace = true } diff --git a/crates/catalog/hms/src/catalog.rs b/crates/catalog/hms/src/catalog.rs index a00aaf337..57e3824ce 100644 --- a/crates/catalog/hms/src/catalog.rs +++ b/crates/catalog/hms/src/catalog.rs @@ -19,6 +19,7 @@ use super::utils::*; use async_trait::async_trait; use hive_metastore::ThriftHiveMetastoreClient; use hive_metastore::ThriftHiveMetastoreClientBuilder; +use hive_metastore::ThriftHiveMetastoreGetDatabaseException; use iceberg::table::Table; use iceberg::{ Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, @@ -28,6 +29,7 @@ use std::collections::HashMap; use std::fmt::{Debug, Formatter}; use std::net::ToSocketAddrs; use typed_builder::TypedBuilder; +use volo_thrift::ResponseError; /// Which variant of the thrift transport to communicate with HMS /// See: @@ -97,7 +99,6 @@ impl HmsCatalog { } } -/// Refer to for implementation details. #[async_trait] impl Catalog for HmsCatalog { /// HMS doesn't support nested namespaces. @@ -125,36 +126,165 @@ impl Catalog for HmsCatalog { .collect()) } + /// Creates a new namespace with the given identifier and properties. + /// + /// Attempts to create a namespace defined by the `namespace` + /// parameter and configured with the specified `properties`. + /// + /// This function can return an error in the following situations: + /// + /// - If `hive.metastore.database.owner-type` is specified without + /// `hive.metastore.database.owner`, + /// - Errors from `validate_namespace` if the namespace identifier does not + /// meet validation criteria. + /// - Errors from `convert_to_database` if the properties cannot be + /// successfully converted into a database configuration. + /// - Errors from the underlying database creation process, converted using + /// `from_thrift_error`. async fn create_namespace( &self, - _namespace: &NamespaceIdent, - _properties: HashMap, + namespace: &NamespaceIdent, + properties: HashMap, ) -> Result { - todo!() + let database = convert_to_database(namespace, &properties)?; + + self.client + .0 + .create_database(database) + .await + .map_err(from_thrift_error)?; + + Ok(Namespace::new(namespace.clone())) } - async fn get_namespace(&self, _namespace: &NamespaceIdent) -> Result { - todo!() + /// Retrieves a namespace by its identifier. + /// + /// Validates the given namespace identifier and then queries the + /// underlying database client to fetch the corresponding namespace data. + /// Constructs a `Namespace` object with the retrieved data and returns it. + /// + /// This function can return an error in any of the following situations: + /// - If the provided namespace identifier fails validation checks + /// - If there is an error querying the database, returned by + /// `from_thrift_error`. + async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result { + let name = validate_namespace(namespace)?; + + let db = self + .client + .0 + .get_database(name.clone().into()) + .await + .map_err(from_thrift_error)?; + + let ns = convert_to_namespace(&db)?; + + Ok(ns) } - async fn namespace_exists(&self, _namespace: &NamespaceIdent) -> Result { - todo!() + /// Checks if a namespace exists within the Hive Metastore. + /// + /// Validates the namespace identifier by querying the Hive Metastore + /// to determine if the specified namespace (database) exists. + /// + /// # Returns + /// A `Result` indicating the outcome of the check: + /// - `Ok(true)` if the namespace exists. + /// - `Ok(false)` if the namespace does not exist, identified by a specific + /// `UserException` variant. + /// - `Err(...)` if an error occurs during validation or the Hive Metastore + /// query, with the error encapsulating the issue. + async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result { + let name = validate_namespace(namespace)?; + + let resp = self.client.0.get_database(name.clone().into()).await; + + match resp { + Ok(_) => Ok(true), + Err(err) => { + if let ResponseError::UserException(ThriftHiveMetastoreGetDatabaseException::O1( + _, + )) = &err + { + Ok(false) + } else { + Err(from_thrift_error(err)) + } + } + } } + /// Asynchronously updates properties of an existing namespace. + /// + /// Converts the given namespace identifier and properties into a database + /// representation and then attempts to update the corresponding namespace + /// in the Hive Metastore. + /// + /// # Returns + /// Returns `Ok(())` if the namespace update is successful. If the + /// namespace cannot be updated due to missing information or an error + /// during the update process, an `Err(...)` is returned. async fn update_namespace( &self, - _namespace: &NamespaceIdent, - _properties: HashMap, + namespace: &NamespaceIdent, + properties: HashMap, ) -> Result<()> { - todo!() + let db = convert_to_database(namespace, &properties)?; + + let name = match &db.name { + Some(name) => name, + None => { + return Err(Error::new( + ErrorKind::DataInvalid, + "Database name must be specified", + )) + } + }; + + self.client + .0 + .alter_database(name.clone(), db) + .await + .map_err(from_thrift_error)?; + + Ok(()) } - async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> Result<()> { - todo!() + /// Asynchronously drops a namespace from the Hive Metastore. + /// + /// # Returns + /// A `Result<()>` indicating the outcome: + /// - `Ok(())` signifies successful namespace deletion. + /// - `Err(...)` signifies failure to drop the namespace due to validation + /// errors, connectivity issues, or Hive Metastore constraints. + async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> { + let name = validate_namespace(namespace)?; + + self.client + .0 + .drop_database(name.into(), false, false) + .await + .map_err(from_thrift_error)?; + + Ok(()) } - async fn list_tables(&self, _namespace: &NamespaceIdent) -> Result> { - todo!() + async fn list_tables(&self, namespace: &NamespaceIdent) -> Result> { + let name = validate_namespace(namespace)?; + + let tables = self + .client + .0 + .get_all_tables(name.clone().into()) + .await + .map_err(from_thrift_error)?; + + let tables = tables + .iter() + .map(|table| TableIdent::new(namespace.clone(), table.to_string())) + .collect(); + + Ok(tables) } async fn create_table( diff --git a/crates/catalog/hms/src/utils.rs b/crates/catalog/hms/src/utils.rs index 1b0ff33df..02f32c658 100644 --- a/crates/catalog/hms/src/utils.rs +++ b/crates/catalog/hms/src/utils.rs @@ -16,10 +16,24 @@ // under the License. use anyhow::anyhow; -use iceberg::{Error, ErrorKind}; +use hive_metastore::{Database, PrincipalType}; +use iceberg::{Error, ErrorKind, Namespace, NamespaceIdent, Result}; +use pilota::{AHashMap, FastStr}; +use std::collections::HashMap; use std::fmt::Debug; use std::io; +/// hive.metastore.database.owner setting +pub const HMS_DB_OWNER: &str = "hive.metastore.database.owner"; +/// hive.metastore.database.owner default setting +pub const HMS_DEFAULT_DB_OWNER: &str = "user.name"; +/// hive.metastore.database.owner-type setting +pub const HMS_DB_OWNER_TYPE: &str = "hive.metastore.database.owner-type"; +/// hive metatore `description` property +pub const COMMENT: &str = "comment"; +/// hive metatore `location` property +pub const LOCATION: &str = "location"; + /// Format a thrift error into iceberg error. pub fn from_thrift_error(error: volo_thrift::error::ResponseError) -> Error where @@ -40,3 +54,272 @@ pub fn from_io_error(error: io::Error) -> Error { ) .with_source(error) } + +/// Returns a `Namespace` by extracting database name and properties +/// from `hive_metastore::hms::Database` +pub(crate) fn convert_to_namespace(database: &Database) -> Result { + let mut properties = HashMap::new(); + + let name = if let Some(name) = &database.name { + name.to_string() + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + "Database name must be specified", + )); + }; + + if let Some(description) = &database.description { + properties.insert(COMMENT.to_string(), description.to_string()); + }; + + if let Some(location) = &database.location_uri { + properties.insert(LOCATION.to_string(), location.to_string()); + }; + + if let Some(owner) = &database.owner_name { + properties.insert(HMS_DB_OWNER.to_string(), owner.to_string()); + }; + + if let Some(owner_type) = &database.owner_type { + let value = match owner_type { + PrincipalType::User => "User", + PrincipalType::Group => "Group", + PrincipalType::Role => "Role", + }; + + properties.insert(HMS_DB_OWNER_TYPE.to_string(), value.to_string()); + }; + + if let Some(params) = &database.parameters { + params.iter().for_each(|(k, v)| { + properties.insert(k.clone().into(), v.clone().into()); + }); + }; + + Ok(Namespace::with_properties( + NamespaceIdent::new(name), + properties, + )) +} + +/// Converts name and properties into `hive_metastore::hms::Database` +/// after validating the `namespace` and `owner-settings`. +pub(crate) fn convert_to_database( + namespace: &NamespaceIdent, + properties: &HashMap, +) -> Result { + let name = validate_namespace(namespace)?; + validate_owner_settings(properties)?; + + let mut db = Database::default(); + let mut parameters = AHashMap::new(); + + db.name = Some(name.into()); + + for (k, v) in properties { + match k.as_str() { + COMMENT => db.description = Some(v.clone().into()), + LOCATION => db.location_uri = Some(format_location_uri(v.clone()).into()), + HMS_DB_OWNER => db.owner_name = Some(v.clone().into()), + HMS_DB_OWNER_TYPE => { + let owner_type = match v.to_lowercase().as_str() { + "user" => PrincipalType::User, + "group" => PrincipalType::Group, + "role" => PrincipalType::Role, + _ => { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Invalid value for setting 'owner_type': {}", v), + )) + } + }; + db.owner_type = Some(owner_type); + } + _ => { + parameters.insert( + FastStr::from_string(k.clone()), + FastStr::from_string(v.clone()), + ); + } + } + } + + db.parameters = Some(parameters); + + // Set default owner, if none provided + // https://github.com/apache/iceberg/blob/main/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveHadoopUtil.java#L44 + if db.owner_name.is_none() { + db.owner_name = Some(HMS_DEFAULT_DB_OWNER.into()); + db.owner_type = Some(PrincipalType::User); + } + + Ok(db) +} + +/// Checks if provided `NamespaceIdent` is valid. +pub(crate) fn validate_namespace(namespace: &NamespaceIdent) -> Result { + let name = namespace.as_ref(); + + if name.len() != 1 { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Invalid database name: {:?}, hierarchical namespaces are not supported", + namespace + ), + )); + } + + let name = name[0].clone(); + + if name.is_empty() { + return Err(Error::new( + ErrorKind::DataInvalid, + "Invalid database, provided namespace is empty.", + )); + } + + Ok(name) +} + +/// Formats location_uri by e.g. removing trailing slashes. +fn format_location_uri(location: String) -> String { + let mut location = location; + + if !location.starts_with('/') { + location = format!("/{}", location); + } + + if location.ends_with('/') && location.len() > 1 { + location.pop(); + } + + location +} + +/// Checks if `owner-settings` are valid. +/// If `owner_type` is set, then `owner` must also be set. +fn validate_owner_settings(properties: &HashMap) -> Result<()> { + let owner_is_set = properties.get(HMS_DB_OWNER).is_some(); + let owner_type_is_set = properties.get(HMS_DB_OWNER_TYPE).is_some(); + + if owner_type_is_set && !owner_is_set { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Setting '{}' without setting '{}' is not allowed", + HMS_DB_OWNER_TYPE, HMS_DB_OWNER + ), + )); + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use iceberg::{Namespace, NamespaceIdent}; + + use super::*; + + #[test] + fn test_convert_to_namespace() -> Result<()> { + let properties = HashMap::from([ + (COMMENT.to_string(), "my_description".to_string()), + (LOCATION.to_string(), "/my_location".to_string()), + (HMS_DB_OWNER.to_string(), "apache".to_string()), + (HMS_DB_OWNER_TYPE.to_string(), "User".to_string()), + ("key1".to_string(), "value1".to_string()), + ]); + + let ident = NamespaceIdent::new("my_namespace".into()); + let db = convert_to_database(&ident, &properties)?; + + let expected_ns = Namespace::with_properties(ident, properties); + let result_ns = convert_to_namespace(&db)?; + + assert_eq!(expected_ns, result_ns); + + Ok(()) + } + + #[test] + fn test_validate_owner_settings() { + let valid = HashMap::from([ + (HMS_DB_OWNER.to_string(), "apache".to_string()), + (HMS_DB_OWNER_TYPE.to_string(), "user".to_string()), + ]); + let invalid = HashMap::from([(HMS_DB_OWNER_TYPE.to_string(), "user".to_string())]); + + assert!(validate_owner_settings(&valid).is_ok()); + assert!(validate_owner_settings(&invalid).is_err()); + } + + #[test] + fn test_convert_to_database() -> Result<()> { + let ns = NamespaceIdent::new("my_namespace".into()); + let properties = HashMap::from([ + (COMMENT.to_string(), "my_description".to_string()), + (LOCATION.to_string(), "my_location".to_string()), + (HMS_DB_OWNER.to_string(), "apache".to_string()), + (HMS_DB_OWNER_TYPE.to_string(), "user".to_string()), + ("key1".to_string(), "value1".to_string()), + ]); + + let db = convert_to_database(&ns, &properties)?; + + assert_eq!(db.name, Some(FastStr::from("my_namespace"))); + assert_eq!(db.description, Some(FastStr::from("my_description"))); + assert_eq!(db.owner_name, Some(FastStr::from("apache"))); + assert_eq!(db.owner_type, Some(PrincipalType::User)); + + if let Some(params) = db.parameters { + assert_eq!(params.get("key1"), Some(&FastStr::from("value1"))); + } + + Ok(()) + } + + #[test] + fn test_convert_to_database_with_default_user() -> Result<()> { + let ns = NamespaceIdent::new("my_namespace".into()); + let properties = HashMap::new(); + + let db = convert_to_database(&ns, &properties)?; + + assert_eq!(db.name, Some(FastStr::from("my_namespace"))); + assert_eq!(db.owner_name, Some(FastStr::from(HMS_DEFAULT_DB_OWNER))); + assert_eq!(db.owner_type, Some(PrincipalType::User)); + + Ok(()) + } + + #[test] + fn test_validate_namespace() { + let valid_ns = Namespace::new(NamespaceIdent::new("ns".to_string())); + let empty_ns = Namespace::new(NamespaceIdent::new("".to_string())); + let hierarchical_ns = Namespace::new( + NamespaceIdent::from_vec(vec!["level1".to_string(), "level2".to_string()]).unwrap(), + ); + + let valid = validate_namespace(valid_ns.name()); + let empty = validate_namespace(empty_ns.name()); + let hierarchical = validate_namespace(hierarchical_ns.name()); + + assert!(valid.is_ok()); + assert!(empty.is_err()); + assert!(hierarchical.is_err()); + } + + #[test] + fn test_format_location_uri() { + let inputs = vec!["iceberg", "is/", "/nice/", "really/nice/", "/"]; + let outputs = vec!["/iceberg", "/is", "/nice", "/really/nice", "/"]; + + inputs.into_iter().zip(outputs).for_each(|(inp, out)| { + let location = format_location_uri(inp.to_string()); + assert_eq!(location, out); + }) + } +} diff --git a/crates/catalog/hms/testdata/hms_catalog/Dockerfile b/crates/catalog/hms/testdata/hms_catalog/Dockerfile new file mode 100644 index 000000000..ff8c9fae6 --- /dev/null +++ b/crates/catalog/hms/testdata/hms_catalog/Dockerfile @@ -0,0 +1,34 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +FROM openjdk:8-jre-slim AS build + +RUN apt-get update -qq && apt-get -qq -y install curl + +ENV AWSSDK_VERSION=2.20.18 +ENV HADOOP_VERSION=3.1.0 + +RUN curl https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.271/aws-java-sdk-bundle-1.11.271.jar -Lo /tmp/aws-java-sdk-bundle-1.11.271.jar +RUN curl https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/${HADOOP_VERSION}/hadoop-aws-${HADOOP_VERSION}.jar -Lo /tmp/hadoop-aws-${HADOOP_VERSION}.jar + + +FROM apache/hive:3.1.3 + +ENV AWSSDK_VERSION=2.20.18 +ENV HADOOP_VERSION=3.1.0 + +COPY --from=build /tmp/hadoop-aws-${HADOOP_VERSION}.jar /opt/hive/lib/hadoop-aws-${HADOOP_VERSION}.jar +COPY --from=build /tmp/aws-java-sdk-bundle-1.11.271.jar /opt/hive/lib/aws-java-sdk-bundle-1.11.271.jar +COPY core-site.xml /opt/hadoop/etc/hadoop/core-site.xml \ No newline at end of file diff --git a/crates/catalog/hms/testdata/hms_catalog/core-site.xml b/crates/catalog/hms/testdata/hms_catalog/core-site.xml new file mode 100644 index 000000000..f0583a0bc --- /dev/null +++ b/crates/catalog/hms/testdata/hms_catalog/core-site.xml @@ -0,0 +1,51 @@ + + + + + fs.defaultFS + s3a://warehouse/hive + + + fs.s3a.impl + org.apache.hadoop.fs.s3a.S3AFileSystem + + + fs.s3a.fast.upload + true + + + fs.s3a.endpoint + http://minio:9000 + + + fs.s3a.access.key + admin + + + fs.s3a.secret.key + password + + + fs.s3a.connection.ssl.enabled + false + + + fs.s3a.path.style.access + true + + \ No newline at end of file diff --git a/crates/catalog/hms/testdata/hms_catalog/docker-compose.yaml b/crates/catalog/hms/testdata/hms_catalog/docker-compose.yaml new file mode 100644 index 000000000..c9605868b --- /dev/null +++ b/crates/catalog/hms/testdata/hms_catalog/docker-compose.yaml @@ -0,0 +1,50 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +version: '3.8' + +services: + minio: + image: minio/minio:RELEASE.2024-03-07T00-43-48Z + expose: + - 9000 + - 9001 + environment: + - MINIO_ROOT_USER=admin + - MINIO_ROOT_PASSWORD=password + - MINIO_DOMAIN=minio + command: [ "server", "/data", "--console-address", ":9001" ] + + mc: + depends_on: + - minio + image: minio/mc:RELEASE.2024-03-07T00-31-49Z + environment: + - AWS_ACCESS_KEY_ID=admin + - AWS_SECRET_ACCESS_KEY=password + - AWS_REGION=us-east-1 + entrypoint: > + /bin/sh -c " until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done; /usr/bin/mc mb minio/warehouse; /usr/bin/mc policy set public minio/warehouse; tail -f /dev/null " + + hive-metastore: + image: iceberg-hive-metastore + build: ./ + expose: + - 9083 + environment: + SERVICE_NAME: "metastore" + SERVICE_OPTS: "-Dmetastore.warehouse.dir=s3a://warehouse/hive/" diff --git a/crates/catalog/hms/tests/hms_catalog_test.rs b/crates/catalog/hms/tests/hms_catalog_test.rs new file mode 100644 index 000000000..bab83a955 --- /dev/null +++ b/crates/catalog/hms/tests/hms_catalog_test.rs @@ -0,0 +1,223 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Integration tests for hms catalog. + +use std::collections::HashMap; + +use iceberg::{Catalog, Namespace, NamespaceIdent}; +use iceberg_catalog_hms::{HmsCatalog, HmsCatalogConfig, HmsThriftTransport}; +use iceberg_test_utils::docker::DockerCompose; +use iceberg_test_utils::{normalize_test_name, set_up}; +use port_scanner::scan_port_addr; +use tokio::time::sleep; + +const HMS_CATALOG_PORT: u16 = 9083; +type Result = std::result::Result; + +struct TestFixture { + _docker_compose: DockerCompose, + hms_catalog: HmsCatalog, +} + +async fn set_test_fixture(func: &str) -> TestFixture { + set_up(); + + let docker_compose = DockerCompose::new( + normalize_test_name(format!("{}_{func}", module_path!())), + format!("{}/testdata/hms_catalog", env!("CARGO_MANIFEST_DIR")), + ); + + docker_compose.run(); + + let hms_catalog_ip = docker_compose.get_container_ip("hive-metastore"); + + let read_port = format!("{}:{}", hms_catalog_ip, HMS_CATALOG_PORT); + loop { + if !scan_port_addr(&read_port) { + log::info!("Waiting for 1s hms catalog to ready..."); + sleep(std::time::Duration::from_millis(1000)).await; + } else { + break; + } + } + + let config = HmsCatalogConfig::builder() + .address(format!("{}:{}", hms_catalog_ip, HMS_CATALOG_PORT)) + .thrift_transport(HmsThriftTransport::Buffered) + .build(); + + let hms_catalog = HmsCatalog::new(config).unwrap(); + + TestFixture { + _docker_compose: docker_compose, + hms_catalog, + } +} + +#[tokio::test] +async fn test_list_namespace() -> Result<()> { + let fixture = set_test_fixture("test_list_namespace").await; + + let expected_no_parent = vec![NamespaceIdent::new("default".into())]; + let result_no_parent = fixture.hms_catalog.list_namespaces(None).await?; + + let result_with_parent = fixture + .hms_catalog + .list_namespaces(Some(&NamespaceIdent::new("parent".into()))) + .await?; + + assert_eq!(expected_no_parent, result_no_parent); + assert!(result_with_parent.is_empty()); + + Ok(()) +} + +#[tokio::test] +async fn test_create_namespace() -> Result<()> { + let fixture = set_test_fixture("test_create_namespace").await; + + let ns = Namespace::new(NamespaceIdent::new("my_namespace".into())); + let properties = HashMap::from([ + ("comment".to_string(), "my_description".to_string()), + ("location".to_string(), "my_location".to_string()), + ( + "hive.metastore.database.owner".to_string(), + "apache".to_string(), + ), + ( + "hive.metastore.database.owner-type".to_string(), + "user".to_string(), + ), + ("key1".to_string(), "value1".to_string()), + ]); + + let result = fixture + .hms_catalog + .create_namespace(ns.name(), properties) + .await?; + + assert_eq!(result, ns); + + Ok(()) +} + +#[tokio::test] +async fn test_get_namespace() -> Result<()> { + let fixture = set_test_fixture("test_get_namespace").await; + + let ns = Namespace::new(NamespaceIdent::new("default".into())); + let properties = HashMap::from([ + ("location".to_string(), "s3a://warehouse/hive".to_string()), + ( + "hive.metastore.database.owner-type".to_string(), + "Role".to_string(), + ), + ("comment".to_string(), "Default Hive database".to_string()), + ( + "hive.metastore.database.owner".to_string(), + "public".to_string(), + ), + ]); + + let expected = Namespace::with_properties(NamespaceIdent::new("default".into()), properties); + + let result = fixture.hms_catalog.get_namespace(ns.name()).await?; + + assert_eq!(expected, result); + + Ok(()) +} + +#[tokio::test] +async fn test_namespace_exists() -> Result<()> { + let fixture = set_test_fixture("test_namespace_exists").await; + + let ns_exists = Namespace::new(NamespaceIdent::new("default".into())); + let ns_not_exists = Namespace::new(NamespaceIdent::new("not_here".into())); + + let result_exists = fixture + .hms_catalog + .namespace_exists(ns_exists.name()) + .await?; + let result_not_exists = fixture + .hms_catalog + .namespace_exists(ns_not_exists.name()) + .await?; + + assert!(result_exists); + assert!(!result_not_exists); + + Ok(()) +} + +#[tokio::test] +async fn test_update_namespace() -> Result<()> { + let fixture = set_test_fixture("test_update_namespace").await; + + let ns = Namespace::new(NamespaceIdent::new("default".into())); + let properties = HashMap::from([("comment".to_string(), "my_update".to_string())]); + + fixture + .hms_catalog + .update_namespace(ns.name(), properties) + .await?; + + let db = fixture.hms_catalog.get_namespace(ns.name()).await?; + + assert_eq!( + db.properties().get("comment"), + Some(&"my_update".to_string()) + ); + + Ok(()) +} + +#[tokio::test] +async fn test_drop_namespace() -> Result<()> { + let fixture = set_test_fixture("test_drop_namespace").await; + + let ns = Namespace::new(NamespaceIdent::new("delete_me".into())); + + fixture + .hms_catalog + .create_namespace(ns.name(), HashMap::new()) + .await?; + + let result = fixture.hms_catalog.namespace_exists(ns.name()).await?; + assert!(result); + + fixture.hms_catalog.drop_namespace(ns.name()).await?; + + let result = fixture.hms_catalog.namespace_exists(ns.name()).await?; + assert!(!result); + + Ok(()) +} + +#[tokio::test] +async fn test_list_tables() -> Result<()> { + let fixture = set_test_fixture("test_list_tables").await; + + let ns = Namespace::new(NamespaceIdent::new("default".into())); + + let result = fixture.hms_catalog.list_tables(ns.name()).await?; + + assert_eq!(result, vec![]); + + Ok(()) +} diff --git a/crates/catalog/rest/testdata/rest_catalog/docker-compose.yaml b/crates/catalog/rest/testdata/rest_catalog/docker-compose.yaml index 5c101463f..0152a22ca 100644 --- a/crates/catalog/rest/testdata/rest_catalog/docker-compose.yaml +++ b/crates/catalog/rest/testdata/rest_catalog/docker-compose.yaml @@ -37,7 +37,7 @@ services: - 8181 minio: - image: minio/minio + image: minio/minio:RELEASE.2024-03-07T00-43-48Z environment: - MINIO_ROOT_USER=admin - MINIO_ROOT_PASSWORD=password @@ -50,16 +50,10 @@ services: mc: depends_on: - minio - image: minio/mc + image: minio/mc:RELEASE.2024-03-07T00-31-49Z environment: - AWS_ACCESS_KEY_ID=admin - AWS_SECRET_ACCESS_KEY=password - AWS_REGION=us-east-1 entrypoint: > - /bin/sh -c " - until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done; - /usr/bin/mc rm -r --force minio/icebergdata; - /usr/bin/mc mb minio/icebergdata; - /usr/bin/mc policy set public minio/icebergdata; - tail -f /dev/null - " \ No newline at end of file + /bin/sh -c " until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done; /usr/bin/mc rm -r --force minio/icebergdata; /usr/bin/mc mb minio/icebergdata; /usr/bin/mc policy set public minio/icebergdata; tail -f /dev/null " diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index bd579d598..708e6bf3c 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -50,7 +50,7 @@ pub trait Catalog: Debug + Sync + Send { async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result; /// Check if namespace exists in catalog. - async fn namespace_exists(&self, namesace: &NamespaceIdent) -> Result; + async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result; /// Update a namespace inside the catalog. /// diff --git a/deny.toml b/deny.toml index e32367948..1d30b5f3f 100644 --- a/deny.toml +++ b/deny.toml @@ -19,17 +19,23 @@ unlicensed = "deny" copyleft = "deny" allow = [ - "Apache-2.0", - "Apache-2.0 WITH LLVM-exception", - "MIT", - "BSD-3-Clause", - "ISC", - "CC0-1.0", + "Apache-2.0", + "Apache-2.0 WITH LLVM-exception", + "MIT", + "BSD-3-Clause", + "ISC", + "CC0-1.0", ] exceptions = [ - { allow = ["OpenSSL"], name = "ring" }, - { allow = ["Unicode-DFS-2016"], name = "unicode-ident" }, - { allow = ["Zlib"], name = "adler32" } + { allow = [ + "OpenSSL", + ], name = "ring" }, + { allow = [ + "Unicode-DFS-2016", + ], name = "unicode-ident" }, + { allow = [ + "Zlib", + ], name = "adler32" }, ] [[licenses.clarify]] @@ -42,4 +48,4 @@ name = "ring" # compiled into non-test libraries, is included below." # OpenSSL - Obviously expression = "ISC AND MIT AND OpenSSL" -license-files = [{ path = "LICENSE", hash = 0xbd0eed23 }] \ No newline at end of file +license-files = [{ path = "LICENSE", hash = 0xbd0eed23 }]