Skip to content

Commit

Permalink
Update Async to server library (#22575)
Browse files Browse the repository at this point in the history
  • Loading branch information
hueifeng authored Sep 17, 2020
1 parent a5b6cda commit 1cb3f23
Show file tree
Hide file tree
Showing 16 changed files with 107 additions and 147 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,12 @@ public Http1ChunkedEncodingMessageBody(bool keepAlive, Http1Connection context)
_requestBodyPipe = CreateRequestBodyPipe(context);
}

public override void AdvanceTo(SequencePosition consumed)
{
AdvanceTo(consumed, consumed);
}

public override void AdvanceTo(SequencePosition consumed, SequencePosition examined)
{
TrackConsumedAndExaminedBytes(_readResult, consumed, examined);
_requestBodyPipe.Reader.AdvanceTo(consumed, examined);
}

public override bool TryRead(out ReadResult readResult)
{
ThrowIfCompleted();

return TryReadInternal(out readResult);
}

public override bool TryReadInternal(out ReadResult readResult)
{
TryStart();
Expand All @@ -72,12 +60,6 @@ public override bool TryReadInternal(out ReadResult readResult)
return boolResult;
}

public override ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
{
ThrowIfCompleted();
return ReadAsyncInternal(cancellationToken);
}

public override async ValueTask<ReadResult> ReadAsyncInternal(CancellationToken cancellationToken = default)
{
TryStart();
Expand All @@ -103,12 +85,6 @@ public override async ValueTask<ReadResult> ReadAsyncInternal(CancellationToken
return _readResult;
}

public override void Complete(Exception exception)
{
_completed = true;
_context.ReportApplicationError(exception);
}

public override void CancelPendingRead()
{
_requestBodyPipe.Reader.CancelPendingRead();
Expand Down Expand Up @@ -182,15 +158,15 @@ private async Task PumpAsync()
}
finally
{
_requestBodyPipe.Writer.Complete(error);
await _requestBodyPipe.Writer.CompleteAsync(error);
}
}

protected override Task OnStopAsync()
protected override ValueTask OnStopAsync()
{
if (!_context.HasStartedConsumingRequestBody)
{
return Task.CompletedTask;
return default;
}

// call complete here on the reader
Expand All @@ -201,14 +177,14 @@ protected override Task OnStopAsync()
{
// At this point both the request body pipe reader and writer should be completed.
_requestBodyPipe.Reset();
return Task.CompletedTask;
return default;
}

// Should I call complete here?
return StopAsyncAwaited();
}

private async Task StopAsyncAwaited()
private async ValueTask StopAsyncAwaited()
{
_canceled = true;
_context.Input.CancelPendingRead();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,6 @@ public Http1ContentLengthMessageBody(bool keepAlive, long contentLength, Http1Co
_unexaminedInputLength = _contentLength;
}

public override ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
{
ThrowIfCompleted();
return ReadAsyncInternal(cancellationToken);
}

public override async ValueTask<ReadResult> ReadAsyncInternal(CancellationToken cancellationToken = default)
{
VerifyIsNotReading();
Expand Down Expand Up @@ -125,12 +119,6 @@ void ResetReadingState()
return _readResult;
}

public override bool TryRead(out ReadResult readResult)
{
ThrowIfCompleted();
return TryReadInternal(out readResult);
}

public override bool TryReadInternal(out ReadResult readResult)
{
VerifyIsNotReading();
Expand Down Expand Up @@ -216,11 +204,6 @@ private long CreateReadResultFromConnectionReadResult()
return maxLength;
}

public override void AdvanceTo(SequencePosition consumed)
{
AdvanceTo(consumed, consumed);
}

public override void AdvanceTo(SequencePosition consumed, SequencePosition examined)
{
if (!_isReading)
Expand Down Expand Up @@ -261,24 +244,12 @@ protected override void OnReadStarting()
}
}

public override void Complete(Exception exception)
{
_context.ReportApplicationError(exception);
_completed = true;
}

public override void CancelPendingRead()
{
Interlocked.Exchange(ref _userCanceled, 1);
_context.Input.CancelPendingRead();
}

protected override Task OnStopAsync()
{
Complete(null);
return Task.CompletedTask;
}

