Skip to content
This repository has been archived by the owner on Jan 23, 2023. It is now read-only.
/ corefx Public archive

Add Memory-based APIs to Sockets and NetworkStream #24431

Merged
merged 1 commit into from
Oct 5, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/System.Net.Sockets/ref/System.Net.Sockets.netcoreapp.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,16 @@ public partial class Socket : System.IDisposable
public int Send(ReadOnlySpan<byte> buffer, System.Net.Sockets.SocketFlags socketFlags) { throw null; }
public int Send(ReadOnlySpan<byte> buffer, System.Net.Sockets.SocketFlags socketFlags, out System.Net.Sockets.SocketError errorCode) { throw null; }
}

public partial static class SocketTaskExtensions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we adding these as extension methods? Why not just add them as new regular methods (i.e. new APIs)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the API review I believe we'd elected to keep them next to where the existing Task-based methods are. I don't have a strong preference, though; @weshaggard, @terrajobst, @KrzysztofCwalina, opinions?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we added them a long time ago as extension methods because we didn't have a good plan for adding new APIs to .NET Core vs. figuring out how to eventually add them to .NET Framework and then .NETStandard.

But I think at this point, we should be adding them as new APIs (not extension methods) just like we're doing for SslStream (ALPN, SNI). I.e. add them to netcore first, then figure out how/when to add them to .net framework so that eventually they can roll into a new netstandard.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have a preference. Both approaches seem reasonable.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just tried it, and realized it's a breaking change unless we also add the existing extension methods as instance methods. Otherwise, the compiler won't consider the extension methods as valid targets once it finds the instance methods of the same name, and due to signature differences, various uses can then fail to compile.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've opened https://github.com/dotnet/corefx/issues/24442 to track revisiting this in an API discussion. I'll go ahead with this PR to keep things unblocked.

{
public static System.Threading.Tasks.ValueTask<int> ReceiveAsync(this System.Net.Sockets.Socket socket, System.Memory<byte> buffer, System.Net.Sockets.SocketFlags socketFlags, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public static System.Threading.Tasks.ValueTask<int> SendAsync(this System.Net.Sockets.Socket socket, System.ReadOnlyMemory<byte> buffer, System.Net.Sockets.SocketFlags socketFlags, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
}

public partial class SocketAsyncEventArgs : System.EventArgs, System.IDisposable
{
public System.Memory<byte> GetBuffer() { throw null; }
public void SetBuffer(System.Memory<byte> buffer) { throw null; }
}
}
1 change: 1 addition & 0 deletions src/System.Net.Sockets/src/System.Net.Sockets.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@
<Reference Include="System.Net.Primitives" />
<Reference Include="System.Resources.ResourceManager" />
<Reference Include="System.Runtime" />
<Reference Include="System.Runtime.CompilerServices.Unsafe" />
<Reference Include="System.Runtime.Extensions" />
<Reference Include="System.Runtime.InteropServices" />
<Reference Include="System.Security.Claims" />
Expand Down
60 changes: 60 additions & 0 deletions src/System.Net.Sockets/src/System/Net/Sockets/NetworkStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,34 @@ public override Task<int> ReadAsync(byte[] buffer, int offset, int size, Cancell
}
}

public override ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken)
{
bool canRead = CanRead; // Prevent race with Dispose.
if (_cleanedUp)
{
throw new ObjectDisposedException(this.GetType().FullName);
}
if (!canRead)
{
throw new InvalidOperationException(SR.net_writeonlystream);
}

try
{
return _streamSocket.ReceiveAsync(
destination,
SocketFlags.None,
fromNetworkStream: true,
cancellationToken: cancellationToken);
}
catch (Exception exception) when (!(exception is OutOfMemoryException))
{
// Some sort of error occurred on the socket call,
// set the SocketException as InnerException and throw.
throw new IOException(SR.Format(SR.net_io_readfailure, exception.Message), exception);
}
}

// WriteAsync - provide async write functionality.
//
// This method provides async write functionality. All we do is
Expand Down Expand Up @@ -810,6 +838,38 @@ public override Task WriteAsync(byte[] buffer, int offset, int size, Cancellatio
}
}

public override Task WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
{
bool canWrite = CanWrite; // Prevent race with Dispose.
if (_cleanedUp)
{
throw new ObjectDisposedException(this.GetType().FullName);
}
if (!canWrite)
{
throw new InvalidOperationException(SR.net_readonlystream);
}

try
{
ValueTask<int> t = _streamSocket.SendAsync(
source,
SocketFlags.None,
fromNetworkStream: true,
cancellationToken: cancellationToken);

return t.IsCompletedSuccessfully ?
Task.CompletedTask :
t.AsTask();
}
catch (Exception exception) when (!(exception is OutOfMemoryException))
{
// Some sort of error occurred on the socket call,
// set the SocketException as InnerException and throw.
throw new IOException(SR.Format(SR.net_io_writefailure, exception.Message), exception);
}
}

