diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py
index 2c30d0e840..ae0593a6e4 100644
--- a/python/python/lance/dataset.py
+++ b/python/python/lance/dataset.py
@@ -2187,6 +2187,14 @@ def commit(
                 f"commit_lock must be a function, got {type(commit_lock)}"
             )

+        if read_version is None and not isinstance(
+            operation, (LanceOperation.Overwrite, LanceOperation.Restore)
+        ):
+            raise ValueError(
+                "read_version is required for all operations except "
+                "Overwrite and Restore"
+            )
+
         new_ds = _Dataset.commit(
             base_uri,
             operation._to_inner(),
diff --git a/python/python/tests/test_dataset.py b/python/python/tests/test_dataset.py
index 65b84eb121..af1e2974bb 100644
--- a/python/python/tests/test_dataset.py
+++ b/python/python/tests/test_dataset.py
@@ -847,7 +847,7 @@ def test_append_with_commit(tmp_path: Path):
     fragment = lance.fragment.LanceFragment.create(base_dir, table)
     append = lance.LanceOperation.Append([fragment])

-    with pytest.raises(OSError):
+    with pytest.raises(ValueError):
         # Must specify read version
         dataset = lance.LanceDataset.commit(dataset, append)

diff --git a/python/src/lib.rs b/python/src/lib.rs
index 67c00b7692..ec39c834fd 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -304,7 +304,7 @@ fn manifest_needs_migration(dataset: &PyAny) -> PyResult<bool> {
     let indices = RT
         .block_on(Some(py), dataset_ref.load_indices())?
         .map_err(|err| PyIOError::new_err(format!("Could not read dataset metadata: {}", err)))?;
-    let manifest = RT
+    let (manifest, _) = RT
         .block_on(Some(py), dataset_ref.latest_manifest())?
         .map_err(|err| PyIOError::new_err(format!("Could not read dataset metadata: {}", err)))?;
     Ok(::lance::io::commit::manifest_needs_migration(
diff --git a/rust/lance-core/src/cache.rs b/rust/lance-core/src/cache.rs
index 133b780fa3..8f357bda5d 100644
--- a/rust/lance-core/src/cache.rs
+++ b/rust/lance-core/src/cache.rs
@@ -8,7 +8,7 @@ use std::sync::Arc;

 use deepsize::{Context, DeepSizeOf};
 use futures::Future;
-use moka::sync::Cache;
+use moka::sync::{Cache, ConcurrentCacheExt};
 use object_store::path::Path;

 use crate::utils::path::LancePathExt;
@@ -114,6 +114,7 @@ impl FileMetadataCache {
     pub fn size(&self) -> usize {
         if let Some(cache) = self.cache.as_ref() {
+            cache.sync();
             cache.entry_count() as usize
         } else {
             0
diff --git a/rust/lance-io/src/object_store.rs b/rust/lance-io/src/object_store.rs
index b3deb41599..f668cdfaae 100644
--- a/rust/lance-io/src/object_store.rs
+++ b/rust/lance-io/src/object_store.rs
@@ -400,6 +400,23 @@ impl ObjectStore {
         uri: &str,
         params: &ObjectStoreParams,
     ) -> Result<(Self, Path)> {
+        if let Some((store, path)) = params.object_store.as_ref() {
+            let mut inner = store.clone();
+            if let Some(wrapper) = params.object_store_wrapper.as_ref() {
+                inner = wrapper.wrap(inner);
+            }
+            let store = Self {
+                inner,
+                scheme: path.scheme().to_string(),
+                block_size: params.block_size.unwrap_or(64 * 1024),
+                use_constant_size_upload_parts: params.use_constant_size_upload_parts,
+                list_is_lexically_ordered: params.list_is_lexically_ordered.unwrap_or_default(),
+                io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
+                download_retry_count: DEFAULT_DOWNLOAD_RETRY_COUNT,
+            };
+            let path = Path::from(path.path());
+            return Ok((store, path));
+        }
         let (object_store, path) = match Url::parse(uri) {
             Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
                 // On Windows, the drive is parsed as a scheme
diff --git a/rust/lance-table/src/format/fragment.rs b/rust/lance-table/src/format/fragment.rs
index 76125c3c94..475b2fb23e 100644
--- a/rust/lance-table/src/format/fragment.rs
+++ b/rust/lance-table/src/format/fragment.rs
@@ -1,6 +1,7 @@
 // SPDX-License-Identifier: Apache-2.0
 // SPDX-FileCopyrightText: Copyright The Lance Authors

+use deepsize::DeepSizeOf;
 use lance_core::Error;
 use lance_file::format::{MAJOR_VERSION, MINOR_VERSION};
 use lance_file::version::LanceFileVersion;
@@ -16,7 +17,7 @@ use lance_core::error::Result;
 /// Lance Data File
 ///
 /// A data file is one piece of file storing data.
-#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
 pub struct DataFile {
     /// Relative path of the data file to dataset root.
     pub path: String,
@@ -144,7 +145,7 @@ impl TryFrom<pb::DataFile> for DataFile {
     }
 }

-#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
 #[serde(rename_all = "lowercase")]
 pub enum DeletionFileType {
     Array,
@@ -161,7 +162,7 @@ impl DeletionFileType {
     }
 }

-#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
 pub struct DeletionFile {
     pub read_version: u64,
     pub id: u64,
@@ -199,7 +200,7 @@ impl TryFrom<pb::DeletionFile> for DeletionFile {
 }

 /// A reference to a part of a file.
-#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
 pub struct ExternalFile {
     pub path: String,
     pub offset: u64,
@@ -207,7 +208,7 @@ pub struct ExternalFile {
 }

 /// Metadata about location of the row id sequence.
-#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
 pub enum RowIdMeta {
     Inline(Vec<u8>),
     External(ExternalFile),
@@ -234,7 +235,7 @@ impl TryFrom<pb::RowIdMeta> for RowIdMeta {
 ///
 /// A fragment is a set of files which represent the different columns of the same rows.
 /// If column exists in the schema, but the related file does not exist, treat this column as `nulls`.
-#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
 pub struct Fragment {
     /// Fragment ID
     pub id: u64,
diff --git a/rust/lance-table/src/io/commit.rs b/rust/lance-table/src/io/commit.rs
index 4b7eba92d5..6a2b558707 100644
--- a/rust/lance-table/src/io/commit.rs
+++ b/rust/lance-table/src/io/commit.rs
@@ -473,7 +473,7 @@ pub trait CommitHandler: Debug + Send + Sync {
         object_store: &ObjectStore,
         manifest_writer: ManifestWriter,
         naming_scheme: ManifestNamingScheme,
-    ) -> std::result::Result<(), CommitError>;
+    ) -> std::result::Result<Path, CommitError>;
 }

 async fn default_resolve_version(
@@ -723,7 +723,7 @@ impl CommitHandler for UnsafeCommitHandler {
         object_store: &ObjectStore,
         manifest_writer: ManifestWriter,
         naming_scheme: ManifestNamingScheme,
-    ) -> std::result::Result<(), CommitError> {
+    ) -> std::result::Result<Path, CommitError> {
         // Log a one-time warning
         if !WARNED_ON_UNSAFE_COMMIT.load(std::sync::atomic::Ordering::Relaxed) {
             WARNED_ON_UNSAFE_COMMIT.store(true, std::sync::atomic::Ordering::Relaxed);
@@ -737,7 +737,7 @@ impl CommitHandler for UnsafeCommitHandler {
         // Write the manifest naively
         manifest_writer(object_store, manifest, indices, &version_path).await?;

-        Ok(())
+        Ok(version_path)
     }
 }

@@ -783,7 +783,7 @@ impl<T: CommitLock + Send + Sync> CommitHandler for T {
         object_store: &ObjectStore,
         manifest_writer: ManifestWriter,
         naming_scheme: ManifestNamingScheme,
-    ) -> std::result::Result<(), CommitError> {
+    ) -> std::result::Result<Path, CommitError> {
         let path = naming_scheme.manifest_path(base_path, manifest.version);
         // NOTE: once we have the lease we cannot use ? to return errors, since
         // we must release the lease before returning.
@@ -812,7 +812,7 @@ impl<T: CommitLock + Send + Sync> CommitHandler for T {
         // Release the lock
         lease.release(res.is_ok()).await?;

-        res.map_err(|err| err.into())
+        res.map_err(|err| err.into()).map(|_| path)
     }
 }

@@ -826,7 +826,7 @@ impl<T: CommitLock + Send + Sync> CommitHandler for Arc<T> {
         object_store: &ObjectStore,
         manifest_writer: ManifestWriter,
         naming_scheme: ManifestNamingScheme,
-    ) -> std::result::Result<(), CommitError> {
+    ) -> std::result::Result<Path, CommitError> {
         self.as_ref()
             .commit(
                 manifest,
@@ -855,7 +855,7 @@ impl CommitHandler for RenameCommitHandler {
         object_store: &ObjectStore,
         manifest_writer: ManifestWriter,
         naming_scheme: ManifestNamingScheme,
-    ) -> std::result::Result<(), CommitError> {
+    ) -> std::result::Result<Path, CommitError> {
         // Create a temporary object, then use `rename_if_not_exists` to commit.
         // If failed, clean up the temporary object.
@@ -870,7 +870,7 @@ impl CommitHandler for RenameCommitHandler {
             .rename_if_not_exists(&tmp_path, &path)
             .await
         {
-            Ok(_) => Ok(()),
+            Ok(_) => Ok(path),
             Err(ObjectStoreError::AlreadyExists { .. }) => {
                 // Another transaction has already been committed
                 // Attempt to clean up temporary object, but ignore errors if we can't
diff --git a/rust/lance-table/src/io/commit/external_manifest.rs b/rust/lance-table/src/io/commit/external_manifest.rs
index 75cd0003fd..bace96ea3e 100644
--- a/rust/lance-table/src/io/commit/external_manifest.rs
+++ b/rust/lance-table/src/io/commit/external_manifest.rs
@@ -307,7 +307,7 @@ impl CommitHandler for ExternalManifestCommitHandler {
         object_store: &ObjectStore,
         manifest_writer: ManifestWriter,
         naming_scheme: ManifestNamingScheme,
-    ) -> std::result::Result<(), CommitError> {
+    ) -> std::result::Result<Path, CommitError> {
         // path we get here is the path to the manifest we want to write
         // use object_store.base_path.as_ref() for getting the root of the dataset
@@ -323,27 +323,26 @@ impl CommitHandler for ExternalManifestCommitHandler {
             .await
             .map_err(|_| CommitError::CommitConflict {});

-        if res.is_err() {
+        if let Err(err) = res {
             // delete the staging manifest
             match object_store.inner.delete(&staging_path).await {
                 Ok(_) => {}
                 Err(ObjectStoreError::NotFound { .. }) => {}
                 Err(e) => return Err(CommitError::OtherError(e.into())),
             }
-            return res;
+            return Err(err);
         }

         let scheme = detect_naming_scheme_from_path(&path)?;
-        self.finalize_manifest(
-            base_path,
-            &staging_path,
-            manifest.version,
-            &object_store.inner,
-            scheme,
-        )
-        .await?;
-
-        Ok(())
+        Ok(self
+            .finalize_manifest(
+                base_path,
+                &staging_path,
+                manifest.version,
+                &object_store.inner,
+                scheme,
+            )
+            .await?)
     }
 }
diff --git a/rust/lance-table/src/io/manifest.rs b/rust/lance-table/src/io/manifest.rs
index 9d3c55e109..766b665a33 100644
--- a/rust/lance-table/src/io/manifest.rs
+++ b/rust/lance-table/src/io/manifest.rs
@@ -28,8 +28,16 @@ use crate::format::{pb, DataStorageFormat, Index, Manifest, MAGIC};
 ///
 /// This only reads manifest files. It does not read data files.
 #[instrument(level = "debug", skip(object_store))]
-pub async fn read_manifest(object_store: &ObjectStore, path: &Path) -> Result<Manifest> {
-    let file_size = object_store.inner.head(path).await?.size;
+pub async fn read_manifest(
+    object_store: &ObjectStore,
+    path: &Path,
+    known_size: Option<u64>,
+) -> Result<Manifest> {
+    let file_size = if let Some(known_size) = known_size {
+        known_size as usize
+    } else {
+        object_store.inner.head(path).await?.size
+    };
     const PREFETCH_SIZE: usize = 64 * 1024;
     let initial_start = std::cmp::max(file_size as i64 - PREFETCH_SIZE as i64, 0) as usize;
     let range = Range {
@@ -263,7 +271,7 @@ mod test {
             .unwrap();
         writer.shutdown().await.unwrap();

-        let roundtripped_manifest = read_manifest(&store, &path).await.unwrap();
+        let roundtripped_manifest = read_manifest(&store, &path, None).await.unwrap();

         assert_eq!(manifest, roundtripped_manifest);
diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs
index 520955b325..1a7151553e 100644
--- a/rust/lance/src/dataset.rs
+++ b/rust/lance/src/dataset.rs
@@ -109,6 +109,9 @@ pub struct Dataset {
     uri: String,
     pub(crate) base: Path,
     pub(crate) manifest: Arc<Manifest>,
+    // Path for the manifest that is loaded. Used to get additional information,
+    // such as the index metadata.
+    pub(crate) manifest_file: Path,
     pub(crate) session: Arc<Session>,
     pub tags: Tags,
     pub manifest_naming_scheme: ManifestNamingScheme,
@@ -308,7 +311,9 @@ impl Dataset {

     /// Check out the latest version of the dataset
     pub async fn checkout_latest(&mut self) -> Result<()> {
-        self.manifest = Arc::new(self.latest_manifest().await?);
+        let (manifest, path) = self.latest_manifest().await?;
+        self.manifest = Arc::new(manifest);
+        self.manifest_file = path;
         Ok(())
     }

@@ -324,6 +329,7 @@ impl Dataset {
             base_path,
             self.uri.clone(),
             manifest,
+            manifest_location.path,
             self.session.clone(),
             self.commit_handler.clone(),
             manifest_location.naming_scheme,
@@ -402,11 +408,13 @@ impl Dataset {
         Ok(manifest)
     }

+    #[allow(clippy::too_many_arguments)]
     async fn checkout_manifest(
         object_store: Arc<ObjectStore>,
         base_path: Path,
         uri: String,
         manifest: Manifest,
+        manifest_file: Path,
         session: Arc<Session>,
         commit_handler: Arc<dyn CommitHandler>,
         manifest_naming_scheme: ManifestNamingScheme,
@@ -421,6 +429,7 @@ impl Dataset {
             base: base_path,
             uri,
             manifest: Arc::new(manifest),
+            manifest_file,
             commit_handler,
             session,
             tags,
@@ -488,12 +497,18 @@ impl Dataset {
             .commit_handler
             .resolve_version_location(&blobs_path, blobs_version, &self.object_store.inner)
             .await?;
-        let manifest = read_manifest(&self.object_store, &blob_manifest_location.path).await?;
+        let manifest = read_manifest(
+            &self.object_store,
+            &blob_manifest_location.path,
+            blob_manifest_location.size,
+        )
+        .await?;
         let blobs_dataset = Self::checkout_manifest(
             self.object_store.clone(),
             blobs_path,
             format!("{}/{}", self.uri, BLOB_DIR),
             manifest,
+            blob_manifest_location.path,
             self.session.clone(),
             self.commit_handler.clone(),
             ManifestNamingScheme::V2,
@@ -513,15 +528,26 @@ impl Dataset {
             == LanceFileVersion::Legacy
     }

-    pub async fn latest_manifest(&self) -> Result<Manifest> {
-        read_manifest(
-            &self.object_store,
-            &self
-                .commit_handler
-                .resolve_latest_version(&self.base, &self.object_store)
-                .await?,
-        )
-        .await
+    pub async fn latest_manifest(&self) -> Result<(Manifest, Path)> {
+        let location = self
+            .commit_handler
+            .resolve_latest_location(&self.base, &self.object_store)
+            .await?;
+        if location.version == self.manifest.version {
+            return Ok((self.manifest.as_ref().clone(), self.manifest_file.clone()));
+        }
+        let mut manifest = read_manifest(&self.object_store, &location.path, location.size).await?;
+        if manifest.schema.has_dictionary_types() {
+            let reader = if let Some(size) = location.size {
+                self.object_store
+                    .open_with_size(&location.path, size as usize)
+                    .await?
+            } else {
+                self.object_store.open(&location.path).await?
+            };
+            populate_schema_dictionary(&mut manifest.schema, reader.as_ref()).await?;
+        }
+        Ok((manifest, location.path))
     }

     /// Read the transaction file for this version of the dataset.
@@ -540,7 +566,7 @@ impl Dataset {

     /// Restore the currently checked out version of the dataset as the latest version.
     pub async fn restore(&mut self) -> Result<()> {
-        let latest_manifest = self.latest_manifest().await?;
+        let (latest_manifest, _) = self.latest_manifest().await?;
         let latest_version = latest_manifest.version;

         let transaction = Transaction::new(
@@ -552,18 +578,19 @@ impl Dataset {
             None,
         );

-        self.manifest = Arc::new(
-            commit_transaction(
-                self,
-                &self.object_store,
-                self.commit_handler.as_ref(),
-                &transaction,
-                &Default::default(),
-                &Default::default(),
-                self.manifest_naming_scheme,
-            )
-            .await?,
-        );
+        let (restored_manifest, path) = commit_transaction(
+            self,
+            &self.object_store,
+            self.commit_handler.as_ref(),
+            &transaction,
+            &Default::default(),
+            &Default::default(),
+            self.manifest_naming_scheme,
+        )
+        .await?;
+
+        self.manifest = Arc::new(restored_manifest);
+        self.manifest_file = path;

         Ok(())
     }
@@ -899,7 +926,7 @@ impl Dataset {
             None,
         );

-        let manifest = commit_transaction(
+        let (manifest, path) = commit_transaction(
             self,
             &self.object_store,
             self.commit_handler.as_ref(),
@@ -911,6 +938,7 @@ impl Dataset {
         .await?;

         self.manifest = Arc::new(manifest);
+        self.manifest_file = path;

         Ok(())
     }
@@ -927,10 +955,8 @@ impl Dataset {
         &self.object_store
     }

-    pub(crate) async fn manifest_file(&self, version: u64) -> Result<Path> {
-        self.commit_handler
-            .resolve_version(&self.base, version, &self.object_store.inner)
-            .await
+    pub(crate) async fn manifest_file(&self) -> Result<Path> {
+        Ok(self.manifest_file.clone())
     }

     pub(crate) fn data_dir(&self) -> Path {
@@ -970,7 +996,7 @@ impl Dataset {
             .list_manifests(&self.base, &self.object_store.inner)
             .await?
             .try_filter_map(|path| async move {
-                match read_manifest(&self.object_store, &path).await {
+                match read_manifest(&self.object_store, &path, None).await {
                     Ok(manifest) => Ok(Some(Version::from(&manifest))),
                     Err(e) => Err(e),
                 }
@@ -1429,7 +1455,7 @@ impl Dataset {
             None,
         );

-        let manifest = commit_transaction(
+        let (manifest, manifest_path) = commit_transaction(
             self,
             &self.object_store,
             self.commit_handler.as_ref(),
@@ -1441,6 +1467,7 @@ impl Dataset {
         .await?;

         self.manifest = Arc::new(manifest);
+        self.manifest_file = manifest_path;

         Ok(())
     }
@@ -1481,7 +1508,7 @@ impl Dataset {
             None,
         );

-        let manifest = commit_transaction(
+        let (manifest, manifest_path) = commit_transaction(
             self,
             &self.object_store,
             self.commit_handler.as_ref(),
@@ -1493,6 +1520,7 @@ impl Dataset {
         .await?;

         self.manifest = Arc::new(manifest);
+        self.manifest_file = manifest_path;

         Ok(())
     }
@@ -1509,7 +1537,7 @@ impl Dataset {
             None,
         );

-        let manifest = commit_transaction(
+        let (manifest, manifest_path) = commit_transaction(
             self,
             &self.object_store,
             self.commit_handler.as_ref(),
@@ -1521,6 +1549,7 @@ impl Dataset {
         .await?;

         self.manifest = Arc::new(manifest);
+        self.manifest_file = manifest_path;

         Ok(())
     }
@@ -1567,7 +1596,7 @@ pub(crate) async fn write_manifest_file(
     indices: Option<Vec<Index>>,
     config: &ManifestWriteConfig,
     naming_scheme: ManifestNamingScheme,
-) -> std::result::Result<(), CommitError> {
+) -> std::result::Result<Path, CommitError> {
     if config.auto_set_feature_flags {
         apply_feature_flags(manifest, config.use_move_stable_row_ids)?;
     }
@@ -1585,9 +1614,7 @@ pub(crate) async fn write_manifest_file(
         write_manifest_file_to_path,
         naming_scheme,
     )
-    .await?;
-
-    Ok(())
+    .await
 }

 fn write_manifest_file_to_path<'a>(
@@ -2032,6 +2059,7 @@ mod tests {
                 .resolve_latest_version(&dataset.base, dataset.object_store())
                 .await
                 .unwrap(),
+            None,
         )
         .await
         .unwrap();
@@ -2054,6 +2082,7 @@ mod tests {
                 .resolve_latest_version(&dataset.base, dataset.object_store())
                 .await
                 .unwrap(),
+            None,
         )
         .await
         .unwrap();
diff --git a/rust/lance/src/dataset/builder.rs b/rust/lance/src/dataset/builder.rs
index 005ea89372..342965852a 100644
--- a/rust/lance/src/dataset/builder.rs
+++ b/rust/lance/src/dataset/builder.rs
@@ -292,7 +292,7 @@ impl DatasetBuilder {
             }
         }

-        let (manifest, manifest_naming_scheme) = if let Some(mut manifest) = manifest {
+        let (manifest, location) = if let Some(mut manifest) = manifest {
             let location = commit_handler
                 .resolve_version_location(&base_path, manifest.version, &object_store.inner)
                 .await?;
@@ -300,7 +300,7 @@
                 let reader = object_store.open(&location.path).await?;
                 populate_schema_dictionary(&mut manifest.schema, reader.as_ref()).await?;
             }
-            (manifest, location.naming_scheme)
+            (manifest, location)
         } else {
             let manifest_location = match version {
                 Some(version) => {
@@ -319,7 +319,7 @@
             };

             let manifest = Dataset::load_manifest(&object_store, &manifest_location).await?;
-            (manifest, manifest_location.naming_scheme)
+            (manifest, manifest_location)
         };

         Dataset::checkout_manifest(
@@ -327,9 +327,10 @@
             base_path,
             table_uri,
             manifest,
+            location.path,
             session,
             commit_handler,
-            manifest_naming_scheme,
+            location.naming_scheme,
         )
         .await
     }
diff --git a/rust/lance/src/dataset/cleanup.rs b/rust/lance/src/dataset/cleanup.rs
index 7cc75541e1..16dd432c86 100644
--- a/rust/lance/src/dataset/cleanup.rs
+++ b/rust/lance/src/dataset/cleanup.rs
@@ -171,7 +171,7 @@ impl<'a> CleanupTask<'a> {
         // ignore it then we might delete valid data files thinking they are not
         // referenced.

-        let manifest = read_manifest(&self.dataset.object_store, &path).await?;
+        let manifest = read_manifest(&self.dataset.object_store, &path, None).await?;
         let dataset_version = self.dataset.version().version;

         // Don't delete the latest version, even if it is old. Don't delete tagged versions,
diff --git a/rust/lance/src/dataset/optimize.rs b/rust/lance/src/dataset/optimize.rs
index 3e17727bb6..034168c26c 100644
--- a/rust/lance/src/dataset/optimize.rs
+++ b/rust/lance/src/dataset/optimize.rs
@@ -597,7 +597,7 @@ async fn reserve_fragment_ids(
         None,
     );

-    let manifest = commit_transaction(
+    let (manifest, _) = commit_transaction(
         dataset,
         dataset.object_store(),
         dataset.commit_handler.as_ref(),
@@ -902,7 +902,7 @@ pub async fn commit_compaction(
         None,
     );

-    let manifest = commit_transaction(
+    let (manifest, manifest_path) = commit_transaction(
         dataset,
         dataset.object_store(),
         dataset.commit_handler.as_ref(),
@@ -914,6 +914,7 @@ pub async fn commit_compaction(
     .await?;

     dataset.manifest = Arc::new(manifest);
+    dataset.manifest_file = manifest_path;

     Ok(metrics)
 }
diff --git a/rust/lance/src/dataset/schema_evolution.rs b/rust/lance/src/dataset/schema_evolution.rs
index b42aaaaa32..106c0570bb 100644
--- a/rust/lance/src/dataset/schema_evolution.rs
+++ b/rust/lance/src/dataset/schema_evolution.rs
@@ -269,7 +269,7 @@ pub(super) async fn add_columns(
         /*blob_op= */ None,
         None,
     );
-    let new_manifest = commit_transaction(
+    let (new_manifest, new_path) = commit_transaction(
         dataset,
         &dataset.object_store,
         dataset.commit_handler.as_ref(),
@@ -281,6 +281,7 @@ pub(super) async fn add_columns(
     .await?;

     dataset.manifest = Arc::new(new_manifest);
+    dataset.manifest_file = new_path;

     Ok(())
 }
@@ -590,7 +591,7 @@ pub(super) async fn alter_columns(

     // TODO: adjust the indices here for the new schema

-    let manifest = commit_transaction(
+    let (manifest, manifest_path) = commit_transaction(
         dataset,
         &dataset.object_store,
         dataset.commit_handler.as_ref(),
@@ -602,6 +603,7 @@ pub(super) async fn alter_columns(
     .await?;

     dataset.manifest = Arc::new(manifest);
+    dataset.manifest_file = manifest_path;

     Ok(())
 }
@@ -651,7 +653,7 @@ pub(super) async fn drop_columns(dataset: &mut Dataset, columns: &[&str]) -> Result<()> {
         None,
     );

-    let manifest = commit_transaction(
+    let (manifest, manifest_path) = commit_transaction(
         dataset,
         &dataset.object_store,
         dataset.commit_handler.as_ref(),
@@ -663,6 +665,7 @@ pub(super) async fn drop_columns(dataset: &mut Dataset, columns: &[&str]) -> Result<()> {
     .await?;

     dataset.manifest = Arc::new(manifest);
+    dataset.manifest_file = manifest_path;

     Ok(())
 }
diff --git a/rust/lance/src/dataset/transaction.rs b/rust/lance/src/dataset/transaction.rs
index 9c9ba7a44a..558c0e9ba3 100644
--- a/rust/lance/src/dataset/transaction.rs
+++ b/rust/lance/src/dataset/transaction.rs
@@ -43,6 +43,7 @@ use std::{
     sync::Arc,
 };

+use deepsize::DeepSizeOf;
 use lance_core::{datatypes::Schema, Error, Result};
 use lance_file::{datatypes::Fields, version::LanceFileVersion};
 use lance_io::object_store::ObjectStore;
@@ -70,7 +71,7 @@ use lance_table::feature_flags::{apply_feature_flags, FLAG_MOVE_STABLE_ROW_IDS};
 ///
 /// This contains enough information to be able to build the next manifest,
 /// given the current manifest.
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, DeepSizeOf)]
 pub struct Transaction {
     /// The version of the table this transaction is based off of. If this is
     /// the first transaction, this should be 0.
@@ -94,7 +95,7 @@ pub enum BlobsOperation {
 }

 /// An operation on a dataset.
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, DeepSizeOf)]
 pub enum Operation {
     /// Adding new fragments to the dataset. The fragments contained within
     /// haven't yet been assigned a final ID.
@@ -173,7 +174,13 @@ pub struct RewrittenIndex {
     pub new_id: Uuid,
 }

-#[derive(Debug, Clone)]
+impl DeepSizeOf for RewrittenIndex {
+    fn deep_size_of_children(&self, _context: &mut deepsize::Context) -> usize {
+        0
+    }
+}
+
+#[derive(Debug, Clone, DeepSizeOf)]
 pub struct RewriteGroup {
     pub old_fragments: Vec<Fragment>,
     pub new_fragments: Vec<Fragment>,
@@ -456,13 +463,13 @@ impl Transaction {
         config: &ManifestWriteConfig,
         tx_path: &str,
     ) -> Result<(Manifest, Vec<Index>)> {
-        let path = commit_handler
-            .resolve_version(base_path, version, &object_store.inner)
+        let location = commit_handler
+            .resolve_version_location(base_path, version, &object_store.inner)
             .await?;
-        let mut manifest = read_manifest(object_store, &path).await?;
+        let mut manifest = read_manifest(object_store, &location.path, location.size).await?;
         manifest.set_timestamp(timestamp_to_nanos(config.timestamp));
         manifest.transaction_file = Some(tx_path.to_string());
-        let indices = read_manifest_indexes(object_store, &path, &manifest).await?;
+        let indices = read_manifest_indexes(object_store, &location.path, &manifest).await?;
         Ok((manifest, indices))
     }

diff --git a/rust/lance/src/dataset/write/commit.rs b/rust/lance/src/dataset/write/commit.rs
index df75533d32..626863a8c3 100644
--- a/rust/lance/src/dataset/write/commit.rs
+++ b/rust/lance/src/dataset/write/commit.rs
@@ -6,7 +6,7 @@ use std::sync::Arc;
 use lance_file::version::LanceFileVersion;
 use lance_io::object_store::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry};
 use lance_table::{
-    format::DataStorageFormat,
+    format::{is_detached_version, DataStorageFormat},
     io::commit::{CommitConfig, CommitHandler, ManifestNamingScheme},
 };
 use snafu::{location, Location};
@@ -190,20 +190,28 @@ impl<'a> CommitBuilder<'a> {
             }
         };

+        let session = self
+            .session
+            .or_else(|| self.dest.dataset().map(|ds| ds.session.clone()))
+            .unwrap_or_default();
+
         let dest = match &self.dest {
             WriteDestination::Dataset(dataset) => WriteDestination::Dataset(dataset.clone()),
             WriteDestination::Uri(uri) => {
                 // Check if it already exists.
-                let mut builder = DatasetBuilder::from_uri(uri).with_read_params(ReadParams {
-                    store_options: self.store_params.clone(),
-                    commit_handler: self.commit_handler.clone(),
-                    object_store_registry: self.object_store_registry.clone(),
-                    ..Default::default()
-                });
-
-                // If read_version is zero, then it might not have originally been
-                // passed. We can assume the latest version.
-                if transaction.read_version > 0 {
+                let mut builder = DatasetBuilder::from_uri(uri)
+                    .with_read_params(ReadParams {
+                        store_options: self.store_params.clone(),
+                        commit_handler: self.commit_handler.clone(),
+                        object_store_registry: self.object_store_registry.clone(),
+                        ..Default::default()
+                    })
+                    .with_session(session.clone());
+
+                // If we are using a detached version, we need to load the dataset.
+                // Otherwise, we are writing to the main history, and need to check
+                // out the latest version.
+                if is_detached_version(transaction.read_version) {
                     builder = builder.with_version(transaction.read_version)
                 }

@@ -265,7 +273,7 @@
             ..Default::default()
         };

-        let manifest = if let Some(dataset) = dest.dataset() {
+        let (manifest, manifest_file) = if let Some(dataset) = dest.dataset() {
             if self.detached {
                 if matches!(manifest_naming_scheme, ManifestNamingScheme::V1) {
                     return Err(Error::NotSupported {
@@ -308,6 +316,7 @@
                 &transaction,
                 &manifest_config,
                 manifest_naming_scheme,
+                &session,
             )
             .await?
         };
@@ -321,7 +330,8 @@
         match &self.dest {
             WriteDestination::Dataset(dataset) => Ok(Dataset {
                 manifest: Arc::new(manifest),
-                session: self.session.unwrap_or(dataset.session.clone()),
+                manifest_file,
+                session,
                 ..dataset.as_ref().clone()
             }),
             WriteDestination::Uri(uri) => Ok(Dataset {
@@ -329,7 +339,8 @@
                 base: base_path,
                 uri: uri.to_string(),
                 manifest: Arc::new(manifest),
-                session: self.session.unwrap_or_default(),
+                manifest_file,
+                session,
                 commit_handler,
                 tags,
                 manifest_naming_scheme,
@@ -337,3 +348,143 @@
             }
         }
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use arrow::array::{Int32Array, RecordBatch};
+    use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema};
+    use lance_table::{
+        format::{DataFile, Fragment},
+        io::commit::RenameCommitHandler,
+    };
+    use url::Url;
+
+    use crate::dataset::{InsertBuilder, WriteParams};
+
+    use super::*;
+
+    fn sample_transaction(read_version: u64) -> Transaction {
+        Transaction {
+            uuid: uuid::Uuid::new_v4().hyphenated().to_string(),
+            operation: Operation::Append {
+                fragments: vec![Fragment {
+                    id: 0,
+                    files: vec![DataFile {
+                        path: "file.lance".to_string(),
+                        fields: vec![0],
+                        column_indices: vec![0],
+                        file_major_version: 2,
+                        file_minor_version: 0,
+                    }],
+                    deletion_file: None,
+                    row_id_meta: None,
+                    physical_rows: Some(10),
+                }],
+            },
+            read_version,
+            blobs_op: None,
+            tag: None,
+        }
+    }
+
+    #[tokio::test]
+    async fn test_reuse_session() {
+        // Need to use in-memory for accurate IOPS tracking.
+        use crate::utils::test::IoTrackingStore;
+
+        // Create new dataset
+        let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
+            "i",
+            DataType::Int32,
+            false,
+        )]));
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(Int32Array::from_iter_values(0..10_i32))],
+        )
+        .unwrap();
+        let memory_store = Arc::new(object_store::memory::InMemory::new());
+        let (io_stats_wrapper, io_stats) = IoTrackingStore::new_wrapper();
+        let store_params = ObjectStoreParams {
+            object_store_wrapper: Some(io_stats_wrapper),
+            object_store: Some((memory_store.clone(), Url::parse("memory://test").unwrap())),
+            ..Default::default()
+        };
+        let dataset = InsertBuilder::new("memory://test")
+            .with_params(&WriteParams {
+                store_params: Some(store_params.clone()),
+                commit_handler: Some(Arc::new(RenameCommitHandler)),
+                ..Default::default()
+            })
+            .execute(vec![batch])
+            .await
+            .unwrap();
+        let mut dataset = Arc::new(dataset);
+
+        let reset_iops = || {
+            io_stats.lock().unwrap().read_iops = 0;
+            io_stats.lock().unwrap().write_iops = 0;
+        };
+        let get_new_iops = || {
+            let read_iops = io_stats.lock().unwrap().read_iops;
+            let write_iops = io_stats.lock().unwrap().write_iops;
+            reset_iops();
+            (read_iops, write_iops)
+        };
+
+        let (initial_reads, initial_writes) = get_new_iops();
+        assert!(initial_reads > 0);
+        assert!(initial_writes > 0);
+
+        // Commit transaction 5 times
+        for i in 0..5 {
+            let new_ds = CommitBuilder::new(dataset.clone())
+                .execute(sample_transaction(1))
+                .await
+                .unwrap();
+            dataset = Arc::new(new_ds);
+            assert_eq!(dataset.manifest().version, i + 2);
+
+            // Because we are writing transactions sequentially, and caching them,
+            // we shouldn't need to read anything from disk. Except we do need
+            // to check for the latest version to see if we need to do conflict
+            // resolution.
+            let (reads, writes) = get_new_iops();
+            assert_eq!(reads, 1, "i = {}", i);
+            // Should see 3 IOPs:
+            // 1. Write the transaction files
+            // 2. Write the manifest
+            // 3. Atomically rename the manifest
+            assert_eq!(writes, 3, "i = {}", i);
+        }
+
+        // Commit transaction with URI and session
+        let new_ds = CommitBuilder::new("memory://test")
+            .with_store_params(store_params.clone())
+            .with_commit_handler(Arc::new(RenameCommitHandler))
+            .with_session(dataset.session.clone())
+            .execute(sample_transaction(1))
+            .await
+            .unwrap();
+        assert_eq!(new_ds.manifest().version, 7);
+        // Session should still be re-used
+        // However, the dataset needs to be loaded, so an additional two IOPs
+        // are needed.
+        let (reads, writes) = get_new_iops();
+        assert_eq!(reads, 3);
+        assert_eq!(writes, 3);
+
+        // Commit transaction with URI and no session
+        let new_ds = CommitBuilder::new("memory://test")
+            .with_store_params(store_params)
+            .with_commit_handler(Arc::new(RenameCommitHandler))
+            .execute(sample_transaction(1))
+            .await
+            .unwrap();
+        assert_eq!(new_ds.manifest().version, 8);
+        // Now we have to load all previous transactions.
+        let (reads, writes) = get_new_iops();
+        assert!(reads > 20);
+        assert_eq!(writes, 3);
+    }
+}
diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs
index 471dfc8744..909014d62a 100644
--- a/rust/lance/src/dataset/write/merge_insert.rs
+++ b/rust/lance/src/dataset/write/merge_insert.rs
@@ -1000,7 +1000,7 @@ impl MergeInsertJob {
             None,
         );

-        let manifest = commit_transaction(
+        let (manifest, manifest_path) = commit_transaction(
             dataset.as_ref(),
             dataset.object_store(),
             dataset.commit_handler.as_ref(),
@@ -1013,6 +1013,7 @@ impl MergeInsertJob {
         .await?;

         let mut dataset = dataset.as_ref().clone();
         dataset.manifest = Arc::new(manifest);
+        dataset.manifest_file = manifest_path;

         Ok(Arc::new(dataset))
     }
diff --git a/rust/lance/src/dataset/write/update.rs b/rust/lance/src/dataset/write/update.rs
index a2a37cdfcc..b15a328967 100644
--- a/rust/lance/src/dataset/write/update.rs
+++ b/rust/lance/src/dataset/write/update.rs
@@ -373,7 +373,7 @@ impl UpdateJob {
             None,
         );

-        let manifest = commit_transaction(
+        let (manifest, manifest_path) = commit_transaction(
             self.dataset.as_ref(),
             self.dataset.object_store(),
             self.dataset.commit_handler.as_ref(),
@@ -386,6 +386,7 @@ impl UpdateJob {
         .await?;

         let mut dataset = self.dataset.as_ref().clone();
         dataset.manifest = Arc::new(manifest);
+        dataset.manifest_file = manifest_path;

         Ok(Arc::new(dataset))
     }
diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs
index 7edb2771d1..deef5ac13e 100644
--- a/rust/lance/src/index.rs
+++ b/rust/lance/src/index.rs
@@ -328,7 +328,7 @@ impl DatasetIndexExt for Dataset {
             None,
         );

-        let new_manifest = commit_transaction(
+        let (new_manifest, manifest_path) = commit_transaction(
             self,
             self.object_store(),
             self.commit_handler.as_ref(),
@@ -340,6 +340,7 @@ impl DatasetIndexExt for Dataset {
         .await?;

         self.manifest = Arc::new(new_manifest);
+        self.manifest_file = manifest_path;

         Ok(())
     }
@@ -354,7 +355,7 @@ impl DatasetIndexExt for Dataset {
             return Ok(indices);
         }

-        let manifest_file = self.manifest_file(self.version().version).await?;
+        let manifest_file = self.manifest_file().await?;
         let loaded_indices: Arc<Vec<Index>> =
             read_manifest_indexes(&self.object_store, &manifest_file, &self.manifest)
                 .await?
@@ -403,7 +404,7 @@ impl DatasetIndexExt for Dataset {
             None,
         );

-        let new_manifest = commit_transaction(
+        let (new_manifest, new_path) = commit_transaction(
             self,
             self.object_store(),
             self.commit_handler.as_ref(),
@@ -415,6 +416,7 @@ impl DatasetIndexExt for Dataset {
         .await?;

         self.manifest = Arc::new(new_manifest);
+        self.manifest_file = new_path;

         Ok(())
     }
@@ -501,7 +503,7 @@ impl DatasetIndexExt for Dataset {
             None,
         );

-        let new_manifest = commit_transaction(
+        let (new_manifest, manifest_path) = commit_transaction(
             self,
             self.object_store(),
             self.commit_handler.as_ref(),
@@ -513,6 +515,7 @@ impl DatasetIndexExt for Dataset {
         .await?;

         self.manifest = Arc::new(new_manifest);
+        self.manifest_file = manifest_path;

         Ok(())
     }
diff --git a/rust/lance/src/io/commit.rs b/rust/lance/src/io/commit.rs
index 794536b84f..e8e77e4b41 100644
--- a/rust/lance/src/io/commit.rs
+++ b/rust/lance/src/io/commit.rs
@@ -36,7 +36,7 @@ use rand::{thread_rng, Rng};
 use snafu::{location, Location};

 use futures::future::Either;
-use futures::{StreamExt, TryStreamExt};
+use futures::{FutureExt, StreamExt, TryStreamExt};
 use lance_core::{Error, Result};
 use lance_index::DatasetIndexExt;
 use object_store::path::Path;
@@ -47,6 +47,7 @@ use crate::dataset::fragment::FileFragment;
 use crate::dataset::transaction::{Operation, Transaction};
 use crate::dataset::{write_manifest_file, ManifestWriteConfig, BLOB_DIR};
 use crate::index::DatasetIndexInternalExt;
+use crate::session::Session;
 use crate::Dataset;

 #[cfg(all(feature = "dynamodb", test))]
@@ -67,6 +68,42 @@ async fn read_transaction_file(
     transaction.try_into()
 }

+fn transaction_file_cache_path(base_path: &Path, version: u64) -> Path {
+    base_path
+        .child("_transactions")
+        .child(format!("{}.txn", version))
+}
+
+async fn read_dataset_transaction_file(
+    dataset: &Dataset,
+    version: u64,
+) -> Result<Arc<Transaction>> {
+    let cache_path = transaction_file_cache_path(&dataset.base, version);
+    dataset
+        .session
+        .file_metadata_cache
+        .get_or_insert(&cache_path, |_| async move {
+            let dataset_version = dataset.checkout_version(version).await?;
+            let object_store = dataset_version.object_store();
+            let path = dataset_version
+                .manifest
+                .transaction_file
+                .as_ref()
+                .ok_or_else(|| Error::Internal {
+                    message: format!(
+                        "Dataset version {} does not have a transaction file",
+                        version
+                    ),
+                    location: location!(),
+                })?;
+            let transaction = read_transaction_file(object_store, &dataset.base, path)
+                .await
+                .unwrap();
+            Ok(transaction)
+        })
+        .await
+}
+
 /// Write a transaction to a file and return the relative path.
 async fn write_transaction_file(
     object_store: &ObjectStore,
@@ -86,7 +123,7 @@ async fn write_transaction_file(
 fn check_transaction(
     transaction: &Transaction,
     other_version: u64,
-    other_transaction: &Option<Transaction>,
+    other_transaction: Option<&Transaction>,
 ) -> Result<()> {
     if other_transaction.is_none() {
         return Err(crate::Error::Internal {
@@ -116,6 +153,7 @@ fn check_transaction(
     Ok(())
 }

+#[allow(clippy::too_many_arguments)]
 async fn do_commit_new_dataset(
     object_store: &ObjectStore,
     commit_handler: &dyn CommitHandler,
@@ -124,7 +162,8 @@ async fn do_commit_new_dataset(
     write_config: &ManifestWriteConfig,
     manifest_naming_scheme: ManifestNamingScheme,
     blob_version: Option<u64>,
-) -> Result<Manifest> {
+    session: &Session,
+) -> Result<(Manifest, Path)> {
     let transaction_file = write_transaction_file(object_store, base_path, transaction).await?;

     let (mut manifest, indices) =
@@ -150,7 +189,13 @@ async fn do_commit_new_dataset(
     // TODO: Allow Append or Overwrite mode to retry using `commit_transaction`
     // if there is a conflict.
     match result {
-        Ok(()) => Ok(manifest),
+        Ok(manifest_path) => {
+            session.file_metadata_cache.insert(
+                transaction_file_cache_path(base_path, manifest.version),
+                Arc::new(transaction.clone()),
+            );
+            Ok((manifest, manifest_path))
+        }
         Err(CommitError::CommitConflict) => Err(crate::Error::DatasetAlreadyExists {
             uri: base_path.to_string(),
             location: location!(),
@@ -166,11 +211,12 @@ pub(crate) async fn commit_new_dataset(
     transaction: &Transaction,
     write_config: &ManifestWriteConfig,
     manifest_naming_scheme: ManifestNamingScheme,
-) -> Result<Manifest> {
+    session: &Session,
+) -> Result<(Manifest, Path)> {
     let blob_version = if let Some(blob_op) = transaction.blobs_op.as_ref() {
         let blob_path = base_path.child(BLOB_DIR);
         let blob_tx = Transaction::new(0, blob_op.clone(), None, None);
-        let blob_manifest = do_commit_new_dataset(
+        let (blob_manifest, _) = do_commit_new_dataset(
             object_store,
             commit_handler,
             &blob_path,
@@ -178,6 +224,7 @@ pub(crate) async fn commit_new_dataset(
             write_config,
             manifest_naming_scheme,
             None,
+            session,
         )
         .await?;
         Some(blob_manifest.version)
@@ -193,6 +240,7 @@ pub(crate) async fn commit_new_dataset(
         write_config,
         manifest_naming_scheme,
         blob_version,
+        session,
     )
     .await
 }
@@ -477,7 +525,7 @@ pub(crate) async fn do_commit_detached_transaction(
     write_config: &ManifestWriteConfig,
     commit_config: &CommitConfig,
     new_blob_version: Option<u64>,
-) -> Result<Manifest> {
+) -> Result<(Manifest, Path)> {
     // We don't strictly need a transaction file but we go ahead and create one for
     // record-keeping if nothing else.
     let transaction_file = write_transaction_file(object_store, &dataset.base, transaction).await?;
@@ -535,8 +583,8 @@ pub(crate) async fn do_commit_detached_transaction(
             .await;

         match result {
-            Ok(()) => {
-                return Ok(manifest);
+            Ok(path) => {
+                return Ok((manifest, path));
             }
             Err(CommitError::CommitConflict) => {
                 // We pick a random u64 for the version, so it's possible (though extremely unlikely)
@@ -572,12 +620,12 @@ pub(crate) async fn commit_detached_transaction(
     transaction: &Transaction,
     write_config: &ManifestWriteConfig,
     commit_config: &CommitConfig,
-) -> Result<Manifest> {
+) -> Result<(Manifest, Path)> {
     let new_blob_version = if let Some(blob_op) = transaction.blobs_op.as_ref() {
         let blobs_dataset = dataset.blobs_dataset().await?.unwrap();
         let blobs_tx =
             Transaction::new(blobs_dataset.version().version, blob_op.clone(), None, None);
-        let blobs_manifest = do_commit_detached_transaction(
+        let (blobs_manifest, _) = do_commit_detached_transaction(
             blobs_dataset.as_ref(),
             object_store,
             commit_handler,
@@ -613,12 +661,12 @@ pub(crate) async fn commit_transaction(
     write_config: &ManifestWriteConfig,
     commit_config: &CommitConfig,
     manifest_naming_scheme: ManifestNamingScheme,
-) -> Result<Manifest> {
+) -> Result<(Manifest, Path)> {
     let new_blob_version = if let Some(blob_op) = transaction.blobs_op.as_ref() {
         let blobs_dataset = dataset.blobs_dataset().await?.unwrap();
         let blobs_tx =
             Transaction::new(blobs_dataset.version().version, blob_op.clone(), None, None);
-        let blobs_manifest = do_commit_detached_transaction(
+        let (blobs_manifest, _) = do_commit_detached_transaction(
             blobs_dataset.as_ref(),
             object_store,
             commit_handler,
@@ -637,41 +685,42 @@ pub(crate) async fn commit_transaction(
     // has not necessarily. So for anything involving writing, use `object_store`.
     let transaction_file = write_transaction_file(object_store, &dataset.base, transaction).await?;

-    let mut dataset = dataset.clone();
     // First, get all transactions since read_version
-    let mut other_transactions = Vec::new();
-    let mut version = transaction.read_version;
-    loop {
-        version += 1;
-        match dataset.checkout_version(version).await {
-            Ok(next_dataset) => {
-                let other_txn = if let Some(txn_file) = &next_dataset.manifest.transaction_file {
-                    Some(read_transaction_file(object_store, &next_dataset.base, txn_file).await?)
-                } else {
-                    None
-                };
-                other_transactions.push(other_txn);
-                dataset = next_dataset;
-            }
-            Err(crate::Error::NotFound { .. }) | Err(crate::Error::DatasetNotFound { .. }) => {
-                break;
-            }
-            Err(e) => {
-                return Err(e);
-            }
-        }
-    }
+    let read_version = transaction.read_version;
+    let mut dataset = dataset.clone();
+    // We need to checkout the latest version, because any fixes we apply
+    // (like computing the new row ids) needs to be done based on the most
+    // recent manifest.
+    dataset.checkout_latest().await?;
+    let latest_version = dataset.manifest.version;
+    let other_transactions = futures::stream::iter((read_version + 1)..=latest_version)
+        .map(|version| {
+            read_dataset_transaction_file(&dataset, version)
+                .map(move |res| res.map(|tx| (version, tx)))
+        })
+        .buffer_unordered(dataset.object_store().io_parallelism())
+        .take_while(|res| {
+            futures::future::ready(!matches!(
+                res,
+                Err(crate::Error::NotFound { .. }) | Err(crate::Error::DatasetNotFound { .. })
}) + )) + }) + .try_collect::>() + .await?; - let mut target_version = version; + let mut target_version = latest_version + 1; if is_detached_version(target_version) { return Err(Error::Internal { message: "more than 2^65 versions have been created and so regular version numbers are appearing as 'detached' versions.".into(), location: location!() }); } // If any of them conflict with the transaction, return an error - for (version_offset, other_transaction) in other_transactions.iter().enumerate() { - let other_version = transaction.read_version + version_offset as u64 + 1; - check_transaction(transaction, other_version, other_transaction)?; + for (other_version, other_transaction) in other_transactions.iter() { + check_transaction( + transaction, + *other_version, + Some(other_transaction.as_ref()), + )?; } for attempt_i in 0..commit_config.num_retries { @@ -731,8 +780,14 @@ pub(crate) async fn commit_transaction( .await; match result { - Ok(()) => { - return Ok(manifest); + Ok(manifest_path) => { + let cache_path = transaction_file_cache_path(&dataset.base, target_version); + dataset + .session() + .file_metadata_cache + .insert(cache_path, Arc::new(transaction.clone())); + + return Ok((manifest, manifest_path)); } Err(CommitError::CommitConflict) => { // See if we can retry the commit. Try to account for all @@ -743,18 +798,23 @@ pub(crate) async fn commit_transaction( let backoff_time = backoff_time(attempt_i); tokio::time::sleep(backoff_time).await; - let latest_version = dataset.latest_version_id().await?; - for version in target_version..=latest_version { - dataset = dataset.checkout_version(version).await?; - let other_transaction = if let Some(txn_file) = - dataset.manifest.transaction_file.as_ref() - { - Some(read_transaction_file(object_store, &dataset.base, txn_file).await?) 
-                    } else {
-                        None
-                    };
-                    check_transaction(transaction, version, &other_transaction)?;
-                }
+                dataset.checkout_latest().await?;
+                let latest_version = dataset.manifest.version;
+                futures::stream::iter(target_version..=latest_version)
+                    .map(|version| {
+                        read_dataset_transaction_file(&dataset, version)
+                            .map(move |res| res.map(|tx| (version, tx)))
+                    })
+                    .buffer_unordered(dataset.object_store().io_parallelism())
+                    .try_for_each(|(version, other_transaction)| {
+                        let res = check_transaction(
+                            transaction,
+                            version,
+                            Some(other_transaction.as_ref()),
+                        );
+                        futures::future::ready(res)
+                    })
+                    .await?;
                 target_version = latest_version + 1;
             }
             Err(CommitError::OtherError(err)) => {
@@ -1127,7 +1187,7 @@ mod tests {
         }
     }

-    async fn get_empty_dataset() -> Dataset {
+    async fn get_empty_dataset() -> (tempfile::TempDir, Dataset) {
         let test_dir = tempfile::tempdir().unwrap();
         let test_uri = test_dir.path().to_str().unwrap();

@@ -1137,18 +1197,19 @@ mod tests {
             false,
         )]));

-        Dataset::write(
+        let ds = Dataset::write(
             RecordBatchIterator::new(vec![].into_iter().map(Ok), schema.clone()),
             test_uri,
             None,
         )
         .await
-        .unwrap()
+        .unwrap();
+        (test_dir, ds)
     }

     #[tokio::test]
     async fn test_good_concurrent_config_writes() {
-        let dataset = get_empty_dataset().await;
+        let (_tmpdir, dataset) = get_empty_dataset().await;

         // Test successful concurrent insert config operations
         let futures: Vec<_> = ["key1", "key2", "key3", "key4", "key5"]
             .iter()
@@ -1202,7 +1263,7 @@ mod tests {
     async fn test_bad_concurrent_config_writes() {
         // If two concurrent insert config operations occur for the same key, a
        // `CommitConflict` should be returned
-        let dataset = get_empty_dataset().await;
+        let (_tmpdir, dataset) = get_empty_dataset().await;

         let futures: Vec<_> = ["key1", "key1", "key2", "key3", "key4"]
             .iter()
@@ -1220,30 +1281,30 @@ mod tests {
         // Assert that either the first or the second operation fails
         let mut first_operation_failed = false;
-        let error_fragment = "Commit conflict for version";
         for (i, result) in results.into_iter().enumerate() {
+            let result = result.unwrap();
             match i {
                 0 => {
-                    if !matches!(result, Ok(Ok(_))) {
+                    if result.is_err() {
                         first_operation_failed = true;
-                        assert!(result
-                            .unwrap()
-                            .err()
-                            .unwrap()
-                            .to_string()
-                            .contains(error_fragment));
+                        assert!(
+                            matches!(&result, &Err(Error::CommitConflict { .. })),
+                            "{:?}",
+                            result,
+                        );
                     }
                 }
                 1 => match first_operation_failed {
-                    true => assert!(matches!(result, Ok(Ok(_))), "{:?}", result),
-                    false => assert!(result
-                        .unwrap()
-                        .err()
-                        .unwrap()
-                        .to_string()
-                        .contains(error_fragment)),
+                    true => assert!(result.is_ok(), "{:?}", result),
+                    false => {
+                        assert!(
+                            matches!(&result, &Err(Error::CommitConflict { .. })),
+                            "{:?}",
+                            result,
+                        );
+                    }
                 },
-                _ => assert!(matches!(result, Ok(Ok(_))), "{:?}", result),
+                _ => assert!(result.is_ok(), "{:?}", result),
             }
         }
     }
diff --git a/rust/lance/src/utils/test.rs b/rust/lance/src/utils/test.rs
index 6621798ec0..93bc9ee2ac 100644
--- a/rust/lance/src/utils/test.rs
+++ b/rust/lance/src/utils/test.rs
@@ -17,7 +17,7 @@ use lance_table::format::Fragment;
 use object_store::path::Path;
 use object_store::{
     GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOpts,
-    PutOptions, PutPayload, PutResult, Result as OSResult,
+    PutOptions, PutPayload, PutResult, Result as OSResult, UploadPart,
 };
 use rand::prelude::SliceRandom;
 use rand::{Rng, SeedableRng};
@@ -270,6 +270,8 @@ fn field_structure(fragment: &Fragment) -> Vec<Vec<i32>> {
 pub struct IoStats {
     pub read_iops: u64,
     pub read_bytes: u64,
+    pub write_iops: u64,
+    pub write_bytes: u64,
 }

 impl Display for IoStats {
@@ -313,20 +315,23 @@ impl IoTrackingStore {
         stats.read_iops += 1;
         stats.read_bytes += num_bytes;
     }
+
+    fn record_write(&self, num_bytes: u64) {
+        let mut stats = self.stats.lock().unwrap();
+        stats.write_iops += 1;
+        stats.write_bytes += num_bytes;
+    }
 }

 #[async_trait::async_trait]
 impl ObjectStore for IoTrackingStore {
-    async fn put(&self, location: &Path, bytes: PutPayload) -> OSResult<PutResult> {
-        self.target.put(location, bytes).await
-    }
-
     async fn put_opts(
         &self,
         location: &Path,
         bytes: PutPayload,
         opts: PutOptions,
     ) -> OSResult<PutResult> {
+        self.record_write(bytes.content_length() as u64);
         self.target.put_opts(location, bytes, opts).await
     }

@@ -335,7 +340,11 @@ impl ObjectStore for IoTrackingStore {
         location: &Path,
         opts: PutMultipartOpts,
     ) -> OSResult<Box<dyn MultipartUpload>> {
-        self.target.put_multipart_opts(location, opts).await
+        let target = self.target.put_multipart_opts(location, opts).await?;
+        Ok(Box::new(IoTrackingMultipartUpload {
+            target,
+            stats: self.stats.clone(),
+        }))
     }

     async fn get_opts(&self, location: &Path, options: GetOptions) -> OSResult<GetResult> {
@@ -390,18 +399,47 @@ impl ObjectStore for IoTrackingStore {
     }

     async fn copy(&self, from: &Path, to: &Path) -> OSResult<()> {
+        self.record_write(0);
         self.target.copy(from, to).await
     }

     async fn rename(&self, from: &Path, to: &Path) -> OSResult<()> {
+        self.record_write(0);
         self.target.rename(from, to).await
     }

     async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> OSResult<()> {
+        self.record_write(0);
         self.target.copy_if_not_exists(from, to).await
     }
 }

+#[derive(Debug)]
+struct IoTrackingMultipartUpload {
+    target: Box<dyn MultipartUpload>,
+    stats: Arc<Mutex<IoStats>>,
+}
+
+#[async_trait::async_trait]
+impl MultipartUpload for IoTrackingMultipartUpload {
+    async fn abort(&mut self) -> OSResult<()> {
+        self.target.abort().await
+    }
+
+    async fn complete(&mut self) -> OSResult<PutResult> {
+        self.target.complete().await
+    }
+
+    fn put_part(&mut self, payload: PutPayload) -> UploadPart {
+        {
+            let mut stats = self.stats.lock().unwrap();
+            stats.write_iops += 1;
+            stats.write_bytes += payload.content_length() as u64;
+        }
+        self.target.put_part(payload)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::sync::Arc;