test: track simulated timers per dispatcher to simplify thread interactions and locking. #12609

Merged
Original file line number Diff line number Diff line change
@@ -57,7 +57,7 @@ class GradientControllerTest : public testing::Test {
stats_, random_, time_system_);

// Advance time so that the latency sample calculations don't underflow if monotonic time is 0.
time_system_.advanceTimeAsync(std::chrono::hours(42));
advanceTimeAndLoop(std::chrono::hours(42));

return config;
}
@@ -107,6 +107,11 @@ class GradientControllerTest : public testing::Test {
.value());
}

template <typename DurationType> void advanceTimeAndLoop(DurationType duration) {
time_system_.advanceTimeAsync(duration);
Member

WDYT about having advanceTimeAsync take a dispatcher and a flag for whether to run it blocking or non-blocking? It seems like it would almost always be broken not to follow this pattern when using this API. If there are cases where it isn't needed, I guess the dispatcher could be optional in the API, but it would at least make test writers think about it.
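
To make the suggestion concrete, here is a minimal, self-contained sketch of that call shape. `ToyTimeSystem`, `ToyDispatcher`, and `advanceTimeAndRun` are hypothetical stand-ins written for illustration, not Envoy's classes or its eventual API; the toy loop never blocks, so the run type only mirrors the signature.

```cpp
#include <chrono>
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical stand-ins for illustration only; not Envoy's types.
enum class RunType { Block, NonBlock };

class ToyTimeSystem {
public:
  std::chrono::milliseconds now() const { return now_; }
  // Moves simulated time forward without running any event loop.
  void advanceTimeAsync(std::chrono::milliseconds d) { now_ += d; }

private:
  std::chrono::milliseconds now_{0};
};

class ToyDispatcher {
public:
  explicit ToyDispatcher(const ToyTimeSystem& time) : time_(time) {}

  void addTimer(std::chrono::milliseconds deadline, std::function<void()> cb) {
    timers_.emplace_back(deadline, std::move(cb));
  }

  // Fires every timer whose deadline has been reached. The run type is
  // accepted only to mirror the call shape; this toy loop never blocks.
  void run(RunType) {
    for (std::size_t i = 0; i < timers_.size();) {
      if (timers_[i].first <= time_.now()) {
        auto cb = std::move(timers_[i].second);
        timers_.erase(timers_.begin() + static_cast<std::ptrdiff_t>(i));
        cb();
      } else {
        ++i;
      }
    }
  }

private:
  const ToyTimeSystem& time_;
  std::vector<std::pair<std::chrono::milliseconds, std::function<void()>>> timers_;
};

// Advances time and, if a dispatcher is supplied, runs it right away.
// The dispatcher is optional so callers must opt out explicitly.
template <typename DurationType>
void advanceTimeAndRun(ToyTimeSystem& time, DurationType duration,
                       ToyDispatcher* dispatcher = nullptr,
                       RunType type = RunType::NonBlock) {
  time.advanceTimeAsync(std::chrono::duration_cast<std::chrono::milliseconds>(duration));
  if (dispatcher != nullptr) {
    dispatcher->run(type);
  }
}
```

With this shape, a caller that omits the dispatcher has to do so deliberately, which is the "make test writers think about it" part of the idea.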

Member

@jmarantz @antoniovicente ping on this one. I see a new commit but I'm unsure of the plan here. If this is not needed we can merge but wanted to check.

/wait-any

Contributor Author

Sorry, was doing some research in order to provide a complete reply to your comment.

As you noticed, I rolled back this file, since the changes to it are no longer required now that I update monotonic time directly instead of updating next_monotonic_time and propagating it to event loops the next time they run.
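
As a rough illustration of the difference between the two approaches, the toy sketch below contrasts a clock that defers the update until a loop iteration with one that applies it immediately. It is inferred only from the wording above; `DeferredClock`, `DirectClock`, `advance`, and `onLoopRun` are hypothetical names, and this is not the actual SimulatedTimeSystem code.

```cpp
#include <chrono>

using Ms = std::chrono::milliseconds;

// Deferred model (hypothetical): advancing only records a pending value,
// and readers see it after the next event loop iteration picks it up.
class DeferredClock {
public:
  void advance(Ms d) { next_monotonic_time_ += d; }
  // Stand-in for an event loop iteration propagating the pending time.
  void onLoopRun() { monotonic_time_ = next_monotonic_time_; }
  Ms monotonicTime() const { return monotonic_time_; }

private:
  Ms monotonic_time_{0};
  Ms next_monotonic_time_{0};
};

// Direct model (hypothetical): advancing updates the visible time at once,
// so no loop iteration is needed before reading it.
class DirectClock {
public:
  void advance(Ms d) { monotonic_time_ += d; }
  Ms monotonicTime() const { return monotonic_time_; }

private:
  Ms monotonic_time_{0};
};
```

Under the deferred model a test has to run the dispatcher after advancing time before it can observe the new value; under the direct model that extra step is unnecessary for reading the time.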

Taking a dispatcher in advanceTimeAsync is an interesting suggestion. I found that most uses of this function happen in cases where there is a single active Dispatcher, but there is one exception in ServerStatsTest.FlushStats. Still, it makes sense to have a SimulatedTimeSystem function that moves time forward and then runs the dispatcher event loop. Preferences for doing that change in this PR or a followup cleanup?

Member

> Preferences for doing that change in this PR or a followup cleanup?

Up to you, I'm fine either way. Thank you!

Contributor Author

I'll do the cleanup as a followup.

dispatcher_->run(Event::Dispatcher::RunType::Block);
}

Event::SimulatedTimeSystem time_system_;
Stats::TestUtil::TestStore stats_;
NiceMock<Runtime::MockLoader> runtime_;
@@ -258,7 +263,7 @@ TEST_F(GradientControllerTest, MinRTTEpoch) {
const int min_concurrency = 2;
auto controller = makeController(yaml);
const auto min_rtt = std::chrono::milliseconds(1350);
time_system_.advanceTimeAsync(min_rtt);
advanceTimeAndLoop(min_rtt);

verifyMinRTTActive();
EXPECT_EQ(controller->concurrencyLimit(), min_concurrency);
@@ -270,7 +275,7 @@ TEST_F(GradientControllerTest, MinRTTEpoch) {
uint32_t last_limit = controller->concurrencyLimit();
for (int i = 0; i < 29; ++i) {
tryForward(controller, true);
time_system_.advanceTimeAsync(std::chrono::seconds(1));
advanceTimeAndLoop(std::chrono::seconds(1));
sampleLatency(controller, min_rtt);
dispatcher_->run(Event::Dispatcher::RunType::Block);
EXPECT_GT(controller->concurrencyLimit(), last_limit);
@@ -286,8 +291,7 @@ TEST_F(GradientControllerTest, MinRTTEpoch) {
}

// Move into the next minRTT window while the requests are outstanding.
time_system_.advanceTimeAsync(std::chrono::seconds(5));
dispatcher_->run(Event::Dispatcher::RunType::Block);
advanceTimeAndLoop(std::chrono::seconds(5));
verifyMinRTTActive();
EXPECT_EQ(controller->concurrencyLimit(), min_concurrency);

@@ -330,7 +334,7 @@ TEST_F(GradientControllerTest, MinRTTLogicTest) {
}
tryForward(controller, false);
tryForward(controller, false);
time_system_.advanceTimeAsync(min_rtt);
advanceTimeAndLoop(min_rtt);
for (int i = 0; i < 7; ++i) {
sampleLatency(controller, min_rtt);
}
@@ -427,8 +431,7 @@ TEST_F(GradientControllerTest, MinRTTBufferTest) {
// prevent the concurrency limit from decreasing.
sampleLatency(controller, std::chrono::milliseconds(6));
}
time_system_.advanceTimeAsync(std::chrono::milliseconds(101));
dispatcher_->run(Event::Dispatcher::RunType::Block);
advanceTimeAndLoop(std::chrono::milliseconds(101));
EXPECT_GT(controller->concurrencyLimit(), last_concurrency);
}
}
@@ -459,8 +462,7 @@ TEST_F(GradientControllerTest, ConcurrencyLimitBehaviorTestBasic) {

// Ensure that the concurrency window increases on its own due to the headroom calculation with
// the max gradient.
time_system_.advanceTimeAsync(std::chrono::milliseconds(101));
dispatcher_->run(Event::Dispatcher::RunType::Block);
advanceTimeAndLoop(std::chrono::milliseconds(101));
EXPECT_GE(controller->concurrencyLimit(), 7);
EXPECT_LE(controller->concurrencyLimit() / 7.0, 2.0);

@@ -472,8 +474,7 @@ TEST_F(GradientControllerTest, ConcurrencyLimitBehaviorTestBasic) {
tryForward(controller, true);
sampleLatency(controller, std::chrono::milliseconds(4));
}
time_system_.advanceTimeAsync(std::chrono::milliseconds(101));
dispatcher_->run(Event::Dispatcher::RunType::Block);
advanceTimeAndLoop(std::chrono::milliseconds(101));
// Verify the minimum gradient.
EXPECT_LE(last_concurrency, controller->concurrencyLimit());
EXPECT_GE(static_cast<double>(last_concurrency) / controller->concurrencyLimit(), 0.5);
@@ -486,8 +487,7 @@ TEST_F(GradientControllerTest, ConcurrencyLimitBehaviorTestBasic) {
tryForward(controller, true);
sampleLatency(controller, std::chrono::milliseconds(6));
}
time_system_.advanceTimeAsync(std::chrono::milliseconds(101));
dispatcher_->run(Event::Dispatcher::RunType::Block);
advanceTimeAndLoop(std::chrono::milliseconds(101));
EXPECT_LT(controller->concurrencyLimit(), last_concurrency);
EXPECT_GE(controller->concurrencyLimit(), 7);
}
@@ -513,7 +513,7 @@ TEST_F(GradientControllerTest, MinRTTReturnToPreviousLimit) {
// Get initial minRTT measurement out of the way and advance time so request samples are not
// thought to come from the previous minRTT epoch.
advancePastMinRTTStage(controller, yaml, std::chrono::milliseconds(5));
time_system_.advanceTimeAsync(std::chrono::seconds(1));
advanceTimeAndLoop(std::chrono::seconds(1));

// Force the limit calculation to run a few times from some measurements.
for (int sample_iters = 0; sample_iters < 5; ++sample_iters) {
@@ -522,21 +522,19 @@ TEST_F(GradientControllerTest, MinRTTReturnToPreviousLimit) {
tryForward(controller, true);
sampleLatency(controller, std::chrono::milliseconds(4));
}
time_system_.advanceTimeAsync(std::chrono::milliseconds(101));
dispatcher_->run(Event::Dispatcher::RunType::Block);
advanceTimeAndLoop(std::chrono::milliseconds(101));
// Verify the value is growing.
EXPECT_GT(controller->concurrencyLimit(), last_concurrency);
}

const auto limit_val = controller->concurrencyLimit();

// Wait until the minRTT recalculation is triggered again and verify the limit drops.
time_system_.advanceTimeAsync(std::chrono::seconds(31));
dispatcher_->run(Event::Dispatcher::RunType::Block);
advanceTimeAndLoop(std::chrono::seconds(31));
EXPECT_EQ(controller->concurrencyLimit(), 3);

// Advance time again for request samples to appear from the current epoch.
time_system_.advanceTimeAsync(std::chrono::seconds(1));
advanceTimeAndLoop(std::chrono::seconds(1));

// 49 more requests should cause the minRTT to be done calculating.
for (int i = 0; i < 5; ++i) {
@@ -569,7 +567,7 @@ TEST_F(GradientControllerTest, MinRTTRescheduleTest) {
// Get initial minRTT measurement out of the way and advance time so request samples are not
// thought to come from the previous minRTT epoch.
advancePastMinRTTStage(controller, yaml, std::chrono::milliseconds(5));
time_system_.advanceTimeAsync(std::chrono::seconds(1));
advanceTimeAndLoop(std::chrono::seconds(1));

// Force the limit calculation to run a few times from some measurements.
for (int sample_iters = 0; sample_iters < 5; ++sample_iters) {
@@ -578,20 +576,17 @@ TEST_F(GradientControllerTest, MinRTTRescheduleTest) {
tryForward(controller, true);
sampleLatency(controller, std::chrono::milliseconds(4));
}
time_system_.advanceTimeAsync(std::chrono::milliseconds(101));
dispatcher_->run(Event::Dispatcher::RunType::Block);
advanceTimeAndLoop(std::chrono::milliseconds(101));
// Verify the value is growing.
EXPECT_GT(controller->concurrencyLimit(), last_concurrency);
}

// Wait until the minRTT recalculation is triggered again and verify the limit drops.
time_system_.advanceTimeAsync(std::chrono::seconds(31));
dispatcher_->run(Event::Dispatcher::RunType::Block);
advanceTimeAndLoop(std::chrono::seconds(31));
EXPECT_EQ(controller->concurrencyLimit(), 3);

// Verify sample recalculation doesn't occur during the minRTT window.
time_system_.advanceTimeAsync(std::chrono::milliseconds(101));
dispatcher_->run(Event::Dispatcher::RunType::Block);
advanceTimeAndLoop(std::chrono::milliseconds(101));
EXPECT_EQ(controller->concurrencyLimit(), 3);
}

@@ -622,17 +617,15 @@ TEST_F(GradientControllerTest, NoSamplesTest) {
tryForward(controller, true);
sampleLatency(controller, std::chrono::milliseconds(4));
}
time_system_.advanceTimeAsync(std::chrono::milliseconds(101));
dispatcher_->run(Event::Dispatcher::RunType::Block);
advanceTimeAndLoop(std::chrono::milliseconds(101));
// Verify the value is growing.
EXPECT_GT(controller->concurrencyLimit(), last_concurrency);
}

// Now we make sure that the limit value doesn't change in the absence of samples.
for (int sample_iters = 0; sample_iters < 5; ++sample_iters) {
const auto old_limit = controller->concurrencyLimit();
time_system_.advanceTimeAsync(std::chrono::milliseconds(101));
dispatcher_->run(Event::Dispatcher::RunType::Block);
advanceTimeAndLoop(std::chrono::milliseconds(101));
EXPECT_EQ(old_limit, controller->concurrencyLimit());
}
}
@@ -676,7 +669,7 @@ TEST_F(GradientControllerTest, TimerAccuracyTest) {
EXPECT_CALL(*sample_timer, enableTimer(std::chrono::milliseconds(123), _));
for (int i = 0; i < 6; ++i) {
tryForward(controller, true);
time_system_.advanceTimeAsync(std::chrono::milliseconds(5));
advanceTimeAndLoop(std::chrono::milliseconds(5));
sampleLatency(controller, std::chrono::milliseconds(5));
}
}
@@ -716,7 +709,7 @@ TEST_F(GradientControllerTest, TimerAccuracyTestNoJitter) {
EXPECT_CALL(*sample_timer, enableTimer(std::chrono::milliseconds(123), _));
for (int i = 0; i < 6; ++i) {
tryForward(controller, true);
time_system_.advanceTimeAsync(std::chrono::milliseconds(5));
advanceTimeAndLoop(std::chrono::milliseconds(5));
sampleLatency(controller, std::chrono::milliseconds(5));
}
}
@@ -749,8 +742,7 @@ TEST_F(GradientControllerTest, ConsecutiveMinConcurrencyReset) {

// Ensure that the concurrency window increases on its own due to the headroom calculation with
// the max gradient.
time_system_.advanceTimeAsync(std::chrono::milliseconds(101));
dispatcher_->run(Event::Dispatcher::RunType::Block);
advanceTimeAndLoop(std::chrono::milliseconds(101));
EXPECT_GE(controller->concurrencyLimit(), 7);
EXPECT_LE(controller->concurrencyLimit() / 7.0, 2.0);

@@ -762,8 +754,7 @@ TEST_F(GradientControllerTest, ConsecutiveMinConcurrencyReset) {
tryForward(controller, true);
sampleLatency(controller, elevated_latency);
}
time_system_.advanceTimeAsync(std::chrono::milliseconds(101));
dispatcher_->run(Event::Dispatcher::RunType::Block);
advanceTimeAndLoop(std::chrono::milliseconds(101));
}

// Verify that the concurrency limit starts growing with newly measured minRTT.
@@ -773,8 +764,7 @@ TEST_F(GradientControllerTest, ConsecutiveMinConcurrencyReset) {
tryForward(controller, true);
sampleLatency(controller, elevated_latency);
}
time_system_.advanceTimeAsync(std::chrono::milliseconds(101));
dispatcher_->run(Event::Dispatcher::RunType::Block);
advanceTimeAndLoop(std::chrono::milliseconds(101));
EXPECT_GE(controller->concurrencyLimit(), last_concurrency);
}
}
Original file line number Diff line number Diff line change
@@ -136,7 +136,8 @@ class ActiveQuicListenerTest : public QuicMultiVersionTest {
auto proof_source = std::make_unique<TestProofSource>();
filter_chain_ = &proof_source->filterChain();
crypto_config_peer.ResetProofSource(std::move(proof_source));
simulated_time_system_.advanceTimeWait(std::chrono::milliseconds(100));
simulated_time_system_.advanceTimeAsync(std::chrono::milliseconds(100));
dispatcher_->run(Event::Dispatcher::RunType::NonBlock);
}

Network::ActiveUdpListenerFactoryPtr createQuicListenerFactory(const std::string& yaml) {
Original file line number Diff line number Diff line change
@@ -93,7 +93,8 @@ class EnvoyQuicDispatcherTest : public QuicMultiVersionTest,

void SetUp() override {
// Advance time a bit because QuicTime regards 0 as uninitialized timestamp.
time_system_.advanceTimeWait(std::chrono::milliseconds(100));
time_system_.advanceTimeAsync(std::chrono::milliseconds(100));
dispatcher_->run(Event::Dispatcher::RunType::NonBlock);
EXPECT_CALL(listener_config_, perConnectionBufferLimitBytes())
.WillRepeatedly(Return(1024 * 1024));
}
Original file line number Diff line number Diff line change
@@ -167,7 +167,8 @@ class EnvoyQuicServerSessionTest : public testing::TestWithParam<bool> {

// Advance time and trigger update of Dispatcher::approximateMonotonicTime()
// because zero QuicTime is considered uninitialized.
time_system_.advanceTimeWait(std::chrono::milliseconds(1));
time_system_.advanceTimeAsync(std::chrono::milliseconds(1));
dispatcher_->run(Event::Dispatcher::RunType::NonBlock);
connection_helper_.GetClock()->Now();

ON_CALL(writer_, WritePacket(_, _, _, _, _))
@@ -669,7 +670,8 @@ TEST_P(EnvoyQuicServerSessionTest, FlushAndWaitForCloseWithTimeout) {
EXPECT_EQ(Network::Connection::State::Open, envoy_quic_session_.state());
// Unblocking the stream shouldn't close the connection as it should be
// delayed.
time_system_.advanceTimeWait(std::chrono::milliseconds(10));
time_system_.advanceTimeAsync(std::chrono::milliseconds(10));
dispatcher_->run(Event::Dispatcher::RunType::NonBlock);
envoy_quic_session_.OnCanWrite();
// delay close alarm should have been rescheduled.
time_system_.advanceTimeAsync(std::chrono::milliseconds(90));
@@ -700,7 +702,8 @@ TEST_P(EnvoyQuicServerSessionTest, FlusWriteTransitToFlushWriteWithDelay) {
envoy_quic_session_.close(Network::ConnectionCloseType::FlushWrite);
EXPECT_EQ(Network::Connection::State::Open, envoy_quic_session_.state());

time_system_.advanceTimeWait(std::chrono::milliseconds(10));
time_system_.advanceTimeAsync(std::chrono::milliseconds(10));
dispatcher_->run(Event::Dispatcher::RunType::NonBlock);
// The closing behavior should be changed.
envoy_quic_session_.close(Network::ConnectionCloseType::FlushWriteAndDelay);
// Unblocking the stream shouldn't close the connection as it should be
@@ -732,7 +735,8 @@ TEST_P(EnvoyQuicServerSessionTest, FlushAndWaitForCloseWithNoPendingData) {

// Advance the time a bit and try to close again. The delay close timer
// shouldn't be rescheduled by this call.
time_system_.advanceTimeWait(std::chrono::milliseconds(10));
time_system_.advanceTimeAsync(std::chrono::milliseconds(10));
dispatcher_->run(Event::Dispatcher::RunType::NonBlock);
envoy_quic_session_.close(Network::ConnectionCloseType::FlushWriteAndDelay);
EXPECT_EQ(Network::Connection::State::Open, envoy_quic_session_.state());

1 change: 1 addition & 0 deletions test/server/BUILD
Original file line number Diff line number Diff line change
@@ -125,6 +125,7 @@ envoy_cc_test(

envoy_cc_test(
name = "guarddog_impl_test",
size = "small",
Contributor Author

Explanation of this change:
Before the other changes in this PR, this test occasionally timed out due to flaky alarm behavior, as seen in #12638.

This test usually runs in about 5 to 10 seconds in standard configurations, and about 20 seconds under clang-tsan. It may make sense to reduce the configured size for this test to small, which has an associated timeout of 60 seconds, in order to get faster feedback when this test fails due to a timeout.

srcs = ["guarddog_impl_test.cc"],
tags = ["flaky_on_windows"],
deps = [