public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
{
// Validate arguments as would the base CopyToAsync
Expand Down
187 changes: 169 additions & 18 deletions src/System.Net.Sockets/src/System/Net/Sockets/Socket.Tasks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System.Buffers;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
Expand Down Expand Up @@ -195,17 +196,72 @@ internal Task<int> ReceiveAsync(ArraySegment<byte> buffer, SocketFlags socketFla
}
}

internal ValueTask<int> ReceiveAsync(Memory<byte> buffer, SocketFlags socketFlags, bool fromNetworkStream, CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
return new ValueTask<int>(Task.FromCanceled<int>(cancellationToken));
}

// TODO https://github.com/dotnet/corefx/issues/24430:
// Fully plumb cancellation down into socket operations.

Int32TaskSocketAsyncEventArgs saea = RentSocketAsyncEventArgs(isReceive: true);
if (saea != null)
{
// We got a cached instance. Configure the buffer and initate the operation.
ConfigureBuffer(saea, buffer, socketFlags, wrapExceptionsInIOExceptions: fromNetworkStream);
return GetValueTaskForSendReceive(ReceiveAsync(saea), saea, fromNetworkStream, isReceive: true);
}
else
{
// We couldn't get a cached instance, due to a concurrent receive operation on the socket.
// Fall back to wrapping APM.
return new ValueTask<int>(ReceiveAsyncApm(buffer, socketFlags));
}
}

/// <summary>Implements Task-returning ReceiveAsync on top of Begin/EndReceive.</summary>
private Task<int> ReceiveAsyncApm(ArraySegment<byte> buffer, SocketFlags socketFlags)
private Task<int> ReceiveAsyncApm(Memory<byte> buffer, SocketFlags socketFlags)
{
var tcs = new TaskCompletionSource<int>(this);
BeginReceive(buffer.Array, buffer.Offset, buffer.Count, socketFlags, iar =>
if (buffer.TryGetArray(out ArraySegment<byte> bufferArray))
{
var innerTcs = (TaskCompletionSource<int>)iar.AsyncState;
try { innerTcs.TrySetResult(((Socket)innerTcs.Task.AsyncState).EndReceive(iar)); }
catch (Exception e) { innerTcs.TrySetException(e); }
}, tcs);
return tcs.Task;
// We were able to extract the underlying byte[] from the Memory<byte>. Use it.
var tcs = new TaskCompletionSource<int>(this);
BeginReceive(bufferArray.Array, bufferArray.Offset, bufferArray.Count, socketFlags, iar =>
{
var innerTcs = (TaskCompletionSource<int>)iar.AsyncState;
try { innerTcs.TrySetResult(((Socket)innerTcs.Task.AsyncState).EndReceive(iar)); }
catch (Exception e) { innerTcs.TrySetException(e); }
}, tcs);
return tcs.Task;
}
else
{
// We weren't able to extract an underlying byte[] from the Memory<byte>.
// Instead read into an ArrayPool array, then copy from that into the memory.
byte[] poolArray = ArrayPool<byte>.Shared.Rent(buffer.Length);
var tcs = new TaskCompletionSource<int>(this);
BeginReceive(poolArray, 0, buffer.Length, socketFlags, iar =>
{
var state = (Tuple<TaskCompletionSource<int>, Memory<byte>, byte[]>)iar.AsyncState;
try
{
int bytesCopied = ((Socket)state.Item1.Task.AsyncState).EndReceive(iar);
new ReadOnlyMemory<byte>(state.Item3, 0, bytesCopied).Span.CopyTo(state.Item2.Span);
state.Item1.TrySetResult(bytesCopied);
}
catch (Exception e)
{
state.Item1.TrySetException(e);
}
finally
{
ArrayPool<byte>.Shared.Return(state.Item3);
}
}, Tuple.Create(tcs, buffer, poolArray));
return tcs.Task;
}
}

internal Task<int> ReceiveAsync(IList<ArraySegment<byte>> buffers, SocketFlags socketFlags)
Expand Down Expand Up @@ -304,17 +360,70 @@ internal Task<int> SendAsync(ArraySegment<byte> buffer, SocketFlags socketFlags,
}
}

internal ValueTask<int> SendAsync(ReadOnlyMemory<byte> buffer, SocketFlags socketFlags, bool fromNetworkStream, CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
return new ValueTask<int>(Task.FromCanceled<int>(cancellationToken));
}

