Skip to content

Commit

Permalink
Send connection WINDOW_UPDATE before RTT PING (#97881)
Browse files Browse the repository at this point in the history
Some servers in GCP have a non-standard ping accounting mechanism, meaning they reset their unsolicited PING counter when they receive DATA, HEADERS or WINDOW_UPDATE. To comply with this behavior, this PR adjusts the RTT logic ensuring that a connection WINDOW_UPDATE is being sent out before we send an RTT PING by applying two changes:

- Swap the order of the normal connection WINDOW_UPDATE we attempt after every DATA frame received and the sending of the RTT PING so WINDOW_UPDATE goes first.
- In case we need to send an RTT PING, and we didn't send a normal connection WINDOW_UPDATE (threshold not reached), send a WINDOW_UPDATE anyways with current _pendingWindowUpdate credits. Do not send a PING if this is not possible (_pendingWindowUpdate == 0). Just like RTT PINGs, such WINDOW_UPDATEs are relatively rare and should not hurt performance.
  • Loading branch information
antonfirsov authored Feb 13, 2024
1 parent b8e1296 commit fae6720
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@ private async ValueTask ProcessHeadersFrame(FrameHeader frameHeader)
if (http2Stream != null)
{
http2Stream.OnHeadersStart();
_rttEstimator.OnDataOrHeadersReceived(this);
_rttEstimator.OnDataOrHeadersReceived(this, sendWindowUpdateBeforePing: true);
headersHandler = http2Stream;
}
else
Expand Down Expand Up @@ -766,21 +766,16 @@ private void ProcessDataFrame(FrameHeader frameHeader)

ReadOnlySpan<byte> frameData = GetFrameData(_incomingBuffer.ActiveSpan.Slice(0, frameHeader.PayloadLength), hasPad: frameHeader.PaddedFlag, hasPriority: false);

if (http2Stream != null)
{
bool endStream = frameHeader.EndStreamFlag;

http2Stream.OnResponseData(frameData, endStream);

if (!endStream && frameData.Length > 0)
{
_rttEstimator.OnDataOrHeadersReceived(this);
}
}
bool endStream = frameHeader.EndStreamFlag;
http2Stream?.OnResponseData(frameData, endStream);

if (frameData.Length > 0)
{
ExtendWindow(frameData.Length);
bool windowUpdateSent = ExtendWindow(frameData.Length);
if (http2Stream is not null && !endStream)
{
_rttEstimator.OnDataOrHeadersReceived(this, sendWindowUpdateBeforePing: !windowUpdateSent);
}
}

_incomingBuffer.Discard(frameHeader.PayloadLength);
Expand Down Expand Up @@ -1772,7 +1767,7 @@ private Task SendWindowUpdateAsync(int streamId, int amount)
});
}

private void ExtendWindow(int amount)
private bool ExtendWindow(int amount)
{
if (NetEventSource.Log.IsEnabled()) Trace($"{nameof(amount)}={amount}");
Debug.Assert(amount > 0);
Expand All @@ -1786,14 +1781,25 @@ private void ExtendWindow(int amount)
if (_pendingWindowUpdate < ConnectionWindowThreshold)
{
if (NetEventSource.Log.IsEnabled()) Trace($"{nameof(_pendingWindowUpdate)} {_pendingWindowUpdate} < {ConnectionWindowThreshold}.");
return;
return false;
}

windowUpdateSize = _pendingWindowUpdate;
_pendingWindowUpdate = 0;
}

LogExceptions(SendWindowUpdateAsync(0, windowUpdateSize));
return true;
}

private bool ForceSendConnectionWindowUpdate()
{
if (NetEventSource.Log.IsEnabled()) Trace($"{nameof(_pendingWindowUpdate)}={_pendingWindowUpdate}");
if (_pendingWindowUpdate == 0) return false;

LogExceptions(SendWindowUpdateAsync(0, _pendingWindowUpdate));
_pendingWindowUpdate = 0;
return true;
}

