From a2fb70f7f035f66a1e0f8460de33324a1ae2ce4b Mon Sep 17 00:00:00 2001 From: Matias Quaranta Date: Wed, 14 Apr 2021 13:02:30 -0700 Subject: [PATCH 1/4] Refactoring initialization out --- .../ChangeFeedEstimatorCore.cs | 7 ++- .../ChangeFeedEstimatorIterator.cs | 7 ++- .../ChangeFeedEstimatorRunner.cs | 59 +++++++++++++------ .../FeedProcessing/FeedEstimatorRunner.cs | 6 +- ...DocumentServiceLeaseStoreManagerBuilder.cs | 4 +- .../Resource/Container/ContainerCore.Items.cs | 3 +- 6 files changed, 59 insertions(+), 27 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedEstimatorCore.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedEstimatorCore.cs index a12a1066ad..6b7df66a5f 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedEstimatorCore.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedEstimatorCore.cs @@ -6,21 +6,25 @@ namespace Microsoft.Azure.Cosmos { using System; using Microsoft.Azure.Cosmos.ChangeFeed; + using Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement; internal sealed class ChangeFeedEstimatorCore : ChangeFeedEstimator { private readonly string processorName; private readonly ContainerInternal monitoredContainer; private readonly ContainerInternal leaseContainer; + private readonly DocumentServiceLeaseContainer documentServiceLeaseContainer; public ChangeFeedEstimatorCore( string processorName, ContainerInternal monitoredContainer, - ContainerInternal leaseContainer) + ContainerInternal leaseContainer, + DocumentServiceLeaseContainer documentServiceLeaseContainer) { this.processorName = processorName ?? throw new ArgumentNullException(nameof(processorName)); this.leaseContainer = leaseContainer ?? throw new ArgumentNullException(nameof(leaseContainer)); this.monitoredContainer = monitoredContainer ?? throw new ArgumentNullException(nameof(monitoredContainer)); + this.documentServiceLeaseContainer = documentServiceLeaseContainer; } public override FeedIterator GetCurrentStateIterator(ChangeFeedEstimatorRequestOptions changeFeedEstimatorRequestOptions = null) @@ -29,6 +33,7 @@ public override FeedIterator GetCurrentStateIterator(C this.processorName, this.monitoredContainer, this.leaseContainer, + this.documentServiceLeaseContainer, changeFeedEstimatorRequestOptions); } } diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedEstimatorIterator.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedEstimatorIterator.cs index 5410f78fb2..c1986d2518 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedEstimatorIterator.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedEstimatorIterator.cs @@ -10,7 +10,6 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed using System.Globalization; using System.Linq; using System.Net; - using System.Security.Permissions; using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement; @@ -45,11 +44,13 @@ public ChangeFeedEstimatorIterator( string processorName, ContainerInternal monitoredContainer, ContainerInternal leaseContainer, + DocumentServiceLeaseContainer documentServiceLeaseContainer, ChangeFeedEstimatorRequestOptions changeFeedEstimatorRequestOptions) : this( processorName, monitoredContainer, leaseContainer, + documentServiceLeaseContainer, changeFeedEstimatorRequestOptions, (DocumentServiceLease lease, string continuationToken, bool startFromBeginning) => ChangeFeedPartitionKeyResultSetIteratorCore.Create( lease: lease, @@ -74,16 +75,17 @@ internal ChangeFeedEstimatorIterator( processorName: string.Empty, monitoredContainer: monitoredContainer, leaseContainer: leaseContainer, + documentServiceLeaseContainer: documentServiceLeaseContainer, changeFeedEstimatorRequestOptions: changeFeedEstimatorRequestOptions, monitoredContainerFeedCreator: monitoredContainerFeedCreator) { - this.documentServiceLeaseContainer = documentServiceLeaseContainer; } private ChangeFeedEstimatorIterator( string processorName, ContainerInternal monitoredContainer, ContainerInternal leaseContainer, + DocumentServiceLeaseContainer documentServiceLeaseContainer, ChangeFeedEstimatorRequestOptions changeFeedEstimatorRequestOptions, Func monitoredContainerFeedCreator) { @@ -102,6 +104,7 @@ private ChangeFeedEstimatorIterator( this.hasMoreResults = true; this.monitoredContainerFeedCreator = monitoredContainerFeedCreator; + this.documentServiceLeaseContainer = documentServiceLeaseContainer; } public override bool HasMoreResults => this.hasMoreResults; diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedEstimatorRunner.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedEstimatorRunner.cs index c5136c4f48..86083629a3 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedEstimatorRunner.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedEstimatorRunner.cs @@ -10,6 +10,7 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed using Microsoft.Azure.Cosmos.ChangeFeed.Configuration; using Microsoft.Azure.Cosmos.ChangeFeed.FeedProcessing; using Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement; + using Microsoft.Azure.Cosmos.ChangeFeed.Utils; using Microsoft.Azure.Cosmos.Core.Trace; using static Microsoft.Azure.Cosmos.Container; @@ -18,6 +19,7 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed /// internal sealed class ChangeFeedEstimatorRunner : ChangeFeedProcessor { + private const string EstimatorDefaultHostName = "Estimator"; private readonly ChangesEstimationHandler initialEstimateDelegate; private readonly TimeSpan? estimatorPeriod; private CancellationTokenSource shutdownCts; @@ -26,6 +28,7 @@ internal sealed class ChangeFeedEstimatorRunner : ChangeFeedProcessor private FeedEstimatorRunner feedEstimatorRunner; private ChangeFeedEstimator remainingWorkEstimator; private ChangeFeedLeaseOptions changeFeedLeaseOptions; + private DocumentServiceLeaseContainer documentServiceLeaseContainer; private bool initialized = false; private Task runAsync; @@ -65,18 +68,19 @@ public void ApplyBuildConfiguration( ChangeFeedProcessorOptions changeFeedProcessorOptions, ContainerInternal monitoredContainer) { - if (monitoredContainer == null) throw new ArgumentNullException(nameof(monitoredContainer)); if (leaseContainer == null && customDocumentServiceLeaseStoreManager == null) throw new ArgumentNullException(nameof(leaseContainer)); this.leaseContainer = leaseContainer; - this.monitoredContainer = monitoredContainer; + this.monitoredContainer = monitoredContainer ?? throw new ArgumentNullException(nameof(monitoredContainer)); this.changeFeedLeaseOptions = changeFeedLeaseOptions; + this.documentServiceLeaseContainer = customDocumentServiceLeaseStoreManager?.LeaseContainer; } - public override Task StartAsync() + public override async Task StartAsync() { if (!this.initialized) { + await this.InitializeLeaseStoreAsync(); this.feedEstimatorRunner = this.BuildFeedEstimatorRunner(); this.initialized = true; } @@ -84,26 +88,28 @@ public override Task StartAsync() this.shutdownCts = new CancellationTokenSource(); DefaultTrace.TraceInformation("Starting estimator..."); this.runAsync = this.feedEstimatorRunner.RunAsync(this.shutdownCts.Token); - return Task.CompletedTask; } public override async Task StopAsync() { DefaultTrace.TraceInformation("Stopping estimator..."); - this.shutdownCts.Cancel(); - try + if (this.initialized) { - await this.runAsync.ConfigureAwait(false); - } - catch (TaskCanceledException ex) - { - // Expected during shutdown - Cosmos.Extensions.TraceException(ex); - } - catch (OperationCanceledException ex) - { - // Expected during shutdown - Cosmos.Extensions.TraceException(ex); + this.shutdownCts.Cancel(); + try + { + await this.runAsync.ConfigureAwait(false); + } + catch (TaskCanceledException ex) + { + // Expected during shutdown + Cosmos.Extensions.TraceException(ex); + } + catch (OperationCanceledException ex) + { + // Expected during shutdown + Cosmos.Extensions.TraceException(ex); + } } } @@ -114,10 +120,27 @@ private FeedEstimatorRunner BuildFeedEstimatorRunner() this.remainingWorkEstimator = new ChangeFeedEstimatorCore( this.changeFeedLeaseOptions.LeasePrefix, this.monitoredContainer, - this.leaseContainer); + this.leaseContainer, + this.documentServiceLeaseContainer); } return new FeedEstimatorRunner(this.initialEstimateDelegate, this.remainingWorkEstimator, this.estimatorPeriod); } + + private async Task InitializeLeaseStoreAsync() + { + if (this.documentServiceLeaseContainer == null) + { + string monitoredContainerAndDatabaseRid = await this.monitoredContainer.GetMonitoredDatabaseAndContainerRidAsync(default); + string leasePrefix = this.monitoredContainer.GetLeasePrefix(this.changeFeedLeaseOptions.LeasePrefix, monitoredContainerAndDatabaseRid); + DocumentServiceLeaseStoreManager documentServiceLeaseStoreManager = await DocumentServiceLeaseStoreManagerBuilder.InitializeAsync( + monitoredContainer: this.monitoredContainer, + leaseContainer: this.leaseContainer, + leaseContainerPrefix: leasePrefix, + instanceName: ChangeFeedEstimatorRunner.EstimatorDefaultHostName); + + this.documentServiceLeaseContainer = documentServiceLeaseStoreManager.LeaseContainer; + } + } } } \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/FeedEstimatorRunner.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/FeedEstimatorRunner.cs index 12163491f6..fd3d345fe5 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/FeedEstimatorRunner.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/FeedEstimatorRunner.cs @@ -20,7 +20,6 @@ internal sealed class FeedEstimatorRunner private readonly ChangeFeedEstimator remainingWorkEstimator; private readonly TimeSpan monitoringDelay; private readonly ChangesEstimationHandler dispatchEstimation; - private readonly Func estimateAndDispatchAsync; public FeedEstimatorRunner( ChangesEstimationHandler dispatchEstimation, @@ -28,7 +27,6 @@ public FeedEstimatorRunner( TimeSpan? estimationPeriod = null) { this.dispatchEstimation = dispatchEstimation; - this.estimateAndDispatchAsync = this.EstimateAsync; this.remainingWorkEstimator = remainingWorkEstimator; this.monitoringDelay = estimationPeriod ?? FeedEstimatorRunner.defaultMonitoringDelay; } @@ -39,12 +37,14 @@ public async Task RunAsync(CancellationToken cancellationToken) { try { - await this.estimateAndDispatchAsync(cancellationToken); + await this.EstimateAsync(cancellationToken); } catch (TaskCanceledException canceledException) { if (cancellationToken.IsCancellationRequested) + { throw; + } Extensions.TraceException(new Exception("exception within estimator", canceledException)); diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseStoreManagerBuilder.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseStoreManagerBuilder.cs index 81fc882bb2..f1b490799c 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseStoreManagerBuilder.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseStoreManagerBuilder.cs @@ -7,6 +7,7 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement using System; using System.Threading.Tasks; using Microsoft.Azure.Cosmos; + using Microsoft.Azure.Cosmos.Tracing; /// /// Provides flexible way to build lease manager constructor parameters. @@ -20,8 +21,7 @@ public static async Task InitializeAsync( string leaseContainerPrefix, string instanceName) { - ContainerResponse cosmosContainerResponse = await leaseContainer.ReadContainerAsync().ConfigureAwait(false); - ContainerProperties containerProperties = cosmosContainerResponse.Resource; + ContainerProperties containerProperties = await leaseContainer.GetCachedContainerPropertiesAsync(forceRefresh: false, NoOpTrace.Singleton, cancellationToken: default); bool isPartitioned = containerProperties.PartitionKey != null && diff --git a/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.Items.cs b/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.Items.cs index fac23045c0..3ce5414337 100644 --- a/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.Items.cs +++ b/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.Items.cs @@ -632,7 +632,8 @@ public override ChangeFeedEstimator GetChangeFeedEstimator( return new ChangeFeedEstimatorCore( processorName: processorName, monitoredContainer: this, - leaseContainer: (ContainerInternal)leaseContainer); + leaseContainer: (ContainerInternal)leaseContainer, + documentServiceLeaseContainer: default); } public override TransactionalBatch CreateTransactionalBatch(PartitionKey partitionKey) From 3fd4d307585cb834c1bfcb1d3ae1a7e77db3cbd8 Mon Sep 17 00:00:00 2001 From: Matias Quaranta Date: Wed, 14 Apr 2021 13:02:35 -0700 Subject: [PATCH 2/4] uts --- .../ChangeFeedEstimatorIteratorTests.cs | 41 +++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/ChangeFeedEstimatorIteratorTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/ChangeFeedEstimatorIteratorTests.cs index 7406037de4..54b2e0f9ee 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/ChangeFeedEstimatorIteratorTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/ChangeFeedEstimatorIteratorTests.cs @@ -12,6 +12,8 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.Tests using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement; + using Microsoft.Azure.Cosmos.Tests; + using Microsoft.Azure.Cosmos.Tracing; using Microsoft.VisualStudio.TestTools.UnitTesting; using Moq; using Newtonsoft.Json.Linq; @@ -322,6 +324,45 @@ FeedIterator feedCreator(DocumentServiceLease lease, string continuationToken, b Assert.AreEqual(leaseToken, remainingLeaseWork.LeaseToken); } + [TestMethod] + public async Task ShouldInitializeDocumentLeaseContainer() + { + static FeedIterator feedCreator(DocumentServiceLease lease, string continuationToken, bool startFromBeginning) + { + return Mock.Of(); + } + + Mock mockedContext = new Mock(MockBehavior.Strict); + mockedContext.Setup(c => c.Client).Returns(MockCosmosUtil.CreateMockCosmosClient()); + + string databaseRid = Guid.NewGuid().ToString(); + Mock mockedMonitoredDatabase = new Mock(MockBehavior.Strict); + mockedMonitoredDatabase.Setup(c => c.GetRIDAsync(It.IsAny())).ReturnsAsync(databaseRid); + + string monitoredContainerRid = Guid.NewGuid().ToString(); + Mock mockedMonitoredContainer = new Mock(MockBehavior.Strict); + mockedMonitoredContainer.Setup(c => c.GetCachedRIDAsync(It.IsAny(), It.IsAny(), It.IsAny())).ReturnsAsync(monitoredContainerRid); + mockedMonitoredContainer.Setup(c => c.Database).Returns(mockedMonitoredDatabase.Object); + mockedMonitoredContainer.Setup(c => c.ClientContext).Returns(mockedContext.Object); + + Mock leaseFeedIterator = new Mock(); + leaseFeedIterator.Setup(i => i.HasMoreResults).Returns(false); + + MockmockedLeaseContainer = new Mock(MockBehavior.Strict); + mockedLeaseContainer.Setup(c => c.GetCachedContainerPropertiesAsync(It.Is(b => b == false), It.IsAny(), It.IsAny())).ReturnsAsync(new ContainerProperties()); + mockedLeaseContainer.Setup(c => c.GetItemQueryStreamIterator(It.Is(queryText => queryText.Contains($"{databaseRid}_{monitoredContainerRid}")), It.Is(continuation => continuation == null), It.IsAny())) + .Returns(leaseFeedIterator.Object); + + ChangeFeedEstimatorIterator remainingWorkEstimator = new ChangeFeedEstimatorIterator( + mockedMonitoredContainer.Object, + mockedLeaseContainer.Object, + documentServiceLeaseContainer: default, + monitoredContainerFeedCreator: feedCreator, + changeFeedEstimatorRequestOptions: default); + + await remainingWorkEstimator.ReadNextAsync(default); + } + [TestMethod] public void ExtractLsnFromSessionToken_ShouldParseOldSessionToken() { From e275875f6265910b5dcba690b5108d3df476f0c4 Mon Sep 17 00:00:00 2001 From: Matias Quaranta Date: Wed, 14 Apr 2021 13:02:46 -0700 Subject: [PATCH 3/4] emulator tests --- .../ChangeFeed/EstimatorTests.cs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/EstimatorTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/EstimatorTests.cs index c523072d66..b0416560c2 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/EstimatorTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/EstimatorTests.cs @@ -7,6 +7,7 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests.ChangeFeed using System; using System.Collections.Generic; using System.Linq; + using System.Net; using System.Threading; using System.Threading.Tasks; using Microsoft.VisualStudio.TestTools.UnitTesting; @@ -54,6 +55,20 @@ public async Task WhenNoLeasesExist() Assert.AreEqual(1, receivedEstimation); } + [TestMethod] + public async Task StartAsync_ShouldThrowIfContainerDoesNotExist() + { + ChangeFeedProcessor estimator = this.cosmosClient.GetContainer(this.database.Id, "DoesNotExist") + .GetChangeFeedEstimatorBuilder("test", (long estimation, CancellationToken token) => + { + return Task.CompletedTask; + }, TimeSpan.FromSeconds(1)) + .WithLeaseContainer(this.LeaseContainer).Build(); + + CosmosException exception = await Assert.ThrowsExceptionAsync(() => estimator.StartAsync()); + Assert.AreEqual(HttpStatusCode.NotFound, exception.StatusCode); + } + [TestMethod] public async Task WhenNoLeasesExist_Pull() { From 4c3689dc98fea159a42fd50c55653ae01329539f Mon Sep 17 00:00:00 2001 From: Matias Quaranta Date: Wed, 14 Apr 2021 13:12:30 -0700 Subject: [PATCH 4/4] Adding tests for transient errors --- .../FeedProcessing/FeedEstimatorRunner.cs | 2 +- .../ChangeFeed/FeedEstimatorRunnerTests.cs | 41 +++++++++++++++++++ 2 files changed, 42 insertions(+), 1 deletion(-) diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/FeedEstimatorRunner.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/FeedEstimatorRunner.cs index fd3d345fe5..8038ff2735 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/FeedEstimatorRunner.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/FeedEstimatorRunner.cs @@ -57,9 +57,9 @@ public async Task RunAsync(CancellationToken cancellationToken) private async Task EstimateAsync(CancellationToken cancellationToken) { - long estimation = await this.GetEstimatedRemainingWorkAsync(cancellationToken).ConfigureAwait(false); try { + long estimation = await this.GetEstimatedRemainingWorkAsync(cancellationToken).ConfigureAwait(false); await this.dispatchEstimation(estimation, cancellationToken).ConfigureAwait(false); } catch (Exception userException) diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/FeedEstimatorRunnerTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/FeedEstimatorRunnerTests.cs index efeb93cd84..7e219810ed 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/FeedEstimatorRunnerTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/FeedEstimatorRunnerTests.cs @@ -9,6 +9,7 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.Tests using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.Cosmos.ChangeFeed.FeedProcessing; + using Microsoft.Azure.Cosmos.Resource.CosmosExceptions; using Microsoft.VisualStudio.TestTools.UnitTesting; using Moq; @@ -53,6 +54,46 @@ Task estimatorDispatcher(long detectedEstimation, CancellationToken token) Assert.IsTrue(detectedEstimationCorrectly); } + [TestMethod] + public async Task FeedEstimatorRunner_TransientErrorsShouldContinue() + { + const long estimation = 10; + bool detectedEstimationCorrectly = false; + CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(500); + Task estimatorDispatcher(long detectedEstimation, CancellationToken token) + { + detectedEstimationCorrectly = estimation == detectedEstimation; + cancellationTokenSource.Cancel(); + return Task.CompletedTask; + } + + Mock> mockedResponse = new Mock>(); + mockedResponse.Setup(r => r.Count).Returns(1); + mockedResponse.Setup(r => r.GetEnumerator()).Returns(new List() { new ChangeFeedProcessorState(string.Empty, estimation, string.Empty) }.GetEnumerator()); + + Mock> mockedIterator = new Mock>(); + mockedIterator.SetupSequence(i => i.ReadNextAsync(It.IsAny())) + .ThrowsAsync(CosmosExceptionFactory.CreateThrottledException("throttled", new Headers())) + .ReturnsAsync(mockedResponse.Object); + + Mock mockedEstimator = new Mock(); + mockedEstimator.Setup(e => e.GetCurrentStateIterator(It.IsAny())).Returns(mockedIterator.Object); + + FeedEstimatorRunner estimatorCore = new FeedEstimatorRunner(estimatorDispatcher, mockedEstimator.Object, TimeSpan.FromMilliseconds(10)); + + try + { + await estimatorCore.RunAsync(cancellationTokenSource.Token); + } + catch (TaskCanceledException) + { + // expected + } + + Assert.IsTrue(detectedEstimationCorrectly); + mockedIterator.Verify(i => i.ReadNextAsync(It.IsAny()), Times.Exactly(2)); + } + [TestMethod] public async Task FeedEstimatorRunner_NoLeases() {