feat(server): Improved cancellation #599
Conversation
Signed-off-by: Vladislav Oleshko <[email protected]>
nit: I think using "signal" instead of "report" wording in this context is more accurate. "report" is associated with printing the error in my mind.
I still did not understand from the explanation here why the error handler must run asynchronously, especially if you join on its fiber everywhere, which makes it effectively synchronous.
  err_ = {};
  err_handler_ = std::move(handler);
  Cancellation::flag_.store(false, std::memory_order_relaxed);
}

GenericError Context::Switch(ErrHandler handler) {
  std::lock_guard lk{mu_};
-  if (!err_)
+  if (!err_) {
I think Switch is too generic a name, or maybe Context is too generic a name. Does the Go version of Context also have Switch and Reset equivalents?
No, it doesn't have any of this at all.
src/server/common.cc (outdated)
  CheckHandlerFb();
}

GenericError Context::ReportInternal(GenericError&& err) {
Context and all its routines avoid using Error in their names. Specifically, maybe rename ReportInternal to PropagateError?
src/server/common.cc (outdated)
    return err_;
  err_ = std::move(err);

  CheckHandlerFb();
CheckHandlerFb -> WaitForErrorHandlerToFinish()? Or a more concise version of it? 😄
  string_view kind = (stable) ? "STARTSTABLE"sv : "SYNC"sv;
  string request = StrCat("DFLY ", kind, " ", master_context_.dfly_session_id);

  LOG(INFO) << "Sending: " << request;
VLOG(1) ?
src/server/replica.cc (outdated)
  multi_shard_exe_.reset(new MultiShardExecution());

  // Check this is the first run or the previous one finished cleaning up.
  CHECK(shard_flows_.size() == 0);
Use CHECK_EQ to get more useful info in case it fails.
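For reference, a small hedged sketch of the difference with glog-style check macros (the container and function names here are only illustrative):

#include <vector>
#include <glog/logging.h>

void VerifyCleanState(const std::vector<int>& shard_flows) {
  // CHECK only reports that the condition failed.
  CHECK(shard_flows.size() == 0);
  // CHECK_EQ additionally prints both operands on failure,
  // e.g. "Check failed: shard_flows.size() == 0u (3 vs. 0)".
  CHECK_EQ(shard_flows.size(), 0u);
}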
Ok, this is why you introduced a fiber inside the context:
@@ -243,7 +244,8 @@ using AggregateGenericError = AggregateValue<GenericError>;
 // Context is a utility for managing error reporting and cancellation for complex tasks.
 //
 // When submitting an error with `Error`, only the first is stored (as in aggregate values).
-// Then a special error handler is run, if present, and the context is cancelled.
+// Then a special error handler is run, if present, and the context is cancelled. The error handler
+// is run in a separate handler to free up the caller.
"in a separate fiber"...
src/server/replica.cc (outdated)
  auto err_handler = [this, sync_block](const auto& ge) mutable {
    sync_block.Cancel();      // Unblock this function.
    DefaultErrorHandler(ge);  // Close sockets to unblock flows.
    service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);  // Reset global state.
  };
  RETURN_ON_ERR(cntx_.Switch(std::move(err_handler)));
The error handler here touches sync_block, whose lifetime ends at the end of this function, but I don't see a call to cntx_.Switch before this function ends. So if the error handler runs between this function's return and the next switch to a different error handler, it will try to call Cancel on a sync_block which is no longer alive, right?
Yes, but it's in a shared_ptr (a custom boost intrusive ptr inside the BlockingCounter).
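For illustration only, a minimal sketch of why a by-value capture of a ref-counted handle outlives the enclosing function, using std::shared_ptr as a stand-in for the intrusive pointer inside BlockingCounter (an assumption about its internals, not the actual type):

#include <functional>
#include <memory>

// Hypothetical stand-in for the shared state behind the BlockingCounter handle.
struct SyncBlockState {
  bool cancelled = false;
  void Cancel() { cancelled = true; }
};

std::function<void()> MakeErrHandler() {
  auto sync_block = std::make_shared<SyncBlockState>();
  // The lambda copies the shared_ptr, so the pointed-to state stays alive
  // even after MakeErrHandler() returns and the local variable is destroyed.
  return [sync_block]() { sync_block->Cancel(); };
}

int main() {
  auto handler = MakeErrHandler();
  handler();  // Safe: the captured copy still owns the state.
}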
src/server/replica.cc (outdated)
  auto err_handler = [this, sync_block](const auto& ge) mutable {
    sync_block.Cancel();      // Unblock this function.
    DefaultErrorHandler(ge);  // Close sockets to unblock flows.
    service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);  // Reset global state.
I think that the error handler should make sure that all the flows are unblocked, but not do the cleanup. So in this case I would create an absl::Cleanup which will:
- JoinAllFlows
- in case cntx_.IsCancelled(): switch the state, flush the db
- switch the error handler
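A rough sketch of the suggested shape with absl::Cleanup; all helper names below (JoinAllFlows, ResetGlobalState, SwitchErrorHandler) are placeholders for whatever the replica actually does, not real functions from the codebase:

#include <iostream>
#include "absl/cleanup/cleanup.h"

// Hypothetical stand-ins for the replica's pieces.
bool IsCancelled() { return true; }
void JoinAllFlows() { std::cout << "joining all flows\n"; }
void ResetGlobalState() { std::cout << "LOADING -> ACTIVE, flush db\n"; }
void SwitchErrorHandler() { std::cout << "switching error handler\n"; }

void FullSyncStage() {
  // Runs at scope exit, on success and on every early return alike.
  absl::Cleanup cleanup = [] {
    JoinAllFlows();        // Make sure every flow fiber has finished.
    if (IsCancelled()) {   // Only reset state if we were cancelled.
      ResetGlobalState();
    }
    SwitchErrorHandler();  // Hand responsibility to the next stage.
  };

  // ... the sync logic itself; the error handler only unblocks flows.
}

int main() { FullSyncStage(); }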
  // Lock to prevent error handler from running on mixed state.
  lock_guard lk{flows_op_mu_};
  shard_set->pool()->AwaitFiberOnAll(std::move(shard_cb));
In the cancellation flow, if one flow sets an error, does this cause all flows to shut down without consuming the data that was already received on the socket? Is this the desired behaviour?
We close the sockets anyway and retry the whole flow, so I guess it should be fine.
It will not run while the guard holds the lock.

Why should it not? Imagine a flow got an error and reported it, the handler ran fast and cancelled the sockets, but not all flows had actually created their sockets yet, so the last ones to run will not be affected by the cancellation at all. This can be solved by moving initialization out separately, by checking for cancellation before init, or by locking the initialization phase.
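A minimal sketch of the locking option, in the spirit of the flows_op_mu_ guard above; the names and structure are illustrative rather than the actual replica code, and std::mutex stands in for the fiber-aware mutex:

#include <mutex>
#include <vector>

struct Socket {
  void Connect() {}
  void Close() {}
};

std::mutex flows_op_mu;      // Same role as flows_op_mu_ in the PR.
std::vector<Socket> flows;

void InitFlows(size_t n) {
  // Holding the lock while creating the sockets means the error handler
  // can never observe a "mixed" state where only some sockets exist.
  std::lock_guard lk{flows_op_mu};
  flows.resize(n);
  for (auto& s : flows) s.Connect();
}

void ErrorHandler() {
  // Blocks until initialization has finished, then closes everything it sees.
  std::lock_guard lk{flows_op_mu};
  for (auto& s : flows) s.Close();
}

int main() {
  InitFlows(4);
  ErrorHandler();
}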
And yes, currently we just want to shut down everything and retry from scratch
Signed-off-by: Vladislav Oleshko <[email protected]>
I've pushed a new change where I let the replica do cleanup in RAII handlers where possible. It seems to have simplified some parts, but also complicated others. I've checked the pytests, and it seems to work.
We must do this either way. We can't actually use conditional statements inside the cleanup handler, because nothing prevents cntx_.IsCancelling() from returning false and an error then appearing immediately at the next instant.
void Replica::DefaultErrorHandler(const GenericError& err) {
  CloseAllSockets();
  CloseSocket();
}
It's important that we don't access the replicas in the default error handler, because if we get cancelled during the initialization phase of full sync (where it resets them), we crash if we read from them at a bad time.
  // Lock to prevent the error handler from running instantly
  // while the flows are in a mixed state.
  lock_guard lk{flows_op_mu_};
Is there a deadlock here? You call AwaitFiberOnAll with a shard_cb that can invoke Error (line 475). The Error call will call ReportInternal, which waits for the error handler to finish, but you want to prevent the error handler from running here.
No, because the error handler is launched as a separate fiber. It will suspend and wake up once the initialization is done
There is a call to CheckHandlerFb to join the fiber.
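A compact sketch of why this avoids the deadlock, with std::thread standing in for the fiber that the Context launches (the real code uses the project's fiber primitives, so this is only an analogy):

#include <mutex>
#include <thread>

std::mutex flows_op_mu;      // Held while the flows are being initialized.
std::thread err_handler_fb;  // Stand-in for the error handler fiber.

void ReportError() {
  // Launch the handler asynchronously and return immediately, so the
  // fiber that reported the error is not blocked by the held lock.
  err_handler_fb = std::thread([] {
    std::lock_guard lk{flows_op_mu};  // Suspends until init releases it.
    // ... close sockets, reset state ...
  });
}

int main() {
  {
    std::lock_guard lk{flows_op_mu};  // "Initialization" holds the lock.
    ReportError();                    // Does not block despite the lock.
  }                                   // Lock released: the handler can run.
  err_handler_fb.join();              // The later CheckHandlerFb-style join.
}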
Signed-off-by: Vladislav Oleshko <[email protected]>
Force-pushed from 1706645 to 0a4a54b.
I've been debugging replication cancellation the whole day. I've witnessed the coolest bugs, but it's pure hell in general 🙂
The Test
I've been trying to make it pass the pytests (specifically the replication cancellation tests): with the normal pytests, with the new pytest generator, and under intense load with many instances. They usually pass, but if you increase the number of keys/instances/iterations, run in optimized mode, add printing for delay, etc., they tend to fail in places.

In short, the most problematic test was disconnect_master. It starts a master and a few replicas, kills the master, starts it again, waits some time, and makes sure all the replicas have reconnected to the master. This sounds not very difficult, but it is. It makes sure that a replica:

I do all of this a few times in a loop, so it can basically fail mid-operation, while retrying, while initializing flows, etc...
The cancellation mechanism

I'll provide my thought process as separate steps. The new cancellation is used only inside the replica:

1. What the error handler is for
Context stores a specific error handler that knows how to do all of this for the currently running task.

2. Division of responsibility
FlowInfo stores the per-flow handlers, and there is one general error handler (that is not changed during the whole process) which invokes the separate handlers. It needs to be re-viewed.

2.5 Uneven split

3. Starting the error handler
Context invokes the error handler upon receiving the first error. This might happen from any fiber of the task. Context guards its operations with a mutex. Previously, it directly ran the error handler in the fiber that reported the error and held the lock until it finished.

4. Special case "Master"
CancelReplication takes over the whole cancelling process. This avoids many, many problems.

5. The solution?

6. To the point
- Context now starts a separate error handler fiber and stores it
- Stop() on the context
- the flows_op_mu_ mutex: because we first transfer responsibility and only then initialize resources, we need the error handler to wait until all initialized resources are in a state where they can be torn down correctly

7. Future master perspectives

Goals
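To tie the steps above together, here is a rough, simplified sketch of the Context shape being described; std::thread stands in for a fiber, method names follow the discussion (JoinErrorHandler plays the role of CheckHandlerFb), and the real class in src/server/common.cc differs in detail:

#include <atomic>
#include <functional>
#include <mutex>
#include <string>
#include <thread>

using GenericError = std::string;  // Simplified stand-in.
using ErrHandler = std::function<void(const GenericError&)>;

class Context {
 public:
  // Stores only the first error; if a handler is set, runs it in its own
  // "fiber" so the reporting fiber is not blocked, then cancels the context.
  GenericError Error(GenericError err) {
    std::lock_guard lk{mu_};
    if (!err_.empty())
      return err_;  // Only the first error is kept.
    err_ = std::move(err);
    if (err_handler_)
      err_handler_fb_ = std::thread(err_handler_, err_);
    cancelled_ = true;
    return err_;
  }

  // Swaps the error handler between stages: waits for a running handler to
  // finish first, so handlers never overlap or see a half-switched state.
  GenericError Switch(ErrHandler handler) {
    JoinErrorHandler();
    std::lock_guard lk{mu_};
    if (err_.empty())
      err_handler_ = std::move(handler);
    return err_;
  }

  void JoinErrorHandler() {
    if (err_handler_fb_.joinable())
      err_handler_fb_.join();
  }

  bool IsCancelled() const { return cancelled_.load(); }

 private:
  std::mutex mu_;
  GenericError err_;
  ErrHandler err_handler_;
  std::thread err_handler_fb_;
  std::atomic_bool cancelled_{false};
};

int main() {
  Context cntx;
  cntx.Switch([](const GenericError&) { /* close sockets, reset state */ });
  cntx.Error("connection reset");  // Handler runs asynchronously.
  cntx.JoinErrorHandler();         // Later, the stage joins it.
}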