Skip to content
This repository has been archived by the owner on Jan 23, 2023. It is now read-only.
/ corefx Public archive

Commit

Permalink
Use System.Buffers in CopyToAsync implementations
Browse files Browse the repository at this point in the history
The default CopyToAsync on Stream uses an 80K temporary buffer, making it a good candidate for buffer pooling.  But Stream is in mscorlib and can't rely on System.Buffers.  So this commit introduces an implementation that streams in mscorlib can use, just by overriding CopyToAsync and delegating.  It then uses that in DeflateStream, GZipStream, FileStream, PipeStream, and UnmanagedMemoryStream.
  • Loading branch information
stephentoub committed Feb 9, 2016
1 parent bd382cf commit 34ce38b
Show file tree
Hide file tree
Showing 23 changed files with 255 additions and 33 deletions.
70 changes: 70 additions & 0 deletions src/Common/src/System/IO/StreamHelpers.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System.Buffers;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace System.IO
{
/// <summary>Provides methods to help in the implementation of Stream-derived types.</summary>
internal static class StreamHelpers
{
/// <summary>
/// Provides an implementation usable as an override of Stream.CopyToAsync but that uses the shared
/// ArrayPool for the intermediate buffer rather than allocating a new buffer each time.
/// </summary>
/// <remarks>
/// If/when the base CopyToAsync implementation is changed to use a pooled buffer,
/// this will no longer be necessary.
/// </remarks>
public static Task ArrayPoolCopyToAsync(Stream source, Stream destination, int bufferSize, CancellationToken cancellationToken)
{
Debug.Assert(source != null);

if (destination == null)
{
throw new ArgumentNullException("destination");
}
if (bufferSize <= 0)
{
throw new ArgumentOutOfRangeException("bufferSize", bufferSize, SR.ArgumentOutOfRange_NeedPosNum);
}

if (!source.CanRead)
{
throw source.CanWrite ?
(Exception)new NotSupportedException(SR.NotSupported_UnreadableStream) :
new ObjectDisposedException(null); // passing null as this is used as part of an instance Stream.CopyToAsync override
}

if (!destination.CanWrite)
{
throw destination.CanRead ?
(Exception)new NotSupportedException(SR.NotSupported_UnwritableStream) :
new ObjectDisposedException("destination");
}

return ArrayPoolCopyToAsyncInternal(source, destination, bufferSize, cancellationToken);
}

private static async Task ArrayPoolCopyToAsyncInternal(Stream source, Stream destination, int bufferSize, CancellationToken cancellationToken)
{
byte[] buffer = ArrayPool<byte>.Shared.Rent(bufferSize);
try
{
int bytesRead;
while ((bytesRead = await source.ReadAsync(buffer, 0, bufferSize, cancellationToken).ConfigureAwait(false)) != 0)
{
await destination.WriteAsync(buffer, 0, bytesRead, cancellationToken).ConfigureAwait(false);
}
}
finally
{
ArrayPool<byte>.Shared.Return(buffer, clearArray: true); // TODO: When an overload is available, pass bufferSize so we only clear the used part of the array
}
}
}
}
13 changes: 8 additions & 5 deletions src/System.IO.Compression/src/Resources/Strings.resx
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@
<data name="ArgumentOutOfRange_Enum" xml:space="preserve">
<value>Enum value was out of legal range.</value>
</data>
<data name="ArgumentOutOfRange_NeedPosNum" xml:space="preserve">
<value>Positive number required.</value>
</data>
<data name="CannotReadFromDeflateStream" xml:space="preserve">
<value>Reading from the compression stream is not supported.</value>
</data>
Expand All @@ -141,14 +144,14 @@
<data name="InvalidHuffmanData" xml:space="preserve">
<value>Failed to construct a huffman tree using the length array. The stream might be corrupted.</value>
</data>
<data name="NotReadableStream" xml:space="preserve">
<value>The base stream is not readable.</value>
</data>
<data name="NotSupported" xml:space="preserve">
<value>This operation is not supported.</value>
</data>
<data name="NotWriteableStream" xml:space="preserve">
<value>The base stream is not writeable.</value>
<data name="NotSupported_UnreadableStream" xml:space="preserve">
<value>Stream does not support reading.</value>
</data>
<data name="NotSupported_UnwritableStream" xml:space="preserve">
<value>Stream does not support writing.</value>
</data>
<data name="ObjectDisposed_StreamClosed" xml:space="preserve">
<value>Can not access a closed Stream.</value>
Expand Down
3 changes: 3 additions & 0 deletions src/System.IO.Compression/src/System.IO.Compression.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@
<Compile Include="$(CommonPath)\System\IO\PathInternal.cs">
<Link>Common\System\IO\PathInternal.cs</Link>
</Compile>
<Compile Include="$(CommonPath)\System\IO\StreamHelpers.cs">
<Link>Common\System\IO\StreamHelpers.cs</Link>
</Compile>
</ItemGroup>
<!-- Files exclusive to Core -->
<ItemGroup Condition="'$(TargetGroup)' != 'net46'">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ internal void InitializeInflater(Stream stream, bool leaveOpen, int windowBits)
{
Debug.Assert(stream != null);
if (!stream.CanRead)
throw new ArgumentException(SR.NotReadableStream, "stream");
throw new ArgumentException(SR.NotSupported_UnreadableStream, "stream");

_inflater = new Inflater(windowBits);

Expand All @@ -107,7 +107,7 @@ internal void InitializeDeflater(Stream stream, bool leaveOpen, int windowBits,
{
Debug.Assert(stream != null);
if (!stream.CanWrite)
throw new ArgumentException(SR.NotWriteableStream, "stream");
throw new ArgumentException(SR.NotSupported_UnwritableStream, "stream");

_deflater = new Deflater(compressionLevel, windowBits);

Expand Down Expand Up @@ -333,6 +333,11 @@ private static void ThrowCannotWriteToDeflateStreamException()
throw new InvalidOperationException(SR.CannotWriteToDeflateStream);
}

public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
{
return StreamHelpers.ArrayPoolCopyToAsync(this, destination, bufferSize, cancellationToken);
}

public override Task<int> ReadAsync(Byte[] array, int offset, int count, CancellationToken cancellationToken)
{
EnsureDecompressionMode();
Expand Down Expand Up @@ -373,7 +378,7 @@ public override Task<int> ReadAsync(Byte[] array, int offset, int count, Cancell
readTask = _stream.ReadAsync(_buffer, 0, _buffer.Length, cancellationToken);
if (readTask == null)
{
throw new InvalidOperationException(SR.NotReadableStream);
throw new InvalidOperationException(SR.NotSupported_UnreadableStream);
}

return ReadAsyncCore(readTask, array, offset, count, cancellationToken);
Expand Down Expand Up @@ -422,7 +427,7 @@ private async Task<int> ReadAsyncCore(Task<int> readTask, byte[] array, int offs
readTask = _stream.ReadAsync(_buffer, 0, _buffer.Length, cancellationToken);
if (readTask == null)
{
throw new InvalidOperationException(SR.NotReadableStream);
throw new InvalidOperationException(SR.NotSupported_UnreadableStream);
}
}
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,11 @@ public Stream BaseStream
}
}

public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
{
return StreamHelpers.ArrayPoolCopyToAsync(this, destination, bufferSize, cancellationToken);
}

public override Task<int> ReadAsync(Byte[] array, int offset, int count, CancellationToken cancellationToken)
{
CheckDeflateStream();
Expand Down
17 changes: 17 additions & 0 deletions src/System.IO.Compression/tests/DeflateStreamTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,23 @@ public void ReadWriteArgumentValidation()
}
}

[Fact]
public void CopyToAsyncArgumentValidation()
{
using (DeflateStream ds = new DeflateStream(new MemoryStream(), CompressionMode.Decompress))
{
Assert.Throws<ArgumentNullException>("destination", () => { ds.CopyToAsync(null); });
Assert.Throws<ArgumentOutOfRangeException>("bufferSize", () => { ds.CopyToAsync(new MemoryStream(), 0); });
Assert.Throws<NotSupportedException>(() => { ds.CopyToAsync(new MemoryStream(new byte[1], writable: false)); });
ds.Dispose();
Assert.Throws<ObjectDisposedException>(() => { ds.CopyToAsync(new MemoryStream()); });
}
using (DeflateStream ds = new DeflateStream(new MemoryStream(), CompressionMode.Compress))
{
Assert.Throws<NotSupportedException>(() => { ds.CopyToAsync(new MemoryStream()); });
}
}

[Fact]
public void Precancellation()
{
Expand Down
32 changes: 18 additions & 14 deletions src/System.IO.Compression/tests/GZipStreamTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -143,20 +143,7 @@ private static async Task DecompressAsync(MemoryStream compareStream, MemoryStre
var zip = new GZipStream(gzStream, CompressionMode.Decompress);

var GZipStream = new MemoryStream();

int _bufferSize = 1024;
var bytes = new Byte[_bufferSize];
bool finished = false;
int retCount;
while (!finished)
{
retCount = await zip.ReadAsync(bytes, 0, _bufferSize);

if (retCount != 0)
await GZipStream.WriteAsync(bytes, 0, retCount);
else
finished = true;
}
await zip.CopyToAsync(GZipStream);

GZipStream.Position = 0;
compareStream.Position = 0;
Expand Down Expand Up @@ -225,6 +212,23 @@ public void WriteOnlyStreamThrowsOnDecompress()
});
}

