-
Notifications
You must be signed in to change notification settings - Fork 990
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(cluster): automatic slot migration finalization #2697 #2698
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -873,9 +873,12 @@ async def test_cluster_data_migration(df_local_factory: DflyInstanceFactory): | |
c_nodes_admin, | ||
) | ||
|
||
assert await c_nodes[0].set("KEY0", "value") | ||
assert await c_nodes[0].set("KEY1", "value") | ||
assert await c_nodes[1].set("KEY2", "value") | ||
assert await c_nodes[1].set("KEY3", "value") | ||
|
||
assert await c_nodes[0].set("KEY4", "value") | ||
assert await c_nodes[0].set("KEY5", "value") | ||
assert await c_nodes[1].set("KEY6", "value") | ||
assert await c_nodes[1].set("KEY7", "value") | ||
assert await c_nodes[0].set("KEY8", "value") | ||
|
@@ -891,26 +894,13 @@ async def test_cluster_data_migration(df_local_factory: DflyInstanceFactory): | |
assert await c_nodes[1].set("KEY18", "value") | ||
assert await c_nodes[1].set("KEY19", "value") | ||
|
||
assert await c_nodes[0].execute_command("DBSIZE") == 10 | ||
|
||
res = await c_nodes_admin[1].execute_command( | ||
"DFLYCLUSTER", "START-SLOT-MIGRATION", "127.0.0.1", str(nodes[0].admin_port), "3000", "9000" | ||
) | ||
assert 1 == res | ||
|
||
assert await c_nodes[0].set("KEY0", "value") | ||
assert await c_nodes[0].set("KEY1", "value") | ||
|
||
await asyncio.sleep(0.5) | ||
|
||
assert await c_nodes[0].set("KEY4", "value") | ||
assert await c_nodes[0].set("KEY5", "value") | ||
assert await c_nodes[0].execute_command("DBSIZE") == 10 | ||
|
||
# TODO remove when we add slot blocking | ||
await asyncio.sleep(0.5) | ||
|
||
res = await c_nodes_admin[0].execute_command("DFLYCLUSTER", "SLOT-MIGRATION-FINALIZE", "1") | ||
assert "OK" == res | ||
|
||
await asyncio.sleep(0.5) | ||
|
||
while ( | ||
|
@@ -1029,6 +1019,30 @@ async def generate_config(): | |
|
||
fill_task = asyncio.create_task(seeder.run()) | ||
|
||
# some time fo seeder | ||
await asyncio.sleep(0.5) | ||
|
||
# Counter that pushes values to a list | ||
async def list_counter(key, client: aioredis.RedisCluster): | ||
for i in itertools.count(start=1): | ||
await client.lpush(key, i) | ||
|
||
# Start ten counters | ||
counter_keys = [f"_counter{i}" for i in range(10)] | ||
counter_connections = [ | ||
aioredis.RedisCluster(host="localhost", port=nodes[0].instance.port) for _ in range(10) | ||
] | ||
counters = [ | ||
asyncio.create_task(list_counter(key, conn)) | ||
for key, conn in zip(counter_keys, counter_connections) | ||
] | ||
|
||
seeder.stop() | ||
await fill_task | ||
|
||
# Generate capture, capture ignores counter keys | ||
capture = await seeder.capture() | ||
|
||
# Generate migration plan | ||
for node_idx, node in enumerate(nodes): | ||
random.shuffle(node.slots) | ||
|
@@ -1063,44 +1077,25 @@ async def generate_config(): | |
keeping = node.slots[num_outgoing:] | ||
node.next_slots.extend(keeping) | ||
|
||
# some more time fo seeder | ||
await asyncio.sleep(1.0) | ||
|
||
seeder.stop() | ||
await fill_task | ||
await asyncio.sleep(1.0) | ||
|
||
# Counter that pushes values to a list | ||
async def list_counter(key, client: aioredis.RedisCluster): | ||
for i in itertools.count(start=1): | ||
await client.lpush(key, i) | ||
|
||
# Start ten counters | ||
counter_keys = [f"_counter{i}" for i in range(10)] | ||
counter_connections = [ | ||
aioredis.RedisCluster(host="localhost", port=nodes[0].instance.port) for _ in range(10) | ||
] | ||
counters = [ | ||
asyncio.create_task(list_counter(key, conn)) | ||
for key, conn in zip(counter_keys, counter_connections) | ||
] | ||
iterations = 0 | ||
while True: | ||
for node in nodes: | ||
states = await node.admin_client.execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS") | ||
print(states) | ||
if not all(s.endswith("FINISHED") for s in states) and not states == "NO_STATE": | ||
break | ||
else: | ||
break | ||
|
||
# Generate capture, capture ignores counter keys | ||
capture = await seeder.capture() | ||
iterations += 1 | ||
assert iterations < 100 | ||
|
||
# Finalize slot migration | ||
for node in nodes: | ||
for sync_id in node.sync_ids: | ||
assert "OK" == await node.admin_client.execute_command( | ||
"DFLYCLUSTER", "SLOT-MIGRATION-FINALIZE", sync_id | ||
) | ||
await asyncio.sleep(0.1) | ||
|
||
# Stop counters | ||
for counter in counters: | ||
counter.cancel() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we push below the old slot configuration and not the new. also I see the code under # Transfer nodes that updates the node configuration only after the push There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It looks like you are right. I will discuss this with Vlad. I've tried to exchange these operations and the test fails after that. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see you changed it now.. is something fixed? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no the test is failed) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. just wanted to check is it the same There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. True, we push the old config, because generate_config takes the values of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I found the bug in the code so will fix it later) |
||
|
||
# need this sleep to avoid race between finalize and config | ||
await asyncio.sleep(0.5) | ||
# Push new config | ||
await push_config(json.dumps(await generate_config()), [node.admin_client for node in nodes]) | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also start slot migration need to be removed dont forget
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will have another PR for migration initiation refactoring.