diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLease.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLease.cs index 88595fc665..95200f48ee 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLease.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLease.cs @@ -20,6 +20,9 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement [JsonConverter(typeof(DocumentServiceLeaseConverter))] internal abstract class DocumentServiceLease { + public const string IdPropertyName = "id"; + public const string LeasePartitionKeyPropertyName = "partitionKey"; + /// /// Gets the processing distribution unit identifier. /// @@ -53,6 +56,11 @@ internal abstract class DocumentServiceLease /// public abstract string Id { get; } + /// + /// Gets the lease PartitionKey. + /// + public abstract string PartitionKey { get; } + /// /// Gets the Concurrency Token. /// diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseCheckpointerCore.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseCheckpointerCore.cs index 3aaec330be..93c899b920 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseCheckpointerCore.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseCheckpointerCore.cs @@ -37,7 +37,7 @@ public override async Task CheckpointAsync(DocumentService return await this.leaseUpdater.UpdateLeaseAsync( lease, lease.Id, - this.requestOptionsFactory.GetPartitionKey(lease.Id), + this.requestOptionsFactory.GetPartitionKey(lease.Id, lease.PartitionKey), serverLease => { if (serverLease.Owner != lease.Owner) diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseCore.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseCore.cs index 3ae717faf4..65e4e6e725 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseCore.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseCore.cs @@ -23,9 +23,15 @@ public DocumentServiceLeaseCore() { } - [JsonProperty("id")] + [JsonProperty(IdPropertyName)] public string LeaseId { get; set; } + [JsonProperty(LeasePartitionKeyPropertyName, NullValueHandling = NullValueHandling.Ignore)] + public string LeasePartitionKey { get; set; } + + [JsonIgnore] + public override string PartitionKey => this.LeasePartitionKey; + [JsonProperty("version")] public DocumentServiceLeaseVersion Version => DocumentServiceLeaseVersion.PartitionKeyRangeBasedLease; diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseCoreEpk.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseCoreEpk.cs index dceddbcd8c..55c4105c3a 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseCoreEpk.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseCoreEpk.cs @@ -20,9 +20,15 @@ public DocumentServiceLeaseCoreEpk() { } - [JsonProperty("id")] + [JsonProperty(IdPropertyName)] public string LeaseId { get; set; } + [JsonProperty(LeasePartitionKeyPropertyName, NullValueHandling = NullValueHandling.Ignore)] + public string LeasePartitionKey { get; set; } + + [JsonIgnore] + public override string PartitionKey => this.LeasePartitionKey; + [JsonProperty("version")] public DocumentServiceLeaseVersion Version => DocumentServiceLeaseVersion.EPKRangeBasedLease; diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseManagerCosmos.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseManagerCosmos.cs index 38fc4b6e36..6f3604ba95 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseManagerCosmos.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseManagerCosmos.cs @@ -82,7 +82,7 @@ public override async Task AcquireAsync(DocumentServiceLea return await this.leaseUpdater.UpdateLeaseAsync( lease, lease.Id, - this.requestOptionsFactory.GetPartitionKey(lease.Id), + this.requestOptionsFactory.GetPartitionKey(lease.Id, lease.PartitionKey), serverLease => { if (serverLease.Owner != oldOwner) @@ -115,6 +115,8 @@ public override Task CreateLeaseIfNotExistAsync( FeedRange = new FeedRangeEpk(partitionKeyRange.ToRange()) }; + this.requestOptionsFactory.AddPartitionKeyIfNeeded((string pk) => documentServiceLease.LeasePartitionKey = pk, Guid.NewGuid().ToString()); + return this.TryCreateDocumentServiceLeaseAsync(documentServiceLease); } @@ -137,6 +139,8 @@ public override Task CreateLeaseIfNotExistAsync( FeedRange = feedRange }; + this.requestOptionsFactory.AddPartitionKeyIfNeeded((string pk) => documentServiceLease.LeasePartitionKey = pk, Guid.NewGuid().ToString()); + return this.TryCreateDocumentServiceLeaseAsync(documentServiceLease); } @@ -155,7 +159,7 @@ public override async Task ReleaseAsync(DocumentServiceLease lease) await this.leaseUpdater.UpdateLeaseAsync( refreshedLease, refreshedLease.Id, - this.requestOptionsFactory.GetPartitionKey(lease.Id), + this.requestOptionsFactory.GetPartitionKey(lease.Id, lease.PartitionKey), serverLease => { if (serverLease.Owner != lease.Owner) @@ -176,7 +180,7 @@ public override async Task DeleteAsync(DocumentServiceLease lease) } await this.leaseContainer.TryDeleteItemAsync( - this.requestOptionsFactory.GetPartitionKey(lease.Id), + this.requestOptionsFactory.GetPartitionKey(lease.Id, lease.PartitionKey), lease.Id).ConfigureAwait(false); } @@ -197,7 +201,7 @@ public override async Task RenewAsync(DocumentServiceLease return await this.leaseUpdater.UpdateLeaseAsync( refreshedLease, refreshedLease.Id, - this.requestOptionsFactory.GetPartitionKey(lease.Id), + this.requestOptionsFactory.GetPartitionKey(lease.Id, lease.PartitionKey), serverLease => { if (serverLease.Owner != lease.Owner) @@ -222,7 +226,7 @@ public override async Task UpdatePropertiesAsync(DocumentS return await this.leaseUpdater.UpdateLeaseAsync( lease, lease.Id, - this.requestOptionsFactory.GetPartitionKey(lease.Id), + this.requestOptionsFactory.GetPartitionKey(lease.Id, lease.PartitionKey), serverLease => { if (serverLease.Owner != lease.Owner) @@ -238,7 +242,7 @@ public override async Task UpdatePropertiesAsync(DocumentS private async Task TryCreateDocumentServiceLeaseAsync(DocumentServiceLease documentServiceLease) { bool created = await this.leaseContainer.TryCreateItemAsync( - this.requestOptionsFactory.GetPartitionKey(documentServiceLease.Id), + this.requestOptionsFactory.GetPartitionKey(documentServiceLease.Id, documentServiceLease.PartitionKey), documentServiceLease).ConfigureAwait(false) != null; if (created) { @@ -252,7 +256,7 @@ private async Task TryCreateDocumentServiceLeaseAsync(Docu private async Task TryGetLeaseAsync(DocumentServiceLease lease) { - return await this.leaseContainer.TryGetItemAsync(this.requestOptionsFactory.GetPartitionKey(lease.Id), lease.Id).ConfigureAwait(false); + return await this.leaseContainer.TryGetItemAsync(this.requestOptionsFactory.GetPartitionKey(lease.Id, lease.PartitionKey), lease.Id).ConfigureAwait(false); } private string GetDocumentId(string partitionId) diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseStoreCosmos.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseStoreCosmos.cs index 0b4a3e6123..f3e54bad8e 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseStoreCosmos.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseStoreCosmos.cs @@ -35,19 +35,21 @@ public override async Task IsInitializedAsync() { string markerDocId = this.GetStoreMarkerName(); - return await this.container.ItemExistsAsync(this.requestOptionsFactory.GetPartitionKey(markerDocId), markerDocId).ConfigureAwait(false); + return await this.container.ItemExistsAsync(this.requestOptionsFactory.GetPartitionKey(markerDocId, markerDocId), markerDocId).ConfigureAwait(false); } public override async Task MarkInitializedAsync() { string markerDocId = this.GetStoreMarkerName(); - dynamic containerDocument = new { id = markerDocId }; + InitializedDocument containerDocument = new InitializedDocument { Id = markerDocId }; + + this.requestOptionsFactory.AddPartitionKeyIfNeeded((string pk) => containerDocument.PartitionKey = pk, markerDocId); using (Stream itemStream = CosmosContainerExtensions.DefaultJsonSerializer.ToStream(containerDocument)) { using (ResponseMessage responseMessage = await this.container.CreateItemStreamAsync( itemStream, - this.requestOptionsFactory.GetPartitionKey(markerDocId)).ConfigureAwait(false)) + this.requestOptionsFactory.GetPartitionKey(markerDocId, markerDocId)).ConfigureAwait(false)) { responseMessage.EnsureSuccessStatusCode(); } @@ -58,8 +60,10 @@ public override async Task AcquireInitializationLockAsync(TimeSpan lockTim { string lockId = this.GetStoreLockName(); LockDocument containerDocument = new LockDocument() { Id = lockId, TimeToLive = (int)lockTime.TotalSeconds }; + this.requestOptionsFactory.AddPartitionKeyIfNeeded((string pk) => containerDocument.PartitionKey = pk, lockId); + ItemResponse document = await this.container.TryCreateItemAsync( - this.requestOptionsFactory.GetPartitionKey(lockId), + this.requestOptionsFactory.GetPartitionKey(lockId, lockId), containerDocument).ConfigureAwait(false); if (document != null) @@ -74,13 +78,14 @@ public override async Task AcquireInitializationLockAsync(TimeSpan lockTim public override async Task ReleaseInitializationLockAsync() { string lockId = this.GetStoreLockName(); + ItemRequestOptions requestOptions = new ItemRequestOptions() { IfMatchEtag = this.lockETag, }; bool deleted = await this.container.TryDeleteItemAsync( - this.requestOptionsFactory.GetPartitionKey(lockId), + this.requestOptionsFactory.GetPartitionKey(lockId, lockId), lockId, requestOptions).ConfigureAwait(false); @@ -108,8 +113,20 @@ private class LockDocument [JsonProperty("id")] public string Id { get; set; } + [JsonProperty("partitionKey", NullValueHandling = NullValueHandling.Ignore)] + public string PartitionKey { get; set; } + [JsonProperty("ttl")] public int TimeToLive { get; set; } } + + private class InitializedDocument + { + [JsonProperty("id")] + public string Id { get; set; } + + [JsonProperty("partitionKey", NullValueHandling = NullValueHandling.Ignore)] + public string PartitionKey { get; set; } + } } } \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseStoreManagerBuilder.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseStoreManagerBuilder.cs index f1b490799c..70358bca22 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseStoreManagerBuilder.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseStoreManagerBuilder.cs @@ -15,6 +15,9 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement /// internal class DocumentServiceLeaseStoreManagerBuilder { + private static readonly string IdPkPathName = "/" + DocumentServiceLease.IdPropertyName; + private static readonly string PartitionKeyPkPathName = "/" + DocumentServiceLease.LeasePartitionKeyPropertyName; + public static async Task InitializeAsync( ContainerInternal monitoredContainer, ContainerInternal leaseContainer, @@ -30,14 +33,25 @@ public static async Task InitializeAsync( bool isMigratedFixed = containerProperties.PartitionKey?.IsSystemKey == true; if (isPartitioned && !isMigratedFixed - && (containerProperties.PartitionKey.Paths.Count != 1 || containerProperties.PartitionKey.Paths[0] != "/id")) + && (containerProperties.PartitionKey.Paths.Count != 1 + || !(containerProperties.PartitionKey.Paths[0] == IdPkPathName + || containerProperties.PartitionKey.Paths[0] == PartitionKeyPkPathName))) + { + throw new ArgumentException($"The lease container, if partitioned, must have partition key equal to {IdPkPathName} or {PartitionKeyPkPathName}."); + } + + RequestOptionsFactory requestOptionsFactory; + if (isPartitioned && !isMigratedFixed) { - throw new ArgumentException("The lease collection, if partitioned, must have partition key equal to id."); + requestOptionsFactory = containerProperties.PartitionKey.Paths[0] != IdPkPathName ? + (RequestOptionsFactory)new PartitionedByPartitionKeyCollectionRequestOptionsFactory() + : (RequestOptionsFactory)new PartitionedByIdCollectionRequestOptionsFactory(); } + else + { + requestOptionsFactory = (RequestOptionsFactory)new SinglePartitionRequestOptionsFactory(); - RequestOptionsFactory requestOptionsFactory = isPartitioned && !isMigratedFixed ? - (RequestOptionsFactory)new PartitionedByIdCollectionRequestOptionsFactory() : - (RequestOptionsFactory)new SinglePartitionRequestOptionsFactory(); + } DocumentServiceLeaseStoreManagerBuilder leaseStoreManagerBuilder = new DocumentServiceLeaseStoreManagerBuilder() .WithLeasePrefix(leaseContainerPrefix) diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/PartitionedByIdCollectionRequestOptionsFactory.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/PartitionedByIdCollectionRequestOptionsFactory.cs index a4b54f1a35..dadc1c4a25 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/PartitionedByIdCollectionRequestOptionsFactory.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/PartitionedByIdCollectionRequestOptionsFactory.cs @@ -4,6 +4,7 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement { + using System; using Microsoft.Azure.Cosmos; /// @@ -11,8 +12,10 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement /// internal sealed class PartitionedByIdCollectionRequestOptionsFactory : RequestOptionsFactory { - public override PartitionKey GetPartitionKey(string itemId) => new PartitionKey(itemId); + public override PartitionKey GetPartitionKey(string itemId, string partitionKey) => new PartitionKey(itemId); - public override FeedOptions CreateFeedOptions() => new FeedOptions { EnableCrossPartitionQuery = true }; + public override void AddPartitionKeyIfNeeded(Action partitionKeySetter, string partitionKey) + { + } } } diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/PartitionedByPartitionKeyCollectionRequestOptionsFactory.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/PartitionedByPartitionKeyCollectionRequestOptionsFactory.cs new file mode 100644 index 0000000000..26031bfc5c --- /dev/null +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/PartitionedByPartitionKeyCollectionRequestOptionsFactory.cs @@ -0,0 +1,22 @@ +//------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +//------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement +{ + using System; + using Microsoft.Azure.Cosmos; + + /// + /// Used to create request options for partitioned lease collections, when partition key is defined as /partitionKey. + /// + internal sealed class PartitionedByPartitionKeyCollectionRequestOptionsFactory : RequestOptionsFactory + { + public override PartitionKey GetPartitionKey(string itemId, string partitionKey) => new PartitionKey(partitionKey); + + public override void AddPartitionKeyIfNeeded(Action partitionKeySetter, string partitionKey) + { + partitionKeySetter(partitionKey); + } + } +} diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/RequestOptionsFactory.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/RequestOptionsFactory.cs index 2f91154001..fb63bfd9ee 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/RequestOptionsFactory.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/RequestOptionsFactory.cs @@ -4,6 +4,7 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement { + using System; using Microsoft.Azure.Cosmos; /// @@ -11,8 +12,8 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement /// internal abstract class RequestOptionsFactory { - public abstract PartitionKey GetPartitionKey(string itemId); + public abstract PartitionKey GetPartitionKey(string itemId, string partitionKey); - public abstract FeedOptions CreateFeedOptions(); + public abstract void AddPartitionKeyIfNeeded(Action partitionKeySetter, string partitionKey); } } diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/SinglePartitionRequestOptionsFactory.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/SinglePartitionRequestOptionsFactory.cs index f07c4cfae7..a57fc7bc1c 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/SinglePartitionRequestOptionsFactory.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/SinglePartitionRequestOptionsFactory.cs @@ -4,6 +4,7 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement { + using System; using Microsoft.Azure.Cosmos; /// @@ -11,8 +12,10 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement /// internal sealed class SinglePartitionRequestOptionsFactory : RequestOptionsFactory { - public override FeedOptions CreateFeedOptions() => null; + public override void AddPartitionKeyIfNeeded(Action partitionKeySetter, string partitionKey) + { + } - public override PartitionKey GetPartitionKey(string itemId) => PartitionKey.None; + public override PartitionKey GetPartitionKey(string itemId, string partitionKey) => PartitionKey.None; } } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/BaseChangeFeedClientHelper.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/BaseChangeFeedClientHelper.cs index 36181d5f32..14c362ee90 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/BaseChangeFeedClientHelper.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/BaseChangeFeedClientHelper.cs @@ -14,13 +14,12 @@ public class BaseChangeFeedClientHelper : BaseCosmosClientHelper public Container LeaseContainer = null; - public async Task ChangeFeedTestInit() + public async Task ChangeFeedTestInit(string leaseContainerPk = "/id") { await base.TestInit(customizeClientBuilder: (builder) => builder.WithContentResponseOnWrite(false)); - string PartitionKey = "/id"; ContainerResponse response = await this.database.CreateContainerAsync( - new ContainerProperties(id: "leases", partitionKeyPath: PartitionKey), + new ContainerProperties(id: "leases", partitionKeyPath: leaseContainerPk), cancellationToken: this.cancellationToken); this.LeaseContainer = response; diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/GremlinSmokeTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/GremlinSmokeTests.cs new file mode 100644 index 0000000000..dfe8c89bc9 --- /dev/null +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/GremlinSmokeTests.cs @@ -0,0 +1,118 @@ +//------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +//------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests.ChangeFeed +{ + using System.Collections.Generic; + using System.Linq; + using System.Threading; + using System.Threading.Tasks; + using Microsoft.VisualStudio.TestTools.UnitTesting; + + [TestClass] + [TestCategory("ChangeFeed")] + public class GremlinSmokeTests : BaseChangeFeedClientHelper + { + private Container Container; + + [TestInitialize] + public async Task TestInitialize() + { + await base.ChangeFeedTestInit("/partitionKey"); + ContainerResponse response = await this.database.CreateContainerAsync( + new ContainerProperties(id: "monitored", partitionKeyPath: "/pk"), + cancellationToken: this.cancellationToken); + this.Container = response; + } + + [TestCleanup] + public async Task Cleanup() + { + await base.TestCleanup(); + } + + [TestMethod] + public async Task WritesTriggerDelegate_WithLeaseContainer() + { + ManualResetEvent allDocsProcessed = new ManualResetEvent(false); + + IEnumerable expectedIds = Enumerable.Range(0, 100); + List receivedIds = new List(); + ChangeFeedProcessor processor = this.Container + .GetChangeFeedProcessorBuilder("test", (IReadOnlyCollection docs, CancellationToken token) => + { + foreach (TestClass doc in docs) + { + receivedIds.Add(int.Parse(doc.id)); + } + + if (receivedIds.Count == 100) + { + allDocsProcessed.Set(); + } + + return Task.CompletedTask; + }) + .WithInstanceName("random") + .WithLeaseContainer(this.LeaseContainer).Build(); + + await processor.StartAsync(); + // Letting processor initialize + await Task.Delay(BaseChangeFeedClientHelper.ChangeFeedSetupTime); + // Inserting documents + foreach (int id in expectedIds) + { + await this.Container.CreateItemAsync(new { id = id.ToString() }); + } + + // Waiting on all notifications to finish + bool isStartOk = allDocsProcessed.WaitOne(30 * BaseChangeFeedClientHelper.ChangeFeedSetupTime); + await processor.StopAsync(); + Assert.IsTrue(isStartOk, "Timed out waiting for docs to process"); + // Verify that we maintain order + CollectionAssert.AreEqual(expectedIds.ToList(), receivedIds); + + } + + [TestMethod] + public async Task Schema_DefaultsToHavingPartitionKey() + { + ChangeFeedProcessor processor = this.Container + .GetChangeFeedProcessorBuilder("test", (IReadOnlyCollection docs, CancellationToken token) => Task.CompletedTask) + .WithInstanceName("random") + .WithLeaseContainer(this.LeaseContainer).Build(); + + await processor.StartAsync(); + // Letting processor initialize + await Task.Delay(BaseChangeFeedClientHelper.ChangeFeedSetupTime); + + // Verify that leases have the partitionKey attribute + using FeedIterator iterator = this.LeaseContainer.GetItemQueryIterator(); + while (iterator.HasMoreResults) + { + FeedResponse page = await iterator.ReadNextAsync(); + foreach (dynamic lease in page) + { + string leaseId = lease.id; + Assert.IsNotNull(lease.partitionKey); + if (leaseId.Contains(".info") || leaseId.Contains(".lock")) + { + // These are the store initialization marks + continue; + } + + Assert.IsNotNull(lease.LeaseToken); + Assert.IsNull(lease.PartitionId); + } + } + + await processor.StopAsync(); + } + + public class TestClass + { + public string id { get; set; } + } + } +} diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/SmokeTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/SmokeTests.cs index 63802db6fe..f00d9055d8 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/SmokeTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/SmokeTests.cs @@ -39,6 +39,7 @@ public async Task Cleanup() [TestMethod] public async Task WritesTriggerDelegate_WithLeaseContainer() { + ManualResetEvent allDocsProcessed = new ManualResetEvent(false); IEnumerable expectedIds = Enumerable.Range(0, 100); List receivedIds = new List(); ChangeFeedProcessor processor = this.Container @@ -49,6 +50,11 @@ public async Task WritesTriggerDelegate_WithLeaseContainer() receivedIds.Add(int.Parse(doc.id)); } + if (receivedIds.Count == 100) + { + allDocsProcessed.Set(); + } + return Task.CompletedTask; }) .WithInstanceName("random") @@ -64,8 +70,9 @@ public async Task WritesTriggerDelegate_WithLeaseContainer() } // Waiting on all notifications to finish - await Task.Delay(BaseChangeFeedClientHelper.ChangeFeedCleanupTime); + bool isStartOk = allDocsProcessed.WaitOne(30 * BaseChangeFeedClientHelper.ChangeFeedCleanupTime); await processor.StopAsync(); + Assert.IsTrue(isStartOk, "Timed out waiting for docs to process"); // Verify that we maintain order CollectionAssert.AreEqual(expectedIds.ToList(), receivedIds); } @@ -141,6 +148,7 @@ public async Task Schema_DefaultsToNoPartitionId() foreach (dynamic lease in page) { string leaseId = lease.id; + Assert.IsNull(lease.partitionKey); if (leaseId.Contains(".info") || leaseId.Contains(".lock")) { // These are the store initialization marks @@ -268,6 +276,7 @@ public async Task NotExistentLeaseContainer() [TestMethod] public async Task WritesTriggerDelegate_WithLeaseContainerWithDynamic() { + ManualResetEvent allDocsProcessed = new ManualResetEvent(false); IEnumerable expectedIds = Enumerable.Range(0, 100); List receivedIds = new List(); ChangeFeedProcessor processor = this.Container @@ -278,6 +287,11 @@ public async Task WritesTriggerDelegate_WithLeaseContainerWithDynamic() receivedIds.Add(int.Parse(doc.id.Value)); } + if (receivedIds.Count == 100) + { + allDocsProcessed.Set(); + } + return Task.CompletedTask; }) .WithInstanceName("random") @@ -293,8 +307,9 @@ public async Task WritesTriggerDelegate_WithLeaseContainerWithDynamic() } // Waiting on all notifications to finish - await Task.Delay(BaseChangeFeedClientHelper.ChangeFeedCleanupTime); + bool isStartOk = allDocsProcessed.WaitOne(30 * BaseChangeFeedClientHelper.ChangeFeedCleanupTime); await processor.StopAsync(); + Assert.IsTrue(isStartOk, "Timed out waiting for docs to process"); // Verify that we maintain order CollectionAssert.AreEqual(expectedIds.ToList(), receivedIds); } @@ -302,6 +317,7 @@ public async Task WritesTriggerDelegate_WithLeaseContainerWithDynamic() [TestMethod] public async Task WritesTriggerDelegate_WithInMemoryContainer() { + ManualResetEvent allDocsProcessed = new ManualResetEvent(false); IEnumerable expectedIds = Enumerable.Range(0, 100); List receivedIds = new List(); ChangeFeedProcessor processor = this.Container @@ -312,6 +328,11 @@ public async Task WritesTriggerDelegate_WithInMemoryContainer() receivedIds.Add(int.Parse(doc.id)); } + if (receivedIds.Count == 100) + { + allDocsProcessed.Set(); + } + return Task.CompletedTask; }) .WithInstanceName("random") @@ -330,9 +351,9 @@ public async Task WritesTriggerDelegate_WithInMemoryContainer() } // Waiting on all notifications to finish - await Task.Delay(BaseChangeFeedClientHelper.ChangeFeedCleanupTime); + bool isStartOk = allDocsProcessed.WaitOne(30 * BaseChangeFeedClientHelper.ChangeFeedCleanupTime); await processor.StopAsync(); - + Assert.IsTrue(isStartOk, "Timed out waiting for docs to process"); // Verify that we maintain order CollectionAssert.AreEqual(expectedIds.ToList(), receivedIds); } @@ -340,6 +361,7 @@ public async Task WritesTriggerDelegate_WithInMemoryContainer() [TestMethod] public async Task WritesTriggerDelegate_WithInMemoryContainerWithDynamic() { + ManualResetEvent allDocsProcessed = new ManualResetEvent(false); IEnumerable expectedIds = Enumerable.Range(0, 100); List receivedIds = new List(); ChangeFeedProcessor processor = this.Container @@ -350,6 +372,11 @@ public async Task WritesTriggerDelegate_WithInMemoryContainerWithDynamic() receivedIds.Add(int.Parse(doc.id.Value)); } + if (receivedIds.Count == 100) + { + allDocsProcessed.Set(); + } + return Task.CompletedTask; }) .WithInstanceName("random") @@ -365,8 +392,9 @@ public async Task WritesTriggerDelegate_WithInMemoryContainerWithDynamic() } // Waiting on all notifications to finish - await Task.Delay(BaseChangeFeedClientHelper.ChangeFeedCleanupTime); + bool isStartOk = allDocsProcessed.WaitOne(30 * BaseChangeFeedClientHelper.ChangeFeedCleanupTime); await processor.StopAsync(); + Assert.IsTrue(isStartOk, "Timed out waiting for docs to process"); // Verify that we maintain order CollectionAssert.AreEqual(expectedIds.ToList(), receivedIds); } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/DocumentServiceLeaseManagerCosmosTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/DocumentServiceLeaseManagerCosmosTests.cs index 221940f201..545b9c4150 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/DocumentServiceLeaseManagerCosmosTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/DocumentServiceLeaseManagerCosmosTests.cs @@ -64,9 +64,13 @@ public async Task AcquireCompletes() Assert.AreEqual(options.HostName, afterAcquire.Owner); } - [TestMethod] - public async Task CreatesEPKBasedLease() + [DataTestMethod] + [DataRow(0, DisplayName = "Container with system PK")] + [DataRow(1, DisplayName = "Container with id PK")] + [DataRow(2, DisplayName = "Container with partitionKey PK")] + public async Task CreatesEPKBasedLease(int factoryType) { + RequestOptionsFactory requestOptionsFactory = GetRequestOptionsFactory(factoryType); string continuation = Guid.NewGuid().ToString(); DocumentServiceLeaseStoreManagerOptions options = new DocumentServiceLeaseStoreManagerOptions { @@ -89,7 +93,7 @@ public async Task CreatesEPKBasedLease() mockedContainer.Object, mockUpdater.Object, options, - Mock.Of()); + requestOptionsFactory); DocumentServiceLease afterAcquire = await documentServiceLeaseManagerCosmos.CreateLeaseIfNotExistAsync(feedRangeEpk, continuation); @@ -99,11 +103,16 @@ public async Task CreatesEPKBasedLease() Assert.AreEqual(continuation, afterAcquire.ContinuationToken); Assert.AreEqual(feedRangeEpk.Range.Min, ((FeedRangeEpk)epkBasedLease.FeedRange).Range.Min); Assert.AreEqual(feedRangeEpk.Range.Max, ((FeedRangeEpk)epkBasedLease.FeedRange).Range.Max); + ValidateRequestOptionsFactory(requestOptionsFactory, epkBasedLease); } - [TestMethod] - public async Task CreatesPartitionKeyBasedLease() + [DataTestMethod] + [DataRow(0, DisplayName = "Container with system PK")] + [DataRow(1, DisplayName = "Container with id PK")] + [DataRow(2, DisplayName = "Container with partitionKey PK")] + public async Task CreatesPartitionKeyBasedLease(int factoryType) { + RequestOptionsFactory requestOptionsFactory = GetRequestOptionsFactory(factoryType); string continuation = Guid.NewGuid().ToString(); DocumentServiceLeaseStoreManagerOptions options = new DocumentServiceLeaseStoreManagerOptions { @@ -131,7 +140,7 @@ public async Task CreatesPartitionKeyBasedLease() mockedContainer.Object, mockUpdater.Object, options, - Mock.Of()); + requestOptionsFactory); DocumentServiceLease afterAcquire = await documentServiceLeaseManagerCosmos.CreateLeaseIfNotExistAsync(partitionKeyRange, continuation); @@ -140,6 +149,7 @@ public async Task CreatesPartitionKeyBasedLease() Assert.IsNotNull(pkRangeBasedLease); Assert.AreEqual(continuation, afterAcquire.ContinuationToken); Assert.AreEqual(partitionKeyRange.Id, pkRangeBasedLease.CurrentLeaseToken); + ValidateRequestOptionsFactory(requestOptionsFactory, pkRangeBasedLease); } /// @@ -297,5 +307,36 @@ public async Task PopulateMissingRange() Assert.IsNotNull(afterAcquire.FeedRange); } + + private static RequestOptionsFactory GetRequestOptionsFactory(int factoryType) + { + return factoryType switch + { + 0 => new SinglePartitionRequestOptionsFactory(), + 1 => new PartitionedByIdCollectionRequestOptionsFactory(), + 2 => new PartitionedByPartitionKeyCollectionRequestOptionsFactory(), + _ => throw new Exception($"Unkown value for FactoryType: {factoryType}."), + }; + } + + private static void ValidateRequestOptionsFactory(RequestOptionsFactory requestOptionsFactory, DocumentServiceLease lease) + { + if (requestOptionsFactory is SinglePartitionRequestOptionsFactory) + { + Assert.IsNull(lease.PartitionKey); + } + else if (requestOptionsFactory is PartitionedByIdCollectionRequestOptionsFactory) + { + Assert.IsNull(lease.PartitionKey); + } + else if (requestOptionsFactory is PartitionedByPartitionKeyCollectionRequestOptionsFactory) + { + Assert.IsNotNull(lease.PartitionKey); + } + else + { + throw new Exception($"Unkown type mapping for FactoryType: {requestOptionsFactory.GetType()}."); + } + } } } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/DocumentServiceLeaseStoreManagerBuilderTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/DocumentServiceLeaseStoreManagerBuilderTests.cs new file mode 100644 index 0000000000..f6901bb75b --- /dev/null +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/DocumentServiceLeaseStoreManagerBuilderTests.cs @@ -0,0 +1,111 @@ +//------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +//------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos.ChangeFeed.Tests +{ + using System; + using System.Threading; + using System.Threading.Tasks; + using Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement; + using Microsoft.Azure.Cosmos.Tracing; + using Microsoft.VisualStudio.TestTools.UnitTesting; + using Moq; + + [TestClass] + [TestCategory("ChangeFeed")] + public class DocumentServiceLeaseStoreManagerBuilderTests + { + [TestMethod] + public async Task InitializeAsync_WithContainerPartitionedById() + { + ContainerProperties containerProperties = new ContainerProperties + { + PartitionKey = new Documents.PartitionKeyDefinition() { Paths = new System.Collections.ObjectModel.Collection() { "/id" } } + }; + + + Mock leaseContainerMock = new Mock(); + leaseContainerMock.Setup(c => c.GetCachedContainerPropertiesAsync( + It.Is(b => b == false), + It.IsAny(), + It.IsAny())) + .ReturnsAsync(containerProperties); + + await DocumentServiceLeaseStoreManagerBuilder.InitializeAsync( + Mock.Of(), + leaseContainerMock.Object, + Guid.NewGuid().ToString(), + Guid.NewGuid().ToString()); + } + + [TestMethod] + public async Task InitializeAsync_WithContainerPartitionedByPartitionKey() + { + ContainerProperties containerProperties = new ContainerProperties + { + PartitionKey = new Documents.PartitionKeyDefinition() { Paths = new System.Collections.ObjectModel.Collection() { "/partitionKey" } } + }; + + + Mock leaseContainerMock = new Mock(); + leaseContainerMock.Setup(c => c.GetCachedContainerPropertiesAsync( + It.Is(b => b == false), + It.IsAny(), + It.IsAny())) + .ReturnsAsync(containerProperties); + + await DocumentServiceLeaseStoreManagerBuilder.InitializeAsync( + Mock.Of(), + leaseContainerMock.Object, + Guid.NewGuid().ToString(), + Guid.NewGuid().ToString()); + } + + [TestMethod] + public async Task InitializeAsync_WithContainerPartitionedBySystemPK() + { + ContainerProperties containerProperties = new ContainerProperties + { + PartitionKey = new Documents.PartitionKeyDefinition() { IsSystemKey = true } + }; + + + Mock leaseContainerMock = new Mock(); + leaseContainerMock.Setup(c => c.GetCachedContainerPropertiesAsync( + It.Is(b => b == false), + It.IsAny(), + It.IsAny())) + .ReturnsAsync(containerProperties); + + await DocumentServiceLeaseStoreManagerBuilder.InitializeAsync( + Mock.Of(), + leaseContainerMock.Object, + Guid.NewGuid().ToString(), + Guid.NewGuid().ToString()); + } + + [TestMethod] + public async Task InitializeAsync_WithContainerPartitionedByRandom() + { + ContainerProperties containerProperties = new ContainerProperties + { + PartitionKey = new Documents.PartitionKeyDefinition() { Paths = new System.Collections.ObjectModel.Collection() { "/random" } } + }; + + + Mock leaseContainerMock = new Mock(); + leaseContainerMock.Setup(c => c.GetCachedContainerPropertiesAsync( + It.Is(b => b == false), + It.IsAny(), + It.IsAny())) + .ReturnsAsync(containerProperties); + + await Assert.ThrowsExceptionAsync(() => DocumentServiceLeaseStoreManagerBuilder.InitializeAsync( + Mock.Of(), + leaseContainerMock.Object, + Guid.NewGuid().ToString(), + Guid.NewGuid().ToString())); + } + } +} diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/DocumentServiceLeaseTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/DocumentServiceLeaseTests.cs index 7196629159..dc6b1bb91d 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/DocumentServiceLeaseTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/DocumentServiceLeaseTests.cs @@ -59,6 +59,7 @@ public void ValidateSerialization_AllFields() LeaseToken = "0", Owner = "owner", ContinuationToken = "continuation", + LeasePartitionKey = "pk", Timestamp = DateTime.Now - TimeSpan.FromSeconds(5), Properties = new Dictionary { { "key", "value" } } }; @@ -77,6 +78,7 @@ public void ValidateSerialization_AllFields() Assert.AreEqual(originalLease.Owner, lease.Owner); Assert.AreEqual(originalLease.ContinuationToken, lease.ContinuationToken); Assert.AreEqual(originalLease.Timestamp, lease.Timestamp); + Assert.AreEqual(originalLease.PartitionKey, lease.PartitionKey); Assert.AreEqual(originalLease.Properties["key"], lease.Properties["key"]); } @@ -98,6 +100,7 @@ public void ValidateSerialization_NullFields() Assert.IsNull(lease.LeaseToken); Assert.IsNull(lease.Owner); Assert.IsNull(lease.ContinuationToken); + Assert.IsNull(lease.PartitionKey); Assert.AreEqual(new DocumentServiceLeaseCore().Timestamp, lease.Timestamp); Assert.IsTrue(lease.Properties.Count == 0); } @@ -113,6 +116,7 @@ public void ValidateJsonSerialization_PKRangeLease() Owner = "owner", ContinuationToken = "continuation", Timestamp = DateTime.Now - TimeSpan.FromSeconds(5), + LeasePartitionKey = "partitionKey", Properties = new Dictionary { { "key", "value" } }, FeedRange = new FeedRangePartitionKeyRange("0") }; @@ -127,6 +131,7 @@ public void ValidateJsonSerialization_PKRangeLease() Assert.AreEqual(originalLease.ETag, documentServiceLeaseCore.ETag); Assert.AreEqual(originalLease.LeaseToken, documentServiceLeaseCore.LeaseToken); Assert.AreEqual(originalLease.Owner, documentServiceLeaseCore.Owner); + Assert.AreEqual(originalLease.PartitionKey, documentServiceLeaseCore.PartitionKey); Assert.AreEqual(originalLease.ContinuationToken, documentServiceLeaseCore.ContinuationToken); Assert.AreEqual(originalLease.Timestamp, documentServiceLeaseCore.Timestamp); Assert.AreEqual(originalLease.Properties["key"], documentServiceLeaseCore.Properties["key"]); @@ -149,6 +154,7 @@ public void ValidateJsonSerialization_EPKLease() Owner = "owner", ContinuationToken = "continuation", Timestamp = DateTime.Now - TimeSpan.FromSeconds(5), + LeasePartitionKey = "partitionKey", Properties = new Dictionary { { "key", "value" } }, FeedRange = new FeedRangeEpk(new Documents.Routing.Range("AA", "BB", true, false)) }; @@ -163,6 +169,7 @@ public void ValidateJsonSerialization_EPKLease() Assert.AreEqual(originalLease.ETag, documentServiceLeaseCore.ETag); Assert.AreEqual(originalLease.LeaseToken, documentServiceLeaseCore.LeaseToken); Assert.AreEqual(originalLease.Owner, documentServiceLeaseCore.Owner); + Assert.AreEqual(originalLease.PartitionKey, documentServiceLeaseCore.PartitionKey); Assert.AreEqual(originalLease.ContinuationToken, documentServiceLeaseCore.ContinuationToken); Assert.AreEqual(originalLease.Timestamp, documentServiceLeaseCore.Timestamp); Assert.AreEqual(originalLease.Properties["key"], documentServiceLeaseCore.Properties["key"]); diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/RequestOptionsFactoryTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/RequestOptionsFactoryTests.cs new file mode 100644 index 0000000000..5c1c7d26dc --- /dev/null +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/RequestOptionsFactoryTests.cs @@ -0,0 +1,88 @@ +//------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +//------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos.ChangeFeed.Tests +{ + using System; + using System.Threading; + using System.Threading.Tasks; + using Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement; + using Microsoft.Azure.Cosmos.Tracing; + using Microsoft.VisualStudio.TestTools.UnitTesting; + using Moq; + + [TestClass] + [TestCategory("ChangeFeed")] + public class RequestOptionsFactoryTests + { + private const string IdValue = "anId"; + private const string PartitionKeyValue = "aPartitionKey"; + + [TestMethod] + public void SinglePartitionRequestOptionsFactory_GetPartitionKey() + { + SinglePartitionRequestOptionsFactory factory = new SinglePartitionRequestOptionsFactory(); + Assert.AreEqual(PartitionKey.None, factory.GetPartitionKey(IdValue, PartitionKeyValue)); + } + + [TestMethod] + public void PartitionedByIdCollectionRequestOptionsFactory_GetPartitionKey() + { + PartitionedByIdCollectionRequestOptionsFactory factory = new PartitionedByIdCollectionRequestOptionsFactory(); + Assert.AreEqual(new PartitionKey(IdValue), factory.GetPartitionKey(IdValue, PartitionKeyValue)); + } + + [TestMethod] + public void PartitionedByPartitionKeyCollectionRequestOptionsFactory_GetPartitionKey() + { + PartitionedByPartitionKeyCollectionRequestOptionsFactory factory = new PartitionedByPartitionKeyCollectionRequestOptionsFactory(); + Assert.AreEqual(new PartitionKey(PartitionKeyValue), factory.GetPartitionKey(IdValue, PartitionKeyValue)); + } + + [TestMethod] + public void SinglePartitionRequestOptionsFactory_AddPartitionKeyIfNeeded() + { + bool invoked = false; + void action(string pk) + { + invoked = true; + Assert.Fail("Should not invoke"); + } + + SinglePartitionRequestOptionsFactory factory = new SinglePartitionRequestOptionsFactory(); + factory.AddPartitionKeyIfNeeded(action, PartitionKeyValue); + Assert.IsFalse(invoked); + } + + [TestMethod] + public void PartitionedByIdCollectionRequestOptionsFactory_AddPartitionKeyIfNeeded() + { + bool invoked = false; + void action(string pk) + { + invoked = true; + Assert.Fail("Should not invoke"); + } + + PartitionedByIdCollectionRequestOptionsFactory factory = new PartitionedByIdCollectionRequestOptionsFactory(); + factory.AddPartitionKeyIfNeeded(action, PartitionKeyValue); + Assert.IsFalse(invoked); + } + + [TestMethod] + public void PartitionedByPartitionKeyCollectionRequestOptionsFactory_AddPartitionKeyIfNeeded() + { + bool invoked = false; + void action(string pk) + { + invoked = true; + Assert.AreEqual(PartitionKeyValue, pk); ; + } + + PartitionedByPartitionKeyCollectionRequestOptionsFactory factory = new PartitionedByPartitionKeyCollectionRequestOptionsFactory(); + factory.AddPartitionKeyIfNeeded(action, PartitionKeyValue); + Assert.IsTrue(invoked); + } + } +}