[Fact]
public void CopyToAsyncArgumentValidation()
{
using (GZipStream gs = new GZipStream(new MemoryStream(), CompressionMode.Decompress))
{
Assert.Throws<ArgumentNullException>("destination", () => { gs.CopyToAsync(null); });
Assert.Throws<ArgumentOutOfRangeException>("bufferSize", () => { gs.CopyToAsync(new MemoryStream(), 0); });
Assert.Throws<NotSupportedException>(() => { gs.CopyToAsync(new MemoryStream(new byte[1], writable: false)); });
gs.Dispose();
Assert.Throws<ObjectDisposedException>(() => { gs.CopyToAsync(new MemoryStream()); });
}
using (GZipStream gs = new GZipStream(new MemoryStream(), CompressionMode.Compress))
{
Assert.Throws<NotSupportedException>(() => { gs.CopyToAsync(new MemoryStream()); });
}
}

[Fact]
public void TestCtors()
{
Expand Down
3 changes: 3 additions & 0 deletions src/System.IO.FileSystem/src/System.IO.FileSystem.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@
<Compile Include="$(CommonPath)\System\Collections\Generic\EnumerableHelpers.cs">
<Link>Common\System\Collections\Generic\EnumerableHelpers.cs</Link>
</Compile>
<Compile Include="$(CommonPath)\System\IO\StreamHelpers.cs">
<Link>Common\System\IO\StreamHelpers.cs</Link>
</Compile>
<Compile Include="$(CommonPath)\System\IO\StringBuilderCache.cs">
<Link>Common\System\IO\StringBuilderCache.cs</Link>
</Compile>
Expand Down
13 changes: 13 additions & 0 deletions src/System.IO.FileSystem/src/System/IO/UnixFileStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,19 @@ private unsafe int ReadNative(byte[] array, int offset, int count)
return bytesRead;
}

