
LSM: fixing Compaction interactions with iterators #177

Merged: 14 commits, Oct 20, 2022

Conversation

@kprotty (Contributor) commented Oct 4, 2022:

MergeIterator stream_peek()

There are times when peek() on the TableIterator (and, transitively, the LevelIterator) returns null without the iterator being empty. This happens when it runs out of values buffered in memory and needs to tick() again. Unfortunately, the MergeIterator currently assumes that a null peek() means the iterator has completed, so it stops accessing it and removes it from its heap.

MergeIterator.stream_peek() can now return a third state, Pending, which means that peek() on the iterator being merged would return null but buffered_all_values() would return false. This ensures iterators are accessed until they are truly empty.
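The three-state distinction can be sketched as follows. This is a hypothetical Python model, not the actual Zig code: the Stream class, its buffered/on_disk fields, and the string sentinels are illustrative stand-ins for the TableIterator and its block buffers.

```python
# Sketch of the three-state peek(): Pending and Empty are distinguished by
# whether the iterator still has unbuffered values (buffered_all_values()).
PENDING = "pending"  # no value available now, but more exist on disk
EMPTY = "empty"      # the iterator is truly exhausted

class Stream:
    """Illustrative stand-in for a TableIterator."""
    def __init__(self, buffered, on_disk):
        self.buffered = list(buffered)  # values already in memory
        self.on_disk = list(on_disk)    # values that need another tick()

    def buffered_all_values(self):
        return not self.on_disk

    def peek(self):
        if self.buffered:
            return self.buffered[0]
        # No value in memory: Pending if IO is still needed, Empty otherwise.
        return EMPTY if self.buffered_all_values() else PENDING

    def tick(self):
        # Simulate IO refilling the in-memory buffer.
        self.buffered += self.on_disk
        self.on_disk = []
```

A merge loop that treats PENDING as "stop and retry later", rather than dropping the stream from its heap, keeps accessing the iterator until it is truly EMPTY.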

Compaction iterator IO tracking

There was a bug in the order in which IO was tracked during Compaction. tick() would be called on an iterator, which would initiate IO on the Grid. The Grid would service the IO from its cache and complete it inline, which bubbled up and called Compaction.io_finish() early. tick() then returned true to signal that IO had started, and Compaction.io_start() was called, but by then it was too late.

The fix is to call Compaction.io_start() before calling tick(). If tick() returns true, io_finish() will be called either inline or eventually; if it returns false, Compaction calls io_finish() itself to cancel it out.
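The accounting fix can be sketched like this. Hypothetical Python, not the real code: Compaction is reduced to a counter, and tick_iterator with its cache_hit flag stands in for iterator.tick() and the Grid cache.

```python
class Compaction:
    """Minimal IO accounting: the merge runs when all IO has settled."""
    def __init__(self):
        self.io_pending = 0
        self.merged = False

    def io_start(self):
        self.io_pending += 1

    def io_finish(self):
        self.io_pending -= 1
        assert self.io_pending >= 0  # would fire with the buggy ordering
        if self.io_pending == 0:
            self.merged = True       # stand-in for cpu_merge_start()

def tick_iterator(compaction, cache_hit):
    # Stand-in for iterator.tick(): on a cache hit the IO completes inline,
    # calling io_finish() before tick() even returns.
    if cache_hit:
        compaction.io_finish()
        return True   # IO "started" (and already finished)
    return False      # no IO was needed

def compact_tick(compaction, cache_hit):
    # The fix: account for the IO *before* tick(), so an inline completion
    # cannot call io_finish() ahead of the matching io_start().
    compaction.io_start()
    started = tick_iterator(compaction, cache_hit)
    if not started:
        compaction.io_finish()  # cancel out the speculative io_start()
```

With the old ordering (tick() first, io_start() on true), the inline cache-hit path would decrement the counter before it was ever incremented.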

Manifest invisible table iterator issues

One of the last remaining issues is that assert_no_invisible_tables() is hit at checkpoint(). When the even/odd compactions complete, they call remove_invisible_tables for their level, and all of this happens before the checkpoint. Yet, according to the ManifestLevels, some invisible tables still remain.

Changing remove_invisible_tables to iterate over all levels rather than using the KeyRange as a hint appears to fix this for lsm/test.zig, but the issue persists in the rafiki tests. Other attempts (with no luck in rafiki) include re-creating the iterator in a loop to fetch the next table, to mitigate possible invalidation by removal, and removing from all levels at the end of the fourth beat instead of when Compactions finish.

This bug is left unresolved in this PR until a good fix can be found, but the other changes are still worth merging in the meantime.

@jorangreef (Member):

Thanks for the awesome write-up @kprotty.

Comment on lines 105 to 121
if (stream_peek(it.context, root)) |key| {
    it.keys[0] = key;
    it.down_heap();
-} else {
-    it.swap(0, it.k - 1);
-    it.k -= 1;
-    it.down_heap();
+} else |err| switch (err) {
+    error.Pending => return null,
+    error.Empty => {
+        it.swap(0, it.k - 1);
+        it.k -= 1;
+        it.down_heap();
+    },
}
@sentientwaffle (Member) commented Oct 5, 2022:

This is related to a bug that we discussed. The value may not be available right now (in the Pending case), but when it becomes available, the stream may be in the wrong position in the heap. I think we need to move this whole heap-update block from after the stream_pop() to before the stream_pop(). (Note that this means the heap invariant of keys would need to be modified accordingly.)

@kprotty (Contributor, Author):

So Pending should do the same as the successful if |key| branch, to ensure it's peeked again?

Member:

It can't because it doesn't have a key to use.

I think we need to move the whole heap update immediately before we remove a value. e.g.

if (stream_peek(it.context, root)) |key| {
    it.keys[0] = key;
    it.down_heap();
-} else {
-    it.swap(0, it.k - 1);
-    it.k -= 1;
-    it.down_heap();
+} else |err| switch (err) {
+    error.Pending => return null,
+    error.Empty => {
+        it.swap(0, it.k - 1);
+        it.k -= 1;
+        it.down_heap();
+    },
}

must all move before

            const root = it.streams[0];
            // We know that each input iterator is sorted, so we don't need to compare the next
            // key on that iterator with the current min/max.
            const value = stream_pop(it.context, root);
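The suggested reordering can be sketched in Python. This is a simplified, hypothetical model of the k-way merge, not the actual Zig implementation: the root's key is refreshed with a fresh peek() and the heap invariant restored *before* a value is consumed, so a stream that was Pending on an earlier call is re-examined and sits at the correct heap position by the time it is popped.

```python
PENDING, EMPTY = object(), object()

class Stream:
    """Stand-in iterator: `on_disk` values need a tick() before they appear."""
    def __init__(self, buffered, on_disk):
        self.buffered, self.on_disk = list(buffered), list(on_disk)

    def peek(self):
        if self.buffered:
            return self.buffered[0]
        return EMPTY if not self.on_disk else PENDING

    def tick(self):  # simulate IO refilling the buffer
        self.buffered, self.on_disk = self.buffered + self.on_disk, []

class KWayMerge:
    def __init__(self, streams):
        # Assumes every stream starts with at least one buffered value
        # (Compaction guarantees this by ticking iterators before init()).
        self.streams = list(streams)
        self.keys = [s.peek() for s in self.streams]
        self.k = len(self.streams)
        for i in reversed(range(self.k)):  # heapify
            self.down_heap(i)

    def swap(self, i, j):
        self.streams[i], self.streams[j] = self.streams[j], self.streams[i]
        self.keys[i], self.keys[j] = self.keys[j], self.keys[i]

    def down_heap(self, i=0):
        while True:
            smallest = i
            for child in (2 * i + 1, 2 * i + 2):
                if child < self.k and self.keys[child] < self.keys[smallest]:
                    smallest = child
            if smallest == i:
                return
            self.swap(i, smallest)
            i = smallest

    def pop(self):
        # Heap maintenance happens BEFORE the value is consumed: the root's
        # cached key may be stale from the previous pop, and the root stream
        # may meanwhile have become Pending or Empty.
        while self.k > 0:
            result = self.streams[0].peek()
            if result is PENDING:
                return None              # wait for IO; heap is untouched
            if result is EMPTY:
                self.swap(0, self.k - 1)
                self.k -= 1
                self.down_heap()
                continue
            self.keys[0] = result        # refresh the stale key...
            self.down_heap()             # ...and restore the invariant
            return self.streams[0].buffered.pop(0)  # now safe to consume
        return None
```

Because each input stream is sorted, a refreshed key can only be larger than the cached one, so a single down-heap from the root restores the invariant.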

@sentientwaffle (Member):

In addition, can you verify that this fixes #183, #186, and #188? When I checked those last week it looked like all 3 would be addressed by this PR.

@kprotty (Contributor, Author) commented Oct 17, 2022:

- #183 should be fixed by making Grid.read_block async, to avoid the reordered io_finish().
- #186 should be fixed by reporting error.Drained from the iterators and handling it throughout.
- #188 should be fixed by using buffered_all_values() in peek() (which performs the same assert), with that usage then fixed by #186.

The merge iterator tests are currently failing, I believe due to the heap-update changes (I think I implemented them incorrectly).

@sentientwaffle (Member) left a comment:

It is great to have these bugs fixed! Just a few notes, but it is most of the way there 🔥

grid.read_queue = .{};
while (copy.pop()) |pending_read| {
if (pending_read.address == read.address) {
assert(pending_read.checksum == read.checksum);
Member:

🤔 This is true right now, but I think once we have grid recovery it won't be: another replica that is far behind might ask us for an old block we no longer have, from an address that we are now using for some new data. Both of those reads might be queued up simultaneously.

I think we can still queue the reads up together, we just need to be careful how we handle them when the read completes to make sure that the correct read gets a response. (The incorrect read can be unreachable for now — implementing that will be part of grid recovery).
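The "queue the reads up together" idea might look like this. Hypothetical Python, not the Grid's actual API: the dict-based read records and the queue_read helper are illustrative.

```python
def queue_read(read_queue, read):
    """Queue `read`, coalescing it with an in-flight read of the same address.

    Returns True if the caller must start new IO, False if the read joined
    an existing one.
    """
    for pending in read_queue:
        if pending["address"] == read["address"]:
            # Until grid recovery lands, two queued reads of one address
            # must want the same block version.
            assert pending["checksum"] == read["checksum"]
            pending["waiters"].append(read["callback"])
            return False
    read["waiters"] = [read["callback"]]
    read_queue.append(read)
    return True
```

Once grid recovery allows two versions of one address to be requested concurrently, the assert would be replaced by routing each waiter the block matching its checksum when the IO completes.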

@kprotty (Contributor, Author):

Is the case of "an address that we are using for some new data" in grid.read_queue already handled by release_at_checkpoint(addr) calling assert_not_reading(addr)?

Member:

We might have to update release_at_checkpoint for grid repair as well. When we release the block, we know that we aren't reading it due to (e.g.) compaction or serving a prefetch(). But we can't predict when another replica might request a block from us.

@@ -212,7 +212,8 @@ pub fn LevelIteratorType(comptime Table: type, comptime Storage: type) type {
     fn next_table_iterator(it: *LevelIterator) *TableIterator {
         if (it.tables.full()) {
             const table = &it.tables.head_ptr().?.table_iterator;
-            while (table.peek() != null) {
+            while (true) {
+                _ = table.peek() catch break;
                 it.values.push(table.pop()) catch unreachable;
Member:
I know it wasn't touched by your PR, but could this use push_assume_capacity?

 };
-return scope.table_iterator.peek().?;
+return scope.table_iterator.peek() catch unreachable;
Member:

How are we guaranteed that peek() always has a value ready?

 };
-return scope.table_iterator.peek().?;
+return scope.table_iterator.peek();
 }

 /// This may only be called after peek() has returned non-null.
Member:

Please update this comment (and any others similarly out of date).

@sentientwaffle (Member) commented Oct 20, 2022:

Edit: Nevermind, I fixed the heap update issue and this seems to be resolved.

This branch is still running into crashes on lsm_forest_fuzz that appear related to iterator buffering, e.g.:

/home/djg/Code/tb/tb-lsm-iterators/src/lsm/level_iterator.zig:304:57: 0x2841bd in lsm.level_iterator.LevelIteratorType(lsm.table.TableType(u64,tigerbeetle.Account,lsm.groove.ObjectTreeHelpers(tigerbeetle.Account).compare_keys,lsm.groove.ObjectTreeHelpers(tigerbeetle.Account).key_from_value,18446744073709551615,lsm.groove.ObjectTreeHelpers(tigerbeetle.Account).tombstone,lsm.groove.ObjectTreeHelpers(tigerbeetle.Account).tombstone_from_key),test.storage.Storage).pop (lsm_forest_fuzz)
            const table_iterator = &it.tables.head_ptr().?.table_iterator;
                                                        ^
/home/djg/Code/tb/tb-lsm-iterators/src/lsm/compaction.zig:99:51: 0x283e24 in lsm.compaction.MergeStreamSelector.pop (lsm_forest_fuzz)
                    1 => compaction.iterator_b.pop(),
                                                  ^
/home/djg/Code/tb/tb-lsm-iterators/src/lsm/k_way_merge.zig:109:37: 0x480b29 in lsm.k_way_merge.KWayMergeIterator(lsm.compaction.CompactionType(lsm.table.TableType(u64,tigerbeetle.Account,lsm.groove.ObjectTreeHelpers(tigerbeetle.Account).compare_keys,lsm.groove.ObjectTreeHelpers(tigerbeetle.Account).key_from_value,18446744073709551615,lsm.groove.ObjectTreeHelpers(tigerbeetle.Account).tombstone,lsm.groove.ObjectTreeHelpers(tigerbeetle.Account).tombstone_from_key),test.storage.Storage,lsm.table_immutable.TableImmutableIteratorType),u64,tigerbeetle.Account,lsm.groove.ObjectTreeHelpers(tigerbeetle.Account).key_from_value,lsm.groove.ObjectTreeHelpers(tigerbeetle.Account).compare_keys,2,lsm.compaction.MergeStreamSelector.peek,lsm.compaction.MergeStreamSelector.pop,lsm.compaction.MergeStreamSelector.precedence).pop_internal (lsm_forest_fuzz)
            const value = stream_pop(it.context, root);
                                    ^
/home/djg/Code/tb/tb-lsm-iterators/src/lsm/k_way_merge.zig:88:35: 0x43dd72 in lsm.k_way_merge.KWayMergeIterator(lsm.compaction.CompactionType(lsm.table.TableType(u64,tigerbeetle.Account,lsm.groove.ObjectTreeHelpers(tigerbeetle.Account).compare_keys,lsm.groove.ObjectTreeHelpers(tigerbeetle.Account).key_from_value,18446744073709551615,lsm.groove.ObjectTreeHelpers(tigerbeetle.Account).tombstone,lsm.groove.ObjectTreeHelpers(tigerbeetle.Account).tombstone_from_key),test.storage.Storage,lsm.table_immutable.TableImmutableIteratorType),u64,tigerbeetle.Account,lsm.groove.ObjectTreeHelpers(tigerbeetle.Account).key_from_value,lsm.groove.ObjectTreeHelpers(tigerbeetle.Account).compare_keys,2,lsm.compaction.MergeStreamSelector.peek,lsm.compaction.MergeStreamSelector.pop,lsm.compaction.MergeStreamSelector.precedence).pop (lsm_forest_fuzz)
            while (it.pop_internal()) |value| {
                                  ^
/home/djg/Code/tb/tb-lsm-iterators/src/lsm/compaction.zig:432:49: 0x3f7406 in lsm.compaction.CompactionType(lsm.table.TableType(u64,tigerbeetle.Account,lsm.groove.ObjectTreeHelpers(tigerbeetle.Account).compare_keys,lsm.groove.ObjectTreeHelpers(tigerbeetle.Account).key_from_value,18446744073709551615,lsm.groove.ObjectTreeHelpers(tigerbeetle.Account).tombstone,lsm.groove.ObjectTreeHelpers(tigerbeetle.Account).tombstone_from_key),test.storage.Storage,lsm.table_immutable.TableImmutableIteratorType).cpu_merge (lsm_forest_fuzz)
                const value = merge_iterator.pop() orelse break;
                                                ^
/home/djg/Code/tb/tb-lsm-iterators/src/lsm/compaction.zig:402:37: 0x3cf91e in lsm.compaction.CompactionType(lsm.table.TableType(u64,tigerbeetle.Account,lsm.groove.ObjectTreeHelpers(tigerbeetle.Account).compare_keys,lsm.groove.ObjectTreeHelpers(tigerbeetle.Account).key_from_value,18446744073709551615,lsm.groove.ObjectTreeHelpers(tigerbeetle.Account).tombstone,lsm.groove.ObjectTreeHelpers(tigerbeetle.Account).tombstone_from_key),test.storage.Storage,lsm.table_immutable.TableImmutableIteratorType).cpu_merge_start (lsm_forest_fuzz)
                compaction.cpu_merge();
                                    ^
/home/djg/Code/tb/tb-lsm-iterators/src/lsm/compaction.zig:381:71: 0x3a55e5 in lsm.compaction.CompactionType(lsm.table.TableType(u64,tigerbeetle.Account,lsm.groove.ObjectTreeHelpers(tigerbeetle.Account).compare_keys,lsm.groove.ObjectTreeHelpers(tigerbeetle.Account).key_from_value,18446744073709551615,lsm.groove.ObjectTreeHelpers(tigerbeetle.Account).tombstone,lsm.groove.ObjectTreeHelpers(tigerbeetle.Account).tombstone_from_key),test.storage.Storage,lsm.table_immutable.TableImmutableIteratorType).io_finish (lsm_forest_fuzz)
            if (compaction.io_pending == 0) compaction.cpu_merge_start();
                                                                      ^
/home/djg/Code/tb/tb-lsm-iterators/src/lsm/compaction.zig:348:42: 0x3cf690 in lsm.compaction.write_callback.callback (lsm_forest_fuzz)
                    _compaction.io_finish();
                                         ^

Can you check it out? (The above is seed 12421653945172149209).

kprotty added 13 commits October 20, 2022 10:02
The TableIterator (and consequently the LevelIterator) could have `peek()` return null even if all values weren't yet buffered in memory. The issue is that a null `peek()` is treated as the iterator being empty.

The merge iterator's `stream_peek()` can now return a Pending state to differentiate this. Pending iterators can be discovered by checking `buffered_all_values()` when `peek()` returns null.
The iterator's tick() would complete inline and call io_finish() before returning true for Compaction to call io_start().

Now, io_start() is called before-hand. If tick() returns true, the io_finish() will be (or was) called by the iterator. If tick() returns false, the Compaction will resolve the io_finish() instead.
Now that Grid.read_block always completes asynchronously and never inline, the callers (compaction iterators) don't have to worry about out-of-order io_finish() calls.
Previously, you would use `buffered_all_values()` to know whether `peek()` returning null meant the iterator was empty, or still buffering values with nothing immediately to offer. Now this distinction is returned directly by `peek()`, reducing the chance of the footgun of assuming null means empty.

`buffered_all_values()` still exists and is pub, as the buffering aspect (rather than the peeking aspect) is still used by `tick()`.
Ensures that a stream which isn't ready (but also isn't empty) is not in the wrong heap position after pop().

This also ensures that streams which are still buffering are not passed into MergeIterator.init(). Compaction already guarantees this by only calling init() after ticking the relevant iterators for the merge stream.
"Buffering" isn't quite correct, as it implies there is currently IO happening to refill the values (that happens on a subsequent tick() instead).
The stream is now peeked again after a pop, but instead of returning null on Drained, it correctly returns the popped value.
Only check for the existence of a grid address when pushing to the read_cache, to be resolved on the next tick(). This avoids unnecessarily increasing the likelihood of keeping the block in cache.
@sentientwaffle (Member) left a comment:

🚀 Great work!
