feat: transaction files and commit conflict resolution #1127
We might also refactor the low-level Commit API to use transactions at the interface. I'll leave that for a follow-up PR.
This looks amazing. Will do a second pass tmr.
// The path format is "{read_version}-{uuid}.txn" where {read_version} is the
// version of the table the transaction read from, and {uuid} is a
// hyphen-separated UUID.
string transaction_file = 12;
nit: I wonder if it'd be a good idea to keep track of a sliding window of transaction_file pointers here.
From the start of the commit attempt (V) to the retry (V+n), more than one version could have passed. This means we will have to read the manifest for each of V+1..V+n just for the transaction file path.
Yeah I suppose we could do something like this. A time-based window would probably be most appropriate. One really only cares what happened in the last 5 minutes or so, I think.
But this would add quite a bit of complexity.
I see. Let's skip it then. We can add it later if it's truly a problem.
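For reference, the path format described in the `transaction_file` comment could be built and parsed roughly as follows. This is a minimal sketch: `transaction_file_name` and `parse_read_version` are hypothetical helper names, not functions from this PR, and the UUID is taken as a plain string supplied by the caller.

```rust
/// Hypothetical helper: build a transaction file name from the version the
/// transaction read from and a hyphen-separated UUID string.
fn transaction_file_name(read_version: u64, uuid: &str) -> String {
    format!("{read_version}-{uuid}.txn")
}

/// Hypothetical helper: recover the read version from a name of that form.
/// Returns None if the name does not have the expected shape.
fn parse_read_version(file_name: &str) -> Option<u64> {
    let stem = file_name.strip_suffix(".txn")?;
    // The UUID itself contains hyphens, so split only on the first one.
    let (version, uuid) = stem.split_once('-')?;
    if uuid.is_empty() {
        return None;
    }
    version.parse().ok()
}
```

Splitting on the first hyphen only is what keeps the read version recoverable even though the UUID part contains hyphens of its own.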
protos/transaction.proto
Outdated
}

// Add a new secondary index.
message CreateIndex {}
nit: let's keep track of the index type and column here. If a dataset has multiple indices to rebuild, we might want to have parallel index builds.
Oh I should test that. There might be an issue with how we handle indices.
Good call out here. I've added the full index metadata here and realized I forgot to write the CreateIndex path. So that's there now and there's a test for concurrent CreateIndex operations.
@@ -100,6 +100,18 @@ message Manifest {
//
// For a single file, will be zero.
uint32 max_fragment_id = 11;
ooc, is there any reason to prefer having a transaction file over just putting the transaction directly in the manifest?
The main reason is that outside of the commit loop, which only happens in a narrow time range, this information is mostly redundant. In theory this can be quite large, so I didn’t want us to have to pay the IO cost every time the version was loaded.
I suppose since it's often small, a classic compromise would be to allow it to be inlined if it isn't too large. Does that sound better?
I think either way is fine tbh. Write txs should be able to endure one additional IO. We can keep it as is for simplicity.
I'm going to forgo this for now, since I think we can add this in the future.
Some questions / comments but from what I understand this looks like a very clever addition!
docs/format.rst
Outdated
The changes for a given commit are recorded as a transaction file, under the
``_transactions`` prefix in the dataset directory. The transaction file is a
serialize ``Transaction`` protobuf message. See the ``transaction.proto`` file
- serialize ``Transaction`` protobuf message. See the ``transaction.proto`` file
+ serialized ``Transaction`` protobuf message. See the ``transaction.proto`` file
docs/format.rst
Outdated
1. The writer finishes writing all data files.
2. The writer creates a transaction file in the ``_transactions`` directory.
This files describes the operation that was performed, which is used for two
- This files describes the operation that was performed, which is used for two
+ This files describes the operation that were performed, which is used for two
string uuid = 2;

// Optional version tag.
string tag = 3;
What do you have in mind for this field? It doesn't seem to be used currently.
This is to eventually support #588
But you're right it isn't used currently.
Ok(())
}

pub(crate) async fn commit_new_dataset(
What if two threads try to create a dataset with the same name at the same time? Actually, it looks like write_manifest_file will return a conflict?
Yeah that's a good question. I think the commit mechanism should protect us, and the remainder of files shouldn't collide.
rust/src/dataset/transaction.rs
Outdated
let left_ids: HashSet<u64> = left_fragments.iter().map(|f| f.id).collect();
right_fragments.iter().any(|f| left_ids.contains(&f.id))
Minor nit: This "fragments intersect" logic is reused a lot. Might be worth pulling into a helper function for readability.
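A helper along these lines might look like the sketch below. The name `fragments_intersect` and the pared-down `Fragment` struct are assumptions for illustration; the real fragment type in the code base carries more fields.

```rust
use std::collections::HashSet;

/// Stand-in for the real fragment type; only the id matters here.
#[derive(Debug, Clone)]
struct Fragment {
    id: u64,
}

/// Returns true if any fragment id appears in both slices.
fn fragments_intersect(left: &[Fragment], right: &[Fragment]) -> bool {
    // Collect one side into a set so each lookup is O(1) on average.
    let left_ids: HashSet<u64> = left.iter().map(|f| f.id).collect();
    right.iter().any(|f| left_ids.contains(&f.id))
}
```

Each conflict check could then call the helper instead of repeating the set construction inline.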
}
}

impl TryFrom<&pb::Transaction> for Transaction {
Not a real comment. Just bleh, rust protobuf seems to require quite a bit of boilerplate code.
Ha. I think if we want to operate directly off of the generated message structs we could, it's just then we'd have to deal with all the optional fields throughout the code base. So I think we create these conversions to stricter structs just to keep the validation in one place. But it does involve some boilerplate.
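To make the trade-off concrete, here is a small sketch of the pattern being described. `PbTransaction` and `PbOperation` are hand-written stand-ins for generated message structs, and their fields are illustrative rather than the actual schema; the point is only that the optional field is checked once, in the conversion.

```rust
use std::convert::TryFrom;

/// Stand-ins for generated protobuf structs, where a message-typed field
/// surfaces as an Option. Field names are illustrative assumptions.
struct PbOperation {
    kind: String,
}

struct PbTransaction {
    read_version: u64,
    uuid: String,
    operation: Option<PbOperation>,
}

/// Stricter in-memory struct: every field is required.
#[derive(Debug, PartialEq)]
struct Transaction {
    read_version: u64,
    uuid: String,
    operation: String,
}

impl TryFrom<&PbTransaction> for Transaction {
    type Error = String;

    fn try_from(message: &PbTransaction) -> Result<Self, Self::Error> {
        // Validate the optional field here so the rest of the code never
        // has to handle a missing operation.
        let operation = message
            .operation
            .as_ref()
            .ok_or_else(|| "transaction message is missing its operation".to_string())?;
        Ok(Transaction {
            read_version: message.read_version,
            uuid: message.uuid.clone(),
            operation: operation.kind.clone(),
        })
    }
}
```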
//! ✅ indicates that the operation is compatible, while ❌ indicates that it is
//! a conflict. Some operations have additional conditions that must be met for
//! them to be compatible.
//!
this is amazing! ❤️
}
Err(CommitError::CommitConflict) => {
// See if we can retry the commit
dataset = dataset.checkout_version(target_version).await?;
minor: multiple versions could have passed here; we should pull up to the latest one.
Let's just add a ticket for this? No need to fix it right now.
// First, get all transactions since read_version
let mut other_transactions = Vec::new();
let mut version = transaction.read_version;
loop {
nit: maybe extract this to a function so we can reuse it below. (ticket okay)
final_fragments.extend(maybe_existing_fragments?.clone());
final_fragments.retain(|f| !deleted_fragment_ids.contains(&f.id));
final_fragments.iter_mut().for_each(|f| {
for updated in updated_fragments {
nit: let's add a todo to do this with two sorted lists instead of brute force?
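One possible shape for the sorted-list version is a two-pointer walk, sketched below. It assumes both lists are sorted by ascending, unique fragment id, which the PR does not guarantee; `Frag` and `apply_updates_sorted` are hypothetical names used only for illustration.

```rust
/// Stand-in fragment type with a payload so that an update is observable.
#[derive(Debug, Clone, PartialEq)]
struct Frag {
    id: u64,
    files: Vec<String>,
}

/// Replace each entry of `existing` with the entry of `updated` that has the
/// same id. Assumes BOTH slices are sorted by ascending, unique id, so one
/// pass over each list is enough: O(n + m) instead of O(n * m).
fn apply_updates_sorted(existing: &mut [Frag], updated: &[Frag]) {
    let mut u = 0;
    for fragment in existing.iter_mut() {
        // Skip updates whose id is smaller than the current fragment's id.
        while u < updated.len() && updated[u].id < fragment.id {
            u += 1;
        }
        if u == updated.len() {
            break; // No updates left to apply.
        }
        if updated[u].id == fragment.id {
            *fragment = updated[u].clone();
            u += 1;
        }
    }
}
```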
Operation::Rewrite {
ref new_fragments, ..
} => {
final_fragments.extend(Self::fragments_with_ids(
do we need to retain the frags that aren't rewritten?
I'll be fixing this as part of the compaction implementation.
Just one blocking question (w.r.t. rewrite conflict resolution)
General comment (non-blocking):
- We have many kinds of transactions, and the number of pairwise interactions is O(N^2). I wonder if it would be cleaner to keep a smaller set of "core txs". Say Rewrite is actually Delete { old_frags } + Append { new_frags }, and Overwrite is Delete { all_frags } + ChangeSchema { .. } + Append { .. }.
- I can also see the flip side that this could hide interactions. Happy to take this to a ticket.
Originally I was thinking I could represent these with some generic struct like:
struct Transaction {
    updated_metadata: Option<Metadata>,
    new_fragments: Vec<Fragment>,
    updated_fragments: Vec<Fragment>,
    removed_fragment_ids: Vec<u32>,
}
But I realized there's operations like
I think, though, we should be free to transform these internally into something that is easier to resolve, if we end up finding patterns in the implementation.
This PR introduces a new kind of file: Transaction files. These describe the incremental changes a commit is attempting to make or has made. They are described entirely by a protobuf message. These files are considered optional, and thus these changes are backwards compatible. No feature flags are needed.
The commit flow has been refactored around the transaction files. Writers that find a conflicting writer has beaten them to a commit will now retry up to 5 times, using their transaction data to rebuild the manifest. Importantly, before retrying, writers will validate that their changes are compatible with the changes that have been committed concurrently.
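The retry flow described above can be sketched with an in-memory stand-in for the table. Everything here is an assumption made for illustration: the two-variant `Op` enum, the `compatible` rule, and the `Table` type are simplified stand-ins, not the PR's actual types or its full compatibility matrix, and the sketch is synchronous where the real code is async.

```rust
/// Simplified operations; the real set of operations is larger.
#[derive(Debug, Clone, PartialEq)]
enum Op {
    Append,
    Delete { fragment_ids: Vec<u64> },
}

#[derive(Debug, PartialEq)]
enum CommitOutcome {
    Committed { version: u64 },
    Conflict,
    RetriesExhausted,
}

const MAX_RETRIES: usize = 5;

/// Illustrative rule (an assumption): two deletes conflict when they touch
/// the same fragment; every other pairing is treated as compatible.
fn compatible(mine: &Op, other: &Op) -> bool {
    match (mine, other) {
        (Op::Delete { fragment_ids: a }, Op::Delete { fragment_ids: b }) => {
            !a.iter().any(|id| b.contains(id))
        }
        _ => true,
    }
}

/// In-memory stand-in for the table: log[i] produced version i + 1.
struct Table {
    log: Vec<Op>,
}

impl Table {
    fn latest_version(&self) -> u64 {
        self.log.len() as u64
    }

    /// Succeeds only if nobody has committed since `expected_latest`.
    fn try_commit(&mut self, expected_latest: u64, op: Op) -> bool {
        if self.latest_version() != expected_latest {
            return false;
        }
        self.log.push(op);
        true
    }
}

fn commit_with_retry(table: &mut Table, read_version: u64, op: Op) -> CommitOutcome {
    let mut base = read_version;
    // One initial attempt plus up to MAX_RETRIES retries.
    for _ in 0..=MAX_RETRIES {
        if table.try_commit(base, op.clone()) {
            return CommitOutcome::Committed { version: base + 1 };
        }
        // Another writer won: validate against everything committed since
        // the version we last based our attempt on.
        for other in table.log.iter().skip(base as usize) {
            if !compatible(&op, other) {
                return CommitOutcome::Conflict;
            }
        }
        base = table.latest_version();
    }
    CommitOutcome::RetriesExhausted
}
```

In this single-threaded model a compatible writer always succeeds on its first retry; `RetriesExhausted` only becomes reachable when other writers keep committing between attempts.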