Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Glue Catalog - table operations (3/3) #314

Merged
merged 18 commits into from
Apr 21, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion crates/catalog/glue/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@ aws-config = { workspace = true }
aws-sdk-glue = { workspace = true }
iceberg = { workspace = true }
log = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
typed-builder = { workspace = true }
uuid = { workspace = true }

[dev-dependencies]
iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
port_scanner = { workspace = true }
tokio = { workspace = true }
304 changes: 286 additions & 18 deletions crates/catalog/glue/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,23 @@
// under the License.

use async_trait::async_trait;
use aws_sdk_glue::types::TableInput;
use iceberg::io::FileIO;
use iceberg::spec::{TableMetadata, TableMetadataBuilder};
use iceberg::table::Table;
use iceberg::{
Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation,
TableIdent,
};
use std::{collections::HashMap, fmt::Debug};
use tokio::io::{AsyncReadExt, AsyncWriteExt};

use typed_builder::TypedBuilder;

use crate::error::from_aws_sdk_error;
use crate::error::{from_aws_build_error, from_aws_sdk_error};
use crate::utils::{
convert_to_database, convert_to_namespace, create_sdk_config, validate_namespace,
convert_to_database, convert_to_glue_table, convert_to_namespace, create_metadata_location,
create_sdk_config, get_default_table_location, get_metadata_location, validate_namespace,
};
use crate::with_catalog_id;

Expand All @@ -38,6 +43,7 @@ pub struct GlueCatalogConfig {
uri: Option<String>,
#[builder(default, setter(strip_option))]
catalog_id: Option<String>,
warehouse: String,
#[builder(default)]
props: HashMap<String, String>,
}
Expand All @@ -48,6 +54,7 @@ struct GlueClient(aws_sdk_glue::Client);
pub struct GlueCatalog {
    /// Configuration this catalog was created with (endpoint URI, catalog id,
    /// warehouse location and additional properties).
    config: GlueCatalogConfig,
    /// Wrapper around the `aws_sdk_glue::Client` used for all Glue API calls.
    client: GlueClient,
    /// `FileIO` built from the configured warehouse path and properties; used
    /// to read and write table metadata files.
    file_io: FileIO,
}