/// <summary>
/// Asynchronously reads the bytes from the current stream and writes them to another
/// stream, using a specified buffer size.
/// </summary>
/// <param name="destination">The stream to which the contents of the current stream will be copied.</param>
/// <param name="bufferSize">The size, in bytes, of the buffer. This value must be greater than zero.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
/// <returns>A task that represents the asynchronous copy operation.</returns>
public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
{
return StreamHelpers.ArrayPoolCopyToAsync(this, destination, bufferSize, cancellationToken);
}

/// <summary>
/// Asynchronously reads a sequence of bytes from the current stream and advances
/// the position within the stream by the number of bytes read.
Expand Down
7 changes: 6 additions & 1 deletion src/System.IO.FileSystem/src/System/IO/Win32FileStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -905,7 +905,7 @@ private void AllocateBuffer()
Debug.Assert(_buffer == null);
Debug.Assert(_preallocatedOverlapped == null);

_buffer = new byte[_bufferSize]; // TODO: Issue #5598: Use ArrayPool.
_buffer = new byte[_bufferSize];
if (_isAsync)
{
_preallocatedOverlapped = new PreAllocatedOverlapped(s_ioCallback, this, _buffer);
Expand Down Expand Up @@ -1689,6 +1689,11 @@ private int GetLastWin32ErrorAndDisposeHandleIfInvalid(bool throwIfInvalidHandle
return errorCode;
}

public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
{
return StreamHelpers.ArrayPoolCopyToAsync(this, destination, bufferSize, cancellationToken);
}

[System.Security.SecuritySafeCritical]
public override Task<int> ReadAsync(Byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
Expand Down
1 change: 1 addition & 0 deletions src/System.IO.FileSystem/src/project.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"dependencies": {
"System.Buffers": "4.0.0-rc3-23808",
"System.Collections": "4.0.10",
"System.Diagnostics.Contracts": "4.0.0",
"System.Diagnostics.Debug": "4.0.10",
Expand Down
17 changes: 17 additions & 0 deletions src/System.IO.FileSystem/tests/FileStream/WriteAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,23 @@ public Task CopyToAsyncBetweenFileStreams()
numWrites: 10);
}

[Fact]
public void CopyToAsync_InvalidArgs_Throws()
{
using (FileStream fs = new FileStream(GetTestFilePath(), FileMode.Create))
{
Assert.Throws<ArgumentNullException>("destination", () => { fs.CopyToAsync(null); });
Assert.Throws<ArgumentOutOfRangeException>("bufferSize", () => { fs.CopyToAsync(new MemoryStream(), 0); });
Assert.Throws<NotSupportedException>(() => { fs.CopyToAsync(new MemoryStream(new byte[1], writable: false)); });
fs.Dispose();
Assert.Throws<ObjectDisposedException>(() => { fs.CopyToAsync(new MemoryStream()); });
}
using (FileStream fs = new FileStream(GetTestFilePath(), FileMode.Create, FileAccess.Write))
{
Assert.Throws<NotSupportedException>(() => { fs.CopyToAsync(new MemoryStream()); });
}
}

[Theory]
[MemberData("MemberData_FileStreamAsyncWriting")]
[OuterLoop] // many combinations: we test just one in inner loop and the rest outer
Expand Down
3 changes: 3 additions & 0 deletions src/System.IO.Pipes/src/Resources/Strings.resx
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,9 @@
<data name="ArgumentOutOfRange_MaxNumServerInstances" xml:space="preserve">
<value>maxNumberOfServerInstances must either be a value between 1 and 254, or NamedPipeServerStream.MaxAllowedServerInstances (to obtain the maximum number allowed by system resources).</value>
</data>
<data name="ArgumentOutOfRange_NeedPosNum" xml:space="preserve">
<value>Positive number required.</value>
</data>
<data name="InvalidOperation_PipeNotYetConnected" xml:space="preserve">
<value>Pipe hasn't been connected yet.</value>
</data>
Expand Down
3 changes: 3 additions & 0 deletions src/System.IO.Pipes/src/System.IO.Pipes.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
<Compile Include="System\IO\Pipes\NamedPipeServerStream.cs" />
<Compile Include="System\IO\Pipes\PipeState.cs" />
<Compile Include="System\IO\Pipes\PipeStream.cs" />
<Compile Include="$(CommonPath)\System\IO\StreamHelpers.cs">
<Link>Common\System\IO\StreamHelpers.cs</Link>
</Compile>
</ItemGroup>
<ItemGroup Condition=" '$(TargetsWindows)' == 'true' ">
<Compile Include="$(CommonPath)\Interop\Windows\Interop.Libraries.cs">
Expand Down
10 changes: 4 additions & 6 deletions src/System.IO.Pipes/src/System/IO/Pipes/PipeStream.Unix.cs
Original file line number Diff line number Diff line change
Expand Up @@ -373,18 +373,16 @@ private unsafe int ReadCoreWithCancellation(byte[] buffer, int offset, int count

if (signaledFdCount != 0)
{
// Our pipe is ready. Break out of the loop to read from it.
Debug.Assert((events[0].TriggeredEvents & Interop.Sys.PollEvents.POLLIN) != 0, "Expected revents on read fd to have POLLIN set");
// Our pipe is ready. Break out of the loop to read from it. The fd may have been signaled due to
// POLLIN (data available), POLLHUP (hang-up), POLLERR (some error on the stream), etc... any such
// data will be propagated to us when we do the actual read.
break;
}
}

// Read it.
Debug.Assert((events[0].TriggeredEvents & Interop.Sys.PollEvents.POLLIN) != 0);
int result = CheckPipeCall(Interop.Sys.Read(_handle, bufPtr + offset, count));
Debug.Assert(result <= count);

Debug.Assert(result >= 0);
Debug.Assert(result >= 0 && result <= count, "Expected 0 <= result <= count bytes, got " + result);

// return what we read.
return result;
Expand Down
5 changes: 5 additions & 0 deletions src/System.IO.Pipes/src/System/IO/Pipes/PipeStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ internal void InitializeHandle(SafePipeHandle handle, bool isExposed, bool isAsy
_isFromExistingHandle = isExposed;
}

public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
{
return StreamHelpers.ArrayPoolCopyToAsync(this, destination, bufferSize, cancellationToken);
}

[SecurityCritical]
public override int Read([In, Out] byte[] buffer, int offset, int count)
{
Expand Down
1 change: 1 addition & 0 deletions src/System.IO.Pipes/src/project.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"dependencies": {
"System.Buffers": "4.0.0-rc3-23808",
"System.Diagnostics.Contracts": "4.0.0",
"System.Diagnostics.Debug": "4.0.10",
"System.Diagnostics.Tools": "4.0.0",
Expand Down
Loading

0 comments on commit 34ce38b

Please sign in to comment.