From 2a01ceb2d004a125757a4bb95a9341cc283c5afd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Emmanuel=20Andr=C3=A9?= <2341261+manandre@users.noreply.github.com> Date: Fri, 24 Jun 2022 16:11:52 +0200 Subject: [PATCH] Add IAsyncEnumerable support to TransformManyBlock (#49264) * Add IAsyncEnumerable support to TransformManyBlock * Update IAsyncEnumerable implementation and tests Co-authored-by: Stephen Toub --- ...tem.Threading.Tasks.Dataflow.netcoreapp.cs | 6 + .../TransformManyBlock.IAsyncEnumerable.cs | 270 +++++++ .../src/Blocks/TransformManyBlock.cs | 87 +-- .../src/Internal/ReorderingBuffer.cs | 14 + .../System.Threading.Tasks.Dataflow.csproj | 4 +- .../DataflowTestHelper.IAsyncEnumerable.cs | 43 ++ .../tests/Dataflow/DataflowTestHelper.cs | 2 +- ...ransformManyBlockTests.IAsyncEnumerable.cs | 708 ++++++++++++++++++ .../tests/Dataflow/TransformManyBlockTests.cs | 2 +- ...stem.Threading.Tasks.Dataflow.Tests.csproj | 4 +- 10 files changed, 1082 insertions(+), 58 deletions(-) create mode 100644 src/libraries/System.Threading.Tasks.Dataflow/src/Blocks/TransformManyBlock.IAsyncEnumerable.cs create mode 100644 src/libraries/System.Threading.Tasks.Dataflow/tests/Dataflow/DataflowTestHelper.IAsyncEnumerable.cs create mode 100644 src/libraries/System.Threading.Tasks.Dataflow/tests/Dataflow/TransformManyBlockTests.IAsyncEnumerable.cs diff --git a/src/libraries/System.Threading.Tasks.Dataflow/ref/System.Threading.Tasks.Dataflow.netcoreapp.cs b/src/libraries/System.Threading.Tasks.Dataflow/ref/System.Threading.Tasks.Dataflow.netcoreapp.cs index ed789646e93f44..8dcdd9ef11af4a 100644 --- a/src/libraries/System.Threading.Tasks.Dataflow/ref/System.Threading.Tasks.Dataflow.netcoreapp.cs +++ b/src/libraries/System.Threading.Tasks.Dataflow/ref/System.Threading.Tasks.Dataflow.netcoreapp.cs @@ -10,4 +10,10 @@ public static partial class DataflowBlock { public static System.Collections.Generic.IAsyncEnumerable ReceiveAllAsync(this System.Threading.Tasks.Dataflow.IReceivableSourceBlock source, System.Threading.CancellationToken cancellationToken = default) { throw null; } } + + public sealed partial class TransformManyBlock : System.Threading.Tasks.Dataflow.IDataflowBlock, System.Threading.Tasks.Dataflow.IPropagatorBlock, System.Threading.Tasks.Dataflow.IReceivableSourceBlock, System.Threading.Tasks.Dataflow.ISourceBlock, System.Threading.Tasks.Dataflow.ITargetBlock + { + public TransformManyBlock(System.Func> transform) { } + public TransformManyBlock(System.Func> transform, System.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptions dataflowBlockOptions) { } + } } diff --git a/src/libraries/System.Threading.Tasks.Dataflow/src/Blocks/TransformManyBlock.IAsyncEnumerable.cs b/src/libraries/System.Threading.Tasks.Dataflow/src/Blocks/TransformManyBlock.IAsyncEnumerable.cs new file mode 100644 index 00000000000000..a60ae989eca20d --- /dev/null +++ b/src/libraries/System.Threading.Tasks.Dataflow/src/Blocks/TransformManyBlock.IAsyncEnumerable.cs @@ -0,0 +1,270 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Collections.Generic; +using System.Diagnostics; +using System.Threading.Tasks.Dataflow.Internal; + +namespace System.Threading.Tasks.Dataflow +{ + public partial class TransformManyBlock + { + /// Initializes the with the specified function. + /// + /// The function to invoke with each data element received. All of the data from the returned + /// will be made available as output from this . + /// + /// The is . + public TransformManyBlock(Func> transform) : + this(transform, ExecutionDataflowBlockOptions.Default) + { + } + + /// Initializes the with the specified function and . + /// + /// The function to invoke with each data element received. All of the data from the returned + /// will be made available as output from this . + /// + /// The options with which to configure this . + /// The or is . + public TransformManyBlock(Func> transform, ExecutionDataflowBlockOptions dataflowBlockOptions) + { + if (transform is null) + { + throw new ArgumentNullException(nameof(transform)); + } + + Initialize(messageWithId => + { + Task t = ProcessMessageAsync(transform, messageWithId); +#if DEBUG + // Task returned from ProcessMessageAsync is explicitly ignored. + // That function handles all exceptions. + t.ContinueWith(t => Debug.Assert(t.IsCompletedSuccessfully), CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); +#endif + }, dataflowBlockOptions, ref _source, ref _target, ref _reorderingBuffer, TargetCoreOptions.UsesAsyncCompletion); + } + + // Note: + // Enumerating the IAsyncEnumerable is done with ConfigureAwait(true), using the default behavior of + // paying attention to the current context/scheduler. This makes it so that the enumerable code runs on the target scheduler. + // For this to work correctly, there can't be any ConfigureAwait(false) in the same method prior to + // these await foreach loops, nor in the call chain prior to the method invocation. + + /// Processes the message with a user-provided transform function that returns an async enumerable. + /// The transform function to use to process the message. + /// The message to be processed. + private async Task ProcessMessageAsync(Func> transformFunction, KeyValuePair messageWithId) + { + try + { + // Run the user transform and store the results. + IAsyncEnumerable outputItems = transformFunction(messageWithId.Key); + await StoreOutputItemsAsync(messageWithId, outputItems).ConfigureAwait(false); + } + catch (Exception exc) + { + // Enumerating the user's collection failed. If this exception represents cancellation, + // swallow it rather than shutting down the block. + if (!Common.IsCooperativeCancellation(exc)) + { + // The exception was not for cancellation. We must add the exception before declining + // and signaling completion, as the exception is part of the operation, and the completion + // conditions depend on this. + Common.StoreDataflowMessageValueIntoExceptionData(exc, messageWithId.Key); + _target.Complete(exc, dropPendingMessages: true, storeExceptionEvenIfAlreadyCompleting: true, unwrapInnerExceptions: false); + } + } + finally + { + // Let the target know that one of the asynchronous operations it launched has completed. + _target.SignalOneAsyncMessageCompleted(); + } + } + + /// + /// Stores the output items, either into the reordering buffer or into the source half. + /// Ensures that the bounding count is correctly updated. + /// + /// The message with id. + /// The output items to be persisted. + private async Task StoreOutputItemsAsync( + KeyValuePair messageWithId, IAsyncEnumerable? outputItems) + { + // If there's a reordering buffer, pass the data along to it. + // The reordering buffer will handle all details, including bounding. + if (_reorderingBuffer is not null) + { + await StoreOutputItemsReorderedAsync(messageWithId.Value, outputItems).ConfigureAwait(false); + } + // Otherwise, output the data directly. + else if (outputItems is not null) + { + await StoreOutputItemsNonReorderedWithIterationAsync(outputItems).ConfigureAwait(false); + } + else if (_target.IsBounded) + { + // outputItems is null and there's no reordering buffer + // and we're bounding, so decrement the bounding count to + // signify that the input element we already accounted for + // produced no output + _target.ChangeBoundingCount(count: -1); + } + // else there's no reordering buffer, there are no output items, and we're not bounded, + // so there's nothing more to be done. + } + + /// Stores the next item using the reordering buffer. + /// The ID of the item. + /// The async enumerable. + private async Task StoreOutputItemsReorderedAsync(long id, IAsyncEnumerable? item) + { + Debug.Assert(_reorderingBuffer is not null, "Expected a reordering buffer"); + Debug.Assert(id != Common.INVALID_REORDERING_ID, "This ID should never have been handed out."); + + // Grab info about the transform + TargetCore target = _target; + bool isBounded = target.IsBounded; + + // Handle invalid items (null enumerables) by delegating to the base + if (item is null) + { + _reorderingBuffer.AddItem(id, null, false); + if (isBounded) + { + target.ChangeBoundingCount(count: -1); + } + return; + } + + // By this point, either we're not the next item, in which case we need to make a copy of the + // data and store it, or we are the next item and can store it immediately but we need to enumerate + // the items and store them individually because we don't want to enumerate while holding a lock. + List? itemCopy = null; + try + { + // If this is the next item, we can output it now. + if (_reorderingBuffer.IsNext(id)) + { + await StoreOutputItemsNonReorderedWithIterationAsync(item).ConfigureAwait(false); + // here itemCopy remains null, so that base.AddItem will finish our interactions with the reordering buffer + } + else + { + // We're not the next item, and we're not trusted, so copy the data into a list. + // We need to enumerate outside of the lock in the base class. + int itemCount = 0; + try + { + itemCopy = new List(); + await foreach (TOutput element in item.ConfigureAwait(true)) + { + itemCopy.Add(element); + } + itemCount = itemCopy.Count; + } + finally + { + // If we're here successfully, then itemCount is the number of output items + // we actually received, and we should update the bounding count with it. + // If we're here because ToList threw an exception, then itemCount will be 0, + // and we still need to update the bounding count with this in order to counteract + // the increased bounding count for the corresponding input. + if (isBounded) + { + UpdateBoundingCountWithOutputCount(count: itemCount); + } + } + } + // else if the item isn't valid, the finally block will see itemCopy as null and output invalid + } + finally + { + // Tell the base reordering buffer that we're done. If we already output + // all of the data, itemCopy will be null, and we just pass down the invalid item. + // If we haven't, pass down the real thing. We do this even in the case of an exception, + // in which case this will be a dummy element. + _reorderingBuffer.AddItem(id, itemCopy, itemIsValid: itemCopy is not null); + } + } + + /// + /// Stores the untrusted async enumerable into the source core. + /// This method does not go through the reordering buffer. + /// + /// The untrusted enumerable. + private async Task StoreOutputItemsNonReorderedWithIterationAsync(IAsyncEnumerable outputItems) + { + // The _source we're adding to isn't thread-safe, so we need to determine + // whether we need to lock. If the block is configured with a max degree + // of parallelism of 1, then only one transform can run at a time, and so + // we don't need to lock. Similarly, if there's a reordering buffer, then + // it guarantees that we're invoked serially, and we don't need to lock. + bool isSerial = + _target.DataflowBlockOptions.MaxDegreeOfParallelism == 1 || + _reorderingBuffer is not null; + + // If we're bounding, we need to increment the bounded count + // for each individual item as we enumerate it. + if (_target.IsBounded) + { + // When the input item that generated this + // output was loaded, we incremented the bounding count. If it only + // output a single a item, then we don't need to touch the bounding count. + // Otherwise, we need to adjust the bounding count accordingly. + bool outputFirstItem = false; + try + { + await foreach (TOutput item in outputItems.ConfigureAwait(true)) + { + if (outputFirstItem) + { + _target.ChangeBoundingCount(count: 1); + } + outputFirstItem = true; + + if (isSerial) + { + _source.AddMessage(item); + } + else + { + lock (ParallelSourceLock) // don't hold lock while enumerating + { + _source.AddMessage(item); + } + } + } + } + finally + { + if (!outputFirstItem) + { + _target.ChangeBoundingCount(count: -1); + } + } + } + // If we're not bounding, just output each individual item. + else + { + if (isSerial) + { + await foreach (TOutput item in outputItems.ConfigureAwait(true)) + { + _source.AddMessage(item); + } + } + else + { + await foreach (TOutput item in outputItems.ConfigureAwait(true)) + { + lock (ParallelSourceLock) // don't hold lock while enumerating + { + _source.AddMessage(item); + } + } + } + } + } + } +} diff --git a/src/libraries/System.Threading.Tasks.Dataflow/src/Blocks/TransformManyBlock.cs b/src/libraries/System.Threading.Tasks.Dataflow/src/Blocks/TransformManyBlock.cs index a2cfbef8a31044..85a980d1d0bd08 100644 --- a/src/libraries/System.Threading.Tasks.Dataflow/src/Blocks/TransformManyBlock.cs +++ b/src/libraries/System.Threading.Tasks.Dataflow/src/Blocks/TransformManyBlock.cs @@ -24,7 +24,7 @@ namespace System.Threading.Tasks.Dataflow /// Specifies the type of data output by this . [DebuggerDisplay("{DebuggerDisplayContent,nq}")] [DebuggerTypeProxy(typeof(TransformManyBlock<,>.DebugView))] - public sealed class TransformManyBlock : IPropagatorBlock, IReceivableSourceBlock, IDebuggerDisplay + public sealed partial class TransformManyBlock : IPropagatorBlock, IReceivableSourceBlock, IDebuggerDisplay { /// The target side. private readonly TargetCore _target; @@ -53,8 +53,9 @@ public sealed class TransformManyBlock : IPropagatorBlock /// The is null (Nothing in Visual Basic). public TransformManyBlock(Func> transform) : - this(transform, null, ExecutionDataflowBlockOptions.Default) - { } + this(transform, ExecutionDataflowBlockOptions.Default) + { + } /// Initializes the with the specified function and . /// @@ -64,9 +65,11 @@ public TransformManyBlock(Func> transform) : /// The options with which to configure this . /// The is null (Nothing in Visual Basic). /// The is null (Nothing in Visual Basic). - public TransformManyBlock(Func> transform, ExecutionDataflowBlockOptions dataflowBlockOptions) : - this(transform, null, dataflowBlockOptions) - { } + public TransformManyBlock(Func> transform, ExecutionDataflowBlockOptions dataflowBlockOptions) + { + if (transform == null) throw new ArgumentNullException(nameof(transform)); + Initialize(messageWithId => ProcessMessage(transform, messageWithId), dataflowBlockOptions, ref _source, ref _target, ref _reorderingBuffer, TargetCoreOptions.None); + } /// Initializes the with the specified function. /// @@ -75,7 +78,7 @@ public TransformManyBlock(Func> transform, Executio /// /// The is null (Nothing in Visual Basic). public TransformManyBlock(Func>> transform) : - this(null, transform, ExecutionDataflowBlockOptions.Default) + this(transform, ExecutionDataflowBlockOptions.Default) { } /// Initializes the with the specified function and . @@ -86,34 +89,34 @@ public TransformManyBlock(Func>> transform) : /// The options with which to configure this . /// The is null (Nothing in Visual Basic). /// The is null (Nothing in Visual Basic). - public TransformManyBlock(Func>> transform, ExecutionDataflowBlockOptions dataflowBlockOptions) : - this(null, transform, dataflowBlockOptions) - { } + public TransformManyBlock(Func>> transform, ExecutionDataflowBlockOptions dataflowBlockOptions) + { + if (transform == null) throw new ArgumentNullException(nameof(transform)); + Initialize(messageWithId => ProcessMessageWithTask(transform, messageWithId), dataflowBlockOptions, ref _source, ref _target, ref _reorderingBuffer, TargetCoreOptions.UsesAsyncCompletion); + } - /// Initializes the with the specified function and . - /// The synchronous function to invoke with each data element received. - /// The asynchronous function to invoke with each data element received. - /// The options with which to configure this . - /// The and are both null (Nothing in Visual Basic). - /// The is null (Nothing in Visual Basic). - private TransformManyBlock(Func>? transformSync, Func>>? transformAsync, ExecutionDataflowBlockOptions dataflowBlockOptions) + private void Initialize( + Action> processMessageAction, + ExecutionDataflowBlockOptions dataflowBlockOptions, + [NotNull] ref SourceCore? source, + [NotNull] ref TargetCore? target, + ref ReorderingBuffer>? reorderingBuffer, + TargetCoreOptions targetCoreOptions) { - // Validate arguments. It's ok for the filterFunction to be null, but not the other parameters. - if (transformSync == null && transformAsync == null) throw new ArgumentNullException("transform"); if (dataflowBlockOptions == null) throw new ArgumentNullException(nameof(dataflowBlockOptions)); - Debug.Assert(transformSync == null ^ transformAsync == null, "Exactly one of transformSync and transformAsync must be null."); - // Ensure we have options that can't be changed by the caller dataflowBlockOptions = dataflowBlockOptions.DefaultOrClone(); // Initialize onItemsRemoved delegate if necessary Action, int>? onItemsRemoved = null; if (dataflowBlockOptions.BoundedCapacity > 0) + { onItemsRemoved = (owningSource, count) => ((TransformManyBlock)owningSource)._target.ChangeBoundingCount(-count); + } // Initialize source component - _source = new SourceCore(this, dataflowBlockOptions, + source = new SourceCore(this, dataflowBlockOptions, owningSource => ((TransformManyBlock)owningSource)._target.Complete(exception: null, dropPendingMessages: true), onItemsRemoved); @@ -121,49 +124,30 @@ private TransformManyBlock(Func>? transformSync, Fu // However, a developer can override this with EnsureOrdered == false. if (dataflowBlockOptions.SupportsParallelExecution && dataflowBlockOptions.EnsureOrdered) { - _reorderingBuffer = new ReorderingBuffer>( + reorderingBuffer = new ReorderingBuffer>( this, (source, messages) => ((TransformManyBlock)source)._source.AddMessages(messages)); } // Create the underlying target and source - if (transformSync != null) // sync - { - // If an enumerable function was provided, we can use synchronous completion, meaning - // that the target will consider a message fully processed as soon as the - // delegate returns. - _target = new TargetCore(this, - messageWithId => ProcessMessage(transformSync, messageWithId), - _reorderingBuffer, dataflowBlockOptions, TargetCoreOptions.None); - } - else // async - { - Debug.Assert(transformAsync != null, "Incorrect delegate type."); - - // If a task-based function was provided, we need to use asynchronous completion, meaning - // that the target won't consider a message completed until the task - // returned from that delegate has completed. - _target = new TargetCore(this, - messageWithId => ProcessMessageWithTask(transformAsync, messageWithId), - _reorderingBuffer, dataflowBlockOptions, TargetCoreOptions.UsesAsyncCompletion); - } + target = new TargetCore(this, processMessageAction, _reorderingBuffer, dataflowBlockOptions, targetCoreOptions); // Link up the target half with the source half. In doing so, // ensure exceptions are propagated, and let the source know no more messages will arrive. // As the target has completed, and as the target synchronously pushes work // through the reordering buffer when async processing completes, // we know for certain that no more messages will need to be sent to the source. - _target.Completion.ContinueWith((completed, state) => + target.Completion.ContinueWith((completed, state) => { var sourceCore = (SourceCore)state!; if (completed.IsFaulted) sourceCore.AddAndUnwrapAggregateException(completed.Exception!); sourceCore.Complete(); - }, _source, CancellationToken.None, Common.GetContinuationOptions(), TaskScheduler.Default); + }, source, CancellationToken.None, Common.GetContinuationOptions(), TaskScheduler.Default); // It is possible that the source half may fault on its own, e.g. due to a task scheduler exception. // In those cases we need to fault the target half to drop its buffered messages and to release its // reservations. This should not create an infinite loop, because all our implementations are designed // to handle multiple completion requests and to carry over only one. - _source.Completion.ContinueWith((completed, state) => + source.Completion.ContinueWith((completed, state) => { var thisBlock = ((TransformManyBlock)state!) as IDataflowBlock; Debug.Assert(completed.IsFaulted, "The source must be faulted in order to trigger a target completion."); @@ -172,7 +156,7 @@ private TransformManyBlock(Func>? transformSync, Fu // Handle async cancellation requests by declining on the target Common.WireCancellationToComplete( - dataflowBlockOptions.CancellationToken, Completion, state => ((TargetCore)state!).Complete(exception: null, dropPendingMessages: true), _target); + dataflowBlockOptions.CancellationToken, Completion, state => ((TargetCore)state!).Complete(exception: null, dropPendingMessages: true), target); DataflowEtwProvider etwLog = DataflowEtwProvider.Log; if (etwLog.IsEnabled()) { @@ -185,8 +169,6 @@ private TransformManyBlock(Func>? transformSync, Fu /// The message to be processed. private void ProcessMessage(Func> transformFunction, KeyValuePair messageWithId) { - Debug.Assert(transformFunction != null, "Function to invoke is required."); - bool userDelegateSucceeded = false; try { @@ -195,10 +177,9 @@ private void ProcessMessage(Func> transformFunction userDelegateSucceeded = true; StoreOutputItems(messageWithId, outputItems); } - catch (Exception exc) + catch (Exception exc) when (Common.IsCooperativeCancellation(exc)) { // If this exception represents cancellation, swallow it rather than shutting down the block. - if (!Common.IsCooperativeCancellation(exc)) throw; } finally { @@ -213,8 +194,6 @@ private void ProcessMessage(Func> transformFunction /// The message to be processed. private void ProcessMessageWithTask(Func>> function, KeyValuePair messageWithId) { - Debug.Assert(function != null, "Function to invoke is required."); - // Run the transform function to get the resulting task Task>? task = null; Exception? caughtException = null; @@ -378,7 +357,7 @@ private void StoreOutputItemsReordered(long id, IEnumerable? item) // Handle invalid items (null enumerables) by delegating to the base if (item == null) { - _reorderingBuffer.AddItem(id, null!, false); + _reorderingBuffer.AddItem(id, null, false); if (isBounded) target.ChangeBoundingCount(count: -1); return; } diff --git a/src/libraries/System.Threading.Tasks.Dataflow/src/Internal/ReorderingBuffer.cs b/src/libraries/System.Threading.Tasks.Dataflow/src/Internal/ReorderingBuffer.cs index 80513f6eae2ec5..2c7e45f33f8963 100644 --- a/src/libraries/System.Threading.Tasks.Dataflow/src/Internal/ReorderingBuffer.cs +++ b/src/libraries/System.Threading.Tasks.Dataflow/src/Internal/ReorderingBuffer.cs @@ -89,6 +89,20 @@ internal void AddItem(long id, TOutput? item, bool itemIsValid) } } + /// Determines whether the specified id is next to be output. + /// The id of the item. + /// true if the item is next in line; otherwise, false. + internal bool IsNext(long id) + { + Debug.Assert(id != Common.INVALID_REORDERING_ID, "This ID should never have been handed out."); + Common.ContractAssertMonitorStatus(ValueLock, held: false); + + lock (ValueLock) + { + return _nextReorderedIdToOutput == id; + } + } + /// /// Determines whether the specified id is next to be output, and if it is /// and if the item is "trusted" (meaning it may be output into the output diff --git a/src/libraries/System.Threading.Tasks.Dataflow/src/System.Threading.Tasks.Dataflow.csproj b/src/libraries/System.Threading.Tasks.Dataflow/src/System.Threading.Tasks.Dataflow.csproj index 946bf516fcc103..079bac7b48fc11 100644 --- a/src/libraries/System.Threading.Tasks.Dataflow/src/System.Threading.Tasks.Dataflow.csproj +++ b/src/libraries/System.Threading.Tasks.Dataflow/src/System.Threading.Tasks.Dataflow.csproj @@ -1,4 +1,4 @@ - + $(NetCoreAppCurrent);$(NetCoreAppMinimum);netstandard2.1;netstandard2.0;$(NetFrameworkMinimum) true @@ -40,6 +40,8 @@ System.Threading.Tasks.Dataflow.WriteOnceBlock<T> + diff --git a/src/libraries/System.Threading.Tasks.Dataflow/tests/Dataflow/DataflowTestHelper.IAsyncEnumerable.cs b/src/libraries/System.Threading.Tasks.Dataflow/tests/Dataflow/DataflowTestHelper.IAsyncEnumerable.cs new file mode 100644 index 00000000000000..3eb8711a32e723 --- /dev/null +++ b/src/libraries/System.Threading.Tasks.Dataflow/tests/Dataflow/DataflowTestHelper.IAsyncEnumerable.cs @@ -0,0 +1,43 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Collections.Generic; + +namespace System.Threading.Tasks.Dataflow.Tests +{ + internal static partial class DataflowTestHelpers + { + internal static Func> ToAsyncEnumerable = item => AsyncEnumerable.Repeat(item, 1); + } + + internal static partial class AsyncEnumerable + { + internal static async IAsyncEnumerable Repeat(int item, int count) + { + for (int i = 0; i < count; i++) + { + await Task.Yield(); + yield return item; + } + } + + internal static async IAsyncEnumerable Range(int start, int count) + { + var end = start + count; + for (int i = start; i < end; i++) + { + await Task.Yield(); + yield return i; + } + } + + internal static async IAsyncEnumerable ToAsyncEnumerable(this IEnumerable enumerable) + { + foreach (T item in enumerable) + { + await Task.Yield(); + yield return item; + } + } + } +} diff --git a/src/libraries/System.Threading.Tasks.Dataflow/tests/Dataflow/DataflowTestHelper.cs b/src/libraries/System.Threading.Tasks.Dataflow/tests/Dataflow/DataflowTestHelper.cs index fd354c04d52cde..0e82c1340c144c 100644 --- a/src/libraries/System.Threading.Tasks.Dataflow/tests/Dataflow/DataflowTestHelper.cs +++ b/src/libraries/System.Threading.Tasks.Dataflow/tests/Dataflow/DataflowTestHelper.cs @@ -7,7 +7,7 @@ namespace System.Threading.Tasks.Dataflow.Tests { - internal static class DataflowTestHelpers + internal static partial class DataflowTestHelpers { internal static bool[] BooleanValues = { true, false }; internal static Func> ToEnumerable = item => Enumerable.Repeat(item, 1); diff --git a/src/libraries/System.Threading.Tasks.Dataflow/tests/Dataflow/TransformManyBlockTests.IAsyncEnumerable.cs b/src/libraries/System.Threading.Tasks.Dataflow/tests/Dataflow/TransformManyBlockTests.IAsyncEnumerable.cs new file mode 100644 index 00000000000000..7dcf4bdbfaebeb --- /dev/null +++ b/src/libraries/System.Threading.Tasks.Dataflow/tests/Dataflow/TransformManyBlockTests.IAsyncEnumerable.cs @@ -0,0 +1,708 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Collections.Generic; +using System.Collections.ObjectModel; +using System.Diagnostics; +using System.Linq; +using Xunit; + +namespace System.Threading.Tasks.Dataflow.Tests +{ + public partial class TransformManyBlockTests + { + [Fact] + public async Task TestCtorAsyncEnumerable() + { + var blocks = new[] { + new TransformManyBlock(DataflowTestHelpers.ToAsyncEnumerable), + new TransformManyBlock(DataflowTestHelpers.ToAsyncEnumerable, new ExecutionDataflowBlockOptions { MaxMessagesPerTask = 1 }) + }; + foreach (var block in blocks) + { + Assert.Equal(expected: 0, actual: block.InputCount); + Assert.Equal(expected: 0, actual: block.OutputCount); + Assert.False(block.Completion.IsCompleted); + } + + var canceledBlock = new TransformManyBlock( + DataflowTestHelpers.ToAsyncEnumerable, + new ExecutionDataflowBlockOptions { CancellationToken = new CancellationToken(true) }); + + Assert.Equal(expected: 0, actual: canceledBlock.InputCount); + Assert.Equal(expected: 0, actual: canceledBlock.OutputCount); + await Assert.ThrowsAnyAsync(() => canceledBlock.Completion); + } + + [Fact] + public void TestArgumentExceptionsAsyncEnumerable() + { + Assert.Throws(() => new TransformManyBlock((Func>)null)); + Assert.Throws(() => new TransformManyBlock(DataflowTestHelpers.ToAsyncEnumerable, null)); + + DataflowTestHelpers.TestArgumentsExceptions(new TransformManyBlock(DataflowTestHelpers.ToAsyncEnumerable)); + } + + [Fact] + public void TestToStringAsyncEnumerable() + { + DataflowTestHelpers.TestToString(nameFormat => + nameFormat != null ? + new TransformManyBlock(DataflowTestHelpers.ToAsyncEnumerable, new ExecutionDataflowBlockOptions() { NameFormat = nameFormat }) : + new TransformManyBlock(DataflowTestHelpers.ToAsyncEnumerable)); + } + + [Fact] + public async Task TestOfferMessageAsyncEnumerable() + { + var generators = new Func>[] + { + () => new TransformManyBlock(DataflowTestHelpers.ToAsyncEnumerable), + () => new TransformManyBlock(DataflowTestHelpers.ToAsyncEnumerable, new ExecutionDataflowBlockOptions { BoundedCapacity = 10 }), + () => new TransformManyBlock(DataflowTestHelpers.ToAsyncEnumerable, new ExecutionDataflowBlockOptions { BoundedCapacity = 10, MaxMessagesPerTask = 1 }), + }; + foreach (var generator in generators) + { + DataflowTestHelpers.TestOfferMessage_ArgumentValidation(generator()); + + var target = generator(); + DataflowTestHelpers.TestOfferMessage_AcceptsDataDirectly(target); + DataflowTestHelpers.TestOfferMessage_CompleteAndOffer(target); + + target = generator(); + await DataflowTestHelpers.TestOfferMessage_AcceptsViaLinking(target); + DataflowTestHelpers.TestOfferMessage_CompleteAndOffer(target); + } + } + + [Fact] + public void TestPostAsyncEnumerable() + { + foreach (bool bounded in DataflowTestHelpers.BooleanValues) + { + var tb = new TransformManyBlock(DataflowTestHelpers.ToAsyncEnumerable, new ExecutionDataflowBlockOptions { BoundedCapacity = bounded ? 1 : -1 }); + Assert.True(tb.Post(0)); + tb.Complete(); + Assert.False(tb.Post(0)); + } + } + + [Fact] + public Task TestCompletionTaskAsyncEnumerable() + { + return DataflowTestHelpers.TestCompletionTask(() => new TransformManyBlock(DataflowTestHelpers.ToAsyncEnumerable)); + } + + [Fact] + public async Task TestLinkToOptionsAsyncEnumerable() + { + const int Messages = 1; + foreach (bool append in DataflowTestHelpers.BooleanValues) + { + var tb = new TransformManyBlock(DataflowTestHelpers.ToAsyncEnumerable); + var values = new int[Messages]; + var targets = new ActionBlock[Messages]; + for (int i = 0; i < Messages; i++) + { + int slot = i; + targets[i] = new ActionBlock(item => values[slot] = item); + tb.LinkTo(targets[i], new DataflowLinkOptions { MaxMessages = 1, Append = append }); + } + + tb.PostRange(0, Messages); + tb.Complete(); + await tb.Completion; + + for (int i = 0; i < Messages; i++) + { + Assert.Equal( + expected: append ? i : Messages - i - 1, + actual: values[i]); + } + } + } + + [Fact] + public async Task TestReceivesAsyncEnumerable() + { + for (int test = 0; test < 2; test++) + { + var tb = new TransformManyBlock(i => AsyncEnumerable.Repeat(i * 2, 1)); + tb.PostRange(0, 5); + + for (int i = 0; i < 5; i++) + { + Assert.Equal(expected: i * 2, actual: await tb.ReceiveAsync()); + } + + Assert.False(tb.TryReceive(out _)); + Assert.False(tb.TryReceiveAll(out _)); + } + } + + [Fact] + public async Task TestCircularLinkingAsyncEnumerable() + { + const int Iters = 200; + + var tcs = new TaskCompletionSource(); + IAsyncEnumerable body(int i) + { + if (i >= Iters) tcs.SetResult(true); + return AsyncEnumerable.Repeat(i + 1, 1); + } + + TransformManyBlock tb = new TransformManyBlock(body); + + using (tb.LinkTo(tb)) + { + tb.Post(0); + await tcs.Task; + tb.Complete(); + } + } + + [Fact] + public async Task TestProducerConsumerAsyncEnumerable() + { + foreach (TaskScheduler scheduler in new[] { TaskScheduler.Default, new ConcurrentExclusiveSchedulerPair().ConcurrentScheduler }) + foreach (int maxMessagesPerTask in new[] { DataflowBlockOptions.Unbounded, 1, 2 }) + foreach (int boundedCapacity in new[] { DataflowBlockOptions.Unbounded, 1, 2 }) + foreach (int dop in new[] { 1, 2, 8 }) + foreach (int elementsPerItem in new[] { 1, 5, 100 }) + foreach (bool ensureOrdered in DataflowTestHelpers.BooleanValues) + { + const int Messages = 100; + var options = new ExecutionDataflowBlockOptions + { + BoundedCapacity = boundedCapacity, + MaxDegreeOfParallelism = dop, + MaxMessagesPerTask = maxMessagesPerTask, + TaskScheduler = scheduler, + EnsureOrdered = ensureOrdered, + }; + TransformManyBlock tb = new TransformManyBlock(i => AsyncEnumerable.Repeat(i, elementsPerItem), options); + + await Task.WhenAll( + Task.Run(async delegate // consumer + { + if (ensureOrdered) + { + int i = 0; + int processed = 0; + while (await tb.OutputAvailableAsync()) + { + Assert.Equal(expected: i, actual: await tb.ReceiveAsync()); + processed++; + if (processed % elementsPerItem == 0) + { + i++; + } + } + } + else + { + var results = new List(); + await foreach (int result in tb.ReceiveAllAsync()) + { + results.Add(result); + } + + IEnumerable> messages = results.GroupBy(i => i); + Assert.Equal(Messages, messages.Count()); + Assert.All(messages, m => Assert.Equal(elementsPerItem, m.Count())); + } + }), + Task.Run(async delegate // producer + { + for (int i = 0; i < Messages; i++) + { + await tb.SendAsync(i); + } + tb.Complete(); + })); + } + } + + [Fact] + public async Task TestMessagePostponementAsyncEnumerable() + { + const int Excess = 10; + foreach (int boundedCapacity in new[] { 1, 3 }) + { + var options = new ExecutionDataflowBlockOptions { BoundedCapacity = boundedCapacity }; + var tb = new TransformManyBlock(DataflowTestHelpers.ToAsyncEnumerable, options); + + var sendAsync = new Task[boundedCapacity + Excess]; + for (int i = 0; i < boundedCapacity + Excess; i++) + { + sendAsync[i] = tb.SendAsync(i); + } + tb.Complete(); + + for (int i = 0; i < boundedCapacity; i++) + { + Assert.True(sendAsync[i].IsCompleted); + Assert.True(sendAsync[i].Result); + } + + for (int i = 0; i < Excess; i++) + { + Assert.False(await sendAsync[boundedCapacity + i]); + } + } + } + + [Fact] + public async Task TestMultipleYieldsAsyncEnumerable() + { + const int Messages = 10; + + var t = new TransformManyBlock(i => AsyncEnumerable.Range(0, Messages)); + t.Post(42); + t.Complete(); + + for (int i = 0; i < Messages; i++) + { + Assert.False(t.Completion.IsCompleted); + Assert.Equal(expected: i, actual: await t.ReceiveAsync()); + } + await t.Completion; + } + + [Fact] + public async Task TestReserveReleaseConsumeAsyncEnumerable() + { + var tb = new TransformManyBlock(DataflowTestHelpers.ToAsyncEnumerable); + tb.Post(1); + await DataflowTestHelpers.TestReserveAndRelease(tb); + + tb = new TransformManyBlock(DataflowTestHelpers.ToAsyncEnumerable); + tb.Post(2); + await DataflowTestHelpers.TestReserveAndConsume(tb); + } + + [Fact] + public async Task TestCountZeroAtCompletionAsyncEnumerable() + { + var cts = new CancellationTokenSource(); + var tb = new TransformManyBlock(DataflowTestHelpers.ToAsyncEnumerable, new ExecutionDataflowBlockOptions() { CancellationToken = cts.Token }); + tb.Post(1); + cts.Cancel(); + await Assert.ThrowsAnyAsync(() => tb.Completion); + Assert.Equal(expected: 0, actual: tb.InputCount); + Assert.Equal(expected: 0, actual: tb.OutputCount); + + tb = new TransformManyBlock(DataflowTestHelpers.ToAsyncEnumerable); + tb.Post(1); + ((IDataflowBlock)tb).Fault(new InvalidOperationException()); + await Assert.ThrowsAnyAsync(() => tb.Completion); + Assert.Equal(expected: 0, actual: tb.InputCount); + Assert.Equal(expected: 0, actual: tb.OutputCount); + } + + [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))] + public void TestInputCountAsyncEnumerable() + { + using Barrier barrier1 = new Barrier(2), barrier2 = new Barrier(2); + IAsyncEnumerable body(int item) + { + barrier1.SignalAndWait(); + // will test InputCount here + barrier2.SignalAndWait(); + return DataflowTestHelpers.ToAsyncEnumerable(item); + } + + TransformManyBlock tb = new TransformManyBlock(body); + + for (int iter = 0; iter < 2; iter++) + { + tb.PostItems(1, 2); + for (int i = 1; i >= 0; i--) + { + barrier1.SignalAndWait(); + Assert.Equal(expected: i, actual: tb.InputCount); + barrier2.SignalAndWait(); + } + } + } + + [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))] + [OuterLoop] // spins waiting for a condition to be true, though it should happen very quickly + public async Task TestCountAsyncEnumerable() + { + var tb = new TransformManyBlock(DataflowTestHelpers.ToAsyncEnumerable); + Assert.Equal(expected: 0, actual: tb.InputCount); + Assert.Equal(expected: 0, actual: tb.OutputCount); + + tb.PostRange(1, 11); + await Task.Run(() => SpinWait.SpinUntil(() => tb.OutputCount == 10)); + for (int i = 10; i > 0; i--) + { + Assert.True(tb.TryReceive(out int item)); + Assert.Equal(expected: 11 - i, actual: item); + Assert.Equal(expected: i - 1, actual: tb.OutputCount); + } + } + + [Fact] + public async Task TestChainedSendReceiveAsyncEnumerable() + { + foreach (bool post in DataflowTestHelpers.BooleanValues) + { + static TransformManyBlock func() => new TransformManyBlock(i => AsyncEnumerable.Repeat(i * 2, 1)); + var network = DataflowTestHelpers.Chain, int>(4, func); + + const int Iters = 10; + for (int i = 0; i < Iters; i++) + { + if (post) + { + network.Post(i); + } + else + { + await network.SendAsync(i); + } + Assert.Equal(expected: i * 16, actual: await network.ReceiveAsync()); + } + } + } + + [Fact] + public async Task TestSendAllThenReceiveAsyncEnumerable() + { + foreach (bool post in DataflowTestHelpers.BooleanValues) + { + static TransformManyBlock func() => new TransformManyBlock(i => AsyncEnumerable.Repeat(i * 2, 1)); + var network = DataflowTestHelpers.Chain, int>(4, func); + + const int Iters = 10; + if (post) + { + network.PostRange(0, Iters); + } + else + { + await Task.WhenAll(from i in Enumerable.Range(0, Iters) select network.SendAsync(i)); + } + + for (int i = 0; i < Iters; i++) + { + Assert.Equal(expected: i * 16, actual: await network.ReceiveAsync()); + } + } + } + + [Fact] + public async Task TestPrecanceledAsyncEnumerable() + { + var bb = new TransformManyBlock(DataflowTestHelpers.ToAsyncEnumerable, + new ExecutionDataflowBlockOptions { CancellationToken = new CancellationToken(canceled: true) }); + + IDisposable link = bb.LinkTo(DataflowBlock.NullTarget()); + Assert.NotNull(link); + link.Dispose(); + + Assert.False(bb.Post(42)); + var t = bb.SendAsync(42); + Assert.True(t.IsCompleted); + Assert.False(t.Result); + + Assert.False(bb.TryReceiveAll(out _)); + Assert.False(bb.TryReceive(out _)); + + Assert.NotNull(bb.Completion); + await Assert.ThrowsAnyAsync(() => bb.Completion); + bb.Complete(); // just make sure it doesn't throw + } + + [Fact] + public async Task TestExceptionsAsyncEnumerable() + { + var tb1 = new TransformManyBlock((Func>)(i => { throw new InvalidCastException(); })); + var tb2 = new TransformManyBlock(i => ExceptionAfterAsync(3)); + + for (int i = 0; i < 3; i++) + { + tb1.Post(i); + tb2.Post(i); + } + + await Assert.ThrowsAsync(() => tb1.Completion); + await Assert.ThrowsAsync(() => tb2.Completion); + + Assert.True(tb1.InputCount == 0 && tb1.OutputCount == 0); + } + + private async IAsyncEnumerable ExceptionAfterAsync(int iterations) + { + for (int i = 0; i < iterations; i++) + { + await Task.Yield(); + yield return i; + } + throw new FormatException(); + } + + [Fact] + public async Task TestFaultingAndCancellationAsyncEnumerable() + { + foreach (bool fault in DataflowTestHelpers.BooleanValues) + { + var cts = new CancellationTokenSource(); + var tb = new TransformManyBlock(DataflowTestHelpers.ToAsyncEnumerable, new ExecutionDataflowBlockOptions { CancellationToken = cts.Token }); + tb.PostRange(0, 4); + Assert.Equal(expected: 0, actual: await tb.ReceiveAsync()); + Assert.Equal(expected: 1, actual: await tb.ReceiveAsync()); + + if (fault) + { + Assert.Throws(() => ((IDataflowBlock)tb).Fault(null)); + ((IDataflowBlock)tb).Fault(new InvalidCastException()); + await Assert.ThrowsAsync(() => tb.Completion); + } + else + { + cts.Cancel(); + await Assert.ThrowsAnyAsync(() => tb.Completion); + } + + Assert.Equal(expected: 0, actual: tb.InputCount); + Assert.Equal(expected: 0, actual: tb.OutputCount); + } + } + + [Fact] + public async Task TestCancellationExceptionsIgnoredAsyncEnumerable() + { + static IAsyncEnumerable body(int i) + { + if ((i % 2) == 0) throw new OperationCanceledException(); + return DataflowTestHelpers.ToAsyncEnumerable(i); + } + + TransformManyBlock t = new TransformManyBlock(body); + + t.PostRange(0, 2); + t.Complete(); + for (int i = 0; i < 2; i++) + { + if ((i % 2) != 0) + { + Assert.Equal(expected: i, actual: await t.ReceiveAsync()); + } + } + + await t.Completion; + } + + [Fact] + public async Task TestYieldingNoResultsAsyncEnumerable() + { + foreach (int dop in new[] { 1, Environment.ProcessorCount }) + foreach (int boundedCapacity in new[] { DataflowBlockOptions.Unbounded, 1, 2 }) + { + const int Modes = 3, Iters = 100; + var tb = new TransformManyBlock(i => + { + switch (i % Modes) + { + default: + case 0: + return AsyncEnumerable.Range(i, 1); + case 1: + return AsyncEnumerable.Range(i, 0); + case 2: + return AsyncEnumerable.Range(i, 2); + } + }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = dop, BoundedCapacity = boundedCapacity }); + + var source = new BufferBlock(); + source.PostRange(0, Modes * Iters); + source.Complete(); + source.LinkTo(tb, new DataflowLinkOptions { PropagateCompletion = true }); + + int received = 0; + while (await tb.OutputAvailableAsync()) + { + await tb.ReceiveAsync(); + received++; + } + Assert.Equal(expected: Modes * Iters, actual: received); + } + } + + [Fact] + public async Task TestArrayListReusePossibleForDop1AsyncEnumerable() + { + foreach (int boundedCapacity in new[] { DataflowBlockOptions.Unbounded, 2 }) + { + foreach (int dop in new[] { 1, Environment.ProcessorCount }) + { + var dbo = new ExecutionDataflowBlockOptions { BoundedCapacity = boundedCapacity, MaxDegreeOfParallelism = dop }; + foreach (IList list in new IList[] { new int[1], new List { 0 }, new Collection { 0 } }) + { + int nextExpectedValue = 1; + + TransformManyBlock transform = null; + IAsyncEnumerable body(int i) + { + if (i == 100) // we're done iterating + { + transform.Complete(); + return null; + } + else if (dop == 1) + { + list[0] = i + 1; // reuse the list over and over, but only at dop == 1 + return list.ToAsyncEnumerable(); + } + else if (list is int[]) + { + return new int[1] { i + 1 }.ToAsyncEnumerable(); + } + else if (list is List) + { + return new List() { i + 1 }.ToAsyncEnumerable(); + } + else + { + return new Collection() { i + 1 }.ToAsyncEnumerable(); + } + } + + transform = new TransformManyBlock(body, dbo); + + TransformBlock verifier = new TransformBlock(i => + { + Assert.Equal(expected: nextExpectedValue, actual: i); + nextExpectedValue++; + return i; + }); + + transform.LinkTo(verifier); + verifier.LinkTo(transform); + + await transform.SendAsync(0); + await transform.Completion; + } + } + } + } + + [Theory] + [InlineData(DataflowBlockOptions.Unbounded, 1, null)] + [InlineData(DataflowBlockOptions.Unbounded, 2, null)] + [InlineData(DataflowBlockOptions.Unbounded, DataflowBlockOptions.Unbounded, null)] + [InlineData(1, 1, null)] + [InlineData(1, 2, null)] + [InlineData(1, DataflowBlockOptions.Unbounded, null)] + [InlineData(2, 2, true)] + [InlineData(2, 1, false)] // no force ordered, but dop == 1, so it doesn't matter + public async Task TestOrdering_Sync_OrderedEnabledAsyncEnumerable(int mmpt, int dop, bool? EnsureOrdered) + { + const int iters = 1000; + + var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = dop, MaxMessagesPerTask = mmpt }; + if (EnsureOrdered == null) + { + Assert.True(options.EnsureOrdered); + } + else + { + options.EnsureOrdered = EnsureOrdered.Value; + } + + var tb = new TransformManyBlock(i => DataflowTestHelpers.ToAsyncEnumerable(i), options); + tb.PostRange(0, iters); + for (int i = 0; i < iters; i++) + { + Assert.Equal(expected: i, actual: await tb.ReceiveAsync()); + } + tb.Complete(); + await tb.Completion; + } + + [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))] + [InlineData(true)] + [InlineData(false)] + public async Task TestOrdering_Sync_OrderedDisabledAsyncEnumerable(bool trustedEnumeration) + { + // If ordering were enabled, this test would hang. + + var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2, EnsureOrdered = false }; + + using var mres = new ManualResetEventSlim(); + var tb = new TransformManyBlock(i => + { + if (i == 0) mres.Wait(); + return trustedEnumeration ? + DataflowTestHelpers.ToAsyncEnumerable(i) : + AsyncEnumerable.Repeat(i, 1); + }, options); + tb.Post(0); + tb.Post(1); + + Assert.Equal(1, await tb.ReceiveAsync()); + mres.Set(); + Assert.Equal(0, await tb.ReceiveAsync()); + + tb.Complete(); + await tb.Completion; + } + + [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))] + [InlineData(false)] + [InlineData(true)] + public async Task TestOrdering_Sync_BlockingEnumeration_NoDeadlockAsyncEnumerable(bool ensureOrdered) + { + // If iteration of the yielded enumerables happened while holding a lock, this would deadlock. + var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2, EnsureOrdered = ensureOrdered }; + + using ManualResetEventSlim mres1 = new ManualResetEventSlim(), mres2 = new ManualResetEventSlim(); + var tb = new TransformManyBlock(i => i == 0 ? BlockableIterator(mres1, mres2) : BlockableIterator(mres2, mres1), options); + tb.Post(0); + tb.Post(1); + Assert.Equal(42, await tb.ReceiveAsync()); + Assert.Equal(42, await tb.ReceiveAsync()); + + tb.Complete(); + await tb.Completion; + + static IAsyncEnumerable BlockableIterator(ManualResetEventSlim wait, ManualResetEventSlim release) + { + release.Set(); + wait.Wait(); + return DataflowTestHelpers.ToAsyncEnumerable(42); + } + } + + [Fact] + public async Task TestScheduling_MoveNextAsync_RunsOnTargetScheduler() + { + TaskScheduler scheduler = new ConcurrentExclusiveSchedulerPair().ConcurrentScheduler; + Assert.NotEqual(scheduler, TaskScheduler.Current); + + async IAsyncEnumerable Body(int value) + { + Assert.Equal(scheduler, TaskScheduler.Current); + await Task.Yield(); + Assert.Equal(scheduler, TaskScheduler.Current); + yield return value; + Assert.Equal(scheduler, TaskScheduler.Current); + } + + TransformManyBlock t = new TransformManyBlock(Body, new ExecutionDataflowBlockOptions { TaskScheduler = scheduler }); + + t.PostRange(0, 2); + t.Complete(); + for (int i = 0; i < 2; i++) + { + Assert.Equal(expected: i, actual: await t.ReceiveAsync()); + } + + await t.Completion; + } + } +} diff --git a/src/libraries/System.Threading.Tasks.Dataflow/tests/Dataflow/TransformManyBlockTests.cs b/src/libraries/System.Threading.Tasks.Dataflow/tests/Dataflow/TransformManyBlockTests.cs index fd94ffbb16c14d..8ecc9c54827a5a 100644 --- a/src/libraries/System.Threading.Tasks.Dataflow/tests/Dataflow/TransformManyBlockTests.cs +++ b/src/libraries/System.Threading.Tasks.Dataflow/tests/Dataflow/TransformManyBlockTests.cs @@ -8,7 +8,7 @@ namespace System.Threading.Tasks.Dataflow.Tests { - public class TransformManyBlockTests + public partial class TransformManyBlockTests { [Fact] public async Task TestCtor() diff --git a/src/libraries/System.Threading.Tasks.Dataflow/tests/System.Threading.Tasks.Dataflow.Tests.csproj b/src/libraries/System.Threading.Tasks.Dataflow/tests/System.Threading.Tasks.Dataflow.Tests.csproj index f75dee77d51233..813213d76d052e 100644 --- a/src/libraries/System.Threading.Tasks.Dataflow/tests/System.Threading.Tasks.Dataflow.Tests.csproj +++ b/src/libraries/System.Threading.Tasks.Dataflow/tests/System.Threading.Tasks.Dataflow.Tests.csproj @@ -1,4 +1,4 @@ - + true $(NetCoreAppCurrent);$(NetFrameworkMinimum) @@ -28,6 +28,8 @@ + +