-
Notifications
You must be signed in to change notification settings - Fork 998
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(server): Implement robust error & cancellation on replica #531
Conversation
39d86c7
to
d8ac791
Compare
src/server/dflycmd.cc
Outdated
@@ -325,7 +325,7 @@ OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineSha | |||
// Shard can be null for io thread. | |||
if (shard != nullptr) { | |||
CHECK(!sf_->journal()->OpenInThread(false, ""sv)); // can only happen in persistent mode. | |||
flow->saver->StartSnapshotInShard(true, cntx, shard); | |||
flow->saver->StartSnapshotInShard(true, *cntx, shard); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did not understand this either. I am guessing it's related somehow to the cast you added?
b3a186e
to
debed54
Compare
Signed-off-by: Vladislav Oleshko <[email protected]>
debed54
to
97b146e
Compare
src/server/replica.cc
Outdated
auto err_handler = [this, sync_block](const auto& ge) { | ||
sync_block->Add(num_df_flows_); // Unblock sync_block. | ||
DefaultErrorHandler(ge); // Close sockets to unblock flows. | ||
}; | ||
if (cntx_.Switch(std::move(err_handler))) | ||
return cntx_; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've added Switch
to the context that atomically checks for error and exchanges the error handler
This should be a working and more or less simple version |
src/server/common.h
Outdated
|
||
void Cancel(); // Cancels the context by submitting an `errc::operation_canceled` error. | ||
using Cancellation::IsCancelled; | ||
operator const Cancellation*(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do you need to introduce an implicit casting from object D to B* when D derives from B?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because I need to forbid getting non-const Cancellation* so that its real Cancel function cannot be called.
Otherwise, we can replace the Cancellation type by the Context everywhere, or make Cancellation an interface
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you try to void virtual inheritance, then I think it will be clearer than using these operators
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or we can just add explicit GetCancellation (I wouldn't say its unusual to hide inheritance for classes and re-expose the interface in a limited way)
Can you please make sure that replica status is reflected by "info replication" command? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've added an explicit GetError
function to the context and fixed other small issues
if (!cntx->replica_conn) { | ||
ServerState::tl_connection_stats()->num_replicas += 1; | ||
} | ||
cntx->replica_conn = true; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You do this inside PSYNC for Redis.
Does a replica count if its still setting up flows and preparing to do full sync (which should be a very short phase)? I guess, yes
Signed-off-by: Vladislav Oleshko <[email protected]>
a91ceca
to
d437950
Compare
src/server/replica.h
Outdated
::boost::fibers::mutex mu_; | ||
::boost::fibers::condition_variable cv_; | ||
|
||
void Add(unsigned delta); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why BlockingCounter does not fit it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm... I need to change it though a little
In case of an error, I need a way to unblock it. I can add a function like Reset()
which resets the count to 0
. However, flows still might decrement it and we'll get an underflow.
So I either:
- Use an int inside and
<=
in comparison - Use a CAS decrement inside the blocking counter
- Allow locking the context externally, so I can do
with cntx.Lock()
if not cntx.Error():
counter.Dec()
What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can do it differently. You do not really need 64 bit for count.
You can use the high bit as cancel bit. I do not like Reset name but you can use
Cancel() {
count_.fetch_or(1ULL << 63, std::memory_order_acquire);
ec_.notify(); // releases inside
}
void Wait() {
ec_.await([this] {
auto cnt = count_.load(std::memory_order_acquire);
return cnt == 0 || (cnt & (1 << 63));
});
}
LGTM. you probably need to rebase and start using fiber_ext::Fiber |
Signed-off-by: Vladislav <[email protected]>
Signed-off-by: Vladislav Oleshko <[email protected]>
Implements robust error handling and cancellation for the replica.