impl Debug for GlueCatalog {
Expand All @@ -60,15 +67,24 @@ impl Debug for GlueCatalog {

impl GlueCatalog {
/// Create a new glue catalog
pub async fn new(config: GlueCatalogConfig) -> Self {
pub async fn new(config: GlueCatalogConfig) -> Result<Self> {
let sdk_config = create_sdk_config(&config.props, config.uri.as_ref()).await;

let client = aws_sdk_glue::Client::new(&sdk_config);

GlueCatalog {
let file_io = FileIO::from_path(&config.warehouse)?
.with_props(&config.props)
.build()?;

Ok(GlueCatalog {
config,
client: GlueClient(client),
}
file_io,
})
}
/// Get the catalogs `FileIO`
pub fn file_io(&self) -> FileIO {
self.file_io.clone()
}
}

Expand All @@ -77,7 +93,7 @@ impl Catalog for GlueCatalog {
/// List namespaces from glue catalog.
///
/// Glue doesn't support nested namespaces.
/// We will return an empty list if parent is some
/// We will return an empty list if parent is some.
async fn list_namespaces(
&self,
parent: Option<&NamespaceIdent>,
Expand Down Expand Up @@ -277,6 +293,7 @@ impl Catalog for GlueCatalog {
/// querying the database.
async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
let db_name = validate_namespace(namespace)?;

let mut table_list: Vec<TableIdent> = Vec::new();
let mut next_token: Option<String> = None;

Expand Down Expand Up @@ -310,31 +327,282 @@ impl Catalog for GlueCatalog {
Ok(table_list)
}

/// Creates a new table within a specified namespace using the provided
/// table creation settings.
///
/// The table location is taken from `creation.location` when it is set;
/// otherwise it is derived from the namespace and the configured warehouse
/// via `get_default_table_location`. The table metadata is serialized to
/// JSON and written through the catalog's `FileIO` before the table is
/// registered in Glue.
///
/// # Returns
/// A `Result` wrapping a `Table` object representing the newly created
/// table.
///
/// # Errors
/// This function may return an error in several cases, including invalid
/// namespace identifiers, failure to determine a default storage location,
/// issues generating or writing table metadata, and errors communicating
/// with the Glue Catalog.
async fn create_table(
    &self,
    namespace: &NamespaceIdent,
    creation: TableCreation,
) -> Result<Table> {
    let db_name = validate_namespace(namespace)?;
    let table_name = creation.name.clone();

    // Resolve the location before `creation` is consumed by the metadata builder.
    let location = match &creation.location {
        Some(location) => location.clone(),
        None => {
            let ns = self.get_namespace(namespace).await?;
            get_default_table_location(&ns, &table_name, &self.config.warehouse)
        }
    };

    let metadata = TableMetadataBuilder::from_table_creation(creation)?.build()?;
    // NOTE(review): `0` is presumably the initial metadata version - confirm
    // against `create_metadata_location`.
    let metadata_location = create_metadata_location(&location, 0)?;

    // Persist the metadata file first so the Glue entry never points at a
    // location that has not been written.
    let mut file = self
        .file_io
        .new_output(&metadata_location)?
        .writer()
        .await?;
    file.write_all(&serde_json::to_vec(&metadata)?).await?;
    file.shutdown().await?;

    let glue_table = convert_to_glue_table(
        &table_name,
        metadata_location.clone(),
        &metadata,
        metadata.properties(),
        None,
    )?;

    let builder = self
        .client
        .0
        .create_table()
        .database_name(&db_name)
        .table_input(glue_table);
    let builder = with_catalog_id!(builder, self.config);

    builder.send().await.map_err(from_aws_sdk_error)?;

    let table = Table::builder()
        .file_io(self.file_io())
        .metadata_location(metadata_location)
        .metadata(metadata)
        .identifier(TableIdent::new(NamespaceIdent::new(db_name), table_name))
        .build();

    Ok(table)
}

async fn load_table(&self, _table: &TableIdent) -> Result<Table> {
todo!()
/// Loads a table from the Glue Catalog and constructs a `Table` object
/// based on its metadata.
///
/// # Returns
/// A `Result` wrapping a `Table` object that represents the loaded table.
///
/// # Errors
/// This function may return an error in several scenarios, including:
/// - Failure to validate the namespace.
/// - Failure to retrieve the table from the Glue Catalog.
/// - Absence of metadata location information in the table's properties.
/// - Issues reading or deserializing the table's metadata file.
async fn load_table(&self, table: &TableIdent) -> Result<Table> {
let db_name = validate_namespace(table.namespace())?;
let table_name = table.name();

let builder = self
.client
.0
.get_table()
.database_name(&db_name)
.name(table_name);
let builder = with_catalog_id!(builder, self.config);

let glue_table_output = builder.send().await.map_err(from_aws_sdk_error)?;

match glue_table_output.table() {
None => Err(Error::new(
ErrorKind::Unexpected,
format!(
"'Table' object for database: {} and table: {} does not exist",
marvinlanhenke marked this conversation as resolved.
Show resolved Hide resolved
db_name, table_name
),
)),
Some(table) => {
let metadata_location = get_metadata_location(&table.parameters)?;
marvinlanhenke marked this conversation as resolved.
Show resolved Hide resolved

let mut reader = self.file_io.new_input(&metadata_location)?.reader().await?;
let mut metadata_str = String::new();
reader.read_to_string(&mut metadata_str).await?;
let metadata = serde_json::from_str::<TableMetadata>(&metadata_str)?;

let table = Table::builder()
.file_io(self.file_io())
.metadata_location(metadata_location)
.metadata(metadata)
.identifier(TableIdent::new(
NamespaceIdent::new(db_name),
table_name.to_owned(),
))
.build();

Ok(table)
}
}
}

async fn drop_table(&self, _table: &TableIdent) -> Result<()> {
todo!()
/// Asynchronously drops a table from the database.
///
/// # Errors
/// Returns an error if:
/// - The namespace provided in `table` cannot be validated
/// or does not exist.
/// - The underlying database client encounters an error while
/// attempting to drop the table. This includes scenarios where
/// the table does not exist.
/// - Any network or communication error occurs with the database backend.
async fn drop_table(&self, table: &TableIdent) -> Result<()> {
let db_name = validate_namespace(table.namespace())?;
let table_name = table.name();

let builder = self
.client
.0
.delete_table()
.database_name(&db_name)
.name(table_name);
let builder = with_catalog_id!(builder, self.config);

builder.send().await.map_err(from_aws_sdk_error)?;

Ok(())
}

async fn table_exists(&self, _table: &TableIdent) -> Result<bool> {
todo!()
/// Asynchronously checks the existence of a specified table
/// in the database.
///
/// # Returns
/// - `Ok(true)` if the table exists in the database.
/// - `Ok(false)` if the table does not exist in the database.
/// - `Err(...)` if an error occurs during the process
async fn table_exists(&self, table: &TableIdent) -> Result<bool> {
let db_name = validate_namespace(table.namespace())?;
let table_name = table.name();

let builder = self
.client
.0
.get_table()
.database_name(&db_name)
.name(table_name);
let builder = with_catalog_id!(builder, self.config);

let resp = builder.send().await;

match resp {
Ok(_) => Ok(true),
Err(err) => {
if err
.as_service_error()
.map(|e| e.is_entity_not_found_exception())
== Some(true)
{
return Ok(false);
}
Err(from_aws_sdk_error(err))
}
}
}

async fn rename_table(&self, _src: &TableIdent, _dest: &TableIdent) -> Result<()> {
todo!()
/// Asynchronously renames a table within the database
/// or moves it between namespaces (databases).
///
/// # Returns
/// - `Ok(())` on successful rename or move of the table.
/// - `Err(...)` if an error occurs during the process.
async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
let src_db_name = validate_namespace(src.namespace())?;
let dest_db_name = validate_namespace(dest.namespace())?;

let src_table_name = src.name();
let dest_table_name = dest.name();

let builder = self
.client
.0
.get_table()
.database_name(&src_db_name)
.name(src_table_name);
let builder = with_catalog_id!(builder, self.config);

let glue_table_output = builder.send().await.map_err(from_aws_sdk_error)?;

match glue_table_output.table() {
None => Err(Error::new(
ErrorKind::Unexpected,
format!(
"'Table' object for database: {} and table: {} does not exist",
src_db_name, src_table_name
),
)),
Some(table) => {
let rename_table_input = TableInput::builder()
.name(dest_table_name)
.set_parameters(table.parameters.clone())
.set_storage_descriptor(table.storage_descriptor.clone())
.set_table_type(table.table_type.clone())
.set_description(table.description.clone())
.build()
.map_err(from_aws_build_error)?;

let builder = self
.client
.0
.create_table()
.database_name(&dest_db_name)
.table_input(rename_table_input);
let builder = with_catalog_id!(builder, self.config);

builder.send().await.map_err(from_aws_sdk_error)?;

let drop_src_table_result = self.drop_table(src).await;

match drop_src_table_result {
marvinlanhenke marked this conversation as resolved.
Show resolved Hide resolved
Ok(_) => Ok(()),
Err(_) => {
let err_msg_src_table = format!(
"Failed to drop old table {}.{}.",
src_db_name, src_table_name
);

let drop_dest_table_result = self.drop_table(dest).await;

match drop_dest_table_result {
Ok(_) => Err(Error::new(
ErrorKind::Unexpected,
format!(
"{} Rolled back table creation for {}.{}.",
err_msg_src_table, dest_db_name, dest_table_name
),
)),
Err(_) => Err(Error::new(
ErrorKind::Unexpected,
format!(
"{} Failed to roll back table creation for {}.{}. Please clean up manually.",
err_msg_src_table, dest_db_name, dest_table_name
),
)),
}
}
}
}
}
}

/// Updating a table is not supported by this catalog yet.
///
/// # Errors
/// Always returns an error of kind `ErrorKind::FeatureUnsupported`.
async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
    Err(Error::new(
        ErrorKind::FeatureUnsupported,
        "Updating a table is not supported yet",
    ))
}
}
Loading
Loading