// TODO https://github.com/dotnet/corefx/issues/24430:
// Fully plumb cancellation down into socket operations.

Int32TaskSocketAsyncEventArgs saea = RentSocketAsyncEventArgs(isReceive: false);
if (saea != null)
{
// We got a cached instance. Configure the buffer and initate the operation.
ConfigureBuffer(saea, Unsafe.As<ReadOnlyMemory<byte>,Memory<byte>>(ref buffer), socketFlags, wrapExceptionsInIOExceptions: fromNetworkStream);
return GetValueTaskForSendReceive(SendAsync(saea), saea, fromNetworkStream, isReceive: false);
}
else
{
// We couldn't get a cached instance, due to a concurrent send operation on the socket.
// Fall back to wrapping APM.
return new ValueTask<int>(SendAsyncApm(buffer, socketFlags));
}
}

/// <summary>Implements Task-returning SendAsync on top of Begin/EndSend.</summary>
private Task<int> SendAsyncApm(ArraySegment<byte> buffer, SocketFlags socketFlags)
private Task<int> SendAsyncApm(ReadOnlyMemory<byte> buffer, SocketFlags socketFlags)
{
var tcs = new TaskCompletionSource<int>(this);
BeginSend(buffer.Array, buffer.Offset, buffer.Count, socketFlags, iar =>
if (buffer.DangerousTryGetArray(out ArraySegment<byte> bufferArray))
{
var innerTcs = (TaskCompletionSource<int>)iar.AsyncState;
try { innerTcs.TrySetResult(((Socket)innerTcs.Task.AsyncState).EndSend(iar)); }
catch (Exception e) { innerTcs.TrySetException(e); }
}, tcs);
return tcs.Task;
var tcs = new TaskCompletionSource<int>(this);
BeginSend(bufferArray.Array, bufferArray.Offset, bufferArray.Count, socketFlags, iar =>
{
var innerTcs = (TaskCompletionSource<int>)iar.AsyncState;
try { innerTcs.TrySetResult(((Socket)innerTcs.Task.AsyncState).EndSend(iar)); }
catch (Exception e) { innerTcs.TrySetException(e); }
}, tcs);
return tcs.Task;
}
else
{
// We weren't able to extract an underlying byte[] from the Memory<byte>.
// Instead read into an ArrayPool array, then copy from that into the memory.
byte[] poolArray = ArrayPool<byte>.Shared.Rent(buffer.Length);
buffer.Span.CopyTo(poolArray);
var tcs = new TaskCompletionSource<int>(this);
BeginSend(poolArray, 0, buffer.Length, socketFlags, iar =>
{
var state = (Tuple<TaskCompletionSource<int>, byte[]>)iar.AsyncState;
try
{
state.Item1.TrySetResult(((Socket)state.Item1.Task.AsyncState).EndSend(iar));
}
catch (Exception e)
{
state.Item1.TrySetException(e);
}
finally
{
ArrayPool<byte>.Shared.Return(state.Item2);
}
}, Tuple.Create(tcs, poolArray));
return tcs.Task;
}
}

internal Task<int> SendAsync(IList<ArraySegment<byte>> buffers, SocketFlags socketFlags)
Expand Down Expand Up @@ -393,14 +502,14 @@ private static void ValidateBuffersList(IList<ArraySegment<byte>> buffers)
}

private static void ConfigureBuffer(
Int32TaskSocketAsyncEventArgs saea, ArraySegment<byte> buffer, SocketFlags socketFlags, bool wrapExceptionsInIOExceptions)
Int32TaskSocketAsyncEventArgs saea, Memory<byte> buffer, SocketFlags socketFlags, bool wrapExceptionsInIOExceptions)
{
// Configure the buffer. We don't clear the buffers when returning the SAEA to the pool,
// so as to minimize overhead if the same buffer is used for subsequent operations (which is likely).
// But SAEA doesn't support having both a buffer and a buffer list configured, so clear out a buffer list
// if there is one before we set the desired buffer.
if (saea.BufferList != null) saea.BufferList = null;
saea.SetBuffer(buffer.Array, buffer.Offset, buffer.Count);
saea.SetBuffer(buffer);
saea.SocketFlags = socketFlags;
saea._wrapExceptionsInIOExceptions = wrapExceptionsInIOExceptions;
}
Expand Down Expand Up @@ -486,6 +595,48 @@ private Task<int> GetTaskForSendReceive(
return t;
}

