
feat(server): Improved cancellation #599

Merged
merged 4 commits into dragonflydb:main from replication-cancellation-v2 on Dec 27, 2022

Conversation

@dranikpg (Contributor) commented Dec 25, 2022

I've been debugging replication cancellation the whole day. I've witnessed the coolest bugs, but it's pure hell in general 🙂

The Test

I've been trying to make the pytests pass (specifically the replication cancellation tests) with the normal pytests, with the new pytest generator, and under intense load with many instances. They usually pass, but if you increase the number of keys/instances/iterations, run in optimized mode, add printing for delay, etc., they tend to fail in places.

In short, the most problematic test was disconnect_master. It starts a master and a few replicas, kills the master, starts it again, waits some time and makes sure all the replicas have reconnected to the master. This doesn't sound very difficult, but it is. It makes sure that a replica:

  • Recovers from any error at any time (because the master disconnect can happen at different phases)
  • Unblocks the current operation in a timely manner and runs cleanup for it
  • Switches back to looping mode (where it tries to re-connect)
  • Once the master is back alive, it starts a new full sync operation

I do all of this a few times in a loop, so it can basically fail mid operation, while re-trying, while initializing flows, etc...

The cancellation mechanism

I'll provide my thought process as separate steps. The new cancellation is used only inside the replica:

1. What the error handler is for

  • We have blocking code in many operations on the replica and master (blocking on a socket, mutex, barrier, blocking counter, etc.) that needs to be unblocked manually on cancellation.
  • The Context stores a specific error handler that knows how to do all of this for the currently running task (a minimal sketch follows below this list).
  • While we can precisely control error propagation inside a single function, we rely on the context to collect errors from different fibers (there can be many for a single task), so we don't need to invent our own collection mechanism for every task and we can use contexts in general components.
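A conceptual sketch of such a context, assuming only what the snippets later in this PR show (GenericError, an ErrHandler, a mutex and the Cancellation flag); the real class layout may differ:

// Conceptual sketch only - member names follow the PR's snippets.
using ErrHandler = std::function<void(const GenericError&)>;

class Context : protected Cancellation {  // Cancellation holds the atomic flag_
  Mutex mu_;                // guards err_ and err_handler_
  GenericError err_;        // only the first reported error is stored
  ErrHandler err_handler_;  // knows how to unblock the currently running task
};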

2. Division of responsibility

  • What do I mean by task? The whole replication flow is split into parts; I like to call the big and difficult ones tasks. For now there are exactly four - full and stable sync, on the replica and on the master respectively (2x2). Those clearly have a start, an end, their own resources, and need complex cleanup. The other parts use cancellation as well - but in a simpler way.
  • Inside the replica, responsibility is transferred when changing parts by switching the error handler atomically, and retries are prepared by resetting the context.
  • The master uses an older, custom mechanism: it stores explicit error callbacks for each FlowInfo plus one general error handler (that is not changed during the whole process), which invokes the separate handlers (rough sketch below). It needs to be reviewed.
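Roughly what that master-side pattern looks like; FlowInfo is from the code, but the callback field name and the container are made up for illustration:

// Sketch of the older master-side mechanism; `cancel_cb` is a hypothetical name.
struct FlowInfo {
  // ... connection, fiber, etc.
  std::function<void()> cancel_cb;  // explicit per-flow error callback
};

// One general handler, installed once for the whole session,
// that simply fans out to the per-flow callbacks.
auto master_err_handler = [&flows](const GenericError&) {
  for (FlowInfo& flow : flows)  // `flows` assumed: the session's list of FlowInfo
    if (flow.cancel_cb)
      flow.cancel_cb();
};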

2.5 Uneven split

  • The biggest difference here is that the replica is active - it coordinates the replication process - and the master is reactive - it just reacts to events - which makes the replica much more complicated
  • Seriously, it has all the pain points. Only the replica:
    • Initiates all phases
    • Does retries
    • Keeps track of the general flow, blocks there
    • Can fail during task initialization because it is the one sending set-up requests

3. Starting the error handler

  • The Context invokes the error handler upon receiving the first error. This might happen from any fiber of the task.
  • The Context guards its operations with a mutex. Previously, it directly ran the error handler in the fiber that reported the error and held the lock until it finished.
  • The benefits of this were:
    • It is very simple
    • It prevents others from resetting the context (because the only lock is held)
    • It conveniently allows waiting until the handler has finished
  • The downside is that it's very difficult to make it work reliably:
    • The context is locked
    • Some fiber deep inside the task is occupied by the error handler itself.
    • The more features we add, the more complex it becomes, which increases the probability of deadlock and makes reasoning about cleanup more difficult.
  • In reality we want to:
    • Freely access the context, propagate its errors, block on resetting it, etc.
    • Allow the error handler to precisely manage its own lifetime (so it can run before, in between, or after all tasks have unblocked) - see the contrast sketch after this list
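A self-contained toy comparison of the two shapes (std::thread stands in for a fiber, a bool for the stored error; this is not the actual diff, only the control flow matters):

#include <functional>
#include <mutex>
#include <thread>

// Toy model, not Dragonfly code.
struct OldStyleContext {
  std::mutex mu_;
  bool has_err_ = false;
  std::function<void()> err_handler_;

  void ReportError() {
    std::lock_guard lk{mu_};  // the context stays locked...
    if (!has_err_) {
      has_err_ = true;
      err_handler_();         // ...while the handler runs inline in the
    }                         // reporting fiber, which is now stuck here
  }
};

struct NewStyleContext {
  std::mutex mu_;
  bool has_err_ = false;
  std::function<void()> err_handler_;
  std::thread err_fb_;        // the "error handler fiber"

  void ReportError() {
    std::lock_guard lk{mu_};
    if (!has_err_) {
      has_err_ = true;
      err_fb_ = std::thread{err_handler_};  // handler runs on its own; the
    }                                       // reporter unlocks and moves on
  }

  void JoinHandler() {        // what resetting/switching does before reuse
    if (err_fb_.joinable()) err_fb_.join();
  }
};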

4. Special case "Master"

  • There were no issues with the master cancellation though... Why? Because it uses a custom, more sophisticated approach and doesn't have the notion of retrying operations.
  • Currently, cancelling the context for a session on the master is equal to evicting it. The error handler cannot really manage destroying the whole object it belongs to - it's essentially a reversed relationship - so I start a separate detached fiber, CancelReplication, that takes over the whole cancellation process (sketch below). This avoids many, many problems.
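A sketch of that hand-off, assuming the CancelReplication entry point named above (capture and argument details are illustrative):

// The handler itself only schedules the teardown and returns; a detached
// fiber owns the eviction, so no fiber of the dying session destroys its owner.
auto session_err_handler = [this](const GenericError&) {
  ::boost::fibers::fiber([this] {
    CancelReplication(/* session to evict; arguments omitted */);
  }).detach();
};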

5. The solution?

  • The solution in my mind is to externalize cancellation by default, so that it has as little friction as possible with the task being torn down, and to move the whole complexity into the context itself.
  • The pain points here for the task (or its manager, the one who started it) are that:
    • It needs to wait for the cancellation to stop before starting a new task (because it might be re-trying)
    • If the context has more responsibility, the task is more vulnerable during the initialization of resources

6. To the point

  • The Context now starts a separate error handler fiber and stores it
  • When resetting/switching handlers on the context, it waits for a running error handler to stop - this solves the retry and coordination problem
  • Checking that the handler has stopped is done with Stop() on the context
  • On the replica, fast failures during initialization are handled with a flows_op_mu_ mutex. Because we first transfer responsibility and only then initialize resources, we need the error handler to wait until all initialized resources are in a state where they can be torn down correctly (see the sketch after this list).
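Condensed into one place, the replica-side pattern looks roughly like this (the calls are the ones quoted later in the review; the surrounding structure and comments are a simplification, not the literal code):

// 1. Transfer responsibility: install the handler for this task. Switching
//    first waits for any error-handler fiber still running from a previous
//    attempt, so retries never overlap with cleanup.
RETURN_ON_ERR(cntx_.Switch(std::move(err_handler)));

// 2. Initialize resources under flows_op_mu_. If an error is reported right
//    now, the handler fiber is spawned but blocks on this mutex, so it only
//    ever sees fully constructed (or fully absent) flows.
{
  lock_guard lk{flows_op_mu_};
  shard_set->pool()->AwaitFiberOnAll(std::move(shard_cb));
}

// 3. ... run the task; any fiber may report an error into cntx_ ...

// 4. Before retrying, resetting/switching the context again joins the handler
//    fiber and clears the stored error and the cancellation flag.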

7. Future master perspectives

  • As I stated, the master is reactive and has no long "critical control paths"; that is why cancellation is simplified there
  • With its current model, cancelling operations mid-execution is not possible. A faulty replica that freezes during the stable-state transition will freeze the master as well. If we have many pending operations, we can't shut down the master instantly.

Goals

  • Review my new replica cancellation mechanism
  • Review the master cancellation mechanism and maybe see what common parts they share
  • Maybe review how we split cancellation complexity between the two actors

Signed-off-by: Vladislav Oleshko <[email protected]>
@dranikpg dranikpg requested review from romange and adiholden and removed request for romange December 25, 2022 20:15
@romange (Collaborator) left a comment

nit: I think using signal instead of report wording in this context is more accurate. report is associated with printing the error in my mind.

I still did not understand from the explanation here why the error handler must run asynchronously. Especially if you join on its fiber everywhere, making it synchronous.

  err_ = {};
  err_handler_ = std::move(handler);
  Cancellation::flag_.store(false, std::memory_order_relaxed);
}

GenericError Context::Switch(ErrHandler handler) {
  std::lock_guard lk{mu_};
  if (!err_)
  if (!err_) {
Collaborator:
I think Switch is too generic a name, or maybe Context is too generic a name. Does the golang version of Context also have Switch and Reset equivalents?

Contributor Author:

No, it doesn't have any of this at all

  CheckHandlerFb();
}

GenericError Context::ReportInternal(GenericError&& err) {
Collaborator:

Context and all its routines avoid using Error in their names. Specifically, maybe rename ReportInternal to PropagateError ?

return err_;
err_ = std::move(err);

CheckHandlerFb();
Collaborator:

CheckHandlerFb -> WaitForErrorHandlerToFinish() ? Or a more concise version of it? 😄

string_view kind = (stable) ? "STARTSTABLE"sv : "SYNC"sv;
string request = StrCat("DFLY ", kind, " ", master_context_.dfly_session_id);

LOG(INFO) << "Sending: " << request;
Collaborator:

VLOG(1) ?

multi_shard_exe_.reset(new MultiShardExecution());

// Check this is the first run or the previous one finished cleaning up.
CHECK(shard_flows_.size() == 0);
Collaborator:

CHECK_EQ to get more useful info in case it fails

@romange (Collaborator) commented Dec 26, 2022

Ok, this is why you introduced a fiber inside the context:

The context is locked
Some fiber deep inside the task is occupied by the error handler itself.

@@ -243,7 +244,8 @@ using AggregateGenericError = AggregateValue<GenericError>;
// Context is a utility for managing error reporting and cancellation for complex tasks.
//
// When submitting an error with `Error`, only the first is stored (as in aggregate values).
// Then a special error handler is run, if present, and the context is cancelled.
// Then a special error handler is run, if present, and the context is cancelled. The error handler
// is run in a separate handler to free up the caller.
Collaborator:

"in a separate fiber"...

auto err_handler = [this, sync_block](const auto& ge) mutable {
  sync_block.Cancel();      // Unblock this function.
  DefaultErrorHandler(ge);  // Close sockets to unblock flows.
  service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);  // Reset global state.
};
RETURN_ON_ERR(cntx_.Switch(std::move(err_handler)));
Collaborator:

The error handler here touches sync_block, whose lifetime ends at the end of this function, but I don't see a call to cntx_.Switch before this function ends. So if the error handler runs between this function's return and the next switch to a different error handler, it will try to call Cancel on the sync_block, which is not alive anymore. Right?

@dranikpg (Contributor Author) Dec 26, 2022:

Yes, but it's in a shared_ptr (a custom boost intrusive ptr inside the BlockingCounter)
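The lifetime argument is the usual shared-ownership one; a generic illustration with std::shared_ptr (the intrusive pointer inside BlockingCounter behaves the same way for this purpose):

#include <functional>
#include <memory>

std::function<void()> MakeHandler() {
  // Plays the role of sync_block's shared state.
  auto state = std::make_shared<int>(0);
  // Captured by value: the lambda's copy keeps the state alive
  // even after MakeHandler() has returned.
  return [state] { *state += 1; };
}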

auto err_handler = [this, sync_block](const auto& ge) mutable {
  sync_block.Cancel();      // Unblock this function.
  DefaultErrorHandler(ge);  // Close sockets to unblock flows.
  service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);  // Reset global state.
@adiholden (Collaborator) Dec 26, 2022:

I think that the error handler should make sure that all the flows are unblocked, but not do the cleanup. So in this case I would create an absl::Cleanup which will:
  • join all flows
  • in case cntx_.IsCancelled(): switch the state, flush the db
  • switch the error handler


// Lock to prevent error handler from running on mixed state.
lock_guard lk{flows_op_mu_};
shard_set->pool()->AwaitFiberOnAll(std::move(shard_cb));
Collaborator:

In the cancellation flow, if one flow sets an error, this causes all flows to shut down without consuming the data that was already received on the socket, right? Is this the desired behaviour?

Collaborator:

we close the sockets anyway and retry the whole flow so I guess it should be fine

Contributor Author:

It will not run while the guard holds the lock.

Why should it not? Imagine a flow got an error and reported it, the handler ran fast and cancelled the sockets, but not all flows had actually created their sockets yet, so the "last ones to run" would not be affected by the cancellation at all. This can be solved by moving the initialization out separately, by checking for cancellation before the inits, or by locking the initialization phase.

Contributor Author:

And yes, currently we just want to shut down everything and retry from scratch

Signed-off-by: Vladislav Oleshko <[email protected]>
@dranikpg (Contributor Author) commented Dec 26, 2022

I've pushed a new change where I let the replica do cleanup in RAII handlers where possible. It seems to have simplified some parts, but also complicated others.

I've checked the pytests; it seems to work.

in case cntx_.IsCancelled(): switch the state

We must do this either way. We can't actually use conditional statements inside the cleanup handler, because nothing prevents cntx_.IsCancelled() from returning false and an error from being spawned at the very next instant.
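For reference, the RAII shape suggested above would look roughly like this with absl::Cleanup (a sketch; JoinAllFlows stands for whatever actually joins the replica's flows, and the state switch mirrors the error handler quoted earlier):

#include "absl/cleanup/cleanup.h"

auto cleanup = absl::MakeCleanup([this] {
  JoinAllFlows();  // always unblock and join, unconditionally
  // Deliberately no `if (cntx_.IsCancelled())` around the rest: the flag can
  // flip right after the check, so the cleanup must be safe to run either way.
  service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);
});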

Comment on lines 611 to 613
void Replica::DefaultErrorHandler(const GenericError& err) {
  CloseAllSockets();
  CloseSocket();
}
Contributor Author:

It's important that we don't access the replicas in the default error handler: if we get cancelled during the initialization phase of full sync (where they get reset), we'd crash by reading from them at a bad time.

@dranikpg dranikpg requested a review from adiholden December 27, 2022 09:01

// Lock to prevent the error handler from running instantly
// while the flows are in a mixed state.
lock_guard lk{flows_op_mu_};
Collaborator:

Is there a deadlock here?
You call AwaitFiberOnAll with shard_cb, which can invoke Error (line 475).
The Error call will call ReportInternal, which will wait for the error handler to finish, but you want to prevent the error handler from running here.

Contributor Author:

No, because the error handler is launched as a separate fiber. It will suspend and wake up once the initialization is done

Collaborator:

There is a call to CheckHandlerFb to join the fiber.

adiholden
adiholden previously approved these changes Dec 27, 2022
@dranikpg dranikpg force-pushed the replication-cancellation-v2 branch from 1706645 to 0a4a54b on December 27, 2022 10:41
@dranikpg dranikpg merged commit e6721d8 into dragonflydb:main Dec 27, 2022
@dranikpg dranikpg deleted the replication-cancellation-v2 branch February 27, 2023 16:40