From 2db7f9b6f8545b8651ac4a8335511a6908ba8008 Mon Sep 17 00:00:00 2001 From: Robert Pack <42610831+roeap@users.noreply.github.com> Date: Tue, 6 Jun 2023 06:41:24 +0200 Subject: [PATCH] refactor: remove `LoadCheckpointError` and `ApplyLogError` (#1432) --- python/src/error.rs | 3 + rust/src/action/mod.rs | 115 ++++++++++++++++++++++++++++++---- rust/src/delta.rs | 128 ++++++-------------------------------- rust/src/errors.rs | 123 ++++++------------------------------ rust/src/storage/utils.rs | 10 ++- rust/src/table_state.rs | 21 ++++--- 6 files changed, 162 insertions(+), 238 deletions(-) diff --git a/python/src/error.rs b/python/src/error.rs index 69c3d0fbaf..5148dbced9 100644 --- a/python/src/error.rs +++ b/python/src/error.rs @@ -63,11 +63,14 @@ fn checkpoint_to_py(err: ProtocolError) -> PyErr { match err { ProtocolError::Arrow { source } => arrow_to_py(source), ProtocolError::ObjectStore { source } => object_store_to_py(source), + ProtocolError::EndOfLog => DeltaProtocolError::new_err("End of log"), ProtocolError::NoMetaData => DeltaProtocolError::new_err("Table metadata missing"), + ProtocolError::CheckpointNotFound => DeltaProtocolError::new_err(err.to_string()), ProtocolError::InvalidField(err) => PyValueError::new_err(err), ProtocolError::InvalidRow(err) => PyValueError::new_err(err), ProtocolError::SerializeOperation { source } => PyValueError::new_err(source.to_string()), ProtocolError::ParquetParseError { source } => PyIOError::new_err(source.to_string()), + ProtocolError::IO { source } => PyIOError::new_err(source.to_string()), ProtocolError::Generic(msg) => DeltaError::new_err(msg), } } diff --git a/rust/src/action/mod.rs b/rust/src/action/mod.rs index 39ae3db473..d2f0004316 100644 --- a/rust/src/action/mod.rs +++ b/rust/src/action/mod.rs @@ -11,8 +11,12 @@ mod parquet_read; #[cfg(all(feature = "arrow"))] use arrow_schema::ArrowError; -use object_store::Error as ObjectStoreError; +use futures::StreamExt; +use lazy_static::lazy_static; +use log::*; +use 
object_store::{path::Path, Error as ObjectStoreError, ObjectStore}; use percent_encoding::percent_decode; +use regex::Regex; use serde::{Deserialize, Serialize}; use serde_json::{Map, Value}; use std::borrow::Borrow; @@ -21,7 +25,8 @@ use std::hash::{Hash, Hasher}; use crate::delta_config::IsolationLevel; use crate::errors::DeltaResult; -use crate::{schema::*, DeltaTableMetaData}; +use crate::storage::ObjectStoreRef; +use crate::{delta::CheckPoint, schema::*, DeltaTableMetaData}; /// Error returned when an invalid Delta log action is encountered. #[allow(missing_docs)] @@ -30,6 +35,12 @@ pub enum ProtocolError { #[error("Table state does not contain metadata")] NoMetaData, + #[error("Checkpoint file not found")] + CheckpointNotFound, + + #[error("End of transaction log")] + EndOfLog, + /// The action contains an invalid field. #[error("Invalid action field: {0}")] InvalidField(String), @@ -42,20 +53,15 @@ pub enum ProtocolError { #[error("Generic action error: {0}")] Generic(String), - #[cfg(feature = "parquet2")] - #[error("Failed to parse parquet checkpoint: {}", .source)] - /// Error returned when parsing checkpoint parquet using the parquet2 crate. + /// Error returned when parsing checkpoint parquet using the parquet crate. + #[error("Failed to parse parquet checkpoint: {source}")] ParquetParseError { /// Parquet error details returned when parsing the checkpoint parquet + #[cfg(feature = "parquet2")] #[from] source: parquet2::error::Error, - }, - - #[cfg(feature = "parquet")] - #[error("Failed to parse parquet checkpoint: {}", .source)] - /// Error returned when parsing checkpoint parquet using the parquet crate. 
- ParquetParseError { /// Parquet error details returned when parsing the checkpoint parquet + #[cfg(feature = "parquet")] #[from] source: parquet::errors::ParquetError, }, @@ -84,6 +90,12 @@ pub enum ProtocolError { #[from] source: ObjectStoreError, }, + + #[error("Io: {source}")] + IO { + #[from] + source: std::io::Error, + }, } fn decode_path(raw_path: &str) -> Result { @@ -722,6 +734,87 @@ pub enum OutputMode { Update, } +pub(crate) async fn get_last_checkpoint( + object_store: &ObjectStoreRef, +) -> Result { + let last_checkpoint_path = Path::from_iter(["_delta_log", "_last_checkpoint"]); + debug!("loading checkpoint from {last_checkpoint_path}"); + match object_store.get(&last_checkpoint_path).await { + Ok(data) => Ok(serde_json::from_slice(&data.bytes().await?)?), + Err(ObjectStoreError::NotFound { .. }) => { + match find_latest_check_point_for_version(object_store, i64::MAX).await { + Ok(Some(cp)) => Ok(cp), + _ => Err(ProtocolError::CheckpointNotFound), + } + } + Err(err) => Err(ProtocolError::ObjectStore { source: err }), + } +} + +pub(crate) async fn find_latest_check_point_for_version( + object_store: &ObjectStoreRef, + version: i64, +) -> Result, ProtocolError> { + lazy_static! { + static ref CHECKPOINT_REGEX: Regex = + Regex::new(r#"^_delta_log/(\d{20})\.checkpoint\.parquet$"#).unwrap(); + static ref CHECKPOINT_PARTS_REGEX: Regex = + Regex::new(r#"^_delta_log/(\d{20})\.checkpoint\.\d{10}\.(\d{10})\.parquet$"#).unwrap(); + } + + let mut cp: Option = None; + let mut stream = object_store.list(Some(object_store.log_path())).await?; + + while let Some(obj_meta) = stream.next().await { + // Exit early if any objects can't be listed. + // We exclude the special case of a not found error on some of the list entities. + // This error mainly occurs for local stores when a temporary file has been deleted by + // concurrent writers or if the table is vacuumed by another client. 
+ let obj_meta = match obj_meta { + Ok(meta) => Ok(meta), + Err(ObjectStoreError::NotFound { .. }) => continue, + Err(err) => Err(err), + }?; + if let Some(captures) = CHECKPOINT_REGEX.captures(obj_meta.location.as_ref()) { + let curr_ver_str = captures.get(1).unwrap().as_str(); + let curr_ver: i64 = curr_ver_str.parse().unwrap(); + if curr_ver > version { + // skip checkpoints newer than max version + continue; + } + if cp.is_none() || curr_ver > cp.unwrap().version { + cp = Some(CheckPoint { + version: curr_ver, + size: 0, + parts: None, + }); + } + continue; + } + + if let Some(captures) = CHECKPOINT_PARTS_REGEX.captures(obj_meta.location.as_ref()) { + let curr_ver_str = captures.get(1).unwrap().as_str(); + let curr_ver: i64 = curr_ver_str.parse().unwrap(); + if curr_ver > version { + // skip checkpoints newer than max version + continue; + } + if cp.is_none() || curr_ver > cp.unwrap().version { + let parts_str = captures.get(2).unwrap().as_str(); + let parts = parts_str.parse().unwrap(); + cp = Some(CheckPoint { + version: curr_ver, + size: 0, + parts: Some(parts), + }); + } + continue; + } + } + + Ok(cp) +} + #[cfg(test)] mod tests { use super::*; diff --git a/rust/src/delta.rs b/rust/src/delta.rs index 3c5320c9f5..9a068d839b 100644 --- a/rust/src/delta.rs +++ b/rust/src/delta.rs @@ -23,12 +23,14 @@ use serde_json::{Map, Value}; use uuid::Uuid; use super::action; -use super::action::{Action, DeltaOperation}; +use super::action::{ + find_latest_check_point_for_version, get_last_checkpoint, Action, DeltaOperation, +}; use super::partitions::PartitionFilter; use super::schema::*; use super::table_state::DeltaTableState; -use crate::action::{Add, Stats}; -use crate::errors::{ApplyLogError, DeltaTableError, LoadCheckpointError}; +use crate::action::{Add, ProtocolError, Stats}; +use crate::errors::DeltaTableError; use crate::operations::vacuum::VacuumBuilder; use crate::storage::{commit_uri_from_version, ObjectStoreRef}; @@ -40,8 +42,8 @@ pub use 
crate::builder::{DeltaTableBuilder, DeltaTableConfig, DeltaVersion}; pub struct CheckPoint { /// Delta table version pub(crate) version: i64, // 20 digits decimals - size: i64, - parts: Option, // 10 digits decimals + pub(crate) size: i64, + pub(crate) parts: Option, // 10 digits decimals } impl CheckPoint { @@ -146,7 +148,7 @@ impl fmt::Display for DeltaTableMetaData { } impl TryFrom for DeltaTableMetaData { - type Error = serde_json::error::Error; + type Error = ProtocolError; fn try_from(action_metadata: action::MetaData) -> Result { let schema = action_metadata.get_schema()?; @@ -345,86 +347,6 @@ impl DeltaTable { Ok(current_delta_log_ver) } - async fn get_last_checkpoint(&self) -> Result { - let last_checkpoint_path = Path::from_iter(["_delta_log", "_last_checkpoint"]); - debug!("loading checkpoint from {last_checkpoint_path}"); - match self.storage.get(&last_checkpoint_path).await { - Ok(data) => Ok(serde_json::from_slice(&data.bytes().await?)?), - Err(ObjectStoreError::NotFound { .. }) => { - match self.find_latest_check_point_for_version(i64::MAX).await { - Ok(Some(cp)) => Ok(cp), - _ => Err(LoadCheckpointError::NotFound), - } - } - Err(err) => Err(LoadCheckpointError::Storage { source: err }), - } - } - - async fn find_latest_check_point_for_version( - &self, - version: i64, - ) -> Result, DeltaTableError> { - lazy_static! { - static ref CHECKPOINT_REGEX: Regex = - Regex::new(r#"^_delta_log/(\d{20})\.checkpoint\.parquet$"#).unwrap(); - static ref CHECKPOINT_PARTS_REGEX: Regex = - Regex::new(r#"^_delta_log/(\d{20})\.checkpoint\.\d{10}\.(\d{10})\.parquet$"#) - .unwrap(); - } - - let mut cp: Option = None; - let mut stream = self.storage.list(Some(self.storage.log_path())).await?; - - while let Some(obj_meta) = stream.next().await { - // Exit early if any objects can't be listed. - // We exclude the special case of a not found error on some of the list entities. 
- // This error mainly occurs for local stores when a temporary file has been deleted by - // concurrent writers or if the table is vacuumed by another client. - let obj_meta = match obj_meta { - Ok(meta) => Ok(meta), - Err(ObjectStoreError::NotFound { .. }) => continue, - Err(err) => Err(err), - }?; - if let Some(captures) = CHECKPOINT_REGEX.captures(obj_meta.location.as_ref()) { - let curr_ver_str = captures.get(1).unwrap().as_str(); - let curr_ver: i64 = curr_ver_str.parse().unwrap(); - if curr_ver > version { - // skip checkpoints newer than max version - continue; - } - if cp.is_none() || curr_ver > cp.unwrap().version { - cp = Some(CheckPoint { - version: curr_ver, - size: 0, - parts: None, - }); - } - continue; - } - - if let Some(captures) = CHECKPOINT_PARTS_REGEX.captures(obj_meta.location.as_ref()) { - let curr_ver_str = captures.get(1).unwrap().as_str(); - let curr_ver: i64 = curr_ver_str.parse().unwrap(); - if curr_ver > version { - // skip checkpoints newer than max version - continue; - } - if cp.is_none() || curr_ver > cp.unwrap().version { - let parts_str = captures.get(2).unwrap().as_str(); - let parts = parts_str.parse().unwrap(); - cp = Some(CheckPoint { - version: curr_ver, - size: 0, - parts: Some(parts), - }); - } - continue; - } - } - - Ok(cp) - } - #[cfg(any(feature = "parquet", feature = "parquet2"))] async fn restore_checkpoint(&mut self, check_point: CheckPoint) -> Result<(), DeltaTableError> { self.state = DeltaTableState::from_checkpoint(self, &check_point).await?; @@ -433,14 +355,14 @@ impl DeltaTable { } async fn get_latest_version(&mut self) -> Result { - let mut version = match self.get_last_checkpoint().await { + let mut version = match get_last_checkpoint(&self.storage).await { Ok(last_check_point) => last_check_point.version + 1, - Err(LoadCheckpointError::NotFound) => { + Err(ProtocolError::CheckpointNotFound) => { // no checkpoint, start with version 0 0 } Err(e) => { - return Err(DeltaTableError::LoadCheckpoint { source: e }); 
+ return Err(DeltaTableError::from(e)); } }; @@ -460,11 +382,7 @@ impl DeltaTable { ObjectStoreError::NotFound { .. } => { version -= 1; if version < 0 { - let err = format!( - "No snapshot or version 0 found, perhaps {} is an empty dir?", - self.table_uri() - ); - return Err(DeltaTableError::NotATable(err)); + return Err(DeltaTableError::not_a_table(self.table_uri())); } } _ => return Err(DeltaTableError::from(e)), @@ -525,7 +443,7 @@ impl DeltaTable { /// loading the last checkpoint and incrementally applying each version since. #[cfg(any(feature = "parquet", feature = "parquet2"))] pub async fn update(&mut self) -> Result<(), DeltaTableError> { - match self.get_last_checkpoint().await { + match get_last_checkpoint(&self.storage).await { Ok(last_check_point) => { debug!("update with latest checkpoint {last_check_point:?}"); if Some(last_check_point) == self.last_check_point { @@ -536,11 +454,11 @@ impl DeltaTable { self.update_incremental(None).await } } - Err(LoadCheckpointError::NotFound) => { + Err(ProtocolError::CheckpointNotFound) => { debug!("update without checkpoint"); self.update_incremental(None).await } - Err(source) => Err(DeltaTableError::LoadCheckpoint { source }), + Err(err) => Err(DeltaTableError::from(err)), } } @@ -574,11 +492,7 @@ impl DeltaTable { } if self.version() == -1 { - let err = format!( - "No snapshot or version 0 found, perhaps {} is an empty dir?", - self.table_uri() - ); - return Err(DeltaTableError::NotATable(err)); + return Err(DeltaTableError::not_a_table(self.table_uri())); } Ok(()) @@ -600,7 +514,7 @@ impl DeltaTable { // 1. find latest checkpoint below version #[cfg(any(feature = "parquet", feature = "parquet2"))] - match self.find_latest_check_point_for_version(version).await? { + match find_latest_check_point_for_version(&self.storage, version).await? 
{ Some(check_point) => { self.restore_checkpoint(check_point).await?; } @@ -657,7 +571,7 @@ impl DeltaTable { } Err(e) => { match e { - ApplyLogError::EndOfLog => { + ProtocolError::EndOfLog => { if earliest_commit.is_none() { earliest_commit = Some(self.get_earliest_delta_log_version().await?); @@ -670,11 +584,7 @@ impl DeltaTable { } else { version -= 1; if version == -1 { - let err = format!( - "No snapshot or version 0 found, perhaps {} is an empty dir?", - self.table_uri() - ); - return Err(DeltaTableError::NotATable(err)); + return Err(DeltaTableError::not_a_table(self.table_uri())); } } } diff --git a/rust/src/errors.rs b/rust/src/errors.rs index 40977db937..7b982b3cc3 100644 --- a/rust/src/errors.rs +++ b/rust/src/errors.rs @@ -2,30 +2,17 @@ use object_store::Error as ObjectStoreError; use crate::action::ProtocolError; -use crate::delta_config::DeltaConfigError; use crate::operations::transaction::TransactionError; /// A result returned by delta-rs pub type DeltaResult = Result; /// Delta Table specific error +#[allow(missing_docs)] #[derive(thiserror::Error, Debug)] pub enum DeltaTableError { - /// Error returned when applying transaction log failed. - #[error("Failed to apply transaction log: {}", .source)] - ApplyLog { - /// Apply error details returned when applying transaction log failed. - #[from] - source: ApplyLogError, - }, - - /// Error returned when loading checkpoint failed. - #[error("Failed to load checkpoint: {}", .source)] - LoadCheckpoint { - /// Load checkpoint error details returned when loading checkpoint failed. - #[from] - source: LoadCheckpointError, - }, + #[error("Delta protocol violation: {source}")] + Protocol { source: ProtocolError }, /// Error returned when reading the delta log object failed. #[error("Failed to read delta log object: {}", .source)] @@ -106,14 +93,6 @@ pub enum DeltaTableError { source: chrono::ParseError, }, - /// Error returned when the action record is invalid in log. 
- #[error("Invalid action record found in log: {}", .source)] - InvalidAction { - /// Action error details returned of the invalid action. - #[from] - source: ProtocolError, - }, - /// Error returned when attempting to write bad data to the table #[error("Attempted to write invalid data to the table: {:#?}", violations)] InvalidData { @@ -236,88 +215,26 @@ impl From for DeltaTableError { } } -/// Error related to Delta log application -#[derive(thiserror::Error, Debug)] -pub enum ApplyLogError { - /// Error returned when the end of transaction log is reached. - #[error("End of transaction log")] - EndOfLog, - - /// Error returned when the JSON of the log record is invalid. - #[error("Invalid JSON found when applying log record")] - InvalidJson { - /// JSON error details returned when reading the JSON log record. - #[from] - source: serde_json::error::Error, - }, - - /// Error returned when the storage failed to read the log content. - #[error("Failed to read log content")] - Storage { - /// Storage error details returned while reading the log content. - source: ObjectStoreError, - }, - - /// Error returned when reading delta config failed. - #[error("Failed to read delta config: {}", .source)] - Config { - /// Delta config error returned when reading delta config failed. - #[from] - source: DeltaConfigError, - }, - - /// Error returned when a line from log record is invalid. - #[error("Failed to read line from log record")] - Io { - /// Source error details returned while reading the log record. - #[from] - source: std::io::Error, - }, - - /// Error returned when the action record is invalid in log. - #[error("Invalid action record found in log: {}", .source)] - InvalidAction { - /// Action error details returned of the invalid action. - #[from] - source: ProtocolError, - }, -} - -impl From for ApplyLogError { - fn from(error: ObjectStoreError) -> Self { - match error { - ObjectStoreError::NotFound { .. 
} => ApplyLogError::EndOfLog, - _ => ApplyLogError::Storage { source: error }, +impl From for DeltaTableError { + fn from(value: ProtocolError) -> Self { + match value { + #[cfg(feature = "arrow")] + ProtocolError::Arrow { source } => DeltaTableError::Arrow { source }, + ProtocolError::IO { source } => DeltaTableError::Io { source }, + ProtocolError::ObjectStore { source } => DeltaTableError::ObjectStore { source }, + ProtocolError::ParquetParseError { source } => DeltaTableError::Parquet { source }, + _ => DeltaTableError::Protocol { source: value }, } } } -/// Error related to checkpoint loading -#[derive(thiserror::Error, Debug)] -pub enum LoadCheckpointError { - /// Error returned when the JSON checkpoint is not found. - #[error("Checkpoint file not found")] - NotFound, - /// Error returned when the JSON checkpoint is invalid. - #[error("Invalid JSON in checkpoint: {source}")] - InvalidJson { - /// Error details returned while reading the JSON. - #[from] - source: serde_json::error::Error, - }, - /// Error returned when it failed to read the checkpoint content. - #[error("Failed to read checkpoint content: {source}")] - Storage { - /// Storage error details returned while reading the checkpoint content. - source: ObjectStoreError, - }, -} - -impl From for LoadCheckpointError { - fn from(error: ObjectStoreError) -> Self { - match error { - ObjectStoreError::NotFound { .. } => LoadCheckpointError::NotFound, - _ => LoadCheckpointError::Storage { source: error }, - } +impl DeltaTableError { + /// Create a NotATable Error with message for given path.
+ pub fn not_a_table(path: impl AsRef) -> Self { + let msg = format!( + "No snapshot or version 0 found, perhaps {} is an empty dir?", + path.as_ref() + ); + Self::NotATable(msg) } } diff --git a/rust/src/storage/utils.rs b/rust/src/storage/utils.rs index edfe5ddd95..30eabb4e70 100644 --- a/rust/src/storage/utils.rs +++ b/rust/src/storage/utils.rs @@ -82,12 +82,10 @@ impl TryFrom<&Add> for ObjectMeta { fn try_from(value: &Add) -> DeltaResult { let last_modified = DateTime::::from_utc( NaiveDateTime::from_timestamp_millis(value.modification_time).ok_or( - DeltaTableError::InvalidAction { - source: crate::action::ProtocolError::InvalidField(format!( - "invalid modification_time: {:?}", - value.modification_time - )), - }, + DeltaTableError::from(crate::action::ProtocolError::InvalidField(format!( + "invalid modification_time: {:?}", + value.modification_time + ))), )?, Utc, ); diff --git a/rust/src/table_state.rs b/rust/src/table_state.rs index 1d76a4bada..a84cf36b32 100644 --- a/rust/src/table_state.rs +++ b/rust/src/table_state.rs @@ -7,12 +7,12 @@ use std::io::{BufRead, BufReader, Cursor}; use chrono::Utc; use lazy_static::lazy_static; -use object_store::{path::Path, ObjectStore}; +use object_store::{path::Path, Error as ObjectStoreError, ObjectStore}; use serde::{Deserialize, Serialize}; -use crate::action::{self, Action, Add}; +use crate::action::{self, Action, Add, ProtocolError}; use crate::delta_config::TableConfig; -use crate::errors::{ApplyLogError, DeltaTableError}; +use crate::errors::DeltaTableError; use crate::partitions::{DeltaTablePartition, PartitionFilter}; use crate::schema::SchemaDataType; use crate::storage::commit_uri_from_version; @@ -62,9 +62,13 @@ impl DeltaTableState { } /// Construct a delta table state object from commit version. 
- pub async fn from_commit(table: &DeltaTable, version: i64) -> Result { + pub async fn from_commit(table: &DeltaTable, version: i64) -> Result { let commit_uri = commit_uri_from_version(version); - let commit_log_bytes = table.storage.get(&commit_uri).await?.bytes().await?; + let commit_log_bytes = match table.storage.get(&commit_uri).await { + Ok(get) => Ok(get.bytes().await?), + Err(ObjectStoreError::NotFound { .. }) => Err(ProtocolError::EndOfLog), + Err(source) => Err(ProtocolError::ObjectStore { source }), + }?; let reader = BufReader::new(Cursor::new(commit_log_bytes)); let mut new_state = DeltaTableState::with_version(version); @@ -81,7 +85,7 @@ impl DeltaTableState { } /// Construct a delta table state object from a list of actions - pub fn from_actions(actions: Vec, version: i64) -> Result { + pub fn from_actions(actions: Vec, version: i64) -> Result { let mut new_state = DeltaTableState::with_version(version); for action in actions { new_state.process_action(action, true, true)?; @@ -312,7 +316,7 @@ impl DeltaTableState { action: action::Action, require_tombstones: bool, require_files: bool, - ) -> Result<(), ApplyLogError> { + ) -> Result<(), ProtocolError> { match action { // TODO: optionally load CDC into TableState action::Action::cdc(_v) => {} @@ -332,8 +336,7 @@ impl DeltaTableState { self.min_writer_version = v.min_writer_version; } action::Action::metaData(v) => { - let md = DeltaTableMetaData::try_from(v) - .map_err(|e| ApplyLogError::InvalidJson { source: e })?; + let md = DeltaTableMetaData::try_from(v)?; let table_config = TableConfig(&md.configuration); self.tombstone_retention_millis = table_config.deleted_file_retention_duration().as_millis() as i64;