/// <summary>Gets a value task to represent the operation.</summary>
/// <param name="pending">true if the operation completes asynchronously; false if it completed synchronously.</param>
/// <param name="saea">The event args instance used with the operation.</param>
/// <param name="fromNetworkStream">
/// true if the request is coming from NetworkStream, which has special semantics for
/// exceptions and cached tasks; otherwise, false.
/// </param>
/// <param name="isReceive">true if this is a receive; false if this is a send.</param>
private ValueTask<int> GetValueTaskForSendReceive(
bool pending, Int32TaskSocketAsyncEventArgs saea,
bool fromNetworkStream, bool isReceive)
{
ValueTask<int> t;

if (pending)
{
// The operation is completing asynchronously (it may have already completed).
// Get the task for the operation, with appropriate synchronization to coordinate
// with the async callback that'll be completing the task.
bool responsibleForReturningToPool;
t = new ValueTask<int>(saea.GetCompletionResponsibility(out responsibleForReturningToPool).Task);
if (responsibleForReturningToPool)
{
// We're responsible for returning it only if the callback has already been invoked
// and gotten what it needs from the SAEA; otherwise, the callback will return it.
ReturnSocketAsyncEventArgs(saea, isReceive);
}
}
else
{
// The operation completed synchronously. Return a ValueTask for it.
t = saea.SocketError == SocketError.Success ?
new ValueTask<int>(saea.BytesTransferred) :
new ValueTask<int>(Task.FromException<int>(GetException(saea.SocketError, wrapExceptionsInIOExceptions: fromNetworkStream)));

// There won't be a callback, and we're done with the SAEA, so return it to the pool.
ReturnSocketAsyncEventArgs(saea, isReceive);
}

return t;
}

/// <summary>Completes the SocketAsyncEventArg's Task with the result of the send or receive, and returns it to the specified pool.</summary>
private static void CompleteAccept(Socket s, TaskSocketAsyncEventArgs<Socket> saea)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,15 @@ public byte[] Buffer
get { return _buffer; }
}

public Memory<byte> GetBuffer()
{
// TODO https://github.com/dotnet/corefx/issues/24429:
// Actually support Memory<byte> natively.
return _buffer != null ?
new Memory<byte>(_buffer, _offset, _count) :
Memory<byte>.Empty;
}

public int Offset
{
get { return _offset; }
Expand Down Expand Up @@ -283,6 +292,18 @@ public void SetBuffer(int offset, int count)
SetBufferInternal(_buffer, offset, count);
}

public void SetBuffer(Memory<byte> buffer)
{
if (!buffer.TryGetArray(out ArraySegment<byte> array))
{
// TODO https://github.com/dotnet/corefx/issues/24429:
// Actually support Memory<byte> natively.
throw new ArgumentException();
}

SetBuffer(array.Array, array.Offset, array.Count);
}

internal bool HasMultipleBuffers
{
get { return _bufferList != null; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// See the LICENSE file in the project root for more information.

using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace System.Net.Sockets
Expand All @@ -25,6 +26,8 @@ public static Task ConnectAsync(this Socket socket, string host, int port) =>

public static Task<int> ReceiveAsync(this Socket socket, ArraySegment<byte> buffer, SocketFlags socketFlags) =>
socket.ReceiveAsync(buffer, socketFlags, fromNetworkStream: false);
public static ValueTask<int> ReceiveAsync(this Socket socket, Memory<byte> buffer, SocketFlags socketFlags, CancellationToken cancellationToken = default) =>
socket.ReceiveAsync(buffer, socketFlags, fromNetworkStream: false, cancellationToken: cancellationToken);
public static Task<int> ReceiveAsync(this Socket socket, IList<ArraySegment<byte>> buffers, SocketFlags socketFlags) =>
socket.ReceiveAsync(buffers, socketFlags);
public static Task<SocketReceiveFromResult> ReceiveFromAsync(this Socket socket, ArraySegment<byte> buffer, SocketFlags socketFlags, EndPoint remoteEndPoint) =>
Expand All @@ -34,6 +37,8 @@ public static Task<SocketReceiveMessageFromResult> ReceiveMessageFromAsync(this

public static Task<int> SendAsync(this Socket socket, ArraySegment<byte> buffer, SocketFlags socketFlags) =>
socket.SendAsync(buffer, socketFlags, fromNetworkStream: false);
public static ValueTask<int> SendAsync(this Socket socket, ReadOnlyMemory<byte> buffer, SocketFlags socketFlags, CancellationToken cancellationToken = default) =>
socket.SendAsync(buffer, socketFlags, fromNetworkStream: false, cancellationToken: cancellationToken);
public static Task<int> SendAsync(this Socket socket, IList<ArraySegment<byte>> buffers, SocketFlags socketFlags) =>
socket.SendAsync(buffers, socketFlags);
public static Task<int> SendToAsync(this Socket socket, ArraySegment<byte> buffer, SocketFlags socketFlags, EndPoint remoteEP) =>
Expand Down
Loading