Skip to content

Commit

Permalink
perf: optimize reading transactions in commit loop (#3117)
Browse files Browse the repository at this point in the history
* Cache transaction files
* Read transaction files concurrently within the commit loop

Closes #3057
  • Loading branch information
wjones127 authored Nov 21, 2024
1 parent 1d3b204 commit 8069936
Show file tree
Hide file tree
Showing 21 changed files with 518 additions and 188 deletions.
8 changes: 8 additions & 0 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -2187,6 +2187,14 @@ def commit(
f"commit_lock must be a function, got {type(commit_lock)}"
)

if read_version is None and not isinstance(
operation, (LanceOperation.Overwrite, LanceOperation.Restore)
):
raise ValueError(
"read_version is required for all operations except "
"Overwrite and Restore"
)

new_ds = _Dataset.commit(
base_uri,
operation._to_inner(),
Expand Down
2 changes: 1 addition & 1 deletion python/python/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -847,7 +847,7 @@ def test_append_with_commit(tmp_path: Path):
fragment = lance.fragment.LanceFragment.create(base_dir, table)
append = lance.LanceOperation.Append([fragment])

with pytest.raises(OSError):
with pytest.raises(ValueError):
# Must specify read version
dataset = lance.LanceDataset.commit(dataset, append)

Expand Down
2 changes: 1 addition & 1 deletion python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ fn manifest_needs_migration(dataset: &PyAny) -> PyResult<bool> {
let indices = RT
.block_on(Some(py), dataset_ref.load_indices())?
.map_err(|err| PyIOError::new_err(format!("Could not read dataset metadata: {}", err)))?;
let manifest = RT
let (manifest, _) = RT
.block_on(Some(py), dataset_ref.latest_manifest())?
.map_err(|err| PyIOError::new_err(format!("Could not read dataset metadata: {}", err)))?;
Ok(::lance::io::commit::manifest_needs_migration(
Expand Down
3 changes: 2 additions & 1 deletion rust/lance-core/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::sync::Arc;

use deepsize::{Context, DeepSizeOf};
use futures::Future;
use moka::sync::Cache;
use moka::sync::{Cache, ConcurrentCacheExt};
use object_store::path::Path;

use crate::utils::path::LancePathExt;
Expand Down Expand Up @@ -114,6 +114,7 @@ impl FileMetadataCache {

pub fn size(&self) -> usize {
if let Some(cache) = self.cache.as_ref() {
cache.sync();
cache.entry_count() as usize
} else {
0
Expand Down
17 changes: 17 additions & 0 deletions rust/lance-io/src/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,23 @@ impl ObjectStore {
uri: &str,
params: &ObjectStoreParams,
) -> Result<(Self, Path)> {
if let Some((store, path)) = params.object_store.as_ref() {
let mut inner = store.clone();
if let Some(wrapper) = params.object_store_wrapper.as_ref() {
inner = wrapper.wrap(inner);
}
let store = Self {
inner,
scheme: path.scheme().to_string(),
block_size: params.block_size.unwrap_or(64 * 1024),
use_constant_size_upload_parts: params.use_constant_size_upload_parts,
list_is_lexically_ordered: params.list_is_lexically_ordered.unwrap_or_default(),
io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
download_retry_count: DEFAULT_DOWNLOAD_RETRY_COUNT,
};
let path = Path::from(path.path());
return Ok((store, path));
}
let (object_store, path) = match Url::parse(uri) {
Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
// On Windows, the drive is parsed as a scheme
Expand Down
13 changes: 7 additions & 6 deletions rust/lance-table/src/format/fragment.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use deepsize::DeepSizeOf;
use lance_core::Error;
use lance_file::format::{MAJOR_VERSION, MINOR_VERSION};
use lance_file::version::LanceFileVersion;
Expand All @@ -16,7 +17,7 @@ use lance_core::error::Result;
/// Lance Data File
///
/// A data file is a single file that stores data.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct DataFile {
/// Relative path of the data file to dataset root.
pub path: String,
Expand Down Expand Up @@ -144,7 +145,7 @@ impl TryFrom<pb::DataFile> for DataFile {
}
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
#[serde(rename_all = "lowercase")]
pub enum DeletionFileType {
Array,
Expand All @@ -161,7 +162,7 @@ impl DeletionFileType {
}
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct DeletionFile {
pub read_version: u64,
pub id: u64,
Expand Down Expand Up @@ -199,15 +200,15 @@ impl TryFrom<pb::DeletionFile> for DeletionFile {
}

/// A reference to a part of a file.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct ExternalFile {
pub path: String,
pub offset: u64,
pub size: u64,
}

/// Metadata about location of the row id sequence.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub enum RowIdMeta {
Inline(Vec<u8>),
External(ExternalFile),
Expand All @@ -234,7 +235,7 @@ impl TryFrom<pb::data_fragment::RowIdSequence> for RowIdMeta {
///
/// A fragment is a set of files which represent the different columns of the same rows.
/// If a column exists in the schema but the related file does not exist, that column is treated as all nulls.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct Fragment {
/// Fragment ID
pub id: u64,
Expand Down
16 changes: 8 additions & 8 deletions rust/lance-table/src/io/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ pub trait CommitHandler: Debug + Send + Sync {
object_store: &ObjectStore,
manifest_writer: ManifestWriter,
naming_scheme: ManifestNamingScheme,
) -> std::result::Result<(), CommitError>;
) -> std::result::Result<Path, CommitError>;
}

async fn default_resolve_version(
Expand Down Expand Up @@ -723,7 +723,7 @@ impl CommitHandler for UnsafeCommitHandler {
object_store: &ObjectStore,
manifest_writer: ManifestWriter,
naming_scheme: ManifestNamingScheme,
) -> std::result::Result<(), CommitError> {
) -> std::result::Result<Path, CommitError> {
// Log a one-time warning
if !WARNED_ON_UNSAFE_COMMIT.load(std::sync::atomic::Ordering::Relaxed) {
WARNED_ON_UNSAFE_COMMIT.store(true, std::sync::atomic::Ordering::Relaxed);
Expand All @@ -737,7 +737,7 @@ impl CommitHandler for UnsafeCommitHandler {
// Write the manifest naively
manifest_writer(object_store, manifest, indices, &version_path).await?;

Ok(())
Ok(version_path)
}
}

Expand Down Expand Up @@ -783,7 +783,7 @@ impl<T: CommitLock + Send + Sync> CommitHandler for T {
object_store: &ObjectStore,
manifest_writer: ManifestWriter,
naming_scheme: ManifestNamingScheme,
) -> std::result::Result<(), CommitError> {
) -> std::result::Result<Path, CommitError> {
let path = naming_scheme.manifest_path(base_path, manifest.version);
// NOTE: once we have the lease we cannot use ? to return errors, since
// we must release the lease before returning.
Expand Down Expand Up @@ -812,7 +812,7 @@ impl<T: CommitLock + Send + Sync> CommitHandler for T {
// Release the lock
lease.release(res.is_ok()).await?;

res.map_err(|err| err.into())
res.map_err(|err| err.into()).map(|_| path)
}
}

Expand All @@ -826,7 +826,7 @@ impl<T: CommitLock + Send + Sync> CommitHandler for Arc<T> {
object_store: &ObjectStore,
manifest_writer: ManifestWriter,
naming_scheme: ManifestNamingScheme,
) -> std::result::Result<(), CommitError> {
) -> std::result::Result<Path, CommitError> {
self.as_ref()
.commit(
manifest,
Expand Down Expand Up @@ -855,7 +855,7 @@ impl CommitHandler for RenameCommitHandler {
object_store: &ObjectStore,
manifest_writer: ManifestWriter,
naming_scheme: ManifestNamingScheme,
) -> std::result::Result<(), CommitError> {
) -> std::result::Result<Path, CommitError> {
// Create a temporary object, then use `rename_if_not_exists` to commit.
// If failed, clean up the temporary object.

Expand All @@ -870,7 +870,7 @@ impl CommitHandler for RenameCommitHandler {
.rename_if_not_exists(&tmp_path, &path)
.await
{
Ok(_) => Ok(()),
Ok(_) => Ok(path),
Err(ObjectStoreError::AlreadyExists { .. }) => {
// Another transaction has already been committed
// Attempt to clean up temporary object, but ignore errors if we can't
Expand Down
25 changes: 12 additions & 13 deletions rust/lance-table/src/io/commit/external_manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ impl CommitHandler for ExternalManifestCommitHandler {
object_store: &ObjectStore,
manifest_writer: ManifestWriter,
naming_scheme: ManifestNamingScheme,
) -> std::result::Result<(), CommitError> {
) -> std::result::Result<Path, CommitError> {
// path we get here is the path to the manifest we want to write
// use object_store.base_path.as_ref() for getting the root of the dataset

Expand All @@ -323,27 +323,26 @@ impl CommitHandler for ExternalManifestCommitHandler {
.await
.map_err(|_| CommitError::CommitConflict {});

if res.is_err() {
if let Err(err) = res {
// delete the staging manifest
match object_store.inner.delete(&staging_path).await {
Ok(_) => {}
Err(ObjectStoreError::NotFound { .. }) => {}
Err(e) => return Err(CommitError::OtherError(e.into())),
}
return res;
return Err(err);
}

let scheme = detect_naming_scheme_from_path(&path)?;

self.finalize_manifest(
base_path,
&staging_path,
manifest.version,
&object_store.inner,
scheme,
)
.await?;

Ok(())
Ok(self
.finalize_manifest(
base_path,
&staging_path,
manifest.version,
&object_store.inner,
scheme,
)
.await?)
}
}
14 changes: 11 additions & 3 deletions rust/lance-table/src/io/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,16 @@ use crate::format::{pb, DataStorageFormat, Index, Manifest, MAGIC};
///
/// This only reads manifest files. It does not read data files.
#[instrument(level = "debug", skip(object_store))]
pub async fn read_manifest(object_store: &ObjectStore, path: &Path) -> Result<Manifest> {
let file_size = object_store.inner.head(path).await?.size;
pub async fn read_manifest(
object_store: &ObjectStore,
path: &Path,
known_size: Option<u64>,
) -> Result<Manifest> {
let file_size = if let Some(known_size) = known_size {
known_size as usize
} else {
object_store.inner.head(path).await?.size
};
const PREFETCH_SIZE: usize = 64 * 1024;
let initial_start = std::cmp::max(file_size as i64 - PREFETCH_SIZE as i64, 0) as usize;
let range = Range {
Expand Down Expand Up @@ -263,7 +271,7 @@ mod test {
.unwrap();
writer.shutdown().await.unwrap();

let roundtripped_manifest = read_manifest(&store, &path).await.unwrap();
let roundtripped_manifest = read_manifest(&store, &path, None).await.unwrap();

assert_eq!(manifest, roundtripped_manifest);

Expand Down
Loading

0 comments on commit 8069936

Please sign in to comment.