Skip to content

Commit

Permalink
Pass workloads to proxy managers (#4422)
Browse files Browse the repository at this point in the history
  • Loading branch information
nohwnd authored May 29, 2023
1 parent b19ec25 commit a4dc055
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ internal sealed class ParallelOperationManager<TManager, TEventHandler, TWorkloa
out int num)
? num
: PreStart;
private readonly Func<TestRuntimeProviderInfo, TManager> _createNewManager;
private readonly Func<TestRuntimeProviderInfo, TWorkload, TManager> _createNewManager;

/// <summary>
/// Default number of Processes
Expand All @@ -50,7 +50,7 @@ internal sealed class ParallelOperationManager<TManager, TEventHandler, TWorkloa
/// <param name="createNewManager">Creates a new manager that is responsible for running a single part of the overall workload.
/// A manager is typically a testhost, and the part of workload is discovering or running a single test dll.</param>
/// <param name="parallelLevel">Determines the maximum amount of parallel managers that can be active at the same time.</param>
public ParallelOperationManager(Func<TestRuntimeProviderInfo, TManager> createNewManager, int parallelLevel)
public ParallelOperationManager(Func<TestRuntimeProviderInfo, TWorkload, TManager> createNewManager, int parallelLevel)
{
_createNewManager = createNewManager;
MaxParallelLevel = parallelLevel;
Expand Down Expand Up @@ -144,7 +144,7 @@ private bool RunWorkInParallel()
var workload = workloadsToAdd[i];
slot.ShouldPreStart = occupiedSlots + i + 1 > MaxParallelLevel;

var manager = _createNewManager(workload.Provider);
var manager = _createNewManager(workload.Provider, workload.Work);
var eventHandler = _getEventHandler(_eventHandler, manager);
slot.EventHandler = eventHandler;
slot.Manager = manager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ internal sealed class ParallelProxyDiscoveryManager : IParallelProxyDiscoveryMan

public ParallelProxyDiscoveryManager(
IRequestData requestData,
Func<TestRuntimeProviderInfo, IProxyDiscoveryManager> actualProxyManagerCreator,
Func<TestRuntimeProviderInfo, DiscoveryCriteria, IProxyDiscoveryManager> actualProxyManagerCreator,
DiscoveryDataAggregator dataAggregator,
int parallelLevel,
List<TestRuntimeProviderInfo> testHostProviders)
Expand All @@ -53,7 +53,7 @@ public ParallelProxyDiscoveryManager(

internal ParallelProxyDiscoveryManager(
IRequestData requestData,
Func<TestRuntimeProviderInfo, IProxyDiscoveryManager> actualProxyManagerCreator,
Func<TestRuntimeProviderInfo, DiscoveryCriteria, IProxyDiscoveryManager> actualProxyManagerCreator,
DiscoveryDataAggregator dataAggregator,
IDataSerializer dataSerializer,
int parallelLevel,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ internal sealed class ParallelProxyExecutionManager : IParallelProxyExecutionMan

public ParallelProxyExecutionManager(
IRequestData requestData,
Func<TestRuntimeProviderInfo, IProxyExecutionManager> actualProxyManagerCreator,
Func<TestRuntimeProviderInfo, TestRunCriteria, IProxyExecutionManager> actualProxyManagerCreator,
int parallelLevel,
List<TestRuntimeProviderInfo> testHostProviders)
: this(requestData, actualProxyManagerCreator, JsonDataSerializer.Instance, parallelLevel, testHostProviders)
Expand All @@ -73,7 +73,7 @@ public ParallelProxyExecutionManager(

internal ParallelProxyExecutionManager(
IRequestData requestData,
Func<TestRuntimeProviderInfo, IProxyExecutionManager> actualProxyManagerCreator,
Func<TestRuntimeProviderInfo, TestRunCriteria, IProxyExecutionManager> actualProxyManagerCreator,
IDataSerializer dataSerializer,
int parallelLevel,
List<TestRuntimeProviderInfo> testHostProviders)
Expand Down
15 changes: 8 additions & 7 deletions src/Microsoft.TestPlatform.CrossPlatEngine/TestEngine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,9 @@ public IProxyDiscoveryManager GetDiscoveryManager(
// discovery manager to publish its current state. But doing so we are losing the collected state of all the
// other managers.
var discoveryDataAggregator = new DiscoveryDataAggregator();
Func<TestRuntimeProviderInfo, IProxyDiscoveryManager> proxyDiscoveryManagerCreator = runtimeProviderInfo =>
Func<TestRuntimeProviderInfo, DiscoveryCriteria, IProxyDiscoveryManager> proxyDiscoveryManagerCreator = (runtimeProviderInfo, discoveryCriteria) =>
{
var sources = runtimeProviderInfo.SourceDetails.Select(r => r.Source!).ToList();
var sources = discoveryCriteria.Sources.ToList();
var hostManager = _testHostProviderManager.GetTestHostManagerByRunConfiguration(runtimeProviderInfo.RunSettings, sources);
hostManager?.Initialize(TestSessionMessageLogger.Instance, runtimeProviderInfo.RunSettings!);

Expand Down Expand Up @@ -241,15 +241,16 @@ public IProxyExecutionManager GetExecutionManager(
}

// This creates a single non-parallel execution manager, based requestData, isDataCollectorEnabled and the
// overall testRunCriteria. The overall testRunCriteria are split to smaller pieces (e.g. each source from the overall
// testRunCriteria) so we can run them in parallel, and those are then passed to those non-parallel execution managers.
// split testRunCriteria. The overall testRunCriteria are split to smaller pieces (e.g. each source from the overall
// testRunCriteria) so we can run them in parallel.
//
// The function below grabs most of the parameter via closure from the local context,
// but gets the runtime provider later, because that is specific info to the source (or sources) it will be running.
// but gets the runtime provider later, as well as the discovery request, because that is specific info to the source (or sources)
// it will be running.
// This creator does not get those smaller pieces of testRunCriteria, those come later when we call a method on
// the non-parallel execution manager we create here. E.g. StartTests(<single piece of testRunCriteria>).
Func<TestRuntimeProviderInfo, IProxyExecutionManager> proxyExecutionManagerCreator = runtimeProviderInfo =>
CreateNonParallelExecutionManager(requestData, testRunCriteria, isDataCollectorEnabled, runtimeProviderInfo);
Func<TestRuntimeProviderInfo, TestRunCriteria, IProxyExecutionManager> proxyExecutionManagerCreator = (runtimeProviderInfo, runCriteria) =>
CreateNonParallelExecutionManager(requestData, runCriteria, isDataCollectorEnabled, runtimeProviderInfo);

var executionManager = new ParallelProxyExecutionManager(requestData, proxyExecutionManagerCreator, parallelLevel, testHostProviders);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public class ParallelOperationManagerTests
public void OperationManagerShouldRunOnlyMaximumParallelLevelOfWorkInParallelEvenWhenThereAreMoreWorkloads()
{
// Arrange
Func<TestRuntimeProviderInfo, SampleManager> createNewManager = _ => new SampleManager();
Func<TestRuntimeProviderInfo, SampleWorkload, SampleManager> createNewManager = (_, _2) => new SampleManager();
var maxParallelLevel = 3;
var parallelOperationManager = new ParallelOperationManager<SampleManager, SampleHandler, SampleWorkload>(createNewManager, maxParallelLevel);

Expand Down Expand Up @@ -72,7 +72,7 @@ public void OperationManagerShouldRunOnlyMaximumParallelLevelOfWorkInParallelEve
public void OperationManagerShouldCreateOnlyAsManyParallelWorkersAsThereAreWorkloadsWhenTheAmountOfWorkloadsIsSmallerThanMaxParallelLevel()
{
// Arrange
Func<TestRuntimeProviderInfo, SampleManager> createNewManager = _ => new SampleManager();
Func<TestRuntimeProviderInfo, SampleWorkload, SampleManager> createNewManager = (_, _2) => new SampleManager();
var maxParallelLevel = 10;
var parallelOperationManager = new ParallelOperationManager<SampleManager, SampleHandler, SampleWorkload>(createNewManager, maxParallelLevel);

Expand Down Expand Up @@ -108,7 +108,7 @@ public void OperationManagerShouldCreateOnlyAsManyParallelWorkersAsThereAreWorkl
public void OperationManagerShouldCreateAsManyMaxParallelLevel()
{
// Arrange
Func<TestRuntimeProviderInfo, SampleManager> createNewManager = _ => new SampleManager();
Func<TestRuntimeProviderInfo, SampleWorkload, SampleManager> createNewManager = (_, _2) => new SampleManager();
var maxParallelLevel = 10;
var parallelOperationManager = new ParallelOperationManager<SampleManager, SampleHandler, SampleWorkload>(createNewManager, maxParallelLevel);

Expand Down Expand Up @@ -148,7 +148,7 @@ public void OperationManagerShouldCreateAsManyMaxParallelLevel()
public void OperationManagerMovesToTheNextWorkloadOnlyWhenRunNextWorkIsCalled()
{
// Arrange
Func<TestRuntimeProviderInfo, SampleManager> createNewManager = _ => new SampleManager();
Func<TestRuntimeProviderInfo, SampleWorkload, SampleManager> createNewManager = (_, _2) => new SampleManager();
var maxParallelLevel = 2;
var parallelOperationManager = new ParallelOperationManager<SampleManager, SampleHandler, SampleWorkload>(createNewManager, maxParallelLevel);

Expand Down Expand Up @@ -195,7 +195,7 @@ public void OperationManagerRunsAnOperationOnAllActiveManagersWhenDoActionOnAllM
// Arrange
var createdManagers = new List<SampleManager>();
// Store the managers we created so we can inspect them later and see if Abort was called on them.
Func<TestRuntimeProviderInfo, SampleManager> createNewManager = _ =>
Func<TestRuntimeProviderInfo, SampleWorkload, SampleManager> createNewManager = (_, _2) =>
{
var manager = new SampleManager();
createdManagers.Add(manager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class ParallelProxyDiscoveryManagerTests
private const int Timeout3Seconds = 3 * 1000;
private readonly Queue<Mock<IProxyDiscoveryManager>> _preCreatedMockManagers;
private readonly List<Mock<IProxyDiscoveryManager>> _usedMockManagers;
private readonly Func<TestRuntimeProviderInfo, IProxyDiscoveryManager> _createMockManager;
private readonly Func<TestRuntimeProviderInfo, DiscoveryCriteria, IProxyDiscoveryManager> _createMockManager;
private readonly Mock<ITestDiscoveryEventsHandler2> _mockEventHandler;
private readonly List<string> _sources = new() { "1.dll", "2.dll" };
private readonly DiscoveryCriteria _discoveryCriteriaWith2Sources;
Expand All @@ -55,7 +55,7 @@ public ParallelProxyDiscoveryManagerTests()
new Mock<IProxyDiscoveryManager>(),
});
_usedMockManagers = new List<Mock<IProxyDiscoveryManager>>();
_createMockManager = _ =>
_createMockManager = (_, _2) =>
{
// We create the manager at the last possible
// moment now, not when we create the parallel proxy manager class
Expand Down Expand Up @@ -163,7 +163,7 @@ public void HandlePartialDiscoveryCompleteShouldReturnTrueIfDiscoveryWasAbortedA
{
var discoveryManagerMock = new Mock<IProxyDiscoveryManager>();
_preCreatedMockManagers.Enqueue(discoveryManagerMock);
var parallelDiscoveryManager = SetupDiscoveryManager(_ => discoveryManagerMock.Object, 1, true);
var parallelDiscoveryManager = SetupDiscoveryManager((_, _2) => discoveryManagerMock.Object, 1, true);
var proxyDiscovermanager = new ProxyDiscoveryManager(_mockRequestData.Object, new Mock<ITestRequestSender>().Object, new Mock<ITestRuntimeProvider>().Object);

parallelDiscoveryManager.DiscoverTests(_discoveryCriteriaWith2Sources, _mockEventHandler.Object);
Expand All @@ -190,7 +190,7 @@ public void HandlePartialDiscoveryCompleteShouldReturnTrueIfDiscoveryWasAbortedA
{
var discoveryManagerMock = new Mock<IProxyDiscoveryManager>();
_preCreatedMockManagers.Enqueue(discoveryManagerMock);
var parallelDiscoveryManager = SetupDiscoveryManager(_ => discoveryManagerMock.Object, 1, true);
var parallelDiscoveryManager = SetupDiscoveryManager((_, _2) => discoveryManagerMock.Object, 1, true);
var proxyDiscovermanager = new ProxyDiscoveryManager(_mockRequestData.Object, new Mock<ITestRequestSender>().Object, new Mock<ITestRuntimeProvider>().Object);

parallelDiscoveryManager.DiscoverTests(_discoveryCriteriaWith2Sources, _mockEventHandler.Object);
Expand All @@ -206,7 +206,7 @@ public void DiscoveryTestsShouldStopDiscoveryIfAbortionWasRequested()
// Since the hosts are aborted, total aggregated tests sent across will be -1
var discoveryManagerMock = new Mock<IProxyDiscoveryManager>();
_preCreatedMockManagers.Enqueue(discoveryManagerMock);
var parallelDiscoveryManager = SetupDiscoveryManager(_ => discoveryManagerMock.Object, 1, true);
var parallelDiscoveryManager = SetupDiscoveryManager((_, _2) => discoveryManagerMock.Object, 1, true);

Task.Run(() =>
{
Expand All @@ -224,7 +224,7 @@ public void DiscoveryTestsShouldStopDiscoveryIfAbortionWithEventHandlerWasReques
// Since the hosts are aborted, total aggregated tests sent across will be -1
var discoveryManagerMock = new Mock<IProxyDiscoveryManager>();
_preCreatedMockManagers.Enqueue(discoveryManagerMock);
var parallelDiscoveryManager = SetupDiscoveryManager(_ => discoveryManagerMock.Object, 1, true);
var parallelDiscoveryManager = SetupDiscoveryManager((_, _2) => discoveryManagerMock.Object, 1, true);

Task.Run(() =>
{
Expand Down Expand Up @@ -331,7 +331,7 @@ public void DiscoveryTestsWithCompletionMarksAllSourcesAsFullyDiscovered()
Assert.AreEqual(0, _dataAggregator.GetSourcesWithStatus(DiscoveryStatus.NotDiscovered).Count);
}

private ParallelProxyDiscoveryManager SetupDiscoveryManager(Func<TestRuntimeProviderInfo, IProxyDiscoveryManager> getProxyManager, int parallelLevel, bool abortDiscovery)
private ParallelProxyDiscoveryManager SetupDiscoveryManager(Func<TestRuntimeProviderInfo, DiscoveryCriteria, IProxyDiscoveryManager> getProxyManager, int parallelLevel, bool abortDiscovery)
{
var parallelDiscoveryManager = new ParallelProxyDiscoveryManager(_mockRequestData.Object, getProxyManager, dataAggregator: new(), parallelLevel, _runtimeProviders);
SetupDiscoveryTests(_processedSources, abortDiscovery);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class ParallelProxyExecutionManagerTests
private static readonly int Timeout3Seconds = 3 * 1000; // In milliseconds

private readonly List<Mock<IProxyExecutionManager>> _usedMockManagers;
private readonly Func<TestRuntimeProviderInfo, IProxyExecutionManager> _createMockManager;
private readonly Func<TestRuntimeProviderInfo, TestRunCriteria, IProxyExecutionManager> _createMockManager;
private readonly Mock<IInternalTestRunEventsHandler> _mockEventHandler;

private readonly List<string> _sources;
Expand Down Expand Up @@ -59,7 +59,7 @@ public ParallelProxyExecutionManagerTests()
new Mock<IProxyExecutionManager>(),
});
_usedMockManagers = new List<Mock<IProxyExecutionManager>>();
_createMockManager = _ =>
_createMockManager = (_, _2) =>
{
_createMockManagerCalled++;
var manager = _preCreatedMockManagers.Dequeue();
Expand Down Expand Up @@ -245,7 +245,7 @@ public void StartTestRunWithTestsShouldNotSendCompleteUntilAllTestsAreProcessed(
public void StartTestRunShouldNotProcessAllSourcesOnExecutionCancelsForAnySource()
{
var executionManagerMock = new Mock<IProxyExecutionManager>();
var parallelExecutionManager = new ParallelProxyExecutionManager(_mockRequestData.Object, _ => executionManagerMock.Object, 1, _runtimeProviders);
var parallelExecutionManager = new ParallelProxyExecutionManager(_mockRequestData.Object, (_, _2) => executionManagerMock.Object, 1, _runtimeProviders);
_preCreatedMockManagers.Enqueue(executionManagerMock);
SetupMockManagers(_processedSources, isCanceled: true, isAborted: false);
SetupHandleTestRunComplete(_executionCompleted);
Expand All @@ -260,7 +260,7 @@ public void StartTestRunShouldNotProcessAllSourcesOnExecutionCancelsForAnySource
public void StartTestRunShouldNotProcessAllSourcesOnExecutionAborted()
{
var executionManagerMock = new Mock<IProxyExecutionManager>();
var parallelExecutionManager = new ParallelProxyExecutionManager(_mockRequestData.Object, _ => executionManagerMock.Object, 1, _runtimeProviders);
var parallelExecutionManager = new ParallelProxyExecutionManager(_mockRequestData.Object, (_, _2) => executionManagerMock.Object, 1, _runtimeProviders);
_preCreatedMockManagers.Enqueue(executionManagerMock);
SetupMockManagers(_processedSources, isCanceled: false, isAborted: false);
SetupHandleTestRunComplete(_executionCompleted);
Expand All @@ -276,7 +276,7 @@ public void StartTestRunShouldNotProcessAllSourcesOnExecutionAborted()
public void StartTestRunShouldProcessAllSourcesOnExecutionAbortsForAnySource()
{
var executionManagerMock = new Mock<IProxyExecutionManager>();
var parallelExecutionManager = new ParallelProxyExecutionManager(_mockRequestData.Object, _ => executionManagerMock.Object, 1, _runtimeProviders);
var parallelExecutionManager = new ParallelProxyExecutionManager(_mockRequestData.Object, (_, _2) => executionManagerMock.Object, 1, _runtimeProviders);
_preCreatedMockManagers.Enqueue(executionManagerMock);
SetupMockManagers(_processedSources, isCanceled: false, isAborted: true);
SetupHandleTestRunComplete(_executionCompleted);
Expand Down Expand Up @@ -432,12 +432,12 @@ public void StartTestRunShouldAggregateRunData()
AssertMissingAndDuplicateSources(_processedSources);
}

private ParallelProxyExecutionManager SetupExecutionManager(Func<TestRuntimeProviderInfo, IProxyExecutionManager> proxyManagerFunc, int parallelLevel)
private ParallelProxyExecutionManager SetupExecutionManager(Func<TestRuntimeProviderInfo, TestRunCriteria, IProxyExecutionManager> proxyManagerFunc, int parallelLevel)
{
return SetupExecutionManager(proxyManagerFunc, parallelLevel, false);
}

private ParallelProxyExecutionManager SetupExecutionManager(Func<TestRuntimeProviderInfo, IProxyExecutionManager> proxyManagerFunc, int parallelLevel, bool setupTestCases)
private ParallelProxyExecutionManager SetupExecutionManager(Func<TestRuntimeProviderInfo, TestRunCriteria, IProxyExecutionManager> proxyManagerFunc, int parallelLevel, bool setupTestCases)
{
var parallelExecutionManager = new ParallelProxyExecutionManager(_mockRequestData.Object, proxyManagerFunc, parallelLevel, _runtimeProviders);

Expand Down

0 comments on commit a4dc055

Please sign in to comment.