Skip to content

Commit

Permalink
Change Feed Processor: Adds support for Graph API accounts (#2491)
Browse files Browse the repository at this point in the history
* Autodetecting pk type

* new tests

* emulator tests

* more tests

* comments

* accessor for PartitionKey and more tests
  • Loading branch information
ealsur authored May 27, 2021
1 parent 72b729b commit eb35647
Show file tree
Hide file tree
Showing 18 changed files with 516 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement
[JsonConverter(typeof(DocumentServiceLeaseConverter))]
internal abstract class DocumentServiceLease
{
public const string IdPropertyName = "id";
public const string LeasePartitionKeyPropertyName = "partitionKey";

/// <summary>
/// Gets the processing distribution unit identifier.
/// </summary>
Expand Down Expand Up @@ -53,6 +56,11 @@ internal abstract class DocumentServiceLease
/// </summary>
public abstract string Id { get; }

/// <summary>
/// Gets the lease PartitionKey.
/// </summary>
public abstract string PartitionKey { get; }

/// <summary>
/// Gets the Concurrency Token.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public override async Task<DocumentServiceLease> CheckpointAsync(DocumentService
return await this.leaseUpdater.UpdateLeaseAsync(
lease,
lease.Id,
this.requestOptionsFactory.GetPartitionKey(lease.Id),
this.requestOptionsFactory.GetPartitionKey(lease.Id, lease.PartitionKey),
serverLease =>
{
if (serverLease.Owner != lease.Owner)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,15 @@ public DocumentServiceLeaseCore()
{
}

[JsonProperty("id")]
[JsonProperty(IdPropertyName)]
public string LeaseId { get; set; }

[JsonProperty(LeasePartitionKeyPropertyName, NullValueHandling = NullValueHandling.Ignore)]
public string LeasePartitionKey { get; set; }

[JsonIgnore]
public override string PartitionKey => this.LeasePartitionKey;

[JsonProperty("version")]
public DocumentServiceLeaseVersion Version => DocumentServiceLeaseVersion.PartitionKeyRangeBasedLease;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,15 @@ public DocumentServiceLeaseCoreEpk()
{
}

[JsonProperty("id")]
[JsonProperty(IdPropertyName)]
public string LeaseId { get; set; }

[JsonProperty(LeasePartitionKeyPropertyName, NullValueHandling = NullValueHandling.Ignore)]
public string LeasePartitionKey { get; set; }

[JsonIgnore]
public override string PartitionKey => this.LeasePartitionKey;

[JsonProperty("version")]
public DocumentServiceLeaseVersion Version => DocumentServiceLeaseVersion.EPKRangeBasedLease;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public override async Task<DocumentServiceLease> AcquireAsync(DocumentServiceLea
return await this.leaseUpdater.UpdateLeaseAsync(
lease,
lease.Id,
this.requestOptionsFactory.GetPartitionKey(lease.Id),
this.requestOptionsFactory.GetPartitionKey(lease.Id, lease.PartitionKey),
serverLease =>
{
if (serverLease.Owner != oldOwner)
Expand Down Expand Up @@ -115,6 +115,8 @@ public override Task<DocumentServiceLease> CreateLeaseIfNotExistAsync(
FeedRange = new FeedRangeEpk(partitionKeyRange.ToRange())
};

this.requestOptionsFactory.AddPartitionKeyIfNeeded((string pk) => documentServiceLease.LeasePartitionKey = pk, Guid.NewGuid().ToString());

return this.TryCreateDocumentServiceLeaseAsync(documentServiceLease);
}

Expand All @@ -137,6 +139,8 @@ public override Task<DocumentServiceLease> CreateLeaseIfNotExistAsync(
FeedRange = feedRange
};

this.requestOptionsFactory.AddPartitionKeyIfNeeded((string pk) => documentServiceLease.LeasePartitionKey = pk, Guid.NewGuid().ToString());

return this.TryCreateDocumentServiceLeaseAsync(documentServiceLease);
}

Expand All @@ -155,7 +159,7 @@ public override async Task ReleaseAsync(DocumentServiceLease lease)
await this.leaseUpdater.UpdateLeaseAsync(
refreshedLease,
refreshedLease.Id,
this.requestOptionsFactory.GetPartitionKey(lease.Id),
this.requestOptionsFactory.GetPartitionKey(lease.Id, lease.PartitionKey),
serverLease =>
{
if (serverLease.Owner != lease.Owner)
Expand All @@ -176,7 +180,7 @@ public override async Task DeleteAsync(DocumentServiceLease lease)
}

await this.leaseContainer.TryDeleteItemAsync<DocumentServiceLeaseCore>(
this.requestOptionsFactory.GetPartitionKey(lease.Id),
this.requestOptionsFactory.GetPartitionKey(lease.Id, lease.PartitionKey),
lease.Id).ConfigureAwait(false);
}

Expand All @@ -197,7 +201,7 @@ public override async Task<DocumentServiceLease> RenewAsync(DocumentServiceLease
return await this.leaseUpdater.UpdateLeaseAsync(
refreshedLease,
refreshedLease.Id,
this.requestOptionsFactory.GetPartitionKey(lease.Id),
this.requestOptionsFactory.GetPartitionKey(lease.Id, lease.PartitionKey),
serverLease =>
{
if (serverLease.Owner != lease.Owner)
Expand All @@ -222,7 +226,7 @@ public override async Task<DocumentServiceLease> UpdatePropertiesAsync(DocumentS
return await this.leaseUpdater.UpdateLeaseAsync(
lease,
lease.Id,
this.requestOptionsFactory.GetPartitionKey(lease.Id),
this.requestOptionsFactory.GetPartitionKey(lease.Id, lease.PartitionKey),
serverLease =>
{
if (serverLease.Owner != lease.Owner)
Expand All @@ -238,7 +242,7 @@ public override async Task<DocumentServiceLease> UpdatePropertiesAsync(DocumentS
private async Task<DocumentServiceLease> TryCreateDocumentServiceLeaseAsync(DocumentServiceLease documentServiceLease)
{
bool created = await this.leaseContainer.TryCreateItemAsync<DocumentServiceLease>(
this.requestOptionsFactory.GetPartitionKey(documentServiceLease.Id),
this.requestOptionsFactory.GetPartitionKey(documentServiceLease.Id, documentServiceLease.PartitionKey),
documentServiceLease).ConfigureAwait(false) != null;
if (created)
{
Expand All @@ -252,7 +256,7 @@ private async Task<DocumentServiceLease> TryCreateDocumentServiceLeaseAsync(Docu

private async Task<DocumentServiceLease> TryGetLeaseAsync(DocumentServiceLease lease)
{
return await this.leaseContainer.TryGetItemAsync<DocumentServiceLease>(this.requestOptionsFactory.GetPartitionKey(lease.Id), lease.Id).ConfigureAwait(false);
return await this.leaseContainer.TryGetItemAsync<DocumentServiceLease>(this.requestOptionsFactory.GetPartitionKey(lease.Id, lease.PartitionKey), lease.Id).ConfigureAwait(false);
}

private string GetDocumentId(string partitionId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,21 @@ public override async Task<bool> IsInitializedAsync()
{
string markerDocId = this.GetStoreMarkerName();

return await this.container.ItemExistsAsync(this.requestOptionsFactory.GetPartitionKey(markerDocId), markerDocId).ConfigureAwait(false);
return await this.container.ItemExistsAsync(this.requestOptionsFactory.GetPartitionKey(markerDocId, markerDocId), markerDocId).ConfigureAwait(false);
}

public override async Task MarkInitializedAsync()
{
string markerDocId = this.GetStoreMarkerName();
dynamic containerDocument = new { id = markerDocId };
InitializedDocument containerDocument = new InitializedDocument { Id = markerDocId };

this.requestOptionsFactory.AddPartitionKeyIfNeeded((string pk) => containerDocument.PartitionKey = pk, markerDocId);

using (Stream itemStream = CosmosContainerExtensions.DefaultJsonSerializer.ToStream(containerDocument))
{
using (ResponseMessage responseMessage = await this.container.CreateItemStreamAsync(
itemStream,
this.requestOptionsFactory.GetPartitionKey(markerDocId)).ConfigureAwait(false))
this.requestOptionsFactory.GetPartitionKey(markerDocId, markerDocId)).ConfigureAwait(false))
{
responseMessage.EnsureSuccessStatusCode();
}
Expand All @@ -58,8 +60,10 @@ public override async Task<bool> AcquireInitializationLockAsync(TimeSpan lockTim
{
string lockId = this.GetStoreLockName();
LockDocument containerDocument = new LockDocument() { Id = lockId, TimeToLive = (int)lockTime.TotalSeconds };
this.requestOptionsFactory.AddPartitionKeyIfNeeded((string pk) => containerDocument.PartitionKey = pk, lockId);

ItemResponse<LockDocument> document = await this.container.TryCreateItemAsync<LockDocument>(
this.requestOptionsFactory.GetPartitionKey(lockId),
this.requestOptionsFactory.GetPartitionKey(lockId, lockId),
containerDocument).ConfigureAwait(false);

if (document != null)
Expand All @@ -74,13 +78,14 @@ public override async Task<bool> AcquireInitializationLockAsync(TimeSpan lockTim
public override async Task<bool> ReleaseInitializationLockAsync()
{
string lockId = this.GetStoreLockName();

ItemRequestOptions requestOptions = new ItemRequestOptions()
{
IfMatchEtag = this.lockETag,
};

bool deleted = await this.container.TryDeleteItemAsync<LockDocument>(
this.requestOptionsFactory.GetPartitionKey(lockId),
this.requestOptionsFactory.GetPartitionKey(lockId, lockId),
lockId,
requestOptions).ConfigureAwait(false);

Expand Down Expand Up @@ -108,8 +113,20 @@ private class LockDocument
[JsonProperty("id")]
public string Id { get; set; }

[JsonProperty("partitionKey", NullValueHandling = NullValueHandling.Ignore)]
public string PartitionKey { get; set; }

[JsonProperty("ttl")]
public int TimeToLive { get; set; }
}

private class InitializedDocument
{
[JsonProperty("id")]
public string Id { get; set; }

[JsonProperty("partitionKey", NullValueHandling = NullValueHandling.Ignore)]
public string PartitionKey { get; set; }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement
/// </summary>
internal class DocumentServiceLeaseStoreManagerBuilder
{
private static readonly string IdPkPathName = "/" + DocumentServiceLease.IdPropertyName;
private static readonly string PartitionKeyPkPathName = "/" + DocumentServiceLease.LeasePartitionKeyPropertyName;

public static async Task<DocumentServiceLeaseStoreManager> InitializeAsync(
ContainerInternal monitoredContainer,
ContainerInternal leaseContainer,
Expand All @@ -30,14 +33,25 @@ public static async Task<DocumentServiceLeaseStoreManager> InitializeAsync(
bool isMigratedFixed = containerProperties.PartitionKey?.IsSystemKey == true;
if (isPartitioned
&& !isMigratedFixed
&& (containerProperties.PartitionKey.Paths.Count != 1 || containerProperties.PartitionKey.Paths[0] != "/id"))
&& (containerProperties.PartitionKey.Paths.Count != 1
|| !(containerProperties.PartitionKey.Paths[0] == IdPkPathName
|| containerProperties.PartitionKey.Paths[0] == PartitionKeyPkPathName)))
{
throw new ArgumentException($"The lease container, if partitioned, must have partition key equal to {IdPkPathName} or {PartitionKeyPkPathName}.");
}

RequestOptionsFactory requestOptionsFactory;
if (isPartitioned && !isMigratedFixed)
{
throw new ArgumentException("The lease collection, if partitioned, must have partition key equal to id.");
requestOptionsFactory = containerProperties.PartitionKey.Paths[0] != IdPkPathName ?
(RequestOptionsFactory)new PartitionedByPartitionKeyCollectionRequestOptionsFactory()
: (RequestOptionsFactory)new PartitionedByIdCollectionRequestOptionsFactory();
}
else
{
requestOptionsFactory = (RequestOptionsFactory)new SinglePartitionRequestOptionsFactory();

RequestOptionsFactory requestOptionsFactory = isPartitioned && !isMigratedFixed ?
(RequestOptionsFactory)new PartitionedByIdCollectionRequestOptionsFactory() :
(RequestOptionsFactory)new SinglePartitionRequestOptionsFactory();
}

DocumentServiceLeaseStoreManagerBuilder leaseStoreManagerBuilder = new DocumentServiceLeaseStoreManagerBuilder()
.WithLeasePrefix(leaseContainerPrefix)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,18 @@

namespace Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement
{
using System;
using Microsoft.Azure.Cosmos;

/// <summary>
/// Used to create request options for partitioned lease collections, when partition key is defined as /id.
/// </summary>
internal sealed class PartitionedByIdCollectionRequestOptionsFactory : RequestOptionsFactory
{
public override PartitionKey GetPartitionKey(string itemId) => new PartitionKey(itemId);
public override PartitionKey GetPartitionKey(string itemId, string partitionKey) => new PartitionKey(itemId);

public override FeedOptions CreateFeedOptions() => new FeedOptions { EnableCrossPartitionQuery = true };
public override void AddPartitionKeyIfNeeded(Action<string> partitionKeySetter, string partitionKey)
{
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement
{
using System;
using Microsoft.Azure.Cosmos;

/// <summary>
/// Used to create request options for partitioned lease collections, when partition key is defined as /partitionKey.
/// </summary>
internal sealed class PartitionedByPartitionKeyCollectionRequestOptionsFactory : RequestOptionsFactory
{
public override PartitionKey GetPartitionKey(string itemId, string partitionKey) => new PartitionKey(partitionKey);

public override void AddPartitionKeyIfNeeded(Action<string> partitionKeySetter, string partitionKey)
{
partitionKeySetter(partitionKey);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@

namespace Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement
{
using System;
using Microsoft.Azure.Cosmos;

/// <summary>
/// Defines request options for lease requests to use with <see cref="DocumentServiceLeaseStoreManagerCosmos"/> and <see cref="DocumentServiceLeaseStoreCosmos"/>.
/// </summary>
internal abstract class RequestOptionsFactory
{
public abstract PartitionKey GetPartitionKey(string itemId);
public abstract PartitionKey GetPartitionKey(string itemId, string partitionKey);

public abstract FeedOptions CreateFeedOptions();
public abstract void AddPartitionKeyIfNeeded(Action<string> partitionKeySetter, string partitionKey);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,18 @@

namespace Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement
{
using System;
using Microsoft.Azure.Cosmos;

/// <summary>
/// Used to create request options for non-partitioned lease collections.
/// </summary>
internal sealed class SinglePartitionRequestOptionsFactory : RequestOptionsFactory
{
public override FeedOptions CreateFeedOptions() => null;
public override void AddPartitionKeyIfNeeded(Action<string> partitionKeySetter, string partitionKey)
{
}

public override PartitionKey GetPartitionKey(string itemId) => PartitionKey.None;
public override PartitionKey GetPartitionKey(string itemId, string partitionKey) => PartitionKey.None;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,12 @@ public class BaseChangeFeedClientHelper : BaseCosmosClientHelper

public Container LeaseContainer = null;

public async Task ChangeFeedTestInit()
public async Task ChangeFeedTestInit(string leaseContainerPk = "/id")
{
await base.TestInit(customizeClientBuilder: (builder) => builder.WithContentResponseOnWrite(false));
string PartitionKey = "/id";

ContainerResponse response = await this.database.CreateContainerAsync(
new ContainerProperties(id: "leases", partitionKeyPath: PartitionKey),
new ContainerProperties(id: "leases", partitionKeyPath: leaseContainerPk),
cancellationToken: this.cancellationToken);

this.LeaseContainer = response;
Expand Down
Loading

0 comments on commit eb35647

Please sign in to comment.