LSM: fixing Compaction interactions with iterators #177
Conversation
Thanks for the awesome write-up @kprotty.
src/lsm/k_way_merge.zig
Outdated
```diff
 if (stream_peek(it.context, root)) |key| {
     it.keys[0] = key;
     it.down_heap();
-} else {
-    it.swap(0, it.k - 1);
-    it.k -= 1;
-    it.down_heap();
+} else |err| switch (err) {
+    error.Pending => return null,
+    error.Empty => {
+        it.swap(0, it.k - 1);
+        it.k -= 1;
+        it.down_heap();
+    },
 }
```
This is related to a bug that we discussed. The value may not be available right now (in the Pending case), but when it becomes available the stream may be in the wrong position in the heap. I think we need to move this whole heap update block from after the `stream_pop()` to before the `stream_pop()`. (Note that this means that the heap invariant of `keys` would need to be modified accordingly.)
So Pending should do the same as the successful `|key|` branch, to ensure it's peeked again?
It can't, because it doesn't have a `key` to use.
I think we need to move the whole heap update immediately before we remove a value. e.g.
```zig
if (stream_peek(it.context, root)) |key| {
    it.keys[0] = key;
    it.down_heap();
} else |err| switch (err) {
    error.Pending => return null,
    error.Empty => {
        it.swap(0, it.k - 1);
        it.k -= 1;
        it.down_heap();
    },
}
```
must all move before
```zig
const root = it.streams[0];
// We know that each input iterator is sorted, so we don't need to compare the next
// key on that iterator with the current min/max.
const value = stream_pop(it.context, root);
```
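For concreteness, a rough sketch of the reordered `pop()` might look like the following. This is only an illustration: the function shape, the `Self`/`Value` types, and the `it.k == 0` checks are assumptions, not taken from the actual `k_way_merge.zig`.

```zig
// Rough sketch: restore the heap invariant for the root stream *before*
// removing a value, so a Pending stream is never popped out of position.
fn pop(it: *Self) ?Value {
    if (it.k == 0) return null;

    // Heap update first: the root's cached key may be stale after the
    // previous pop. On Pending, leave everything in place and bail.
    if (stream_peek(it.context, it.streams[0])) |key| {
        it.keys[0] = key;
        it.down_heap();
    } else |err| switch (err) {
        error.Pending => return null,
        error.Empty => {
            it.swap(0, it.k - 1);
            it.k -= 1;
            it.down_heap();
            if (it.k == 0) return null;
        },
    }

    const root = it.streams[0];
    // We know that each input iterator is sorted, so we don't need to
    // compare the next key on that iterator with the current min/max.
    return stream_pop(it.context, root);
}
```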
Force-pushed from 5159eb9 to 51ab743.
Force-pushed from c511eee to 0f912c4.
#183 should be fixed by making Grid.read_block async to avoid reordered io_finish() calls. The merge iterator tests are currently failing, I believe due to the heap update changes (I think I implemented it wrong).
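A minimal sketch of the "always async" idea, with entirely hypothetical names (`on_next_tick`, `submit_read`, the `Read` fields); the real Grid API may differ:

```zig
// Hypothetical sketch: complete cache hits on the next tick instead of
// inline, so io_finish() can never run before the caller's io_start().
pub fn read_block(grid: *Grid, read: *Read) void {
    if (grid.cache.get(read.address)) |block| {
        read.block = block;
        // Defer the callback rather than invoking it inline:
        grid.on_next_tick(read, complete_cached_read);
        return;
    }
    // Cache miss: queue the read and submit real storage I/O as before.
    grid.read_queue.push(read);
    grid.submit_read(read);
}
```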
It is great to have these bugs fixed! Just a few notes, but it is most of the way there 🔥
```zig
grid.read_queue = .{};
while (copy.pop()) |pending_read| {
    if (pending_read.address == read.address) {
        assert(pending_read.checksum == read.checksum);
```
🤔 This is true right now, but I think once we have grid recovery this won't be true — another replica that is far behind might ask us for an old block we don't have anymore, from an address that we are using for some new data. Both of those reads might be queued up simultaneously.

I think we can still queue the reads up together, we just need to be careful how we handle them when the read completes, to make sure that the correct read gets a response. (The incorrect read can be `unreachable` for now — implementing that will be part of grid recovery.)
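A rough sketch of that completion routing; only `read_queue`, `address`, and `checksum` come from the hunk above, the rest of the shape is assumed:

```zig
// Hypothetical sketch: route a completed block only to queued reads whose
// checksum matches; a same-address read with a different checksum is the
// grid-recovery case and stays unreachable for now.
fn read_block_callback(grid: *Grid, read: *const Read, block: BlockPtr) void {
    var copy = grid.read_queue;
    grid.read_queue = .{};
    while (copy.pop()) |pending_read| {
        if (pending_read.address != read.address) {
            // Unrelated read: keep it queued.
            grid.read_queue.push(pending_read);
        } else if (pending_read.checksum == read.checksum) {
            pending_read.callback(pending_read, block);
        } else {
            // Same address, different (older) block: only possible once
            // grid recovery lands.
            unreachable;
        }
    }
}
```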
Is the case of "an address that we are using for some new data" in `grid.read_queue` already handled by `release_at_checkpoint(addr)` calling `assert_not_reading(addr)`?
We might have to update `release_at_checkpoint` for grid repair as well. When we release the block, we know that we aren't reading it due to (e.g.) compaction or serving a `prefetch()`. But we can't predict when another replica might request a block from us.
src/lsm/level_iterator.zig
Outdated
```diff
@@ -212,7 +212,8 @@ pub fn LevelIteratorType(comptime Table: type, comptime Storage: type) type {
     fn next_table_iterator(it: *LevelIterator) *TableIterator {
         if (it.tables.full()) {
             const table = &it.tables.head_ptr().?.table_iterator;
-            while (table.peek() != null) {
+            while (true) {
+                _ = table.peek() catch break;
                 it.values.push(table.pop()) catch unreachable;
```
I know it wasn't touched by your PR, but could this use `push_assume_capacity`?
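i.e. roughly this, assuming the `values` ring buffer exposes such a method:

```zig
while (true) {
    _ = table.peek() catch break;
    // Capacity is already guaranteed here, so state the assumption
    // directly instead of catching an "impossible" error.
    it.values.push_assume_capacity(table.pop());
}
```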
src/lsm/level_iterator.zig
Outdated
```diff
 };
-return scope.table_iterator.peek().?;
+return scope.table_iterator.peek() catch unreachable;
```
How are we guaranteed that `peek()` always has a value ready?
Force-pushed from fb36a2d to 4ebdee4.
src/lsm/level_iterator.zig
Outdated
```diff
 };
-return scope.table_iterator.peek().?;
+return scope.table_iterator.peek();
 }

 /// This may only be called after peek() has returned non-null.
```
Please update this comment (and any others similarly out of date).
Edit: Nevermind, I fixed the heap update issue and this seems to be resolved.
The TableIterator (and consequently the LevelIterator) could have `peek()` return null even when it simply had not yet buffered all values in memory. The issue is that a null `peek()` is treated as the iterator being empty. The merge iterator's `stream_peek()` can now return a Pending state to differentiate this. Pending iterators can be discovered by checking `buffered_all_values()` when `peek()` returns null.
The iterator's tick() could complete inline and call io_finish() before returning true, which is what told Compaction to call io_start(). Now, io_start() is called beforehand. If tick() returns true, io_finish() will be (or already was) called by the iterator. If tick() returns false, Compaction resolves the io_finish() itself.
Now that Grid.read_block always completes asynchronously and never inline, the callers (compaction iterators) don't have to worry about out-of-order io_finish() calls.
Previously, you would use `buffered_all_values()` to know whether `peek()` returning null meant the iterator was empty, or still buffering values with nothing immediate to offer. Now this state distinction is returned directly by `peek()`, reducing the chance of a footgun from assuming null means empty. `buffered_all_values()` still exists and is pub, as the buffering aspect (as opposed to the peeking aspect) is still used by `tick()`.
Ensures that a stream which isn't ready (but also isn't empty) is not in the wrong heap position after pop(). This also ensures that streams which are still buffering are not passed into MergeIterator.init(). Compaction already guarantees this by only calling init() after ticking the relevant iterators for the merge stream.
"Buffering" isn't necessarily correct as it implies there's currently I/O happening to refill the values (this happens on a subsequent tick() instead).
The stream is now peeked again after a pop, but instead of returning null on Drained, it correctly returns the popped value.
Only check for the existence of a grid address when pushing to read_cache to be resolved on the next tick(). This avoids bumping up the likelihood of keeping the block in cache unnecessarily.
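A sketch of the idea, with entirely hypothetical cache API names (`contains`, `start_storage_read`); the commit itself does not show this code:

```zig
// Hypothetical sketch: when deferring a read to the next tick(), only test
// for the block's presence; don't fetch it yet, since fetching would count
// as an access and keep the block hot in the cache unnecessarily.
if (grid.cache.contains(read.address)) {
    grid.read_cache.push(read); // Resolved from the cache on the next tick().
} else {
    grid.start_storage_read(read);
}
```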
Force-pushed from 4ebdee4 to fa15d81.
🚀 Great work!
MergeIterator stream_peek()

There are times when `peek()` on the `TableIterator` (and transitively, the `LevelIterator`) will return null without being empty. This happens if it runs out of values buffered in memory and needs to `tick()` again. Unfortunately, the `MergeIterator` currently assumes that `peek()` returning null means the iterator is completed, and stops accessing it / removes it from its heap data structure.

`MergeIterator.stream_peek()` can now return a third state of Pending, which means that, on the given iterator being merged, `peek()` would return null but `buffered_all_values()` would return false. This ensures iterators are accessed until they are truly empty.
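As a rough illustration of the new three-state contract (a hypothetical shape, not the actual TableIterator internals; `values.head()` and `key_from_value` are assumed helpers):

```zig
// Hypothetical sketch of the three-state peek() described above.
pub fn peek(it: *const TableIterator) error{ Empty, Pending }!Key {
    if (it.values.head()) |value| return key_from_value(&value);
    // Nothing buffered: distinguish "truly done" from "needs another tick()".
    if (it.buffered_all_values()) return error.Empty;
    return error.Pending;
}
```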
Compaction iterator IO tracking

There was a bug in the order IO was tracked during Compaction. `tick()` would be called on an iterator, which would initiate IO on the Grid. The Grid would service the IO from the cache and complete it inline. This bubbles up and calls `Compaction.io_finish()` early. `tick()` returns true, meaning that IO started, and Compaction then does `Compaction.io_start()`, but by that point it is too late.

A fix is to do `Compaction.io_start()` before calling `tick()`. If it returns true, `io_finish()` will be called either inline or eventually. If it returns false, Compaction calls `io_finish()` itself to cancel it out.
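In sketch form (a hypothetical call site; the real control flow lives in Compaction's tick handling):

```zig
// Hypothetical sketch of the fixed ordering: account for the IO *before*
// tick() gets any chance to complete it inline via the Grid cache.
compaction.io_start();
if (iterator.tick()) {
    // IO was started: io_finish() will be (or already was) called by the
    // iterator's completion path.
} else {
    // No IO was started: cancel out the io_start() above.
    compaction.io_finish();
}
```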
Manifest invisible table iterator issues

One of the last issues is that `assert_no_invisible_tables()` is being hit at a checkpoint(). When the even/odd compactions complete, they end up calling `remove_invisible_tables` for their level, which all happens before the checkpoint. It seems as though some invisible tables still remain according to the ManifestLevels.

Changing `remove_invisible_tables` to iterate all levels, and not use the KeyRange as a hint, seems to address this for `lsm/test.zig`, but it's still an issue in the rafiki tests. Other attempts (to no luck in rafiki) include re-creating the iterator in a loop to get the next table, to mitigate the possibility of invalidation by removal, and removing from all levels at the end of the fourth beat instead of when Compactions are done (a sketch of the all-levels variant follows below).

This bug is left unresolved in this PR until a good fix can be found, but the other changes are still worth merging in the meantime.
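A rough sketch of the attempted all-levels workaround, with hypothetical Manifest/level APIs (`iterator_invisible`, `remove_table`); only the function name and `config.lsm_levels` come from the codebase:

```zig
// Hypothetical sketch: sweep every level instead of only the compacted one,
// ignoring the KeyRange hint, and re-create the iterator after each removal
// so that removal cannot invalidate a live iterator.
fn remove_invisible_tables(manifest: *Manifest, snapshots: []const u64) void {
    var level: u8 = 0;
    while (level < config.lsm_levels) : (level += 1) {
        while (true) {
            // Fresh iterator each pass, since removal may invalidate it.
            var it = manifest.levels[level].iterator_invisible(snapshots);
            const table = it.next() orelse break;
            manifest.levels[level].remove_table(snapshots, table);
        }
    }
}
```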
but it's still an issue in the rafiki tests. Other attempts (to no luck in rafiki) include re-creating the iterator in a loop to get the next table to mitigate the possibility of invalidation by removal, and removing from all levels at the end of the fourth beat instead of when Compactions are done.This bug is left unresolved in this PR until a good fix can be found, but the other changes are still worth getting merged in the mean time.