Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade Resiliency: Refactors GatewayAddressCache to Mark TransportAddresses to Unhealthy when Connection Reset Event Occurs #3768

Merged
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<ClientOfficialVersion>3.32.2</ClientOfficialVersion>
<ClientPreviewVersion>3.32.2</ClientPreviewVersion>
<ClientPreviewSuffixVersion>preview</ClientPreviewSuffixVersion>
<DirectVersion>3.30.2</DirectVersion>
<DirectVersion>3.30.4</DirectVersion>
<EncryptionOfficialVersion>2.0.1</EncryptionOfficialVersion>
<EncryptionPreviewVersion>2.0.1</EncryptionPreviewVersion>
<EncryptionPreviewSuffixVersion>preview</EncryptionPreviewSuffixVersion>
Expand Down
61 changes: 33 additions & 28 deletions Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -387,15 +387,21 @@ private static void LogPartitionCacheRefresh(
}
}

public void TryRemoveAddresses(
/// <summary>
/// Marks as Unhealthy every <see cref="TransportAddressUri"/> that matches the faulted
/// server key.
/// </summary>
/// <param name="serverKey">An instance of <see cref="ServerKey"/> that contains the host and
/// port of the backend replica.</param>
public async Task MarkAddressesToUnhealthyAsync(
ServerKey serverKey)
{
if (serverKey == null)
{
throw new ArgumentNullException(nameof(serverKey));
}

if (this.serverPartitionAddressToPkRangeIdMap.TryRemove(serverKey, out HashSet<PartitionKeyRangeIdentity> pkRangeIds))
if (this.serverPartitionAddressToPkRangeIdMap.TryGetValue(serverKey, out HashSet<PartitionKeyRangeIdentity> pkRangeIds))
{
PartitionKeyRangeIdentity[] pkRangeIdsCopy;
lock (pkRangeIds)
Expand All @@ -405,36 +411,35 @@ public void TryRemoveAddresses(

foreach (PartitionKeyRangeIdentity pkRangeId in pkRangeIdsCopy)
{
DefaultTrace.TraceInformation("Remove addresses for collectionRid :{0}, pkRangeId: {1}, serviceEndpoint: {2}",
pkRangeId.CollectionRid,
pkRangeId.PartitionKeyRangeId,
this.serviceEndpoint);

this.serverPartitionAddressCache.TryRemove(pkRangeId);
}
}
}

public async Task<PartitionAddressInformation> UpdateAsync(
PartitionKeyRangeIdentity partitionKeyRangeIdentity,
CancellationToken cancellationToken)
{
if (partitionKeyRangeIdentity == null)
{
throw new ArgumentNullException(nameof(partitionKeyRangeIdentity));
}

cancellationToken.ThrowIfCancellationRequested();

return await this.serverPartitionAddressCache.GetAsync(
key: partitionKeyRangeIdentity,
// The forceRefresh flag is set to true for the callback delegate because, if GetAsync() on the async
// non-blocking cache fails to look up the pkRangeId, then there is some inconsistency present in the cache, and it is
// safer to do a force refresh to fetch the addresses from the gateway, instead of fetching them from the cache itself.
// Please note that the chances of encountering such a scenario are very low.
PartitionAddressInformation addressInfo = await this.serverPartitionAddressCache.GetAsync(
key: pkRangeId,
singleValueInitFunc: (_) => this.GetAddressesForRangeIdAsync(
null,
cachedAddresses: null,
partitionKeyRangeIdentity.CollectionRid,
partitionKeyRangeIdentity.PartitionKeyRangeId,
pkRangeId.CollectionRid,
pkRangeId.PartitionKeyRangeId,
forceRefresh: true),
forceRefresh: (_) => true);
forceRefresh: (_) => false);

IReadOnlyList<TransportAddressUri> transportAddresses = addressInfo.Get(Protocol.Tcp)?.ReplicaTransportAddressUris;
foreach (TransportAddressUri address in from TransportAddressUri transportAddress in transportAddresses
where serverKey.Equals(transportAddress.ReplicaServerKey)
select transportAddress)
{
DefaultTrace.TraceInformation("Marking a backend replica to Unhealthy for collectionRid :{0}, pkRangeId: {1}, serviceEndpoint: {2}, transportAddress: {3}",
pkRangeId.CollectionRid,
pkRangeId.PartitionKeyRangeId,
this.serviceEndpoint,
address.ToString());

address.SetUnhealthy();
}
}
}
}

private async Task<Tuple<PartitionKeyRangeIdentity, PartitionAddressInformation>> ResolveMasterAsync(DocumentServiceRequest request, bool forceRefresh)
Expand Down
25 changes: 4 additions & 21 deletions Microsoft.Azure.Cosmos/src/Routing/GlobalAddressResolver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -228,33 +228,16 @@ public async Task<PartitionAddressInformation> ResolveAsync(
}

public async Task UpdateAsync(
IReadOnlyList<AddressCacheToken> addressCacheTokens,
CancellationToken cancellationToken)
{
List<Task> tasks = new List<Task>();

foreach (AddressCacheToken cacheToken in addressCacheTokens)
{
if (this.addressCacheByEndpoint.TryGetValue(cacheToken.ServiceEndpoint, out EndpointCache endpointCache))
{
tasks.Add(endpointCache.AddressCache.UpdateAsync(cacheToken.PartitionKeyRangeIdentity, cancellationToken));
}
}

await Task.WhenAll(tasks);
}

