From 18aa3de59310e844033935480c6951c1f7057597 Mon Sep 17 00:00:00 2001 From: Stephen Toub Date: Thu, 23 Jun 2022 21:25:55 -0400 Subject: [PATCH] Update IAsyncEnumerable implementation and tests --- .../TransformManyBlock.IAsyncEnumerable.cs | 93 ++++++++------ .../src/Blocks/TransformManyBlock.cs | 23 ++-- .../src/Internal/ReorderingBuffer.cs | 14 +++ .../DataflowTestHelper.IAsyncEnumerable.cs | 5 +- ...ransformManyBlockTests.IAsyncEnumerable.cs | 117 ++++++++++++------ 5 files changed, 165 insertions(+), 87 deletions(-) diff --git a/src/libraries/System.Threading.Tasks.Dataflow/src/Blocks/TransformManyBlock.IAsyncEnumerable.cs b/src/libraries/System.Threading.Tasks.Dataflow/src/Blocks/TransformManyBlock.IAsyncEnumerable.cs index 8141185e9687f7..a60ae989eca20d 100644 --- a/src/libraries/System.Threading.Tasks.Dataflow/src/Blocks/TransformManyBlock.IAsyncEnumerable.cs +++ b/src/libraries/System.Threading.Tasks.Dataflow/src/Blocks/TransformManyBlock.IAsyncEnumerable.cs @@ -11,40 +11,56 @@ public partial class TransformManyBlock { /// Initializes the with the specified function. /// - /// The function to invoke with each data element received. All of the data from the returned + /// The function to invoke with each data element received. All of the data from the returned /// will be made available as output from this . /// - /// The is . + /// The is . public TransformManyBlock(Func> transform) : this(transform, ExecutionDataflowBlockOptions.Default) - { } + { + } /// Initializes the with the specified function and . /// - /// The function to invoke with each data element received. All of the data from the returned + /// The function to invoke with each data element received. All of the data from the returned /// will be made available as output from this . /// /// The options with which to configure this . - /// The or is . + /// The or is . public TransformManyBlock(Func> transform, ExecutionDataflowBlockOptions dataflowBlockOptions) { - // Validate arguments. - if (transform == null) throw new ArgumentNullException(nameof(transform)); - Initialize(messageWithId => ProcessMessage(transform, messageWithId), dataflowBlockOptions, ref _source!, ref _target!, ref _reorderingBuffer, TargetCoreOptions.UsesAsyncCompletion); + if (transform is null) + { + throw new ArgumentNullException(nameof(transform)); + } + + Initialize(messageWithId => + { + Task t = ProcessMessageAsync(transform, messageWithId); +#if DEBUG + // Task returned from ProcessMessageAsync is explicitly ignored. + // That function handles all exceptions. + t.ContinueWith(t => Debug.Assert(t.IsCompletedSuccessfully), CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); +#endif + }, dataflowBlockOptions, ref _source, ref _target, ref _reorderingBuffer, TargetCoreOptions.UsesAsyncCompletion); } + // Note: + // Enumerating the IAsyncEnumerable is done with ConfigureAwait(true), using the default behavior of + // paying attention to the current context/scheduler. This makes it so that the enumerable code runs on the target scheduler. + // For this to work correctly, there can't be any ConfigureAwait(false) in the same method prior to + // these await foreach loops, nor in the call chain prior to the method invocation. + /// Processes the message with a user-provided transform function that returns an async enumerable. /// The transform function to use to process the message. /// The message to be processed. - private void ProcessMessage(Func> transformFunction, KeyValuePair messageWithId) + private async Task ProcessMessageAsync(Func> transformFunction, KeyValuePair messageWithId) { - Debug.Assert(transformFunction != null, "Function to invoke is required."); - try { // Run the user transform and store the results. IAsyncEnumerable outputItems = transformFunction(messageWithId.Key); - StoreOutputItemsAsync(messageWithId, outputItems).GetAwaiter().GetResult(); + await StoreOutputItemsAsync(messageWithId, outputItems).ConfigureAwait(false); } catch (Exception exc) { @@ -77,12 +93,12 @@ private async Task StoreOutputItemsAsync( { // If there's a reordering buffer, pass the data along to it. // The reordering buffer will handle all details, including bounding. - if (_reorderingBuffer != null) + if (_reorderingBuffer is not null) { await StoreOutputItemsReorderedAsync(messageWithId.Value, outputItems).ConfigureAwait(false); } // Otherwise, output the data directly. - else if (outputItems != null) + else if (outputItems is not null) { await StoreOutputItemsNonReorderedWithIterationAsync(outputItems).ConfigureAwait(false); } @@ -103,7 +119,7 @@ private async Task StoreOutputItemsAsync( /// The async enumerable. private async Task StoreOutputItemsReorderedAsync(long id, IAsyncEnumerable? item) { - Debug.Assert(_reorderingBuffer != null, "Expected a reordering buffer"); + Debug.Assert(_reorderingBuffer is not null, "Expected a reordering buffer"); Debug.Assert(id != Common.INVALID_REORDERING_ID, "This ID should never have been handed out."); // Grab info about the transform @@ -111,20 +127,16 @@ private async Task StoreOutputItemsReorderedAsync(long id, IAsyncEnumerable(); - await foreach (TOutput element in item.ConfigureAwait(false)) + await foreach (TOutput element in item.ConfigureAwait(true)) { itemCopy.Add(element); } @@ -158,7 +170,10 @@ private async Task StoreOutputItemsReorderedAsync(long id, IAsyncEnumerable : IPropagatorBlo /// The is null (Nothing in Visual Basic). public TransformManyBlock(Func> transform) : this(transform, ExecutionDataflowBlockOptions.Default) - { } + { + } /// Initializes the with the specified function and . /// @@ -66,9 +67,8 @@ public TransformManyBlock(Func> transform) : /// The is null (Nothing in Visual Basic). public TransformManyBlock(Func> transform, ExecutionDataflowBlockOptions dataflowBlockOptions) { - // Validate arguments. if (transform == null) throw new ArgumentNullException(nameof(transform)); - Initialize(messageWithId => ProcessMessage(transform, messageWithId), dataflowBlockOptions, ref _source!, ref _target!, ref _reorderingBuffer, TargetCoreOptions.None); + Initialize(messageWithId => ProcessMessage(transform, messageWithId), dataflowBlockOptions, ref _source, ref _target, ref _reorderingBuffer, TargetCoreOptions.None); } /// Initializes the with the specified function. @@ -91,20 +91,18 @@ public TransformManyBlock(Func>> transform) : /// The is null (Nothing in Visual Basic). public TransformManyBlock(Func>> transform, ExecutionDataflowBlockOptions dataflowBlockOptions) { - // Validate arguments. if (transform == null) throw new ArgumentNullException(nameof(transform)); - Initialize(messageWithId => ProcessMessageWithTask(transform, messageWithId), dataflowBlockOptions, ref _source!, ref _target!, ref _reorderingBuffer, TargetCoreOptions.UsesAsyncCompletion); + Initialize(messageWithId => ProcessMessageWithTask(transform, messageWithId), dataflowBlockOptions, ref _source, ref _target, ref _reorderingBuffer, TargetCoreOptions.UsesAsyncCompletion); } private void Initialize( Action> processMessageAction, ExecutionDataflowBlockOptions dataflowBlockOptions, - ref SourceCore source, - ref TargetCore target, + [NotNull] ref SourceCore? source, + [NotNull] ref TargetCore? target, ref ReorderingBuffer>? reorderingBuffer, TargetCoreOptions targetCoreOptions) { - // Validate arguments. if (dataflowBlockOptions == null) throw new ArgumentNullException(nameof(dataflowBlockOptions)); // Ensure we have options that can't be changed by the caller @@ -113,7 +111,9 @@ private void Initialize( // Initialize onItemsRemoved delegate if necessary Action, int>? onItemsRemoved = null; if (dataflowBlockOptions.BoundedCapacity > 0) + { onItemsRemoved = (owningSource, count) => ((TransformManyBlock)owningSource)._target.ChangeBoundingCount(-count); + } // Initialize source component source = new SourceCore(this, dataflowBlockOptions, @@ -169,8 +169,6 @@ private void Initialize( /// The message to be processed. private void ProcessMessage(Func> transformFunction, KeyValuePair messageWithId) { - Debug.Assert(transformFunction != null, "Function to invoke is required."); - bool userDelegateSucceeded = false; try { @@ -179,10 +177,9 @@ private void ProcessMessage(Func> transformFunction userDelegateSucceeded = true; StoreOutputItems(messageWithId, outputItems); } - catch (Exception exc) + catch (Exception exc) when (Common.IsCooperativeCancellation(exc)) { // If this exception represents cancellation, swallow it rather than shutting down the block. - if (!Common.IsCooperativeCancellation(exc)) throw; } finally { @@ -197,8 +194,6 @@ private void ProcessMessage(Func> transformFunction /// The message to be processed. private void ProcessMessageWithTask(Func>> function, KeyValuePair messageWithId) { - Debug.Assert(function != null, "Function to invoke is required."); - // Run the transform function to get the resulting task Task>? task = null; Exception? caughtException = null; diff --git a/src/libraries/System.Threading.Tasks.Dataflow/src/Internal/ReorderingBuffer.cs b/src/libraries/System.Threading.Tasks.Dataflow/src/Internal/ReorderingBuffer.cs index 80513f6eae2ec5..2c7e45f33f8963 100644 --- a/src/libraries/System.Threading.Tasks.Dataflow/src/Internal/ReorderingBuffer.cs +++ b/src/libraries/System.Threading.Tasks.Dataflow/src/Internal/ReorderingBuffer.cs @@ -89,6 +89,20 @@ internal void AddItem(long id, TOutput? item, bool itemIsValid) } } + /// Determines whether the specified id is next to be output. + /// The id of the item. + /// true if the item is next in line; otherwise, false. + internal bool IsNext(long id) + { + Debug.Assert(id != Common.INVALID_REORDERING_ID, "This ID should never have been handed out."); + Common.ContractAssertMonitorStatus(ValueLock, held: false); + + lock (ValueLock) + { + return _nextReorderedIdToOutput == id; + } + } + /// /// Determines whether the specified id is next to be output, and if it is /// and if the item is "trusted" (meaning it may be output into the output diff --git a/src/libraries/System.Threading.Tasks.Dataflow/tests/Dataflow/DataflowTestHelper.IAsyncEnumerable.cs b/src/libraries/System.Threading.Tasks.Dataflow/tests/Dataflow/DataflowTestHelper.IAsyncEnumerable.cs index acdcbc7211fadd..3eb8711a32e723 100644 --- a/src/libraries/System.Threading.Tasks.Dataflow/tests/Dataflow/DataflowTestHelper.IAsyncEnumerable.cs +++ b/src/libraries/System.Threading.Tasks.Dataflow/tests/Dataflow/DataflowTestHelper.IAsyncEnumerable.cs @@ -12,11 +12,11 @@ internal static partial class DataflowTestHelpers internal static partial class AsyncEnumerable { -#pragma warning disable 1998 internal static async IAsyncEnumerable Repeat(int item, int count) { for (int i = 0; i < count; i++) { + await Task.Yield(); yield return item; } } @@ -26,6 +26,7 @@ internal static async IAsyncEnumerable Range(int start, int count) var end = start + count; for (int i = start; i < end; i++) { + await Task.Yield(); yield return i; } } @@ -34,9 +35,9 @@ internal static async IAsyncEnumerable ToAsyncEnumerable(this IEnumerable< { foreach (T item in enumerable) { + await Task.Yield(); yield return item; } } -#pragma warning restore 1998 } } diff --git a/src/libraries/System.Threading.Tasks.Dataflow/tests/Dataflow/TransformManyBlockTests.IAsyncEnumerable.cs b/src/libraries/System.Threading.Tasks.Dataflow/tests/Dataflow/TransformManyBlockTests.IAsyncEnumerable.cs index 26e4e7192c2639..7dcf4bdbfaebeb 100644 --- a/src/libraries/System.Threading.Tasks.Dataflow/tests/Dataflow/TransformManyBlockTests.IAsyncEnumerable.cs +++ b/src/libraries/System.Threading.Tasks.Dataflow/tests/Dataflow/TransformManyBlockTests.IAsyncEnumerable.cs @@ -3,6 +3,7 @@ using System.Collections.Generic; using System.Collections.ObjectModel; +using System.Diagnostics; using System.Linq; using Xunit; @@ -165,45 +166,62 @@ IAsyncEnumerable body(int i) public async Task TestProducerConsumerAsyncEnumerable() { foreach (TaskScheduler scheduler in new[] { TaskScheduler.Default, new ConcurrentExclusiveSchedulerPair().ConcurrentScheduler }) - foreach (int maxMessagesPerTask in new[] { DataflowBlockOptions.Unbounded, 1, 2 }) - foreach (int boundedCapacity in new[] { DataflowBlockOptions.Unbounded, 1, 2 }) - foreach (int dop in new[] { 1, 2 }) - foreach (int elementsPerItem in new[] { 1, 3, 5 }) + foreach (int maxMessagesPerTask in new[] { DataflowBlockOptions.Unbounded, 1, 2 }) + foreach (int boundedCapacity in new[] { DataflowBlockOptions.Unbounded, 1, 2 }) + foreach (int dop in new[] { 1, 2, 8 }) + foreach (int elementsPerItem in new[] { 1, 5, 100 }) + foreach (bool ensureOrdered in DataflowTestHelpers.BooleanValues) + { + const int Messages = 100; + var options = new ExecutionDataflowBlockOptions + { + BoundedCapacity = boundedCapacity, + MaxDegreeOfParallelism = dop, + MaxMessagesPerTask = maxMessagesPerTask, + TaskScheduler = scheduler, + EnsureOrdered = ensureOrdered, + }; + TransformManyBlock tb = new TransformManyBlock(i => AsyncEnumerable.Repeat(i, elementsPerItem), options); + + await Task.WhenAll( + Task.Run(async delegate // consumer + { + if (ensureOrdered) + { + int i = 0; + int processed = 0; + while (await tb.OutputAvailableAsync()) { - const int Messages = 50; - var options = new ExecutionDataflowBlockOptions + Assert.Equal(expected: i, actual: await tb.ReceiveAsync()); + processed++; + if (processed % elementsPerItem == 0) { - BoundedCapacity = boundedCapacity, - MaxDegreeOfParallelism = dop, - MaxMessagesPerTask = maxMessagesPerTask, - TaskScheduler = scheduler - }; - TransformManyBlock tb = new TransformManyBlock(i => AsyncEnumerable.Repeat(i, elementsPerItem), options); - - await Task.WhenAll( - Task.Run(async delegate - { // consumer - int i = 0; - int processed = 0; - while (await tb.OutputAvailableAsync()) - { - Assert.Equal(expected: i, actual: await tb.ReceiveAsync()); - processed++; - if (processed % elementsPerItem == 0) - { - i++; - } - } - }), - Task.Run(async delegate - { // producer - for (int i = 0; i < Messages; i++) - { - await tb.SendAsync(i); - } - tb.Complete(); - })); + i++; + } + } + } + else + { + var results = new List(); + await foreach (int result in tb.ReceiveAllAsync()) + { + results.Add(result); } + + IEnumerable> messages = results.GroupBy(i => i); + Assert.Equal(Messages, messages.Count()); + Assert.All(messages, m => Assert.Equal(elementsPerItem, m.Count())); + } + }), + Task.Run(async delegate // producer + { + for (int i = 0; i < Messages; i++) + { + await tb.SendAsync(i); + } + tb.Complete(); + })); + } } [Fact] @@ -659,5 +677,32 @@ static IAsyncEnumerable BlockableIterator(ManualResetEventSlim wait, Manual return DataflowTestHelpers.ToAsyncEnumerable(42); } } + + [Fact] + public async Task TestScheduling_MoveNextAsync_RunsOnTargetScheduler() + { + TaskScheduler scheduler = new ConcurrentExclusiveSchedulerPair().ConcurrentScheduler; + Assert.NotEqual(scheduler, TaskScheduler.Current); + + async IAsyncEnumerable Body(int value) + { + Assert.Equal(scheduler, TaskScheduler.Current); + await Task.Yield(); + Assert.Equal(scheduler, TaskScheduler.Current); + yield return value; + Assert.Equal(scheduler, TaskScheduler.Current); + } + + TransformManyBlock t = new TransformManyBlock(Body, new ExecutionDataflowBlockOptions { TaskScheduler = scheduler }); + + t.PostRange(0, 2); + t.Complete(); + for (int i = 0; i < 2; i++) + { + Assert.Equal(expected: i, actual: await t.ReceiveAsync()); + } + + await t.Completion; + } } }