Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
wjones127 committed Aug 9, 2023
1 parent 8a1093a commit b8056f9
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 48 deletions.
8 changes: 6 additions & 2 deletions docs/format.rst
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,13 @@ Conflict resolution
~~~~~~~~~~~~~~~~~~~

If two writers try to commit at the same time, one will succeed and the other
will fail. The failed writer should attempt to retry the commit.
will fail. The failed writer should attempt to retry the commit, but only if
it's changes are compatible with the changes made by the successful writer.

The changes for a given commit are recorded as a transaction file, under the
``_transactions`` prefix in the dataset directory. The transaction file is a
serialize ``Transaction`` protobuf message. See the ``transaction.proto`` file
for its definition.

.. image:: _static/conflict_resolution_flow.png

Expand All @@ -284,7 +289,6 @@ The commit process is as follows:
fails because another writer has already committed, go back to step 3.
5. If the commit succeeds, update the ``_latest.manifest`` file.


When checking whether two transactions conflict, be conservative. If the
transaction file is missing, assume it conflicts. If the transaction file
has an unknown operation, assume it conflicts.
17 changes: 11 additions & 6 deletions protos/transaction.proto
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,26 @@ package lance.format.pb;
// transaction is compatible with this one.
message Transaction {
// The version of the dataset this transaction was built from.
//
// For example, for a delete transaction this means the version of the dataset
// that was read from while evaluating the deletion predicate.
uint64 read_version = 1;

// The UUID
// The UUID that unique identifies a transaction.
string uuid = 2;

// Optional version tag
// Optional version tag.
string tag = 3;

// Add new rows to the dataset.
message Append {
// The new fragments to append.
//
// The fragment IDs are meaningless here, because they are not yet assigned
// final values.
// Fragment IDs are not yet assigned.
repeated DataFragment fragments = 1;
}

// Mark rows as deleted.
message Delete {
// The fragments to update
//
Expand All @@ -61,18 +65,19 @@ message Transaction {
string predicate = 3;
}

// Create or overwrite the entire dataset.
message Overwrite {
// The new fragments
//
// The fragment IDs are meaningless here, because they are not yet assigned
// final values.
// Fragment IDs are not yet assigned.
repeated DataFragment fragments = 1;
// The new schema
repeated Field schema = 2;
// Schema metadata.
map<string, bytes> schema_metadata = 3;
}

// Add a new secondary index.
message CreateIndex {}

// An operation that rewrites but does not change the data in the table. These
Expand Down
2 changes: 1 addition & 1 deletion rust/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ impl Dataset {
.await
}

pub(crate) async fn checkout_manifest(
async fn checkout_manifest(
object_store: Arc<ObjectStore>,
base_path: Path,
manifest_path: &Path,
Expand Down
118 changes: 94 additions & 24 deletions rust/src/dataset/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// TODO:
// [ ] Module docs
// [ ] Conflict resolution Tests
// [x] Serialization
// [x] Deserialization
// [x] Commit function
// [ ] Commit tests
// [ ] Python API
// [ ] Python API docs
// [x] Make transaction a separate file
//! Transaction definitions for updating datasets
//!
//! Prior to creating a new manifest, a transaction must be created representing
//! the changes being made to the dataset. By representing them as incremental
//! changes, we can detect whether concurrent operations are compatible with
//! one another. We can also rebuild manifests when retrying committing a
//! manifest.
use std::{collections::HashSet, sync::Arc};

Expand All @@ -46,9 +43,6 @@ pub struct Transaction {
pub operation: Operation,
pub tag: Option<String>,
}
// phalanx/src/catalog.rs

// TODO: This should have a protobuf message

/// An operation on a dataset.
#[derive(Debug, Clone)]
Expand All @@ -64,7 +58,8 @@ pub enum Operation {
deleted_fragment_ids: Vec<u64>,
predicate: String,
},
/// Overwrite the entire dataset with the given fragments.
/// Overwrite the entire dataset with the given fragments. This is also
/// used when initially creating a table.
Overwrite {
fragments: Vec<Fragment>,
schema: Schema,
Expand All @@ -83,8 +78,6 @@ pub enum Operation {
fragments: Vec<Fragment>,
schema: Schema,
},
// TODO: a Custom one to allow arbitrary changes that will have same conflict
// resolution rules as unknown?
}

impl Transaction {
Expand Down Expand Up @@ -140,6 +133,14 @@ impl Transaction {
deleted_fragment_ids.iter().any(|f| left_ids.contains(f))
|| updated_fragments.iter().any(|f| left_ids.contains(&f.id))
}
Operation::Rewrite {
old_fragments: right_fragments,
..
} => {
// As long as they rewrite disjoint fragments they shouldn't conflict.
let left_ids: HashSet<u64> = old_fragments.iter().map(|f| f.id).collect();
right_fragments.iter().any(|f| left_ids.contains(&f.id))
}
_ => true,
},
// Overwrite always succeeds
Expand All @@ -148,6 +149,11 @@ impl Transaction {
Operation::Append { .. } => false,
// Indices are identified by UUIDs, so they shouldn't conflict.
Operation::CreateIndex { .. } => false,
// Although some of the rows we indexed may have been deleted,
// row ids are still valid, so we allow this optimistically.
Operation::Delete { .. } => false,
// Merge doesn't change row ids, so this should be fine.
Operation::Merge { .. } => false,
// Rewrite likely changed many of the row ids, so our index is
// likely useless. It should be rebuilt.
// TODO: we could be smarter here and only invalidate the index
Expand All @@ -168,9 +174,16 @@ impl Transaction {
let left_ids: HashSet<u64> = left_fragments.iter().map(|f| f.id).collect();
right_fragments.iter().any(|f| left_ids.contains(&f.id))
}
Operation::Rewrite { old_fragments, .. } => {
// If we update any fragments that were rewritten, we conflict.
let left_ids: HashSet<u64> = left_fragments.iter().map(|f| f.id).collect();
old_fragments.iter().any(|f| left_ids.contains(&f.id))
}
_ => true,
},
_ => true,
// Merge changes the schema, but preserves row ids, so the only operation
// it's compatible with is CreateIndex.
Operation::Merge { .. } => !matches!(&other.operation, Operation::CreateIndex),
}
}

Expand Down Expand Up @@ -456,13 +469,70 @@ mod tests {

// Transactions and whether they are expected to conflict with each
// of other_transactions
let cases = [(
Operation::Append {
fragments: vec![fragment0.clone()],
},
[false, false, false, true, true, false],
// TODO: more cases
)];
let cases = [
(
Operation::Append {
fragments: vec![fragment0.clone()],
},
[false, false, false, true, true, false],
),
(
Operation::Delete {
// Delete that affects fragments different from other transactions
updated_fragments: vec![fragment1.clone()],
deleted_fragment_ids: vec![],
predicate: "x > 2".to_string(),
},
[true, false, false, true, true, false],
),
(
Operation::Delete {
// Delete that affects same fragments as other transactions
updated_fragments: vec![fragment0.clone(), fragment2.clone()],
deleted_fragment_ids: vec![],
predicate: "x > 2".to_string(),
},
[true, false, true, true, true, true],
),
(
Operation::Overwrite {
fragments: vec![fragment0.clone(), fragment2.clone()],
schema: Schema::default(),
},
// No conflicts: overwrite can always happen since it doesn't
// depend on previous state of the table.
[false, false, false, false, false, false],
),
(
Operation::CreateIndex,
// Will only conflict with operations that modify row ids.
[false, false, false, false, true, true],
),
(
// Rewrite that affects different fragments
Operation::Rewrite {
old_fragments: vec![fragment1.clone()],
new_fragments: vec![fragment0.clone()],
},
[false, true, false, true, true, false],
),
(
// Rewrite that affects the same fragments
Operation::Rewrite {
old_fragments: vec![fragment0.clone(), fragment2.clone()],
new_fragments: vec![fragment0.clone()],
},
[false, true, true, true, true, true],
),
(
Operation::Merge {
fragments: vec![fragment0.clone(), fragment2.clone()],
schema: Schema::default(),
},
// Merge conflicts with everything except CreateIndex.
[true, false, true, true, true, true],
),
];

for (operation, expected_conflicts) in &cases {
let transaction = Transaction::new(0, operation.clone(), None);
Expand Down
Loading

0 comments on commit b8056f9

Please sign in to comment.