fix: allow kernel to read tables with invalid _last_checkpoint #311
Changes from 3 commits
@@ -7,7 +7,7 @@ use std::sync::Arc;

use itertools::Itertools;
use serde::{Deserialize, Serialize};
use tracing::debug;
use tracing::{debug, warn};
use url::Url;

use crate::actions::{get_log_schema, Metadata, Protocol, METADATA_NAME, PROTOCOL_NAME};
@@ -287,7 +287,12 @@ struct CheckpointMetadata {

/// Try reading the `_last_checkpoint` file.
///
/// In case the file is not found, `None` is returned.
/// Note that we typically want to ignore a missing/invalid `_last_checkpoint` file without failing
/// the read. Thus, the semantics of this function are to return `None` if the file is not found or
/// is invalid JSON. Unexpected/unrecoverable errors are returned as the `Err` case and are assumed
/// to cause failure.
///
/// TODO: java kernel retries three times before failing, should we do the same?
fn read_last_checkpoint(
    fs_client: &dyn FileSystemClient,
    log_root: &Url,
@@ -297,7 +302,9 @@ fn read_last_checkpoint(
        .read_files(vec![(file_path, None)])
        .and_then(|mut data| data.next().expect("read_files should return one file"))
    {
        Ok(data) => Ok(Some(serde_json::from_slice(&data)?)),
        Ok(data) => Ok(serde_json::from_slice(&data)
            .map_err(|e| warn!("invalid _last_checkpoint JSON: {e}"))
            .ok()),
        Err(Error::FileNotFound(_)) => Ok(None),
        Err(err) => Err(err),
    }
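The substantive change in this hunk is that a JSON parse failure is now logged with `warn!` and converted to `None` via `.ok()`, so an unparsable `_last_checkpoint` behaves the same as a missing one, while `FileNotFound` and other I/O errors keep their existing handling. Below is a small standalone sketch of that log-and-ignore pattern; the struct is a trimmed stand-in for illustration, not the kernel's full `CheckpointMetadata`:

```rust
use serde::Deserialize;
use tracing::warn;

/// Trimmed stand-in for the kernel's `CheckpointMetadata` (illustrative only).
#[derive(Debug, Deserialize)]
#[allow(dead_code)]
struct CheckpointHint {
    version: u64,
    size: u64,
}

/// Treat malformed JSON as "no checkpoint hint" rather than a hard error,
/// but still leave a trace in the logs.
fn parse_checkpoint_hint(bytes: &[u8]) -> Option<CheckpointHint> {
    serde_json::from_slice(bytes)
        .map_err(|e| warn!("invalid _last_checkpoint JSON: {e}"))
        .ok()
}

fn main() {
    assert!(parse_checkpoint_hint(br#"{"version":1,"size":8}"#).is_some());
    assert!(parse_checkpoint_hint(b"not json at all").is_none()); // warned, not fatal
}
```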
@@ -402,7 +409,9 @@ mod tests {
    use std::sync::Arc;

    use object_store::local::LocalFileSystem;
    use object_store::memory::InMemory;
    use object_store::path::Path;
    use object_store::ObjectStore;

    use crate::engine::default::executor::tokio::TokioBackgroundExecutor;
    use crate::engine::default::filesystem::ObjectStoreFileSystemClient;

@@ -472,6 +481,47 @@ mod tests {
        assert!(cp.is_none())
    }

    fn valid_last_checkpoint() -> Vec<u8> {
        r#"{"size":8,"size_in_bytes":21857,"version":1}"#.as_bytes().to_vec()
    }

    #[test]
    fn test_read_table_with_invalid_last_checkpoint() {
        // in memory file system
        let store = Arc::new(InMemory::new());

        // put _last_checkpoint file
        let data = valid_last_checkpoint();
        let invalid_data = "invalid".as_bytes().to_vec();
        let path = Path::from("valid/_last_checkpoint");
        let invalid_path = Path::from("invalid/_last_checkpoint");

        tokio::runtime::Runtime::new()
[Review discussion on this line]

can all of this be replaced with a […]?

yea, considered doing that but figured this called out async a little better so kept it as-is. I would prefer to have these be sync tests, but I think we are forced into async with object_store.

Does calling the async out here provide some value to someone trying to understand the test? Assuming they know Rust async, doing the runtime plumbing just seems like boilerplate, right?

probably not. I would like to think about how to isolate the async code (really just needed for setup), so this is more of a TODO.
            .expect("create tokio runtime")
            .block_on(async {
                store
                    .put(&path, data.into())
                    .await
                    .expect("put _last_checkpoint");
                store
                    .put(&invalid_path, invalid_data.into())
                    .await
                    .expect("put _last_checkpoint");
            });

        let client = ObjectStoreFileSystemClient::new(
            store,
            Path::from("/"),
            Arc::new(TokioBackgroundExecutor::new()),
        );
        let url = Url::parse("memory:///valid/").expect("valid url");
        let valid = read_last_checkpoint(&client, &url).expect("read last checkpoint");
        let url = Url::parse("memory:///invalid/").expect("valid url");
        let invalid = read_last_checkpoint(&client, &url).expect("read last checkpoint");
        assert!(valid.is_some());
        assert!(invalid.is_none())
    }

    #[test_log::test]
    fn test_read_table_with_checkpoint() {
        let path = std::fs::canonicalize(PathBuf::from(
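On the review question above about the runtime plumbing: one way to act on the author's TODO of isolating the async code would be a small synchronous setup helper, so the test body and its assertions stay synchronous and only the `object_store` writes touch a runtime. This is a hypothetical sketch, not part of the PR; `setup_store` is an invented name:

```rust
/// Hypothetical helper: isolate the async object_store setup so the
/// test body and its assertions can stay synchronous.
fn setup_store(files: &[(&str, &[u8])]) -> Arc<InMemory> {
    let store = Arc::new(InMemory::new());
    let rt = tokio::runtime::Runtime::new().expect("create tokio runtime");
    rt.block_on(async {
        for (path, data) in files {
            store
                .put(&Path::from(*path), data.to_vec().into())
                .await
                .expect("put file");
        }
    });
    store
}
```

With a helper like that, the test above would only need one call to stage the valid and invalid `_last_checkpoint` payloads, followed by the existing synchronous client construction and assertions.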
[Review discussion on the retry TODO]

My initial instinct would be that if a retry made sense, it would be on I/O errors when trying to read the file, and I would expect that to be the responsibility of the engine inside `read_files`. This I think is good as is. Or do we get errors back that we think are retryable?

Is it just assumed the read is happening over the network and isn't local? If it's local it wouldn't make sense to retry, but for a network read it obviously makes more sense.

My initial reaction is also to leave this to the engine. I don't think there is anything specific about reading the `_last_checkpoint` file; if failures occur across the network, they should be handled the same for all our reads and taken care of within `read_files`.
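For completeness, here is a rough sketch of what "leave retries to the engine" could look like as a generic wrapper around an I/O call. Everything here is an assumption for illustration: the attempt count, the fixed backoff, and the notion of which errors are retryable are engine-level decisions, and this PR adds no retry logic (an engine's object store client may already provide retries of its own):

```rust
use std::thread::sleep;
use std::time::Duration;

/// Illustrative only: retry a fallible, synchronous operation a few times
/// before giving up. A real engine decides what counts as retryable (and
/// may already get retries from its object store client).
fn with_retries<T, E>(
    max_attempts: u32,
    is_retryable: impl Fn(&E) -> bool,
    mut op: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let mut attempt = 0;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            Err(e) if attempt + 1 < max_attempts && is_retryable(&e) => {
                attempt += 1;
                // fixed backoff to keep the sketch short; exponential backoff is more typical
                sleep(Duration::from_millis(100 * u64::from(attempt)));
            }
            Err(e) => return Err(e),
        }
    }
}
```

The TODO's Java-kernel-style "retry three times" would then just be a matter of choosing `max_attempts = 3` at whatever layer owns the I/O.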