Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Availability: Adds non-blocking cache to partition key ranges #3080

Merged
merged 2 commits into from
Mar 14, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions Microsoft.Azure.Cosmos/src/GatewayStoreModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,6 @@ private static async Task<Tuple<bool, PartitionKeyRange>> TryResolvePartitionKey
collectionRid: collection.ResourceId,
previousValue: null,
request: request,
cancellationToken: CancellationToken.None,
j82w marked this conversation as resolved.
Show resolved Hide resolved
NoOpTrace.Singleton);

if (refreshCache && collectionRoutingMap != null)
Expand All @@ -378,7 +377,6 @@ private static async Task<Tuple<bool, PartitionKeyRange>> TryResolvePartitionKey
collectionRid: collection.ResourceId,
previousValue: collectionRoutingMap,
request: request,
cancellationToken: CancellationToken.None,
NoOpTrace.Singleton);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,16 +121,20 @@ private async Task<ShouldRetryResult> ShouldRetryInternalAsync(
AuthorizationTokenType.PrimaryMasterKey))
{
ContainerProperties collection = await this.collectionCache.ResolveCollectionAsync(request, cancellationToken, this.trace);
CollectionRoutingMap routingMap = await this.partitionKeyRangeCache.TryLookupAsync(collection.ResourceId, null, request, cancellationToken, this.trace);
CollectionRoutingMap routingMap = await this.partitionKeyRangeCache.TryLookupAsync(
collectionRid: collection.ResourceId,
previousValue: null,
request: request,
trace: this.trace);

if (routingMap != null)
{
// Force refresh.
await this.partitionKeyRangeCache.TryLookupAsync(
collection.ResourceId,
routingMap,
request,
cancellationToken,
this.trace);
collectionRid: collection.ResourceId,
previousValue: routingMap,
request: request,
trace: this.trace);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,6 @@ public override async Task<CollectionRoutingMap> GetRoutingMapAsync(Cancellation
collectionRid,
previousValue: null,
request: null,
cancellationToken,
NoOpTrace.Singleton);

// Not found.
Expand All @@ -523,7 +522,6 @@ public override async Task<CollectionRoutingMap> GetRoutingMapAsync(Cancellation
collectionRid,
previousValue: null,
request: null,
cancellationToken,
NoOpTrace.Singleton);
}

Expand Down
29 changes: 17 additions & 12 deletions Microsoft.Azure.Cosmos/src/Routing/AddressResolver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -263,15 +263,22 @@ private async Task<ResolutionResult> ResolveAddressesAndIdentityAsync(

ContainerProperties collection = await this.collectionCache.ResolveCollectionAsync(request, cancellationToken, NoOpTrace.Singleton);
CollectionRoutingMap routingMap = await this.collectionRoutingMapCache.TryLookupAsync(
collection.ResourceId, null, request, cancellationToken, NoOpTrace.Singleton);
collectionRid: collection.ResourceId,
previousValue: null,
request: request,
trace: NoOpTrace.Singleton);

if (routingMap != null && request.ForceCollectionRoutingMapRefresh)
{
DefaultTrace.TraceInformation(
"AddressResolver.ResolveAddressesAndIdentityAsync ForceCollectionRoutingMapRefresh collection.ResourceId = {0}",
collection.ResourceId);

routingMap = await this.collectionRoutingMapCache.TryLookupAsync(collection.ResourceId, routingMap, request, cancellationToken, NoOpTrace.Singleton);
routingMap = await this.collectionRoutingMapCache.TryLookupAsync(
collectionRid: collection.ResourceId,
previousValue: routingMap,
request: request,
trace: NoOpTrace.Singleton);
}

if (request.ForcePartitionKeyRangeRefresh)
Expand All @@ -280,7 +287,11 @@ private async Task<ResolutionResult> ResolveAddressesAndIdentityAsync(
request.ForcePartitionKeyRangeRefresh = false;
if (routingMap != null)
{
routingMap = await this.collectionRoutingMapCache.TryLookupAsync(collection.ResourceId, routingMap, request, cancellationToken, NoOpTrace.Singleton);
routingMap = await this.collectionRoutingMapCache.TryLookupAsync(
collectionRid: collection.ResourceId,
previousValue: routingMap,
request: request,
trace: NoOpTrace.Singleton);
}
}

