fix: allow kernel to read tables with invalid _last_checkpoint #311

Merged

56 changes: 53 additions & 3 deletions kernel/src/snapshot.rs
@@ -7,7 +7,7 @@ use std::sync::Arc;

use itertools::Itertools;
use serde::{Deserialize, Serialize};
use tracing::debug;
use tracing::{debug, warn};
use url::Url;

use crate::actions::{get_log_schema, Metadata, Protocol, METADATA_NAME, PROTOCOL_NAME};
@@ -287,7 +287,12 @@ struct CheckpointMetadata {

/// Try reading the `_last_checkpoint` file.
///
/// In case the file is not found, `None` is returned.
/// Note that we typically want to ignore a missing/invalid `_last_checkpoint` file without failing
/// the read. Thus, the semantics of this function are to return `None` if the file is not found or
/// is invalid JSON. Unexpected/unrecoverable errors are returned as `Err` case and are assumed to
/// cause failure.
///
/// TODO: java kernel retries three times before failing, should we do the same?
Collaborator

My initial instinct is that if a retry makes sense, it is on I/O errors when reading the file, and I would expect that to be the responsibility of the engine inside read_files. I think this is good as is. Or do we get errors back that we think are retryable?

Collaborator

Is it just assumed that the read happens over the network rather than locally? For a local read a retry wouldn't make sense, but for a network read it obviously makes more sense.

Collaborator Author

My initial reaction is also to leave this to the engine. I don't think there is anything specific about reading the _last_checkpoint file; if failures occur across the network, they should be handled the same way for all our reads and taken care of within read_files.
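
To make the "leave retries to the engine" position concrete, here is a minimal sketch of what an engine-side retry wrapper could look like. It is not part of this PR or of the kernel: `ReadError`, `read_with_retry`, and the retry policy are hypothetical names and choices used only for illustration, and a real engine would likely add backoff and its own error classification.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Hypothetical error type standing in for an engine's read errors.
#[derive(Debug, PartialEq)]
enum ReadError {
    /// The file does not exist; retrying will not help.
    NotFound,
    /// A failure that may succeed on a later attempt (e.g. a network timeout).
    Transient(String),
}

/// Call `op` until it returns something other than a `Transient` error,
/// or until `max_attempts` attempts have been made (at least one attempt
/// is always made). Sleeps for `delay` between attempts.
fn read_with_retry<T>(
    max_attempts: usize,
    delay: Duration,
    mut op: impl FnMut() -> Result<T, ReadError>,
) -> Result<T, ReadError> {
    let mut attempt = 1;
    loop {
        match op() {
            // Retry only transient failures, and only while attempts remain.
            Err(ReadError::Transient(_)) if attempt < max_attempts => {
                attempt += 1;
                sleep(delay);
            }
            // Success, NotFound, or the final transient failure: hand it back.
            other => return other,
        }
    }
}
```

With a wrapper like this inside the engine's read path, `read_last_checkpoint` would not need any retry logic of its own, which is the design both reviewers lean toward.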

fn read_last_checkpoint(
fs_client: &dyn FileSystemClient,
log_root: &Url,
@@ -297,7 +302,9 @@ fn read_last_checkpoint(
.read_files(vec![(file_path, None)])
.and_then(|mut data| data.next().expect("read_files should return one file"))
{
Ok(data) => Ok(Some(serde_json::from_slice(&data)?)),
Ok(data) => Ok(serde_json::from_slice(&data)
.map_err(|e| warn!("invalid _last_checkpoint JSON: {e}"))
.ok()),
Err(Error::FileNotFound(_)) => Ok(None),
Err(err) => Err(err),
}
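
The three outcomes described in the new doc comment (valid file, missing or invalid file, unrecoverable error) can be sketched in isolation as follows. This is a simplified stand-in, not the kernel code: `FsError`, `CheckpointHint`, `parse_hint`, and `hint_from_read` are hypothetical, the toy parser accepts a bare version number instead of JSON, and `eprintln!` stands in for `warn!`.

```rust
/// Hypothetical stand-in for the kernel's error type.
#[derive(Debug, PartialEq)]
enum FsError {
    FileNotFound,
    Io(String),
}

/// Hypothetical stand-in for the parsed `_last_checkpoint` contents.
#[derive(Debug, PartialEq)]
struct CheckpointHint {
    version: u64,
}

/// Toy parser: accepts only a decimal version number (the real file is JSON).
fn parse_hint(bytes: &[u8]) -> Result<CheckpointHint, String> {
    let text = std::str::from_utf8(bytes).map_err(|e| e.to_string())?;
    let version = text.trim().parse::<u64>().map_err(|e| e.to_string())?;
    Ok(CheckpointHint { version })
}

/// Map the result of reading the file to the semantics from the doc comment:
/// - readable and valid   -> Ok(Some(hint))
/// - readable but invalid -> Ok(None), after logging a warning
/// - file not found       -> Ok(None)
/// - any other error      -> Err(err)
fn hint_from_read(read: Result<Vec<u8>, FsError>) -> Result<Option<CheckpointHint>, FsError> {
    match read {
        Ok(bytes) => Ok(parse_hint(&bytes)
            .map_err(|e| eprintln!("invalid _last_checkpoint: {e}"))
            .ok()),
        Err(FsError::FileNotFound) => Ok(None),
        Err(err) => Err(err),
    }
}
```

The `map_err(...).ok()` chain is the same shape as in the diff above: the parse error is logged and then discarded, so a corrupted hint file degrades to "no hint" instead of failing the read.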
@@ -402,7 +409,9 @@ mod tests {
use std::sync::Arc;

use object_store::local::LocalFileSystem;
use object_store::memory::InMemory;
use object_store::path::Path;
use object_store::ObjectStore;

use crate::engine::default::executor::tokio::TokioBackgroundExecutor;
use crate::engine::default::filesystem::ObjectStoreFileSystemClient;
@@ -472,6 +481,47 @@ mod tests {
assert!(cp.is_none())
}

fn valid_last_checkpoint() -> Vec<u8> {
r#"{"size":8,"size_in_bytes":21857,"version":1}"#.as_bytes().to_vec()
}

#[test]
fn test_read_table_with_invalid_last_checkpoint() {
// in memory file system
let store = Arc::new(InMemory::new());

// put _last_checkpoint file
let data = valid_last_checkpoint();
let invalid_data = "invalid".as_bytes().to_vec();
let path = Path::from("valid/_last_checkpoint");
let invalid_path = Path::from("invalid/_last_checkpoint");

tokio::runtime::Runtime::new()
Collaborator

Can all of this be replaced with a #[tokio::test] instead?

Collaborator Author

Yeah, I considered doing that but figured this called out the async part a little better, so I kept it as-is. I would prefer to have these be sync tests, but I think object_store forces us into async.

Collaborator

Does calling out the async here provide some value to someone trying to understand the test? Assuming they know Rust async, the runtime plumbing just seems like boilerplate, right?

Collaborator Author

Probably not. I would like to think about how to isolate the async code (it is really only needed for setup), so this is more of a TODO.

.expect("create tokio runtime")
.block_on(async {
store
.put(&path, data.into())
.await
.expect("put _last_checkpoint");
store
.put(&invalid_path, invalid_data.into())
.await
.expect("put _last_checkpoint");
});

let client = ObjectStoreFileSystemClient::new(
store,
Path::from("/"),
Arc::new(TokioBackgroundExecutor::new()),
);
let url = Url::parse("memory:///valid/").expect("valid url");
let valid = read_last_checkpoint(&client, &url).expect("read last checkpoint");
let url = Url::parse("memory:///invalid/").expect("valid url");
let invalid = read_last_checkpoint(&client, &url).expect("read last checkpoint");
assert!(valid.is_some());
assert!(invalid.is_none())
}

#[test_log::test]
fn test_read_table_with_checkpoint() {
let path = std::fs::canonicalize(PathBuf::from(
2 changes: 1 addition & 1 deletion kernel/tests/golden_tables.rs
@@ -314,7 +314,7 @@ golden_test!(
latest_snapshot_test
);
golden_test!("checkpoint", checkpoint_test);
skip_test!("corrupted-last-checkpoint-kernel": "BUG: should fallback to old commits/checkpoint");
golden_test!("corrupted-last-checkpoint-kernel", latest_snapshot_test);
golden_test!("data-reader-array-complex-objects", latest_snapshot_test);
golden_test!("data-reader-array-primitives", latest_snapshot_test);
golden_test!("data-reader-date-types-America", latest_snapshot_test);