Skip to content

Commit

Permalink
Use IValueTaskSource in PipeStream on Windows (#52695)
Browse files Browse the repository at this point in the history
* Use IValueTaskSource in PipeStream on Windows

* Revise implementation of IValueTaskSources

Better match implementation in RandomAccess

Co-authored-by: Stephen Toub <[email protected]>
  • Loading branch information
manandre and stephentoub authored Aug 12, 2021
1 parent e0f6071 commit e4b4666
Show file tree
Hide file tree
Showing 8 changed files with 449 additions and 509 deletions.
4 changes: 1 addition & 3 deletions src/libraries/System.IO.Pipes/src/System.IO.Pipes.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -94,18 +94,16 @@
<Compile Include="Microsoft\Win32\SafeHandles\SafePipeHandle.Windows.cs" />
<Compile Include="System\IO\Pipes\AnonymousPipeServerStreamAcl.cs" />
<Compile Include="System\IO\Pipes\AnonymousPipeServerStream.Windows.cs" />
<Compile Include="System\IO\Pipes\ConnectionCompletionSource.cs" />
<Compile Include="System\IO\Pipes\NamedPipeServerStreamAcl.cs" />
<Compile Include="System\IO\Pipes\NamedPipeClientStream.Windows.cs" />
<Compile Include="System\IO\Pipes\NamedPipeServerStream.Windows.cs" />
<Compile Include="System\IO\Pipes\PipeAccessRights.cs" />
<Compile Include="System\IO\Pipes\PipeAccessRule.cs" />
<Compile Include="System\IO\Pipes\PipeAuditRule.cs" />
<Compile Include="System\IO\Pipes\PipeCompletionSource.cs" />
<Compile Include="System\IO\Pipes\PipesAclExtensions.cs" />
<Compile Include="System\IO\Pipes\PipeSecurity.cs" />
<Compile Include="System\IO\Pipes\PipeStream.ValueTaskSource.cs" />
<Compile Include="System\IO\Pipes\PipeStream.Windows.cs" />
<Compile Include="System\IO\Pipes\ReadWriteCompletionSource.cs" />
</ItemGroup>
<!-- Windows : Win32 only -->
<ItemGroup Condition="'$(TargetsWindows)' == 'true'">
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@
// The .NET Foundation licenses this file to you under the MIT license.

using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using System.Runtime.ExceptionServices;
using System.Runtime.InteropServices;
using System.Security.AccessControl;
using System.Security.Principal;
Expand All @@ -18,6 +17,8 @@ namespace System.IO.Pipes
/// </summary>
public sealed partial class NamedPipeServerStream : PipeStream
{
private ConnectionValueTaskSource? _reusableConnectionValueTaskSource; // reusable ConnectionValueTaskSource that is currently NOT being used

internal NamedPipeServerStream(
string pipeName,
PipeDirection direction,
Expand All @@ -41,6 +42,31 @@ internal NamedPipeServerStream(
Create(pipeName, direction, maxNumberOfServerInstances, transmissionMode, options, inBufferSize, outBufferSize, pipeSecurity, inheritability, additionalAccessRights);
}

protected override void Dispose(bool disposing)
{
try
{
Interlocked.Exchange(ref _reusableConnectionValueTaskSource, null)?.Dispose();
}
finally
{
base.Dispose(disposing);
}
}

internal override void TryToReuse(PipeValueTaskSource source)
{
base.TryToReuse(source);

if (source is ConnectionValueTaskSource connectionSource)
{
if (Interlocked.CompareExchange(ref _reusableConnectionValueTaskSource, connectionSource, null) is not null)
{
source._preallocatedOverlapped.Dispose();
}
}
}

private void Create(string pipeName, PipeDirection direction, int maxNumberOfServerInstances,
PipeTransmissionMode transmissionMode, PipeOptions options, int inBufferSize, int outBufferSize,
HandleInheritability inheritability)
Expand Down Expand Up @@ -140,7 +166,8 @@ public void WaitForConnection()

if (IsAsync)
{
WaitForConnectionCoreAsync(CancellationToken.None).GetAwaiter().GetResult();
ValueTask vt = WaitForConnectionCoreAsync(CancellationToken.None);
vt.AsTask().GetAwaiter().GetResult();
}
else
{
Expand Down Expand Up @@ -180,7 +207,7 @@ public Task WaitForConnectionAsync(CancellationToken cancellationToken)
this, cancellationToken, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
}

return WaitForConnectionCoreAsync(cancellationToken);
return WaitForConnectionCoreAsync(cancellationToken).AsTask();
}

public void Disconnect()
Expand Down Expand Up @@ -293,50 +320,52 @@ internal ExecuteHelper(PipeStreamImpersonationWorker userCode, SafePipeHandle? h
}

// Async version of WaitForConnection. See the comments above for more info.
private unsafe Task WaitForConnectionCoreAsync(CancellationToken cancellationToken)
private unsafe ValueTask WaitForConnectionCoreAsync(CancellationToken cancellationToken)
{
CheckConnectOperationsServerWithHandle();
Debug.Assert(IsAsync);

if (!IsAsync)
{
throw new InvalidOperationException(SR.InvalidOperation_PipeNotAsync);
}

var completionSource = new ConnectionCompletionSource(this);

if (!Interop.Kernel32.ConnectNamedPipe(InternalHandle!, completionSource.Overlapped))
ConnectionValueTaskSource? vts = Interlocked.Exchange(ref _reusableConnectionValueTaskSource, null) ?? new ConnectionValueTaskSource(this);
try
{
int errorCode = Marshal.GetLastPInvokeError();

switch (errorCode)
vts.PrepareForOperation();
if (!Interop.Kernel32.ConnectNamedPipe(InternalHandle!, vts._overlapped))
{
case Interop.Errors.ERROR_IO_PENDING:
break;

// If we are here then the pipe is already connected, or there was an error
// so we should unpin and free the overlapped.
case Interop.Errors.ERROR_PIPE_CONNECTED:
// IOCompletitionCallback will not be called because we completed synchronously.
completionSource.ReleaseResources();
if (State == PipeState.Connected)
{
throw new InvalidOperationException(SR.InvalidOperation_PipeAlreadyConnected);
}
completionSource.SetCompletedSynchronously();

// We return a cached task instead of TaskCompletionSource's Task allowing the GC to collect it.
return Task.CompletedTask;

default:
completionSource.ReleaseResources();
throw Win32Marshal.GetExceptionForWin32Error(errorCode);
int errorCode = Marshal.GetLastPInvokeError();
switch (errorCode)
{
case Interop.Errors.ERROR_IO_PENDING:
// Common case: IO was initiated, completion will be handled by callback.
// Register for cancellation now that the operation has been initiated.
vts.RegisterForCancellation(cancellationToken);
break;

case Interop.Errors.ERROR_PIPE_CONNECTED:
// If we are here then the pipe is already connected.
// IOCompletitionCallback will not be called because we completed synchronously.
vts.Dispose();
if (State == PipeState.Connected)
{
return ValueTask.FromException(ExceptionDispatchInfo.SetCurrentStackTrace(new InvalidOperationException(SR.InvalidOperation_PipeAlreadyConnected)));
}
State = PipeState.Connected;
return ValueTask.CompletedTask;

default:
vts.Dispose();
return ValueTask.FromException(ExceptionDispatchInfo.SetCurrentStackTrace(Win32Marshal.GetExceptionForWin32Error(errorCode)));
}
}
}
catch
{
vts.Dispose();
throw;
}

// If we are here then connection is pending.
completionSource.RegisterForCancellation(cancellationToken);

return completionSource.Task;
// Completion handled by callback.
vts.FinishedScheduling();
return new ValueTask(vts, vts.Version);
}

private void CheckConnectOperationsServerWithHandle()
Expand Down
Loading

0 comments on commit e4b4666

Please sign in to comment.