diff --git a/docs/_static/conflict_resolution_flow.png b/docs/_static/conflict_resolution_flow.png
deleted file mode 100644
index 4727763c9a..0000000000
Binary files a/docs/_static/conflict_resolution_flow.png and /dev/null differ
diff --git a/docs/format.rst b/docs/format.rst
index 1bb17db72b..d88821a584 100644
--- a/docs/format.rst
+++ b/docs/format.rst
@@ -260,35 +260,3 @@ put-if-not-exists, these operations should be used. This is true of local file
systems and cloud object stores, with the notable except of AWS S3. For ones
that lack this functionality, an external locking mechanism can be configured
by the user.
-
-Conflict resolution
-~~~~~~~~~~~~~~~~~~~
-
-If two writers try to commit at the same time, one will succeed and the other
-will fail. The failed writer should attempt to retry the commit, but only if
-it's changes are compatible with the changes made by the successful writer.
-
-The changes for a given commit are recorded as a transaction file, under the
-``_transactions`` prefix in the dataset directory. The transaction file is a
-serialized ``Transaction`` protobuf message. See the ``transaction.proto`` file
-for its definition.
-
-.. image:: _static/conflict_resolution_flow.png
-
-The commit process is as follows:
-
- 1. The writer finishes writing all data files.
- 2. The writer creates a transaction file in the ``_transactions`` directory.
- This files describes the operations that were performed, which is used for two
- purposes: (1) to detect conflicts, and (2) to re-build the manifest during
- retries.
- 3. Look for any new commits since the writer started writing. If there are any,
- read their transaction files and check for conflicts. If there are any
- conflicts, abort the commit. Otherwise, continue.
- 4. Build a manifest and attempt to commit it to the next version. If the commit
- fails because another writer has already committed, go back to step 3.
- 5. If the commit succeeds, update the ``_latest.manifest`` file.
-
-When checking whether two transactions conflict, be conservative. If the
-transaction file is missing, assume it conflicts. If the transaction file
-has an unknown operation, assume it conflicts.
diff --git a/protos/format.proto b/protos/format.proto
index d7df4935ac..6ad26a09f9 100644
--- a/protos/format.proto
+++ b/protos/format.proto
@@ -100,18 +100,6 @@ message Manifest {
//
// For a single file, will be zero.
uint32 max_fragment_id = 11;
-
- // Path to the transaction file, relative to `{root}/_transactions`
- //
- // This contains a serialized Transaction message representing the transaction
- // that created this version.
- //
- // May be empty if no transaction file was written.
- //
- // The path format is "{read_version}-{uuid}.txn" where {read_version} is the
- // version of the table the transaction read from, and {uuid} is a
- // hyphen-separated UUID.
- string transaction_file = 12;
} // Manifest
// Auxiliary Data attached to a version.
diff --git a/protos/transaction.proto b/protos/transaction.proto
deleted file mode 100644
index 1a18f319a9..0000000000
--- a/protos/transaction.proto
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-syntax = "proto3";
-
-import "format.proto";
-
-package lance.format.pb;
-
-// A transaction represents the changes to a dataset.
-//
-// This has two purposes:
-// 1. When retrying a commit, the transaction can be used to re-build an updated
-// manifest.
-// 2. When there's a conflict, this can be used to determine whether the other
-// transaction is compatible with this one.
-message Transaction {
- // The version of the dataset this transaction was built from.
- //
- // For example, for a delete transaction this means the version of the dataset
- // that was read from while evaluating the deletion predicate.
- uint64 read_version = 1;
-
- // The UUID that unique identifies a transaction.
- string uuid = 2;
-
- // Optional version tag.
- string tag = 3;
-
- // Add new rows to the dataset.
- message Append {
- // The new fragments to append.
- //
- // Fragment IDs are not yet assigned.
- repeated DataFragment fragments = 1;
- }
-
- // Mark rows as deleted.
- message Delete {
- // The fragments to update
- //
- // The fragment IDs will match existing fragments in the dataset.
- repeated DataFragment updated_fragments = 1;
- // The fragments to delete entirely.
- repeated uint64 deleted_fragment_ids = 2;
- // The predicate that was evaluated
- //
- // This may be used to determine whether the delete would have affected
- // files written by a concurrent transaction.
- string predicate = 3;
- }
-
- // Create or overwrite the entire dataset.
- message Overwrite {
- // The new fragments
- //
- // Fragment IDs are not yet assigned.
- repeated DataFragment fragments = 1;
- // The new schema
- repeated Field schema = 2;
- // Schema metadata.
- map schema_metadata = 3;
- }
-
- // Add a new secondary index.
- message CreateIndex {
- repeated IndexMetadata new_indices = 1;
- }
-
- // An operation that rewrites but does not change the data in the table. These
- // kinds of operations just rearrange data.
- message Rewrite {
- // The old fragments that are being replaced
- //
- // These should all have existing fragment IDs.
- repeated DataFragment old_fragments = 1;
- // The new fragments
- //
- // These fragments IDs are not yet assigned.
- repeated DataFragment new_fragments = 2;
- }
-
- // An operation that merges in a new column, altering the schema.
- message Merge {
- // The updated fragments
- //
- // These should all have existing fragment IDs.
- repeated DataFragment fragments = 1;
- // The new schema
- repeated Field schema = 2;
- // Schema metadata.
- map schema_metadata = 3;
- }
-
- // The operation of this transaction.
- oneof operation {
- Append append = 100;
- Delete delete = 101;
- Overwrite overwrite = 102;
- CreateIndex create_index = 103;
- Rewrite rewrite = 104;
- Merge merge = 105;
- }
-}
\ No newline at end of file
diff --git a/rust/build.rs b/rust/build.rs
index 2db811ff0d..ef3515d99b 100644
--- a/rust/build.rs
+++ b/rust/build.rs
@@ -6,11 +6,7 @@ fn main() -> Result<()> {
let mut prost_build = prost_build::Config::new();
prost_build.protoc_arg("--experimental_allow_proto3_optional");
prost_build.compile_protos(
- &[
- "./protos/format.proto",
- "./protos/index.proto",
- "./protos/transaction.proto",
- ],
+ &["./protos/format.proto", "./protos/index.proto"],
&["./protos"],
)?;
diff --git a/rust/src/dataset.rs b/rust/src/dataset.rs
index a1789164c9..d8841b490d 100644
--- a/rust/src/dataset.rs
+++ b/rust/src/dataset.rs
@@ -35,19 +35,16 @@ mod feature_flags;
pub mod fragment;
mod hash_joiner;
pub mod scanner;
-pub mod transaction;
pub mod updater;
mod write;
use self::feature_flags::{apply_feature_flags, can_read_dataset, can_write_dataset};
use self::fragment::FileFragment;
use self::scanner::Scanner;
-use self::transaction::{Operation, Transaction};
use self::write::{reader_to_stream, write_fragments};
use crate::datatypes::Schema;
use crate::error::box_error;
use crate::format::{pb, Fragment, Index, Manifest};
-use crate::io::commit::{commit_new_dataset, commit_transaction, CommitError};
use crate::io::object_store::ObjectStoreParams;
use crate::io::{
object_reader::{read_message, read_struct},
@@ -380,34 +377,56 @@ impl Dataset {
}
}
- let object_store = Arc::new(object_store);
- let fragments =
- write_fragments(object_store.clone(), &base, &schema, stream, params.clone()).await?;
+ let mut fragment_id = if matches!(params.mode, WriteMode::Append) {
+ dataset.as_ref().map_or(0, |d| {
+ d.manifest.max_fragment_id().map(|max| max + 1).unwrap_or(0)
+ })
+ } else {
+ // Create or Overwrite.
+ // Overwrite resets the fragment ID to zero.
+ 0
+ };
- let operation = match params.mode {
- WriteMode::Create | WriteMode::Overwrite => Operation::Overwrite { schema, fragments },
- WriteMode::Append => Operation::Append { fragments },
+ let mut fragments: Vec = if matches!(params.mode, WriteMode::Append) {
+ dataset
+ .as_ref()
+ .map_or(vec![], |d| d.manifest.fragments.as_ref().clone())
+ } else {
+ // Create or Overwrite create new fragments.
+ vec![]
};
- let transaction = Transaction::new(
- dataset.as_ref().map(|ds| ds.manifest.version).unwrap_or(0),
- operation,
- None,
- );
+ let object_store = Arc::new(object_store);
+ let mut new_fragments =
+ write_fragments(object_store.clone(), &base, &schema, stream, params.clone()).await?;
- let manifest = if let Some(dataset) = &dataset {
- commit_transaction(
- dataset,
- &object_store,
- &transaction,
- &Default::default(),
- &Default::default(),
- )
- .await?
+ // Assign IDs.
+ for fragment in &mut new_fragments {
+ fragment.id = fragment_id;
+ fragment_id += 1;
+ }
+ fragments.extend(new_fragments);
+
+ let mut manifest = if let Some(dataset) = &dataset {
+ Manifest::new_from_previous(&dataset.manifest, &schema, Arc::new(fragments))
} else {
- commit_new_dataset(&object_store, &base, &transaction, &Default::default()).await?
+ Manifest::new(&schema, Arc::new(fragments))
};
+ // Inherit the index if we're just appending rows
+ let indices = match (&dataset, ¶ms.mode) {
+ (Some(d), WriteMode::Append) => Some(d.load_indices().await?),
+ _ => None,
+ };
+ write_manifest_file(
+ &object_store,
+ &base,
+ &mut manifest,
+ indices,
+ Default::default(),
+ )
+ .await?;
+
Ok(Self {
object_store,
base,
@@ -443,12 +462,6 @@ impl Dataset {
..params.unwrap_or_default()
};
- // Need to include params here because it might include a commit mechanism.
- let object_store = Arc::new(
- self.object_store()
- .with_params(¶ms.store_params.clone().unwrap_or_default()),
- );
-
let (stream, schema) = reader_to_stream(batches)?;
// Return Error if append and input schema differ
@@ -459,29 +472,47 @@ impl Dataset {
});
}
- let fragments = write_fragments(
+ let mut fragment_id = self
+ .manifest
+ .max_fragment_id()
+ .map(|max| max + 1)
+ .unwrap_or(0);
+
+ let mut fragments: Vec = self.manifest.fragments.as_ref().clone();
+
+ let object_store = self.object_store.clone();
+
+ let mut new_fragments = write_fragments(
object_store.clone(),
- &self.base,
+ object_store.base_path(),
&schema,
stream,
params.clone(),
)
.await?;
- let transaction =
- Transaction::new(self.manifest.version, Operation::Append { fragments }, None);
+ // Assign IDs.
+ for fragment in &mut new_fragments {
+ fragment.id = fragment_id;
+ fragment_id += 1;
+ }
+ fragments.extend(new_fragments);
+
+ let mut manifest =
+ Manifest::new_from_previous(&self.manifest, &schema, Arc::new(fragments));
- let new_manifest = commit_transaction(
- self,
+ // inherit the indices
+ let indices = Some(self.load_indices().await?);
+
+ write_manifest_file(
&object_store,
- &transaction,
- &Default::default(),
- &Default::default(),
+ object_store.base_path(),
+ &mut manifest,
+ indices,
+ Default::default(),
)
.await?;
- self.manifest = Arc::new(new_manifest);
-
Ok(())
}
@@ -572,7 +603,7 @@ impl Dataset {
&base,
&mut manifest,
Some(indices),
- &Default::default(),
+ Default::default(),
)
.await?;
Ok(Self {
@@ -644,21 +675,21 @@ impl Dataset {
.try_collect::>()
.await?;
- let transaction = Transaction::new(
- self.manifest.version,
- Operation::Merge {
- fragments: updated_fragments,
- schema: new_schema,
- },
- None,
+ // Inherit the index, since we are just adding columns.
+ let indices = self.load_indices().await?;
+
+ let mut manifest = Manifest::new_from_previous(
+ self.manifest.as_ref(),
+ &new_schema,
+ Arc::new(updated_fragments),
);
- let manifest = commit_transaction(
- self,
+ write_manifest_file(
&self.object_store,
- &transaction,
- &Default::default(),
- &Default::default(),
+ &self.base,
+ &mut manifest,
+ Some(indices),
+ Default::default(),
)
.await?;
@@ -824,44 +855,36 @@ impl Dataset {
/// Delete rows based on a predicate.
pub async fn delete(&mut self, predicate: &str) -> Result<()> {
- let mut updated_fragments: Vec = Vec::new();
- let mut deleted_fragment_ids: Vec = Vec::new();
- stream::iter(self.get_fragments())
- .map(|f| async move {
- let old_fragment = f.metadata.clone();
- let new_fragment = f.delete(predicate).await?.map(|f| f.metadata);
- Ok((old_fragment, new_fragment))
- })
+ let mut updated_fragments = stream::iter(self.get_fragments())
+ .map(|f| async move { f.delete(predicate).await.map(|f| f.map(|f| f.metadata)) })
.buffer_unordered(num_cpus::get())
// Drop the fragments that were deleted.
- .try_for_each(|(old_fragment, new_fragment)| {
- if let Some(new_fragment) = new_fragment {
- if new_fragment != old_fragment {
- updated_fragments.push(new_fragment);
- }
- } else {
- deleted_fragment_ids.push(old_fragment.id);
- }
- futures::future::ready(Ok::<_, crate::Error>(()))
- })
+ .try_filter_map(|f| futures::future::ready(Ok(f)))
+ .try_collect::>()
.await?;
- let transaction = Transaction::new(
- self.manifest.version,
- Operation::Delete {
- updated_fragments,
- deleted_fragment_ids,
- predicate: predicate.to_string(),
- },
- None,
+ // Maintain the order of fragment IDs.
+ updated_fragments.sort_by_key(|f| f.id);
+
+ // Inherit the index, unless we deleted all the fragments.
+ let indices = if updated_fragments.is_empty() {
+ None
+ } else {
+ Some(self.load_indices().await?)
+ };
+
+ let mut manifest = Manifest::new_from_previous(
+ self.manifest.as_ref(),
+ self.schema(),
+ Arc::new(updated_fragments),
);
- let manifest = commit_transaction(
- self,
+ write_manifest_file(
&self.object_store,
- &transaction,
- &Default::default(),
- &Default::default(),
+ &self.base,
+ &mut manifest,
+ indices,
+ Default::default(),
)
.await?;
@@ -882,6 +905,14 @@ impl Dataset {
self.versions_dir().child(format!("{version}.manifest"))
}
+ fn latest_manifest_path(&self) -> Path {
+ latest_manifest_path(&self.base)
+ }
+
+ pub(crate) async fn latest_manifest(&self) -> Result {
+ read_manifest(&self.object_store, &self.latest_manifest_path()).await
+ }
+
pub(crate) fn data_dir(&self) -> Path {
self.base.child(DATA_DIR)
}
@@ -1018,8 +1049,8 @@ pub(crate) async fn write_manifest_file(
base_path: &Path,
manifest: &mut Manifest,
indices: Option>,
- config: &ManifestWriteConfig,
-) -> std::result::Result<(), CommitError> {
+ config: ManifestWriteConfig,
+) -> Result<()> {
if config.auto_set_feature_flags {
apply_feature_flags(manifest);
}
@@ -1044,8 +1075,7 @@ pub(crate) async fn write_manifest_file(
object_store
.inner
.copy(&path, &latest_manifest_path(base_path))
- .await
- .map_err(|err| CommitError::OtherError(err.into()))?;
+ .await?;
Ok(())
}
@@ -1363,7 +1393,7 @@ mod tests {
&dataset.base,
&mut manifest,
None,
- &ManifestWriteConfig {
+ ManifestWriteConfig {
auto_set_feature_flags: false,
timestamp: None,
},
@@ -1968,10 +1998,10 @@ mod tests {
.unwrap();
dataset.validate().await.unwrap();
- // The version should match the table version it was created from.
+ // Check the version is set correctly
let indices = dataset.load_indices().await.unwrap();
let actual = indices.first().unwrap().dataset_version;
- let expected = dataset.manifest.version - 1;
+ let expected = dataset.manifest.version;
assert_eq!(actual, expected);
// Append should inherit index
@@ -1986,7 +2016,7 @@ mod tests {
.unwrap();
let indices = dataset.load_indices().await.unwrap();
let actual = indices.first().unwrap().dataset_version;
- let expected = dataset.manifest.version - 2;
+ let expected = dataset.manifest.version - 1;
assert_eq!(actual, expected);
dataset.validate().await.unwrap();
diff --git a/rust/src/dataset/transaction.rs b/rust/src/dataset/transaction.rs
deleted file mode 100644
index 643cebd91e..0000000000
--- a/rust/src/dataset/transaction.rs
+++ /dev/null
@@ -1,627 +0,0 @@
-// Copyright 2023 Lance Developers.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-//! Transaction definitions for updating datasets
-//!
-//! Prior to creating a new manifest, a transaction must be created representing
-//! the changes being made to the dataset. By representing them as incremental
-//! changes, we can detect whether concurrent operations are compatible with
-//! one another. We can also rebuild manifests when retrying committing a
-//! manifest.
-//!
-//! ## Conflict Resolution
-//!
-//! Transactions are compatible with one another if they don't conflict.
-//! Currently, conflict resolution always assumes a Serializable isolation
-//! level.
-//!
-//! Below are the compatibilities between conflicting transactions. The columns
-//! represent the operation that has been applied, while the rows represent the
-//! operation that is being checked for compatibility to see if it can retry.
-//! ✅ indicates that the operation is compatible, while ❌ indicates that it is
-//! a conflict. Some operations have additional conditions that must be met for
-//! them to be compatible.
-//!
-//! | | Append | Delete | Overwrite/Create | Create Index | Rewrite | Merge |
-//! |------------------|--------|--------|------------------|--------------|---------|-------|
-//! | Append | ✅ | ✅ | ❌ | ✅ | ✅ | ❌ |
-//! | Delete | ❌ | (1) | ❌ | ✅ | (1) | ❌ |
-//! | Overwrite/Create | ❌ | ❌ | ❌ | ❌ | ❌ | ❌ |
-//! | Create index | ✅ | ✅ | ❌ | ✅ | ✅ | ✅ |
-//! | Rewrite | ✅ | (1) | ❌ | ❌ | (1) | ❌ |
-//! | Merge | ❌ | ❌ | ❌ | ❌ | ✅ | ❌ |
-//!
-//! (1) Delete and rewrite are compatible with each other and themselves only if
-//! they affect distinct fragments. Otherwise, they conflict.
-
-use std::{collections::HashSet, sync::Arc};
-
-use crate::{
- datatypes::Schema,
- format::Index,
- format::{
- pb::{self, IndexMetadata},
- Fragment, Manifest,
- },
-};
-
-use super::{feature_flags::apply_feature_flags, ManifestWriteConfig};
-use crate::{Error, Result};
-
-/// A change to a dataset that can be retried
-///
-/// This contains enough information to be able to build the next manifest,
-/// given the current manifest.
-#[derive(Debug, Clone)]
-pub struct Transaction {
- /// The version of the table this transaction is based off of. If this is
- /// the first transaction, this should be 0.
- pub read_version: u64,
- pub uuid: String,
- pub operation: Operation,
- pub tag: Option,
-}
-
-/// An operation on a dataset.
-#[derive(Debug, Clone)]
-pub enum Operation {
- /// Adding new fragments to the dataset. The fragments contained within
- /// haven't yet been assigned a final ID.
- Append { fragments: Vec },
- /// Updated fragments contain those that have been modified with new deletion
- /// files. The deleted fragment IDs are those that should be removed from
- /// the manifest.
- Delete {
- updated_fragments: Vec,
- deleted_fragment_ids: Vec,
- predicate: String,
- },
- /// Overwrite the entire dataset with the given fragments. This is also
- /// used when initially creating a table.
- Overwrite {
- fragments: Vec,
- schema: Schema,
- },
- /// A new index has been created.
- CreateIndex {
- /// The new secondary indices that are being added
- new_indices: Vec,
- },
- /// Data is rewritten but *not* modified. This is used for things like
- /// compaction or re-ordering. Contains the old fragments and the new
- /// ones that have been replaced.
- Rewrite {
- old_fragments: Vec,
- new_fragments: Vec,
- },
- /// Merge a new column in
- Merge {
- fragments: Vec,
- schema: Schema,
- },
-}
-
-impl Operation {
- /// Returns the IDs of fragments that have been modified by this operation.
- ///
- /// This does not include new fragments.
- fn modified_fragment_ids(&self) -> Box + '_> {
- match self {
- // These operations add new fragments or don't modify any.
- Self::Append { .. } | Self::Overwrite { .. } | Self::CreateIndex { .. } => {
- Box::new(std::iter::empty())
- }
- Self::Delete {
- updated_fragments,
- deleted_fragment_ids,
- ..
- } => Box::new(
- updated_fragments
- .iter()
- .map(|f| f.id)
- .chain(deleted_fragment_ids.iter().copied()),
- ),
- Self::Rewrite { old_fragments, .. } => Box::new(old_fragments.iter().map(|f| f.id)),
- Self::Merge { fragments, .. } => Box::new(fragments.iter().map(|f| f.id)),
- }
- }
-
- /// Check whether another operation modifies the same fragment IDs as this one.
- fn modifies_same_ids(&self, other: &Self) -> bool {
- let self_ids = self.modified_fragment_ids().collect::>();
- let mut other_ids = other.modified_fragment_ids();
- other_ids.any(|id| self_ids.contains(&id))
- }
-
- pub fn name(&self) -> &str {
- match self {
- Self::Append { .. } => "Append",
- Self::Delete { .. } => "Delete",
- Self::Overwrite { .. } => "Overwrite",
- Self::CreateIndex { .. } => "CreateIndex",
- Self::Rewrite { .. } => "Rewrite",
- Self::Merge { .. } => "Merge",
- }
- }
-}
-
-impl Transaction {
- pub fn new(read_version: u64, operation: Operation, tag: Option) -> Self {
- let uuid = uuid::Uuid::new_v4().hyphenated().to_string();
- Self {
- read_version,
- uuid,
- operation,
- tag,
- }
- }
-
- /// Returns true if the transaction cannot be committed if the other
- /// transaction is committed first.
- pub fn conflicts_with(&self, other: &Self) -> bool {
- // TODO: this assume IsolationLevel is Serializable, but we could also
- // support Snapshot Isolation, which is more permissive. In particular,
- // it would allow a Delete transaction to succeed after a concurrent
- // Append, even if the Append added rows that would be deleted.
- match &self.operation {
- Operation::Append { .. } => match &other.operation {
- // Append is compatible with anything that doesn't change the schema
- Operation::Append { .. } => false,
- Operation::Rewrite { .. } => false,
- Operation::CreateIndex { .. } => false,
- Operation::Delete { .. } => false,
- _ => true,
- },
- Operation::Rewrite { .. } => match &other.operation {
- // Rewrite is only compatible with operations that don't touch
- // existing fragments.
- // TODO: it could also be compatible with operations that update
- // fragments we don't touch.
- Operation::Append { .. } => false,
- Operation::Delete { .. } => {
- // If we rewrote any fragments that were modified by delete,
- // we conflict.
- self.operation.modifies_same_ids(&other.operation)
- }
- Operation::Rewrite { .. } => {
- // As long as they rewrite disjoint fragments they shouldn't conflict.
- self.operation.modifies_same_ids(&other.operation)
- }
- _ => true,
- },
- // Overwrite always succeeds
- Operation::Overwrite { .. } => false,
- Operation::CreateIndex { .. } => match &other.operation {
- Operation::Append { .. } => false,
- // Indices are identified by UUIDs, so they shouldn't conflict.
- Operation::CreateIndex { .. } => false,
- // Although some of the rows we indexed may have been deleted,
- // row ids are still valid, so we allow this optimistically.
- Operation::Delete { .. } => false,
- // Merge doesn't change row ids, so this should be fine.
- Operation::Merge { .. } => false,
- // Rewrite likely changed many of the row ids, so our index is
- // likely useless. It should be rebuilt.
- // TODO: we could be smarter here and only invalidate the index
- // if the rewrite changed more than X% of row ids.
- Operation::Rewrite { .. } => true,
- _ => true,
- },
- Operation::Delete { .. } => match &other.operation {
- Operation::CreateIndex { .. } => false,
- Operation::Delete { .. } => {
- // If we update the same fragments, we conflict.
- self.operation.modifies_same_ids(&other.operation)
- }
- Operation::Rewrite { .. } => {
- // If we update any fragments that were rewritten, we conflict.
- self.operation.modifies_same_ids(&other.operation)
- }
- _ => true,
- },
- // Merge changes the schema, but preserves row ids, so the only operation
- // it's compatible with is CreateIndex.
- Operation::Merge { .. } => !matches!(&other.operation, Operation::CreateIndex { .. }),
- }
- }
-
- fn fragments_with_ids<'a, T>(
- new_fragments: T,
- fragment_id: &'a mut u64,
- ) -> impl Iterator- + 'a
- where
- T: IntoIterator
- + 'a,
- {
- new_fragments.into_iter().map(|mut f| {
- f.id = *fragment_id;
- *fragment_id += 1;
- f
- })
- }
-
- /// Create a new manifest from the current manifest and the transaction.
- ///
- /// `current_manifest` should only be None if the dataset does not yet exist.
- pub(crate) fn build_manifest(
- &self,
- current_manifest: Option<&Manifest>,
- current_indices: Vec,
- transaction_file_path: &str,
- config: &ManifestWriteConfig,
- ) -> Result<(Manifest, Vec)> {
- // Get the schema and the final fragment list
- let schema = match self.operation {
- Operation::Overwrite { ref schema, .. } => schema.clone(),
- Operation::Merge { ref schema, .. } => schema.clone(),
- _ => {
- if let Some(current_manifest) = current_manifest {
- current_manifest.schema.clone()
- } else {
- return Err(Error::Internal {
- message: "Cannot create a new dataset without a schema".to_string(),
- });
- }
- }
- };
-
- let mut fragment_id = if matches!(self.operation, Operation::Overwrite { .. }) {
- 0
- } else {
- current_manifest
- .and_then(|m| m.max_fragment_id())
- .map(|id| id + 1)
- .unwrap_or(0)
- };
- let mut final_fragments = Vec::new();
- let mut final_indices = current_indices;
-
- let maybe_existing_fragments =
- current_manifest
- .map(|m| m.fragments.as_ref())
- .ok_or_else(|| Error::Internal {
- message: format!(
- "No current manifest was provided while building manifest for operation {}",
- self.operation.name()
- ),
- });
-
- match &self.operation {
- Operation::Append { ref fragments } => {
- final_fragments.extend(maybe_existing_fragments?.clone());
- final_fragments.extend(Self::fragments_with_ids(
- fragments.clone(),
- &mut fragment_id,
- ));
- }
- Operation::Delete {
- ref updated_fragments,
- ref deleted_fragment_ids,
- ..
- } => {
- // Remove the deleted fragments
- final_fragments.extend(maybe_existing_fragments?.clone());
- final_fragments.retain(|f| !deleted_fragment_ids.contains(&f.id));
- final_fragments.iter_mut().for_each(|f| {
- for updated in updated_fragments {
- if updated.id == f.id {
- *f = updated.clone();
- }
- }
- });
- }
- Operation::Overwrite { ref fragments, .. } => {
- final_fragments.extend(Self::fragments_with_ids(
- fragments.clone(),
- &mut fragment_id,
- ));
- final_indices = Vec::new();
- }
- Operation::Rewrite {
- ref new_fragments, ..
- } => {
- final_fragments.extend(Self::fragments_with_ids(
- new_fragments.clone(),
- &mut fragment_id,
- ));
- }
- Operation::CreateIndex { new_indices } => {
- final_fragments.extend(maybe_existing_fragments?.clone());
- final_indices.retain(|existing_index| {
- !new_indices
- .iter()
- .any(|new_index| new_index.name == existing_index.name)
- });
- final_indices.extend(new_indices.clone());
- }
- Operation::Merge { ref fragments, .. } => {
- final_fragments.extend(fragments.clone());
- }
- };
-
- let mut manifest = if let Some(current_manifest) = current_manifest {
- Manifest::new_from_previous(current_manifest, &schema, Arc::new(final_fragments))
- } else {
- Manifest::new(&schema, Arc::new(final_fragments))
- };
-
- manifest.tag = self.tag.clone();
-
- if config.auto_set_feature_flags {
- apply_feature_flags(&mut manifest);
- }
- manifest.set_timestamp(config.timestamp);
-
- manifest.update_max_fragment_id();
-
- manifest.transaction_file = Some(transaction_file_path.to_string());
-
- Ok((manifest, final_indices))
- }
-}
-
-impl TryFrom<&pb::Transaction> for Transaction {
- type Error = Error;
-
- fn try_from(message: &pb::Transaction) -> Result {
- let operation = match &message.operation {
- Some(pb::transaction::Operation::Append(pb::transaction::Append { fragments })) => {
- Operation::Append {
- fragments: fragments.iter().map(Fragment::from).collect(),
- }
- }
- Some(pb::transaction::Operation::Delete(pb::transaction::Delete {
- updated_fragments,
- deleted_fragment_ids,
- predicate,
- })) => Operation::Delete {
- updated_fragments: updated_fragments.iter().map(Fragment::from).collect(),
- deleted_fragment_ids: deleted_fragment_ids.clone(),
- predicate: predicate.clone(),
- },
- Some(pb::transaction::Operation::Overwrite(pb::transaction::Overwrite {
- fragments,
- schema,
- schema_metadata: _schema_metadata, // TODO: handle metadata
- })) => Operation::Overwrite {
- fragments: fragments.iter().map(Fragment::from).collect(),
- schema: Schema::from(schema),
- },
- Some(pb::transaction::Operation::Rewrite(pb::transaction::Rewrite {
- old_fragments,
- new_fragments,
- })) => Operation::Rewrite {
- old_fragments: old_fragments.iter().map(Fragment::from).collect(),
- new_fragments: new_fragments.iter().map(Fragment::from).collect(),
- },
- Some(pb::transaction::Operation::CreateIndex(pb::transaction::CreateIndex {
- new_indices,
- })) => Operation::CreateIndex {
- new_indices: new_indices
- .iter()
- .map(Index::try_from)
- .collect::>()?,
- },
- Some(pb::transaction::Operation::Merge(pb::transaction::Merge {
- fragments,
- schema,
- schema_metadata: _schema_metadata, // TODO: handle metadata
- })) => Operation::Merge {
- fragments: fragments.iter().map(Fragment::from).collect(),
- schema: Schema::from(schema),
- },
- None => {
- return Err(Error::Internal {
- message: "Transaction message did not contain an operation".to_string(),
- });
- }
- };
- Ok(Self {
- read_version: message.read_version,
- uuid: message.uuid.clone(),
- operation,
- tag: if message.tag.is_empty() {
- None
- } else {
- Some(message.tag.clone())
- },
- })
- }
-}
-
-impl From<&Transaction> for pb::Transaction {
- fn from(value: &Transaction) -> Self {
- let operation = match &value.operation {
- Operation::Append { fragments } => {
- pb::transaction::Operation::Append(pb::transaction::Append {
- fragments: fragments.iter().map(pb::DataFragment::from).collect(),
- })
- }
- Operation::Delete {
- updated_fragments,
- deleted_fragment_ids,
- predicate,
- } => pb::transaction::Operation::Delete(pb::transaction::Delete {
- updated_fragments: updated_fragments
- .iter()
- .map(pb::DataFragment::from)
- .collect(),
- deleted_fragment_ids: deleted_fragment_ids.clone(),
- predicate: predicate.clone(),
- }),
- Operation::Overwrite { fragments, schema } => {
- pb::transaction::Operation::Overwrite(pb::transaction::Overwrite {
- fragments: fragments.iter().map(pb::DataFragment::from).collect(),
- schema: schema.into(),
- schema_metadata: Default::default(), // TODO: handle metadata
- })
- }
- Operation::Rewrite {
- old_fragments,
- new_fragments,
- } => pb::transaction::Operation::Rewrite(pb::transaction::Rewrite {
- old_fragments: old_fragments.iter().map(pb::DataFragment::from).collect(),
- new_fragments: new_fragments.iter().map(pb::DataFragment::from).collect(),
- }),
- Operation::CreateIndex { new_indices } => {
- pb::transaction::Operation::CreateIndex(pb::transaction::CreateIndex {
- new_indices: new_indices.iter().map(IndexMetadata::from).collect(),
- })
- }
- Operation::Merge { fragments, schema } => {
- pb::transaction::Operation::Merge(pb::transaction::Merge {
- fragments: fragments.iter().map(pb::DataFragment::from).collect(),
- schema: schema.into(),
- schema_metadata: Default::default(), // TODO: handle metadata
- })
- }
- };
-
- Self {
- read_version: value.read_version,
- uuid: value.uuid.clone(),
- operation: Some(operation),
- tag: value.tag.clone().unwrap_or("".to_string()),
- }
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- #[test]
- fn test_conflicts() {
- let index0 = Index::new(uuid::Uuid::new_v4(), "test", &[0], 1);
- let fragment0 = Fragment::new(0);
- let fragment1 = Fragment::new(1);
- let fragment2 = Fragment::new(2);
- // The transactions that will be checked against
- let other_operations = [
- Operation::Append {
- fragments: vec![fragment0.clone()],
- },
- Operation::CreateIndex {
- new_indices: vec![index0.clone()],
- },
- Operation::Delete {
- updated_fragments: vec![fragment0.clone()],
- deleted_fragment_ids: vec![2],
- predicate: "x > 2".to_string(),
- },
- Operation::Merge {
- fragments: vec![fragment0.clone(), fragment2.clone()],
- schema: Schema::default(),
- },
- Operation::Overwrite {
- fragments: vec![fragment0.clone(), fragment2.clone()],
- schema: Schema::default(),
- },
- Operation::Rewrite {
- old_fragments: vec![fragment0.clone()],
- new_fragments: vec![fragment2.clone()],
- },
- ];
- let other_transactions = other_operations
- .iter()
- .map(|op| Transaction::new(0, op.clone(), None))
- .collect::>();
-
- // Transactions and whether they are expected to conflict with each
- // of other_transactions
- let cases = [
- (
- Operation::Append {
- fragments: vec![fragment0.clone()],
- },
- [false, false, false, true, true, false],
- ),
- (
- Operation::Delete {
- // Delete that affects fragments different from other transactions
- updated_fragments: vec![fragment1.clone()],
- deleted_fragment_ids: vec![],
- predicate: "x > 2".to_string(),
- },
- [true, false, false, true, true, false],
- ),
- (
- Operation::Delete {
- // Delete that affects same fragments as other transactions
- updated_fragments: vec![fragment0.clone(), fragment2.clone()],
- deleted_fragment_ids: vec![],
- predicate: "x > 2".to_string(),
- },
- [true, false, true, true, true, true],
- ),
- (
- Operation::Overwrite {
- fragments: vec![fragment0.clone(), fragment2.clone()],
- schema: Schema::default(),
- },
- // No conflicts: overwrite can always happen since it doesn't
- // depend on previous state of the table.
- [false, false, false, false, false, false],
- ),
- (
- Operation::CreateIndex {
- new_indices: vec![index0],
- },
- // Will only conflict with operations that modify row ids.
- [false, false, false, false, true, true],
- ),
- (
- // Rewrite that affects different fragments
- Operation::Rewrite {
- old_fragments: vec![fragment1.clone()],
- new_fragments: vec![fragment0.clone()],
- },
- [false, true, false, true, true, false],
- ),
- (
- // Rewrite that affects the same fragments
- Operation::Rewrite {
- old_fragments: vec![fragment0.clone(), fragment2.clone()],
- new_fragments: vec![fragment0.clone()],
- },
- [false, true, true, true, true, true],
- ),
- (
- Operation::Merge {
- fragments: vec![fragment0.clone(), fragment2.clone()],
- schema: Schema::default(),
- },
- // Merge conflicts with everything except CreateIndex.
- [true, false, true, true, true, true],
- ),
- ];
-
- for (operation, expected_conflicts) in &cases {
- let transaction = Transaction::new(0, operation.clone(), None);
- for (other, expected_conflict) in other_transactions.iter().zip(expected_conflicts) {
- assert_eq!(
- transaction.conflicts_with(other),
- *expected_conflict,
- "Transaction {:?} should {} with {:?}",
- transaction,
- if *expected_conflict {
- "conflict"
- } else {
- "not conflict"
- },
- other
- );
- }
- }
- }
-}
diff --git a/rust/src/error.rs b/rust/src/error.rs
index 07ccad1dbd..ffeea2fbd6 100644
--- a/rust/src/error.rs
+++ b/rust/src/error.rs
@@ -45,8 +45,6 @@ pub enum Error {
},
#[snafu(display("Not supported: {source}"))]
NotSupported { source: BoxedError },
- #[snafu(display("Commit conflict for version {version}: {source}"))]
- CommitConflict { version: u64, source: BoxedError },
#[snafu(display("Encountered internal error. Please file a bug report at https://github.com/lancedb/lance/issues. {message}"))]
Internal { message: String },
#[snafu(display("LanceError(Arrow): {message}"))]
diff --git a/rust/src/format/manifest.rs b/rust/src/format/manifest.rs
index 4c0d421d66..4d99b3a70f 100644
--- a/rust/src/format/manifest.rs
+++ b/rust/src/format/manifest.rs
@@ -64,9 +64,6 @@ pub struct Manifest {
/// The max fragment id used so far
pub max_fragment_id: u32,
-
- /// The path to the transaction file, relative to the root of the dataset
- pub transaction_file: Option,
}
impl Manifest {
@@ -82,7 +79,6 @@ impl Manifest {
reader_feature_flags: 0,
writer_feature_flags: 0,
max_fragment_id: 0,
- transaction_file: None,
}
}
@@ -102,7 +98,6 @@ impl Manifest {
reader_feature_flags: 0, // These will be set on commit
writer_feature_flags: 0, // These will be set on commit
max_fragment_id: previous.max_fragment_id,
- transaction_file: None,
}
}
@@ -199,11 +194,6 @@ impl From for Manifest {
reader_feature_flags: p.reader_feature_flags,
writer_feature_flags: p.writer_feature_flags,
max_fragment_id: p.max_fragment_id,
- transaction_file: if p.transaction_file.is_empty() {
- None
- } else {
- Some(p.transaction_file)
- },
}
}
}
@@ -229,11 +219,10 @@ impl From<&Manifest> for pb::Manifest {
version_aux_data: m.version_aux_data as u64,
index_section: m.index_section.map(|i| i as u64),
timestamp: timestamp_nanos,
- tag: m.tag.clone().unwrap_or_default(),
+ tag: m.tag.clone().unwrap_or("".to_string()),
reader_feature_flags: m.reader_feature_flags,
writer_feature_flags: m.writer_feature_flags,
max_fragment_id: m.max_fragment_id,
- transaction_file: m.transaction_file.clone().unwrap_or_default(),
}
}
}
diff --git a/rust/src/index.rs b/rust/src/index.rs
index 12383ac9c4..33b3401e83 100644
--- a/rust/src/index.rs
+++ b/rust/src/index.rs
@@ -32,9 +32,8 @@ pub(crate) mod cache;
pub(crate) mod prefilter;
pub mod vector;
-use crate::dataset::transaction::{Operation, Transaction};
+use crate::dataset::write_manifest_file;
use crate::format::Index as IndexMetadata;
-use crate::io::commit::commit_transaction;
use crate::session::Session;
use crate::{dataset::Dataset, Error, Result};
@@ -160,21 +159,27 @@ impl DatasetIndexExt for Dataset {
}
}
- let new_idx = IndexMetadata::new(index_id, &index_name, &[field.id], self.manifest.version);
- let transaction = Transaction::new(
- self.manifest.version,
- Operation::CreateIndex {
- new_indices: vec![new_idx],
- },
- None,
- );
+ let latest_manifest = self.latest_manifest().await?;
+ let mut new_manifest = self.manifest.as_ref().clone();
+ new_manifest.version = latest_manifest.version + 1;
- let new_manifest = commit_transaction(
- self,
- self.object_store(),
- &transaction,
- &Default::default(),
- &Default::default(),
+ // Write index metadata down
+ let new_idx = IndexMetadata::new(index_id, &index_name, &[field.id], new_manifest.version);
+ // Exclude the old index if it exists.
+ // We already checked that there is no index with the same name but on different fields.
+ let mut indices = indices
+ .iter()
+ .filter(|idx| idx.name != index_name)
+ .cloned()
+ .collect::>();
+ indices.push(new_idx);
+
+ write_manifest_file(
+ &self.object_store,
+ &self.base,
+ &mut new_manifest,
+ Some(indices),
+ Default::default(),
)
.await?;
diff --git a/rust/src/index/vector.rs b/rust/src/index/vector.rs
index db51e7dfc0..c24085a5f0 100644
--- a/rust/src/index/vector.rs
+++ b/rust/src/index/vector.rs
@@ -173,7 +173,7 @@ impl TryFrom<&str> for MetricType {
}
/// Parameters of each index stage.
-#[derive(Debug, Clone)]
+#[derive(Debug)]
pub enum StageParams {
Ivf(IvfBuildParams),
@@ -183,7 +183,6 @@ pub enum StageParams {
}
/// The parameters to build vector index.
-#[derive(Debug, Clone)]
pub struct VectorIndexParams {
pub stages: Vec,
diff --git a/rust/src/index/vector/diskann.rs b/rust/src/index/vector/diskann.rs
index 7c0d523f0a..622c3f3bce 100644
--- a/rust/src/index/vector/diskann.rs
+++ b/rust/src/index/vector/diskann.rs
@@ -161,7 +161,7 @@ mod tests {
// Check the version is set correctly
let indices = dataset.load_indices().await.unwrap();
let actual = indices.first().unwrap().dataset_version;
- let expected = dataset.manifest.version - 1;
+ let expected = dataset.manifest.version;
assert_eq!(actual, expected);
}
}
diff --git a/rust/src/io/commit.rs b/rust/src/io/commit.rs
index 0f770eaf5a..4c86b4d767 100644
--- a/rust/src/io/commit.rs
+++ b/rust/src/io/commit.rs
@@ -35,15 +35,10 @@
use std::{fmt::Debug, sync::atomic::AtomicBool};
-use crate::dataset::transaction::Transaction;
-use crate::dataset::{write_manifest_file, ManifestWriteConfig};
-use crate::Dataset;
-use crate::Result;
-use crate::{format::pb, format::Index, format::Manifest};
+use crate::{format::Index, format::Manifest};
use futures::future::BoxFuture;
use object_store::path::Path;
use object_store::Error as ObjectStoreError;
-use prost::Message;
use super::ObjectStore;
@@ -53,7 +48,7 @@ pub type ManifestWriter = for<'a> fn(
manifest: &'a mut Manifest,
indices: Option>,
path: &'a Path,
-) -> BoxFuture<'a, Result<()>>;
+) -> BoxFuture<'a, crate::Result<()>>;
/// Handle commits that prevent conflicting writes.
///
@@ -73,11 +68,10 @@ pub trait CommitHandler: Debug + Send + Sync {
path: &Path,
object_store: &ObjectStore,
manifest_writer: ManifestWriter,
- ) -> std::result::Result<(), CommitError>;
+ ) -> Result<(), CommitError>;
}
/// Errors that can occur when committing a manifest.
-#[derive(Debug)]
pub enum CommitError {
/// Another transaction has already been written to the path
CommitConflict,
@@ -119,7 +113,7 @@ impl CommitHandler for UnsafeCommitHandler {
path: &Path,
object_store: &ObjectStore,
manifest_writer: ManifestWriter,
- ) -> std::result::Result<(), CommitError> {
+ ) -> Result<(), CommitError> {
// Log a one-time warning
if !WARNED_ON_UNSAFE_COMMIT.load(std::sync::atomic::Ordering::Relaxed) {
WARNED_ON_UNSAFE_COMMIT.store(true, std::sync::atomic::Ordering::Relaxed);
@@ -156,7 +150,7 @@ impl CommitHandler for RenameCommitHandler {
path: &Path,
object_store: &ObjectStore,
manifest_writer: ManifestWriter,
- ) -> std::result::Result<(), CommitError> {
+ ) -> Result<(), CommitError> {
// Create a temporary object, then use `rename_if_not_exists` to commit.
// If failed, clean up the temporary object.
@@ -219,13 +213,13 @@ pub trait CommitLock {
/// It is not required that the lock tracks the version. It is provided in
/// case the locking is handled by a catalog service that needs to know the
/// current version of the table.
- async fn lock(&self, version: u64) -> std::result::Result;
+ async fn lock(&self, version: u64) -> Result;
}
#[async_trait::async_trait]
pub trait CommitLease: Send + Sync {
/// Return the lease, indicating whether the commit was successful.
- async fn release(&self, success: bool) -> std::result::Result<(), CommitError>;
+ async fn release(&self, success: bool) -> Result<(), CommitError>;
}
#[async_trait::async_trait]
@@ -237,7 +231,7 @@ impl CommitHandler for T {
path: &Path,
object_store: &ObjectStore,
manifest_writer: ManifestWriter,
- ) -> std::result::Result<(), CommitError> {
+ ) -> Result<(), CommitError> {
// NOTE: once we have the lease we cannot use ? to return errors, since
// we must release the lease before returning.
let lease = self.lock(manifest.version).await?;
@@ -269,227 +263,19 @@ impl CommitHandler for T {
}
}
-#[derive(Debug, Clone)]
-pub struct CommitConfig {
- pub num_retries: u32,
- // TODO: add isolation_level
-}
-
-impl Default for CommitConfig {
- fn default() -> Self {
- Self { num_retries: 5 }
- }
-}
-
-/// Read the transaction data from a transaction file.
-async fn read_transaction_file(
- object_store: &ObjectStore,
- base_path: &Path,
- transaction_file: &str,
-) -> Result {
- let path = base_path.child("_transactions").child(transaction_file);
- let result = object_store.inner.get(&path).await?;
- let data = result.bytes().await?;
- let transaction = pb::Transaction::decode(data)?;
- (&transaction).try_into()
-}
-
-/// Write a transaction to a file and return the relative path.
-async fn write_transaction_file(
- object_store: &ObjectStore,
- base_path: &Path,
- transaction: &Transaction,
-) -> Result {
- let file_name = format!("{}-{}.txn", transaction.read_version, transaction.uuid);
- let path = base_path.child("_transactions").child(file_name.as_str());
-
- let message = pb::Transaction::from(transaction);
- let buf = message.encode_to_vec();
- object_store.inner.put(&path, buf.into()).await?;
-
- Ok(file_name)
-}
-
-fn check_transaction(
- transaction: &Transaction,
- other_version: u64,
- other_transaction: &Option,
-) -> Result<()> {
- if other_transaction.is_none() {
- return Err(crate::Error::Internal {
- message: format!(
- "There was a conflicting transaction at version {}, \
- and it was missing transaction metadata.",
- other_version
- ),
- });
- }
-
- if transaction.conflicts_with(other_transaction.as_ref().unwrap()) {
- return Err(crate::Error::CommitConflict {
- version: other_version,
- source: format!(
- "There was a concurrent commit that conflicts with this one and it \
- cannot be automatically resolved. Please rerun the operation off the latest version \
- of the table.\n Transaction: {:?}\n Conflicting Transaction: {:?}",
- transaction, other_transaction
- )
- .into(),
- });
- }
-
- Ok(())
-}
-
-pub(crate) async fn commit_new_dataset(
- object_store: &ObjectStore,
- base_path: &Path,
- transaction: &Transaction,
- write_config: &ManifestWriteConfig,
-) -> Result {
- let transaction_file = write_transaction_file(object_store, base_path, transaction).await?;
-
- let (mut manifest, indices) =
- transaction.build_manifest(None, vec![], &transaction_file, write_config)?;
-
- write_manifest_file(
- object_store,
- base_path,
- &mut manifest,
- if indices.is_empty() {
- None
- } else {
- Some(indices.clone())
- },
- write_config,
- )
- .await?;
-
- Ok(manifest)
-}
-
-/// Attempt to commit a transaction, with retries and conflict resolution.
-pub(crate) async fn commit_transaction(
- dataset: &Dataset,
- object_store: &ObjectStore,
- transaction: &Transaction,
- write_config: &ManifestWriteConfig,
- commit_config: &CommitConfig,
-) -> Result {
- // Note: object_store has been configured with WriteParams, but dataset.object_store()
- // has not necessarily. So for anything involving writing, use `object_store`.
- let transaction_file = write_transaction_file(object_store, &dataset.base, transaction).await?;
-
- let mut dataset = dataset.clone();
- // First, get all transactions since read_version
- let mut other_transactions = Vec::new();
- let mut version = transaction.read_version;
- loop {
- version += 1;
- match dataset.checkout_version(version).await {
- Ok(next_dataset) => {
- let other_txn = if let Some(txn_file) = &next_dataset.manifest.transaction_file {
- Some(read_transaction_file(object_store, &next_dataset.base, txn_file).await?)
- } else {
- None
- };
- other_transactions.push(other_txn);
- dataset = next_dataset;
- }
- Err(crate::Error::NotFound { .. }) | Err(crate::Error::DatasetNotFound { .. }) => {
- break;
- }
- Err(e) => {
- return Err(e);
- }
- }
- }
-
- let mut target_version = version;
-
- // If any of them conflict with the transaction, return an error
- for (version_offset, other_transaction) in other_transactions.iter().enumerate() {
- let other_version = transaction.read_version + version_offset as u64 + 1;
- check_transaction(transaction, other_version, other_transaction)?;
- }
-
- for _ in 0..commit_config.num_retries {
- // Build an up-to-date manifest from the transaction and current manifest
- let (mut manifest, indices) = transaction.build_manifest(
- Some(dataset.manifest.as_ref()),
- dataset.load_indices().await?,
- &transaction_file,
- write_config,
- )?;
-
- debug_assert_eq!(manifest.version, target_version);
-
- // Try to commit the manifest
- let result = write_manifest_file(
- object_store,
- &dataset.base,
- &mut manifest,
- if indices.is_empty() {
- None
- } else {
- Some(indices.clone())
- },
- write_config,
- )
- .await;
-
- match result {
- Ok(()) => {
- return Ok(manifest);
- }
- Err(CommitError::CommitConflict) => {
- // See if we can retry the commit
- dataset = dataset.checkout_version(target_version).await?;
-
- let other_transaction =
- if let Some(txn_file) = dataset.manifest.transaction_file.as_ref() {
- Some(read_transaction_file(object_store, &dataset.base, txn_file).await?)
- } else {
- None
- };
- check_transaction(transaction, target_version, &other_transaction)?;
- target_version += 1;
- }
- Err(CommitError::OtherError(err)) => {
- // If other error, return
- return Err(err);
- }
- }
- }
-
- Err(crate::Error::CommitConflict {
- version: target_version,
- source: format!(
- "Failed to commit the transaction after {} retries.",
- commit_config.num_retries
- )
- .into(),
- })
-}
-
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use std::sync::{Arc, Mutex};
- use arrow_array::{Int32Array, Int64Array, RecordBatch, RecordBatchIterator};
+ use arrow_array::{Int64Array, RecordBatch, RecordBatchIterator};
use arrow_schema::{DataType, Field, Schema as ArrowSchema};
use futures::future::join_all;
use super::*;
- use crate::arrow::FixedSizeListArrayExt;
- use crate::dataset::transaction::Operation;
- use crate::dataset::{WriteMode, WriteParams};
- use crate::index::vector::{MetricType, VectorIndexParams};
- use crate::index::{DatasetIndexExt, IndexType};
+ use crate::dataset::WriteParams;
use crate::io::object_store::ObjectStoreParams;
- use crate::utils::testing::generate_random_array;
use crate::Dataset;
async fn test_commit_handler(handler: Arc, should_succeed: bool) {
@@ -549,9 +335,11 @@ mod tests {
task_results
);
} else {
- // All we can promise here is at least one tasks succeeds, but multiple
- // could in theory.
- assert!(num_successes >= distinct_results.len(),);
+ assert!(
+ num_successes > distinct_results.len(),
+ "Expected some conflicts. Got {:?}",
+ task_results
+ );
}
}
@@ -578,7 +366,7 @@ mod tests {
impl CommitLock for CustomCommitHandler {
type Lease = CustomCommitLease;
- async fn lock(&self, version: u64) -> std::result::Result {
+ async fn lock(&self, version: u64) -> Result {
let mut locked_version = self.locked_version.lock().unwrap();
if locked_version.is_some() {
// Already locked
@@ -597,7 +385,7 @@ mod tests {
#[async_trait::async_trait]
impl CommitLease for CustomCommitLease {
- async fn release(&self, _success: bool) -> std::result::Result<(), CommitError> {
+ async fn release(&self, _success: bool) -> Result<(), CommitError> {
let mut locked_version = self.locked_version.lock().unwrap();
if *locked_version != Some(self.version) {
// Already released
@@ -621,194 +409,4 @@ mod tests {
let handler = Arc::new(UnsafeCommitHandler);
test_commit_handler(handler, false).await;
}
-
- #[tokio::test]
- async fn test_roundtrip_transaction_file() {
- let object_store = ObjectStore::memory();
- let base_path = Path::from("test");
- let transaction = Transaction::new(
- 42,
- Operation::Append { fragments: vec![] },
- Some("hello world".to_string()),
- );
-
- let file_name = write_transaction_file(&object_store, &base_path, &transaction)
- .await
- .unwrap();
- let read_transaction = read_transaction_file(&object_store, &base_path, &file_name)
- .await
- .unwrap();
-
- assert_eq!(transaction.read_version, read_transaction.read_version);
- assert_eq!(transaction.uuid, read_transaction.uuid);
- assert!(matches!(
- read_transaction.operation,
- Operation::Append { .. }
- ));
- assert_eq!(transaction.tag, read_transaction.tag);
- }
-
- #[tokio::test]
- async fn test_concurrent_create_index() {
- // Create a table with two vector columns
- let test_dir = tempfile::tempdir().unwrap();
- let test_uri = test_dir.path().to_str().unwrap();
-
- let dimension = 16;
- let schema = Arc::new(ArrowSchema::new(vec![
- Field::new(
- "vector1",
- DataType::FixedSizeList(
- Arc::new(Field::new("item", DataType::Float32, true)),
- dimension,
- ),
- false,
- ),
- Field::new(
- "vector2",
- DataType::FixedSizeList(
- Arc::new(Field::new("item", DataType::Float32, true)),
- dimension,
- ),
- false,
- ),
- ]));
- let float_arr = generate_random_array(512 * dimension as usize);
- let vectors = Arc::new(
- ::try_new_from_values(
- float_arr, dimension,
- )
- .unwrap(),
- );
- let batches =
- vec![
- RecordBatch::try_new(schema.clone(), vec![vectors.clone(), vectors.clone()])
- .unwrap(),
- ];
-
- let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
- let dataset = Dataset::write(reader, test_uri, None).await.unwrap();
- dataset.validate().await.unwrap();
-
- // From initial version, concurrently call create index 3 times,
- // two of which will be for the same column.
- let params = VectorIndexParams::ivf_pq(10, 8, 2, false, MetricType::L2, 50);
- let futures: Vec<_> = ["vector1", "vector1", "vector2"]
- .iter()
- .map(|col_name| {
- let dataset = dataset.clone();
- let params = params.clone();
- tokio::spawn(async move {
- dataset
- .create_index(&[col_name], IndexType::Vector, None, ¶ms, true)
- .await
- })
- })
- .collect();
-
- let results = join_all(futures).await;
- for result in results {
- assert!(matches!(result, Ok(Ok(_))), "{:?}", result);
- }
-
- // Validate that each version has the anticipated number of indexes
- let dataset = dataset.checkout_version(1).await.unwrap();
- assert!(dataset.load_indices().await.unwrap().is_empty());
-
- let dataset = dataset.checkout_version(2).await.unwrap();
- assert_eq!(dataset.load_indices().await.unwrap().len(), 1);
-
- let dataset = dataset.checkout_version(3).await.unwrap();
- let indices = dataset.load_indices().await.unwrap();
- assert!(!indices.is_empty() && indices.len() <= 2);
-
- // At this point, we have created two indices. If they are both for the same column,
- // it must be vector1 and not vector2.
- if indices.len() == 2 {
- let mut fields: Vec = indices.iter().flat_map(|i| i.fields.clone()).collect();
- fields.sort();
- assert_eq!(fields, vec![0, 1]);
- } else {
- assert_eq!(indices[0].fields, vec![0]);
- }
-
- let dataset = dataset.checkout_version(4).await.unwrap();
- let indices = dataset.load_indices().await.unwrap();
- assert_eq!(indices.len(), 2);
- let mut fields: Vec = indices.iter().flat_map(|i| i.fields.clone()).collect();
- fields.sort();
- assert_eq!(fields, vec![0, 1]);
- }
-
- #[tokio::test]
- async fn test_concurrent_writes() {
- for write_mode in [WriteMode::Append, WriteMode::Overwrite] {
- // Create an empty table
- let test_dir = tempfile::tempdir().unwrap();
- let test_uri = test_dir.path().to_str().unwrap();
-
- let schema = Arc::new(ArrowSchema::new(vec![Field::new(
- "i",
- DataType::Int32,
- false,
- )]));
-
- let dataset = Dataset::write(
- RecordBatchIterator::new(vec![].into_iter().map(Ok), schema.clone()),
- test_uri,
- None,
- )
- .await
- .unwrap();
-
- // Make some sample data
- let batch = RecordBatch::try_new(
- schema.clone(),
- vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
- )
- .unwrap();
-
- // Write data concurrently in 5 tasks
- let futures: Vec<_> = (0..5)
- .map(|_| {
- let batch = batch.clone();
- let schema = schema.clone();
- let uri = test_uri.to_string();
- tokio::spawn(async move {
- let reader = RecordBatchIterator::new(vec![Ok(batch)], schema);
- Dataset::write(
- reader,
- &uri,
- Some(WriteParams {
- mode: write_mode,
- ..Default::default()
- }),
- )
- .await
- })
- })
- .collect();
- let results = join_all(futures).await;
-
- // Assert all succeeded
- for result in results {
- assert!(matches!(result, Ok(Ok(_))), "{:?}", result);
- }
-
- // Assert final fragments and versions expected
- let dataset = dataset.checkout_version(6).await.unwrap();
-
- match write_mode {
- WriteMode::Append => {
- assert_eq!(dataset.get_fragments().len(), 5);
- }
- WriteMode::Overwrite => {
- assert_eq!(dataset.get_fragments().len(), 1);
- }
- _ => unreachable!(),
- }
-
- dataset.validate().await.unwrap()
- }
- }
}
diff --git a/rust/src/io/object_store.rs b/rust/src/io/object_store.rs
index 4a00bfed67..f646cc39e4 100644
--- a/rust/src/io/object_store.rs
+++ b/rust/src/io/object_store.rs
@@ -258,21 +258,6 @@ impl ObjectStore {
))
}
- pub fn with_params(&self, params: &ObjectStoreParams) -> Self {
- Self {
- inner: params
- .object_store_wrapper
- .as_ref()
- .map(|w| w.wrap(self.inner.clone()))
- .unwrap_or_else(|| self.inner.clone()),
- commit_handler: params
- .commit_handler
- .clone()
- .unwrap_or(self.commit_handler.clone()),
- ..self.clone()
- }
- }
-
fn new_from_path(str_path: &str, params: &ObjectStoreParams) -> Result<(Self, Path)> {
let expanded = tilde(str_path).to_string();
let expanded_path = StdPath::new(&expanded);