From c48113ad805a53bc98a999a7004b6bb13763bf25 Mon Sep 17 00:00:00 2001 From: David Fowler Date: Mon, 8 May 2017 20:44:13 -0700 Subject: [PATCH] Refactoring and of FrameConnection and Frame (#1816) * Refactoring and of FrameConnection and Frame - Building on top of the last refactoring of FrameConnection, this change aims to clean up the communication between the Frame and FrameConnection by removing some concepts and being consistent about the communication between Frame and FrameConnection with or without connection adapters. Changes include: - Removing ConnectionLifetimeControl, ISocketOutput, StreamSocketOutput - Moving more initialization of the frame to FrameConnection after the pipes are setup - OutputProducer communicates cancellation via the IPipeWriter instead of the output's IPipeReader. - Frame always communicates via the pipes and that communications flows through the layers to the transport. This means that each 1/2 of the adapted pipeline handles closing the right side of the transport at the right time, propagating exceptions as necessary. - This is how the flow looks now: -> -> [transport] [connection adapters] [frame] <- <- - Transports need to handle a ConnectionAbortedException on the output as a signal to stop writing and end the connection. This will no longer try to drain the output but will just stop writing and end the response immediately. - Remove frame.Abort when cancellation on Write fails. - Unify the connection shutdown logic - Dispose 1/2 initialized connection adapters #1815 --- KestrelHttpServer.sln | 3 +- .../Adapter/Internal/AdaptedPipeline.cs | 148 +++++++++++++----- .../Adapter/Internal/RawStream.cs | 34 ++-- .../Adapter/Internal/StreamSocketOutput.cs | 145 ----------------- .../Internal/ConnectionHandler.cs | 4 - .../Internal/FrameConnection.cs | 97 ++++++------ .../Internal/FrameConnectionContext.cs | 1 - .../Http/ConnectionLifetimeControl.cs | 45 ------ .../Internal/Http/Frame.cs | 9 +- .../Internal/Http/FrameOfT.cs | 2 +- .../Internal/Http/ISocketOutput.cs | 22 --- .../Internal/Http/OutputProducer.cs | 144 +++++++++-------- .../Internal/LibuvOutputConsumer.cs | 55 ++++--- .../SocketConnection.cs | 4 + .../FrameTests.cs | 4 +- .../OutputProducerTests.cs | 4 +- .../StreamSocketOutputTests.cs | 110 ------------- .../ConnectionAdapterTests.cs | 67 +++++++- .../RequestTests.cs | 18 ++- .../TestServer.cs | 6 + .../FrameWritingBenchmark.cs | 9 +- ...pNetCore.Server.Kestrel.Performance.csproj | 3 +- .../ResponseHeadersWritingBenchmark.cs | 9 +- .../LibuvOutputConsumerTests.cs | 56 ++++--- test/shared/MockSocketOutput.cs | 42 ----- 25 files changed, 411 insertions(+), 630 deletions(-) delete mode 100644 src/Microsoft.AspNetCore.Server.Kestrel.Core/Adapter/Internal/StreamSocketOutput.cs delete mode 100644 src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/ConnectionLifetimeControl.cs delete mode 100644 src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/ISocketOutput.cs delete mode 100644 test/Microsoft.AspNetCore.Server.Kestrel.Core.Tests/StreamSocketOutputTests.cs delete mode 100644 test/shared/MockSocketOutput.cs diff --git a/KestrelHttpServer.sln b/KestrelHttpServer.sln index 3c10b81d0524..210365479a19 100644 --- a/KestrelHttpServer.sln +++ b/KestrelHttpServer.sln @@ -1,6 +1,6 @@ Microsoft Visual Studio Solution File, Format Version 12.00 # Visual Studio 15 -VisualStudioVersion = 15.0.26405.0 +VisualStudioVersion = 15.0.26403.7 MinimumVisualStudioVersion = 10.0.40219.1 Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{7972A5D6-3385-4127-9277-428506DD44FF}" ProjectSection(SolutionItems) = preProject @@ -26,7 +26,6 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "shared", "shared", "{0EF2AC test\shared\LifetimeNotImplemented.cs = test\shared\LifetimeNotImplemented.cs test\shared\MockFrameControl.cs = test\shared\MockFrameControl.cs test\shared\MockLogger.cs = test\shared\MockLogger.cs - test\shared\MockSocketOutput.cs = test\shared\MockSocketOutput.cs test\shared\MockSystemClock.cs = test\shared\MockSystemClock.cs test\shared\StringExtensions.cs = test\shared\StringExtensions.cs test\shared\TestApp.cs = test\shared\TestApp.cs diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Adapter/Internal/AdaptedPipeline.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Adapter/Internal/AdaptedPipeline.cs index c8240a2e21dc..b73cbdb50e7c 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Adapter/Internal/AdaptedPipeline.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Adapter/Internal/AdaptedPipeline.cs @@ -7,7 +7,7 @@ using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http; using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure; using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines; -using Microsoft.Extensions.Logging; +using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions; namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal { @@ -15,77 +15,149 @@ public class AdaptedPipeline { private const int MinAllocBufferSize = 2048; - private readonly Stream _filteredStream; - private readonly StreamSocketOutput _output; private readonly IKestrelTrace _trace; + private readonly IPipeWriter _transportOutputPipeWriter; + private readonly IPipeReader _transportInputPipeReader; - public AdaptedPipeline( - Stream filteredStream, - IPipe inputPipe, - IPipe outputPipe, - IKestrelTrace trace) + public AdaptedPipeline(IPipeReader transportInputPipeReader, + IPipeWriter transportOutputPipeWriter, + IPipe inputPipe, + IPipe outputPipe, + IKestrelTrace trace) { + _transportInputPipeReader = transportInputPipeReader; + _transportOutputPipeWriter = transportOutputPipeWriter; Input = inputPipe; - _output = new StreamSocketOutput(filteredStream, outputPipe); - _filteredStream = filteredStream; + Output = outputPipe; _trace = trace; } public IPipe Input { get; } - public ISocketOutput Output => _output; + public IPipe Output { get; } - public async Task RunAsync() + public async Task RunAsync(Stream stream) { + var inputTask = ReadInputAsync(stream); + var outputTask = WriteOutputAsync(stream); + + await inputTask; + await outputTask; + } + + private async Task WriteOutputAsync(Stream stream) + { + Exception error = null; + try { - var inputTask = ReadInputAsync(); - var outputTask = _output.WriteOutputAsync(); + if (stream == null) + { + return; + } - await inputTask; + while (true) + { + var readResult = await Output.Reader.ReadAsync(); + var buffer = readResult.Buffer; - _output.Dispose(); + try + { + if (buffer.IsEmpty && readResult.IsCompleted) + { + break; + } - await outputTask; + if (buffer.IsEmpty) + { + await stream.FlushAsync(); + } + else if (buffer.IsSingleSpan) + { + var array = buffer.First.GetArray(); + await stream.WriteAsync(array.Array, array.Offset, array.Count); + } + else + { + foreach (var memory in buffer) + { + var array = memory.GetArray(); + await stream.WriteAsync(array.Array, array.Offset, array.Count); + } + } + } + finally + { + Output.Reader.Advance(buffer.End); + } + } } catch (Exception ex) { - // adaptedPipeline.RunAsync() shouldn't throw, unless filtered stream's WriteAsync throws. - _trace.LogError(0, ex, $"{nameof(AdaptedPipeline)}.{nameof(RunAsync)}"); + error = ex; + } + finally + { + Output.Reader.Complete(); + _transportOutputPipeWriter.Complete(error); } } - private async Task ReadInputAsync() + private async Task ReadInputAsync(Stream stream) { - int bytesRead; + Exception error = null; - do + try { - var block = Input.Writer.Alloc(MinAllocBufferSize); + if (stream == null) + { + // If the stream is null then we're going to abort the connection + throw new ConnectionAbortedException(); + } - try + while (true) { - var array = block.Buffer.GetArray(); + + var outputBuffer = Input.Writer.Alloc(MinAllocBufferSize); + + var array = outputBuffer.Buffer.GetArray(); try { - bytesRead = await _filteredStream.ReadAsync(array.Array, array.Offset, array.Count); - block.Advance(bytesRead); + var bytesRead = await stream.ReadAsync(array.Array, array.Offset, array.Count); + outputBuffer.Advance(bytesRead); + + if (bytesRead == 0) + { + // FIN + break; + } } finally { - await block.FlushAsync(); + outputBuffer.Commit(); } - } - catch (Exception ex) - { - Input.Writer.Complete(ex); - // Don't rethrow the exception. It should be handled by the Pipeline consumer. - return; - } - } while (bytesRead != 0); + var result = await outputBuffer.FlushAsync(); - Input.Writer.Complete(); + if (result.IsCompleted) + { + break; + } + + } + } + catch (Exception ex) + { + // Don't rethrow the exception. It should be handled by the Pipeline consumer. + error = ex; + } + finally + { + Input.Writer.Complete(error); + // The application could have ended the input pipe so complete + // the transport pipe as well + _transportInputPipeReader.Complete(); + } } } -} +} \ No newline at end of file diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Adapter/Internal/RawStream.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Adapter/Internal/RawStream.cs index 2423630c917b..cec2ba1e7532 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Adapter/Internal/RawStream.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Adapter/Internal/RawStream.cs @@ -5,7 +5,6 @@ using System.IO; using System.Threading; using System.Threading.Tasks; -using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http; using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines; namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal @@ -13,9 +12,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal public class RawStream : Stream { private readonly IPipeReader _input; - private readonly ISocketOutput _output; + private readonly IPipeWriter _output; - public RawStream(IPipeReader input, ISocketOutput output) + public RawStream(IPipeReader input, IPipeWriter output) { _input = input; _output = output; @@ -71,19 +70,10 @@ public override Task ReadAsync(byte[] buffer, int offset, int count, Cancel public override void Write(byte[] buffer, int offset, int count) { - ArraySegment segment; - if (buffer != null) - { - segment = new ArraySegment(buffer, offset, count); - } - else - { - segment = default(ArraySegment); - } - _output.Write(segment); + WriteAsync(buffer, offset, count).GetAwaiter().GetResult(); } - public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken token) + public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken token) { ArraySegment segment; if (buffer != null) @@ -94,17 +84,19 @@ public override Task WriteAsync(byte[] buffer, int offset, int count, Cancellati { segment = default(ArraySegment); } - return _output.WriteAsync(segment, cancellationToken: token); + var output = _output.Alloc(); + output.Write(segment); + await output.FlushAsync(token); } public override void Flush() { - _output.Flush(); + FlushAsync(CancellationToken.None).GetAwaiter().GetResult(); } public override Task FlushAsync(CancellationToken cancellationToken) { - return _output.FlushAsync(cancellationToken); + return WriteAsync(null, 0, 0, cancellationToken); } private async Task ReadAsync(ArraySegment buffer) @@ -209,11 +201,5 @@ private Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken }, tcs, cancellationToken); return tcs.Task; } - - protected override void Dispose(bool disposing) - { - // _output is disposed by ConnectionLifetimeControl - _input.Complete(); - } } -} +} \ No newline at end of file diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Adapter/Internal/StreamSocketOutput.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Adapter/Internal/StreamSocketOutput.cs deleted file mode 100644 index 6055f271f592..000000000000 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Adapter/Internal/StreamSocketOutput.cs +++ /dev/null @@ -1,145 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. - -using System; -using System.IO; -using System.Threading; -using System.Threading.Tasks; -using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http; -using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines; - -namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal -{ - public class StreamSocketOutput : ISocketOutput - { - private readonly Stream _outputStream; - private readonly IPipe _pipe; - private readonly object _sync = new object(); - private bool _completed; - - public StreamSocketOutput(Stream outputStream, IPipe pipe) - { - _outputStream = outputStream; - _pipe = pipe; - } - - public void Write(ArraySegment buffer, bool chunk) - { - WriteAsync(buffer, chunk, default(CancellationToken)).GetAwaiter().GetResult(); - } - - public async Task WriteAsync(ArraySegment buffer, bool chunk, CancellationToken cancellationToken) - { - var flushAwaiter = default(WritableBufferAwaitable); - - lock (_sync) - { - if (_completed) - { - return; - } - - var writableBuffer = _pipe.Writer.Alloc(1); - var writer = new WritableBufferWriter(writableBuffer); - if (buffer.Count > 0) - { - if (chunk) - { - ChunkWriter.WriteBeginChunkBytes(ref writer, buffer.Count); - writer.Write(buffer.Array, buffer.Offset, buffer.Count); - ChunkWriter.WriteEndChunkBytes(ref writer); - } - else - { - writer.Write(buffer.Array, buffer.Offset, buffer.Count); - } - } - - flushAwaiter = writableBuffer.FlushAsync(cancellationToken); - } - - await flushAwaiter; - } - - public void Dispose() - { - lock (_sync) - { - _completed = true; - } - - _pipe.Writer.Complete(); - } - - public void Flush() - { - FlushAsync(CancellationToken.None).GetAwaiter().GetResult(); - } - - public Task FlushAsync(CancellationToken cancellationToken) - { - return WriteAsync(default(ArraySegment), chunk: false, cancellationToken: cancellationToken); - } - - public void Write(Action callback, T state) where T : struct - { - lock (_sync) - { - if (_completed) - { - return; - } - - var buffer = _pipe.Writer.Alloc(1); - callback(buffer, state); - buffer.Commit(); - } - } - - public async Task WriteOutputAsync() - { - try - { - while (true) - { - var readResult = await _pipe.Reader.ReadAsync(); - var buffer = readResult.Buffer; - - try - { - if (buffer.IsEmpty && readResult.IsCompleted) - { - break; - } - - if (buffer.IsEmpty) - { - await _outputStream.FlushAsync(); - } - else if (buffer.IsSingleSpan) - { - var array = buffer.First.GetArray(); - await _outputStream.WriteAsync(array.Array, array.Offset, array.Count); - } - else - { - foreach (var memory in buffer) - { - var array = memory.GetArray(); - await _outputStream.WriteAsync(array.Array, array.Offset, array.Count); - } - } - } - finally - { - _pipe.Reader.Advance(buffer.End); - } - } - } - finally - { - _pipe.Reader.Complete(); - } - } - } -} diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/ConnectionHandler.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/ConnectionHandler.cs index 921f87424c4d..523af72020f1 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/ConnectionHandler.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/ConnectionHandler.cs @@ -40,10 +40,7 @@ public IConnectionContext OnConnection(IConnectionInformation connectionInfo) ServiceContext = _serviceContext }; - // TODO: Untangle this mess var frame = new Frame(_application, frameContext); - var outputProducer = new OutputProducer(outputPipe.Writer, frame, connectionId, _serviceContext.Log); - frame.LifetimeControl = new ConnectionLifetimeControl(connectionId, outputPipe.Reader, outputProducer, _serviceContext.Log); var connection = new FrameConnection(new FrameConnectionContext { @@ -55,7 +52,6 @@ public IConnectionContext OnConnection(IConnectionInformation connectionInfo) Frame = frame, Input = inputPipe, Output = outputPipe, - OutputProducer = outputProducer }); _serviceContext.Log.ConnectionStart(connectionId); diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/FrameConnection.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/FrameConnection.cs index e0d0a9063d7e..b25f7a983d8b 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/FrameConnection.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/FrameConnection.cs @@ -28,7 +28,6 @@ public class FrameConnection : IConnectionContext, ITimeoutControl private TimeoutAction _timeoutAction; private Task _lifetimeTask; - private Stream _filteredStream; public FrameConnection(FrameConnectionContext context) { @@ -69,46 +68,39 @@ public void StartRequestProcessing() private async Task ProcessRequestsAsync() { - RawStream rawStream = null; - try { - Task adaptedPipelineTask = Task.CompletedTask; + AdaptedPipeline adaptedPipeline = null; + var adaptedPipelineTask = Task.CompletedTask; + var input = _context.Input.Reader; + var output = _context.Output.Writer; - if (_connectionAdapters.Count == 0) + if (_connectionAdapters.Count > 0) { - _frame.Input = _context.Input.Reader; - _frame.Output = _context.OutputProducer; - } - else - { - rawStream = new RawStream(_context.Input.Reader, _context.OutputProducer); - - try - { - var adaptedPipeline = await ApplyConnectionAdaptersAsync(rawStream); - - _frame.Input = adaptedPipeline.Input.Reader; - _frame.Output = adaptedPipeline.Output; - - adaptedPipelineTask = adaptedPipeline.RunAsync(); - } - catch (Exception ex) - { - Log.LogError(0, ex, $"Uncaught exception from the {nameof(IConnectionAdapter.OnConnectionAsync)} method of an {nameof(IConnectionAdapter)}."); - - // Since Frame.ProcessRequestsAsync() isn't called, we have to close the socket here. - _context.OutputProducer.Dispose(); - - await _socketClosedTcs.Task; - return; - } - + adaptedPipeline = new AdaptedPipeline(_context.Input.Reader, + _context.Output.Writer, + PipeFactory.Create(AdaptedInputPipeOptions), + PipeFactory.Create(AdaptedOutputPipeOptions), + Log); + + input = adaptedPipeline.Input.Reader; + output = adaptedPipeline.Output.Writer; } + // Set these before the first await, this is to make sure that we don't yield control + // to the transport until we've added the connection to the connection manager _frame.TimeoutControl = this; - _lastTimestamp = _context.ServiceContext.SystemClock.UtcNow.Ticks; + _frame.Input = input; + _frame.Output = new OutputProducer(output, ConnectionId, Log); _context.ServiceContext.ConnectionManager.AddConnection(_context.FrameConnectionId, this); + _lastTimestamp = _context.ServiceContext.SystemClock.UtcNow.Ticks; + + if (adaptedPipeline != null) + { + // Stream can be null here and run async will close the connection in that case + var stream = await ApplyConnectionAdaptersAsync(); + adaptedPipelineTask = adaptedPipeline.RunAsync(stream); + } await _frame.ProcessRequestsAsync(); await adaptedPipelineTask; @@ -121,14 +113,13 @@ private async Task ProcessRequestsAsync() finally { _context.ServiceContext.ConnectionManager.RemoveConnection(_context.FrameConnectionId); - rawStream?.Dispose(); DisposeAdaptedConnections(); } } public void OnConnectionClosed(Exception ex) { - // Abort the connection (if it isn't already aborted) + // Abort the connection (if not already aborted) _frame.Abort(ex); Log.ConnectionStop(ConnectionId); @@ -139,17 +130,21 @@ public void OnConnectionClosed(Exception ex) public Task StopAsync() { _frame.Stop(); + return _lifetimeTask; } public void Abort(Exception ex) { + // Abort the connection (if not already aborted) _frame.Abort(ex); } public Task AbortAsync(Exception ex) { + // Abort the connection (if not already aborted) _frame.Abort(ex); + return _lifetimeTask; } @@ -158,25 +153,33 @@ public void Timeout() _frame.SetBadRequestState(RequestRejectionReason.RequestTimeout); } - private async Task ApplyConnectionAdaptersAsync(RawStream rawStream) + private async Task ApplyConnectionAdaptersAsync() { - var adapterContext = new ConnectionAdapterContext(rawStream); + var stream = new RawStream(_context.Input.Reader, _context.Output.Writer); + var adapterContext = new ConnectionAdapterContext(stream); var adaptedConnections = new IAdaptedConnection[_connectionAdapters.Count]; - for (var i = 0; i < _connectionAdapters.Count; i++) + try { - var adaptedConnection = await _connectionAdapters[i].OnConnectionAsync(adapterContext); - adaptedConnections[i] = adaptedConnection; - adapterContext = new ConnectionAdapterContext(adaptedConnection.ConnectionStream); + for (var i = 0; i < _connectionAdapters.Count; i++) + { + var adaptedConnection = await _connectionAdapters[i].OnConnectionAsync(adapterContext); + adaptedConnections[i] = adaptedConnection; + adapterContext = new ConnectionAdapterContext(adaptedConnection.ConnectionStream); + } } + catch (Exception ex) + { + Log.LogError(0, ex, $"Uncaught exception from the {nameof(IConnectionAdapter.OnConnectionAsync)} method of an {nameof(IConnectionAdapter)}."); - _filteredStream = adapterContext.ConnectionStream; - _frame.AdaptedConnections = adaptedConnections; + return null; + } + finally + { + _frame.AdaptedConnections = adaptedConnections; + } - return new AdaptedPipeline(_filteredStream, - PipeFactory.Create(AdaptedInputPipeOptions), - PipeFactory.Create(AdaptedOutputPipeOptions), - Log); + return adapterContext.ConnectionStream; } private void DisposeAdaptedConnections() diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/FrameConnectionContext.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/FrameConnectionContext.cs index 1394f6c150c4..d9a7c381485a 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/FrameConnectionContext.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/FrameConnectionContext.cs @@ -16,7 +16,6 @@ public class FrameConnectionContext public PipeFactory PipeFactory { get; set; } public List ConnectionAdapters { get; set; } public Frame Frame { get; set; } - public OutputProducer OutputProducer { get; set; } public IPipe Input { get; set; } public IPipe Output { get; set; } diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/ConnectionLifetimeControl.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/ConnectionLifetimeControl.cs deleted file mode 100644 index 1d6efcd6db08..000000000000 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/ConnectionLifetimeControl.cs +++ /dev/null @@ -1,45 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. - -using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure; -using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines; - -namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http -{ - public class ConnectionLifetimeControl - { - public ConnectionLifetimeControl( - string connectionId, - IPipeReader outputPipeReader, - OutputProducer outputProducer, - IKestrelTrace log) - { - ConnectionId = connectionId; - OutputReader = outputPipeReader; - OutputProducer = outputProducer; - Log = log; - } - - private string ConnectionId { get; } - private IPipeReader OutputReader { get; } - private OutputProducer OutputProducer { get; } - private IKestrelTrace Log { get; } - - public void End(ProduceEndType endType) - { - switch (endType) - { - case ProduceEndType.ConnectionKeepAlive: - Log.ConnectionKeepAlive(ConnectionId); - break; - case ProduceEndType.SocketShutdown: - OutputReader.CancelPendingRead(); - goto case ProduceEndType.SocketDisconnect; - case ProduceEndType.SocketDisconnect: - OutputProducer.Dispose(); - Log.ConnectionDisconnect(ConnectionId); - break; - } - } - } -} diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/Frame.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/Frame.cs index 417fadb671aa..6a85d647571a 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/Frame.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/Frame.cs @@ -102,9 +102,8 @@ public Frame(FrameContext frameContext) public IConnectionInformation ConnectionInformation => _frameContext.ConnectionInformation; public IPipeReader Input { get; set; } - public ISocketOutput Output { get; set; } + public OutputProducer Output { get; set; } public IAdaptedConnection[] AdaptedConnections { get; set; } - public ConnectionLifetimeControl LifetimeControl { get; set; } public ITimeoutControl TimeoutControl { get; set; } protected IKestrelTrace Log => ServiceContext.Log; @@ -411,7 +410,7 @@ public void Abort(Exception error) _frameStreams?.Abort(error); - LifetimeControl.End(ProduceEndType.SocketDisconnect); + Output.Abort(); // Potentially calling user code. CancelRequestAbortedToken logs any exceptions. ServiceContext.ThreadPool.UnsafeRun(state => ((Frame)state).CancelRequestAbortedToken(), this); @@ -827,7 +826,7 @@ private Task WriteSuffix() if (_keepAlive) { - LifetimeControl.End(ProduceEndType.ConnectionKeepAlive); + Log.ConnectionKeepAlive(ConnectionId); } if (HttpMethods.IsHead(Method) && _responseBytesWritten > 0) @@ -847,7 +846,7 @@ private async Task WriteAutoChunkSuffixAwaited() if (_keepAlive) { - LifetimeControl.End(ProduceEndType.ConnectionKeepAlive); + Log.ConnectionKeepAlive(ConnectionId); } } diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/FrameOfT.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/FrameOfT.cs index 3be6a58c83c4..b0890e17aefc 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/FrameOfT.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/FrameOfT.cs @@ -229,7 +229,7 @@ public override async Task ProcessRequestsAsync() if (Volatile.Read(ref _requestAborted) == 0) { await TryProduceInvalidRequestResponse(); - LifetimeControl.End(ProduceEndType.SocketShutdown); + Output.Dispose(); } } catch (Exception ex) diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/ISocketOutput.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/ISocketOutput.cs deleted file mode 100644 index 139a5000a0d6..000000000000 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/ISocketOutput.cs +++ /dev/null @@ -1,22 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. - -using System; -using System.Threading; -using System.Threading.Tasks; -using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines; - -namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http -{ - /// - /// Operations performed for buffered socket output - /// - public interface ISocketOutput - { - void Write(ArraySegment buffer, bool chunk = false); - Task WriteAsync(ArraySegment buffer, bool chunk = false, CancellationToken cancellationToken = default(CancellationToken)); - void Flush(); - Task FlushAsync(CancellationToken cancellationToken = default(CancellationToken)); - void Write(Action write, T state) where T : struct; - } -} diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/OutputProducer.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/OutputProducer.cs index c5fe3dee5149..0d024d6a7f25 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/OutputProducer.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/OutputProducer.cs @@ -6,11 +6,12 @@ using System.Threading.Tasks; using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure; using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines; +using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions; using Microsoft.Extensions.Internal; namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http { - public class OutputProducer : ISocketOutput, IDisposable + public class OutputProducer : IDisposable { private static readonly ArraySegment _emptyData = new ArraySegment(new byte[0]); @@ -24,7 +25,6 @@ public class OutputProducer : ISocketOutput, IDisposable private bool _completed = false; private readonly IPipeWriter _pipe; - private readonly Frame _frame; // https://github.com/dotnet/corefxlab/issues/1334 // Pipelines don't support multiple awaiters on flush @@ -33,16 +33,90 @@ public class OutputProducer : ISocketOutput, IDisposable private readonly object _flushLock = new object(); private Action _flushCompleted; - public OutputProducer(IPipeWriter pipe, Frame frame, string connectionId, IKestrelTrace log) + public OutputProducer(IPipeWriter pipe, string connectionId, IKestrelTrace log) { _pipe = pipe; - _frame = frame; _connectionId = connectionId; _log = log; _flushCompleted = OnFlushCompleted; } - public Task WriteAsync( + public void Write(ArraySegment buffer, bool chunk = false) + { + WriteAsync(buffer, default(CancellationToken), chunk).GetAwaiter().GetResult(); + } + + public Task WriteAsync(ArraySegment buffer, bool chunk = false, CancellationToken cancellationToken = default(CancellationToken)) + { + if (cancellationToken.IsCancellationRequested) + { + _cancelled = true; + return Task.FromCanceled(cancellationToken); + } + else if (_cancelled) + { + return TaskCache.CompletedTask; + } + + return WriteAsync(buffer, cancellationToken, chunk); + } + + public void Flush() + { + WriteAsync(_emptyData, default(CancellationToken)).GetAwaiter().GetResult(); + } + + public Task FlushAsync(CancellationToken cancellationToken = default(CancellationToken)) + { + return WriteAsync(_emptyData, cancellationToken); + } + + public void Write(Action callback, T state) + { + lock (_contextLock) + { + if (_completed) + { + return; + } + + var buffer = _pipe.Alloc(1); + callback(buffer, state); + buffer.Commit(); + } + } + + public void Dispose() + { + lock (_contextLock) + { + if (_completed) + { + return; + } + + _log.ConnectionDisconnect(_connectionId); + _completed = true; + _pipe.Complete(); + } + } + + public void Abort() + { + lock (_contextLock) + { + if (_completed) + { + return; + } + + _log.ConnectionDisconnect(_connectionId); + _completed = true; + _pipe.Complete(new ConnectionAbortedException()); + } + } + + private Task WriteAsync( ArraySegment buffer, CancellationToken cancellationToken, bool chunk = false) @@ -108,11 +182,6 @@ private async Task FlushAsyncAwaited(WritableBufferAwaitable awaitable, Cancella } await _flushTcs.Task; - if (cancellationToken.IsCancellationRequested) - { - _frame.Abort(error: null); - } - cancellationToken.ThrowIfCancellationRequested(); } @@ -120,60 +189,5 @@ private void OnFlushCompleted() { _flushTcs.TrySetResult(null); } - - void ISocketOutput.Write(ArraySegment buffer, bool chunk) - { - WriteAsync(buffer, default(CancellationToken), chunk).GetAwaiter().GetResult(); - } - - Task ISocketOutput.WriteAsync(ArraySegment buffer, bool chunk, CancellationToken cancellationToken) - { - if (cancellationToken.IsCancellationRequested) - { - _frame.Abort(error: null); - _cancelled = true; - return Task.FromCanceled(cancellationToken); - } - else if (_cancelled) - { - return TaskCache.CompletedTask; - } - - return WriteAsync(buffer, cancellationToken, chunk); - } - - void ISocketOutput.Flush() - { - WriteAsync(_emptyData, default(CancellationToken)).GetAwaiter().GetResult(); - } - - Task ISocketOutput.FlushAsync(CancellationToken cancellationToken) - { - return WriteAsync(_emptyData, cancellationToken); - } - - void ISocketOutput.Write(Action callback, T state) - { - lock (_contextLock) - { - if (_completed) - { - return; - } - - var buffer = _pipe.Alloc(1); - callback(buffer, state); - buffer.Commit(); - } - } - - public void Dispose() - { - lock (_contextLock) - { - _completed = true; - _pipe.Complete(); - } - } } } diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvOutputConsumer.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvOutputConsumer.cs index d00610fa53a9..30b151a2c229 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvOutputConsumer.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvOutputConsumer.cs @@ -4,6 +4,7 @@ using System; using System.Threading.Tasks; using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines; +using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions; using Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal.Networking; namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal @@ -36,48 +37,50 @@ public async Task WriteOutputAsync() while (true) { - var result = await _pipe.ReadAsync(); - var buffer = result.Buffer; - var consumed = buffer.End; - try { - if (!buffer.IsEmpty) - { - var writeReq = pool.Allocate(); + var result = await _pipe.ReadAsync(); + var buffer = result.Buffer; + var consumed = buffer.End; - try + try + { + if (!buffer.IsEmpty) { - var writeResult = await writeReq.WriteAsync(_socket, buffer); + var writeReq = pool.Allocate(); - LogWriteInfo(writeResult.Status, writeResult.Error); + try + { + var writeResult = await writeReq.WriteAsync(_socket, buffer); + + LogWriteInfo(writeResult.Status, writeResult.Error); - if (writeResult.Error != null) + if (writeResult.Error != null) + { + consumed = buffer.Start; + throw writeResult.Error; + } + } + finally { - consumed = buffer.Start; - throw writeResult.Error; + // Make sure we return the writeReq to the pool + pool.Return(writeReq); } } - finally + + if (buffer.IsEmpty && result.IsCompleted) { - // Make sure we return the writeReq to the pool - pool.Return(writeReq); + break; } } - - if (result.IsCancelled) - { - break; - } - - if (buffer.IsEmpty && result.IsCompleted) + finally { - break; + _pipe.Advance(consumed); } } - finally + catch (ConnectionAbortedException) { - _pipe.Advance(consumed); + break; } } } diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets/SocketConnection.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets/SocketConnection.cs index 6d5c7e7cf6d2..28685c3b601e 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets/SocketConnection.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets/SocketConnection.cs @@ -216,6 +216,10 @@ private async Task DoSend() { error = null; } + catch (ConnectionAbortedException) + { + error = null; + } catch (IOException ex) { error = ex; diff --git a/test/Microsoft.AspNetCore.Server.Kestrel.Core.Tests/FrameTests.cs b/test/Microsoft.AspNetCore.Server.Kestrel.Core.Tests/FrameTests.cs index 43e076be7f24..eb2c93f583ae 100644 --- a/test/Microsoft.AspNetCore.Server.Kestrel.Core.Tests/FrameTests.cs +++ b/test/Microsoft.AspNetCore.Server.Kestrel.Core.Tests/FrameTests.cs @@ -54,6 +54,7 @@ public FrameTests() { _pipelineFactory = new PipeFactory(); _input = _pipelineFactory.Create(); + var output = _pipelineFactory.Create(); _serviceContext = new TestServiceContext(); @@ -66,10 +67,11 @@ public FrameTests() _frame = new TestFrame(application: null, context: _frameContext) { Input = _input.Reader, - Output = new MockSocketOutput(), TimeoutControl = Mock.Of() }; + _frame.Output = new OutputProducer(output.Writer, "", Mock.Of()); + _frame.Reset(); } diff --git a/test/Microsoft.AspNetCore.Server.Kestrel.Core.Tests/OutputProducerTests.cs b/test/Microsoft.AspNetCore.Server.Kestrel.Core.Tests/OutputProducerTests.cs index e96126d087dc..1e8ae196bd55 100644 --- a/test/Microsoft.AspNetCore.Server.Kestrel.Core.Tests/OutputProducerTests.cs +++ b/test/Microsoft.AspNetCore.Server.Kestrel.Core.Tests/OutputProducerTests.cs @@ -38,7 +38,7 @@ public void WritesNoopAfterConnectionCloses() var called = false; - ((ISocketOutput)socketOutput).Write((buffer, state) => + socketOutput.Write((buffer, state) => { called = true; }, @@ -53,7 +53,7 @@ private OutputProducer CreateOutputProducer(PipeOptions pipeOptions) var pipe = _pipeFactory.Create(pipeOptions); var serviceContext = new TestServiceContext(); var frame = new Frame(null, new FrameContext { ServiceContext = serviceContext }); - var socketOutput = new OutputProducer(pipe.Writer, frame, "0", serviceContext.Log); + var socketOutput = new OutputProducer(pipe.Writer, "0", serviceContext.Log); return socketOutput; } diff --git a/test/Microsoft.AspNetCore.Server.Kestrel.Core.Tests/StreamSocketOutputTests.cs b/test/Microsoft.AspNetCore.Server.Kestrel.Core.Tests/StreamSocketOutputTests.cs deleted file mode 100644 index 0a3f383dd9a2..000000000000 --- a/test/Microsoft.AspNetCore.Server.Kestrel.Core.Tests/StreamSocketOutputTests.cs +++ /dev/null @@ -1,110 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. - -using System; -using System.IO; -using Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal; -using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines; -using Xunit; - -namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests -{ - public class StreamSocketOutputTests - { - [Fact] - public void DoesNotThrowForNullBuffers() - { - // This test was added because SslStream throws if passed null buffers with (count == 0) - // Which happens if ProduceEnd is called in Frame without _responseStarted == true - // As it calls ProduceStart with write immediate == true - // This happens in WebSocket Upgrade over SSL - using (var factory = new PipeFactory()) - { - var socketOutput = new StreamSocketOutput(new ThrowsOnNullWriteStream(), factory.Create()); - - // Should not throw - socketOutput.Write(default(ArraySegment), true); - - Assert.True(true); - - socketOutput.Dispose(); - } - } - - private class ThrowsOnNullWriteStream : Stream - { - public override bool CanRead - { - get - { - throw new NotImplementedException(); - } - } - - public override bool CanSeek - { - get - { - throw new NotImplementedException(); - } - } - - public override bool CanWrite - { - get - { - throw new NotImplementedException(); - } - } - - public override long Length - { - get - { - throw new NotImplementedException(); - } - } - - public override long Position - { - get - { - throw new NotImplementedException(); - } - - set - { - throw new NotImplementedException(); - } - } - - public override void Flush() - { - throw new NotImplementedException(); - } - - public override int Read(byte[] buffer, int offset, int count) - { - throw new NotImplementedException(); - } - - public override long Seek(long offset, SeekOrigin origin) - { - throw new NotImplementedException(); - } - - public override void SetLength(long value) - { - throw new NotImplementedException(); - } - - public override void Write(byte[] buffer, int offset, int count) - { - if (buffer == null) - { - throw new ArgumentNullException(nameof(buffer)); - } - } - } - } -} diff --git a/test/Microsoft.AspNetCore.Server.Kestrel.FunctionalTests/ConnectionAdapterTests.cs b/test/Microsoft.AspNetCore.Server.Kestrel.FunctionalTests/ConnectionAdapterTests.cs index 906e81ae76a6..eb5a857358d4 100644 --- a/test/Microsoft.AspNetCore.Server.Kestrel.FunctionalTests/ConnectionAdapterTests.cs +++ b/test/Microsoft.AspNetCore.Server.Kestrel.FunctionalTests/ConnectionAdapterTests.cs @@ -4,6 +4,7 @@ using System; using System.IO; using System.Net; +using System.Net.Sockets; using System.Threading; using System.Threading.Tasks; using Microsoft.AspNetCore.Http.Features; @@ -76,6 +77,70 @@ await connection.ReceiveEnd( } } + [Fact] + public async Task ImmediateFinAfterOnConnectionAsyncClosesGracefully() + { + var listenOptions = new ListenOptions(new IPEndPoint(IPAddress.Loopback, 0)) + { + ConnectionAdapters = { new AsyncConnectionAdapter() } + }; + + var serviceContext = new TestServiceContext(); + + using (var server = new TestServer(TestApp.EchoApp, serviceContext, listenOptions)) + { + using (var connection = server.CreateConnection()) + { + // FIN + connection.Shutdown(SocketShutdown.Send); + await connection.WaitForConnectionClose(); + } + } + } + + [Fact] + public async Task ImmediateFinAfterThrowingClosesGracefully() + { + var listenOptions = new ListenOptions(new IPEndPoint(IPAddress.Loopback, 0)) + { + ConnectionAdapters = { new ThrowingConnectionAdapter() } + }; + + var serviceContext = new TestServiceContext(); + + using (var server = new TestServer(TestApp.EchoApp, serviceContext, listenOptions)) + { + using (var connection = server.CreateConnection()) + { + // FIN + connection.Shutdown(SocketShutdown.Send); + await connection.WaitForConnectionClose(); + } + } + } + + [Fact] + public async Task ImmediateShutdownAfterOnConnectionAsyncDoesNotCrash() + { + var listenOptions = new ListenOptions(new IPEndPoint(IPAddress.Loopback, 0)) + { + ConnectionAdapters = { new AsyncConnectionAdapter() } + }; + + var serviceContext = new TestServiceContext(); + + var stopTask = Task.CompletedTask; + using (var server = new TestServer(TestApp.EchoApp, serviceContext, listenOptions)) + { + using (var connection = server.CreateConnection()) + { + stopTask = server.StopAsync(); + } + + await stopTask; + } + } + [Fact] public async Task ThrowingSynchronousConnectionAdapterDoesNotCrashServer() { @@ -121,7 +186,7 @@ public Task OnConnectionAsync(ConnectionAdapterContext conte } public int BytesRead => _rewritingStream.BytesRead; - } + } private class AsyncConnectionAdapter : IConnectionAdapter { diff --git a/test/Microsoft.AspNetCore.Server.Kestrel.FunctionalTests/RequestTests.cs b/test/Microsoft.AspNetCore.Server.Kestrel.FunctionalTests/RequestTests.cs index c4eb51dfdf6c..5c58b486648c 100644 --- a/test/Microsoft.AspNetCore.Server.Kestrel.FunctionalTests/RequestTests.cs +++ b/test/Microsoft.AspNetCore.Server.Kestrel.FunctionalTests/RequestTests.cs @@ -1051,24 +1051,28 @@ public async Task RequestsCanBeAbortedMidRead(ListenOptions listenOptions) { using (var connection = server.CreateConnection()) { - // Never send the body so CopyToAsync always fails. + // Full request and response await connection.Send( "POST / HTTP/1.1", "Host:", "Content-Length: 5", "", - "HelloPOST / HTTP/1.1", - "Host:", - "Content-Length: 5", - "", - ""); + "Hello"); - await connection.ReceiveForcedEnd( + await connection.Receive( "HTTP/1.1 200 OK", $"Date: {testContext.DateHeaderValue}", "Content-Length: 5", "", "World"); + + // Never send the body so CopyToAsync always fails. + await connection.Send("POST / HTTP/1.1", + "Host:", + "Content-Length: 5", + "", + ""); + await connection.WaitForConnectionClose(); } } diff --git a/test/Microsoft.AspNetCore.Server.Kestrel.FunctionalTests/TestServer.cs b/test/Microsoft.AspNetCore.Server.Kestrel.FunctionalTests/TestServer.cs index 73f2adfbf980..930c21cd08a9 100644 --- a/test/Microsoft.AspNetCore.Server.Kestrel.FunctionalTests/TestServer.cs +++ b/test/Microsoft.AspNetCore.Server.Kestrel.FunctionalTests/TestServer.cs @@ -5,6 +5,7 @@ using System.Net; using System.Net.Sockets; using System.Reflection; +using System.Threading.Tasks; using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Hosting; using Microsoft.AspNetCore.Hosting.Server; @@ -108,6 +109,11 @@ public TestConnection CreateConnection() return new TestConnection(Port, AddressFamily); } + public Task StopAsync() + { + return _host.StopAsync(); + } + public void Dispose() { _host.Dispose(); diff --git a/test/Microsoft.AspNetCore.Server.Kestrel.Performance/FrameWritingBenchmark.cs b/test/Microsoft.AspNetCore.Server.Kestrel.Performance/FrameWritingBenchmark.cs index b735e76ff910..bef07b90637d 100644 --- a/test/Microsoft.AspNetCore.Server.Kestrel.Performance/FrameWritingBenchmark.cs +++ b/test/Microsoft.AspNetCore.Server.Kestrel.Performance/FrameWritingBenchmark.cs @@ -87,7 +87,9 @@ public async Task ProduceEndChunked() private TestFrame MakeFrame() { - var socketInput = new PipeFactory().Create(); + var factory = new PipeFactory(); + var input = factory.Create(); + var output = factory.Create(); var serviceContext = new ServiceContext { @@ -104,10 +106,11 @@ private TestFrame MakeFrame() var frame = new TestFrame(application: null, context: frameContext) { - Input = socketInput.Reader, - Output = new MockSocketOutput() + Input = input.Reader, }; + frame.Output = new OutputProducer(output.Writer, "", null); + frame.Reset(); return frame; diff --git a/test/Microsoft.AspNetCore.Server.Kestrel.Performance/Microsoft.AspNetCore.Server.Kestrel.Performance.csproj b/test/Microsoft.AspNetCore.Server.Kestrel.Performance/Microsoft.AspNetCore.Server.Kestrel.Performance.csproj index 54cd49910666..42fcca542b21 100644 --- a/test/Microsoft.AspNetCore.Server.Kestrel.Performance/Microsoft.AspNetCore.Server.Kestrel.Performance.csproj +++ b/test/Microsoft.AspNetCore.Server.Kestrel.Performance/Microsoft.AspNetCore.Server.Kestrel.Performance.csproj @@ -1,4 +1,4 @@ - + @@ -14,7 +14,6 @@ - diff --git a/test/Microsoft.AspNetCore.Server.Kestrel.Performance/ResponseHeadersWritingBenchmark.cs b/test/Microsoft.AspNetCore.Server.Kestrel.Performance/ResponseHeadersWritingBenchmark.cs index 012deb1b7060..8f28af9ef6fe 100644 --- a/test/Microsoft.AspNetCore.Server.Kestrel.Performance/ResponseHeadersWritingBenchmark.cs +++ b/test/Microsoft.AspNetCore.Server.Kestrel.Performance/ResponseHeadersWritingBenchmark.cs @@ -8,7 +8,6 @@ using System.Threading.Tasks; using BenchmarkDotNet.Attributes; using Microsoft.AspNetCore.Server.Kestrel.Core; -using Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal; using Microsoft.AspNetCore.Server.Kestrel.Core.Internal; using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http; using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines; @@ -113,7 +112,6 @@ public void Setup() var factory = new PipeFactory(); var input = factory.Create(); var output = factory.Create(); - var socketOutput = new StreamSocketOutput(Stream.Null, output); var serviceContext = new ServiceContext { @@ -129,19 +127,14 @@ public void Setup() ConnectionInformation = new MockConnectionInformation() }; - var outputProducer = new OutputProducer(output.Writer, null, null, null); var frame = new TestFrame(application: null, context: frameContext) { Input = input.Reader, - Output = socketOutput, - LifetimeControl = new ConnectionLifetimeControl(null, output.Reader, outputProducer, serviceContext.Log) }; + frame.Output = new OutputProducer(output.Writer, "", null); frame.Reset(); - // Start writing - var ignore = socketOutput.WriteOutputAsync(); - _frame = frame; } diff --git a/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/LibuvOutputConsumerTests.cs b/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/LibuvOutputConsumerTests.cs index bbaaea6054f2..13b3706657cc 100644 --- a/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/LibuvOutputConsumerTests.cs +++ b/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/LibuvOutputConsumerTests.cs @@ -9,7 +9,6 @@ using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http; using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure; using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines; -using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions; using Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal; using Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal.Networking; using Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests.TestHelpers; @@ -75,7 +74,7 @@ public async Task CanWrite1MB(long? maxResponseBufferSize) var buffer = new ArraySegment(new byte[bufferSize], 0, bufferSize); // Act - var writeTask = socketOutput.WriteAsync(buffer, default(CancellationToken)); + var writeTask = socketOutput.WriteAsync(buffer); // Assert await writeTask.TimeoutAfter(TimeSpan.FromSeconds(5)); @@ -110,7 +109,7 @@ public async Task NullMaxResponseBufferSizeAllowsUnlimitedBuffer() var buffer = new ArraySegment(new byte[bufferSize], 0, bufferSize); // Act - var writeTask = socketOutput.WriteAsync(buffer, default(CancellationToken)); + var writeTask = socketOutput.WriteAsync(buffer); // Assert await writeTask.TimeoutAfter(TimeSpan.FromSeconds(5)); @@ -156,7 +155,7 @@ public async Task ZeroMaxResponseBufferSizeDisablesBuffering() var buffer = new ArraySegment(new byte[bufferSize], 0, bufferSize); // Act - var writeTask = socketOutput.WriteAsync(buffer, default(CancellationToken)); + var writeTask = socketOutput.WriteAsync(buffer); // Assert Assert.False(writeTask.IsCompleted); @@ -211,14 +210,14 @@ public async Task WritesDontCompleteImmediatelyWhenTooManyBytesAreAlreadyBuffere var buffer = new ArraySegment(new byte[bufferSize], 0, bufferSize); // Act - var writeTask1 = socketOutput.WriteAsync(buffer, default(CancellationToken)); + var writeTask1 = socketOutput.WriteAsync(buffer); // Assert // The first write should pre-complete since it is <= _maxBytesPreCompleted. Assert.Equal(TaskStatus.RanToCompletion, writeTask1.Status); // Act - var writeTask2 = socketOutput.WriteAsync(buffer, default(CancellationToken)); + var writeTask2 = socketOutput.WriteAsync(buffer); await _mockLibuv.OnPostTask; // Assert @@ -275,7 +274,7 @@ await Task.Run(async () => var halfWriteBehindBuffer = new ArraySegment(data, 0, bufferSize); // Act - var writeTask1 = socketOutput.WriteAsync(halfWriteBehindBuffer, default(CancellationToken)); + var writeTask1 = socketOutput.WriteAsync(halfWriteBehindBuffer); // Assert // The first write should pre-complete since it is <= _maxBytesPreCompleted. @@ -284,17 +283,17 @@ await Task.Run(async () => Assert.NotEmpty(completeQueue); // Add more bytes to the write-behind buffer to prevent the next write from - ((ISocketOutput)socketOutput).Write((writableBuffer, state) => + socketOutput.Write((writableBuffer, state) => { writableBuffer.Write(state); }, halfWriteBehindBuffer); // Act - var writeTask2 = socketOutput.WriteAsync(halfWriteBehindBuffer, default(CancellationToken)); + var writeTask2 = socketOutput.WriteAsync(halfWriteBehindBuffer); Assert.False(writeTask2.IsCompleted); - var writeTask3 = socketOutput.WriteAsync(halfWriteBehindBuffer, default(CancellationToken)); + var writeTask3 = socketOutput.WriteAsync(halfWriteBehindBuffer); Assert.False(writeTask3.IsCompleted); // Drain the write queue @@ -345,7 +344,7 @@ await Task.Run(async () => var fullBuffer = new ArraySegment(data, 0, bufferSize); // Act - var task1Success = socketOutput.WriteAsync(fullBuffer, abortedSource.Token); + var task1Success = socketOutput.WriteAsync(fullBuffer, cancellationToken: abortedSource.Token); // task1 should complete successfully as < _maxBytesPreCompleted // First task is completed and successful @@ -354,8 +353,8 @@ await Task.Run(async () => Assert.False(task1Success.IsFaulted); // following tasks should wait. - var task2Success = socketOutput.WriteAsync(fullBuffer, default(CancellationToken)); - var task3Canceled = socketOutput.WriteAsync(fullBuffer, abortedSource.Token); + var task2Success = socketOutput.WriteAsync(fullBuffer); + var task3Canceled = socketOutput.WriteAsync(fullBuffer, cancellationToken: abortedSource.Token); // Give time for tasks to percolate await _mockLibuv.OnPostTask; @@ -383,7 +382,7 @@ await Task.Run(async () => // A final write guarantees that the error is observed by OutputProducer, // but doesn't return a canceled/faulted task. - var task4Success = socketOutput.WriteAsync(fullBuffer, default(CancellationToken)); + var task4Success = socketOutput.WriteAsync(fullBuffer, cancellationToken: default(CancellationToken)); Assert.True(task4Success.IsCompleted); Assert.False(task4Success.IsCanceled); Assert.False(task4Success.IsFaulted); @@ -429,7 +428,7 @@ await Task.Run(async () => var fullBuffer = new ArraySegment(data, 0, bufferSize); // Act - var task1Success = socketOutput.WriteAsync(fullBuffer, abortedSource.Token); + var task1Success = socketOutput.WriteAsync(fullBuffer, cancellationToken: abortedSource.Token); // task1 should complete successfully as < _maxBytesPreCompleted // First task is completed and successful @@ -438,7 +437,7 @@ await Task.Run(async () => Assert.False(task1Success.IsFaulted); // following tasks should wait. - var task3Canceled = socketOutput.WriteAsync(fullBuffer, abortedSource.Token); + var task3Canceled = socketOutput.WriteAsync(fullBuffer, cancellationToken: abortedSource.Token); // Give time for tasks to percolate await _mockLibuv.OnPostTask; @@ -458,7 +457,7 @@ await Task.Run(async () => // A final write guarantees that the error is observed by OutputProducer, // but doesn't return a canceled/faulted task. - var task4Success = socketOutput.WriteAsync(fullBuffer, default(CancellationToken)); + var task4Success = socketOutput.WriteAsync(fullBuffer); Assert.True(task4Success.IsCompleted); Assert.False(task4Success.IsCanceled); Assert.False(task4Success.IsFaulted); @@ -504,7 +503,7 @@ await Task.Run(async () => var fullBuffer = new ArraySegment(data, 0, bufferSize); // Act - var task1Waits = socketOutput.WriteAsync(fullBuffer, default(CancellationToken)); + var task1Waits = socketOutput.WriteAsync(fullBuffer); // First task is not completed Assert.False(task1Waits.IsCompleted); @@ -512,7 +511,7 @@ await Task.Run(async () => Assert.False(task1Waits.IsFaulted); // following tasks should wait. - var task3Canceled = socketOutput.WriteAsync(fullBuffer, abortedSource.Token); + var task3Canceled = socketOutput.WriteAsync(fullBuffer, cancellationToken: abortedSource.Token); // Give time for tasks to percolate await _mockLibuv.OnPostTask; @@ -537,7 +536,7 @@ await Task.Run(async () => // A final write guarantees that the error is observed by OutputProducer, // but doesn't return a canceled/faulted task. - var task4Success = socketOutput.WriteAsync(fullBuffer, default(CancellationToken)); + var task4Success = socketOutput.WriteAsync(fullBuffer); Assert.True(task4Success.IsCompleted); Assert.False(task4Success.IsCanceled); Assert.False(task4Success.IsFaulted); @@ -575,7 +574,7 @@ public async Task WritesDontGetCompletedTooQuickly(int maxResponseBufferSize) var buffer = new ArraySegment(new byte[bufferSize], 0, bufferSize); // Act (Pre-complete the maximum number of bytes in preparation for the rest of the test) - var writeTask1 = socketOutput.WriteAsync(buffer, default(CancellationToken)); + var writeTask1 = socketOutput.WriteAsync(buffer); // Assert // The first write should pre-complete since it is < _maxBytesPreCompleted. @@ -584,8 +583,8 @@ public async Task WritesDontGetCompletedTooQuickly(int maxResponseBufferSize) Assert.NotEmpty(completeQueue); // Act - var writeTask2 = socketOutput.WriteAsync(buffer, default(CancellationToken)); - var writeTask3 = socketOutput.WriteAsync(buffer, default(CancellationToken)); + var writeTask2 = socketOutput.WriteAsync(buffer); + var writeTask3 = socketOutput.WriteAsync(buffer); // Drain the write queue while (completeQueue.TryDequeue(out var triggerNextCompleted)) @@ -635,8 +634,8 @@ public async Task WritesAreAggregated(long? maxResponseBufferSize) // Two calls to WriteAsync trigger uv_write once if both calls // are made before write is scheduled - var ignore = socketOutput.WriteAsync(buffer, CancellationToken.None); - ignore = socketOutput.WriteAsync(buffer, CancellationToken.None); + var ignore = socketOutput.WriteAsync(buffer); + ignore = socketOutput.WriteAsync(buffer); _mockLibuv.KestrelThreadBlocker.Set(); @@ -671,10 +670,9 @@ private OutputProducer CreateOutputProducer(PipeOptions pipeOptions, Cancellatio var frame = new Frame(null, new FrameContext { ServiceContext = serviceContext }); var socket = new MockSocket(_mockLibuv, _libuvThread.Loop.ThreadId, transportContext.Log); - var socketOutput = new OutputProducer(pipe.Writer, frame, "0", serviceContext.Log); + var outputProducer = new OutputProducer(pipe.Writer, "0", serviceContext.Log); var consumer = new LibuvOutputConsumer(pipe.Reader, _libuvThread, socket, "0", transportContext.Log); - - frame.LifetimeControl = new ConnectionLifetimeControl("0", pipe.Reader, socketOutput, serviceContext.Log); + frame.Output = outputProducer; if (cts != null) { @@ -683,7 +681,7 @@ private OutputProducer CreateOutputProducer(PipeOptions pipeOptions, Cancellatio var ignore = WriteOutputAsync(consumer, pipe.Reader, frame); - return socketOutput; + return outputProducer; } private async Task WriteOutputAsync(LibuvOutputConsumer consumer, IPipeReader outputReader, Frame frame) diff --git a/test/shared/MockSocketOutput.cs b/test/shared/MockSocketOutput.cs deleted file mode 100644 index 9089bca83d38..000000000000 --- a/test/shared/MockSocketOutput.cs +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. - -using System; -using System.Threading; -using System.Threading.Tasks; -using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http; -using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines; -using Microsoft.Extensions.Internal; - -namespace Microsoft.AspNetCore.Testing -{ - public class MockSocketOutput : ISocketOutput - { - public MockSocketOutput() - { - } - - public void Write(ArraySegment buffer, bool chunk = false) - { - } - - public Task WriteAsync(ArraySegment buffer, bool chunk = false, CancellationToken cancellationToken = default(CancellationToken)) - { - return TaskCache.CompletedTask; - } - - public void Flush() - { - } - - public Task FlushAsync(CancellationToken cancellationToken = new CancellationToken()) - { - return TaskCache.CompletedTask; - } - - public void Write(Action write, T state) where T : struct - { - - } - } -}