diff --git a/src/server/cluster/cluster_defs.h b/src/server/cluster/cluster_defs.h index aa97f856955c..1206a340a93e 100644 --- a/src/server/cluster/cluster_defs.h +++ b/src/server/cluster/cluster_defs.h @@ -81,7 +81,6 @@ enum class MigrationState : uint8_t { C_SYNC, C_ERROR, C_FINISHED, - C_MAX_INVALID = std::numeric_limits::max() }; SlotId KeySlot(std::string_view key); diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index e0ce17a23203..83b7654e381c 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -660,8 +660,6 @@ static string_view StateToStr(MigrationState state) { return "ERROR"sv; case MigrationState::C_FINISHED: return "FINISHED"sv; - case MigrationState::C_MAX_INVALID: - break; } DCHECK(false) << "Unknown State value " << static_cast>(state); return "UNDEFINED_STATE"sv; diff --git a/src/server/cluster/outgoing_slot_migration.cc b/src/server/cluster/outgoing_slot_migration.cc index e71b359cd48e..7261138a67d7 100644 --- a/src/server/cluster/outgoing_slot_migration.cc +++ b/src/server/cluster/outgoing_slot_migration.cc @@ -107,18 +107,37 @@ bool OutgoingMigration::ChangeState(MigrationState new_state) { } void OutgoingMigration::Finish(bool is_error) { - const auto new_state = is_error ? MigrationState::C_ERROR : MigrationState::C_FINISHED; - if (!ChangeState(new_state)) { - return; + bool should_cancel_flows = false; + + { + std::lock_guard lk(state_mu_); + switch (state_) { + case MigrationState::C_FINISHED: + return; // Already finished, nothing else to do + + case MigrationState::C_NO_STATE: + case MigrationState::C_CONNECTING: + should_cancel_flows = false; + break; + + case MigrationState::C_SYNC: + case MigrationState::C_ERROR: + should_cancel_flows = true; + break; + } + + state_ = is_error ? MigrationState::C_ERROR : MigrationState::C_FINISHED; } - shard_set->pool()->AwaitFiberOnAll([this](util::ProactorBase* pb) { - if (const auto* shard = EngineShard::tlocal(); shard) { - auto& flow = slot_migrations_[shard->shard_id()]; - CHECK(flow != nullptr); - flow->Cancel(); - } - }); + if (should_cancel_flows) { + shard_set->pool()->AwaitFiberOnAll([this](util::ProactorBase* pb) { + if (const auto* shard = EngineShard::tlocal(); shard) { + auto& flow = slot_migrations_[shard->shard_id()]; + CHECK(flow != nullptr); + flow->Cancel(); + } + }); + } } MigrationState OutgoingMigration::GetState() const { @@ -174,10 +193,6 @@ void OutgoingMigration::SyncFb() { continue; } - if (!ChangeState(MigrationState::C_SYNC)) { - break; - } - shard_set->pool()->AwaitFiberOnAll([this](util::ProactorBase* pb) { if (auto* shard = EngineShard::tlocal(); shard) { server_family_->journal()->StartInThread(); @@ -186,13 +201,16 @@ void OutgoingMigration::SyncFb() { } }); - // Start migrations in a separate hop to make sure that Finish() is called only after all - // migrations are created (see #3139) + if (!ChangeState(MigrationState::C_SYNC)) { + break; + } + shard_set->pool()->AwaitFiberOnAll([this](util::ProactorBase* pb) { if (auto* shard = EngineShard::tlocal(); shard) { - auto& migration = *slot_migrations_[shard->shard_id()]; - migration.Sync(cf_->MyID(), shard->shard_id()); - if (migration.GetError()) { + auto& migration = slot_migrations_[shard->shard_id()]; + CHECK(migration != nullptr); + migration->Sync(cf_->MyID(), shard->shard_id()); + if (migration->GetError()) { Finish(true); } }