public override long GetIdleTicks(long nowTicks)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,13 +138,18 @@ private void AdjustWindowDynamic(int bytesConsumed, Http2Stream stream)
// Assuming that the network characteristics of the connection wouldn't change much within its lifetime, we are maintaining a running minimum value.
// The more PINGs we send, the more accurate is the estimation of MinRtt, however we should be careful not to send too many of them,
// to avoid triggering the server's PING flood protection which may result in an unexpected GOAWAY.
// With most servers we are fine to send PINGs, as long as we are reading their data, this rule is well formalized for gRPC:
//
// Several strategies have been implemented to conform with real life servers.
// 1. With most servers we are fine to send PINGs as long as we are reading their data, a rule formalized by a gRPC spec:
// https://github.com/grpc/proposal/blob/master/A8-client-side-keepalive.md
// As a rule of thumb, we can send send a PING whenever we receive DATA or HEADERS, however, there are some servers which allow receiving only
// a limited amount of PINGs within a given timeframe.
// To deal with the conflicting requirements:
// - We send an initial burst of 'InitialBurstCount' PINGs, to get a relatively good estimation fast
// - Afterwards, we send PINGs with the maximum frequency of 'PingIntervalInSeconds' PINGs per second
// According to this rule, we are OK to send a PING whenever we receive DATA or HEADERS, since the servers conforming to this doc
// will reset their unsolicited ping counter whenever they *send* DATA or HEADERS.
// 2. Some servers allow receiving only a limited amount of PINGs within a given timeframe.
// To deal with this, we send an initial burst of 'InitialBurstCount' (=4) PINGs, to get a relatively good estimation fast. Afterwards,
// we send PINGs each 'PingIntervalInSeconds' second, to maintain our estimation without triggering these servers.
// 3. Some servers in Google's backends reset their unsolicited ping counter when they *receive* DATA, HEADERS, or WINDOW_UPDATE.
// To deal with this, we need to make sure to send a connection WINDOW_UPDATE before sending a PING. The initial burst is an exception
// to this rule, since the mentioned server can tolerate 4 PINGs without receiving a WINDOW_UPDATE.
//
// Threading:
// OnInitialSettingsSent() is called during initialization, all other methods are triggered by HttpConnection.ProcessIncomingFramesAsync(),
Expand Down Expand Up @@ -194,7 +199,7 @@ internal void OnInitialSettingsAckReceived(Http2Connection connection)
_state = State.Waiting;
}

internal void OnDataOrHeadersReceived(Http2Connection connection)
internal void OnDataOrHeadersReceived(Http2Connection connection, bool sendWindowUpdateBeforePing)
{
if (_state != State.Waiting) return;

Expand All @@ -204,6 +209,14 @@ internal void OnDataOrHeadersReceived(Http2Connection connection)
{
if (initial) _initialBurst--;

// When sendWindowUpdateBeforePing is true, try to send a WINDOW_UPDATE to make Google backends happy.
// Unless we are doing the initial burst, do not send PING if we were not able to send the WINDOW_UPDATE.
// See point 3. in the comments above the class definition for more info.
if (sendWindowUpdateBeforePing && !connection.ForceSendConnectionWindowUpdate() && !initial)
{
return;
}

// Send a PING
_pingCounter--;
if (NetEventSource.Log.IsEnabled()) connection.Trace($"[FlowControl] Sending RTT PING with payload {_pingCounter}");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2453,6 +2453,7 @@ public async Task PostAsyncDuplex_ClientSendsEndStream_Success()
HttpResponseMessage response = await responseTask;
Stream responseStream = await response.Content.ReadAsStreamAsync();

connection.IgnoreWindowUpdates();
// Send some data back and forth
await SendAndReceiveResponseDataAsync(contentBytes, responseStream, connection, streamId);
await SendAndReceiveResponseDataAsync(contentBytes, responseStream, connection, streamId);
Expand Down Expand Up @@ -2513,6 +2514,7 @@ public async Task PostAsyncDuplex_ServerSendsEndStream_Success()
HttpResponseMessage response = await responseTask;
Stream responseStream = await response.Content.ReadAsStreamAsync();

connection.IgnoreWindowUpdates();
// Send some data back and forth
await SendAndReceiveResponseDataAsync(contentBytes, responseStream, connection, streamId);
await SendAndReceiveResponseDataAsync(contentBytes, responseStream, connection, streamId);
Expand Down Expand Up @@ -2833,6 +2835,7 @@ public async Task PostAsyncDuplex_DisposeResponseBodyBeforeEnd_ResetsStreamAndTh
// This allows the request processing to complete.
duplexContent.Fail(e);

connection.IgnoreWindowUpdates(); // The RTT algorithm may send a WINDOW_UPDATE before RST_STREAM.
// Client should set RST_STREAM.
await connection.ReadRstStreamAsync(streamId);
}
Expand Down Expand Up @@ -2906,6 +2909,7 @@ public async Task PostAsyncDuplex_DisposeResponseBodyAfterEndReceivedButBeforeCo
// This allows the request processing to complete.
duplexContent.Fail(e);

connection.IgnoreWindowUpdates(); // The RTT algorithm may send a WINDOW_UPDATE before RST_STREAM.
// Client should set RST_STREAM.
await connection.ReadRstStreamAsync(streamId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ static async Task RunTest()
TimeSpan.FromMilliseconds(30),
TimeSpan.Zero,
2 * 1024 * 1024,
null);
maxWindowForPingStopValidation: MaxWindow);

Assert.True(maxCredit <= MaxWindow);
}
Expand Down Expand Up @@ -181,19 +181,34 @@ static async Task RunTest()
RemoteExecutor.Invoke(RunTest, options).Dispose();
}

