Skip to content

Commit

Permalink
refactor: address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
BorysTheDev committed Mar 7, 2024
1 parent b6e477c commit cdf8433
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 11 deletions.
2 changes: 1 addition & 1 deletion src/server/cluster/cluster_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ namespace dfly {
enum class MigrationState : uint8_t {
C_NO_STATE,
C_CONNECTING,
C_FULL_SYNC,
C_SYNC,
C_FINISHED,
C_MAX_INVALID = std::numeric_limits<uint8_t>::max()
};
Expand Down
14 changes: 7 additions & 7 deletions src/server/cluster/cluster_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -632,8 +632,8 @@ static std::string_view state_to_str(MigrationState state) {
return "NO_STATE"sv;
case MigrationState::C_CONNECTING:
return "CONNECTING"sv;
case MigrationState::C_FULL_SYNC:
return "FULL_SYNC"sv;
case MigrationState::C_SYNC:
return "SYNC"sv;
case MigrationState::C_FINISHED:
return "FINISHED"sv;
case MigrationState::C_MAX_INVALID:
Expand Down Expand Up @@ -778,8 +778,8 @@ uint32_t ClusterFamily::CreateOutgoingMigration(ConnectionContext* cntx, uint16_
// Todo add error processing, stop migration process
// fb2::Fiber("stop_Migration", &ClusterFamily::StopMigration, this, sync_id).Detach();
};
auto info = make_shared<OutgoingMigration>(cntx->conn()->RemoteEndpointAddress(), port,
std::move(slots), err_handler, server_family_);
auto migration = make_shared<OutgoingMigration>(cntx->conn()->RemoteEndpointAddress(), port,
std::move(slots), err_handler, server_family_);
auto [it, inserted] = outgoing_migration_jobs_.emplace(sync_id, std::move(info));
CHECK(inserted);
return sync_id;
Expand All @@ -797,8 +797,8 @@ void ClusterFamily::DflyMigrateFlow(CmdArgList args, ConnectionContext* cntx) {

cntx->conn()->SetName(absl::StrCat("migration_flow_", sync_id));

auto info = GetOutgoingMigration(sync_id);
if (!info)
auto migration = GetOutgoingMigration(sync_id);
if (!migration)
return cntx->SendError(kIdNotFound);

cntx->conn()->Migrate(shard_set->pool()->at(shard_id));
Expand All @@ -809,7 +809,7 @@ void ClusterFamily::DflyMigrateFlow(CmdArgList args, ConnectionContext* cntx) {
EngineShard* shard = EngineShard::tlocal();
DCHECK(shard->shard_id() == shard_id);

info->StartFlow(sync_id, server_family_->journal(), cntx->conn()->socket());
migration->StartFlow(sync_id, server_family_->journal(), cntx->conn()->socket());
}

void ClusterFamily::FinalizeIncomingMigration(uint32_t local_sync_id) {
Expand Down
2 changes: 1 addition & 1 deletion src/server/cluster/cluster_slot_migration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ void ClusterSlotMigration::Stop() {
void ClusterSlotMigration::MainMigrationFb() {
VLOG(1) << "Main migration fiber started " << sync_id_;

state_ = MigrationState::C_FULL_SYNC;
state_ = MigrationState::C_SYNC;

// TODO add reconnection code
if (auto ec = InitiateSlotsMigration(); ec) {
Expand Down
4 changes: 2 additions & 2 deletions src/server/cluster/outgoing_slot_migration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class OutgoingMigration::SliceSlotMigration {
SliceSlotMigration(DbSlice* slice, SlotSet slots, uint32_t sync_id, journal::Journal* journal,
Context* cntx, io::Sink* dest)
: streamer_(slice, std::move(slots), sync_id, journal, cntx) {
state_.store(MigrationState::C_FULL_SYNC, memory_order_relaxed);
state_.store(MigrationState::C_SYNC, memory_order_relaxed);
sync_fb_ = Fiber("slot-snapshot", [this, dest] { streamer_.Start(dest); });
}

Expand Down Expand Up @@ -77,7 +77,7 @@ void OutgoingMigration::StartFlow(uint32_t sync_id, journal::Journal* journal, i
state = GetStateImpl();
}

if (state == MigrationState::C_FULL_SYNC) {
if (state == MigrationState::C_SYNC) {
main_sync_fb_ = Fiber("outgoing_migration", &OutgoingMigration::SyncFb, this);
}
}
Expand Down

0 comments on commit cdf8433

Please sign in to comment.