diff --git a/src/Features/Core/Portable/AddImport/AbstractAddImportFeatureService.cs b/src/Features/Core/Portable/AddImport/AbstractAddImportFeatureService.cs index 7cf284b733d81..fb4f6cd8669ac 100644 --- a/src/Features/Core/Portable/AddImport/AbstractAddImportFeatureService.cs +++ b/src/Features/Core/Portable/AddImport/AbstractAddImportFeatureService.cs @@ -222,23 +222,20 @@ private static async Task FindResultsInUnreferencedProjectSourceSymbolsAsync( // Defer to the ProducerConsumer. We're search the unreferenced projects in parallel. As we get results, we'll // add them to the 'allSymbolReferences' queue. If we get enough results, we'll cancel all the other work. - await ProducerConsumer>.RunAsync( - ProducerConsumerOptions.SingleReaderOptions, - produceItems: static (onItemsFound, args) => RoslynParallel.ForEachAsync( - args.viableUnreferencedProjects, - args.linkedTokenSource.Token, - async (project, cancellationToken) => - { - // Search in this unreferenced project. But don't search in any of its' direct references. i.e. we - // don't want to search in its metadata references or in the projects it references itself. We'll be - // searching those entities individually. - var references = await args.finder.FindInSourceSymbolsInProjectAsync( - args.projectToAssembly, project, args.exact, cancellationToken).ConfigureAwait(false); - onItemsFound(references); - }), - consumeItems: static (symbolReferencesEnumerable, args) => + await ProducerConsumer>.RunParallelAsync( + source: viableUnreferencedProjects, + produceItems: static async (project, onItemsFound, args, cancellationToken) => + { + // Search in this unreferenced project. But don't search in any of its' direct references. i.e. we + // don't want to search in its metadata references or in the projects it references itself. We'll be + // searching those entities individually. + var references = await args.finder.FindInSourceSymbolsInProjectAsync( + args.projectToAssembly, project, args.exact, cancellationToken).ConfigureAwait(false); + onItemsFound(references); + }, + consumeItems: static (symbolReferencesEnumerable, args, cancellationToken) => ProcessReferencesAsync(args.allSymbolReferences, args.maxResults, symbolReferencesEnumerable, args.linkedTokenSource), - args: (projectToAssembly, allSymbolReferences, maxResults, finder, exact, viableUnreferencedProjects, linkedTokenSource), + args: (projectToAssembly, allSymbolReferences, maxResults, finder, exact, linkedTokenSource), linkedTokenSource.Token).ConfigureAwait(false); } @@ -266,27 +263,24 @@ private async Task FindResultsInUnreferencedMetadataSymbolsAsync( // Defer to the ProducerConsumer. We're search the metadata references in parallel. As we get results, we'll // add them to the 'allSymbolReferences' queue. If we get enough results, we'll cancel all the other work. - await ProducerConsumer>.RunAsync( - ProducerConsumerOptions.SingleReaderOptions, - produceItems: static (onItemsFound, args) => RoslynParallel.ForEachAsync( - args.newReferences, - args.linkedTokenSource.Token, - async (tuple, cancellationToken) => - { - var (referenceProject, reference) = tuple; - var compilation = args.referenceToCompilation.GetOrAdd( - reference, r => CreateCompilation(args.project, r)); - - // Ignore netmodules. First, they're incredibly esoteric and barely used. - // Second, the SymbolFinder API doesn't even support searching them. - if (compilation.GetAssemblyOrModuleSymbol(reference) is not IAssemblySymbol assembly) - return; - - var references = await args.finder.FindInMetadataSymbolsAsync( - assembly, referenceProject, reference, args.exact, args.linkedTokenSource.Token).ConfigureAwait(false); - onItemsFound(references); - }), - consumeItems: static (symbolReferencesEnumerable, args) => + await ProducerConsumer>.RunParallelAsync( + source: newReferences, + produceItems: static async (tuple, onItemsFound, args, cancellationToken) => + { + var (referenceProject, reference) = tuple; + var compilation = args.referenceToCompilation.GetOrAdd( + reference, r => CreateCompilation(args.project, r)); + + // Ignore netmodules. First, they're incredibly esoteric and barely used. + // Second, the SymbolFinder API doesn't even support searching them. + if (compilation.GetAssemblyOrModuleSymbol(reference) is not IAssemblySymbol assembly) + return; + + var references = await args.finder.FindInMetadataSymbolsAsync( + assembly, referenceProject, reference, args.exact, cancellationToken).ConfigureAwait(false); + onItemsFound(references); + }, + consumeItems: static (symbolReferencesEnumerable, args, cancellationToken) => ProcessReferencesAsync(args.allSymbolReferences, args.maxResults, symbolReferencesEnumerable, args.linkedTokenSource), args: (referenceToCompilation, project, allSymbolReferences, maxResults, finder, exact, newReferences, linkedTokenSource), linkedTokenSource.Token).ConfigureAwait(false); diff --git a/src/Features/Core/Portable/CodeRefactorings/CodeRefactoringService.cs b/src/Features/Core/Portable/CodeRefactorings/CodeRefactoringService.cs index 4195a47d145ff..79a820c37ee44 100644 --- a/src/Features/Core/Portable/CodeRefactorings/CodeRefactoringService.cs +++ b/src/Features/Core/Portable/CodeRefactorings/CodeRefactoringService.cs @@ -121,45 +121,41 @@ public async Task> GetRefactoringsAsync( foreach (var provider in orderedProviders) providerToIndex.Add(provider, providerToIndex.Count); - await ProducerConsumer<(CodeRefactoringProvider provider, CodeRefactoring codeRefactoring)>.RunAsync( - ProducerConsumerOptions.SingleReaderOptions, - produceItems: static (callback, args) => + await ProducerConsumer<(CodeRefactoringProvider provider, CodeRefactoring codeRefactoring)>.RunParallelAsync( + source: orderedProviders, + produceItems: static async (provider, callback, args, cancellationToken) => + { // Run all providers in parallel to get the set of refactorings for this document. - RoslynParallel.ForEachAsync( - args.orderedProviders, - args.cancellationToken, - async (provider, cancellationToken) => - { - // Log an individual telemetry event for slow code refactoring computations to - // allow targeted trace notifications for further investigation. 500 ms seemed like - // a good value so as to not be too noisy, but if fired, indicates a potential - // area requiring investigation. - const int CodeRefactoringTelemetryDelay = 500; - - var providerName = provider.GetType().Name; - - var logMessage = KeyValueLogMessage.Create(m => - { - m[TelemetryLogging.KeyName] = providerName; - m[TelemetryLogging.KeyLanguageName] = args.document.Project.Language; - }); - - using (args.addOperationScope(providerName)) - using (RoslynEventSource.LogInformationalBlock(FunctionId.Refactoring_CodeRefactoringService_GetRefactoringsAsync, providerName, cancellationToken)) - using (TelemetryLogging.LogBlockTime(FunctionId.CodeRefactoring_Delay, logMessage, CodeRefactoringTelemetryDelay)) - { - var refactoring = await args.@this.GetRefactoringFromProviderAsync( - args.document, args.state, provider, args.options, cancellationToken).ConfigureAwait(false); - if (refactoring != null) - callback((provider, refactoring)); - } - }), - consumeItems: static async (reader, args) => + // Log an individual telemetry event for slow code refactoring computations to + // allow targeted trace notifications for further investigation. 500 ms seemed like + // a good value so as to not be too noisy, but if fired, indicates a potential + // area requiring investigation. + const int CodeRefactoringTelemetryDelay = 500; + + var providerName = provider.GetType().Name; + + var logMessage = KeyValueLogMessage.Create(m => + { + m[TelemetryLogging.KeyName] = providerName; + m[TelemetryLogging.KeyLanguageName] = args.document.Project.Language; + }); + + using (args.addOperationScope(providerName)) + using (RoslynEventSource.LogInformationalBlock(FunctionId.Refactoring_CodeRefactoringService_GetRefactoringsAsync, providerName, cancellationToken)) + using (TelemetryLogging.LogBlockTime(FunctionId.CodeRefactoring_Delay, logMessage, CodeRefactoringTelemetryDelay)) + { + var refactoring = await args.@this.GetRefactoringFromProviderAsync( + args.document, args.state, provider, args.options, cancellationToken).ConfigureAwait(false); + if (refactoring != null) + callback((provider, refactoring)); + } + }, + consumeItems: static async (reader, args, cancellationToken) => { await foreach (var pair in reader) args.pairs.Add(pair); }, - args: (@this: this, document, state, orderedProviders, options, addOperationScope, pairs, cancellationToken), + args: (@this: this, document, state, options, addOperationScope, pairs), cancellationToken).ConfigureAwait(false); return pairs diff --git a/src/Features/Core/Portable/NavigateTo/AbstractNavigateToSearchService.CachedDocumentSearch.cs b/src/Features/Core/Portable/NavigateTo/AbstractNavigateToSearchService.CachedDocumentSearch.cs index 015ea7d4a04f7..faba6677a2c24 100644 --- a/src/Features/Core/Portable/NavigateTo/AbstractNavigateToSearchService.CachedDocumentSearch.cs +++ b/src/Features/Core/Portable/NavigateTo/AbstractNavigateToSearchService.CachedDocumentSearch.cs @@ -127,7 +127,8 @@ await PerformParallelSearchAsync( async ValueTask ProcessSingleProjectGroupAsync( IGrouping group, - Action onItemFound) + Action onItemFound, + CancellationToken cancellationToken) { if (cancellationToken.IsCancellationRequested) return; diff --git a/src/Features/Core/Portable/NavigateTo/AbstractNavigateToSearchService.GeneratedDocumentSearch.cs b/src/Features/Core/Portable/NavigateTo/AbstractNavigateToSearchService.GeneratedDocumentSearch.cs index 370fa6fc8fb71..cc28b997adc84 100644 --- a/src/Features/Core/Portable/NavigateTo/AbstractNavigateToSearchService.GeneratedDocumentSearch.cs +++ b/src/Features/Core/Portable/NavigateTo/AbstractNavigateToSearchService.GeneratedDocumentSearch.cs @@ -71,7 +71,7 @@ public static async Task SearchGeneratedDocumentsInCurrentProcessAsync( return; async ValueTask ProcessSingleProjectAsync( - Project project, Action onItemFound) + Project project, Action onItemFound, CancellationToken cancellationToken) { // First generate all the source-gen docs. Then handoff to the standard search routine to find matches in them. var sourceGeneratedDocs = await project.GetSourceGeneratedDocumentsAsync(cancellationToken).ConfigureAwait(false); diff --git a/src/Features/Core/Portable/NavigateTo/AbstractNavigateToSearchService.NormalSearch.cs b/src/Features/Core/Portable/NavigateTo/AbstractNavigateToSearchService.NormalSearch.cs index 33f95b3cf70a7..e76b1cb7aa5cc 100644 --- a/src/Features/Core/Portable/NavigateTo/AbstractNavigateToSearchService.NormalSearch.cs +++ b/src/Features/Core/Portable/NavigateTo/AbstractNavigateToSearchService.NormalSearch.cs @@ -127,7 +127,8 @@ await PerformParallelSearchAsync( async ValueTask SearchSingleProjectAsync( Project project, - Action onItemFound) + Action onItemFound, + CancellationToken cancellationToken) { using var _ = GetPooledHashSet(priorityDocuments.Where(d => project == d.Project), out var highPriDocs); diff --git a/src/Features/Core/Portable/NavigateTo/AbstractNavigateToSearchService.cs b/src/Features/Core/Portable/NavigateTo/AbstractNavigateToSearchService.cs index 4117157b3655d..6e440d501fce6 100644 --- a/src/Features/Core/Portable/NavigateTo/AbstractNavigateToSearchService.cs +++ b/src/Features/Core/Portable/NavigateTo/AbstractNavigateToSearchService.cs @@ -90,13 +90,13 @@ private static IEnumerable Prioritize(IEnumerable items, Func /// private static Task PerformParallelSearchAsync( IEnumerable items, - Func, ValueTask> callback, + Func, CancellationToken, ValueTask> callback, Func, Task> onItemsFound, CancellationToken cancellationToken) - => ProducerConsumer.RunAsync( - ProducerConsumerOptions.SingleReaderOptions, - produceItems: static (onItemFound, args) => RoslynParallel.ForEachAsync(args.items, args.cancellationToken, (item, cancellationToken) => args.callback(item, onItemFound)), - consumeItems: static (items, args) => args.onItemsFound(items), - args: (items, callback, onItemsFound, cancellationToken), + => ProducerConsumer.RunParallelAsync( + source: items, + produceItems: static async (item, onItemFound, args, cancellationToken) => await args.callback(item, onItemFound, cancellationToken).ConfigureAwait(false), + consumeItems: static (items, args, cancellationToken) => args.onItemsFound(items), + args: (items, callback, onItemsFound), cancellationToken); } diff --git a/src/Workspaces/Core/Portable/FindSymbols/FindReferences/FindReferencesSearchEngine.cs b/src/Workspaces/Core/Portable/FindSymbols/FindReferences/FindReferencesSearchEngine.cs index 1bd2f4943985b..a80669feb1155 100644 --- a/src/Workspaces/Core/Portable/FindSymbols/FindReferences/FindReferencesSearchEngine.cs +++ b/src/Workspaces/Core/Portable/FindSymbols/FindReferences/FindReferencesSearchEngine.cs @@ -81,9 +81,9 @@ public async Task FindReferencesAsync( { await ProducerConsumer.RunAsync( ProducerConsumerOptions.SingleReaderOptions, - produceItems: static (onItemFound, args) => args.@this.PerformSearchAsync(args.symbols, onItemFound, args.cancellationToken), - consumeItems: static async (references, args) => await args.@this._progress.OnReferencesFoundAsync(references, @args.cancellationToken).ConfigureAwait(false), - (@this: this, symbols, cancellationToken), + produceItems: static (onItemFound, args, cancellationToken) => args.@this.PerformSearchAsync(args.symbols, onItemFound, cancellationToken), + consumeItems: static async (references, args, cancellationToken) => await args.@this._progress.OnReferencesFoundAsync(references, cancellationToken).ConfigureAwait(false), + (@this: this, symbols), cancellationToken).ConfigureAwait(false); } finally diff --git a/src/Workspaces/Core/Portable/Shared/Utilities/ProducerConsumer.cs b/src/Workspaces/Core/Portable/Shared/Utilities/ProducerConsumer.cs index 2de076fc5cbd0..e783e769c23c1 100644 --- a/src/Workspaces/Core/Portable/Shared/Utilities/ProducerConsumer.cs +++ b/src/Workspaces/Core/Portable/Shared/Utilities/ProducerConsumer.cs @@ -42,21 +42,21 @@ internal static class ProducerConsumer /// public static Task RunAsync( ProducerConsumerOptions options, - Func, TArgs, Task> produceItems, - Func, TArgs, Task> consumeItems, + Func, TArgs, CancellationToken, Task> produceItems, + Func, TArgs, CancellationToken, Task> consumeItems, TArgs args, CancellationToken cancellationToken) { return RunImplAsync( options, - static (onItemFound, args) => args.produceItems(onItemFound, args.args), - static (reader, args) => ConsumeItemsAsArrayAsync(reader, args.consumeItems, args.args, args.cancellationToken), - (produceItems, consumeItems, args, cancellationToken), + static (onItemFound, args, cancellationToken) => args.produceItems(onItemFound, args.args, cancellationToken), + static (reader, args, cancellationToken) => ConsumeItemsAsArrayAsync(reader, args.consumeItems, args.args, cancellationToken), + (produceItems, consumeItems, args), cancellationToken); static async Task ConsumeItemsAsArrayAsync( ChannelReader reader, - Func, TArgs, Task> consumeItems, + Func, TArgs, CancellationToken, Task> consumeItems, TArgs args, CancellationToken cancellationToken) { @@ -69,7 +69,7 @@ static async Task ConsumeItemsAsArrayAsync( while (reader.TryRead(out var item)) items.Add(item); - await consumeItems(items.ToImmutableAndClear(), args).ConfigureAwait(false); + await consumeItems(items.ToImmutableAndClear(), args, cancellationToken).ConfigureAwait(false); } } } @@ -79,16 +79,64 @@ static async Task ConsumeItemsAsArrayAsync( /// public static Task RunAsync( ProducerConsumerOptions options, - Func, TArgs, Task> produceItems, - Func, TArgs, Task> consumeItems, + Func, TArgs, CancellationToken, Task> produceItems, + Func, TArgs, CancellationToken, Task> consumeItems, TArgs args, CancellationToken cancellationToken) { return RunImplAsync( options, - static (onItemFound, args) => args.produceItems(onItemFound, args.args), - static (reader, args) => args.consumeItems(reader.ReadAllAsync(args.cancellationToken), args.args), - (produceItems, consumeItems, args, cancellationToken), + static (onItemFound, args, cancellationToken) => args.produceItems(onItemFound, args.args, cancellationToken), + static (reader, args, cancellationToken) => args.consumeItems(reader.ReadAllAsync(cancellationToken), args.args, cancellationToken), + (produceItems, consumeItems, args), + cancellationToken); + } + + /// + /// Version of RunAsync that will process in parallel. + /// + public static Task RunParallelAsync( + IEnumerable source, + Func, TArgs, CancellationToken, Task> produceItems, + Func, TArgs, CancellationToken, Task> consumeItems, + TArgs args, + CancellationToken cancellationToken) + { + return RunAsync( + // We're running in parallel, so we def have multiple writers + ProducerConsumerOptions.SingleReaderOptions, + produceItems: static (callback, args, cancellationToken) => + RoslynParallel.ForEachAsync( + args.source, + cancellationToken, + async (source, cancellationToken) => + await args.produceItems(source, callback, args.args, cancellationToken).ConfigureAwait(false)), + consumeItems: static (enumerable, args, cancellationToken) => args.consumeItems(enumerable, args.args, cancellationToken), + args: (source, produceItems, consumeItems, args), + cancellationToken); + } + + /// + /// Version of RunAsync that will process in parallel. + /// + public static Task RunParallelAsync( + IEnumerable source, + Func, TArgs, CancellationToken, Task> produceItems, + Func, TArgs, CancellationToken, Task> consumeItems, + TArgs args, + CancellationToken cancellationToken) + { + return RunAsync( + // We're running in parallel, so we def have multiple writers + ProducerConsumerOptions.SingleReaderOptions, + produceItems: static (callback, args, cancellationToken) => + RoslynParallel.ForEachAsync( + args.source, + cancellationToken, + async (source, cancellationToken) => + await args.produceItems(source, callback, args.args, cancellationToken).ConfigureAwait(false)), + consumeItems: static (enumerable, args, cancellationToken) => args.consumeItems(enumerable, args.args, cancellationToken), + args: (source, produceItems, consumeItems, args), cancellationToken); } @@ -109,8 +157,8 @@ public static Task RunAsync( /// private static async Task RunImplAsync( ProducerConsumerOptions options, - Func, TArgs, Task> produceItems, - Func, TArgs, Task> consumeItems, + Func, TArgs, CancellationToken, Task> produceItems, + Func, TArgs, CancellationToken, Task> consumeItems, TArgs args, CancellationToken cancellationToken) { @@ -139,7 +187,7 @@ await Task.WhenAll( async Task ReadFromChannelAndConsumeItemsAsync() { await Task.Yield().ConfigureAwait(false); - await consumeItems(channel.Reader, args).ConfigureAwait(false); + await consumeItems(channel.Reader, args, cancellationToken).ConfigureAwait(false); } async Task ProduceItemsAndWriteToChannelAsync() @@ -153,7 +201,7 @@ async Task ProduceItemsAndWriteToChannelAsync() // channel is only ever completed by us (after produceItems completes or throws an exception) or if the // cancellationToken is triggered above in RunAsync. In that latter case, it's ok for writing to the // channel to do nothing as we no longer need to write out those assets to the pipe. - await produceItems(item => channel.Writer.TryWrite(item), args).ConfigureAwait(false); + await produceItems(item => channel.Writer.TryWrite(item), args, cancellationToken).ConfigureAwait(false); } catch (Exception ex) when ((exception = ex) == null) { diff --git a/src/Workspaces/Remote/Core/RemoteHostAssetWriter.cs b/src/Workspaces/Remote/Core/RemoteHostAssetWriter.cs index a43e259eab20a..f1548eb3fbcb6 100644 --- a/src/Workspaces/Remote/Core/RemoteHostAssetWriter.cs +++ b/src/Workspaces/Remote/Core/RemoteHostAssetWriter.cs @@ -71,9 +71,9 @@ internal readonly struct RemoteHostAssetWriter( public Task WriteDataAsync(CancellationToken cancellationToken) => ProducerConsumer.RunAsync( ProducerConsumerOptions.SingleReaderWriterOptions, - produceItems: static (onItemFound, args) => args.@this.FindAssetsAsync(onItemFound, args.cancellationToken), - consumeItems: static (items, args) => args.@this.WriteBatchToPipeAsync(items, args.cancellationToken), - args: (@this: this, cancellationToken), + produceItems: static (onItemFound, @this, cancellationToken) => @this.FindAssetsAsync(onItemFound, cancellationToken), + consumeItems: static (items, @this, cancellationToken) => @this.WriteBatchToPipeAsync(items, cancellationToken), + args: this, cancellationToken); private Task FindAssetsAsync(Action onItemFound, CancellationToken cancellationToken)