Skip to content

Commit

Permalink
Change Feed Estimator: Fixes exception propagation (#2392)
Browse files Browse the repository at this point in the history
* Refactoring initialization out

* uts

* emulator tests

* Adding tests for transient errors
  • Loading branch information
ealsur authored Apr 14, 2021
1 parent 12c8aff commit 6a38b2d
Show file tree
Hide file tree
Showing 9 changed files with 157 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,25 @@ namespace Microsoft.Azure.Cosmos
{
using System;
using Microsoft.Azure.Cosmos.ChangeFeed;
using Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement;

internal sealed class ChangeFeedEstimatorCore : ChangeFeedEstimator
{
private readonly string processorName;
private readonly ContainerInternal monitoredContainer;
private readonly ContainerInternal leaseContainer;
private readonly DocumentServiceLeaseContainer documentServiceLeaseContainer;

public ChangeFeedEstimatorCore(
string processorName,
ContainerInternal monitoredContainer,
ContainerInternal leaseContainer)
ContainerInternal leaseContainer,
DocumentServiceLeaseContainer documentServiceLeaseContainer)
{
this.processorName = processorName ?? throw new ArgumentNullException(nameof(processorName));
this.leaseContainer = leaseContainer ?? throw new ArgumentNullException(nameof(leaseContainer));
this.monitoredContainer = monitoredContainer ?? throw new ArgumentNullException(nameof(monitoredContainer));
this.documentServiceLeaseContainer = documentServiceLeaseContainer;
}

public override FeedIterator<ChangeFeedProcessorState> GetCurrentStateIterator(ChangeFeedEstimatorRequestOptions changeFeedEstimatorRequestOptions = null)
Expand All @@ -29,6 +33,7 @@ public override FeedIterator<ChangeFeedProcessorState> GetCurrentStateIterator(C
this.processorName,
this.monitoredContainer,
this.leaseContainer,
this.documentServiceLeaseContainer,
changeFeedEstimatorRequestOptions);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed
using System.Globalization;
using System.Linq;
using System.Net;
using System.Security.Permissions;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement;
Expand Down Expand Up @@ -45,11 +44,13 @@ public ChangeFeedEstimatorIterator(
string processorName,
ContainerInternal monitoredContainer,
ContainerInternal leaseContainer,
DocumentServiceLeaseContainer documentServiceLeaseContainer,
ChangeFeedEstimatorRequestOptions changeFeedEstimatorRequestOptions)
: this(
processorName,
monitoredContainer,
leaseContainer,
documentServiceLeaseContainer,
changeFeedEstimatorRequestOptions,
(DocumentServiceLease lease, string continuationToken, bool startFromBeginning) => ChangeFeedPartitionKeyResultSetIteratorCore.Create(
lease: lease,
Expand All @@ -74,16 +75,17 @@ internal ChangeFeedEstimatorIterator(
processorName: string.Empty,
monitoredContainer: monitoredContainer,
leaseContainer: leaseContainer,
documentServiceLeaseContainer: documentServiceLeaseContainer,
changeFeedEstimatorRequestOptions: changeFeedEstimatorRequestOptions,
monitoredContainerFeedCreator: monitoredContainerFeedCreator)
{
this.documentServiceLeaseContainer = documentServiceLeaseContainer;
}

private ChangeFeedEstimatorIterator(
string processorName,
ContainerInternal monitoredContainer,
ContainerInternal leaseContainer,
DocumentServiceLeaseContainer documentServiceLeaseContainer,
ChangeFeedEstimatorRequestOptions changeFeedEstimatorRequestOptions,
Func<DocumentServiceLease, string, bool, FeedIterator> monitoredContainerFeedCreator)
{
Expand All @@ -102,6 +104,7 @@ private ChangeFeedEstimatorIterator(
this.hasMoreResults = true;

this.monitoredContainerFeedCreator = monitoredContainerFeedCreator;
this.documentServiceLeaseContainer = documentServiceLeaseContainer;
}

public override bool HasMoreResults => this.hasMoreResults;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed
using Microsoft.Azure.Cosmos.ChangeFeed.Configuration;
using Microsoft.Azure.Cosmos.ChangeFeed.FeedProcessing;
using Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement;
using Microsoft.Azure.Cosmos.ChangeFeed.Utils;
using Microsoft.Azure.Cosmos.Core.Trace;
using static Microsoft.Azure.Cosmos.Container;

Expand All @@ -18,6 +19,7 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed
/// </summary>
internal sealed class ChangeFeedEstimatorRunner : ChangeFeedProcessor
{
private const string EstimatorDefaultHostName = "Estimator";
private readonly ChangesEstimationHandler initialEstimateDelegate;
private readonly TimeSpan? estimatorPeriod;
private CancellationTokenSource shutdownCts;
Expand All @@ -26,6 +28,7 @@ internal sealed class ChangeFeedEstimatorRunner : ChangeFeedProcessor
private FeedEstimatorRunner feedEstimatorRunner;
private ChangeFeedEstimator remainingWorkEstimator;
private ChangeFeedLeaseOptions changeFeedLeaseOptions;
private DocumentServiceLeaseContainer documentServiceLeaseContainer;
private bool initialized = false;

private Task runAsync;
Expand Down Expand Up @@ -65,45 +68,48 @@ public void ApplyBuildConfiguration(
ChangeFeedProcessorOptions changeFeedProcessorOptions,
ContainerInternal monitoredContainer)
{
if (monitoredContainer == null) throw new ArgumentNullException(nameof(monitoredContainer));
if (leaseContainer == null && customDocumentServiceLeaseStoreManager == null) throw new ArgumentNullException(nameof(leaseContainer));

this.leaseContainer = leaseContainer;
this.monitoredContainer = monitoredContainer;
this.monitoredContainer = monitoredContainer ?? throw new ArgumentNullException(nameof(monitoredContainer));
this.changeFeedLeaseOptions = changeFeedLeaseOptions;
this.documentServiceLeaseContainer = customDocumentServiceLeaseStoreManager?.LeaseContainer;
}

public override Task StartAsync()
public override async Task StartAsync()
{
if (!this.initialized)
{
await this.InitializeLeaseStoreAsync();
this.feedEstimatorRunner = this.BuildFeedEstimatorRunner();
this.initialized = true;
}

this.shutdownCts = new CancellationTokenSource();
DefaultTrace.TraceInformation("Starting estimator...");
this.runAsync = this.feedEstimatorRunner.RunAsync(this.shutdownCts.Token);
return Task.CompletedTask;
}

public override async Task StopAsync()
{
DefaultTrace.TraceInformation("Stopping estimator...");
this.shutdownCts.Cancel();
try
if (this.initialized)
{
await this.runAsync.ConfigureAwait(false);
}
catch (TaskCanceledException ex)
{
// Expected during shutdown
Cosmos.Extensions.TraceException(ex);
}
catch (OperationCanceledException ex)
{
// Expected during shutdown
Cosmos.Extensions.TraceException(ex);
this.shutdownCts.Cancel();
try
{
await this.runAsync.ConfigureAwait(false);
}
catch (TaskCanceledException ex)
{
// Expected during shutdown
Cosmos.Extensions.TraceException(ex);
}
catch (OperationCanceledException ex)
{
// Expected during shutdown
Cosmos.Extensions.TraceException(ex);
}
}
}

Expand All @@ -114,10 +120,27 @@ private FeedEstimatorRunner BuildFeedEstimatorRunner()
this.remainingWorkEstimator = new ChangeFeedEstimatorCore(
this.changeFeedLeaseOptions.LeasePrefix,
this.monitoredContainer,
this.leaseContainer);
this.leaseContainer,
this.documentServiceLeaseContainer);
}

return new FeedEstimatorRunner(this.initialEstimateDelegate, this.remainingWorkEstimator, this.estimatorPeriod);
}

private async Task InitializeLeaseStoreAsync()
{
if (this.documentServiceLeaseContainer == null)
{
string monitoredContainerAndDatabaseRid = await this.monitoredContainer.GetMonitoredDatabaseAndContainerRidAsync(default);
string leasePrefix = this.monitoredContainer.GetLeasePrefix(this.changeFeedLeaseOptions.LeasePrefix, monitoredContainerAndDatabaseRid);
DocumentServiceLeaseStoreManager documentServiceLeaseStoreManager = await DocumentServiceLeaseStoreManagerBuilder.InitializeAsync(
monitoredContainer: this.monitoredContainer,
leaseContainer: this.leaseContainer,
leaseContainerPrefix: leasePrefix,
instanceName: ChangeFeedEstimatorRunner.EstimatorDefaultHostName);

this.documentServiceLeaseContainer = documentServiceLeaseStoreManager.LeaseContainer;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,13 @@ internal sealed class FeedEstimatorRunner
private readonly ChangeFeedEstimator remainingWorkEstimator;
private readonly TimeSpan monitoringDelay;
private readonly ChangesEstimationHandler dispatchEstimation;
private readonly Func<CancellationToken, Task> estimateAndDispatchAsync;

public FeedEstimatorRunner(
ChangesEstimationHandler dispatchEstimation,
ChangeFeedEstimator remainingWorkEstimator,
TimeSpan? estimationPeriod = null)
{
this.dispatchEstimation = dispatchEstimation;
this.estimateAndDispatchAsync = this.EstimateAsync;
this.remainingWorkEstimator = remainingWorkEstimator;
this.monitoringDelay = estimationPeriod ?? FeedEstimatorRunner.defaultMonitoringDelay;
}
Expand All @@ -39,12 +37,14 @@ public async Task RunAsync(CancellationToken cancellationToken)
{
try
{
await this.estimateAndDispatchAsync(cancellationToken);
await this.EstimateAsync(cancellationToken);
}
catch (TaskCanceledException canceledException)
{
if (cancellationToken.IsCancellationRequested)
{
throw;
}

Extensions.TraceException(new Exception("exception within estimator", canceledException));

Expand All @@ -57,9 +57,9 @@ public async Task RunAsync(CancellationToken cancellationToken)

private async Task EstimateAsync(CancellationToken cancellationToken)
{
long estimation = await this.GetEstimatedRemainingWorkAsync(cancellationToken).ConfigureAwait(false);
try
{
long estimation = await this.GetEstimatedRemainingWorkAsync(cancellationToken).ConfigureAwait(false);
await this.dispatchEstimation(estimation, cancellationToken).ConfigureAwait(false);
}
catch (Exception userException)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement
using System;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.Cosmos.Tracing;

/// <summary>
/// Provides flexible way to build lease manager constructor parameters.
Expand All @@ -20,8 +21,7 @@ public static async Task<DocumentServiceLeaseStoreManager> InitializeAsync(
string leaseContainerPrefix,
string instanceName)
{
ContainerResponse cosmosContainerResponse = await leaseContainer.ReadContainerAsync().ConfigureAwait(false);
ContainerProperties containerProperties = cosmosContainerResponse.Resource;
ContainerProperties containerProperties = await leaseContainer.GetCachedContainerPropertiesAsync(forceRefresh: false, NoOpTrace.Singleton, cancellationToken: default);

bool isPartitioned =
containerProperties.PartitionKey != null &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,8 @@ public override ChangeFeedEstimator GetChangeFeedEstimator(
return new ChangeFeedEstimatorCore(
processorName: processorName,
monitoredContainer: this,
leaseContainer: (ContainerInternal)leaseContainer);
leaseContainer: (ContainerInternal)leaseContainer,
documentServiceLeaseContainer: default);
}

public override TransactionalBatch CreateTransactionalBatch(PartitionKey partitionKey)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests.ChangeFeed
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
Expand Down Expand Up @@ -54,6 +55,20 @@ public async Task WhenNoLeasesExist()
Assert.AreEqual(1, receivedEstimation);
}

[TestMethod]
public async Task StartAsync_ShouldThrowIfContainerDoesNotExist()
{
ChangeFeedProcessor estimator = this.cosmosClient.GetContainer(this.database.Id, "DoesNotExist")
.GetChangeFeedEstimatorBuilder("test", (long estimation, CancellationToken token) =>
{
return Task.CompletedTask;
}, TimeSpan.FromSeconds(1))
.WithLeaseContainer(this.LeaseContainer).Build();

CosmosException exception = await Assert.ThrowsExceptionAsync<CosmosException>(() => estimator.StartAsync());
Assert.AreEqual(HttpStatusCode.NotFound, exception.StatusCode);
}

[TestMethod]
public async Task WhenNoLeasesExist_Pull()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.Tests
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement;
using Microsoft.Azure.Cosmos.Tests;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;
using Newtonsoft.Json.Linq;
Expand Down Expand Up @@ -322,6 +324,45 @@ FeedIterator feedCreator(DocumentServiceLease lease, string continuationToken, b
Assert.AreEqual(leaseToken, remainingLeaseWork.LeaseToken);
}

[TestMethod]
public async Task ShouldInitializeDocumentLeaseContainer()
{
static FeedIterator feedCreator(DocumentServiceLease lease, string continuationToken, bool startFromBeginning)
{
return Mock.Of<FeedIterator>();
}

Mock<CosmosClientContext> mockedContext = new Mock<CosmosClientContext>(MockBehavior.Strict);
mockedContext.Setup(c => c.Client).Returns(MockCosmosUtil.CreateMockCosmosClient());

string databaseRid = Guid.NewGuid().ToString();
Mock<DatabaseInternal> mockedMonitoredDatabase = new Mock<DatabaseInternal>(MockBehavior.Strict);
mockedMonitoredDatabase.Setup(c => c.GetRIDAsync(It.IsAny<CancellationToken>())).ReturnsAsync(databaseRid);

string monitoredContainerRid = Guid.NewGuid().ToString();
Mock<ContainerInternal> mockedMonitoredContainer = new Mock<ContainerInternal>(MockBehavior.Strict);
mockedMonitoredContainer.Setup(c => c.GetCachedRIDAsync(It.IsAny<bool>(), It.IsAny<ITrace>(), It.IsAny<CancellationToken>())).ReturnsAsync(monitoredContainerRid);
mockedMonitoredContainer.Setup(c => c.Database).Returns(mockedMonitoredDatabase.Object);
mockedMonitoredContainer.Setup(c => c.ClientContext).Returns(mockedContext.Object);

Mock<FeedIterator> leaseFeedIterator = new Mock<FeedIterator>();
leaseFeedIterator.Setup(i => i.HasMoreResults).Returns(false);

Mock<ContainerInternal>mockedLeaseContainer = new Mock<ContainerInternal>(MockBehavior.Strict);
mockedLeaseContainer.Setup(c => c.GetCachedContainerPropertiesAsync(It.Is<bool>(b => b == false), It.IsAny<ITrace>(), It.IsAny<CancellationToken>())).ReturnsAsync(new ContainerProperties());
mockedLeaseContainer.Setup(c => c.GetItemQueryStreamIterator(It.Is<string>(queryText => queryText.Contains($"{databaseRid}_{monitoredContainerRid}")), It.Is<string>(continuation => continuation == null), It.IsAny<QueryRequestOptions>()))
.Returns(leaseFeedIterator.Object);

ChangeFeedEstimatorIterator remainingWorkEstimator = new ChangeFeedEstimatorIterator(
mockedMonitoredContainer.Object,
mockedLeaseContainer.Object,
documentServiceLeaseContainer: default,
monitoredContainerFeedCreator: feedCreator,
changeFeedEstimatorRequestOptions: default);

await remainingWorkEstimator.ReadNextAsync(default);
}

[TestMethod]
public void ExtractLsnFromSessionToken_ShouldParseOldSessionToken()
{
Expand Down
Loading

0 comments on commit 6a38b2d

Please sign in to comment.