diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseManager.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseManager.cs
index 18b0e790ba..aa455969ab 100644
--- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseManager.cs
+++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseManager.cs
@@ -85,37 +85,33 @@ public void ChangeFeedModeSwitchingCheck(
DocumentServiceLease documentServiceLease = documentServiceLeases[0];
- // Mode attribute exists on lease document, but it is not set. legacy is always LatestVersion because
- // AllVersionsAndDeletes does not exist. There should not be any legacy lease documents that are
- // AllVersionsAndDeletes. If the ChangeFeedProcessor's mode is not legacy, an exception should thrown.
- // If the ChangeFeedProcessor mode is not the mode in the lease document, an exception should be thrown.
-
- bool shouldThrowException = this.VerifyChangeFeedProcessorMode(
- changeFeedMode:
- string.IsNullOrEmpty(documentServiceLease.Mode)
- ? ChangeFeedMode.LatestVersion
- : changeFeedLeaseOptionsMode,
- leaseChangeFeedMode: documentServiceLease.Mode,
- normalizedProcessorChangeFeedMode: out string normalizedProcessorChangeFeedMode);
-
- // If shouldThrowException is true, throw the exception.
-
- if (shouldThrowException)
- {
- throw new ArgumentException(message: $"Switching {nameof(ChangeFeedMode)} {documentServiceLease.Mode} to {normalizedProcessorChangeFeedMode} is not allowed.");
- }
+ this.VerifyChangeFeedProcessorMode(
+ changeFeedMode: changeFeedLeaseOptionsMode,
+ leaseChangeFeedMode: documentServiceLease.Mode);
}
- private bool VerifyChangeFeedProcessorMode(
+ ///
+ /// Mode attribute exists on lease document, but it is not set. Legacy is always LatestVersion/IncrementalFeed
+ /// because AllVersionsAndDeletes does not exist. There should not be any legacy lease documents that are
+ /// AllVersionsAndDeletes. If the ChangeFeedProcessor's mode is not legacy, an exception should thrown.
+ /// If the ChangeFeedProcessor mode is not the mode in the lease document, an exception should be thrown.
+ ///
+ /// The current change feed mode.
+ /// The change feed mode on the lease document.
+ private void VerifyChangeFeedProcessorMode(
ChangeFeedMode changeFeedMode,
- string leaseChangeFeedMode,
- out string normalizedProcessorChangeFeedMode)
+ string leaseChangeFeedMode)
{
- normalizedProcessorChangeFeedMode = changeFeedMode == ChangeFeedMode.AllVersionsAndDeletes
+ leaseChangeFeedMode ??= HttpConstants.A_IMHeaderValues.IncrementalFeed;
+
+ string normalizedProcessorChangeFeedMode = changeFeedMode == ChangeFeedMode.AllVersionsAndDeletes
? HttpConstants.A_IMHeaderValues.FullFidelityFeed
: HttpConstants.A_IMHeaderValues.IncrementalFeed;
- return string.Compare(leaseChangeFeedMode, normalizedProcessorChangeFeedMode, StringComparison.OrdinalIgnoreCase) != 0;
+ if (string.Compare(leaseChangeFeedMode, normalizedProcessorChangeFeedMode, StringComparison.OrdinalIgnoreCase) != 0)
+ {
+ throw new ArgumentException(message: $"Switching {nameof(ChangeFeedMode)} {leaseChangeFeedMode} to {normalizedProcessorChangeFeedMode} is not allowed.");
+ }
}
}
}
diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests.cs
index 5a31025e71..7217993fd5 100644
--- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests.cs
+++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests.cs
@@ -6,11 +6,12 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests.ChangeFeed
{
using System;
using System.Collections.Generic;
- using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
+ using Microsoft.Azure.Cosmos.ChangeFeed.Utils;
using Microsoft.VisualStudio.TestTools.UnitTesting;
+ using Newtonsoft.Json.Linq;
[TestClass]
[TestCategory("ChangeFeedProcessor")]
@@ -107,8 +108,6 @@ public async Task WhenADocumentIsCreatedThenUpdatedThenDeletedTestsAsync()
Assert.IsTrue(condition: createChange.Metadata.Lsn < replaceChange.Metadata.Lsn, message: "The create operation must happen before the replace operation.");
Assert.IsTrue(condition: createChange.Metadata.Lsn < replaceChange.Metadata.Lsn, message: "The replace operation must happen before the delete operation.");
- Debug.WriteLine("Assertions completed.");
-
return Task.CompletedTask;
})
.WithInstanceName(Guid.NewGuid().ToString())
@@ -117,9 +116,6 @@ public async Task WhenADocumentIsCreatedThenUpdatedThenDeletedTestsAsync()
{
exception = error.InnerException;
- Debug.WriteLine("WithErrorNotification");
- Debug.WriteLine(error.ToString());
-
return Task.CompletedTask;
})
.Build();
@@ -176,11 +172,46 @@ await GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests
leaseContainer: this.LeaseContainer,
allDocsProcessed: allDocsProcessed));
- Debug.WriteLine(exception.ToString());
-
Assert.AreEqual(expected: "Switching ChangeFeedMode Incremental Feed to Full-Fidelity Feed is not allowed.", actual: exception.Message);
+ }
+
+
+ ///
+ /// This is based on an issue located at .
+ ///
+ [TestMethod]
+ [Owner("philipthomas-MSFT")]
+ [Description("Scenario: For Legacy lease documents with no Mode property, When ChangeFeedMode on ChangeFeedProcessor, switches from LatestVersion to AllVersionsAndDeletes," +
+ "an exception is expected. LatestVersion's WithStartFromBeginning can be set, or not set.")]
+ [DataRow(false)]
+ [DataRow(true)]
+ public async Task WhenLegacyLatestVersionSwitchToAllVersionsAndDeletesExpectsAexceptionTestAsync(bool withStartFromBeginning)
+ {
+ ContainerInternal monitoredContainer = await this.CreateMonitoredContainer(ChangeFeedMode.LatestVersion);
+ ManualResetEvent allDocsProcessed = new(false);
+
+ await GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests
+ .BuildChangeFeedProcessorWithLatestVersionAsync(
+ monitoredContainer: monitoredContainer,
+ leaseContainer: this.LeaseContainer,
+ allDocsProcessed: allDocsProcessed,
+ withStartFromBeginning: withStartFromBeginning);
+
+ // Read lease documents, remove the Mode, and update the lease documents, so that it mimics a legacy lease document.
+
+ await GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests
+ .RevertLeaseDocumentsToLegacyWithNoMode(
+ leaseContainer: this.LeaseContainer,
+ leaseDocumentCount: 2);
+
+ ArgumentException exception = await Assert.ThrowsExceptionAsync(
+ () => GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests
+ .BuildChangeFeedProcessorWithAllVersionsAndDeletesAsync(
+ monitoredContainer: monitoredContainer,
+ leaseContainer: this.LeaseContainer,
+ allDocsProcessed: allDocsProcessed));
- Debug.WriteLine("Assertions completed.");
+ Assert.AreEqual(expected: "Switching ChangeFeedMode Incremental Feed to Full-Fidelity Feed is not allowed.", actual: exception.Message);
}
///
@@ -211,11 +242,7 @@ await GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests
allDocsProcessed: allDocsProcessed,
withStartFromBeginning: withStartFromBeginning));
- Debug.WriteLine(exception.ToString());
-
Assert.AreEqual(expected: "Switching ChangeFeedMode Full-Fidelity Feed to Incremental Feed is not allowed.", actual: exception.Message);
-
- Debug.WriteLine("Assertions completed.");
}
///
@@ -243,8 +270,6 @@ await GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests
monitoredContainer: monitoredContainer,
leaseContainer: this.LeaseContainer,
allDocsProcessed: allDocsProcessed);
-
- Debug.WriteLine("No exceptions occurred.");
}
catch
{
@@ -281,8 +306,6 @@ await GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests
leaseContainer: this.LeaseContainer,
allDocsProcessed: allDocsProcessed,
withStartFromBeginning: withStartFromBeginning);
-
- Debug.WriteLine("No exceptions occurred.");
}
catch
{
@@ -290,6 +313,80 @@ await GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests
}
}
+ ///
+ /// This is based on an issue located at .
+ ///
+ [TestMethod]
+ [Owner("philipthomas-MSFT")]
+ [Description("Scenario: For Legacy lease documents with no Mode property, When ChangeFeedMode on ChangeFeedProcessor " +
+ "does not switch, LatestVersion, no exception is expected. LatestVersion's WithStartFromBeginning can be set, or not set.")]
+ [DataRow(false)]
+ [DataRow(true)]
+ public async Task WhenLegacyNoSwitchLatestVersionDoesNotExpectAnExceptionTestAsync(bool withStartFromBeginning)
+ {
+ ContainerInternal monitoredContainer = await this.CreateMonitoredContainer(ChangeFeedMode.LatestVersion);
+ ManualResetEvent allDocsProcessed = new(false);
+
+ await GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests
+ .BuildChangeFeedProcessorWithLatestVersionAsync(
+ monitoredContainer: monitoredContainer,
+ leaseContainer: this.LeaseContainer,
+ allDocsProcessed: allDocsProcessed,
+ withStartFromBeginning: withStartFromBeginning);
+
+ // Read lease documents, remove the Mode, and update the lease documents, so that it mimics a legacy lease document.
+
+ await GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests
+ .RevertLeaseDocumentsToLegacyWithNoMode(
+ leaseContainer: this.LeaseContainer,
+ leaseDocumentCount: 2);
+
+ await GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests
+ .BuildChangeFeedProcessorWithLatestVersionAsync(
+ monitoredContainer: monitoredContainer,
+ leaseContainer: this.LeaseContainer,
+ allDocsProcessed: allDocsProcessed,
+ withStartFromBeginning: withStartFromBeginning);
+ }
+
+ private static async Task RevertLeaseDocumentsToLegacyWithNoMode(
+ Container leaseContainer,
+ int leaseDocumentCount)
+ {
+ FeedIterator iterator = leaseContainer.GetItemQueryStreamIterator(
+ queryText: "SELECT * FROM c",
+ continuationToken: null);
+
+ List leases = new List();
+ while (iterator.HasMoreResults)
+ {
+ using (ResponseMessage responseMessage = await iterator.ReadNextAsync().ConfigureAwait(false))
+ {
+ responseMessage.EnsureSuccessStatusCode();
+ leases.AddRange(CosmosFeedResponseSerializer.FromFeedResponseStream(
+ serializerCore: CosmosContainerExtensions.DefaultJsonSerializer,
+ streamWithServiceEnvelope: responseMessage.Content));
+ }
+ }
+
+ int counter = 0;
+
+ foreach (JObject lease in leases)
+ {
+ if (!lease.ContainsKey("Mode"))
+ {
+ continue;
+ }
+
+ counter++;
+ lease.Remove("Mode");
+
+ _ = await leaseContainer.UpsertItemAsync(item: lease);
+ }
+
+ Assert.AreEqual(expected: leaseDocumentCount, actual: counter);
+ }
+
private static async Task BuildChangeFeedProcessorWithLatestVersionAsync(
ContainerInternal monitoredContainer,
Container leaseContainer,
@@ -307,9 +404,6 @@ private static async Task BuildChangeFeedProcessorWithLatestVersionAsync(
{
exception = error.InnerException;
- Debug.WriteLine("WithErrorNotification");
- Debug.WriteLine(error.ToString());
-
return Task.CompletedTask;
});
@@ -318,7 +412,6 @@ private static async Task BuildChangeFeedProcessorWithLatestVersionAsync(
processorBuilder.WithStartFromBeginning();
}
-
ChangeFeedProcessor processor = processorBuilder.Build();
Interlocked.Exchange(ref latestVersionProcessorAtomic, processor);
@@ -349,9 +442,6 @@ private static async Task BuildChangeFeedProcessorWithAllVersionsAndDeletesAsync
{
exception = error.InnerException;
- Debug.WriteLine("WithErrorNotification");
- Debug.WriteLine(error.ToString());
-
return Task.FromResult(exception);
});
@@ -376,8 +466,6 @@ private async Task CreateMonitoredContainer(ChangeFeedMode ch
if (changeFeedMode == ChangeFeedMode.AllVersionsAndDeletes)
{
- Debug.WriteLine($"{nameof(properties.ChangeFeedPolicy.FullFidelityRetention)} initialized.");
-
properties.ChangeFeedPolicy.FullFidelityRetention = TimeSpan.FromMinutes(5);
}