From b8056f9a04132e6a99c21f3859174ea2ca65ab8c Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 9 Aug 2023 14:52:20 -0700 Subject: [PATCH] cleanup --- docs/format.rst | 8 ++- protos/transaction.proto | 17 +++-- rust/src/dataset.rs | 2 +- rust/src/dataset/transaction.rs | 118 +++++++++++++++++++++++++------- rust/src/io/commit.rs | 58 ++++++++++++---- 5 files changed, 155 insertions(+), 48 deletions(-) diff --git a/docs/format.rst b/docs/format.rst index d3be1bdc59..a61fbe4278 100644 --- a/docs/format.rst +++ b/docs/format.rst @@ -265,8 +265,13 @@ Conflict resolution ~~~~~~~~~~~~~~~~~~~ If two writers try to commit at the same time, one will succeed and the other -will fail. The failed writer should attempt to retry the commit. +will fail. The failed writer should attempt to retry the commit, but only if +it's changes are compatible with the changes made by the successful writer. +The changes for a given commit are recorded as a transaction file, under the +``_transactions`` prefix in the dataset directory. The transaction file is a +serialize ``Transaction`` protobuf message. See the ``transaction.proto`` file +for its definition. .. image:: _static/conflict_resolution_flow.png @@ -284,7 +289,6 @@ The commit process is as follows: fails because another writer has already committed, go back to step 3. 5. If the commit succeeds, update the ``_latest.manifest`` file. - When checking whether two transactions conflict, be conservative. If the transaction file is missing, assume it conflicts. If the transaction file has an unknown operation, assume it conflicts. diff --git a/protos/transaction.proto b/protos/transaction.proto index 1224df594c..aae1212731 100644 --- a/protos/transaction.proto +++ b/protos/transaction.proto @@ -31,22 +31,26 @@ package lance.format.pb; // transaction is compatible with this one. message Transaction { // The version of the dataset this transaction was built from. + // + // For example, for a delete transaction this means the version of the dataset + // that was read from while evaluating the deletion predicate. uint64 read_version = 1; - // The UUID + // The UUID that unique identifies a transaction. string uuid = 2; - // Optional version tag + // Optional version tag. string tag = 3; + // Add new rows to the dataset. message Append { // The new fragments to append. // - // The fragment IDs are meaningless here, because they are not yet assigned - // final values. + // Fragment IDs are not yet assigned. repeated DataFragment fragments = 1; } + // Mark rows as deleted. message Delete { // The fragments to update // @@ -61,11 +65,11 @@ message Transaction { string predicate = 3; } + // Create or overwrite the entire dataset. message Overwrite { // The new fragments // - // The fragment IDs are meaningless here, because they are not yet assigned - // final values. + // Fragment IDs are not yet assigned. repeated DataFragment fragments = 1; // The new schema repeated Field schema = 2; @@ -73,6 +77,7 @@ message Transaction { map schema_metadata = 3; } + // Add a new secondary index. message CreateIndex {} // An operation that rewrites but does not change the data in the table. These diff --git a/rust/src/dataset.rs b/rust/src/dataset.rs index 913599d158..993c14d768 100644 --- a/rust/src/dataset.rs +++ b/rust/src/dataset.rs @@ -245,7 +245,7 @@ impl Dataset { .await } - pub(crate) async fn checkout_manifest( + async fn checkout_manifest( object_store: Arc, base_path: Path, manifest_path: &Path, diff --git a/rust/src/dataset/transaction.rs b/rust/src/dataset/transaction.rs index ead5c4aa67..c5c7e092da 100644 --- a/rust/src/dataset/transaction.rs +++ b/rust/src/dataset/transaction.rs @@ -12,16 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -// TODO: -// [ ] Module docs -// [ ] Conflict resolution Tests -// [x] Serialization -// [x] Deserialization -// [x] Commit function -// [ ] Commit tests -// [ ] Python API -// [ ] Python API docs -// [x] Make transaction a separate file +//! Transaction definitions for updating datasets +//! +//! Prior to creating a new manifest, a transaction must be created representing +//! the changes being made to the dataset. By representing them as incremental +//! changes, we can detect whether concurrent operations are compatible with +//! one another. We can also rebuild manifests when retrying committing a +//! manifest. use std::{collections::HashSet, sync::Arc}; @@ -46,9 +43,6 @@ pub struct Transaction { pub operation: Operation, pub tag: Option, } -// phalanx/src/catalog.rs - -// TODO: This should have a protobuf message /// An operation on a dataset. #[derive(Debug, Clone)] @@ -64,7 +58,8 @@ pub enum Operation { deleted_fragment_ids: Vec, predicate: String, }, - /// Overwrite the entire dataset with the given fragments. + /// Overwrite the entire dataset with the given fragments. This is also + /// used when initially creating a table. Overwrite { fragments: Vec, schema: Schema, @@ -83,8 +78,6 @@ pub enum Operation { fragments: Vec, schema: Schema, }, - // TODO: a Custom one to allow arbitrary changes that will have same conflict - // resolution rules as unknown? } impl Transaction { @@ -140,6 +133,14 @@ impl Transaction { deleted_fragment_ids.iter().any(|f| left_ids.contains(f)) || updated_fragments.iter().any(|f| left_ids.contains(&f.id)) } + Operation::Rewrite { + old_fragments: right_fragments, + .. + } => { + // As long as they rewrite disjoint fragments they shouldn't conflict. + let left_ids: HashSet = old_fragments.iter().map(|f| f.id).collect(); + right_fragments.iter().any(|f| left_ids.contains(&f.id)) + } _ => true, }, // Overwrite always succeeds @@ -148,6 +149,11 @@ impl Transaction { Operation::Append { .. } => false, // Indices are identified by UUIDs, so they shouldn't conflict. Operation::CreateIndex { .. } => false, + // Although some of the rows we indexed may have been deleted, + // row ids are still valid, so we allow this optimistically. + Operation::Delete { .. } => false, + // Merge doesn't change row ids, so this should be fine. + Operation::Merge { .. } => false, // Rewrite likely changed many of the row ids, so our index is // likely useless. It should be rebuilt. // TODO: we could be smarter here and only invalidate the index @@ -168,9 +174,16 @@ impl Transaction { let left_ids: HashSet = left_fragments.iter().map(|f| f.id).collect(); right_fragments.iter().any(|f| left_ids.contains(&f.id)) } + Operation::Rewrite { old_fragments, .. } => { + // If we update any fragments that were rewritten, we conflict. + let left_ids: HashSet = left_fragments.iter().map(|f| f.id).collect(); + old_fragments.iter().any(|f| left_ids.contains(&f.id)) + } _ => true, }, - _ => true, + // Merge changes the schema, but preserves row ids, so the only operation + // it's compatible with is CreateIndex. + Operation::Merge { .. } => !matches!(&other.operation, Operation::CreateIndex), } } @@ -456,13 +469,70 @@ mod tests { // Transactions and whether they are expected to conflict with each // of other_transactions - let cases = [( - Operation::Append { - fragments: vec![fragment0.clone()], - }, - [false, false, false, true, true, false], - // TODO: more cases - )]; + let cases = [ + ( + Operation::Append { + fragments: vec![fragment0.clone()], + }, + [false, false, false, true, true, false], + ), + ( + Operation::Delete { + // Delete that affects fragments different from other transactions + updated_fragments: vec![fragment1.clone()], + deleted_fragment_ids: vec![], + predicate: "x > 2".to_string(), + }, + [true, false, false, true, true, false], + ), + ( + Operation::Delete { + // Delete that affects same fragments as other transactions + updated_fragments: vec![fragment0.clone(), fragment2.clone()], + deleted_fragment_ids: vec![], + predicate: "x > 2".to_string(), + }, + [true, false, true, true, true, true], + ), + ( + Operation::Overwrite { + fragments: vec![fragment0.clone(), fragment2.clone()], + schema: Schema::default(), + }, + // No conflicts: overwrite can always happen since it doesn't + // depend on previous state of the table. + [false, false, false, false, false, false], + ), + ( + Operation::CreateIndex, + // Will only conflict with operations that modify row ids. + [false, false, false, false, true, true], + ), + ( + // Rewrite that affects different fragments + Operation::Rewrite { + old_fragments: vec![fragment1.clone()], + new_fragments: vec![fragment0.clone()], + }, + [false, true, false, true, true, false], + ), + ( + // Rewrite that affects the same fragments + Operation::Rewrite { + old_fragments: vec![fragment0.clone(), fragment2.clone()], + new_fragments: vec![fragment0.clone()], + }, + [false, true, true, true, true, true], + ), + ( + Operation::Merge { + fragments: vec![fragment0.clone(), fragment2.clone()], + schema: Schema::default(), + }, + // Merge conflicts with everything except CreateIndex. + [true, false, true, true, true, true], + ), + ]; for (operation, expected_conflicts) in &cases { let transaction = Transaction::new(0, operation.clone(), None); diff --git a/rust/src/io/commit.rs b/rust/src/io/commit.rs index 3bee428dcf..067d75ebd9 100644 --- a/rust/src/io/commit.rs +++ b/rust/src/io/commit.rs @@ -38,6 +38,7 @@ use std::{fmt::Debug, sync::atomic::AtomicBool}; use crate::dataset::transaction::Transaction; use crate::dataset::{write_manifest_file, ManifestWriteConfig}; use crate::Dataset; +use crate::Result; use crate::{format::pb, format::Index, format::Manifest}; use futures::future::BoxFuture; use object_store::path::Path; @@ -52,7 +53,7 @@ pub type ManifestWriter = for<'a> fn( manifest: &'a mut Manifest, indices: Option>, path: &'a Path, -) -> BoxFuture<'a, crate::Result<()>>; +) -> BoxFuture<'a, Result<()>>; /// Handle commits that prevent conflicting writes. /// @@ -72,7 +73,7 @@ pub trait CommitHandler: Debug + Send + Sync { path: &Path, object_store: &ObjectStore, manifest_writer: ManifestWriter, - ) -> Result<(), CommitError>; + ) -> std::result::Result<(), CommitError>; } /// Errors that can occur when committing a manifest. @@ -117,7 +118,7 @@ impl CommitHandler for UnsafeCommitHandler { path: &Path, object_store: &ObjectStore, manifest_writer: ManifestWriter, - ) -> Result<(), CommitError> { + ) -> std::result::Result<(), CommitError> { // Log a one-time warning if !WARNED_ON_UNSAFE_COMMIT.load(std::sync::atomic::Ordering::Relaxed) { WARNED_ON_UNSAFE_COMMIT.store(true, std::sync::atomic::Ordering::Relaxed); @@ -154,7 +155,7 @@ impl CommitHandler for RenameCommitHandler { path: &Path, object_store: &ObjectStore, manifest_writer: ManifestWriter, - ) -> Result<(), CommitError> { + ) -> std::result::Result<(), CommitError> { // Create a temporary object, then use `rename_if_not_exists` to commit. // If failed, clean up the temporary object. @@ -217,13 +218,13 @@ pub trait CommitLock { /// It is not required that the lock tracks the version. It is provided in /// case the locking is handled by a catalog service that needs to know the /// current version of the table. - async fn lock(&self, version: u64) -> Result; + async fn lock(&self, version: u64) -> std::result::Result; } #[async_trait::async_trait] pub trait CommitLease: Send + Sync { /// Return the lease, indicating whether the commit was successful. - async fn release(&self, success: bool) -> Result<(), CommitError>; + async fn release(&self, success: bool) -> std::result::Result<(), CommitError>; } #[async_trait::async_trait] @@ -235,7 +236,7 @@ impl CommitHandler for T { path: &Path, object_store: &ObjectStore, manifest_writer: ManifestWriter, - ) -> Result<(), CommitError> { + ) -> std::result::Result<(), CommitError> { // NOTE: once we have the lease we cannot use ? to return errors, since // we must release the lease before returning. let lease = self.lock(manifest.version).await?; @@ -284,7 +285,7 @@ async fn read_transaction_file( object_store: &ObjectStore, base_path: &Path, transaction_file: &str, -) -> crate::Result { +) -> Result { let path = base_path.child("_transactions").child(transaction_file); let result = object_store.inner.get(&path).await?; let data = result.bytes().await?; @@ -297,7 +298,7 @@ async fn write_transaction_file( object_store: &ObjectStore, base_path: &Path, transaction: &Transaction, -) -> crate::Result { +) -> Result { let file_name = format!("{}-{}.txn", transaction.read_version, transaction.uuid); let path = base_path.child("_transactions").child(file_name.as_str()); @@ -308,11 +309,11 @@ async fn write_transaction_file( Ok(file_name) } -pub(crate) fn check_transaction( +fn check_transaction( transaction: &Transaction, other_version: u64, other_transaction: &Option, -) -> crate::Result<()> { +) -> Result<()> { if other_transaction.is_none() { return Err(crate::Error::Internal { message: format!( @@ -345,7 +346,7 @@ pub(crate) async fn commit_new_dataset( transaction: &Transaction, indices: Option>, write_config: &ManifestWriteConfig, -) -> crate::Result { +) -> Result { let transaction_file = write_transaction_file(object_store, base_path, transaction).await?; let mut manifest = transaction.build_manifest(None, &transaction_file, write_config)?; @@ -370,7 +371,7 @@ pub(crate) async fn commit_transaction( indices: Option>, write_config: &ManifestWriteConfig, commit_config: &CommitConfig, -) -> crate::Result { +) -> Result { // Note: object_store has been configured with WriteParams, but dataset.object_store() // has not necessarily. So for anything involving writing, use `object_store`. let transaction_file = write_transaction_file(object_store, &dataset.base, transaction).await?; @@ -471,6 +472,7 @@ mod tests { use super::*; + use crate::dataset::transaction::Operation; use crate::dataset::WriteParams; use crate::io::object_store::ObjectStoreParams; use crate::Dataset; @@ -561,7 +563,7 @@ mod tests { impl CommitLock for CustomCommitHandler { type Lease = CustomCommitLease; - async fn lock(&self, version: u64) -> Result { + async fn lock(&self, version: u64) -> std::result::Result { let mut locked_version = self.locked_version.lock().unwrap(); if locked_version.is_some() { // Already locked @@ -580,7 +582,7 @@ mod tests { #[async_trait::async_trait] impl CommitLease for CustomCommitLease { - async fn release(&self, _success: bool) -> Result<(), CommitError> { + async fn release(&self, _success: bool) -> std::result::Result<(), CommitError> { let mut locked_version = self.locked_version.lock().unwrap(); if *locked_version != Some(self.version) { // Already released @@ -604,4 +606,30 @@ mod tests { let handler = Arc::new(UnsafeCommitHandler); test_commit_handler(handler, false).await; } + + #[tokio::test] + async fn test_roundtrip_transaction_file() { + let object_store = ObjectStore::memory(); + let base_path = Path::from("test"); + let transaction = Transaction::new( + 42, + Operation::Append { fragments: vec![] }, + Some("hello world".to_string()), + ); + + let file_name = write_transaction_file(&object_store, &base_path, &transaction) + .await + .unwrap(); + let read_transaction = read_transaction_file(&object_store, &base_path, &file_name) + .await + .unwrap(); + + assert_eq!(transaction.read_version, read_transaction.read_version); + assert_eq!(transaction.uuid, read_transaction.uuid); + assert!(matches!( + read_transaction.operation, + Operation::Append { .. } + )); + assert_eq!(transaction.tag, read_transaction.tag); + } }