[OuterLoop("Runs long")]
[Fact]
public async Task LongRunningSlowServerStream_NoInvalidPingsAreSent()
{
// A scenario similar to https://github.com/grpc/grpc-dotnet/issues/2361.
// We need to send a small amount of data so the connection window is not consumed and no "standard" WINDOW_UPDATEs are sent and
// we also need to do it very slowly to cover some RTT PINGs after the initial burst.
// This scenario should trigger the "forced WINDOW_UPDATE" logic in the implementation, ensuring that no more than 4 PINGs are sent without a WINDOW_UPDATE.
await TestClientWindowScalingAsync(
TimeSpan.FromMilliseconds(500),
TimeSpan.FromMilliseconds(500),
1024,
_output,
dataPerFrame: 32);
}

private static async Task<int> TestClientWindowScalingAsync(
TimeSpan networkDelay,
TimeSpan slowBandwidthSimDelay,
int bytesToDownload,
ITestOutputHelper output = null,
int maxWindowForPingStopValidation = int.MaxValue, // set to actual maximum to test if we stop sending PING when window reached maximum
Action<SocketsHttpHandler> configureHandler = null)
int dataPerFrame = 16384,
int maxWindowForPingStopValidation = 16 * 1024 * 1024) // set to actual maximum to test if we stop sending PING when window reached maximum
{
TimeSpan timeout = TimeSpan.FromSeconds(30);
CancellationTokenSource timeoutCts = new CancellationTokenSource(timeout);

HttpClientHandler handler = CreateHttpClientHandler(HttpVersion20.Value);
configureHandler?.Invoke(GetUnderlyingSocketsHttpHandler(handler));

using Http2LoopbackServer server = Http2LoopbackServer.CreateServer(NoAutoPingResponseHttp2Options);
using HttpClient client = new HttpClient(handler, true);
Expand Down Expand Up @@ -225,13 +240,13 @@ private static async Task<int> TestClientWindowScalingAsync(
using SemaphoreSlim writeSemaphore = new SemaphoreSlim(1);
int remainingBytes = bytesToDownload;

bool pingReceivedAfterReachingMaxWindow = false;
string unexpectedPingReason = null;
bool unexpectedFrameReceived = false;
CancellationTokenSource stopFrameProcessingCts = new CancellationTokenSource();

CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(stopFrameProcessingCts.Token, timeoutCts.Token);
Task processFramesTask = ProcessIncomingFramesAsync(linkedCts.Token);
byte[] buffer = new byte[16384];
byte[] buffer = new byte[dataPerFrame];

while (remainingBytes > 0)
{
Expand Down Expand Up @@ -259,7 +274,7 @@ private static async Task<int> TestClientWindowScalingAsync(

int dataReceived = (await response.Content.ReadAsByteArrayAsync()).Length;
Assert.Equal(bytesToDownload, dataReceived);
Assert.False(pingReceivedAfterReachingMaxWindow, "Server received a PING after reaching max window");
Assert.Null(unexpectedPingReason);
Assert.False(unexpectedFrameReceived, "Server received an unexpected frame, see test output for more details.");

return maxCredit;
Expand All @@ -270,6 +285,7 @@ async Task ProcessIncomingFramesAsync(CancellationToken cancellationToken)
// We should not receive any more RTT PING's after this point
int maxWindowCreditThreshold = (int) (0.9 * maxWindowForPingStopValidation);
output?.WriteLine($"maxWindowCreditThreshold: {maxWindowCreditThreshold} maxWindowForPingStopValidation: {maxWindowForPingStopValidation}");
int pingsWithoutWindowUpdate = 0;

try
{
Expand All @@ -284,10 +300,18 @@ async Task ProcessIncomingFramesAsync(CancellationToken cancellationToken)

output?.WriteLine($"Received PING ({pingFrame.Data})");

pingsWithoutWindowUpdate++;
if (maxCredit > maxWindowCreditThreshold)
{
output?.WriteLine("PING was unexpected");
Volatile.Write(ref pingReceivedAfterReachingMaxWindow, true);
Volatile.Write(ref unexpectedPingReason, "The server received a PING after reaching max window");
output?.WriteLine($"PING was unexpected: {unexpectedPingReason}");
}

// Exceeding this limit may trigger a GOAWAY on some servers. See implementation comments for more details.
if (pingsWithoutWindowUpdate > 4)
{
Volatile.Write(ref unexpectedPingReason, $"The server received {pingsWithoutWindowUpdate} PINGs without receiving a WINDOW_UPDATE");
output?.WriteLine($"PING was unexpected: {unexpectedPingReason}");
}

await writeSemaphore.WaitAsync(cancellationToken);
Expand All @@ -296,6 +320,7 @@ async Task ProcessIncomingFramesAsync(CancellationToken cancellationToken)
}
else if (frame is WindowUpdateFrame windowUpdateFrame)
{
pingsWithoutWindowUpdate = 0;
// Ignore connection window:
if (windowUpdateFrame.StreamId != streamId) continue;

Expand Down

0 comments on commit fae6720

Please sign in to comment.