fix(cluster): fix slot filtration to RestoreStreamer #2477

Merged · 4 commits · Jan 28, 2024
Changes from all commits
src/server/journal/streamer.cc — 32 changes: 18 additions & 14 deletions

@@ -113,6 +113,10 @@ bool RestoreStreamer::ShouldWrite(const journal::JournalItem& item) const {
   return ShouldWrite(*item.slot);
 }
 
+bool RestoreStreamer::ShouldWrite(std::string_view key) const {
+  return ShouldWrite(ClusterConfig::KeySlot(key));
+}
+
 bool RestoreStreamer::ShouldWrite(SlotId slot_id) const {
   return my_slots_.contains(slot_id);
 }

@@ -121,28 +125,29 @@ void RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
   DCHECK_LT(it.GetVersion(), snapshot_version_);
   it.SetVersion(snapshot_version_);
 
+  bool is_data_present = false;
   {
     FiberAtomicGuard fg;  // Can't switch fibers because that could invalidate iterator
 
-    while (!it.is_done()) {
+    string key_buffer;  // we can reuse it
+    for (; !it.is_done(); ++it) {
       const auto& pv = it->second;
 
-      string key_buffer;
       string_view key = it->first.GetSlice(&key_buffer);
+      if (ShouldWrite(key)) {
+        is_data_present = true;
 
-      uint64_t expire = 0;
-      if (pv.HasExpire()) {
-        auto eit = db_slice_->databases()[0]->expire.Find(it->first);
-        expire = db_slice_->ExpireTime(eit);
-      }
+        uint64_t expire = 0;
+        if (pv.HasExpire()) {
+          auto eit = db_slice_->databases()[0]->expire.Find(it->first);
+          expire = db_slice_->ExpireTime(eit);
+        }
 
-      WriteEntry(key, pv, expire);
-
-      ++it;
+        WriteEntry(key, pv, expire);
+      }
     }
   }
 
-  NotifyWritten(true);
+  if (is_data_present)
+    NotifyWritten(true);
 }
 
 void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) {

@@ -166,7 +171,6 @@ void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req
 
 void RestoreStreamer::WriteEntry(string_view key, const PrimeValue& pv, uint64_t expire_ms) {
   absl::InlinedVector<string_view, 4> args;
-
   args.push_back(key);
 
   string expire_str = absl::StrCat(expire_ms);
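The core of the fix: `WriteBucket` now skips keys whose slot this streamer does not own, and `NotifyWritten` fires only when a bucket actually produced data. The new `ShouldWrite(std::string_view)` overload delegates to `ClusterConfig::KeySlot`. For orientation, the standard Redis Cluster key-to-slot mapping — which `KeySlot` is assumed to follow; this sketch is illustrative, not Dragonfly's code — looks like:

```python
def crc16_xmodem(data: bytes) -> int:
    # CRC16-CCITT (XMODEM): polynomial 0x1021, initial value 0.
    crc = 0
    for byte in data:
        crc ^= byte << 8
        for _ in range(8):
            crc = ((crc << 1) ^ 0x1021) if crc & 0x8000 else (crc << 1)
            crc &= 0xFFFF
    return crc

def key_slot(key: str) -> int:
    # If the key contains a non-empty {hashtag}, only the tag is hashed,
    # so related keys can be pinned to the same slot.
    start = key.find("{")
    if start != -1:
        end = key.find("}", start + 1)
        if end > start + 1:
            key = key[start + 1 : end]
    return crc16_xmodem(key.encode()) % 16384

print(key_slot("KEY0"))  # e.g. check which side of a slot cutoff a test key falls on
```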
src/server/journal/streamer.h — 1 change: 1 addition & 0 deletions

@@ -68,6 +68,7 @@ class RestoreStreamer : public JournalStreamer {
  private:
   void OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req);
   bool ShouldWrite(const journal::JournalItem& item) const override;
+  bool ShouldWrite(std::string_view key) const;
   bool ShouldWrite(SlotId slot_id) const;
 
   void WriteBucket(PrimeTable::bucket_iterator it);
tests/dragonfly/cluster_test.py — 145 changes: 117 additions & 28 deletions

@@ -762,31 +762,13 @@ async def test_cluster_slot_migration(df_local_factory: DflyInstanceFactory):
     config = f"""
       [
         {{
-          "slot_ranges": [
-            {{
-              "start": 0,
-              "end": LAST_SLOT_CUTOFF
-            }}
-          ],
-          "master": {{
-            "id": "{node_ids[0]}",
-            "ip": "localhost",
-            "port": {nodes[0].port}
-          }},
+          "slot_ranges": [ {{ "start": 0, "end": LAST_SLOT_CUTOFF }} ],
+          "master": {{ "id": "{node_ids[0]}", "ip": "localhost", "port": {nodes[0].port} }},
           "replicas": []
         }},
         {{
-          "slot_ranges": [
-            {{
-              "start": NEXT_SLOT_CUTOFF,
-              "end": 16383
-            }}
-          ],
-          "master": {{
-            "id": "{node_ids[1]}",
-            "ip": "localhost",
-            "port": {nodes[1].port}
-          }},
+          "slot_ranges": [ {{ "start": NEXT_SLOT_CUTOFF, "end": 16383 }} ],
+          "master": {{ "id": "{node_ids[1]}", "ip": "localhost", "port": {nodes[1].port} }},
           "replicas": []
         }}
       ]
@@ -807,12 +789,13 @@ async def test_cluster_slot_migration(df_local_factory: DflyInstanceFactory):
     )
     assert "OK" == res
 
-    await asyncio.sleep(0.5)
-
-    status = await c_nodes_admin[1].execute_command(
-        "DFLYCLUSTER", "SLOT-MIGRATION-STATUS", "127.0.0.1", str(nodes[0].admin_port)
-    )
-    assert "STABLE_SYNC" == status
+    while (
+        await c_nodes_admin[1].execute_command(
+            "DFLYCLUSTER", "SLOT-MIGRATION-STATUS", "127.0.0.1", str(nodes[0].admin_port)
+        )
+        != "STABLE_SYNC"
+    ):
+        await asyncio.sleep(0.05)
 
     status = await c_nodes_admin[0].execute_command(
         "DFLYCLUSTER", "SLOT-MIGRATION-STATUS", "127.0.0.1", str(nodes[1].port)
@@ -835,5 +818,111 @@ async def test_cluster_slot_migration(df_local_factory: DflyInstanceFactory):
     except redis.exceptions.ResponseError as e:
         assert e.args[0] == "Can't start the migration, another one is in progress"
 
+    await push_config(
+        config.replace("LAST_SLOT_CUTOFF", "5259").replace("NEXT_SLOT_CUTOFF", "5260"),
+        c_nodes_admin,
+    )
+
     await c_nodes_admin[0].close()
     await c_nodes_admin[1].close()
+
+
+@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
+async def test_cluster_data_migration(df_local_factory: DflyInstanceFactory):
Review thread on lines +830 to +831 (the new test definition):

Collaborator: Do we need 2 tests for data and slot migration? Why not do both in the same test? What's the benefit of having 2 tests?

Contributor (author): I don't want to have one really big test. In cluster_migration_test I check corner cases of the API, and in the current test, the data migration itself.

+    # Check data migration from one node to another
+    nodes = [
+        df_local_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000)
+        for i in range(2)
+    ]
+
+    df_local_factory.start_all(nodes)
+
+    c_nodes = [node.client() for node in nodes]
+    c_nodes_admin = [node.admin_client() for node in nodes]
+
+    node_ids = await asyncio.gather(*(get_node_id(c) for c in c_nodes_admin))
+
+    config = f"""
+      [
+        {{
+          "slot_ranges": [ {{ "start": 0, "end": LAST_SLOT_CUTOFF }} ],
+          "master": {{ "id": "{node_ids[0]}", "ip": "localhost", "port": {nodes[0].port} }},
+          "replicas": []
+        }},
+        {{
+          "slot_ranges": [ {{ "start": NEXT_SLOT_CUTOFF, "end": 16383 }} ],
+          "master": {{ "id": "{node_ids[1]}", "ip": "localhost", "port": {nodes[1].port} }},
+          "replicas": []
+        }}
+      ]
+    """
+
+    await push_config(
+        config.replace("LAST_SLOT_CUTOFF", "9000").replace("NEXT_SLOT_CUTOFF", "9001"),
+        c_nodes_admin,
+    )
+
+    assert await c_nodes[0].set("KEY0", "value")
+    assert await c_nodes[0].set("KEY1", "value")
+    assert await c_nodes[1].set("KEY2", "value")
+    assert await c_nodes[1].set("KEY3", "value")
+    assert await c_nodes[0].set("KEY4", "value")
+    assert await c_nodes[0].set("KEY5", "value")
+    assert await c_nodes[1].set("KEY6", "value")
+    assert await c_nodes[1].set("KEY7", "value")
+    assert await c_nodes[0].set("KEY8", "value")
+    assert await c_nodes[0].set("KEY9", "value")
+    assert await c_nodes[1].set("KEY10", "value")
+    assert await c_nodes[1].set("KEY11", "value")
+    assert await c_nodes[0].set("KEY12", "value")
+    assert await c_nodes[0].set("KEY13", "value")
+    assert await c_nodes[1].set("KEY14", "value")
+    assert await c_nodes[1].set("KEY15", "value")
+    assert await c_nodes[0].set("KEY16", "value")
+    assert await c_nodes[0].set("KEY17", "value")
+    assert await c_nodes[1].set("KEY18", "value")
+    assert await c_nodes[1].set("KEY19", "value")
+    assert await c_nodes[0].execute_command("DBSIZE") == 10
+
+    res = await c_nodes_admin[1].execute_command(
+        "DFLYCLUSTER", "START-SLOT-MIGRATION", "127.0.0.1", str(nodes[0].admin_port), "3000", "9000"
+    )
+    assert "OK" == res
+
+    while (
+        await c_nodes_admin[1].execute_command(
+            "DFLYCLUSTER", "SLOT-MIGRATION-STATUS", "127.0.0.1", str(nodes[0].admin_port)
+        )
+        != "STABLE_SYNC"
+    ):
+        await asyncio.sleep(0.05)
+
+    await push_config(
+        config.replace("LAST_SLOT_CUTOFF", "2999").replace("NEXT_SLOT_CUTOFF", "3000"),
+        c_nodes_admin,
+    )
+
+    assert await c_nodes[0].get("KEY0") == "value"
+    assert await c_nodes[1].get("KEY1") == "value"
+    assert await c_nodes[1].get("KEY2") == "value"
+    assert await c_nodes[1].get("KEY3") == "value"
+    assert await c_nodes[0].get("KEY4") == "value"
+    assert await c_nodes[1].get("KEY5") == "value"
+    assert await c_nodes[1].get("KEY6") == "value"
+    assert await c_nodes[1].get("KEY7") == "value"
+    assert await c_nodes[0].get("KEY8") == "value"
+    assert await c_nodes[1].get("KEY9") == "value"
+    assert await c_nodes[1].get("KEY10") == "value"
+    assert await c_nodes[1].get("KEY11") == "value"
+    assert await c_nodes[1].get("KEY12") == "value"
+    assert await c_nodes[1].get("KEY13") == "value"
+    assert await c_nodes[1].get("KEY14") == "value"
+    assert await c_nodes[1].get("KEY15") == "value"
+    assert await c_nodes[1].get("KEY16") == "value"
+    assert await c_nodes[1].get("KEY17") == "value"
+    assert await c_nodes[1].get("KEY18") == "value"
+    assert await c_nodes[1].get("KEY19") == "value"
+    assert await c_nodes[1].execute_command("DBSIZE") == 17
+
+    await c_nodes_admin[0].close()
+    await c_nodes_admin[1].close()
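The final assertions follow from the slot mapping: after slots 3000–9000 migrate to node 1, only keys hashing into 0–2999 remain on node 0 (per the asserts, KEY0, KEY4 and KEY8), leaving 17 of the 20 keys on node 1 — hence DBSIZE == 17. A quick local cross-check, reusing key_slot from the sketch above:

```python
# Partition the 20 test keys by the final config's slot ranges
# (0-2999 on node 0, 3000-16383 on node 1).
keys = [f"KEY{i}" for i in range(20)]
node0 = [k for k in keys if key_slot(k) <= 2999]
node1 = [k for k in keys if key_slot(k) >= 3000]
print(node0, len(node1))  # the test expects 3 keys on node 0 (KEY0, KEY4, KEY8) and 17 on node 1
```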