-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
connection: adding watermarks to the read buffer. #11170
Changes from 5 commits
1bdbb5d
a78eff4
8b87dfa
ca61e93
59a0bf5
6aafcb2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -102,10 +102,10 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback | |
} | ||
|
||
// Network::TransportSocketCallbacks | ||
IoHandle& ioHandle() override { return socket_->ioHandle(); } | ||
IoHandle& ioHandle() final { return socket_->ioHandle(); } | ||
const IoHandle& ioHandle() const override { return socket_->ioHandle(); } | ||
Connection& connection() override { return *this; } | ||
void raiseEvent(ConnectionEvent event) override; | ||
void raiseEvent(ConnectionEvent event) final; | ||
// Should the read buffer be drained? | ||
bool shouldDrainReadBuffer() override { | ||
return read_buffer_limit_ > 0 && read_buffer_.length() >= read_buffer_limit_; | ||
|
@@ -122,11 +122,19 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback | |
static uint64_t nextGlobalIdForTest() { return next_global_id_; } | ||
|
||
protected: | ||
// A convenience function which returns true if | ||
// 1) The read disable count is zero or | ||
// 2) The read disable count is one, due to the read buffer being overrun. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Isn't there a chance that the read disable count is > 1 and the buffer is overrun? Like if it got overrun but also disabled some other way? Or is the idea that if it is > 1 we will wait until the only reason is it's disabled due to read overrun? If that's the case can you add more comments just to make it completely clear? |
||
// In either case the consumer of the data would like to read from the buffer. | ||
bool consumerWantsToRead(); | ||
|
||
// Network::ConnectionImplBase | ||
void closeConnectionImmediately() override; | ||
|
||
void closeSocket(ConnectionEvent close_type); | ||
|
||
void onReadBufferLowWatermark(); | ||
void onReadBufferHighWatermark(); | ||
void onWriteBufferLowWatermark(); | ||
void onWriteBufferHighWatermark(); | ||
|
||
|
@@ -135,7 +143,9 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback | |
StreamInfo::StreamInfo& stream_info_; | ||
FilterManagerImpl filter_manager_; | ||
|
||
Buffer::OwnedImpl read_buffer_; | ||
// Ensure that if the consumer of the data from this connection isn't | ||
// consuming, that the connection eventually stops reading from the wire. | ||
Buffer::WatermarkBuffer read_buffer_; | ||
// This must be a WatermarkBuffer, but as it is created by a factory the ConnectionImpl only has | ||
// a generic pointer. | ||
// It MUST be defined after the filter_manager_ as some filters may have callbacks that | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -93,6 +93,12 @@ TEST_P(ConnectionImplDeathTest, BadFd) { | |
".*assert failure: SOCKET_VALID\\(ConnectionImpl::ioHandle\\(\\)\\.fd\\(\\)\\).*"); | ||
} | ||
|
||
class TestClientConnectionImpl : public Network::ClientConnectionImpl { | ||
public: | ||
using ClientConnectionImpl::ClientConnectionImpl; | ||
Buffer::WatermarkBuffer& readBuffer() { return read_buffer_; } | ||
}; | ||
|
||
class ConnectionImplTest : public testing::TestWithParam<Address::IpVersion> { | ||
protected: | ||
ConnectionImplTest() : api_(Api::createApiForTest(time_system_)), stream_info_(time_system_) {} | ||
|
@@ -104,9 +110,9 @@ class ConnectionImplTest : public testing::TestWithParam<Address::IpVersion> { | |
socket_ = std::make_shared<Network::TcpListenSocket>(Network::Test::getAnyAddress(GetParam()), | ||
nullptr, true); | ||
listener_ = dispatcher_->createListener(socket_, listener_callbacks_, true); | ||
client_connection_ = dispatcher_->createClientConnection( | ||
socket_->localAddress(), source_address_, Network::Test::createRawBufferSocket(), | ||
socket_options_); | ||
client_connection_ = std::make_unique<Network::TestClientConnectionImpl>( | ||
*dispatcher_, socket_->localAddress(), source_address_, | ||
Network::Test::createRawBufferSocket(), socket_options_); | ||
client_connection_->addConnectionCallbacks(client_callbacks_); | ||
EXPECT_EQ(nullptr, client_connection_->ssl()); | ||
const Network::ClientConnection& const_connection = *client_connection_; | ||
|
@@ -215,6 +221,9 @@ class ConnectionImplTest : public testing::TestWithParam<Address::IpVersion> { | |
return ConnectionMocks{std::move(dispatcher), timer, std::move(transport_socket), file_event, | ||
&file_ready_cb_}; | ||
} | ||
Network::TestClientConnectionImpl* testClientConnection() { | ||
return dynamic_cast<Network::TestClientConnectionImpl*>(client_connection_.get()); | ||
} | ||
|
||
Event::FileReadyCb file_ready_cb_; | ||
Event::SimulatedTimeSystem time_system_; | ||
|
@@ -742,7 +751,7 @@ TEST_P(ConnectionImplTest, HalfCloseNoEarlyCloseDetection) { | |
} | ||
|
||
// Test that as watermark levels are changed, the appropriate callbacks are triggered. | ||
TEST_P(ConnectionImplTest, Watermarks) { | ||
TEST_P(ConnectionImplTest, WriteWatermarks) { | ||
useMockBuffer(); | ||
|
||
setUpBasicConnection(); | ||
|
@@ -791,6 +800,123 @@ TEST_P(ConnectionImplTest, Watermarks) { | |
disconnect(false); | ||
} | ||
|
||
// Test that as watermark levels are changed, the appropriate callbacks are triggered. | ||
TEST_P(ConnectionImplTest, ReadWatermarks) { | ||
|
||
setUpBasicConnection(); | ||
client_connection_->setBufferLimits(2); | ||
std::shared_ptr<MockReadFilter> client_read_filter(new NiceMock<MockReadFilter>()); | ||
client_connection_->addReadFilter(client_read_filter); | ||
connect(); | ||
|
||
EXPECT_FALSE(testClientConnection()->readBuffer().highWatermarkTriggered()); | ||
EXPECT_TRUE(client_connection_->readEnabled()); | ||
// Add 4 bytes to the buffer and verify the connection becomes read disabled. | ||
{ | ||
Buffer::OwnedImpl buffer("data"); | ||
server_connection_->write(buffer, false); | ||
EXPECT_CALL(*client_read_filter, onData(_, false)) | ||
.WillOnce(Invoke([&](Buffer::Instance&, bool) -> FilterStatus { | ||
dispatcher_->exit(); | ||
return FilterStatus::StopIteration; | ||
})); | ||
dispatcher_->run(Event::Dispatcher::RunType::Block); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Worth doing a second server_connection write before doing drains, and verify that there is no read from the client connection on dispatcher_->run()? |
||
|
||
EXPECT_TRUE(testClientConnection()->readBuffer().highWatermarkTriggered()); | ||
EXPECT_FALSE(client_connection_->readEnabled()); | ||
} | ||
|
||
// Drain 3 bytes from the buffer. This bring sit below the low watermark, and | ||
// read enables, as well as triggering a kick for the remaining byte. | ||
{ | ||
testClientConnection()->readBuffer().drain(3); | ||
EXPECT_FALSE(testClientConnection()->readBuffer().highWatermarkTriggered()); | ||
EXPECT_TRUE(client_connection_->readEnabled()); | ||
|
||
EXPECT_CALL(*client_read_filter, onData(_, false)); | ||
dispatcher_->run(Event::Dispatcher::RunType::NonBlock); | ||
} | ||
|
||
// Add 3 bytes to the buffer and verify the connection becomes read disabled | ||
// again. | ||
{ | ||
Buffer::OwnedImpl buffer("bye"); | ||
server_connection_->write(buffer, false); | ||
EXPECT_CALL(*client_read_filter, onData(_, false)) | ||
.WillOnce(Invoke([&](Buffer::Instance&, bool) -> FilterStatus { | ||
dispatcher_->exit(); | ||
return FilterStatus::StopIteration; | ||
})); | ||
dispatcher_->run(Event::Dispatcher::RunType::Block); | ||
|
||
EXPECT_TRUE(testClientConnection()->readBuffer().highWatermarkTriggered()); | ||
EXPECT_FALSE(client_connection_->readEnabled()); | ||
} | ||
|
||
// Now have the consumer read disable. | ||
// This time when the buffer is drained, there will be no kick as the consumer | ||
// does not want to read. | ||
{ | ||
client_connection_->readDisable(true); | ||
testClientConnection()->readBuffer().drain(3); | ||
EXPECT_FALSE(testClientConnection()->readBuffer().highWatermarkTriggered()); | ||
EXPECT_FALSE(client_connection_->readEnabled()); | ||
|
||
EXPECT_CALL(*client_read_filter, onData(_, false)).Times(0); | ||
dispatcher_->run(Event::Dispatcher::RunType::NonBlock); | ||
} | ||
|
||
// Now read enable again. | ||
// Inside the onData call, readDisable and readEnable. This should trigger | ||
// another kick on the next dispatcher loop, so onData gets called twice. | ||
{ | ||
client_connection_->readDisable(false); | ||
EXPECT_CALL(*client_read_filter, onData(_, false)) | ||
.Times(2) | ||
.WillOnce(Invoke([&](Buffer::Instance&, bool) -> FilterStatus { | ||
client_connection_->readDisable(true); | ||
client_connection_->readDisable(false); | ||
return FilterStatus::StopIteration; | ||
})) | ||
.WillRepeatedly(Return(FilterStatus::StopIteration)); | ||
dispatcher_->run(Event::Dispatcher::RunType::NonBlock); | ||
} | ||
|
||
// Test the same logic for dispatched_buffered_data from the | ||
// onReadReady() (read_disable_count_ != 0) path. | ||
{ | ||
// Fill the buffer and verify the socket is read disabled. | ||
Buffer::OwnedImpl buffer("bye"); | ||
server_connection_->write(buffer, false); | ||
EXPECT_CALL(*client_read_filter, onData(_, false)) | ||
.WillOnce(Invoke([&](Buffer::Instance&, bool) -> FilterStatus { | ||
dispatcher_->exit(); | ||
return FilterStatus::StopIteration; | ||
})); | ||
dispatcher_->run(Event::Dispatcher::RunType::Block); | ||
EXPECT_TRUE(testClientConnection()->readBuffer().highWatermarkTriggered()); | ||
EXPECT_FALSE(client_connection_->readEnabled()); | ||
|
||
// Read disable and read enable, to set dispatch_buffered_data_ true. | ||
client_connection_->readDisable(true); | ||
client_connection_->readDisable(false); | ||
// Now event loop. This hits the early on-Read path. As above, read | ||
// disable and read enable from inside the stack of onData, to ensure that | ||
// dispatch_buffered_data_ works correctly. | ||
EXPECT_CALL(*client_read_filter, onData(_, false)) | ||
.Times(2) | ||
.WillOnce(Invoke([&](Buffer::Instance&, bool) -> FilterStatus { | ||
client_connection_->readDisable(true); | ||
client_connection_->readDisable(false); | ||
return FilterStatus::StopIteration; | ||
})) | ||
.WillRepeatedly(Return(FilterStatus::StopIteration)); | ||
dispatcher_->run(Event::Dispatcher::RunType::NonBlock); | ||
} | ||
|
||
disconnect(true); | ||
} | ||
|
||
// Write some data to the connection. It will automatically attempt to flush | ||
// it to the upstream file descriptor via a write() call to buffer_, which is | ||
// configured to succeed and accept all bytes read. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While you are here can you ASSERT this is greater than 0 before decrementing? I'm surprised this was not already asserted.