feat(server): Buffered streamer + use on stable state #639

Merged: 10 commits, Jan 15, 2023

Contributor

@dranikpg dranikpg commented Jan 3, 2023

This PR introduces:

  • BufferedStreamerBase for managing wakeups/blocks to allow creating double-buffered streamers easily
  • JournalStreamer that uses BufferedStreamerBase

Why BufferedStreamerBase separately? In the future, we might want to replace the snapshot channel with it in the single-producer, single-consumer case to remove allocations, synchronization, fragmentation, etc.
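
As a rough illustration of the double-buffering idea described above, here is a minimal single-threaded sketch. It is not the code from this PR: `DoubleBuffer`, `Produce`, `Consume` and `max_pending` are hypothetical names, and where the real streamer would block a fiber, this model simply returns `false`.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <string_view>
#include <utility>

// Hypothetical single-threaded model of a double-buffered streamer.
// Only the buffer-swapping idea is taken from the PR; all names are made up.
class DoubleBuffer {
 public:
  explicit DoubleBuffer(std::size_t max_pending) : max_pending_(max_pending) {
    assert(max_pending > 0);
  }

  // The producer counts as stalled once too many writes are pending.
  bool IsStalled() const { return pending_ >= max_pending_; }

  // Producer side: append a chunk unless stalled.
  // A real streamer would block here instead of returning false.
  bool Produce(std::string_view chunk) {
    if (IsStalled()) return false;
    producer_buf_.append(chunk);
    ++pending_;
    return true;
  }

  // Consumer side: take everything written so far by swapping the buffers.
  // The producer continues with the cleared buffer the consumer used before.
  const std::string& Consume() {
    consumer_buf_.clear();
    std::swap(consumer_buf_, producer_buf_);
    pending_ = 0;
    return consumer_buf_;
  }

 private:
  std::size_t max_pending_;
  std::size_t pending_ = 0;
  std::string producer_buf_;
  std::string consumer_buf_;
};
```

Because the two buffers only change roles, no data is copied on the hand-off, and in typical standard library implementations the cleared buffer keeps its capacity for the next round.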

Comment on lines 29 to 30
// Write stream in buffered parts until producer finishes or error occurs.
std::error_code WriteAll(io::Sink* dest);
Contributor Author

The BufferedStreamerBase doesn't run the writer fiber itself because in some cases, for example a snapshot, the writing fiber is managed externally.

Comment on lines 13 to 36
void BufferedStreamerBase::ReportWritten() {
  buffered_++;
  // Check and possibly wait for consumer to keep up.
  waker_.await([this]() { return !IsStalled() || IsStopped(); });
  // Notify consumer we have new data.
  waker_.notify();
}

std::error_code BufferedStreamerBase::WriteAll(io::Sink* dest) {
  CHECK_NE(buf_ptr_, nullptr);

  while (!IsStopped()) {
    // Wait for more data or stop signal.
    waker_.await([this]() { return buffered_ > 0 || IsStopped(); });
    if (IsStopped())
      break;

    // Steal-swap producer buffer.
    std::swap(buf_stash_, *buf_ptr_);
    buffered_ = 0;

    // If producer stalled, notify we consumed data and it can unblock.
    waker_.notify();
    RETURN_ON_ERR(dest->Write(buf_stash_.InputBuffer()));
Contributor Author

This looks much simpler than last time

Comment on lines 73 to 74

// Self referential because buf reference pointr to writer.
Contributor Author

TODO: fix comment

src/server/io_utils.h (outdated, resolved)
@dranikpg dranikpg requested review from romange and adiholden January 4, 2023 08:55
Collaborator

@romange romange left a comment

Looks good. I am not sure about the "wait/notify" sequence; I commented on it.

write_fb_ = Fiber(&JournalStreamer::WriterFb, this, dest);
journal_cb_id_ = journal_->RegisterOnChange([this](const journal::Entry& entry) {
  writer_.Write(entry);
  ReportWritten();
Collaborator

I think it's a bit confusing to have both the backpressure and notification bundled together.
Suppose you are stalled. You want to notify your consumer before you block.

Contributor Author

You block because the consumer does not keep up, so there is no point in notifying it beforehand: it's already busy. Once it's no longer stalled, it will have consumed your latest portions.

waker_.notify();
}

std::error_code BufferedStreamerBase::WriteAll(io::Sink* dest) {
Collaborator

The "Write" part is confusing since both the consumer and producer parts are "writing".

I needed to look into the function to understand what it does. Maybe call it ConsumeIntoSink or similar?

src/server/io_utils.cc (outdated, resolved)
break;

// Steal-swap producer buffer.
std::swap(buf_stash_, *buf_ptr_);
Collaborator

I think that switching the journal writer's buffer without it knowing is not very intuitive.
How about creating the journal writer with a buffer that is not a member but is set from outside? With this approach you would have a SetBuffer function to replace the buffer, and you could use it here.

Contributor Author

@dranikpg dranikpg Jan 5, 2023

The only counter-intuitive thing about stealing the buffer is that it cannot cause a data race, because the writer and the consumer have to be on the same thread. Making it use the journal writer internally would not make it reusable.

What could be done instead is:

  • making the journal writer unbuffered, i.e. write directly to a sink
  • the sink will be some buffer owned by the buffered stream (so it will own both)
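
The two bullets above could look roughly like the following sketch. All names here (`Sink`, `BufferedStream`, `UnbufferedWriter`, `Take`) are hypothetical stand-ins and not the types from this PR; the real code works with `io::Sink` and an `IoBuf`, whose interfaces are not reproduced here.

```cpp
#include <cassert>
#include <string>
#include <string_view>

// Hypothetical minimal sink interface, standing in for io::Sink.
struct Sink {
  virtual ~Sink() = default;
  virtual void Write(std::string_view data) = 0;
};

// Stand-in for the buffered stream: it owns the buffer and exposes it as a sink.
class BufferedStream : public Sink {
 public:
  void Write(std::string_view data) override { buf_.append(data); }

  // Hand the accumulated bytes to the consumer and start with an empty buffer.
  std::string Take() {
    std::string out;
    out.swap(buf_);
    return out;
  }

 private:
  std::string buf_;
};

// Stand-in for an unbuffered journal writer: it keeps no buffer of its own,
// every part of an entry goes straight to the sink it was given.
class UnbufferedWriter {
 public:
  explicit UnbufferedWriter(Sink* sink) : sink_(sink) { assert(sink != nullptr); }

  void WriteEntry(std::string_view key, std::string_view value) {
    sink_->Write(key);
    sink_->Write("=");
    sink_->Write(value);
    sink_->Write(";");
  }

 private:
  Sink* sink_;
};
```

In this arrangement the writer never learns that buffers are being swapped underneath it; it only sees a sink, and the stream is free to manage its storage however it likes.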

Contributor Author

@dranikpg dranikpg Jan 5, 2023

Otherwise, we can keep the internal buffer inside the journal writer, but now flush to a sink (which will be the IoBuf) after each operation. The downside is that this introduces an intermediate copy, but it's probably very cheap?

Contributor Author

@romange What do you think?

Collaborator

it's localized and does not affect lots of places in the code.
and we can make it more clear by adding more comments.

@adiholden
Collaborator

The flush function of the journal writer can be removed, right?

@dranikpg dranikpg requested review from romange and adiholden January 5, 2023 13:49
if (cb_id)
  sf_->journal()->UnregisterOnChange(cb_id);
// Register cleanup.
flow->cleanup = [this, streamer, flow]() {
Collaborator

Is it possible that flow cleanup will be triggered between the start above and the update of the cleanup lambda?

Contributor Author

No, because unlike the replica it has a per-session mutex that is locked on state transitions

Contributor Author

And it doesn't contain any await points

while (!IsStopped()) {
  // Wait for more data or stop signal.
  waker_.await([this]() { return buffered_ > 0 || IsStopped(); });
  if (IsStopped())
Collaborator

If the producer is done, doesn't it make more sense to finish writing all the buffered journal data and then return?

Contributor Author

Indeed.
It doesn't matter currently because it's used as cancellation, but this should be the general behavior.
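
A minimal sketch of the "flush what is left, then stop" behavior discussed in this thread, under the assumption of a single-threaded model. `StreamState` and `ConsumeStep` are hypothetical names; the actual consumer loop waits on a fiber primitive, which is left out here.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Hypothetical state shared by producer and consumer (single-threaded model).
struct StreamState {
  std::string producer_buf;
  bool stopped = false;
};

// One consumer wake-up. Pending data is flushed first and the stop flag is
// only consulted afterwards, so a final batch is not dropped.
// Returns true if the consumer should go back to waiting for more data.
bool ConsumeStep(StreamState& state, std::vector<std::string>& dest) {
  if (!state.producer_buf.empty()) {
    std::string stash;
    std::swap(stash, state.producer_buf);
    assert(state.producer_buf.empty());
    dest.push_back(std::move(stash));
  }
  return !state.stopped;
}
```

The difference from the excerpt under review is only the order of the checks: there, the stop check comes before the swap, so data buffered just before the stop signal is never written.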

adiholden previously approved these changes Jan 8, 2023
romange previously approved these changes Jan 8, 2023
@romange
Collaborator

romange commented Jan 9, 2023

Maybe rebase it to let tests run?

@dranikpg
Contributor Author

dranikpg commented Jan 9, 2023

> Maybe rebase it to let tests run?

I will update the branch when pytests are merged (plz re-approve them), it will trigger a new ci flow

@dranikpg dranikpg marked this pull request as draft January 9, 2023 21:37
@dranikpg dranikpg dismissed stale reviews from romange and adiholden via 7bd0654 January 10, 2023 12:40
@dranikpg dranikpg marked this pull request as ready for review January 10, 2023 12:41
@dranikpg dranikpg enabled auto-merge (squash) January 10, 2023 12:41
@dranikpg dranikpg requested review from romange and adiholden January 10, 2023 12:41
waker_.notify();
RETURN_ON_ERR(dest->Write(consumer_buf_.InputBuffer()));

if (IsStopped()) {
Collaborator

instead of checking here, you can unconditionally call waker_.notifyAll(); after the loop.


// If producer stalled, notify we consumed data and it can unblock.
waker_.notify();
RETURN_ON_ERR(dest->Write(consumer_buf_.InputBuffer()));
Collaborator

What happens if we return an error but the producer is blocked?

Contributor Author

This is the whole issue I was solving and what the comment in dflycmd is about. The function that receives the error cancels the context and finalizes the streamer without waiting for explicit cleanup
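
To make the error path concrete, here is a small sketch under stated assumptions: the producer's wait condition is modeled as a plain predicate (mirroring `!IsStalled() || IsStopped()` from the excerpt above), and `StreamState`, `FailingWrite`, `RunConsumer` and `Finalize` are hypothetical names rather than the functions in dflycmd.

```cpp
#include <cassert>
#include <cstddef>
#include <string_view>
#include <system_error>

// Hypothetical model of the stream state as seen by a blocked producer.
struct StreamState {
  std::size_t buffered = 0;
  std::size_t max_buffered = 2;
  bool stopped = false;

  bool IsStalled() const {
    assert(max_buffered > 0);
    return buffered >= max_buffered;
  }
  // Mirrors the producer's wait predicate: !IsStalled() || IsStopped().
  bool ProducerMayProceed() const { return !IsStalled() || stopped; }
  // A real implementation would also wake up any waiters here.
  void Finalize() { stopped = true; }
};

// A write that always fails, standing in for a broken connection.
std::error_code FailingWrite(std::string_view) {
  return std::make_error_code(std::errc::io_error);
}

// Stand-in for the caller that runs the consumer: on error it finalizes the
// stream right away instead of waiting for a separate cleanup step.
std::error_code RunConsumer(StreamState& state) {
  std::error_code ec = FailingWrite("payload");
  if (ec) state.Finalize();
  return ec;
}
```

The point of the sketch is that the buffer is still full after the failure, yet the producer's predicate becomes true because the stream is stopped, so a stalled producer is not left waiting for a consumer that has already exited.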

src/server/io_utils.h (outdated, resolved)
// Write some data into the internal buffer.
// Consumer needs to be woken up manually with NotifyWritten to avoid waking it up for small
// writes.
io::Result<size_t> WriteSome(const iovec* vec, uint32_t len) override;
Collaborator

I would describe here, in the comments, what your ideal producer flow looks like, for example:

while (should_write()) {
 bsb->WriteSome(...);
 bsb->NotifyWritten();
}
bsb->Finalize();

If this flow is correct, why do you not call NotifyWritten inside WriteSome?

Contributor Author

@dranikpg dranikpg Jan 10, 2023

Because notify issues a wakeup which I don't always want (I've written this in the comment)

Now that the writer is unbuffered (notify essentially replaces flush), we don't want a wakeup after writing every small part. In the future we might not even want to wake after every entry, for example when we mass evict entries.
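
The reasoning above can be sketched as follows. This is an illustrative model only: `CountingStream` counts wake-ups instead of issuing them, `WriteSome` takes a `std::string_view` rather than the `iovec` array from the real signature, and `WriteEntry` is a hypothetical helper.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <string_view>
#include <vector>

// Hypothetical stream that counts consumer wake-ups instead of issuing them.
class CountingStream {
 public:
  // Append a part to the buffer; deliberately does not wake the consumer.
  std::size_t WriteSome(std::string_view part) {
    buf_.append(part);
    return part.size();
  }

  // Explicit wake-up, called once per logical unit of work.
  void NotifyWritten() { ++wakeups_; }

  const std::string& buffer() const { return buf_; }
  std::size_t wakeups() const { return wakeups_; }

 private:
  std::string buf_;
  std::size_t wakeups_ = 0;
};

// Write all parts of one entry, then wake the consumer a single time.
void WriteEntry(CountingStream& stream, const std::vector<std::string_view>& parts) {
  assert(!parts.empty());
  for (std::string_view part : parts) stream.WriteSome(part);
  stream.NotifyWritten();
}
```

Had the notification lived inside `WriteSome`, a three-part entry would cost three wake-ups; keeping it separate lets the caller decide the granularity, down to one wake-up per batch of entries.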

@dranikpg dranikpg marked this pull request as draft January 10, 2023 20:02
auto-merge was automatically disabled January 10, 2023 20:02

Pull request was converted to draft

@dranikpg dranikpg marked this pull request as ready for review January 13, 2023 10:54
@dranikpg dranikpg requested a review from romange January 13, 2023 10:54
@dranikpg dranikpg merged commit 7eff61c into dragonflydb:main Jan 15, 2023
@dranikpg dranikpg deleted the async-streamer branch February 27, 2023 16:39
3 participants