fix(cluster): Wait before all access to slot migrations #3180

Merged 3 commits on Jun 16, 2024
Changes from 1 commit
24 changes: 12 additions & 12 deletions src/server/cluster/outgoing_slot_migration.cc
@@ -88,6 +88,7 @@ OutgoingMigration::OutgoingMigration(MigrationInfo info, ClusterFamily* cf, Serv
: ProtocolClient(info.ip, info.port),
migration_info_(std::move(info)),
slot_migrations_(shard_set->size()),
slot_migrations_init_guard_(shard_set->size()),
server_family_(sf),
cf_(cf) {
}
@@ -112,6 +113,7 @@ void OutgoingMigration::Finish(bool is_error) {
return;
}

slot_migrations_init_guard_.Wait();
Collaborator:
Intuitively I find it not right that you wait for all the callbacks to finish in one of the callbacks. Moreover, it seems at first sight redundant, since you already use AwaitFiberOnAll as a barrier.

Collaborator (Author):
> Intuitively I find it not right that you wait for all the callbacks to finish in one of the callbacks

Why, though? That callback can yield to other callbacks if needed. It's not blocking on the shard queue, so what's wrong with that?
(further, one could argue that basically everything that runs in Dragonfly is callbacks :) )

> Moreover, it seems at first sight redundant, since you already use AwaitFiberOnAll as a barrier.

That's what I had in mind when I fixed this code originally (a few days ago). But the problem is that cancellation can happen in another thread (it comes from the client via the admin-port). So AwaitFiberOnAll wouldn't do on its own :(
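
To illustrate the cross-thread issue, here is a standalone sketch of the init-guard idea: std::latch stands in for util::fb2::EmbeddedBlockingCounter (mirroring the Dec()/Wait() calls in the diff), plain threads stand in for the per-shard callbacks, and every name here is illustrative rather than Dragonfly code.

```cpp
#include <cstddef>
#include <latch>
#include <memory>
#include <thread>
#include <vector>

// Placeholder for the real SliceSlotMigration; only its existence matters here.
struct SliceSlotMigrationStub {
  bool has_error = false;
};

int main() {
  constexpr std::size_t kShards = 4;
  std::vector<std::unique_ptr<SliceSlotMigrationStub>> slot_migrations(kShards);
  std::latch init_guard(kShards);  // plays the role of slot_migrations_init_guard_

  // One worker per shard, the way AwaitFiberOnAll runs the callback on each shard.
  std::vector<std::thread> shard_cbs;
  for (std::size_t sid = 0; sid < kShards; ++sid) {
    shard_cbs.emplace_back([&, sid] {
      slot_migrations[sid] = std::make_unique<SliceSlotMigrationStub>();
      init_guard.count_down();  // corresponds to slot_migrations_init_guard_.Dec()
      // ... the long-running Sync() would follow here, possibly calling Finish(true) ...
    });
  }

  // A cancellation arriving on a different thread (e.g. via the admin port) must
  // not index slot_migrations before every entry exists, so it waits first.
  std::thread canceller([&] {
    init_guard.wait();  // corresponds to slot_migrations_init_guard_.Wait()
    for (const auto& flow : slot_migrations) {
      if (flow->has_error) {
        // report the error / finalize ...
      }
    }
  });

  for (auto& t : shard_cbs) t.join();
  canceller.join();
  return 0;
}
```

Note the ordering the diff comment insists on: the counter is decremented before Sync(), because an error inside Sync() calls Finish(), which itself Wait()s on the counter; decrementing afterwards could deadlock.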

Collaborator:
Callbacks can block; I am not saying there is a technical problem. But usually we do not synchronise callbacks with each other when we run AwaitFiberOnAll: we wait for them to return all the information, aggregate it, and then do the next step if needed. Maybe what's missing for me is an understanding of what the problematic use case is.
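
For contrast, a minimal standalone sketch of the convention described above: each callback writes only its own slot, the caller aggregates after the hop returns, and callbacks never wait on one another (plain threads stand in for AwaitFiberOnAll callbacks; InitOnShard is a made-up helper).

```cpp
#include <algorithm>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical per-shard step; returns 0 on success, non-zero on error.
int InitOnShard(std::size_t /*sid*/) {
  return 0;
}

int main() {
  constexpr std::size_t kShards = 4;
  std::vector<int> per_shard_status(kShards, 0);

  // The hop: every callback writes only its own slot; none waits on another.
  std::vector<std::thread> hop;
  for (std::size_t sid = 0; sid < kShards; ++sid)
    hop.emplace_back([&, sid] { per_shard_status[sid] = InitOnShard(sid); });
  for (auto& t : hop)
    t.join();  // the barrier that AwaitFiberOnAll provides to its caller

  // Aggregate after the barrier, then decide whether to take the next step.
  bool ok = std::all_of(per_shard_status.begin(), per_shard_status.end(),
                        [](int s) { return s == 0; });
  return ok ? 0 : 1;
}
```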

Collaborator:
I think now I understand it, actually. I would separate it into two shard_set->pool()->Await(Fiber?)OnAll steps: the first one initializing the migration, and the second one doing the cancellable sync.
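
A standalone sketch of that two-hop split, with plain threads standing in for the proactor pool and a stub migration type; this illustrates the suggested structure rather than the code that was merged.

```cpp
#include <cstddef>
#include <memory>
#include <thread>
#include <vector>

struct MigrationStub {
  void Sync() { /* long-running, cancellable work would go here */ }
};

// Runs cb(sid) once per shard and joins, mimicking one AwaitFiberOnAll hop.
template <typename Cb>
void RunOnAllShards(std::size_t shards, Cb cb) {
  std::vector<std::thread> ts;
  for (std::size_t sid = 0; sid < shards; ++sid)
    ts.emplace_back([&cb, sid] { cb(sid); });
  for (auto& t : ts)
    t.join();
}

int main() {
  constexpr std::size_t kShards = 4;
  std::vector<std::unique_ptr<MigrationStub>> slot_migrations(kShards);

  // Hop 1: only initialize. Once this hop returns, every entry exists.
  RunOnAllShards(kShards, [&](std::size_t sid) {
    slot_migrations[sid] = std::make_unique<MigrationStub>();
  });

  // Hop 2: the cancellable sync, free to fail or finish early per shard.
  RunOnAllShards(kShards, [&](std::size_t sid) { slot_migrations[sid]->Sync(); });
  return 0;
}
```

As the author points out earlier in the thread, a cancellation arriving from another thread could still land while the first hop is in flight, so some form of wait on initialization may still be needed on that path.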

shard_set->pool()->AwaitFiberOnAll([this](util::ProactorBase* pb) {
if (const auto* shard = EngineShard::tlocal(); shard) {
auto& flow = slot_migrations_[shard->shard_id()];
@@ -181,18 +183,13 @@ void OutgoingMigration::SyncFb() {
shard_set->pool()->AwaitFiberOnAll([this](util::ProactorBase* pb) {
if (auto* shard = EngineShard::tlocal(); shard) {
server_family_->journal()->StartInThread();
slot_migrations_[shard->shard_id()] = std::make_unique<SliceSlotMigration>(
auto& migration = slot_migrations_[shard->shard_id()];
DCHECK(migration == nullptr);
migration = std::make_unique<SliceSlotMigration>(
&shard->db_slice(), server(), migration_info_.slot_ranges, server_family_->journal());
}
});

// Start migrations in a separate hop to make sure that Finish() is called only after all
// migrations are created (see #3139)
shard_set->pool()->AwaitFiberOnAll([this](util::ProactorBase* pb) {
if (auto* shard = EngineShard::tlocal(); shard) {
auto& migration = *slot_migrations_[shard->shard_id()];
migration.Sync(cf_->MyID(), shard->shard_id());
if (migration.GetError()) {
slot_migrations_init_guard_.Dec(); // Must be done before Finish(), as it blocks on it
migration->Sync(cf_->MyID(), shard->shard_id());
if (migration->GetError()) {
Finish(true);
}
}
@@ -251,13 +248,13 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
pause_fb_opt->JoinIfNeeded();
});

slot_migrations_init_guard_.Wait();
auto cb = [this](util::ProactorBase* pb) {
if (const auto* shard = EngineShard::tlocal(); shard) {
VLOG(1) << "FINALIZE outgoing migration" << shard->shard_id();
slot_migrations_[shard->shard_id()]->Finalize();
}
};

shard_set->pool()->AwaitFiberOnAll(std::move(cb));

auto cmd = absl::StrCat("DFLYMIGRATE ACK ", cf_->MyID(), " ", attempt);
@@ -308,12 +305,15 @@ void OutgoingMigration::Start() {
}

bool OutgoingMigration::CheckFlowsForErrors() {
slot_migrations_init_guard_.Wait();

for (const auto& flow : slot_migrations_) {
if (flow->GetError()) {
cntx_.ReportError(flow->GetError());
return true;
}
}

return false;
}

4 changes: 4 additions & 0 deletions src/server/cluster/outgoing_slot_migration.h
@@ -77,7 +77,11 @@ class OutgoingMigration : private ProtocolClient {

private:
MigrationInfo migration_info_;

// Only use after Wait()ing for guard below
std::vector<std::unique_ptr<SliceSlotMigration>> slot_migrations_;
util::fb2::EmbeddedBlockingCounter slot_migrations_init_guard_;

ServerFamily* server_family_;
ClusterFamily* cf_;
dfly::GenericError last_error_;