public Task UpdateAsync(
ServerKey serverKey,
CancellationToken cancellationToken)
{
foreach (KeyValuePair<Uri, EndpointCache> addressCache in this.addressCacheByEndpoint)
{
// since we don't know which address cache contains the pkRanges mapped to this node, we do a tryRemove on all AddressCaches of all regions
addressCache.Value.AddressCache.TryRemoveAddresses(serverKey);
// since we don't know which address cache contains the pkRanges mapped to this node,
// we mark all transport uris that have the same server key as unhealthy in the
// AddressCaches of all regions.
await addressCache.Value.AddressCache.MarkAddressesToUnhealthyAsync(serverKey);
}

return Task.CompletedTask;
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ namespace Microsoft.Azure.Cosmos
using Microsoft.Azure.Cosmos.Tests;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;
using Microsoft.Azure.Documents.Rntbd;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;

Expand Down Expand Up @@ -108,8 +110,11 @@ public void TestGatewayAddressCacheAutoRefreshOnSuboptimalPartition()
public async Task TestGatewayAddressCacheUpdateOnConnectionResetAsync()
{
FakeMessageHandler messageHandler = new FakeMessageHandler();
HttpClient httpClient = new HttpClient(messageHandler);
httpClient.Timeout = TimeSpan.FromSeconds(120);
HttpClient httpClient = new HttpClient(messageHandler)
{
Timeout = TimeSpan.FromSeconds(120)
};

GatewayAddressCache cache = new GatewayAddressCache(
new Uri(GatewayAddressCacheTests.DatabaseAccountApiEndpoint),
Documents.Client.Protocol.Tcp,
Expand All @@ -129,8 +134,9 @@ public async Task TestGatewayAddressCacheUpdateOnConnectionResetAsync()

Assert.IsNotNull(addresses.AllAddresses.Select(address => address.PhysicalUri == "https://blabla.com"));

// call updateAddress
cache.TryRemoveAddresses(new Documents.Rntbd.ServerKey(new Uri("https://blabla.com")));
// Mark transport addresses as Unhealthy, depicting a connection reset event.
ServerKey faultyServerKey = new (new Uri("https://blabla2.com"));
await cache.MarkAddressesToUnhealthyAsync(faultyServerKey);

// check if the address is updated
addresses = await cache.TryGetAddressesAsync(
Expand All @@ -140,7 +146,15 @@ public async Task TestGatewayAddressCacheUpdateOnConnectionResetAsync()
false,
CancellationToken.None);

Assert.IsNotNull(addresses.AllAddresses.Select(address => address.PhysicalUri == "https://blabla5.com"));
// Validate that the above transport uri with host blabla2.com has been marked Unhealthy.
IReadOnlyList<TransportAddressUri> transportAddressUris = addresses
.Get(Protocol.Tcp)?
.ReplicaTransportAddressUris;

TransportAddressUri transportAddressUri = transportAddressUris
.Single(x => x.ReplicaServerKey.Equals(faultyServerKey));

Assert.IsTrue(condition: transportAddressUri.GetCurrentHealthState().GetHealthStatus().Equals(TransportAddressHealthState.HealthStatus.Unhealthy));
}

[TestMethod]
Expand Down Expand Up @@ -1458,23 +1472,23 @@ public int GetMethodInvocationCount()
}

Task IOpenConnectionsHandler.TryOpenRntbdChannelsAsync(
IReadOnlyList<TransportAddressUri> addresses)
IEnumerable<TransportAddressUri> addresses)
{
this.totalReceivedAddressesCounter = addresses.Count;
for (int i = 0; i < addresses.Count; i++)
this.totalReceivedAddressesCounter = addresses.Count();
for (int i = 0; i < addresses.Count(); i++)
{
if (this.useAttemptBasedFailingIndexs)
{
if (this.failIndexesByAttempts.ContainsKey(i) && this.failIndexesByAttempts[i].Contains(this.methodInvocationCounter))
{
this.ExecuteFailureCondition(
addresses: addresses,
addresses: addresses.ToList(),
index: i);
}
else
{
this.ExecuteSuccessCondition(
addresses: addresses,
addresses: addresses.ToList(),
index: i);
}
}
Expand All @@ -1483,13 +1497,13 @@ Task IOpenConnectionsHandler.TryOpenRntbdChannelsAsync(
if (this.failingIndexes.Contains(i))
{
this.ExecuteFailureCondition(
addresses: addresses,
addresses: addresses.ToList(),
index: i);
}
else
{
this.ExecuteSuccessCondition(
addresses: addresses,
addresses: addresses.ToList(),
index: i);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ public void ValidateStoreResultSerialization()
storeResultProperties.Remove(nameof(storeResult.Target.Exception));
storeResultProperties.Add("transportRequestTimeline");
storeResultProperties.Remove(nameof(storeResult.Target.TransportRequestStats));
storeResultProperties.Add("ReplicaHealthStatuses");
storeResultProperties.Remove(nameof(storeResult.Target.ReplicaHealthStatuses));

foreach (string key in jsonPropertyNames)
{
Expand Down