Skip to content

Commit

Permalink
Refactoring and of FrameConnection and Frame (#1816)
Browse files Browse the repository at this point in the history
* Refactoring and of FrameConnection and Frame
- Building on top of the last refactoring of FrameConnection, this change aims to clean up
the communication between the Frame and FrameConnection by removing some concepts and
being consistent about the communication between Frame and FrameConnection with or without
connection adapters. Changes include:
- Removing ConnectionLifetimeControl, ISocketOutput, StreamSocketOutput
- Moving more initialization of the frame to FrameConnection after the pipes
are setup
- OutputProducer communicates cancellation via the IPipeWriter instead of the output's IPipeReader.
- Frame always communicates via the pipes and that communications flows through the layers to the transport.
This means that each 1/2 of the adapted pipeline handles closing the right side of the transport at the
right time, propagating exceptions as necessary.
- This is how the flow looks now:
            ->                        ->
[transport]     [connection adapters]     [frame]
            <-                        <-
- Transports need to handle a ConnectionAbortedException on the output as a signal to stop
writing and end the connection. This will no longer try to drain the output but will just stop
writing and end the response immediately.
- Remove frame.Abort when cancellation on Write fails.
- Unify the connection shutdown logic
- Dispose 1/2 initialized connection adapters

#1815
  • Loading branch information
davidfowl authored May 9, 2017
1 parent 6e2fdda commit c48113a
Show file tree
Hide file tree
Showing 25 changed files with 411 additions and 630 deletions.
3 changes: 1 addition & 2 deletions KestrelHttpServer.sln
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.26405.0
VisualStudioVersion = 15.0.26403.7
MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{7972A5D6-3385-4127-9277-428506DD44FF}"
ProjectSection(SolutionItems) = preProject
Expand All @@ -26,7 +26,6 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "shared", "shared", "{0EF2AC
test\shared\LifetimeNotImplemented.cs = test\shared\LifetimeNotImplemented.cs
test\shared\MockFrameControl.cs = test\shared\MockFrameControl.cs
test\shared\MockLogger.cs = test\shared\MockLogger.cs
test\shared\MockSocketOutput.cs = test\shared\MockSocketOutput.cs
test\shared\MockSystemClock.cs = test\shared\MockSystemClock.cs
test\shared\StringExtensions.cs = test\shared\StringExtensions.cs
test\shared\TestApp.cs = test\shared\TestApp.cs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,85 +7,157 @@
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines;
using Microsoft.Extensions.Logging;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions;

namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal
{
public class AdaptedPipeline
{
private const int MinAllocBufferSize = 2048;

private readonly Stream _filteredStream;
private readonly StreamSocketOutput _output;
private readonly IKestrelTrace _trace;
private readonly IPipeWriter _transportOutputPipeWriter;
private readonly IPipeReader _transportInputPipeReader;

public AdaptedPipeline(
Stream filteredStream,
IPipe inputPipe,
IPipe outputPipe,
IKestrelTrace trace)
public AdaptedPipeline(IPipeReader transportInputPipeReader,
IPipeWriter transportOutputPipeWriter,
IPipe inputPipe,
IPipe outputPipe,
IKestrelTrace trace)
{
_transportInputPipeReader = transportInputPipeReader;
_transportOutputPipeWriter = transportOutputPipeWriter;
Input = inputPipe;
_output = new StreamSocketOutput(filteredStream, outputPipe);
_filteredStream = filteredStream;
Output = outputPipe;
_trace = trace;
}

public IPipe Input { get; }

public ISocketOutput Output => _output;
public IPipe Output { get; }

public async Task RunAsync()
public async Task RunAsync(Stream stream)
{
var inputTask = ReadInputAsync(stream);
var outputTask = WriteOutputAsync(stream);

await inputTask;
await outputTask;
}

private async Task WriteOutputAsync(Stream stream)
{
Exception error = null;

try
{
var inputTask = ReadInputAsync();
var outputTask = _output.WriteOutputAsync();
if (stream == null)
{
return;
}

await inputTask;
while (true)
{
var readResult = await Output.Reader.ReadAsync();
var buffer = readResult.Buffer;

_output.Dispose();
try
{
if (buffer.IsEmpty && readResult.IsCompleted)
{
break;
}

await outputTask;
if (buffer.IsEmpty)
{
await stream.FlushAsync();
}
else if (buffer.IsSingleSpan)
{
var array = buffer.First.GetArray();
await stream.WriteAsync(array.Array, array.Offset, array.Count);
}
else
{
foreach (var memory in buffer)
{
var array = memory.GetArray();
await stream.WriteAsync(array.Array, array.Offset, array.Count);
}
}
}
finally
{
Output.Reader.Advance(buffer.End);
}
}
}
catch (Exception ex)
{
// adaptedPipeline.RunAsync() shouldn't throw, unless filtered stream's WriteAsync throws.
_trace.LogError(0, ex, $"{nameof(AdaptedPipeline)}.{nameof(RunAsync)}");
error = ex;
}
finally
{
Output.Reader.Complete();
_transportOutputPipeWriter.Complete(error);
}
}

private async Task ReadInputAsync()
private async Task ReadInputAsync(Stream stream)
{
int bytesRead;
Exception error = null;

do
try
{
var block = Input.Writer.Alloc(MinAllocBufferSize);
if (stream == null)
{
// If the stream is null then we're going to abort the connection
throw new ConnectionAbortedException();
}

try
while (true)
{
var array = block.Buffer.GetArray();

var outputBuffer = Input.Writer.Alloc(MinAllocBufferSize);

var array = outputBuffer.Buffer.GetArray();
try
{
bytesRead = await _filteredStream.ReadAsync(array.Array, array.Offset, array.Count);
block.Advance(bytesRead);
var bytesRead = await stream.ReadAsync(array.Array, array.Offset, array.Count);
outputBuffer.Advance(bytesRead);

if (bytesRead == 0)
{
// FIN
break;
}
}
finally
{
await block.FlushAsync();
outputBuffer.Commit();
}
}
catch (Exception ex)
{
Input.Writer.Complete(ex);

// Don't rethrow the exception. It should be handled by the Pipeline consumer.
return;
}
} while (bytesRead != 0);
var result = await outputBuffer.FlushAsync();

Input.Writer.Complete();
if (result.IsCompleted)
{
break;
}

}
}
catch (Exception ex)
{
// Don't rethrow the exception. It should be handled by the Pipeline consumer.
error = ex;
}
finally
{
Input.Writer.Complete(error);
// The application could have ended the input pipe so complete
// the transport pipe as well
_transportInputPipeReader.Complete();
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,16 @@
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http;
using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines;

namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal
{
public class RawStream : Stream
{
private readonly IPipeReader _input;
private readonly ISocketOutput _output;
private readonly IPipeWriter _output;

public RawStream(IPipeReader input, ISocketOutput output)
public RawStream(IPipeReader input, IPipeWriter output)
{
_input = input;
_output = output;
Expand Down Expand Up @@ -71,19 +70,10 @@ public override Task<int> ReadAsync(byte[] buffer, int offset, int count, Cancel

public override void Write(byte[] buffer, int offset, int count)
{
ArraySegment<byte> segment;
if (buffer != null)
{
segment = new ArraySegment<byte>(buffer, offset, count);
}
else
{
segment = default(ArraySegment<byte>);
}
_output.Write(segment);
WriteAsync(buffer, offset, count).GetAwaiter().GetResult();
}

public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken token)
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken token)
{
ArraySegment<byte> segment;
if (buffer != null)
Expand All @@ -94,17 +84,19 @@ public override Task WriteAsync(byte[] buffer, int offset, int count, Cancellati
{
segment = default(ArraySegment<byte>);
}
return _output.WriteAsync(segment, cancellationToken: token);
var output = _output.Alloc();
output.Write(segment);
await output.FlushAsync(token);
}

public override void Flush()
{
_output.Flush();
FlushAsync(CancellationToken.None).GetAwaiter().GetResult();
}

public override Task FlushAsync(CancellationToken cancellationToken)
{
return _output.FlushAsync(cancellationToken);
return WriteAsync(null, 0, 0, cancellationToken);
}

private async Task<int> ReadAsync(ArraySegment<byte> buffer)
Expand Down Expand Up @@ -209,11 +201,5 @@ private Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken
}, tcs, cancellationToken);
return tcs.Task;
}

protected override void Dispose(bool disposing)
{
// _output is disposed by ConnectionLifetimeControl
_input.Complete();
}
}
}
}
Loading

0 comments on commit c48113a

Please sign in to comment.