feat(cluster): automatic slot migration finalization #2697 #2698

Merged 4 commits on Mar 7, 2024
3 changes: 1 addition & 2 deletions src/server/cluster/cluster_config.h
@@ -19,8 +19,7 @@ namespace dfly {
enum class MigrationState : uint8_t {
C_NO_STATE,
C_CONNECTING,
C_FULL_SYNC,
C_STABLE_SYNC,
C_SYNC,
C_FINISHED,
C_MAX_INVALID = std::numeric_limits<uint8_t>::max()
};
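A note on the enum change above: the old C_FULL_SYNC and C_STABLE_SYNC states are collapsed into a single C_SYNC state, so DFLYCLUSTER SLOT-MIGRATION-STATUS now reports "SYNC" while data is streaming. Below is a minimal sketch of how a client could read the new state names; it assumes a redis.asyncio client pointed at a node's admin port, and the connection details and function name are illustrative, not part of this diff.

import asyncio
import redis.asyncio as redis  # assumed client library, mirroring the tests further below

async def print_migration_state(admin_port: int) -> None:
    # Connect to the node's admin port, as the cluster tests do.
    client = redis.Redis(host="localhost", port=admin_port, decode_responses=True)
    # After this PR the reported states are NO_STATE, CONNECTING, SYNC or FINISHED;
    # FULL_SYNC and STABLE_SYNC no longer exist.
    print(await client.execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS"))

# asyncio.run(print_migration_state(30001))  # hypothetical admin port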
62 changes: 8 additions & 54 deletions src/server/cluster/cluster_family.cc
@@ -396,8 +396,6 @@ void ClusterFamily::DflyCluster(CmdArgList args, ConnectionContext* cntx) {
return DflyClusterStartSlotMigration(args, cntx);
Collaborator:
Also, start slot migration needs to be removed; don't forget.

Contributor Author:
I will have another PR for migration initiation refactoring.

} else if (sub_cmd == "SLOT-MIGRATION-STATUS") {
return DflyClusterSlotMigrationStatus(args, cntx);
} else if (sub_cmd == "SLOT-MIGRATION-FINALIZE") {
return DflyClusterMigrationFinalize(args, cntx);
}

return cntx->SendError(UnknownSubCmd(sub_cmd, "DFLYCLUSTER"), kSyntaxErrType);
@@ -634,10 +632,8 @@ static std::string_view state_to_str(MigrationState state) {
return "NO_STATE"sv;
case MigrationState::C_CONNECTING:
return "CONNECTING"sv;
case MigrationState::C_FULL_SYNC:
return "FULL_SYNC"sv;
case MigrationState::C_STABLE_SYNC:
return "STABLE_SYNC"sv;
case MigrationState::C_SYNC:
return "SYNC"sv;
case MigrationState::C_FINISHED:
return "FINISHED"sv;
case MigrationState::C_MAX_INVALID:
@@ -689,47 +685,6 @@ void ClusterFamily::DflyClusterSlotMigrationStatus(CmdArgList args, ConnectionCo
return rb->SendSimpleString(state_to_str(MigrationState::C_NO_STATE));
}

void ClusterFamily::DflyClusterMigrationFinalize(CmdArgList args, ConnectionContext* cntx) {
CmdArgParser parser{args};
auto sync_id = parser.Next<uint32_t>();

if (auto err = parser.Error(); err) {
return cntx->SendError(err->MakeReply());
}

auto migration = GetOutgoingMigration(sync_id);
if (!migration)
return cntx->SendError(kIdNotFound);

// TODO implement blocking on migrated slots only

bool is_block_active = true;
auto is_pause_in_progress = [&is_block_active] { return is_block_active; };
auto pause_fb_opt =
Pause(server_family_->GetListeners(), cntx->conn(), ClientPause::WRITE, is_pause_in_progress);

if (!pause_fb_opt) {
LOG(WARNING) << "Cluster migration finalization time out";
return cntx->SendError("Blocking connections time out");
}

absl::Cleanup cleanup([&is_block_active, &pause_fb_opt] {
is_block_active = false;
pause_fb_opt->JoinIfNeeded();
});

auto cb = [this, &migration](util::ProactorBase* pb) {
if (const auto* shard = EngineShard::tlocal(); shard) {
// TODO add error processing to move back into STABLE_SYNC state
migration->Finalize(shard->shard_id());
}
};

shard_set->pool()->AwaitFiberOnAll(std::move(cb));

return cntx->SendOk();
}

void ClusterFamily::DflyMigrate(CmdArgList args, ConnectionContext* cntx) {
ToUpper(&args[0]);
string_view sub_cmd = ArgS(args, 0);
@@ -823,10 +778,9 @@ uint32_t ClusterFamily::CreateOutgoingMigration(ConnectionContext* cntx, uint16_
// Todo add error processing, stop migration process
// fb2::Fiber("stop_Migration", &ClusterFamily::StopMigration, this, sync_id).Detach();
};
auto info =
make_shared<OutgoingMigration>(shard_set->size(), cntx->conn()->RemoteEndpointAddress(), port,
std::move(slots), err_handler);
auto [it, inserted] = outgoing_migration_jobs_.emplace(sync_id, std::move(info));
auto migration = make_shared<OutgoingMigration>(cntx->conn()->RemoteEndpointAddress(), port,
std::move(slots), err_handler, server_family_);
auto [it, inserted] = outgoing_migration_jobs_.emplace(sync_id, std::move(migration));
CHECK(inserted);
return sync_id;
}
@@ -843,8 +797,8 @@ void ClusterFamily::DflyMigrateFlow(CmdArgList args, ConnectionContext* cntx) {

cntx->conn()->SetName(absl::StrCat("migration_flow_", sync_id));

auto info = GetOutgoingMigration(sync_id);
if (!info)
auto migration = GetOutgoingMigration(sync_id);
if (!migration)
return cntx->SendError(kIdNotFound);

cntx->conn()->Migrate(shard_set->pool()->at(shard_id));
@@ -855,7 +809,7 @@ void ClusterFamily::DflyMigrateFlow(CmdArgList args, ConnectionContext* cntx) {
EngineShard* shard = EngineShard::tlocal();
DCHECK(shard->shard_id() == shard_id);

info->StartFlow(&shard->db_slice(), sync_id, server_family_->journal(), cntx->conn()->socket());
migration->StartFlow(sync_id, server_family_->journal(), cntx->conn()->socket());
}

void ClusterFamily::FinalizeIncomingMigration(uint32_t local_sync_id) {
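With DflyClusterMigrationFinalize removed above, finalization is no longer a separate admin command: the outgoing migration finalizes itself once the snapshot is done (see outgoing_slot_migration.cc further below). A hedged sketch of the resulting driver flow, modeled on test_cluster_data_migration; the host, ports, slot range, and function name are assumptions for illustration.

import asyncio
import redis.asyncio as redis  # assumed client library, as in the tests

async def migrate_and_wait(target_admin_port: int, source_ip: str, source_admin_port: int) -> None:
    # The incoming (target) node starts the migration, as in the test below.
    target = redis.Redis(host="localhost", port=target_admin_port, decode_responses=True)
    sync_id = await target.execute_command(
        "DFLYCLUSTER", "START-SLOT-MIGRATION", source_ip, str(source_admin_port), "3000", "9000"
    )
    print("migration sync id:", sync_id)
    # No SLOT-MIGRATION-FINALIZE step anymore: poll the status until it reports FINISHED.
    while True:
        states = await target.execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS")
        if isinstance(states, str):
            states = [states]
        if all(s.endswith("FINISHED") for s in states):
            break
        await asyncio.sleep(0.1)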
1 change: 0 additions & 1 deletion src/server/cluster/cluster_family.h
@@ -54,7 +54,6 @@ class ClusterFamily {
private: // Slots migration section
void DflyClusterStartSlotMigration(CmdArgList args, ConnectionContext* cntx);
void DflyClusterSlotMigrationStatus(CmdArgList args, ConnectionContext* cntx);
void DflyClusterMigrationFinalize(CmdArgList args, ConnectionContext* cntx);

// DFLYMIGRATE is internal command defines several steps in slots migrations process
void DflyMigrate(CmdArgList args, ConnectionContext* cntx);
2 changes: 1 addition & 1 deletion src/server/cluster/cluster_slot_migration.cc
@@ -124,7 +124,7 @@ void ClusterSlotMigration::Stop() {
void ClusterSlotMigration::MainMigrationFb() {
VLOG(1) << "Main migration fiber started " << sync_id_;

state_ = MigrationState::C_FULL_SYNC;
state_ = MigrationState::C_SYNC;

// TODO add reconnection code
if (auto ec = InitiateSlotsMigration(); ec) {
51 changes: 44 additions & 7 deletions src/server/cluster/outgoing_slot_migration.cc
@@ -6,9 +6,12 @@

#include <atomic>

#include "absl/cleanup/cleanup.h"
#include "base/logging.h"
#include "server/db_slice.h"
#include "server/engine_shard_set.h"
#include "server/journal/streamer.h"
#include "server/server_family.h"

using namespace std;
namespace dfly {
@@ -18,7 +21,7 @@ class OutgoingMigration::SliceSlotMigration {
SliceSlotMigration(DbSlice* slice, SlotSet slots, uint32_t sync_id, journal::Journal* journal,
Context* cntx, io::Sink* dest)
: streamer_(slice, std::move(slots), sync_id, journal, cntx) {
state_.store(MigrationState::C_FULL_SYNC, memory_order_relaxed);
state_.store(MigrationState::C_SYNC, memory_order_relaxed);
sync_fb_ = Fiber("slot-snapshot", [this, dest] { streamer_.Start(dest); });
}

@@ -46,17 +49,24 @@ class OutgoingMigration::SliceSlotMigration {
Fiber sync_fb_;
};

OutgoingMigration::OutgoingMigration(std::uint32_t flows_num, std::string ip, uint16_t port,
SlotRanges slots, Context::ErrHandler err_handler)
: host_ip_(ip), port_(port), slots_(slots), cntx_(err_handler), slot_migrations_(flows_num) {
OutgoingMigration::OutgoingMigration(std::string ip, uint16_t port, SlotRanges slots,
Context::ErrHandler err_handler, ServerFamily* sf)
: host_ip_(ip),
port_(port),
slots_(slots),
cntx_(err_handler),
slot_migrations_(shard_set->size()),
server_family_(sf) {
}

OutgoingMigration::~OutgoingMigration() {
main_sync_fb_.JoinIfNeeded();
}

void OutgoingMigration::StartFlow(DbSlice* slice, uint32_t sync_id, journal::Journal* journal,
io::Sink* dest) {
void OutgoingMigration::StartFlow(uint32_t sync_id, journal::Journal* journal, io::Sink* dest) {
EngineShard* shard = EngineShard::tlocal();
DbSlice* slice = &shard->db_slice();

const auto shard_id = slice->shard_id();

MigrationState state = MigrationState::C_NO_STATE;
@@ -67,7 +77,7 @@ void OutgoingMigration::StartFlow(DbSlice* slice, uint32_t sync_id, journal::Jou
state = GetStateImpl();
}

if (state == MigrationState::C_FULL_SYNC) {
if (state == MigrationState::C_SYNC) {
main_sync_fb_ = Fiber("outgoing_migration", &OutgoingMigration::SyncFb, this);
}
}
@@ -102,6 +112,33 @@ void OutgoingMigration::SyncFb() {
migration->WaitForSnapshotFinished();
}
VLOG(1) << "Migrations snapshot is finihed";

// TODO implement blocking on migrated slots only

bool is_block_active = true;
auto is_pause_in_progress = [&is_block_active] { return is_block_active; };
auto pause_fb_opt =
Pause(server_family_->GetListeners(), nullptr, ClientPause::WRITE, is_pause_in_progress);

if (!pause_fb_opt) {
LOG(WARNING) << "Cluster migration finalization time out";
}

absl::Cleanup cleanup([&is_block_active, &pause_fb_opt] {
is_block_active = false;
pause_fb_opt->JoinIfNeeded();
});

auto cb = [this](util::ProactorBase* pb) {
if (const auto* shard = EngineShard::tlocal(); shard) {
// TODO add error processing to move back into STABLE_SYNC state
Finalize(shard->shard_id());
}
};

shard_set->pool()->AwaitFiberOnAll(std::move(cb));

// TODO add ACK here and config update
}

} // namespace dfly
9 changes: 6 additions & 3 deletions src/server/cluster/outgoing_slot_migration.h
@@ -14,16 +14,18 @@ class Journal;
}

class DbSlice;
class ServerFamily;

// Whole outgoing slots migration manager
class OutgoingMigration {
public:
OutgoingMigration() = default;
~OutgoingMigration();
OutgoingMigration(std::uint32_t flows_num, std::string ip, uint16_t port, SlotRanges slots,
Context::ErrHandler err_handler);
OutgoingMigration(std::string ip, uint16_t port, SlotRanges slots, Context::ErrHandler,
ServerFamily* sf);

void StartFlow(DbSlice* slice, uint32_t sync_id, journal::Journal* journal, io::Sink* dest);
// should be run for all shards
void StartFlow(uint32_t sync_id, journal::Journal* journal, io::Sink* dest);

void Finalize(uint32_t shard_id);
void Cancel(uint32_t shard_id);
@@ -56,6 +58,7 @@ class OutgoingMigration {
Context cntx_;
mutable Mutex flows_mu_;
std::vector<std::unique_ptr<SliceSlotMigration>> slot_migrations_ ABSL_GUARDED_BY(flows_mu_);
ServerFamily* server_family_;

Fiber main_sync_fb_;
};
89 changes: 42 additions & 47 deletions tests/dragonfly/cluster_test.py
@@ -873,9 +873,12 @@ async def test_cluster_data_migration(df_local_factory: DflyInstanceFactory):
c_nodes_admin,
)

assert await c_nodes[0].set("KEY0", "value")
assert await c_nodes[0].set("KEY1", "value")
assert await c_nodes[1].set("KEY2", "value")
assert await c_nodes[1].set("KEY3", "value")

assert await c_nodes[0].set("KEY4", "value")
assert await c_nodes[0].set("KEY5", "value")
assert await c_nodes[1].set("KEY6", "value")
assert await c_nodes[1].set("KEY7", "value")
assert await c_nodes[0].set("KEY8", "value")
@@ -891,26 +894,13 @@ async def test_cluster_data_migration(df_local_factory: DflyInstanceFactory):
assert await c_nodes[1].set("KEY18", "value")
assert await c_nodes[1].set("KEY19", "value")

assert await c_nodes[0].execute_command("DBSIZE") == 10

res = await c_nodes_admin[1].execute_command(
"DFLYCLUSTER", "START-SLOT-MIGRATION", "127.0.0.1", str(nodes[0].admin_port), "3000", "9000"
)
assert 1 == res

assert await c_nodes[0].set("KEY0", "value")
assert await c_nodes[0].set("KEY1", "value")

await asyncio.sleep(0.5)

assert await c_nodes[0].set("KEY4", "value")
assert await c_nodes[0].set("KEY5", "value")
assert await c_nodes[0].execute_command("DBSIZE") == 10

# TODO remove when we add slot blocking
await asyncio.sleep(0.5)

res = await c_nodes_admin[0].execute_command("DFLYCLUSTER", "SLOT-MIGRATION-FINALIZE", "1")
assert "OK" == res

await asyncio.sleep(0.5)

while (
@@ -1029,6 +1019,30 @@ async def generate_config():

fill_task = asyncio.create_task(seeder.run())

# some time for the seeder
await asyncio.sleep(0.5)

# Counter that pushes values to a list
async def list_counter(key, client: aioredis.RedisCluster):
for i in itertools.count(start=1):
await client.lpush(key, i)

# Start ten counters
counter_keys = [f"_counter{i}" for i in range(10)]
counter_connections = [
aioredis.RedisCluster(host="localhost", port=nodes[0].instance.port) for _ in range(10)
]
counters = [
asyncio.create_task(list_counter(key, conn))
for key, conn in zip(counter_keys, counter_connections)
]

seeder.stop()
await fill_task

# Generate capture, capture ignores counter keys
capture = await seeder.capture()

# Generate migration plan
for node_idx, node in enumerate(nodes):
random.shuffle(node.slots)
@@ -1063,44 +1077,25 @@ async def generate_config():
keeping = node.slots[num_outgoing:]
node.next_slots.extend(keeping)

# some more time for the seeder
await asyncio.sleep(1.0)

seeder.stop()
await fill_task
await asyncio.sleep(1.0)

# Counter that pushes values to a list
async def list_counter(key, client: aioredis.RedisCluster):
for i in itertools.count(start=1):
await client.lpush(key, i)

# Start ten counters
counter_keys = [f"_counter{i}" for i in range(10)]
counter_connections = [
aioredis.RedisCluster(host="localhost", port=nodes[0].instance.port) for _ in range(10)
]
counters = [
asyncio.create_task(list_counter(key, conn))
for key, conn in zip(counter_keys, counter_connections)
]
iterations = 0
while True:
for node in nodes:
states = await node.admin_client.execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS")
print(states)
if not all(s.endswith("FINISHED") for s in states) and not states == "NO_STATE":
break
else:
break

# Generate capture, capture ignores counter keys
capture = await seeder.capture()
iterations += 1
assert iterations < 100

# Finalize slot migration
for node in nodes:
for sync_id in node.sync_ids:
assert "OK" == await node.admin_client.execute_command(
"DFLYCLUSTER", "SLOT-MIGRATION-FINALIZE", sync_id
)
await asyncio.sleep(0.1)

# Stop counters
for counter in counters:
counter.cancel()
Collaborator:
I think we push the old slot configuration below, not the new one. Also, I see that the code under # Transfer nodes updates the node configuration only after the push.

Contributor Author:
It looks like you are right. I will discuss this with Vlad. I've tried to exchange these operations and the test fails after that.

Collaborator:
I see you changed it now... is something fixed?

Contributor Author:
No, the test still fails.

Contributor Author:
I just wanted to check whether it is the same.

Contributor:
True, we push the old config because generate_config takes the values of node.slots. We should first update the node objects and only then push the new config.

Contributor Author:
I found the bug in the code, so I will fix it later.
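Following up on the thread above, here is a minimal sketch of the suggested ordering for this test: fold the post-migration slots into the node objects first, and only then regenerate and push the config so that it reflects the new ownership. It reuses the nodes, generate_config, and push_config names from this test; the exact field handling is an assumption.

# Adopt the post-migration slot ownership on the node objects first...
for node in nodes:
    node.slots = node.next_slots
    node.next_slots = []
# ...then regenerate and push the config so it describes the new owners.
await push_config(json.dumps(await generate_config()), [node.admin_client for node in nodes])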


# need this sleep to avoid race between finalize and config
await asyncio.sleep(0.5)
# Push new config
await push_config(json.dumps(await generate_config()), [node.admin_client for node in nodes])

Expand Down