Expand All @@ -293,10 +304,9 @@ private async Task<ResolutionResult> ResolveAddressesAndIdentityAsync(
collectionRoutingMapCacheIsUptoDate = false;
collection = await this.collectionCache.ResolveCollectionAsync(request, cancellationToken, NoOpTrace.Singleton);
routingMap = await this.collectionRoutingMapCache.TryLookupAsync(
collection.ResourceId,
collectionRid: collection.ResourceId,
previousValue: null,
request: request,
cancellationToken: cancellationToken,
trace: NoOpTrace.Singleton);
}

Expand All @@ -319,30 +329,26 @@ private async Task<ResolutionResult> ResolveAddressesAndIdentityAsync(
if (!collectionCacheIsUptoDate)
{
request.ForceNameCacheRefresh = true;
collectionCacheIsUptoDate = true;
collection = await this.collectionCache.ResolveCollectionAsync(request, cancellationToken, NoOpTrace.Singleton);
if (collection.ResourceId != routingMap.CollectionUniqueId)
{
// Collection cache was stale and we resolved to a new Rid. The routing map cache is potentially stale
// for this new collection Rid, so mark it as such.
collectionRoutingMapCacheIsUptoDate = false;
routingMap = await this.collectionRoutingMapCache.TryLookupAsync(
collection.ResourceId,
collectionRid: collection.ResourceId,
previousValue: null,
request: request,
cancellationToken: cancellationToken,
trace: NoOpTrace.Singleton);
}
}

if (!collectionRoutingMapCacheIsUptoDate)
{
collectionRoutingMapCacheIsUptoDate = true;
routingMap = await this.collectionRoutingMapCache.TryLookupAsync(
collection.ResourceId,
previousValue: routingMap,
request: request,
cancellationToken: cancellationToken,
trace: NoOpTrace.Singleton);
}

Expand Down Expand Up @@ -449,7 +455,6 @@ private async Task<ResolutionResult> TryResolveServerPartitionAsync(
PartitionKeyRange range;
string partitionKeyString = request.Headers[HttpConstants.HttpHeaders.PartitionKey];

object effectivePartitionKeyStringObject = null;
if (partitionKeyString != null)
{
range = AddressResolver.TryResolveServerPartitionByPartitionKey(
Expand All @@ -461,7 +466,7 @@ private async Task<ResolutionResult> TryResolveServerPartitionAsync(
}
else if (request.Properties != null && request.Properties.TryGetValue(
WFConstants.BackendHeaders.EffectivePartitionKeyString,
out effectivePartitionKeyStringObject))
out object effectivePartitionKeyStringObject))
{
// Allow EPK only for partitioned collection (excluding migrated fixed collections)
if (!collection.HasPartitionKey || collection.PartitionKey.IsSystemKey.GetValueOrDefault(false))
Expand Down
7 changes: 5 additions & 2 deletions Microsoft.Azure.Cosmos/src/Routing/GlobalAddressResolver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,11 @@ public async Task OpenAsync(
ContainerProperties collection,
CancellationToken cancellationToken)
{
CollectionRoutingMap routingMap =
await this.routingMapProvider.TryLookupAsync(collection.ResourceId, null, null, cancellationToken, NoOpTrace.Singleton);
CollectionRoutingMap routingMap = await this.routingMapProvider.TryLookupAsync(
collectionRid: collection.ResourceId,
previousValue: null,
request: null,
trace: NoOpTrace.Singleton);

if (routingMap == null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ Task<CollectionRoutingMap> TryLookupAsync(
string collectionRid,
CollectionRoutingMap previousValue,
DocumentServiceRequest request,
CancellationToken cancellationToken,
ITrace trace);
}
}
87 changes: 57 additions & 30 deletions Microsoft.Azure.Cosmos/src/Routing/PartitionKeyRangeCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ internal class PartitionKeyRangeCache : IRoutingMapProvider, ICollectionRoutingM
{
private const string PageSizeString = "-1";

private readonly AsyncCache<string, CollectionRoutingMap> routingMapCache;
private readonly AsyncCacheNonBlocking<string, CollectionRoutingMap> routingMapCache;

private readonly ICosmosAuthorizationTokenProvider authorizationTokenProvider;
private readonly IStoreModel storeModel;
Expand All @@ -36,9 +36,8 @@ public PartitionKeyRangeCache(
IStoreModel storeModel,
CollectionCache collectionCache)
{
this.routingMapCache = new AsyncCache<string, CollectionRoutingMap>(
EqualityComparer<CollectionRoutingMap>.Default,
j82w marked this conversation as resolved.
Show resolved Hide resolved
StringComparer.Ordinal);
this.routingMapCache = new AsyncCacheNonBlocking<string, CollectionRoutingMap>(
keyEqualityComparer: StringComparer.Ordinal);
this.authorizationTokenProvider = authorizationTokenProvider;
this.storeModel = storeModel;
this.collectionCache = collectionCache;
Expand All @@ -54,12 +53,19 @@ public virtual async Task<IReadOnlyList<PartitionKeyRange>> TryGetOverlappingRan
{
Debug.Assert(ResourceId.TryParse(collectionRid, out ResourceId collectionRidParsed), "Could not parse CollectionRid from ResourceId.");

CollectionRoutingMap routingMap =
await this.TryLookupAsync(collectionRid, null, null, CancellationToken.None, childTrace);
CollectionRoutingMap routingMap = await this.TryLookupAsync(
collectionRid: collectionRid,
previousValue: null,
request: null,
trace: childTrace);

if (forceRefresh && routingMap != null)
{
routingMap = await this.TryLookupAsync(collectionRid, routingMap, null, CancellationToken.None, childTrace);
routingMap = await this.TryLookupAsync(
collectionRid: collectionRid,
previousValue: routingMap,
request: null,
trace: childTrace);
}

if (routingMap == null)
Expand All @@ -78,15 +84,21 @@ public virtual async Task<PartitionKeyRange> TryGetPartitionKeyRangeByIdAsync(
ITrace trace,
bool forceRefresh = false)
{
ResourceId collectionRidParsed;
Debug.Assert(ResourceId.TryParse(collectionResourceId, out collectionRidParsed), "Could not parse CollectionRid from ResourceId.");
Debug.Assert(ResourceId.TryParse(collectionResourceId, out _), "Could not parse CollectionRid from ResourceId.");

CollectionRoutingMap routingMap =
await this.TryLookupAsync(collectionResourceId, null, null, CancellationToken.None, trace);
CollectionRoutingMap routingMap = await this.TryLookupAsync(
collectionRid: collectionResourceId,
previousValue: null,
request: null,
trace: trace);

if (forceRefresh && routingMap != null)
{
routingMap = await this.TryLookupAsync(collectionResourceId, routingMap, null, CancellationToken.None, trace);
routingMap = await this.TryLookupAsync(
collectionRid: collectionResourceId,
previousValue: routingMap,
request: null,
trace: trace);
}

if (routingMap == null)
Expand All @@ -102,20 +114,19 @@ public virtual async Task<CollectionRoutingMap> TryLookupAsync(
string collectionRid,
CollectionRoutingMap previousValue,
DocumentServiceRequest request,
CancellationToken cancellationToken,
ITrace trace)
{
try
{
return await this.routingMapCache.GetAsync(
collectionRid,
previousValue,
() => this.GetRoutingMapForCollectionAsync(collectionRid,
previousValue,
trace,
request?.RequestContext?.ClientRequestStatistics,
cancellationToken),
CancellationToken.None);
key: collectionRid,
singleValueInitFunc: () => this.GetRoutingMapForCollectionAsync(
collectionRid,
previousValue,
trace,
request?.RequestContext?.ClientRequestStatistics),
forceRefresh: (currentValue) => PartitionKeyRangeCache.ShouldForceRefresh(previousValue, currentValue),
callBackOnForceRefresh: null);
}
catch (DocumentClientException ex)
{
Expand All @@ -139,6 +150,20 @@ public virtual async Task<CollectionRoutingMap> TryLookupAsync(
}
}

/// <summary>
/// Predicate handed to the routing map cache as its <c>forceRefresh</c> callback:
/// compares the routing map a caller last observed with the one currently cached.
/// </summary>
/// <param name="previousValue">Routing map the caller already holds; null when the caller has none.</param>
/// <param name="currentValue">Routing map currently held by the cache; may be null.</param>
/// <returns>
/// <c>true</c> only when both maps are non-null and carry the same
/// <c>ChangeFeedNextIfNoneMatch</c> value; otherwise <c>false</c>.
/// </returns>
private static bool ShouldForceRefresh(
    CollectionRoutingMap previousValue,
    CollectionRoutingMap currentValue)
{
    // With either side missing there is nothing to compare against,
    // so a forced refresh is never requested.
    bool nothingToCompare = previousValue == null || currentValue == null;
    if (nothingToCompare)
    {
        return false;
    }

    // Equal ChangeFeedNextIfNoneMatch values mean the cache still holds the very map
    // the caller passed in as its previous value.
    // NOTE(review): presumably a differing value means another caller already
    // refreshed the entry, making a second refresh unnecessary - confirm against the cache.
    string callerToken = previousValue.ChangeFeedNextIfNoneMatch;
    string cachedToken = currentValue.ChangeFeedNextIfNoneMatch;
    return callerToken == cachedToken;
}

public async Task<PartitionKeyRange> TryGetRangeByPartitionKeyRangeIdAsync(string collectionRid,
string partitionKeyRangeId,
ITrace trace,
Expand All @@ -147,10 +172,14 @@ public async Task<PartitionKeyRange> TryGetRangeByPartitionKeyRangeIdAsync(strin
try
{
CollectionRoutingMap routingMap = await this.routingMapCache.GetAsync(
collectionRid,
null,
() => this.GetRoutingMapForCollectionAsync(collectionRid, null, trace, clientSideRequestStatistics, CancellationToken.None),
CancellationToken.None);
key: collectionRid,
singleValueInitFunc: () => this.GetRoutingMapForCollectionAsync(
collectionRid: collectionRid,
previousRoutingMap: null,
trace: trace,
clientSideRequestStatistics: clientSideRequestStatistics),
forceRefresh: (_) => false,
callBackOnForceRefresh: null);

return routingMap.TryGetRangeByPartitionKeyRangeId(partitionKeyRangeId);
}
Expand All @@ -169,11 +198,10 @@ private async Task<CollectionRoutingMap> GetRoutingMapForCollectionAsync(
string collectionRid,
CollectionRoutingMap previousRoutingMap,
ITrace trace,
IClientSideRequestStatistics clientSideRequestStatistics,
CancellationToken cancellationToken)
IClientSideRequestStatistics clientSideRequestStatistics)
{
List<PartitionKeyRange> ranges = new List<PartitionKeyRange>();
string changeFeedNextIfNoneMatch = previousRoutingMap == null ? null : previousRoutingMap.ChangeFeedNextIfNoneMatch;
string changeFeedNextIfNoneMatch = previousRoutingMap?.ChangeFeedNextIfNoneMatch;

HttpStatusCode lastStatusCode = HttpStatusCode.OK;
do
Expand All @@ -190,8 +218,7 @@ private async Task<CollectionRoutingMap> GetRoutingMapForCollectionAsync(
RetryOptions retryOptions = new RetryOptions();
using (DocumentServiceResponse response = await BackoffRetryUtility<DocumentServiceResponse>.ExecuteAsync(
() => this.ExecutePartitionKeyRangeReadChangeFeedAsync(collectionRid, headers, trace, clientSideRequestStatistics),
new ResourceThrottleRetryPolicy(retryOptions.MaxRetryAttemptsOnThrottledRequests, retryOptions.MaxRetryWaitTimeInSeconds),
cancellationToken))
new ResourceThrottleRetryPolicy(retryOptions.MaxRetryAttemptsOnThrottledRequests, retryOptions.MaxRetryWaitTimeInSeconds)))
{
lastStatusCode = response.StatusCode;
changeFeedNextIfNoneMatch = response.Headers[HttpConstants.HttpHeaders.ETag];
Expand Down
Loading