Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add hive metastore catalog support (part 2/2) #285

Merged
merged 71 commits into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from 59 commits
Commits
Show all changes
71 commits
Select commit Hold shift + click to select a range
8723ed7
fmt members
mlanhenke Mar 6, 2024
342b686
setup basic test-infra for hms-catalog
mlanhenke Mar 6, 2024
d8ccce1
add license
mlanhenke Mar 7, 2024
910d848
add hms create_namespace
mlanhenke Mar 7, 2024
315ddde
add hms get_namespace
mlanhenke Mar 8, 2024
05886bd
fix: typo
mlanhenke Mar 8, 2024
ada8193
add hms namespace_exists and drop_namespace
mlanhenke Mar 8, 2024
00c720b
add hms update_namespace
mlanhenke Mar 8, 2024
1b98f88
move fns into HmsCatalog
mlanhenke Mar 8, 2024
f82fa5d
use `expose` in docker-compose
mlanhenke Mar 8, 2024
10b53ba
add hms list_tables
mlanhenke Mar 8, 2024
b35916c
fix: clippy
mlanhenke Mar 8, 2024
cfcbc17
fix: cargo sort
mlanhenke Mar 8, 2024
2e320fe
fix: cargo workspace
marvinlanhenke Mar 9, 2024
ae03c85
move fns into utils + add constants
marvinlanhenke Mar 9, 2024
17cb904
include database name in error msg
marvinlanhenke Mar 9, 2024
3aef54a
add pilota to cargo workspace
marvinlanhenke Mar 9, 2024
1660f3b
add minio version
marvinlanhenke Mar 9, 2024
0716baf
Merge branch 'main' into hms
marvinlanhenke Mar 9, 2024
a33132e
change visibility to pub(crate); return namespace from conversion fn
marvinlanhenke Mar 9, 2024
153bb6f
add minio version in rest-catalog docker-compose
marvinlanhenke Mar 9, 2024
2b80a4b
fix: hms test docker infrastructure
marvinlanhenke Mar 9, 2024
bde3c98
add version to minio/mc
marvinlanhenke Mar 9, 2024
8375c75
fix: license header
marvinlanhenke Mar 9, 2024
7dc7adb
fix: core-site
marvinlanhenke Mar 9, 2024
5d119c5
Merge branch 'main' into hms
marvinlanhenke Mar 10, 2024
fde1138
split utils and errors
marvinlanhenke Mar 10, 2024
666f856
add fn get_default_table_location
marvinlanhenke Mar 11, 2024
71c447a
add fn get_metadata_location
marvinlanhenke Mar 11, 2024
97d71cc
add docs
marvinlanhenke Mar 11, 2024
c64ffa6
add HiveSchemaBuilder
marvinlanhenke Mar 12, 2024
1da7f95
add schema to HiveSchemaBuilder
marvinlanhenke Mar 12, 2024
f99ed2b
add convert_to_hive_table
marvinlanhenke Mar 12, 2024
1db7021
cargo sort
marvinlanhenke Mar 12, 2024
987704e
implement table_ops without TableMetadataBuilder
marvinlanhenke Mar 12, 2024
aa98865
refactor: HiveSchema fn from_iceberg
marvinlanhenke Mar 13, 2024
1d1d577
prepare table creation without metadata
marvinlanhenke Mar 13, 2024
7e6cce4
Merge branch 'main' into hms
marvinlanhenke Mar 13, 2024
6cd543d
Merge branch 'main' into hms
marvinlanhenke Mar 14, 2024
0d0ce47
simplify HiveSchemaBuilder
marvinlanhenke Mar 14, 2024
376528e
refactor: use ok_or_else()
marvinlanhenke Mar 14, 2024
87daa00
simplify HiveSchemaBuilder
marvinlanhenke Mar 14, 2024
af193e6
fix visibility of consts
marvinlanhenke Mar 14, 2024
3645311
Merge branch 'main' into hms
marvinlanhenke Mar 15, 2024
7658e77
change serde metadata v2
marvinlanhenke Mar 15, 2024
63f0b22
change default partition_specs and sort_orders
marvinlanhenke Mar 15, 2024
1a11ead
change test
marvinlanhenke Mar 15, 2024
565e9d2
Merge branch 'metadata_serde' into hms
marvinlanhenke Mar 15, 2024
a5954b9
add create table with metadata
marvinlanhenke Mar 16, 2024
3034dca
use FileIO::from_path
marvinlanhenke Mar 16, 2024
4c7d91a
add test_load_table
marvinlanhenke Mar 16, 2024
e1407d5
small fixes + docs
marvinlanhenke Mar 16, 2024
6a4d6c8
rename
marvinlanhenke Mar 16, 2024
747cc5b
extract get_metadata_location from hive_table
marvinlanhenke Mar 17, 2024
07dd4d9
add integration tests
marvinlanhenke Mar 17, 2024
4fdfa33
fix: clippy
marvinlanhenke Mar 17, 2024
2ce1a6a
remove whitespace
marvinlanhenke Mar 17, 2024
c04f3cf
Merge branch 'main' into hms
marvinlanhenke Mar 19, 2024
eaef1ef
fix: fixture names
marvinlanhenke Mar 19, 2024
f814906
remove builder-prefix `with`
marvinlanhenke Mar 20, 2024
dc054a9
capitalize error msg
marvinlanhenke Mar 20, 2024
9557a1c
remove trait bound `Display`
marvinlanhenke Mar 20, 2024
3cbb222
add const `OWNER`
marvinlanhenke Mar 20, 2024
af91d19
fix: default warehouse location
marvinlanhenke Mar 20, 2024
e8a7bdf
add test-case `list_tables`
marvinlanhenke Mar 20, 2024
a2fe37a
add all primitives to test_schema
marvinlanhenke Mar 20, 2024
2bb5e38
exclude `Timestamptz` from hive conversion
marvinlanhenke Mar 20, 2024
8c519eb
remove Self::T from schema
marvinlanhenke Mar 20, 2024
c765268
remove context
marvinlanhenke Mar 20, 2024
98a2f27
keep file_io in HmsCatalog
marvinlanhenke Mar 20, 2024
5bcbba8
use json schema repr
marvinlanhenke Mar 21, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion crates/catalog/hms/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,17 @@ keywords = ["iceberg", "hive", "catalog"]
[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
chrono = { workspace = true }
hive_metastore = { workspace = true }
iceberg = { workspace = true }
log = { workspace = true }
pilota = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
typed-builder = { workspace = true }
uuid = { workspace = true }
volo-thrift = { workspace = true }

[dev-dependencies]
iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
port_scanner = { workspace = true }
tokio = { workspace = true }
222 changes: 207 additions & 15 deletions crates/catalog/hms/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,18 @@
// specific language governing permissions and limitations
// under the License.

use crate::error::from_io_error;
use crate::error::from_thrift_error;

use super::utils::*;
use async_trait::async_trait;
use hive_metastore::ThriftHiveMetastoreClient;
use hive_metastore::ThriftHiveMetastoreClientBuilder;
use hive_metastore::ThriftHiveMetastoreGetDatabaseException;
use hive_metastore::ThriftHiveMetastoreGetTableException;
use iceberg::io::FileIO;
use iceberg::spec::TableMetadata;
use iceberg::spec::TableMetadataBuilder;
use iceberg::table::Table;
use iceberg::{
Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation,
Expand All @@ -28,6 +35,8 @@ use iceberg::{
use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::net::ToSocketAddrs;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use typed_builder::TypedBuilder;
use volo_thrift::ResponseError;

Expand All @@ -47,6 +56,8 @@ pub enum HmsThriftTransport {
pub struct HmsCatalogConfig {
address: String,
thrift_transport: HmsThriftTransport,
#[builder(default, setter(prefix = "with_"))]
liurenjie1024 marked this conversation as resolved.
Show resolved Hide resolved
props: HashMap<String, String>,
}

struct HmsClient(ThriftHiveMetastoreClient);
Expand Down Expand Up @@ -173,7 +184,7 @@ impl Catalog for HmsCatalog {
let db = self
.client
.0
.get_database(name.clone().into())
.get_database(name.into())
.await
.map_err(from_thrift_error)?;

Expand All @@ -197,7 +208,7 @@ impl Catalog for HmsCatalog {
async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
let name = validate_namespace(namespace)?;

let resp = self.client.0.get_database(name.clone().into()).await;
let resp = self.client.0.get_database(name.into()).await;

match resp {
Ok(_) => Ok(true),
Expand Down Expand Up @@ -269,13 +280,22 @@ impl Catalog for HmsCatalog {
Ok(())
}

/// Asynchronously lists all tables within a specified namespace.
///
/// # Returns
///
/// A `Result<Vec<TableIdent>>`, which is:
/// - `Ok(vec![...])` containing a vector of `TableIdent` instances, each
/// representing a table within the specified namespace.
/// - `Err(...)` if an error occurs during namespace validation or while
/// querying the database.
async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
let name = validate_namespace(namespace)?;

let tables = self
.client
.0
.get_all_tables(name.clone().into())
.get_all_tables(name.into())
.await
.map_err(from_thrift_error)?;

Expand All @@ -287,31 +307,203 @@ impl Catalog for HmsCatalog {
Ok(tables)
}

/// Creates a new table within a specified namespace using the provided
/// table creation settings.
///
/// # Returns
/// A `Result` wrapping a `Table` object representing the newly created
/// table.
///
/// # Errors
/// This function may return an error in several cases, including invalid
/// namespace identifiers, failure to determine a default storage location,
/// issues generating or writing table metadata, and errors communicating
/// with the Hive Metastore.
async fn create_table(
&self,
_namespace: &NamespaceIdent,
_creation: TableCreation,
namespace: &NamespaceIdent,
creation: TableCreation,
) -> Result<Table> {
todo!()
let db_name = validate_namespace(namespace)?;
let table_name = creation.name.clone();

let location = match &creation.location {
Some(location) => location.clone(),
None => {
let ns = self.get_namespace(namespace).await?;
get_default_table_location(&ns, &table_name)?
}
};

let metadata = TableMetadataBuilder::from_table_creation(creation)?.build()?;
let metadata_location = create_metadata_location(&location, 0)?;

let file_io = FileIO::from_path(&metadata_location)?
.with_props(&self.config.props)
.build()?;
liurenjie1024 marked this conversation as resolved.
Show resolved Hide resolved
let mut file = file_io.new_output(&metadata_location)?.writer().await?;
file.write_all(&serde_json::to_vec(&metadata)?).await?;
file.shutdown().await?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Though it's correct, I prefer to use flush, as we can keep underlying connection? cc @Xuanwo

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@liurenjie1024
I had troubles using flush() since no actual file was written - only when shutdown is used?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's weird, cc @Xuanwo any ideas?


let hive_table = convert_to_hive_table(
db_name.clone(),
metadata.current_schema(),
table_name.clone(),
location,
metadata_location.clone(),
metadata.properties(),
)?;

self.client
.0
.create_table(hive_table)
.await
.map_err(from_thrift_error)?;

let table = Table::builder()
.file_io(file_io)
.metadata_location(metadata_location)
.metadata(metadata)
.identifier(TableIdent::new(NamespaceIdent::new(db_name), table_name))
.build();

Ok(table)
}

async fn load_table(&self, _table: &TableIdent) -> Result<Table> {
todo!()
/// Loads a table from the Hive Metastore and constructs a `Table` object
/// based on its metadata.
///
/// # Returns
/// A `Result` wrapping a `Table` object that represents the loaded table.
///
/// # Errors
/// This function may return an error in several scenarios, including:
/// - Failure to validate the namespace.
/// - Failure to retrieve the table from the Hive Metastore.
/// - Absence of metadata location information in the table's properties.
/// - Issues reading or deserializing the table's metadata file.
async fn load_table(&self, table: &TableIdent) -> Result<Table> {
let db_name = validate_namespace(table.namespace())?;

let hive_table = self
.client
.0
.get_table(db_name.clone().into(), table.name.clone().into())
.await
.map_err(from_thrift_error)?;

let metadata_location = get_metadata_location(&hive_table.parameters)?;

let file_io = FileIO::from_path(&metadata_location)?
.with_props(&self.config.props)
.build()?;
let mut reader = file_io.new_input(&metadata_location)?.reader().await?;
let mut metadata_str = String::new();
reader.read_to_string(&mut metadata_str).await?;
let metadata = serde_json::from_str::<TableMetadata>(&metadata_str)?;

let table = Table::builder()
.file_io(file_io)
.metadata_location(metadata_location)
.metadata(metadata)
.identifier(TableIdent::new(
NamespaceIdent::new(db_name),
table.name.clone(),
))
.build();

Ok(table)
}

async fn drop_table(&self, _table: &TableIdent) -> Result<()> {
todo!()
/// Asynchronously drops a table from the database.
///
/// # Errors
/// Returns an error if:
/// - The namespace provided in `table` cannot be validated
/// or does not exist.
/// - The underlying database client encounters an error while
/// attempting to drop the table. This includes scenarios where
/// the table does not exist.
/// - Any network or communication error occurs with the database backend.
async fn drop_table(&self, table: &TableIdent) -> Result<()> {
let db_name = validate_namespace(table.namespace())?;

self.client
.0
.drop_table(db_name.into(), table.name.clone().into(), false)
.await
.map_err(from_thrift_error)?;

Ok(())
}

async fn table_exists(&self, _table: &TableIdent) -> Result<bool> {
todo!()
/// Asynchronously checks the existence of a specified table
/// in the database.
///
/// # Returns
/// - `Ok(true)` if the table exists in the database.
/// - `Ok(false)` if the table does not exist in the database.
/// - `Err(...)` if an error occurs during the process
async fn table_exists(&self, table: &TableIdent) -> Result<bool> {
let db_name = validate_namespace(table.namespace())?;
let table_name = table.name.clone();

let resp = self
.client
.0
.get_table(db_name.into(), table_name.into())
.await;

match resp {
Ok(_) => Ok(true),
Err(err) => {
if let ResponseError::UserException(ThriftHiveMetastoreGetTableException::O2(_)) =
&err
{
Ok(false)
} else {
Err(from_thrift_error(err))
}
}
}
}

async fn rename_table(&self, _src: &TableIdent, _dest: &TableIdent) -> Result<()> {
todo!()
/// Asynchronously renames a table within the database
/// or moves it between namespaces (databases).
///
/// # Returns
/// - `Ok(())` on successful rename or move of the table.
/// - `Err(...)` if an error occurs during the process.
async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
let src_dbname = validate_namespace(src.namespace())?;
let dest_dbname = validate_namespace(dest.namespace())?;

let src_tbl_name = src.name.clone();
let dest_tbl_name = dest.name.clone();

let mut tbl = self
.client
.0
.get_table(src_dbname.clone().into(), src_tbl_name.clone().into())
.await
.map_err(from_thrift_error)?;

tbl.db_name = Some(dest_dbname.into());
tbl.table_name = Some(dest_tbl_name.into());

self.client
.0
.alter_table(src_dbname.into(), src_tbl_name.into(), tbl)
.await
.map_err(from_thrift_error)?;

Ok(())
}

async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
todo!()
Err(Error::new(
ErrorKind::FeatureUnsupported,
"Updating a table is not supported yet",
))
}
}
42 changes: 42 additions & 0 deletions crates/catalog/hms/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use anyhow::anyhow;
use iceberg::{Error, ErrorKind};
use std::fmt::Debug;
use std::io;

/// Format a thrift error into iceberg error.
pub fn from_thrift_error<T>(error: volo_thrift::error::ResponseError<T>) -> Error
where
T: Debug,
{
Error::new(
ErrorKind::Unexpected,
"operation failed for hitting thrift error".to_string(),
liurenjie1024 marked this conversation as resolved.
Show resolved Hide resolved
)
.with_source(anyhow!("thrift error: {:?}", error))
}

/// Format an io error into iceberg error.
pub fn from_io_error(error: io::Error) -> Error {
Error::new(
ErrorKind::Unexpected,
"operation failed for hitting io error".to_string(),
liurenjie1024 marked this conversation as resolved.
Show resolved Hide resolved
)
.with_source(error)
}
2 changes: 2 additions & 0 deletions crates/catalog/hms/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,6 @@
mod catalog;
pub use catalog::*;

mod error;
mod schema;
mod utils;
Loading
Loading