Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change Feed Estimator: Fixes exception propagation #2392

Merged
merged 4 commits into from
Apr 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,25 @@ namespace Microsoft.Azure.Cosmos
{
using System;
using Microsoft.Azure.Cosmos.ChangeFeed;
using Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement;

internal sealed class ChangeFeedEstimatorCore : ChangeFeedEstimator
{
private readonly string processorName;
private readonly ContainerInternal monitoredContainer;
private readonly ContainerInternal leaseContainer;
private readonly DocumentServiceLeaseContainer documentServiceLeaseContainer;

public ChangeFeedEstimatorCore(
string processorName,
ContainerInternal monitoredContainer,
ContainerInternal leaseContainer)
ContainerInternal leaseContainer,
DocumentServiceLeaseContainer documentServiceLeaseContainer)
{
this.processorName = processorName ?? throw new ArgumentNullException(nameof(processorName));
this.leaseContainer = leaseContainer ?? throw new ArgumentNullException(nameof(leaseContainer));
this.monitoredContainer = monitoredContainer ?? throw new ArgumentNullException(nameof(monitoredContainer));
this.documentServiceLeaseContainer = documentServiceLeaseContainer;
}

public override FeedIterator<ChangeFeedProcessorState> GetCurrentStateIterator(ChangeFeedEstimatorRequestOptions changeFeedEstimatorRequestOptions = null)
Expand All @@ -29,6 +33,7 @@ public override FeedIterator<ChangeFeedProcessorState> GetCurrentStateIterator(C
this.processorName,
this.monitoredContainer,
this.leaseContainer,
this.documentServiceLeaseContainer,
changeFeedEstimatorRequestOptions);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed
using System.Globalization;
using System.Linq;
using System.Net;
using System.Security.Permissions;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement;
Expand Down Expand Up @@ -45,11 +44,13 @@ public ChangeFeedEstimatorIterator(
string processorName,
ContainerInternal monitoredContainer,
ContainerInternal leaseContainer,
DocumentServiceLeaseContainer documentServiceLeaseContainer,
ChangeFeedEstimatorRequestOptions changeFeedEstimatorRequestOptions)
: this(
processorName,
monitoredContainer,
leaseContainer,
documentServiceLeaseContainer,
changeFeedEstimatorRequestOptions,
(DocumentServiceLease lease, string continuationToken, bool startFromBeginning) => ChangeFeedPartitionKeyResultSetIteratorCore.Create(
lease: lease,
Expand All @@ -74,16 +75,17 @@ internal ChangeFeedEstimatorIterator(
processorName: string.Empty,
monitoredContainer: monitoredContainer,
leaseContainer: leaseContainer,
documentServiceLeaseContainer: documentServiceLeaseContainer,
changeFeedEstimatorRequestOptions: changeFeedEstimatorRequestOptions,
monitoredContainerFeedCreator: monitoredContainerFeedCreator)
{
this.documentServiceLeaseContainer = documentServiceLeaseContainer;
}

private ChangeFeedEstimatorIterator(
string processorName,
ContainerInternal monitoredContainer,
ContainerInternal leaseContainer,
DocumentServiceLeaseContainer documentServiceLeaseContainer,
ChangeFeedEstimatorRequestOptions changeFeedEstimatorRequestOptions,
Func<DocumentServiceLease, string, bool, FeedIterator> monitoredContainerFeedCreator)
{
Expand All @@ -102,6 +104,7 @@ private ChangeFeedEstimatorIterator(
this.hasMoreResults = true;

this.monitoredContainerFeedCreator = monitoredContainerFeedCreator;
this.documentServiceLeaseContainer = documentServiceLeaseContainer;
}

public override bool HasMoreResults => this.hasMoreResults;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed
using Microsoft.Azure.Cosmos.ChangeFeed.Configuration;
using Microsoft.Azure.Cosmos.ChangeFeed.FeedProcessing;
using Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement;
using Microsoft.Azure.Cosmos.ChangeFeed.Utils;
using Microsoft.Azure.Cosmos.Core.Trace;
using static Microsoft.Azure.Cosmos.Container;

Expand All @@ -18,6 +19,7 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed
/// </summary>
internal sealed class ChangeFeedEstimatorRunner : ChangeFeedProcessor
{
private const string EstimatorDefaultHostName = "Estimator";
private readonly ChangesEstimationHandler initialEstimateDelegate;
private readonly TimeSpan? estimatorPeriod;
private CancellationTokenSource shutdownCts;
Expand All @@ -26,6 +28,7 @@ internal sealed class ChangeFeedEstimatorRunner : ChangeFeedProcessor
private FeedEstimatorRunner feedEstimatorRunner;
private ChangeFeedEstimator remainingWorkEstimator;
private ChangeFeedLeaseOptions changeFeedLeaseOptions;
private DocumentServiceLeaseContainer documentServiceLeaseContainer;
private bool initialized = false;

private Task runAsync;
Expand Down Expand Up @@ -65,45 +68,48 @@ public void ApplyBuildConfiguration(
ChangeFeedProcessorOptions changeFeedProcessorOptions,
ContainerInternal monitoredContainer)
{
if (monitoredContainer == null) throw new ArgumentNullException(nameof(monitoredContainer));
if (leaseContainer == null && customDocumentServiceLeaseStoreManager == null) throw new ArgumentNullException(nameof(leaseContainer));

this.leaseContainer = leaseContainer;
this.monitoredContainer = monitoredContainer;
this.monitoredContainer = monitoredContainer ?? throw new ArgumentNullException(nameof(monitoredContainer));
this.changeFeedLeaseOptions = changeFeedLeaseOptions;
this.documentServiceLeaseContainer = customDocumentServiceLeaseStoreManager?.LeaseContainer;
}

public override Task StartAsync()
public override async Task StartAsync()
{
if (!this.initialized)
{
await this.InitializeLeaseStoreAsync();
this.feedEstimatorRunner = this.BuildFeedEstimatorRunner();
this.initialized = true;
}

this.shutdownCts = new CancellationTokenSource();
DefaultTrace.TraceInformation("Starting estimator...");
this.runAsync = this.feedEstimatorRunner.RunAsync(this.shutdownCts.Token);
return Task.CompletedTask;
}

public override async Task StopAsync()
{
DefaultTrace.TraceInformation("Stopping estimator...");
this.shutdownCts.Cancel();
try
if (this.initialized)
{
await this.runAsync.ConfigureAwait(false);
}
catch (TaskCanceledException ex)
{
// Expected during shutdown
Cosmos.Extensions.TraceException(ex);
}
catch (OperationCanceledException ex)
{
// Expected during shutdown
Cosmos.Extensions.TraceException(ex);
this.shutdownCts.Cancel();
try
{
await this.runAsync.ConfigureAwait(false);
}
catch (TaskCanceledException ex)
{
// Expected during shutdown
Cosmos.Extensions.TraceException(ex);
}
catch (OperationCanceledException ex)
{
// Expected during shutdown
Cosmos.Extensions.TraceException(ex);
}
}
}

Expand All @@ -114,10 +120,27 @@ private FeedEstimatorRunner BuildFeedEstimatorRunner()
this.remainingWorkEstimator = new ChangeFeedEstimatorCore(
this.changeFeedLeaseOptions.LeasePrefix,
this.monitoredContainer,
this.leaseContainer);
this.leaseContainer,
this.documentServiceLeaseContainer);
}

return new FeedEstimatorRunner(this.initialEstimateDelegate, this.remainingWorkEstimator, this.estimatorPeriod);
}

private async Task InitializeLeaseStoreAsync()
{
if (this.documentServiceLeaseContainer == null)
{
string monitoredContainerAndDatabaseRid = await this.monitoredContainer.GetMonitoredDatabaseAndContainerRidAsync(default);
string leasePrefix = this.monitoredContainer.GetLeasePrefix(this.changeFeedLeaseOptions.LeasePrefix, monitoredContainerAndDatabaseRid);
DocumentServiceLeaseStoreManager documentServiceLeaseStoreManager = await DocumentServiceLeaseStoreManagerBuilder.InitializeAsync(
monitoredContainer: this.monitoredContainer,
leaseContainer: this.leaseContainer,
leaseContainerPrefix: leasePrefix,
instanceName: ChangeFeedEstimatorRunner.EstimatorDefaultHostName);

this.documentServiceLeaseContainer = documentServiceLeaseStoreManager.LeaseContainer;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,13 @@ internal sealed class FeedEstimatorRunner
private readonly ChangeFeedEstimator remainingWorkEstimator;
private readonly TimeSpan monitoringDelay;
private readonly ChangesEstimationHandler dispatchEstimation;
private readonly Func<CancellationToken, Task> estimateAndDispatchAsync;

public FeedEstimatorRunner(
ChangesEstimationHandler dispatchEstimation,
ChangeFeedEstimator remainingWorkEstimator,
TimeSpan? estimationPeriod = null)
{
this.dispatchEstimation = dispatchEstimation;
this.estimateAndDispatchAsync = this.EstimateAsync;
this.remainingWorkEstimator = remainingWorkEstimator;
this.monitoringDelay = estimationPeriod ?? FeedEstimatorRunner.defaultMonitoringDelay;
}
Expand All @@ -39,12 +37,14 @@ public async Task RunAsync(CancellationToken cancellationToken)
{
try
{
await this.estimateAndDispatchAsync(cancellationToken);
await this.EstimateAsync(cancellationToken);
}
catch (TaskCanceledException canceledException)
{
if (cancellationToken.IsCancellationRequested)
{
throw;
}

Extensions.TraceException(new Exception("exception within estimator", canceledException));

Expand All @@ -57,9 +57,9 @@ public async Task RunAsync(CancellationToken cancellationToken)

private async Task EstimateAsync(CancellationToken cancellationToken)
{
long estimation = await this.GetEstimatedRemainingWorkAsync(cancellationToken).ConfigureAwait(false);
try
{
long estimation = await this.GetEstimatedRemainingWorkAsync(cancellationToken).ConfigureAwait(false);
await this.dispatchEstimation(estimation, cancellationToken).ConfigureAwait(false);
}
catch (Exception userException)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement
using System;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.Cosmos.Tracing;

/// <summary>
/// Provides flexible way to build lease manager constructor parameters.
Expand All @@ -20,8 +21,7 @@ public static async Task<DocumentServiceLeaseStoreManager> InitializeAsync(
string leaseContainerPrefix,
string instanceName)
{
ContainerResponse cosmosContainerResponse = await leaseContainer.ReadContainerAsync().ConfigureAwait(false);
ContainerProperties containerProperties = cosmosContainerResponse.Resource;
ContainerProperties containerProperties = await leaseContainer.GetCachedContainerPropertiesAsync(forceRefresh: false, NoOpTrace.Singleton, cancellationToken: default);

bool isPartitioned =
containerProperties.PartitionKey != null &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,8 @@ public override ChangeFeedEstimator GetChangeFeedEstimator(
return new ChangeFeedEstimatorCore(
processorName: processorName,
monitoredContainer: this,
leaseContainer: (ContainerInternal)leaseContainer);
leaseContainer: (ContainerInternal)leaseContainer,
documentServiceLeaseContainer: default);
}

public override TransactionalBatch CreateTransactionalBatch(PartitionKey partitionKey)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests.ChangeFeed
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
Expand Down Expand Up @@ -54,6 +55,20 @@ public async Task WhenNoLeasesExist()
Assert.AreEqual(1, receivedEstimation);
}

[TestMethod]
public async Task StartAsync_ShouldThrowIfContainerDoesNotExist()
{
ChangeFeedProcessor estimator = this.cosmosClient.GetContainer(this.database.Id, "DoesNotExist")
.GetChangeFeedEstimatorBuilder("test", (long estimation, CancellationToken token) =>
{
return Task.CompletedTask;
}, TimeSpan.FromSeconds(1))
.WithLeaseContainer(this.LeaseContainer).Build();

CosmosException exception = await Assert.ThrowsExceptionAsync<CosmosException>(() => estimator.StartAsync());
Assert.AreEqual(HttpStatusCode.NotFound, exception.StatusCode);
}

[TestMethod]
public async Task WhenNoLeasesExist_Pull()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.Tests
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement;
using Microsoft.Azure.Cosmos.Tests;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;
using Newtonsoft.Json.Linq;
Expand Down Expand Up @@ -322,6 +324,45 @@ FeedIterator feedCreator(DocumentServiceLease lease, string continuationToken, b
Assert.AreEqual(leaseToken, remainingLeaseWork.LeaseToken);
}

[TestMethod]
public async Task ShouldInitializeDocumentLeaseContainer()
{
static FeedIterator feedCreator(DocumentServiceLease lease, string continuationToken, bool startFromBeginning)
{
return Mock.Of<FeedIterator>();
}

Mock<CosmosClientContext> mockedContext = new Mock<CosmosClientContext>(MockBehavior.Strict);
mockedContext.Setup(c => c.Client).Returns(MockCosmosUtil.CreateMockCosmosClient());

string databaseRid = Guid.NewGuid().ToString();
Mock<DatabaseInternal> mockedMonitoredDatabase = new Mock<DatabaseInternal>(MockBehavior.Strict);
mockedMonitoredDatabase.Setup(c => c.GetRIDAsync(It.IsAny<CancellationToken>())).ReturnsAsync(databaseRid);

string monitoredContainerRid = Guid.NewGuid().ToString();
Mock<ContainerInternal> mockedMonitoredContainer = new Mock<ContainerInternal>(MockBehavior.Strict);
mockedMonitoredContainer.Setup(c => c.GetCachedRIDAsync(It.IsAny<bool>(), It.IsAny<ITrace>(), It.IsAny<CancellationToken>())).ReturnsAsync(monitoredContainerRid);
mockedMonitoredContainer.Setup(c => c.Database).Returns(mockedMonitoredDatabase.Object);
mockedMonitoredContainer.Setup(c => c.ClientContext).Returns(mockedContext.Object);

Mock<FeedIterator> leaseFeedIterator = new Mock<FeedIterator>();
leaseFeedIterator.Setup(i => i.HasMoreResults).Returns(false);

Mock<ContainerInternal>mockedLeaseContainer = new Mock<ContainerInternal>(MockBehavior.Strict);
mockedLeaseContainer.Setup(c => c.GetCachedContainerPropertiesAsync(It.Is<bool>(b => b == false), It.IsAny<ITrace>(), It.IsAny<CancellationToken>())).ReturnsAsync(new ContainerProperties());
mockedLeaseContainer.Setup(c => c.GetItemQueryStreamIterator(It.Is<string>(queryText => queryText.Contains($"{databaseRid}_{monitoredContainerRid}")), It.Is<string>(continuation => continuation == null), It.IsAny<QueryRequestOptions>()))
.Returns(leaseFeedIterator.Object);

ChangeFeedEstimatorIterator remainingWorkEstimator = new ChangeFeedEstimatorIterator(
mockedMonitoredContainer.Object,
mockedLeaseContainer.Object,
documentServiceLeaseContainer: default,
monitoredContainerFeedCreator: feedCreator,
changeFeedEstimatorRequestOptions: default);

await remainingWorkEstimator.ReadNextAsync(default);
}

[TestMethod]
public void ExtractLsnFromSessionToken_ShouldParseOldSessionToken()
{
Expand Down
Loading