feat: transaction files and commit conflict resolution #1127

Merged
wjones127 merged 11 commits into main from wjones127/conflict-resolution
Aug 15, 2023

Conversation

Contributor

@wjones127 wjones127 commented Aug 8, 2023

This PR introduces a new kind of file: transaction files. These describe the incremental changes a commit is attempting to make or has made. Each one is described entirely by a protobuf message. Transaction files are optional, so these changes are backwards compatible and no feature flags are needed.

The commit flow has been refactored around the transaction files. Writers that find a conflicting writer has beaten them to a commit will now retry up to 5 times, using their transaction data to rebuild the manifest. Importantly, before retrying, writers will validate that their changes are compatible with the changes that have been committed concurrently.
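
A minimal sketch of the retry flow described above, not the PR's actual implementation. The names `commit_with_retry`, `try_write`, `is_compatible`, and `MAX_RETRIES` are hypothetical, and counting the first attempt separately from the 5 retries is an assumption.

```rust
/// Reasons a commit attempt can fail in this sketch.
#[derive(Debug, PartialEq)]
enum CommitError {
    /// Another writer already created the version we tried to write.
    CommitConflict,
    /// Our changes are incompatible with what was committed concurrently.
    Incompatible,
}

/// Hypothetical retry budget; the PR description says "up to 5 times".
const MAX_RETRIES: u32 = 5;

/// Try to commit on top of `read_version`, retrying after a conflict.
///
/// `try_write(v)` attempts to write version `v` and returns
/// `Err(CommitConflict)` if that version already exists.
/// `is_compatible(v)` reports whether the transaction committed as
/// version `v` is compatible with ours.
fn commit_with_retry(
    read_version: u64,
    mut try_write: impl FnMut(u64) -> Result<(), CommitError>,
    mut is_compatible: impl FnMut(u64) -> bool,
) -> Result<u64, CommitError> {
    let mut target = read_version + 1;
    // Assumption: one initial attempt plus MAX_RETRIES retries.
    for _ in 0..=MAX_RETRIES {
        match try_write(target) {
            Ok(()) => return Ok(target),
            Err(CommitError::CommitConflict) => {
                // Someone else wrote `target`; validate before building on it.
                if !is_compatible(target) {
                    return Err(CommitError::Incompatible);
                }
                target += 1;
            }
            Err(other) => return Err(other),
        }
    }
    Err(CommitError::CommitConflict)
}
```

The compatibility check runs before each retry, so an incompatible concurrent commit fails fast instead of consuming the retry budget.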

@wjones127 wjones127 force-pushed the wjones127/conflict-resolution branch from b5fe143 to 1e42636 Compare August 9, 2023 18:05
@wjones127 wjones127 changed the title from "wip: outline conflict resolution impl" to "feat: transaction files and commit conflict resolution" Aug 9, 2023
@wjones127
Contributor Author

We might also refactor the low-level Commit API to use transactions at the interface. I'll leave that for a follow up PR.

@wjones127 wjones127 force-pushed the wjones127/conflict-resolution branch from b8056f9 to 43a3455 Compare August 9, 2023 21:59
@wjones127 wjones127 marked this pull request as ready for review August 9, 2023 22:11
Contributor

@chebbyChefNEQ chebbyChefNEQ left a comment

This looks amazing. Will do a second pass tmr.

// The path format is "{read_version}-{uuid}.txn" where {read_version} is the
// version of the table the transaction read from, and {uuid} is a
// hyphen-separated UUID.
string transaction_file = 12;
Contributor

nit: I wonder if it'd be a good idea to keep track of a sliding window of transaction_file pointers here.

From the start of the commit attempt (V) to the retry (V+n), more than one version could have passed. This means we will have to read the manifest for each of V+1..V+n just for the transaction file path.

Contributor Author

Yeah I suppose we could do something like this. A time-based window would probably be most appropriate. One really only cares what happened in the last 5 minutes or so, I think.

But this would add quite a bit of complexity.

Contributor

I see. Let's skip it then. We can add it later if it's truly a problem
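
For reference, the `"{read_version}-{uuid}.txn"` path format quoted at the top of this thread could be built and parsed roughly as follows. This is a sketch only: both function names are hypothetical, and the UUID is treated as an opaque string rather than validated.

```rust
/// Build a transaction file name of the form "{read_version}-{uuid}.txn".
fn transaction_file_name(read_version: u64, uuid: &str) -> String {
    format!("{}-{}.txn", read_version, uuid)
}

/// Split a transaction file name back into (read_version, uuid).
/// Returns None if the name does not match the expected shape.
fn parse_transaction_file_name(name: &str) -> Option<(u64, &str)> {
    let stem = name.strip_suffix(".txn")?;
    // The UUID itself contains hyphens, so split only on the first one.
    let (version, uuid) = stem.split_once('-')?;
    if uuid.is_empty() {
        return None;
    }
    let version: u64 = version.parse().ok()?;
    Some((version, uuid))
}
```

Splitting on the first hyphen only is what keeps the hyphen-separated UUID intact.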

}

// Add a new secondary index.
message CreateIndex {}
Contributor

nit: let's keep track of the index type and column here. If a dataset has multiple indices to rebuild, we might want to have parallel index builds

Contributor Author

Oh I should test that. There might be an issue with how we handle indices.

Contributor Author

Good call out here. I've added the full index metadata here and realized I forgot to write the CreateIndex path. So that's there now and there's a test for concurrent CreateIndex operations.

@@ -100,6 +100,18 @@ message Manifest {
//
// For a single file, will be zero.
uint32 max_fragment_id = 11;

Contributor

ooc, is there any reason to prefer having a transaction file over just putting the transaction directly in the manifest?

Contributor Author

The main reason is that outside of the commit loop, which only happens in a narrow time range, this information is mostly redundant. In theory this can be quite large, so I didn’t want us to have to pay the IO cost every time the version was loaded.

Contributor Author

I suppose since it's often small, a classic compromise would be to allow it to be inlined if it isn't too large. Does that sound better?

Contributor

I think either way is fine tbh. Write txs should be able to endure one additional IO. We can keep it as is for simplicity.

Contributor Author

I'm going to forgo this for now, since I think we can add this in the future.

Contributor

@westonpace westonpace left a comment

Some questions / comments but from what I understand this looks like a very clever addition!

docs/format.rst Outdated

The changes for a given commit are recorded as a transaction file, under the
``_transactions`` prefix in the dataset directory. The transaction file is a
serialize ``Transaction`` protobuf message. See the ``transaction.proto`` file
Contributor

Suggested change
serialize ``Transaction`` protobuf message. See the ``transaction.proto`` file
serialized ``Transaction`` protobuf message. See the ``transaction.proto`` file

docs/format.rst Outdated

1. The writer finishes writing all data files.
2. The writer creates a transaction file in the ``_transactions`` directory.
This files describes the operation that was performed, which is used for two
Contributor

Suggested change
This files describes the operation that was performed, which is used for two
This files describes the operation that were performed, which is used for two

string uuid = 2;

// Optional version tag.
string tag = 3;
Contributor

What do you have in mind for this field? It doesn't seem to be used currently.

Contributor Author

This is to eventually support #588

But you're right it isn't used currently.

Ok(())
}

pub(crate) async fn commit_new_dataset(
Contributor

What if two threads try to create a dataset with the same name at the same time? Actually, it looks like write_manifest_file will return a conflict?

Contributor Author

Yeah that's a good question. I think the commit mechanism should protect us, and the remainder of files shouldn't collide.

Comment on lines 174 to 175
let left_ids: HashSet<u64> = left_fragments.iter().map(|f| f.id).collect();
right_fragments.iter().any(|f| left_ids.contains(&f.id))
Contributor

Minor nit: This "fragments intersect" logic is reused a lot. Might be worth pulling into a helper function for readability.
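
One way the suggested helper might look, wrapping the two quoted lines unchanged. The `Fragment` struct here is a minimal stand-in with only an `id` field, and the name `fragments_intersect` is hypothetical.

```rust
use std::collections::HashSet;

/// Minimal stand-in for the dataset's fragment type; only the id matters here.
struct Fragment {
    id: u64,
}

/// Returns true if any fragment id appears in both slices.
fn fragments_intersect(left_fragments: &[Fragment], right_fragments: &[Fragment]) -> bool {
    // Index one side by id so each lookup from the other side is O(1) on average.
    let left_ids: HashSet<u64> = left_fragments.iter().map(|f| f.id).collect();
    right_fragments.iter().any(|f| left_ids.contains(&f.id))
}
```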

}
}

impl TryFrom<&pb::Transaction> for Transaction {
Contributor

Not a real comment. Just bleh, rust protobuf seems to require quite a bit of boilerplate code.

Contributor Author

Ha. I think if we want to operate directly off of the generated message structs we could, it's just then we'd have to deal with all the optional fields throughout the code base. So I think we create these conversions to stricter structs just to keep the validation in one place. But it does involve some boilerplate.
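
A rough illustration of that trade-off, using hand-written stand-ins rather than real generated code. `PbTransaction`, `PbOperation`, and the single `Append` variant are hypothetical simplifications. The generated-style struct carries an `Option` that the stricter struct does not, so the missing-field check happens once, in the conversion.

```rust
use std::convert::TryFrom;

/// Stand-in for a generated message struct: nested fields arrive as Option.
struct PbTransaction {
    read_version: u64,
    uuid: String,
    operation: Option<PbOperation>,
}

/// Stand-in for a generated oneof.
enum PbOperation {
    Append { fragment_ids: Vec<u64> },
}

/// Stricter in-memory operation type.
#[derive(Debug, PartialEq)]
enum Operation {
    Append { fragment_ids: Vec<u64> },
}

/// Stricter in-memory transaction: `operation` is always present.
#[derive(Debug, PartialEq)]
struct Transaction {
    read_version: u64,
    uuid: String,
    operation: Operation,
}

impl TryFrom<&PbTransaction> for Transaction {
    type Error = String;

    fn try_from(message: &PbTransaction) -> Result<Self, Self::Error> {
        // Validate the optional field here so callers never see a None.
        let operation = match &message.operation {
            Some(PbOperation::Append { fragment_ids }) => Operation::Append {
                fragment_ids: fragment_ids.clone(),
            },
            None => return Err("Transaction message is missing its operation".to_string()),
        };
        Ok(Self {
            read_version: message.read_version,
            uuid: message.uuid.clone(),
            operation,
        })
    }
}
```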

@wjones127 wjones127 force-pushed the wjones127/conflict-resolution branch from 44df2bf to e5add1d Compare August 11, 2023 16:36
@wjones127 wjones127 requested a review from westonpace August 11, 2023 19:35
@wjones127 wjones127 force-pushed the wjones127/conflict-resolution branch from 9ddfd26 to 4154dc3 Compare August 14, 2023 16:51
//! ✅ indicates that the operation is compatible, while ❌ indicates that it is
//! a conflict. Some operations have additional conditions that must be met for
//! them to be compatible.
//!
Contributor

this is amazing! ❤️

}
Err(CommitError::CommitConflict) => {
// See if we can retry the commit
dataset = dataset.checkout_version(target_version).await?;
Contributor

minor: multiple versions could have passed here, so we should pull until the latest one.

Let's just add a ticket for this? No need to fix it right now.

Contributor Author

// First, get all transactions since read_version
let mut other_transactions = Vec::new();
let mut version = transaction.read_version;
loop {
Contributor

nit: maybe extract this to a function so we can reuse in below. (ticket okay)
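
A sketch of what such an extracted function might look like, using an in-memory map in place of real manifest reads. `ManifestIndex` and `transactions_since` are hypothetical names. Since transaction files are optional, the map stores `None` for a version written without one, and treating that case as an error is an assumption made for this sketch.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for committed manifests:
/// version -> transaction file path (None if written without one).
type ManifestIndex = BTreeMap<u64, Option<String>>;

/// Collect the transaction file paths of every version committed after
/// `read_version`, in commit order.
///
/// Returns Err(version) for the first version that has no transaction
/// file, since compatibility with it cannot be checked.
fn transactions_since(manifests: &ManifestIndex, read_version: u64) -> Result<Vec<String>, u64> {
    let mut paths = Vec::new();
    let mut version = read_version + 1;
    // Walk forward until we run past the latest committed version.
    while let Some(entry) = manifests.get(&version) {
        match entry {
            Some(path) => paths.push(path.clone()),
            None => return Err(version),
        }
        version += 1;
    }
    Ok(paths)
}
```

This also shows the cost raised earlier in the review: one lookup per intervening version, just to find each transaction file path.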

final_fragments.extend(maybe_existing_fragments?.clone());
final_fragments.retain(|f| !deleted_fragment_ids.contains(&f.id));
final_fragments.iter_mut().for_each(|f| {
for updated in updated_fragments {
Contributor

nit: let's add a todo to do it with two sorted lists instead of brute force?
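
A possible shape for that todo, as a two-pointer pass over lists sorted by fragment id. It assumes that both lists are already sorted ascending by id and that an updated fragment replaces the existing one wholesale. The `Fragment` fields and the name `apply_updates` are hypothetical.

```rust
/// Minimal stand-in for a fragment: an id plus its data file names.
#[derive(Debug, Clone, PartialEq)]
struct Fragment {
    id: u64,
    files: Vec<String>,
}

/// Replace fragments in `existing` with same-id entries from `updated`.
/// Both slices must be sorted by ascending id; runs in O(n + m) instead
/// of the O(n * m) nested loop.
fn apply_updates(existing: &mut [Fragment], updated: &[Fragment]) {
    let mut u = 0;
    for frag in existing.iter_mut() {
        // Skip updates whose id is smaller than the current fragment's.
        while u < updated.len() && updated[u].id < frag.id {
            u += 1;
        }
        if u < updated.len() && updated[u].id == frag.id {
            *frag = updated[u].clone();
            u += 1;
        }
    }
}
```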

Operation::Rewrite {
ref new_fragments, ..
} => {
final_fragments.extend(Self::fragments_with_ids(
Contributor

do we need to retain the frags that aren't rewritten?

Contributor Author

I'll be fixing this as part of the compaction implementation.

Contributor

@chebbyChefNEQ chebbyChefNEQ left a comment

Just one blocking question (w.r.t. rewrite conflict resolution)

General comment (non-blocking):

  • We have many kinds of transactions and the combination of interaction is O(N^2). I wonder if it would be cleaner to keep a smaller set of "core txs". Say
    • Rewrite is actually Delete { old_frags } + Append { new_frags }
    • Overwrite is Delete { all_frags } + ChangeSchema { .. } + Append { .. }
    • I can also see the flip side that this could hide interactions. Happy to take this to a ticket.

@wjones127
Contributor Author

We have many kinds of transactions and the combination of interaction is O(N^2). I wonder if it would be cleaner to keep a smaller set of "core txs". Say
Rewrite is actually Delete { old_frags } + Append { new_frags }
Overwrite is Delete { all_frags } + ChangeSchema { .. } + Append { .. }
I can also see the flip side that this could hide interactions. Happy to take this to a ticket.

Originally I was thinking I could represent these with some generic struct like:

struct Transaction {
   updated_metadata: Option<Metadata>,
   new_fragments: Vec<Fragment>,
   updated_fragments: Vec<Fragment>,
   removed_fragment_ids: Vec<u32>,
}

But I realized there are operations like delete where we also want to check the predicate itself. And then there are probably additional cases I'm not yet thinking of. Delta Lake originally had things just expressed in terms of new files and removed files, but then the conflict resolution ended up relying more and more on the unstructured metadata left in the log about transactions. So I'm thinking it might be safer to write these transaction types in the message.

I think, though, we should be free to transform these internally into something that is easier to resolve, if we end up finding patterns in the implementation.
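
To make that concrete, here is a sketch of keeping typed operations in the message while lowering them to a generic summary internally. Everything in it is hypothetical and simplified: the three variants, their fields, and the names `FragmentChanges` and `summarize` are illustrative, not the PR's actual types.

```rust
/// Typed operations, as they might be recorded in a transaction file.
enum Operation {
    Append { new_fragment_ids: Vec<u32> },
    Delete { deleted_fragment_ids: Vec<u32>, predicate: String },
    Rewrite { old_fragment_ids: Vec<u32>, new_fragment_ids: Vec<u32> },
}

/// Generic internal summary of which fragments an operation adds or removes.
#[derive(Debug, Default, PartialEq)]
struct FragmentChanges {
    added: Vec<u32>,
    removed: Vec<u32>,
}

/// Lower a typed operation to its generic fragment-level effect.
fn summarize(op: &Operation) -> FragmentChanges {
    match op {
        Operation::Append { new_fragment_ids } => FragmentChanges {
            added: new_fragment_ids.clone(),
            removed: Vec::new(),
        },
        // The predicate is dropped here; it survives only in the typed form.
        Operation::Delete { deleted_fragment_ids, .. } => FragmentChanges {
            added: Vec::new(),
            removed: deleted_fragment_ids.clone(),
        },
        // Rewrite viewed as "remove old fragments, add new fragments".
        Operation::Rewrite { old_fragment_ids, new_fragment_ids } => FragmentChanges {
            added: new_fragment_ids.clone(),
            removed: old_fragment_ids.clone(),
        },
    }
}
```

The summary loses the delete predicate, which is the kind of information the typed message keeps available for conflict checks.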

@wjones127 wjones127 merged commit 0a12b97 into main Aug 15, 2023
@wjones127 wjones127 deleted the wjones127/conflict-resolution branch August 15, 2023 22:08
eddyxu added a commit that referenced this pull request Aug 20, 2023