-
Notifications
You must be signed in to change notification settings - Fork 998
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(server): Buffered streamer + use on stable state #639
Conversation
Signed-off-by: Vladislav Oleshko <[email protected]>
src/server/io_utils.h
Outdated
// Write stream in buffered parts until producer finishes or error occurs. | ||
std::error_code WriteAll(io::Sink* dest); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The BufferedStreamBase doesn't run the writer fiber itself because in the case of, for example, a snapshot the writing fiber is managed externally
src/server/io_utils.cc
Outdated
void BufferedStreamerBase::ReportWritten() { | ||
buffered_++; | ||
// Check and possibly wait for consumer to keep up. | ||
waker_.await([this]() { return !IsStalled() || IsStopped(); }); | ||
// Notify consumer we have new data. | ||
waker_.notify(); | ||
} | ||
|
||
std::error_code BufferedStreamerBase::WriteAll(io::Sink* dest) { | ||
CHECK_NE(buf_ptr_, nullptr); | ||
|
||
while (!IsStopped()) { | ||
// Wait for more data or stop signal. | ||
waker_.await([this]() { return buffered_ > 0 || IsStopped(); }); | ||
if (IsStopped()) | ||
break; | ||
|
||
// Steal-swap producer buffer. | ||
std::swap(buf_stash_, *buf_ptr_); | ||
buffered_ = 0; | ||
|
||
// If producer stalled, notify we consumed data and it can unblock. | ||
waker_.notify(); | ||
RETURN_ON_ERR(dest->Write(buf_stash_.InputBuffer())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks much simpler than last time
src/server/dflycmd.cc
Outdated
|
||
// Self referential because buf reference pointr to writer. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TODO: fix comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good. I am not sure about "wait/notify" sequence, commented about it.
src/server/dflycmd.cc
Outdated
write_fb_ = Fiber(&JournalStreamer::WriterFb, this, dest); | ||
journal_cb_id_ = journal_->RegisterOnChange([this](const journal::Entry& entry) { | ||
writer_.Write(entry); | ||
ReportWritten(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's a bit confusing to have both the backpressure and notification bundled together.
Suppose you are stalled. You want to notify your consumer before you block.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You block because the consumer does not keep up, there is no point in notifying it before - its already busy. Once its no longer stalled it will have consumed your latest portions
src/server/io_utils.cc
Outdated
waker_.notify(); | ||
} | ||
|
||
std::error_code BufferedStreamerBase::WriteAll(io::Sink* dest) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Write
part is a confusing since both the consumer and producer parts are "writing".
I needed to look into the function to understand what it does. Maybe call it ConsumeIntoSink
or similar?
src/server/io_utils.cc
Outdated
break; | ||
|
||
// Steal-swap producer buffer. | ||
std::swap(buf_stash_, *buf_ptr_); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that the flow of switching the buffer of journal writer without it knowing is not very intuitive.
How about creating journal writer with buffer not a member but one that is set from outside, with this approach you will have SetBuffer func to replace the buf and you can use it here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The only counter-intuitive thing to stealing a buffer is that it can't cause a data race because the writer and consumer have to be on the same thread. Making it use the journal writer internally won't make it re-usable.
What could be done instead is:
- making the journal writer unbuffered, i.e. write directly to a sink
- the sink will be some buffer owned by the buffered stream (so it will own both)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Otherwise, we can keep the internal buffer inside the journal writer, but now flash to a sink (which will be the IoBuf) after each operations. The downside is that this introduced an intermediate copy - but its probably very cheap?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@romange What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's localized and does not affect lots of places in the code.
and we can make it more clear by adding more comments.
The flush function of journal writer can be removed right? |
Signed-off-by: Vladislav Oleshko <[email protected]>
Signed-off-by: Vladislav Oleshko <[email protected]>
if (cb_id) | ||
sf_->journal()->UnregisterOnChange(cb_id); | ||
// Register cleanup. | ||
flow->cleanup = [this, streamer, flow]() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible that flow cleanup will be triggered between the start above and the update of the cleanup lambda?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, because unlike the replica it has a per-session mutex that is locked on state transitions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And it doesn't contain any await points
src/server/io_utils.cc
Outdated
while (!IsStopped()) { | ||
// Wait for more data or stop signal. | ||
waker_.await([this]() { return buffered_ > 0 || IsStopped(); }); | ||
if (IsStopped()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If producer is done doesnt it make more sense to finish write all the buffered journal data and then return?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed.
It doesn't matter currently because its used as cancellation, but the general behavior should be this
Signed-off-by: Vladislav Oleshko <[email protected]>
Maybe rebase it to let tests run? |
I will update the branch when pytests are merged (plz re-approve them), it will trigger a new ci flow |
src/server/io_utils.cc
Outdated
waker_.notify(); | ||
RETURN_ON_ERR(dest->Write(consumer_buf_.InputBuffer())); | ||
|
||
if (IsStopped()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
instead of checking here, you can unconditionally call waker_.notifyAll();
after the loop.
src/server/io_utils.cc
Outdated
|
||
// If producer stalled, notify we consumed data and it can unblock. | ||
waker_.notify(); | ||
RETURN_ON_ERR(dest->Write(consumer_buf_.InputBuffer())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what happens if we return an error but the producer is blocked ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the whole issue I was solving and what the comment in dflycmd is about. The function that receives the error cancels the context and finalizes the streamer without waiting for explicit cleanup
// Write some data into the internal buffer. | ||
// Consumer needs to be woken up manually with NotifyWritten to avoid waking it up for small | ||
// writes. | ||
io::Result<size_t> WriteSome(const iovec* vec, uint32_t len) override; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would write here in the comments how your ideal produce flow looks like in terms. for example,
while (should_write()) {
bsb->WriteSome(...);
bsb->NotifyWritten();
}
bsb->Finalize();
if this flow is correct, why you do not call NotifyWritten inside WriteSome?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because notify issues a wakeup which I don't always want (I've written this in the comment)
Now that the writer is unbuffered (notify is essentially instead of flush), we don't want a wakeup after writing every small part. In the future we even might not want to wake after ever entry - for example when we mass evict entries.
Signed-off-by: Vladislav Oleshko <[email protected]>
7bd0654
to
ff1d573
Compare
Pull request was converted to draft
Signed-off-by: Vladislav Oleshko <[email protected]>
This PR introduces:
Why BufferedStreamerBase separately? In the future, We might want to replace the snapshot channel with it in case of single producer - single consumer to remove allocations, synchronization, fragmentation, etc...