From b3b0eaf86f842ce6d81699c2dd5f43e60cd729a1 Mon Sep 17 00:00:00 2001 From: Debdatta Kunda Date: Fri, 17 Mar 2023 21:35:31 +0530 Subject: [PATCH 1/6] Code changes to mark the transport uri to unhealthy for which a connection reset event occures through the connection state listener. --- .../src/Routing/GatewayAddressCache.cs | 37 +++++++++++++++---- .../src/Routing/GlobalAddressResolver.cs | 25 ++----------- .../GatewayAddressCacheTests.cs | 26 ++++++++----- 3 files changed, 50 insertions(+), 38 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs b/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs index b577443761..5bf93c45dc 100644 --- a/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs +++ b/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs @@ -387,7 +387,13 @@ private static void LogPartitionCacheRefresh( } } - public void TryRemoveAddresses( + /// + /// Marks the to Unhealthy that matches with the faulted + /// server key. + /// + /// An instance of that contains the host and + /// port of the backend replica. + public async Task MarkAddressesToUnhealthyAsync( ServerKey serverKey) { if (serverKey == null) @@ -395,7 +401,7 @@ public void TryRemoveAddresses( throw new ArgumentNullException(nameof(serverKey)); } - if (this.serverPartitionAddressToPkRangeIdMap.TryRemove(serverKey, out HashSet pkRangeIds)) + if (this.serverPartitionAddressToPkRangeIdMap.TryGetValue(serverKey, out HashSet pkRangeIds)) { PartitionKeyRangeIdentity[] pkRangeIdsCopy; lock (pkRangeIds) @@ -405,12 +411,29 @@ public void TryRemoveAddresses( foreach (PartitionKeyRangeIdentity pkRangeId in pkRangeIdsCopy) { - DefaultTrace.TraceInformation("Remove addresses for collectionRid :{0}, pkRangeId: {1}, serviceEndpoint: {2}", - pkRangeId.CollectionRid, - pkRangeId.PartitionKeyRangeId, - this.serviceEndpoint); + PartitionAddressInformation addressInfo = await this.serverPartitionAddressCache.GetAsync( + key: pkRangeId, + singleValueInitFunc: (_) => this.GetAddressesForRangeIdAsync( + null, + cachedAddresses: null, + pkRangeId.CollectionRid, + pkRangeId.PartitionKeyRangeId, + forceRefresh: false), + forceRefresh: (_) => false); + + IReadOnlyList transportAddresses = addressInfo.Get(Protocol.Tcp)?.ReplicaTransportAddressUris; + foreach (TransportAddressUri address in from TransportAddressUri transportAddress in transportAddresses + where serverKey.Equals(transportAddress.ReplicaServerKey) + select transportAddress) + { + DefaultTrace.TraceInformation("Marking a backend replica to Unhealthy for collectionRid :{0}, pkRangeId: {1}, serviceEndpoint: {2}, transportAddress: {3}", + pkRangeId.CollectionRid, + pkRangeId.PartitionKeyRangeId, + this.serviceEndpoint, + address.ToString()); - this.serverPartitionAddressCache.TryRemove(pkRangeId); + address.SetUnhealthy(); + } } } } diff --git a/Microsoft.Azure.Cosmos/src/Routing/GlobalAddressResolver.cs b/Microsoft.Azure.Cosmos/src/Routing/GlobalAddressResolver.cs index 51a6c6d2f3..344f994395 100644 --- a/Microsoft.Azure.Cosmos/src/Routing/GlobalAddressResolver.cs +++ b/Microsoft.Azure.Cosmos/src/Routing/GlobalAddressResolver.cs @@ -228,33 +228,16 @@ public async Task ResolveAsync( } public async Task UpdateAsync( - IReadOnlyList addressCacheTokens, - CancellationToken cancellationToken) - { - List tasks = new List(); - - foreach (AddressCacheToken cacheToken in addressCacheTokens) - { - if (this.addressCacheByEndpoint.TryGetValue(cacheToken.ServiceEndpoint, out EndpointCache endpointCache)) - { - tasks.Add(endpointCache.AddressCache.UpdateAsync(cacheToken.PartitionKeyRangeIdentity, cancellationToken)); - } - } - - await Task.WhenAll(tasks); - } - - public Task UpdateAsync( ServerKey serverKey, CancellationToken cancellationToken) { foreach (KeyValuePair addressCache in this.addressCacheByEndpoint) { - // since we don't know which address cache contains the pkRanges mapped to this node, we do a tryRemove on all AddressCaches of all regions - addressCache.Value.AddressCache.TryRemoveAddresses(serverKey); + // since we don't know which address cache contains the pkRanges mapped to this node, + // we mark all transport uris that has the same server key to unhealthy status in the + // AddressCaches of all regions. + await addressCache.Value.AddressCache.MarkAddressesToUnhealthyAsync(serverKey); } - - return Task.CompletedTask; } /// diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayAddressCacheTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayAddressCacheTests.cs index 9e3b87ec83..3fa793e178 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayAddressCacheTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayAddressCacheTests.cs @@ -18,6 +18,7 @@ namespace Microsoft.Azure.Cosmos using Microsoft.Azure.Cosmos.Tests; using Microsoft.Azure.Cosmos.Tracing; using Microsoft.Azure.Documents; + using Microsoft.Azure.Documents.Client; using Microsoft.VisualStudio.TestTools.UnitTesting; using Moq; @@ -129,8 +130,8 @@ public async Task TestGatewayAddressCacheUpdateOnConnectionResetAsync() Assert.IsNotNull(addresses.AllAddresses.Select(address => address.PhysicalUri == "https://blabla.com")); - // call updateAddress - cache.TryRemoveAddresses(new Documents.Rntbd.ServerKey(new Uri("https://blabla.com"))); + // Mark transport addresses to Unhealthy depcting a connection reset event. + await cache.MarkAddressesToUnhealthyAsync(new Documents.Rntbd.ServerKey(new Uri("https://blabla.com"))); // check if the addresss is updated addresses = await cache.TryGetAddressesAsync( @@ -140,7 +141,12 @@ public async Task TestGatewayAddressCacheUpdateOnConnectionResetAsync() false, CancellationToken.None); - Assert.IsNotNull(addresses.AllAddresses.Select(address => address.PhysicalUri == "https://blabla5.com")); + // Validate that the above transport uri with host blabla.com has been marked Unhealthy. + IReadOnlyList transportAddressUris = addresses.Get(Protocol.Tcp)?.ReplicaTransportAddressUris; + foreach (TransportAddressUri transportAddressUri in transportAddressUris) + { + Assert.IsTrue(transportAddressUri.GetCurrentHealthState().GetHealthStatus().Equals(TransportAddressHealthState.HealthStatus.Unhealthy)); + } } [TestMethod] @@ -1458,23 +1464,23 @@ public int GetMethodInvocationCount() } Task IOpenConnectionsHandler.TryOpenRntbdChannelsAsync( - IReadOnlyList addresses) + IEnumerable addresses) { - this.totalReceivedAddressesCounter = addresses.Count; - for (int i = 0; i < addresses.Count; i++) + this.totalReceivedAddressesCounter = addresses.Count(); + for (int i = 0; i < addresses.Count(); i++) { if (this.useAttemptBasedFailingIndexs) { if (this.failIndexesByAttempts.ContainsKey(i) && this.failIndexesByAttempts[i].Contains(this.methodInvocationCounter)) { this.ExecuteFailureCondition( - addresses: addresses, + addresses: addresses.ToList(), index: i); } else { this.ExecuteSuccessCondition( - addresses: addresses, + addresses: addresses.ToList(), index: i); } } @@ -1483,13 +1489,13 @@ Task IOpenConnectionsHandler.TryOpenRntbdChannelsAsync( if (this.failingIndexes.Contains(i)) { this.ExecuteFailureCondition( - addresses: addresses, + addresses: addresses.ToList(), index: i); } else { this.ExecuteSuccessCondition( - addresses: addresses, + addresses: addresses.ToList(), index: i); } } From 423fc2cb12068c6568379522a4e9adef8c959843 Mon Sep 17 00:00:00 2001 From: Debdatta Kunda Date: Mon, 20 Mar 2023 14:24:24 +0530 Subject: [PATCH 2/6] Code changes to bump up the direct version. --- Directory.Build.props | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Directory.Build.props b/Directory.Build.props index e7ca7a8727..39004a08ab 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -3,7 +3,7 @@ 3.32.2 3.32.2 preview - 3.30.2 + 3.30.4 2.0.1 2.0.1 preview From 518d5ff00c70c21914deec029d2574a6249e549b Mon Sep 17 00:00:00 2001 From: Debdatta Kunda Date: Mon, 20 Mar 2023 16:49:39 +0530 Subject: [PATCH 3/6] Code changes to clean up Gateway Address Cache. --- .../src/Routing/GatewayAddressCache.cs | 22 ------------------- 1 file changed, 22 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs b/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs index 5bf93c45dc..49e9a188d0 100644 --- a/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs +++ b/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs @@ -438,28 +438,6 @@ where serverKey.Equals(transportAddress.ReplicaServerKey) } } - public async Task UpdateAsync( - PartitionKeyRangeIdentity partitionKeyRangeIdentity, - CancellationToken cancellationToken) - { - if (partitionKeyRangeIdentity == null) - { - throw new ArgumentNullException(nameof(partitionKeyRangeIdentity)); - } - - cancellationToken.ThrowIfCancellationRequested(); - - return await this.serverPartitionAddressCache.GetAsync( - key: partitionKeyRangeIdentity, - singleValueInitFunc: (_) => this.GetAddressesForRangeIdAsync( - null, - cachedAddresses: null, - partitionKeyRangeIdentity.CollectionRid, - partitionKeyRangeIdentity.PartitionKeyRangeId, - forceRefresh: true), - forceRefresh: (_) => true); - } - private async Task> ResolveMasterAsync(DocumentServiceRequest request, bool forceRefresh) { Tuple masterAddressAndRange = this.masterPartitionAddressCache; From 4cc7b929dc8e8f2b87a53995dc342161b1e7da1c Mon Sep 17 00:00:00 2001 From: Debdatta Kunda Date: Mon, 20 Mar 2023 19:24:23 +0530 Subject: [PATCH 4/6] Code changes to fix pipeline build. --- .../GatewayAddressCacheTests.cs | 26 ++++++++++++------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayAddressCacheTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayAddressCacheTests.cs index 3fa793e178..809814aef1 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayAddressCacheTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayAddressCacheTests.cs @@ -19,6 +19,7 @@ namespace Microsoft.Azure.Cosmos using Microsoft.Azure.Cosmos.Tracing; using Microsoft.Azure.Documents; using Microsoft.Azure.Documents.Client; + using Microsoft.Azure.Documents.Rntbd; using Microsoft.VisualStudio.TestTools.UnitTesting; using Moq; @@ -109,8 +110,11 @@ public void TestGatewayAddressCacheAutoRefreshOnSuboptimalPartition() public async Task TestGatewayAddressCacheUpdateOnConnectionResetAsync() { FakeMessageHandler messageHandler = new FakeMessageHandler(); - HttpClient httpClient = new HttpClient(messageHandler); - httpClient.Timeout = TimeSpan.FromSeconds(120); + HttpClient httpClient = new HttpClient(messageHandler) + { + Timeout = TimeSpan.FromSeconds(120) + }; + GatewayAddressCache cache = new GatewayAddressCache( new Uri(GatewayAddressCacheTests.DatabaseAccountApiEndpoint), Documents.Client.Protocol.Tcp, @@ -131,7 +135,8 @@ public async Task TestGatewayAddressCacheUpdateOnConnectionResetAsync() Assert.IsNotNull(addresses.AllAddresses.Select(address => address.PhysicalUri == "https://blabla.com")); // Mark transport addresses to Unhealthy depcting a connection reset event. - await cache.MarkAddressesToUnhealthyAsync(new Documents.Rntbd.ServerKey(new Uri("https://blabla.com"))); + ServerKey faultyServerKey = new (new Uri("https://blabla2.com")); + await cache.MarkAddressesToUnhealthyAsync(faultyServerKey); // check if the addresss is updated addresses = await cache.TryGetAddressesAsync( @@ -141,12 +146,15 @@ public async Task TestGatewayAddressCacheUpdateOnConnectionResetAsync() false, CancellationToken.None); - // Validate that the above transport uri with host blabla.com has been marked Unhealthy. - IReadOnlyList transportAddressUris = addresses.Get(Protocol.Tcp)?.ReplicaTransportAddressUris; - foreach (TransportAddressUri transportAddressUri in transportAddressUris) - { - Assert.IsTrue(transportAddressUri.GetCurrentHealthState().GetHealthStatus().Equals(TransportAddressHealthState.HealthStatus.Unhealthy)); - } + // Validate that the above transport uri with host blabla2.com has been marked Unhealthy. + IReadOnlyList transportAddressUris = addresses + .Get(Protocol.Tcp)? + .ReplicaTransportAddressUris; + + TransportAddressUri transportAddressUri = transportAddressUris + .Single(x => x.ReplicaServerKey.Equals(faultyServerKey)); + + Assert.IsTrue(condition: transportAddressUri.GetCurrentHealthState().GetHealthStatus().Equals(TransportAddressHealthState.HealthStatus.Unhealthy)); } [TestMethod] From 37def5d2bdc8927eca3086684ad876b841bffb91 Mon Sep 17 00:00:00 2001 From: Debdatta Kunda Date: Mon, 20 Mar 2023 22:18:23 +0530 Subject: [PATCH 5/6] Code changes to fix serilization test failures. --- .../tests/Microsoft.Azure.Cosmos.Tests/Tracing/TraceTests.cs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Tracing/TraceTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Tracing/TraceTests.cs index f747d35d22..ff27842308 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Tracing/TraceTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Tracing/TraceTests.cs @@ -125,6 +125,8 @@ public void ValidateStoreResultSerialization() storeResultProperties.Remove(nameof(storeResult.Target.Exception)); storeResultProperties.Add("transportRequestTimeline"); storeResultProperties.Remove(nameof(storeResult.Target.TransportRequestStats)); + storeResultProperties.Add("ReplicaHealthStatuses"); + storeResultProperties.Remove(nameof(storeResult.Target.ReplicaHealthStatuses)); foreach (string key in jsonPropertyNames) { From 9abc152a27f3b60052a27d05f87fc7456fac14f5 Mon Sep 17 00:00:00 2001 From: Debdatta Kunda Date: Tue, 21 Mar 2023 23:26:22 +0530 Subject: [PATCH 6/6] Code changes to add force refresh to the gateway function callback delegate. --- Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs b/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs index 49e9a188d0..eba425fb0d 100644 --- a/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs +++ b/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs @@ -411,6 +411,10 @@ public async Task MarkAddressesToUnhealthyAsync( foreach (PartitionKeyRangeIdentity pkRangeId in pkRangeIdsCopy) { + // The forceRefresh flag is set to true for the callback delegate is because, if the GetAsync() from the async + // non-blocking cache fails to look up the pkRangeId, then there are some inconsistency present in the cache, and it is + // more safe to do a force refresh to fetch the addresses from the gateway, instead of fetching it from the cache itself. + // Please note that, the chances of encountering such scenario is highly unlikely. PartitionAddressInformation addressInfo = await this.serverPartitionAddressCache.GetAsync( key: pkRangeId, singleValueInitFunc: (_) => this.GetAddressesForRangeIdAsync( @@ -418,7 +422,7 @@ public async Task MarkAddressesToUnhealthyAsync( cachedAddresses: null, pkRangeId.CollectionRid, pkRangeId.PartitionKeyRangeId, - forceRefresh: false), + forceRefresh: true), forceRefresh: (_) => false); IReadOnlyList transportAddresses = addressInfo.Get(Protocol.Tcp)?.ReplicaTransportAddressUris;