fix: separate Heartbeat and ShardHandler to fibers #3936
Conversation
Signed-off-by: kostas <[email protected]>
src/server/engine_shard.h
Outdated
void Heartbeat();
void RetireExpiredAndEvict();

void RunPeriodic(std::chrono::milliseconds period_ms, std::function<void()> shard_handler);
void RunHeartbeatPeriodic(std::chrono::milliseconds period_ms,
why does the heartbeat still get the shard handler?
I wanted to keep the callback just in case we want to use it in the future, but I agree it's redundant. I will remove it.
src/server/engine_shard.cc
Outdated
});
} else {
  fiber_shard_handler_periodic_ =
      MakeFiber([this, index = pb->GetPoolIndex(), period_ms = clock_cycle_ms,
See the condition in the original RunPeriodic code that runs the shard handler:
if (shard_handler && last_handler_ms + 100 < last_heartbeat_ms)
We run shard_handler every 100 ms; this is unrelated to FLAGS_hz.
I believe this function StartPeriodicFiberImpl is redundant.
It depends on the condition implicitly when the period is greater than 100 ms, because we wait period_ms between runs. So if, let's say, period_ms=500, the shard handler will run every 500 ms (we always wait period_ms between runs), so the 100 ms is really a minimum. I refactored this a little bit to mirror this behaviour.
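For illustration, here is a minimal sketch of that scheduling rule as a standalone loop. The function name and stop flag are made up for the example (they are not the actual Dragonfly API), and std::this_thread::sleep_for stands in for the fiber-aware sleep used in the real code:

#include <algorithm>
#include <atomic>
#include <chrono>
#include <functional>
#include <thread>

// Hypothetical sketch: run the shard handler on its own periodic loop, waiting
// period_ms between invocations but never less than 100 ms, mirroring the old
// `last_handler_ms + 100 < last_heartbeat_ms` check in RunPeriodic.
void RunShardHandlerPeriodicSketch(std::chrono::milliseconds period_ms,
                                   std::function<void()> shard_handler,
                                   std::atomic<bool>& stop) {
  const auto wait = std::max(period_ms, std::chrono::milliseconds(100));
  while (!stop.load()) {
    std::this_thread::sleep_for(wait);  // the real code sleeps on the fiber, not the thread
    if (stop.load())
      break;
    shard_handler();
  }
}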
tests/dragonfly/replication_test.py
Outdated
await asyncio.sleep(1)
await asyncio.sleep(10)
is this change needed?
nope, it was accidental
tests/dragonfly/replication_test.py
Outdated
await c_master.execute_command("debug", "populate", "100000", "foo", "5000")

class ExpirySeeder:
put in utility.py
tests/dragonfly/replication_test.py
Outdated
seeder = ExpirySeeder()
seeder_task = asyncio.create_task(seeder.run(c_master))
await seeder.wait_until(50000)
If the expiry seeder stops setting keys at this point, wouldn't the bug reproduce as well?
Do we need to keep sending commands to hit the bug?
tests/dragonfly/replication_test.py
Outdated
async def run(self, client):
    while not self.stop_flag:
        await client.execute_command(f"SET tmp{self.i} bar{self.i} EX 4")
use a pipeline to speed things up
tests/dragonfly/replication_test.py
Outdated
        await client.execute_command(f"SET tmp{self.i} bar{self.i} EX 4")
        self.i = self.i + 1

    async def wait_until(self, count):
wait_until_n_inserts or something like this
tests/dragonfly/replication_test.py
Outdated
await c_replica.execute_command("debug replica pause")

# Dragonfly will get stuck here. The journal writes to a channel which will block on write.
# Hearbeat() will be called and will get stuck while it tries to evict an expired item (because
a. Please write "expires" and not "evicts" an expired item.
b. I would not write that we block while pushing to the channel, as this is a very specific implementation detail which is also going to change. The reason we get stuck is that we block on the write to the socket, since the replica does not read from it.
c. Please don't write that BreakStalledFlows() will never be called, as this is your bug fix. I would describe the bug and these details in the GitHub bug issue.
# Timeout set to 3 seconds because we must first saturate the socket such that subsequent
# writes block. Otherwise, we will break the flows before Heartbeat actually deadlocks.
master = df_factory.create(
    proactor_threads=2, replication_timeout=3000, vmodule="replica=2,dflycmd=2"
why specify proactor_threads=2 ?
Because it was easier to reproduce with 2 threads. I haven't tried with more, so maybe the test requires tuning.
tests/dragonfly/replication_test.py
Outdated
@@ -2295,7 +2295,9 @@ async def test_announce_ip_port(df_factory):
 @pytest.mark.asyncio
 async def test_replication_timeout_on_full_sync(df_factory: DflyInstanceFactory, df_seeder_factory):
     # setting replication_timeout to a very small value to force the replica to timeout
-    master = df_factory.create(replication_timeout=100, vmodule="replica=2,dflycmd=2")
+    master = df_factory.create(
+        proactor_threads=2, replication_timeout=100, vmodule="replica=2,dflycmd=2"
same here, no need for proactor_threads=2
The issue surfaced when the test started failing in #3891. Heartbeat will block indefinitely if TriggerJournalWriteToSink can't push the serialized data to the channel (if the replica becomes unresponsive). The consequence of that is that the shard_handler and BreakStalledFlows will never be called and df will stall. The solution is to split the blocking/preemptive flow (Heartbeat) and BreakStalledFlows into separate fibers.

Resolves: #3937
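As a rough sketch of the idea (the types below are simplified stand-ins, not the actual util::fb2 / EngineShard API, and std::thread is used in place of fibers), the fix boils down to running two independent periodic loops instead of one, so a Heartbeat that blocks on replication I/O no longer prevents the stalled-flow check from running:

#include <atomic>
#include <chrono>
#include <functional>
#include <thread>

// Simplified sketch: std::thread stands in for the fibers used in Dragonfly.
struct PeriodicLoopsSketch {
  void Start(std::chrono::milliseconds heartbeat_period,
             std::function<void()> heartbeat,        // may block on replication I/O
             std::function<void()> shard_handler) {  // e.g. breaking stalled flows
    // Loop 1: heartbeat only. If it blocks, only this loop is stuck.
    heartbeat_loop_ = std::thread([this, heartbeat_period, heartbeat] {
      while (!stop_.load()) {
        std::this_thread::sleep_for(heartbeat_period);
        heartbeat();
      }
    });
    // Loop 2: the shard handler keeps running even while the heartbeat is
    // blocked, which is what prevents the stall described above.
    shard_handler_loop_ = std::thread([this, shard_handler] {
      while (!stop_.load()) {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        shard_handler();
      }
    });
  }

  void Stop() {
    stop_.store(true);
    if (heartbeat_loop_.joinable()) heartbeat_loop_.join();
    if (shard_handler_loop_.joinable()) shard_handler_loop_.join();
  }

  std::atomic<bool> stop_{false};
  std::thread heartbeat_loop_, shard_handler_loop_;
};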