[StackTraceHidden]
private void VerifyIsNotReading()
{
Expand Down
45 changes: 32 additions & 13 deletions src/Servers/Kestrel/Core/src/Internal/Http/Http1MessageBody.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,34 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
internal abstract class Http1MessageBody : MessageBody
{
protected readonly Http1Connection _context;
protected bool _completed;
private bool _readerCompleted;

protected Http1MessageBody(Http1Connection context) : base(context)
{
_context = context;
}

[StackTraceHidden]
protected void ThrowUnexpectedEndOfRequestContent()
public override ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
{
// OnInputOrOutputCompleted() is an idempotent method that closes the connection. Sometimes
// input completion is observed here before the Input.OnWriterCompleted() callback is fired,
// so we call OnInputOrOutputCompleted() now to prevent a race in our tests where a 400
// response is written after observing the unexpected end of request content instead of just
// closing the connection without a response as expected.
_context.OnInputOrOutputCompleted();
ThrowIfReaderCompleted();
return ReadAsyncInternal(cancellationToken);
}

KestrelBadHttpRequestException.Throw(RequestRejectionReason.UnexpectedEndOfRequestContent);
public abstract ValueTask<ReadResult> ReadAsyncInternal(CancellationToken cancellationToken = default);

public override bool TryRead(out ReadResult readResult)
{
ThrowIfReaderCompleted();
return TryReadInternal(out readResult);
}

public abstract bool TryReadInternal(out ReadResult readResult);

public abstract ValueTask<ReadResult> ReadAsyncInternal(CancellationToken cancellationToken = default);
public override void Complete(Exception exception)
{
_readerCompleted = true;
_context.ReportApplicationError(exception);
}

protected override Task OnConsumeAsync()
{
Expand Down Expand Up @@ -184,12 +189,26 @@ public static MessageBody For(
return keepAlive ? MessageBody.ZeroContentLengthKeepAlive : MessageBody.ZeroContentLengthClose;
}

protected void ThrowIfCompleted()
[StackTraceHidden]
private void ThrowIfReaderCompleted()
{
if (_completed)
if (_readerCompleted)
{
throw new InvalidOperationException("Reading is not allowed after the reader was completed.");
}
}

[StackTraceHidden]
protected void ThrowUnexpectedEndOfRequestContent()
{
// OnInputOrOutputCompleted() is an idempotent method that closes the connection. Sometimes
// input completion is observed here before the Input.OnWriterCompleted() callback is fired,
// so we call OnInputOrOutputCompleted() now to prevent a race in our tests where a 400
// response is written after observing the unexpected end of request content instead of just
// closing the connection without a response as expected.
_context.OnInputOrOutputCompleted();

KestrelBadHttpRequestException.Throw(RequestRejectionReason.UnexpectedEndOfRequestContent);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,35 +23,11 @@ public Http1UpgradeMessageBody(Http1Connection context)
// This returns IsEmpty so we can avoid draining the body (since it's basically an endless stream)
public override bool IsEmpty => true;

public override ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
{
ThrowIfCompleted();
return _context.Input.ReadAsync(cancellationToken);
}

public override bool TryRead(out ReadResult result)
{
ThrowIfCompleted();
return _context.Input.TryRead(out result);
}

public override void AdvanceTo(SequencePosition consumed)
{
_context.Input.AdvanceTo(consumed);
}

public override void AdvanceTo(SequencePosition consumed, SequencePosition examined)
{
_context.Input.AdvanceTo(consumed, examined);
}

public override void Complete(Exception exception)
{
// Don't call Connection.Complete.
_context.ReportApplicationError(exception);
_completed = true;
}

public override void CancelPendingRead()
{
_context.Input.CancelPendingRead();
Expand All @@ -62,9 +38,9 @@ public override Task ConsumeAsync()
return Task.CompletedTask;
}

public override Task StopAsync()
public override ValueTask StopAsync()
{
return Task.CompletedTask;
return default;
}

public override bool TryReadInternal(out ReadResult readResult)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ public override void Complete(Exception exception = null)
_body.Complete(exception);
}

public override ValueTask CompleteAsync(Exception exception = null)
{
ValidateState();

return _body.CompleteAsync(exception);
}

public override ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
{
ValidateState(cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ public override void Complete(Exception exception = null)
_completeTask = _pipeControl.CompleteAsync(exception);
}

public override ValueTask CompleteAsync(Exception exception = null)
{
Complete();
return new ValueTask(_completeTask);
}

public override ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken = default)
{
ValidateState(cancellationToken);
Expand Down
23 changes: 16 additions & 7 deletions src/Servers/Kestrel/Core/src/Internal/Http/MessageBody.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,26 @@ protected MessageBody(HttpProtocol context)

protected IKestrelTrace Log => _context.ServiceContext.Log;

public abstract void AdvanceTo(SequencePosition consumed);

public abstract void AdvanceTo(SequencePosition consumed, SequencePosition examined);
public abstract ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default);

public abstract bool TryRead(out ReadResult readResult);

public abstract void Complete(Exception exception);
public void AdvanceTo(SequencePosition consumed)
{
AdvanceTo(consumed, consumed);
}

public abstract void AdvanceTo(SequencePosition consumed, SequencePosition examined);

public abstract void CancelPendingRead();

public abstract ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default);
public abstract void Complete(Exception exception);

public virtual ValueTask CompleteAsync(Exception exception)
{
Complete(exception);
return default;
}

public virtual Task ConsumeAsync()
{
Expand All @@ -62,7 +71,7 @@ public virtual Task ConsumeAsync()
return OnConsumeAsync();
}

public virtual Task StopAsync()
public virtual ValueTask StopAsync()
{
TryStop();

Expand All @@ -71,7 +80,7 @@ public virtual Task StopAsync()

protected virtual Task OnConsumeAsync() => Task.CompletedTask;

protected virtual Task OnStopAsync() => Task.CompletedTask;
protected virtual ValueTask OnStopAsync() => default;

public virtual void Reset()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ public ZeroContentLengthMessageBody(bool keepAlive)

public override Task ConsumeAsync() => Task.CompletedTask;

public override Task StopAsync() => Task.CompletedTask;

public override void AdvanceTo(SequencePosition consumed) { }
public override ValueTask StopAsync() => default;

public override void AdvanceTo(SequencePosition consumed, SequencePosition examined) { }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public ValueTask<FlushResult> FlushAsync(IHttpOutputAborter outputAborter, Cance
{
return default;
}

var bytesWritten = _unflushedBytes;
_unflushedBytes = 0;

Expand Down
Loading

0 comments on commit 1cb3f23

Please sign in to comment.