Skip to content

Commit

Permalink
More FrameConnection refactoring (#1820)
Browse files Browse the repository at this point in the history
* More FrameConnection refactoring
- This change reverts the change to complete the writer with an
exception on abort because of the number of first chance exceptions
that get thrown.
- This change also moves connection logging into FrameConnection instead
of being split between the ConnectionHandler and FrameConnection.
- Fixed issues with LibuvOutputConsumerTests that leak WriteReq since
cancelled writes no longer end the connection.
  • Loading branch information
davidfowl authored May 10, 2017
1 parent c48113a commit c8b6a2b
Show file tree
Hide file tree
Showing 12 changed files with 98 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions;

namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal
{
Expand All @@ -16,17 +15,17 @@ public class AdaptedPipeline
private const int MinAllocBufferSize = 2048;

private readonly IKestrelTrace _trace;
private readonly IPipeWriter _transportOutputPipeWriter;
private readonly IPipe _transportOutputPipe;
private readonly IPipeReader _transportInputPipeReader;

public AdaptedPipeline(IPipeReader transportInputPipeReader,
IPipeWriter transportOutputPipeWriter,
IPipe transportOutputPipe,
IPipe inputPipe,
IPipe outputPipe,
IKestrelTrace trace)
{
_transportInputPipeReader = transportInputPipeReader;
_transportOutputPipeWriter = transportOutputPipeWriter;
_transportOutputPipe = transportOutputPipe;
Input = inputPipe;
Output = outputPipe;
_trace = trace;
Expand Down Expand Up @@ -58,18 +57,24 @@ private async Task WriteOutputAsync(Stream stream)

while (true)
{
var readResult = await Output.Reader.ReadAsync();
var buffer = readResult.Buffer;
var result = await Output.Reader.ReadAsync();
var buffer = result.Buffer;

try
{
if (buffer.IsEmpty && readResult.IsCompleted)
if (result.IsCancelled)
{
// Forward the cancellation to the transport pipe
_transportOutputPipe.Reader.CancelPendingRead();
break;
}

if (buffer.IsEmpty)
{
if (result.IsCompleted)
{
break;
}
await stream.FlushAsync();
}
else if (buffer.IsSingleSpan)
Expand Down Expand Up @@ -99,7 +104,7 @@ private async Task WriteOutputAsync(Stream stream)
finally
{
Output.Reader.Complete();
_transportOutputPipeWriter.Complete(error);
_transportOutputPipe.Writer.Complete(error);
}
}

Expand All @@ -111,8 +116,8 @@ private async Task ReadInputAsync(Stream stream)
{
if (stream == null)
{
// If the stream is null then we're going to abort the connection
throw new ConnectionAbortedException();
// REVIEW: Do we need an exception here?
return;
}

while (true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,13 @@ public IConnectionContext OnConnection(IConnectionInformation connectionInfo)
ConnectionId = connectionId,
FrameConnectionId = frameConnectionId,
ServiceContext = _serviceContext,
PipeFactory = connectionInfo.PipeFactory,
ConnectionInformation = connectionInfo,
ConnectionAdapters = _listenOptions.ConnectionAdapters,
Frame = frame,
Input = inputPipe,
Output = outputPipe,
});

_serviceContext.Log.ConnectionStart(connectionId);
KestrelEventSource.Log.ConnectionStart(connection, connectionInfo);

// Since data cannot be added to the inputPipe by the transport until OnConnection returns,
// Frame.ProcessRequestsAsync is guaranteed to unblock the transport thread before calling
// application code.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public FrameConnection(FrameConnectionContext context)
public IPipeWriter Input => _context.Input.Writer;
public IPipeReader Output => _context.Output.Reader;

private PipeFactory PipeFactory => _context.PipeFactory;
private PipeFactory PipeFactory => _context.ConnectionInformation.PipeFactory;

// Internal for testing
internal PipeOptions AdaptedInputPipeOptions => new PipeOptions
Expand Down Expand Up @@ -70,21 +70,24 @@ private async Task ProcessRequestsAsync()
{
try
{
Log.ConnectionStart(ConnectionId);
KestrelEventSource.Log.ConnectionStart(this, _context.ConnectionInformation);

AdaptedPipeline adaptedPipeline = null;
var adaptedPipelineTask = Task.CompletedTask;
var input = _context.Input.Reader;
var output = _context.Output.Writer;
var output = _context.Output;

if (_connectionAdapters.Count > 0)
{
adaptedPipeline = new AdaptedPipeline(_context.Input.Reader,
_context.Output.Writer,
adaptedPipeline = new AdaptedPipeline(input,
output,
PipeFactory.Create(AdaptedInputPipeOptions),
PipeFactory.Create(AdaptedOutputPipeOptions),
Log);

input = adaptedPipeline.Input.Reader;
output = adaptedPipeline.Output.Writer;
output = adaptedPipeline.Output;
}

// Set these before the first await, this is to make sure that we don't yield control
Expand Down Expand Up @@ -114,6 +117,9 @@ private async Task ProcessRequestsAsync()
{
_context.ServiceContext.ConnectionManager.RemoveConnection(_context.FrameConnectionId);
DisposeAdaptedConnections();

Log.ConnectionStop(ConnectionId);
KestrelEventSource.Log.ConnectionStop(this);
}
}

Expand All @@ -122,8 +128,6 @@ public void OnConnectionClosed(Exception ex)
// Abort the connection (if not already aborted)
_frame.Abort(ex);

Log.ConnectionStop(ConnectionId);
KestrelEventSource.Log.ConnectionStop(this);
_socketClosedTcs.TrySetResult(null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http;
using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions;

namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
{
Expand All @@ -13,8 +14,8 @@ public class FrameConnectionContext
public string ConnectionId { get; set; }
public long FrameConnectionId { get; set; }
public ServiceContext ServiceContext { get; set; }
public PipeFactory PipeFactory { get; set; }
public List<IConnectionAdapter> ConnectionAdapters { get; set; }
public IConnectionInformation ConnectionInformation { get; set; }
public Frame Frame { get; set; }

public IPipe Input { get; set; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
using System.Threading.Tasks;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions;
using Microsoft.Extensions.Internal;

namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
Expand All @@ -21,10 +20,9 @@ public class OutputProducer : IDisposable
// This locks access to to all of the below fields
private readonly object _contextLock = new object();

private bool _cancelled = false;
private bool _completed = false;

private readonly IPipeWriter _pipe;
private readonly IPipe _pipe;

// https://github.com/dotnet/corefxlab/issues/1334
// Pipelines don't support multiple awaiters on flush
Expand All @@ -33,7 +31,7 @@ public class OutputProducer : IDisposable
private readonly object _flushLock = new object();
private Action _flushCompleted;

public OutputProducer(IPipeWriter pipe, string connectionId, IKestrelTrace log)
public OutputProducer(IPipe pipe, string connectionId, IKestrelTrace log)
{
_pipe = pipe;
_connectionId = connectionId;
Expand All @@ -50,13 +48,8 @@ public void Write(ArraySegment<byte> buffer, bool chunk = false)
{
if (cancellationToken.IsCancellationRequested)
{
_cancelled = true;
return Task.FromCanceled(cancellationToken);
}
else if (_cancelled)
{
return TaskCache.CompletedTask;
}

return WriteAsync(buffer, cancellationToken, chunk);
}
Expand All @@ -80,7 +73,7 @@ public void Write<T>(Action<WritableBuffer, T> callback, T state)
return;
}

var buffer = _pipe.Alloc(1);
var buffer = _pipe.Writer.Alloc(1);
callback(buffer, state);
buffer.Commit();
}
Expand All @@ -97,7 +90,7 @@ public void Dispose()

_log.ConnectionDisconnect(_connectionId);
_completed = true;
_pipe.Complete();
_pipe.Writer.Complete();
}
}

Expand All @@ -112,7 +105,7 @@ public void Abort()

_log.ConnectionDisconnect(_connectionId);
_completed = true;
_pipe.Complete(new ConnectionAbortedException());
_pipe.Reader.CancelPendingRead();
}
}

Expand All @@ -130,7 +123,7 @@ private Task WriteAsync(
return TaskCache.CompletedTask;
}

writableBuffer = _pipe.Alloc(1);
writableBuffer = _pipe.Writer.Alloc(1);
var writer = new WritableBufferWriter(writableBuffer);
if (buffer.Count > 0)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,50 +37,47 @@ public async Task WriteOutputAsync()

while (true)
{
var result = await _pipe.ReadAsync();
var buffer = result.Buffer;
var consumed = buffer.End;

try
{
var result = await _pipe.ReadAsync();
var buffer = result.Buffer;
var consumed = buffer.End;
if (result.IsCancelled)
{
break;
}

try
if (!buffer.IsEmpty)
{
if (!buffer.IsEmpty)
{
var writeReq = pool.Allocate();
var writeReq = pool.Allocate();

try
{
var writeResult = await writeReq.WriteAsync(_socket, buffer);
try
{
var writeResult = await writeReq.WriteAsync(_socket, buffer);

LogWriteInfo(writeResult.Status, writeResult.Error);
LogWriteInfo(writeResult.Status, writeResult.Error);

if (writeResult.Error != null)
{
consumed = buffer.Start;
throw writeResult.Error;
}
}
finally
if (writeResult.Error != null)
{
// Make sure we return the writeReq to the pool
pool.Return(writeReq);
consumed = buffer.Start;
throw writeResult.Error;
}
}

if (buffer.IsEmpty && result.IsCompleted)
finally
{
break;
// Make sure we return the writeReq to the pool
pool.Return(writeReq);
}
}
finally
else if (result.IsCompleted)
{
_pipe.Advance(consumed);
break;
}
}
catch (ConnectionAbortedException)
finally
{
break;
_pipe.Advance(consumed);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,11 @@ private async Task DoSend()
var result = await _output.ReadAsync();
var buffer = result.Buffer;

if (result.IsCancelled)
{
break;
}

try
{
if (!buffer.IsEmpty)
Expand All @@ -189,15 +194,7 @@ private async Task DoSend()
}
}
}

if (result.IsCancelled)
{
// Send a FIN
_socket.Shutdown(SocketShutdown.Send);
break;
}

if (buffer.IsEmpty && result.IsCompleted)
else if (result.IsCompleted)
{
break;
}
Expand All @@ -207,6 +204,8 @@ private async Task DoSend()
_output.Advance(buffer.End);
}
}

_socket.Shutdown(SocketShutdown.Send);
}
catch (SocketException ex) when (ex.SocketErrorCode == SocketError.OperationAborted)
{
Expand All @@ -216,10 +215,6 @@ private async Task DoSend()
{
error = null;
}
catch (ConnectionAbortedException)
{
error = null;
}
catch (IOException ex)
{
error = ex;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public FrameTests()
TimeoutControl = Mock.Of<ITimeoutControl>()
};

_frame.Output = new OutputProducer(output.Writer, "", Mock.Of<IKestrelTrace>());
_frame.Output = new OutputProducer(output, "", Mock.Of<IKestrelTrace>());

_frame.Reset();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ private OutputProducer CreateOutputProducer(PipeOptions pipeOptions)
var pipe = _pipeFactory.Create(pipeOptions);
var serviceContext = new TestServiceContext();
var frame = new Frame<object>(null, new FrameContext { ServiceContext = serviceContext });
var socketOutput = new OutputProducer(pipe.Writer, "0", serviceContext.Log);
var socketOutput = new OutputProducer(pipe, "0", serviceContext.Log);

return socketOutput;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ private TestFrame<object> MakeFrame()
Input = input.Reader,
};

frame.Output = new OutputProducer(output.Writer, "", null);
frame.Output = new OutputProducer(output, "", null);

frame.Reset();

Expand Down
Loading

0 comments on commit c8b6a2b

Please sign in to comment.