diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCacheTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCacheTest.java
index 7c4fd74dd2b0c..7801395b85f8d 100644
--- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCacheTest.java
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCacheTest.java
@@ -116,6 +116,14 @@ public Object[][] replicaValidationArgsProvider() {
         };
     }
 
+    @DataProvider(name = "isCollectionUnderWarmUpFlowArgsProvider")
+    public Object[] isCollectionUnderWarmUpFlowArgsProvider() {
+        return new Object[] {
+            // isCollectionUnderWarmUpFlow
+            true, false
+        };
+    }
+
     @Test(groups = { "direct" }, dataProvider = "targetPartitionsKeyRangeListAndCollectionLinkParams", timeOut = TIMEOUT)
     public void getServerAddressesViaGateway(List<String> partitionKeyRangeIds,
                                              String collectionLink,
@@ -255,10 +263,10 @@ public void tryGetAddresses_ForDataPartitions(String partitionKeyRangeId, String
     @DataProvider(name = "openAsyncTargetAndTargetPartitionsKeyRangeAndCollectionLinkParams")
     public Object[][] openAsyncTargetAndPartitionsKeyRangeTargetAndCollectionLinkParams() {
         return new Object[][] {
-            // openAsync target partition key range ids, target partition key range id, collection link
-            { ImmutableList.of("0", "1"), "0", getNameBasedCollectionLink() },
-            { ImmutableList.of("0", "1"), "1", getNameBasedCollectionLink() },
-            { ImmutableList.of("0", "1"), "1", getCollectionSelfLink() },
+            // openAsync target partition key range ids, target partition key range id, collection link, is collection under warm up flow
+            { ImmutableList.of("0", "1"), "0", getNameBasedCollectionLink(), true },
+            { ImmutableList.of("0", "1"), "1", getNameBasedCollectionLink(), true },
+            { ImmutableList.of("0", "1"), "1", getCollectionSelfLink(), false },
         };
     }
 
@@ -268,7 +276,8 @@ public Object[][] openAsyncTargetAndPartitionsKeyRangeTargetAndCollectionLinkPar
     public void tryGetAddresses_ForDataPartitions_AddressCachedByOpenAsync_NoHttpRequest(
         List<String> allPartitionKeyRangeIds,
         String partitionKeyRangeId,
-        String collectionLink) throws Exception {
+        String collectionLink,
+        boolean isCollectionUnderWarmUpFlow) throws Exception {
         Configs configs = new Configs();
         HttpClientUnderTestWrapper httpClientWrapper = getHttpClientUnderTestWrapper(configs);
@@ -334,7 +343,8 @@ public void tryGetAddresses_ForDataPartitions_AddressCachedByOpenAsync_NoHttpReq
     public void tryGetAddresses_ForDataPartitions_ForceRefresh(
         List<String> allPartitionKeyRangeIds,
         String partitionKeyRangeId,
-        String collectionLink) throws Exception {
+        String collectionLink,
+        boolean isCollectionUnderWarmUpFlow) throws Exception {
         Configs configs = new Configs();
         HttpClientUnderTestWrapper httpClientWrapper = getHttpClientUnderTestWrapper(configs);
@@ -375,21 +385,43 @@ public void tryGetAddresses_ForDataPartitions_ForceRefresh(
             collectionLink,
             new Database(),
             new HashMap<>());
 
+        if (isCollectionUnderWarmUpFlow) {
+            Mockito
+                .when(proactiveOpenConnectionsProcessorMock.isCollectionRidUnderOpenConnectionsFlow(Mockito.anyString()))
+                .thenReturn(true);
+        } else {
+            Mockito
+                .when(proactiveOpenConnectionsProcessorMock.isCollectionRidUnderOpenConnectionsFlow(Mockito.anyString()))
+                .thenReturn(false);
+        }
+
         PartitionKeyRangeIdentity partitionKeyRangeIdentity = new PartitionKeyRangeIdentity(collectionRid, partitionKeyRangeId);
         Mono<Utils.ValueHolder<AddressInformation[]>> addressesInfosFromCacheObs = cache.tryGetAddresses(req, partitionKeyRangeIdentity, true);
         ArrayList<AddressInformation> addressInfosFromCache = Lists.newArrayList(getSuccessResult(addressesInfosFromCacheObs, TIMEOUT).v);
 
-        // isCollectionRidUnderOpenConnectionsFlow is called twice
+        // isCollectionRidUnderOpenConnectionsFlow is called 6 times
         // 1. as forceRefreshPartitionAddresses = true, this invokes isCollectionRidUnderOpenConnectionsFlow eventually
         // refreshing collectionRid->addresses map maintained by proactiveOpenConnectionsProcessor to track containers / addresses
         // under connection warm up flow
         // 2. replica validation will get triggered in case of unhealthyPending / unknown addresses, replica validation will do a
         // submitOpenConnectionTaskOutsideLoop for each of these addresses but before that it will also do
         // isCollectionRidUnderOpenConnectionsFlow check to determine the no. of connections to open
-        Mockito.verify(proactiveOpenConnectionsProcessorMock, Mockito.times(2))
+        // 3. it needs to be checked whether the replicas (up for validation) with Unknown health status are used by a container
+        // which is part of the warm-up flow - isCollectionRidUnderOpenConnectionsFlow is called here again for each such replica
+        Mockito.verify(proactiveOpenConnectionsProcessorMock, Mockito.times(6))
             .isCollectionRidUnderOpenConnectionsFlow(Mockito.any());
-        Mockito.verify(proactiveOpenConnectionsProcessorMock, Mockito.atLeastOnce())
-            .submitOpenConnectionTaskOutsideLoop(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.anyInt());
+
+        if (isCollectionUnderWarmUpFlow) {
+            // if the collection is under the warm-up flow and the replica health status is Unknown / UnhealthyPending,
+            // then trigger replica validation for this replica address
+            Mockito.verify(proactiveOpenConnectionsProcessorMock, Mockito.atLeastOnce())
+                .submitOpenConnectionTaskOutsideLoop(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.anyInt());
+        } else {
+            // if the collection is not under the warm-up flow and the replica health status is Unknown,
+            // then don't trigger replica validation for this replica address
+            Mockito.verify(proactiveOpenConnectionsProcessorMock, Mockito.never())
+                .submitOpenConnectionTaskOutsideLoop(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.anyInt());
+        }
 
         Mockito.clearInvocations(proactiveOpenConnectionsProcessorMock);
 
         // no new request is made
@@ -415,7 +447,8 @@ public void tryGetAddresses_ForDataPartitions_ForceRefresh(
     public void tryGetAddresses_ForDataPartitions_Suboptimal_Refresh(
         List<String> allPartitionKeyRangeIds,
         String partitionKeyRangeId,
-        String collectionLink) throws Exception {
+        String collectionLink,
+        boolean isCollectionUnderWarmUpFlow) throws Exception {
         Configs configs = new Configs();
         HttpClientUnderTestWrapper httpClientWrapper = getHttpClientUnderTestWrapper(configs);
@@ -458,21 +491,39 @@ public void tryGetAddresses_ForDataPartitions_Suboptimal_Refresh(
             collectionLink,
             new Database(),
             new HashMap<>());
 
+        if (isCollectionUnderWarmUpFlow) {
+            Mockito
+                .when(proactiveOpenConnectionsProcessorMock.isCollectionRidUnderOpenConnectionsFlow(Mockito.anyString()))
+                .thenReturn(true);
+        } else {
+            Mockito
+                .when(proactiveOpenConnectionsProcessorMock.isCollectionRidUnderOpenConnectionsFlow(Mockito.anyString()))
+                .thenReturn(false);
+        }
+
         PartitionKeyRangeIdentity partitionKeyRangeIdentity = new PartitionKeyRangeIdentity(collectionRid, partitionKeyRangeId);
         Mono<Utils.ValueHolder<AddressInformation[]>> addressesInfosFromCacheObs = origCache.tryGetAddresses(req, partitionKeyRangeIdentity, true);
         ArrayList<AddressInformation> addressInfosFromCache = Lists.newArrayList(getSuccessResult(addressesInfosFromCacheObs, TIMEOUT).v);
 
-        // isCollectionRidUnderOpenConnectionsFlow called twice
+        // isCollectionRidUnderOpenConnectionsFlow called 6 times
         // 1. as forceRefreshPartitionAddresses = true, this invokes isCollectionRidUnderOpenConnectionsFlow eventually
         // refreshing collectionRid->addresses map maintained by proactiveOpenConnectionsProcessor to track containers / addresses
         // under connection warm up flow
         // 2. replica validation will get triggered in case of unhealthyPending / unknown addresses, replica validation will do a
         // submitOpenConnectionTaskOutsideLoop for each of these addresses but before that it will also do
         // isCollectionRidUnderOpenConnectionsFlow check to determine the no. of connections to open
-        Mockito.verify(proactiveOpenConnectionsProcessorMock, Mockito.times(2))
-            .isCollectionRidUnderOpenConnectionsFlow(Mockito.any());
-        Mockito.verify(proactiveOpenConnectionsProcessorMock, Mockito.atLeastOnce())
-            .submitOpenConnectionTaskOutsideLoop(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.anyInt());
+        // 3. it needs to be checked whether the replicas (up for validation) with Unknown health status are used by a container
+        // which is part of the warm-up flow - isCollectionRidUnderOpenConnectionsFlow is called here again for each such replica
+        Mockito.verify(proactiveOpenConnectionsProcessorMock, Mockito.times(6))
+            .isCollectionRidUnderOpenConnectionsFlow(Mockito.any());
+        if (isCollectionUnderWarmUpFlow) {
+            Mockito.verify(proactiveOpenConnectionsProcessorMock, Mockito.atLeastOnce())
+                .submitOpenConnectionTaskOutsideLoop(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.anyInt());
+        } else {
+            Mockito.verify(proactiveOpenConnectionsProcessorMock, Mockito.never())
+                .submitOpenConnectionTaskOutsideLoop(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.anyInt());
+        }
+
         // no new request is made
         assertThat(httpClientWrapper.capturedRequests)
             .describedAs("force refresh fetched from gateway")
@@ -956,6 +1007,7 @@ public void tryGetAddress_replicaValidationTests(boolean replicaValidationEnable
                 .recordCollectionRidsAndUrisUnderOpenConnectionsAndInitCaches(Mockito.any(), Mockito.any());
             Mockito.clearInvocations(proactiveOpenConnectionsProcessorMock);
             httpClientWrapper.capturedRequests.clear();
+            Mockito.when(proactiveOpenConnectionsProcessorMock.isCollectionRidUnderOpenConnectionsFlow(Mockito.anyString())).thenReturn(true);
         }
 
         PartitionKeyRangeIdentity partitionKeyRangeIdentity = new PartitionKeyRangeIdentity(createdCollection.getResourceId(), "0");
@@ -979,19 +1031,23 @@ public void tryGetAddress_replicaValidationTests(boolean replicaValidationEnable
         ArgumentCaptor<URI> serviceEndpointArguments = ArgumentCaptor.forClass(URI.class);
 
         if (submitOpenConnectionTasksAndInitCaches) {
+            // If submitOpenConnectionTasksAndInitCaches is called, then replica validation will also include addresses with Unknown health status
 
-            // isCollectionRidUnderOpenConnectionsFlow called twice
+            // isCollectionRidUnderOpenConnectionsFlow called 6 times
             // 1. as forceRefreshPartitionAddresses = true, this invokes isCollectionRidUnderOpenConnectionsFlow eventually
             // refreshing collectionRid->addresses map maintained by proactiveOpenConnectionsProcessor to track containers / addresses
             // under connection warm up flow
            // 2. replica validation will get triggered in case of unhealthyPending / unknown addresses, replica validation will do a
            // submitOpenConnectionTaskOutsideLoop for each of these addresses but before that it will also do
            // isCollectionRidUnderOpenConnectionsFlow check to determine the no. of connections to open
+            // 3. it needs to be checked whether the replicas (up for validation) with Unknown health status are used by a container
+            // which is part of the warm-up flow - isCollectionRidUnderOpenConnectionsFlow is called here again (up to 4 times,
+            // once for each replica of a given physical partition)
             Mockito
                 .verify(proactiveOpenConnectionsProcessorMock, Mockito.atLeastOnce())
                 .submitOpenConnectionTaskOutsideLoop(Mockito.any(), serviceEndpointArguments.capture(), openConnectionArguments.capture(), Mockito.anyInt());
 
             assertThat(openConnectionArguments.getAllValues()).hasSize(addressInfosFromCache.size());
-            Mockito.verify(proactiveOpenConnectionsProcessorMock, Mockito.times(2))
+            Mockito.verify(proactiveOpenConnectionsProcessorMock, Mockito.times(6))
                 .isCollectionRidUnderOpenConnectionsFlow(Mockito.any());
         } else {
             // Open connection will only be called for unhealthyPending status address
@@ -1214,11 +1270,11 @@ public void tryGetAddress_unhealthyStatus_forceRefresh() throws Exception {
             .asList().hasSize(1);
         assertThat(cachedAddresses).hasSize(addressInfosFromCache.size()).containsAll(addressInfosFromCache);
 
-        // isCollectionRidUnderOpenConnectionsFlow called twice
+        // isCollectionRidUnderOpenConnectionsFlow called 2 times
         // 1. forceRefreshPartitionAddresses = false but when addresses are unhealthy for long enough, the SDK does a forceRefresh=true for addresses
         // which refreshes collectionRid->addresses map maintained by proactiveOpenConnectionsProcessor to track containers / addresses
         // under connection warm up flow, the refresh of this map only happens if isCollectionRidUnderOpenConnectionsFlow is true
-        // 2. replica validation will get triggered in case of unhealthyPending / unknown addresses, replica validation will do a
+        // 2. replica validation will get triggered in case of unhealthyPending addresses, replica validation will do a
         // submitOpenConnectionTaskOutsideLoop for each of these addresses but before that it will also do
         // isCollectionRidUnderOpenConnectionsFlow check to determine the no. of connections to open
         Mockito.verify(proactiveOpenConnectionsProcessorMock, Mockito.times(2))
@@ -1318,8 +1374,8 @@ public void tryGetAddress_repeatedlySetUnhealthyStatus_forceRefresh() throws Int
     }
 
     @SuppressWarnings({"unchecked", "rawtypes"})
-    @Test(groups = { "direct" }, timeOut = TIMEOUT)
-    public void validateReplicaAddressesTests() throws URISyntaxException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+    @Test(groups = { "direct" }, dataProvider = "isCollectionUnderWarmUpFlowArgsProvider", timeOut = TIMEOUT)
+    public void validateReplicaAddressesTests(boolean isCollectionUnderWarmUpFlow) throws URISyntaxException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
         Configs configs = ConfigsBuilder.instance().withProtocol(Protocol.TCP).build();
         URI serviceEndpoint = new URI(TestConfigurations.HOST);
         IAuthorizationTokenProvider authorizationTokenProvider = (RxDocumentClientImpl) client;
@@ -1368,6 +1424,16 @@ public void validateReplicaAddressesTests() throws URISyntaxException, NoSuchMet
         replicaValidationScopes.add(Unknown);
         replicaValidationScopes.add(UnhealthyPending);
 
+        if (isCollectionUnderWarmUpFlow) {
+            Mockito
+                .when(proactiveOpenConnectionsProcessorMock.isCollectionRidUnderOpenConnectionsFlow(Mockito.anyString()))
+                .thenReturn(true);
+        } else {
+            Mockito
+                .when(proactiveOpenConnectionsProcessorMock.isCollectionRidUnderOpenConnectionsFlow(Mockito.anyString()))
+                .thenReturn(false);
+        }
+
         validateReplicaAddressesMethod
             .invoke(
                 cache,
@@ -1376,15 +1442,37 @@ public void validateReplicaAddressesTests() throws URISyntaxException, NoSuchMet
         // Validate openConnection will only be called for address in unhealthyPending status
         ArgumentCaptor<Uri> openConnectionArguments = ArgumentCaptor.forClass(Uri.class);
         ArgumentCaptor<URI> serviceEndpointArguments = ArgumentCaptor.forClass(URI.class);
-        Mockito
-            .verify(proactiveOpenConnectionsProcessorMock, Mockito.times(2))
-            .submitOpenConnectionTaskOutsideLoop(Mockito.any(), serviceEndpointArguments.capture(), openConnectionArguments.capture(), Mockito.anyInt());
-        assertThat(openConnectionArguments.getAllValues()).containsExactlyElementsOf(
+        if (isCollectionUnderWarmUpFlow) {
+            Mockito
+                .verify(proactiveOpenConnectionsProcessorMock, Mockito.times(2))
+                .submitOpenConnectionTaskOutsideLoop(Mockito.any(), serviceEndpointArguments.capture(), openConnectionArguments.capture(), Mockito.anyInt());
+
+            // when the collection is under the warm-up flow, both Unknown and UnhealthyPending are in scope
+            // for replica validation
+            // address4 is in an UnhealthyPending state (prioritize UnhealthyPending for replica validation)
+            // and address2 is in an Unknown state
+            assertThat(openConnectionArguments.getAllValues()).containsExactlyElementsOf(
                 Arrays.asList(address4, address2)
-                .stream()
-                .map(addressInformation -> addressInformation.getPhysicalUri())
-                .collect(Collectors.toList()));
+                    .stream()
+                    .map(addressInformation -> addressInformation.getPhysicalUri())
+                    .collect(Collectors.toList()));
+        } else {
+            // when the collection is not under the warm-up flow, only UnhealthyPending is in scope
+            // for replica validation
+            // address4 is in an UnhealthyPending state (prioritize UnhealthyPending for replica validation)
+            // and address2 is in an Unknown state (excluded from replica validation)
+            Mockito
+                .verify(proactiveOpenConnectionsProcessorMock, Mockito.times(1))
+                .submitOpenConnectionTaskOutsideLoop(Mockito.any(), serviceEndpointArguments.capture(), openConnectionArguments.capture(), Mockito.anyInt());
+
+            // Only address4 is in an UnhealthyPending state
+            assertThat(openConnectionArguments.getAllValues()).containsExactlyElementsOf(
+                Arrays.asList(address4)
+                    .stream()
+                    .map(addressInformation -> addressInformation.getPhysicalUri())
+                    .collect(Collectors.toList()));
+        }
     }
 
     @SuppressWarnings("rawtypes")
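The test changes above all exercise one gating rule: a replica in `UnhealthyPending` state is always a candidate for replica validation, while a replica in `Unknown` state is validated only when its collection is under the connection warm-up flow (`isCollectionRidUnderOpenConnectionsFlow`). A minimal sketch of that decision, with a hypothetical `HealthStatus` enum and `shouldValidate` helper standing in for the SDK's `Uri.HealthStatus` and the `GatewayAddressCache` internals changed later in this diff:

```java
// Hypothetical distillation of the gating rule the tests above assert; the real
// logic lives in GatewayAddressCache.validateReplicaAddresses (see below).
enum HealthStatus { Connected, Unknown, UnhealthyPending, Unhealthy }

final class ReplicaValidationGate {
    // UnhealthyPending replicas are always validated; Unknown replicas only
    // when their collection took part in the connection warm-up flow.
    static boolean shouldValidate(HealthStatus status, boolean collectionUnderWarmUpFlow) {
        switch (status) {
            case UnhealthyPending: return true;
            case Unknown:          return collectionUnderWarmUpFlow;
            default:               return false;
        }
    }
}
```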
diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthCheckerTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthCheckerTests.java
index 2fb497bde5e6e..762fe81910fff 100644
--- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthCheckerTests.java
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthCheckerTests.java
@@ -246,7 +246,7 @@ public void transitTimeoutOnWriteTests(boolean withFailureReason) throws Interru
             Future<String> healthyResult = healthChecker.isHealthyWithFailureReason(channelMock).sync();
             assertThat(healthyResult.isSuccess()).isTrue();
             assertThat(healthyResult.getNow()).isNotEqualTo(RntbdConstants.RntbdHealthCheckResults.SuccessValue);
-            assertThat(healthyResult.getNow().contains("health check failed due to transit timeout on write threshold hit"));
+            assertThat(healthyResult.getNow()).contains("health check failed due to transit timeout on write threshold hit");
         } else {
             Future<Boolean> healthyResult = healthChecker.isHealthy(channelMock).sync();
             assertThat(healthyResult.isSuccess()).isTrue();
@@ -307,4 +307,109 @@ public void transitTimeoutOnWrite_HighCPULoadTests(boolean withFailureReason) th
             }
         }
     }
+
+    @Test(groups = { "unit" }, dataProvider = "isHealthyWithReasonArgs")
+    public void cancellationPronenessOfChannel_Test(boolean withFailureReason) throws InterruptedException {
+        SslContext sslContextMock = Mockito.mock(SslContext.class);
+
+        RntbdEndpoint.Config config = new RntbdEndpoint.Config(
+            new RntbdTransportClient.Options.Builder(ConnectionPolicy.getDefaultPolicy()).build(),
+            sslContextMock,
+            LogLevel.INFO);
+
+        RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config);
+        Channel channelMock = Mockito.mock(Channel.class);
+        ChannelPipeline channelPipelineMock = Mockito.mock(ChannelPipeline.class);
+        RntbdRequestManager rntbdRequestManagerMock = Mockito.mock(RntbdRequestManager.class);
+        SingleThreadEventLoop eventLoopMock = new DefaultEventLoop();
+        RntbdClientChannelHealthChecker.Timestamps timestampsMock = Mockito.mock(RntbdClientChannelHealthChecker.Timestamps.class);
+
+        Mockito.when(channelMock.pipeline()).thenReturn(channelPipelineMock);
+        Mockito.when(channelPipelineMock.get(RntbdRequestManager.class)).thenReturn(rntbdRequestManagerMock);
+        Mockito.when(channelMock.eventLoop()).thenReturn(eventLoopMock);
+        Mockito.when(rntbdRequestManagerMock.snapshotTimestamps()).thenReturn(timestampsMock);
+
+        ChannelPromise defaultChannelPromise = new DefaultChannelPromise(channelMock);
+        defaultChannelPromise.setSuccess();
+        Mockito.when(channelMock.writeAndFlush(RntbdHealthCheckRequest.MESSAGE)).thenReturn(defaultChannelPromise);
+
+        Instant current = Instant.now();
+        Instant lastChannelReadTime = current.minusNanos(5 * config.nonRespondingChannelReadDelayTimeLimitInNanos());
+        Instant lastChannelWriteTime = current.plusSeconds(1);
+        Instant lastChannelWriteAttemptTime = lastChannelWriteTime;
+
+        Mockito.when(timestampsMock.lastChannelReadTime()).thenReturn(lastChannelReadTime);
+        Mockito.when(timestampsMock.lastChannelWriteTime()).thenReturn(lastChannelWriteTime);
+        Mockito.when(timestampsMock.lastChannelWriteAttemptTime()).thenReturn(lastChannelWriteAttemptTime);
+        Mockito.when(timestampsMock.transitTimeoutCount()).thenReturn(0);
+        Mockito.when(timestampsMock.cancellationCount()).thenReturn(config.cancellationCountSinceLastReadThreshold());
+
+        try (MockedStatic<CpuMemoryMonitor> cpuMemoryMonitorMock = Mockito.mockStatic(CpuMemoryMonitor.class)) {
+            CpuLoadHistory cpuLoadHistoryMock = Mockito.mock(CpuLoadHistory.class);
+            cpuMemoryMonitorMock.when(CpuMemoryMonitor::getCpuLoad).thenReturn(cpuLoadHistoryMock);
+            Mockito.when(cpuLoadHistoryMock.isCpuOverThreshold(config.timeoutDetectionDisableCPUThreshold())).thenReturn(false);
+
+            if (withFailureReason) {
+                Future<String> healthyResult = healthChecker.isHealthyWithFailureReason(channelMock).sync();
+                assertThat(healthyResult.isSuccess()).isTrue();
+                assertThat(healthyResult.getNow()).isNotEqualTo(RntbdConstants.RntbdHealthCheckResults.SuccessValue);
+                assertThat(healthyResult.getNow()).contains("health check failed due to channel being cancellation prone");
+            } else {
+                Future<Boolean> healthyResult = healthChecker.isHealthy(channelMock).sync();
+                assertThat(healthyResult.isSuccess()).isTrue();
+                assertThat(healthyResult.getNow()).isFalse();
+            }
+        }
+    }
+
+    @Test(groups = { "unit" }, dataProvider = "isHealthyWithReasonArgs")
+    public void cancellationPronenessOfChannelWithHighCpuLoad_Test(boolean withFailureReason) throws InterruptedException {
+        SslContext sslContextMock = Mockito.mock(SslContext.class);
+
+        RntbdEndpoint.Config config = new RntbdEndpoint.Config(
+            new RntbdTransportClient.Options.Builder(ConnectionPolicy.getDefaultPolicy()).build(),
+            sslContextMock,
+            LogLevel.INFO);
+
+        RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config);
+        Channel channelMock = Mockito.mock(Channel.class);
+        ChannelPipeline channelPipelineMock = Mockito.mock(ChannelPipeline.class);
+        RntbdRequestManager rntbdRequestManagerMock = Mockito.mock(RntbdRequestManager.class);
+        SingleThreadEventLoop eventLoopMock = new DefaultEventLoop();
+        RntbdClientChannelHealthChecker.Timestamps timestampsMock = Mockito.mock(RntbdClientChannelHealthChecker.Timestamps.class);
+
+        Mockito.when(channelMock.pipeline()).thenReturn(channelPipelineMock);
+        Mockito.when(channelPipelineMock.get(RntbdRequestManager.class)).thenReturn(rntbdRequestManagerMock);
+        Mockito.when(channelMock.eventLoop()).thenReturn(eventLoopMock);
+        Mockito.when(rntbdRequestManagerMock.snapshotTimestamps()).thenReturn(timestampsMock);
+
+        ChannelPromise defaultChannelPromise = new DefaultChannelPromise(channelMock);
+        defaultChannelPromise.setSuccess();
+        Mockito.when(channelMock.writeAndFlush(RntbdHealthCheckRequest.MESSAGE)).thenReturn(defaultChannelPromise);
+
+        Instant current = Instant.now();
+        Instant lastChannelReadTime = current.minusNanos(config.nonRespondingChannelReadDelayTimeLimitInNanos());
+        Instant lastChannelWriteTime = current.plusSeconds(1);
+
+        Mockito.when(timestampsMock.lastChannelReadTime()).thenReturn(lastChannelReadTime);
+        Mockito.when(timestampsMock.lastChannelWriteTime()).thenReturn(lastChannelWriteTime);
+        Mockito.when(timestampsMock.lastChannelWriteAttemptTime()).thenReturn(lastChannelWriteTime);
+        Mockito.when(timestampsMock.transitTimeoutCount()).thenReturn(0);
+        Mockito.when(timestampsMock.cancellationCount()).thenReturn(config.cancellationCountSinceLastReadThreshold());
+
+        try (MockedStatic<CpuMemoryMonitor> cpuMemoryMonitorMock = Mockito.mockStatic(CpuMemoryMonitor.class)) {
+            CpuLoadHistory cpuLoadHistoryMock = Mockito.mock(CpuLoadHistory.class);
+            cpuMemoryMonitorMock.when(CpuMemoryMonitor::getCpuLoad).thenReturn(cpuLoadHistoryMock);
+            Mockito.when(cpuLoadHistoryMock.isCpuOverThreshold(config.timeoutDetectionDisableCPUThreshold())).thenReturn(true);
+
+            if (withFailureReason) {
+                Future<String> healthyResult = healthChecker.isHealthyWithFailureReason(channelMock).sync();
+                assertThat(healthyResult.isSuccess()).isTrue();
+                assertThat(healthyResult.getNow()).isEqualTo(RntbdConstants.RntbdHealthCheckResults.SuccessValue);
+            } else {
+                Future<Boolean> healthyResult = healthChecker.isHealthy(channelMock).sync();
+                assertThat(healthyResult.isSuccess()).isTrue();
+                assertThat(healthyResult.getNow()).isTrue();
+            }
+        }
+    }
 }
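Both new health-checker tests hinge on the same decision, which can be read straight off the mocks: the channel has reached the cancellation threshold and has not completed a read for at least the configured limit, so the health check should fail unless the CPU is over the disable threshold. A self-contained sketch of that predicate under those assumptions (the parameter names mirror the new config knobs; this is not the SDK's actual method):

```java
// Hypothetical condensation of the cancellation-proneness decision exercised above.
final class CancellationProneCheck {
    static boolean isCancellationProne(
        int cancellationsSinceLastRead,      // Timestamps.cancellationCount()
        long nanosSinceLastChannelRead,      // currentTime - lastChannelReadTime()
        boolean cpuOverThreshold,            // CpuMemoryMonitor-based check
        int cancellationCountThreshold,      // cancellationCountSinceLastReadThreshold, default 5
        long readDelayLimitInNanos) {        // nonRespondingChannelReadDelayTimeLimit, default 60s
        // Cancellations under high CPU load are expected; recycling the channel will not help.
        if (cpuOverThreshold) {
            return false;
        }
        return cancellationsSinceLastRead >= cancellationCountThreshold
            && nanosSinceLastChannelRead >= readDelayLimitInNanos;
    }
}
```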
diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md
index cc3609d1df133..84baffb34341b 100644
--- a/sdk/cosmos/azure-cosmos/CHANGELOG.md
+++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md
@@ -5,14 +5,17 @@
 #### Features Added
 
 #### Breaking Changes
-* Gone exceptions that are not idempotent should not be retried because it is not known if they succeeded for sure. The handling of the exception in this case is left to the user. Fixed retrying write operations when a gone exception occurs in bulk mode. - See [PR 35838](https://github.com/Azure/azure-sdk-for-java/pull/35838)
+
 
 #### Bugs Fixed
 * Fixed retrying write operations when a gone exception occurs in bulk mode. - See [PR 35838](https://github.com/Azure/azure-sdk-for-java/pull/35838)
 * Fixed request start time in the `CosmosDiagnostics` for individual request responses - See [PR 35705](https://github.com/Azure/azure-sdk-for-java/pull/35705)
 * Fixed an issue where `ConnectionStateListener` tracked staled `Uris` which fails to mark the current `Uris` unhealthy properly - See [PR 36067](https://github.com/Azure/azure-sdk-for-java/pull/36067)
 * Gone exceptions that are not idempotent should not be retried because it is not known if they succeeded for sure. The handling of the exception in this case is left to the user. Fixed retrying write operations when a gone exception occurs in bulk mode. - See [PR 35838](https://github.com/Azure/azure-sdk-for-java/pull/35838)
 * Fixed an issue to update the last unhealthy timestamp for an `Uri` instance only when transitioning to `Unhealthy` from a different health status - See [36083](https://github.com/Azure/azure-sdk-for-java/pull/36083)
+* Improved the channel health check flow to deem a channel unhealthy when it sees consecutive cancellations without an intervening successful read. - See [PR 36225](https://github.com/Azure/azure-sdk-for-java/pull/36225)
+* Optimized the replica validation flow to validate a replica with `Unknown` health status only when the replica is used by a container that is also part of the connection warm-up flow. - See [PR 36225](https://github.com/Azure/azure-sdk-for-java/pull/36225)
 
 #### Other Changes
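The two new bullets map directly onto the configuration added later in this diff: `RntbdTransportClient.Options` gains `nonRespondingChannelReadDelayTimeLimit` (default 60 seconds) and `cancellationCountSinceLastReadThreshold` (default 5). Purely for reference, a sketch of those defaults as a plain constants holder (the class name here is hypothetical, not part of the SDK):

```java
import java.time.Duration;

// Hypothetical constants mirroring the new RntbdTransportClient.Options defaults.
final class RntbdCancellationHealthCheckDefaults {
    // Minimum time without a successful channel read before the
    // cancellation-proneness check may fail the channel.
    static final Duration NON_RESPONDING_CHANNEL_READ_DELAY_TIME_LIMIT = Duration.ofSeconds(60);

    // Cancellations observed since the last successful read before the
    // channel is considered cancellation prone.
    static final int CANCELLATION_COUNT_SINCE_LAST_READ_THRESHOLD = 5;
}
```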
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCache.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCache.java
index 9881337ec3e3a..b8d20168a8108 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCache.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCache.java
@@ -979,16 +979,25 @@ private void validateReplicaAddresses(String collectionRid, AddressInformation[]
                     .stream()
                     .map(Uri::getURIAsString)
                     .collect(Collectors.toList()));
+
+            minConnectionsRequiredForEndpoint = this.connectionPolicy.getMinConnectionPoolSizePerEndpoint();
         }
 
         for (Uri addressToBeValidated : addressesNeedToValidation) {
+            // replica validation should be triggered for an address with Unknown health status only when
+            // the address is used by a container / collection which was part of the warm-up flow
+            if (addressToBeValidated.getHealthStatus() == Uri.HealthStatus.Unknown
+                && !this.proactiveOpenConnectionsProcessor.isCollectionRidUnderOpenConnectionsFlow(collectionRid)) {
+                continue;
+            }
+
             Mono.fromFuture(this.proactiveOpenConnectionsProcessor
                 .submitOpenConnectionTaskOutsideLoop(
                     collectionRid,
                     this.serviceEndpoint,
                     addressToBeValidated,
-                    this.connectionPolicy.getMinConnectionPoolSizePerEndpoint()))
+                    minConnectionsRequiredForEndpoint))
                 .subscribeOn(CosmosSchedulers.OPEN_CONNECTIONS_BOUNDED_ELASTIC)
                 .subscribe();
         }
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClient.java
index 6d98ccd7f77ad..1cca9d5557e30 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClient.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClient.java
@@ -617,6 +617,12 @@ public static final class Options {
         @JsonProperty()
         private final Duration timeoutDetectionOnWriteTimeLimit;
 
+        @JsonProperty()
+        private final Duration nonRespondingChannelReadDelayTimeLimit;
+
+        @JsonProperty()
+        private final int cancellationCountSinceLastReadThreshold;
+
         /**
          * Used during the open connections flow to determine the
          * minimum no. of connections to keep open to a particular
@@ -667,6 +673,8 @@ private Options(final Builder builder) {
             this.timeoutDetectionOnWriteThreshold = builder.timeoutDetectionOnWriteThreshold;
             this.timeoutDetectionOnWriteTimeLimit = builder.timeoutDetectionOnWriteTimeLimit;
             this.minConnectionPoolSizePerEndpoint = builder.minConnectionPoolSizePerEndpoint;
+            this.nonRespondingChannelReadDelayTimeLimit = builder.nonRespondingChannelReadDelayTimeLimit;
+            this.cancellationCountSinceLastReadThreshold = builder.cancellationCountSinceLastReadThreshold;
 
             this.connectTimeout = builder.connectTimeout == null ?
                 builder.tcpNetworkRequestTimeout
@@ -709,6 +717,8 @@ private Options(final ConnectionPolicy connectionPolicy) {
             this.timeoutDetectionOnWriteTimeLimit = Duration.ofSeconds(6L);
             this.preferTcpNative = true;
             this.minConnectionPoolSizePerEndpoint = connectionPolicy.getMinConnectionPoolSizePerEndpoint();
+            this.cancellationCountSinceLastReadThreshold = 5;
+            this.nonRespondingChannelReadDelayTimeLimit = Duration.ofSeconds(60);
         }
 
         // endregion
@@ -841,6 +851,14 @@ public Duration timeoutDetectionOnWriteTimeLimit() {
             return this.timeoutDetectionOnWriteTimeLimit;
         }
 
+        public Duration nonRespondingChannelReadDelayTimeLimit() {
+            return this.nonRespondingChannelReadDelayTimeLimit;
+        }
+
+        public int cancellationCountSinceLastReadThreshold() {
+            return this.cancellationCountSinceLastReadThreshold;
+        }
+
         public int minConnectionPoolSizePerEndpoint() {
             return this.minConnectionPoolSizePerEndpoint;
         }
@@ -1018,6 +1036,8 @@ public static class Builder {
             private int timeoutDetectionOnWriteThreshold;
             private Duration timeoutDetectionOnWriteTimeLimit;
             private int minConnectionPoolSizePerEndpoint;
+            private Duration nonRespondingChannelReadDelayTimeLimit;
+            private int cancellationCountSinceLastReadThreshold;
 
             // endregion
 
@@ -1060,6 +1080,8 @@ public Builder(ConnectionPolicy connectionPolicy) {
                 this.timeoutDetectionOnWriteThreshold = DEFAULT_OPTIONS.timeoutDetectionOnWriteThreshold;
                 this.timeoutDetectionOnWriteTimeLimit = DEFAULT_OPTIONS.timeoutDetectionOnWriteTimeLimit;
                 this.minConnectionPoolSizePerEndpoint = connectionPolicy.getMinConnectionPoolSizePerEndpoint();
+                this.nonRespondingChannelReadDelayTimeLimit = DEFAULT_OPTIONS.nonRespondingChannelReadDelayTimeLimit;
+                this.cancellationCountSinceLastReadThreshold = DEFAULT_OPTIONS.cancellationCountSinceLastReadThreshold;
             }
 
             // endregion
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java
index 22e5e0694a97a..dd88b00ef4f53 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java
@@ -73,6 +73,11 @@ public final class RntbdClientChannelHealthChecker implements ChannelHealthCheck
     private final int timeoutOnWriteThreshold;
     @JsonProperty
     private final long timeoutOnWriteTimeLimitInNanos;
+    @JsonProperty
+    private final long nonRespondingChannelReadDelayTimeLimitInNanos;
+    @JsonProperty
+    private final int cancellationCountSinceLastReadThreshold;
+
 
     // endregion
 
@@ -101,6 +106,8 @@ public RntbdClientChannelHealthChecker(final Config config) {
         this.timeoutHighFrequencyTimeLimitInNanos = config.timeoutDetectionHighFrequencyTimeLimitInNanos();
         this.timeoutOnWriteThreshold = config.timeoutDetectionOnWriteThreshold();
         this.timeoutOnWriteTimeLimitInNanos = config.timeoutDetectionOnWriteTimeLimitInNanos();
+        this.nonRespondingChannelReadDelayTimeLimitInNanos = config.nonRespondingChannelReadDelayTimeLimitInNanos();
+        this.cancellationCountSinceLastReadThreshold = config.cancellationCountSinceLastReadThreshold();
     }
 
     // endregion
@@ -220,6 +227,11 @@ public Future<String> isHealthyWithFailureReason(final Channel channel) {
             return promise.setSuccess(idleConnectionValidationMessage);
         }
 
+        String isCancellationProneChannelMessage = this.isCancellationProneChannel(timestamps, currentTime, requestManager, channel);
+        if (StringUtils.isNotEmpty(isCancellationProneChannelMessage)) {
+            return promise.setSuccess(isCancellationProneChannelMessage);
+        }
+
         channel.writeAndFlush(RntbdHealthCheckRequest.MESSAGE).addListener(completed -> {
             if (completed.isSuccess()) {
                 promise.setSuccess(RntbdHealthCheckResults.SuccessValue);
@@ -397,6 +409,41 @@ private String idleConnectionValidation(Timestamps timestamps, Instant currentTi
         return errorMessage;
     }
 
+    private String isCancellationProneChannel(Timestamps timestamps, Instant currentTime, RntbdRequestManager requestManager, Channel channel) {
+        String errorMessage = StringUtils.EMPTY;
+
+        final Optional<RntbdContext> rntbdContext = requestManager.rntbdContext();
+
+        if (timestamps.cancellationCount() >= this.cancellationCountSinceLastReadThreshold) {
+
+            // Request cancellations could be a normal symptom under high CPU load.
+            // When request cancellations are due to high CPU, closing the existing connection
+            // and re-establishing a new one will not help the issue but rather make it worse, so return fast.
+            if (CpuMemoryMonitor.getCpuLoad().isCpuOverThreshold(this.timeoutDetectionDisableCPUThreshold)) {
+                return errorMessage;
+            }
+
+            final long readSuccessRecency = Duration.between(timestamps.lastChannelReadTime(), currentTime).toNanos();
+
+            if (readSuccessRecency >= this.nonRespondingChannelReadDelayTimeLimitInNanos) {
+                errorMessage = MessageFormat.format(
+                    "{0} health check failed due to channel being cancellation prone: [rntbdContext: {1}, lastChannelWrite: {2}, lastChannelRead: {3}, "
+                        + "cancellationCountSinceLastSuccessfulRead: {4}, currentTime: {5}]",
+                    channel,
+                    rntbdContext,
+                    timestamps.lastChannelWriteTime(),
+                    timestamps.lastChannelReadTime(),
+                    timestamps.cancellationCount(),
+                    currentTime);
+
+                logger.warn(errorMessage);
+                return errorMessage;
+            }
+        }
+
+        return errorMessage;
+    }
+
     @Override
     public String toString() {
         return RntbdObjectMapper.toString(this);
@@ -429,6 +476,9 @@ public static final class Timestamps {
         private static final AtomicReferenceFieldUpdater<Timestamps, Instant> transitTimeoutStartingTimeUpdater =
             newUpdater(Timestamps.class, Instant.class, "transitTimeoutStartingTime");
 
+        private static final AtomicIntegerFieldUpdater<Timestamps> cancellationCountUpdater =
+            AtomicIntegerFieldUpdater.newUpdater(Timestamps.class, "cancellationCount");
+
         private volatile Instant lastPingTime;
         private volatile Instant lastReadTime;
         private volatile Instant lastWriteTime;
@@ -436,7 +486,7 @@ public static final class Timestamps {
         private volatile int transitTimeoutCount;
         private volatile int transitTimeoutWriteCount;
         private volatile Instant transitTimeoutStartingTime;
-
+        private volatile int cancellationCount;
         public Timestamps() {
             lastPingUpdater.set(this, Instant.now());
             lastReadUpdater.set(this, Instant.now());
@@ -454,6 +504,7 @@ public Timestamps(Timestamps other) {
             this.transitTimeoutCount = transitTimeoutCountUpdater.get(other);
             this.transitTimeoutWriteCount = transitTimeoutWriteCountUpdater.get(other);
             this.transitTimeoutStartingTime = transitTimeoutStartingTimeUpdater.get(other);
+            this.cancellationCount = cancellationCountUpdater.get(other);
         }
 
         public void channelPingCompleted() {
@@ -463,6 +514,7 @@ public void channelPingCompleted() {
 
         public void channelReadCompleted() {
             lastReadUpdater.set(this, Instant.now());
             this.resetTransitTimeout(); // we have got a successful read, so reset the transitTimeout count.
+            this.resetCancellationCount();
         }
 
         public void channelWriteAttempted() {
@@ -488,6 +540,10 @@ public void resetTransitTimeout() {
             transitTimeoutStartingTimeUpdater.set(this, null);
         }
 
+        public void resetCancellationCount() {
+            cancellationCountUpdater.set(this, 0);
+        }
+
         @JsonProperty
         public Instant lastChannelPingTime() {
             return lastPingUpdater.get(this);
@@ -523,6 +579,16 @@ public Instant transitTimeoutStartingTime() {
             return transitTimeoutStartingTimeUpdater.get(this);
         }
 
+        @JsonProperty
+        public int cancellationCount() {
+            return cancellationCountUpdater.get(this);
+        }
+
+        @JsonProperty
+        public void cancellation() {
+            cancellationCountUpdater.incrementAndGet(this);
+        }
+
         @Override
         public String toString() {
             return RntbdObjectMapper.toString(this);
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdEndpoint.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdEndpoint.java
index 1e48b6c8ad478..cd53b6c2aa798 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdEndpoint.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdEndpoint.java
@@ -3,7 +3,6 @@
 
 package com.azure.cosmos.implementation.directconnectivity.rntbd;
 
-import com.azure.cosmos.implementation.IOpenConnectionsHandler;
 import com.azure.cosmos.implementation.UserAgentContainer;
 import com.azure.cosmos.implementation.directconnectivity.AddressSelector;
 import com.azure.cosmos.implementation.directconnectivity.IAddressResolver;
@@ -311,6 +310,16 @@ public long timeoutDetectionOnWriteTimeLimitInNanos() {
             return this.options.timeoutDetectionOnWriteTimeLimit().toNanos();
         }
 
+        @JsonProperty
+        public long nonRespondingChannelReadDelayTimeLimitInNanos() {
+            return this.options.nonRespondingChannelReadDelayTimeLimit().toNanos();
+        }
+
+        @JsonProperty
+        public int cancellationCountSinceLastReadThreshold() {
+            return this.options.cancellationCountSinceLastReadThreshold();
+        }
+
         @JsonProperty
         public int minConnectionPoolSizePerEndpoint() {
             return this.options.minConnectionPoolSizePerEndpoint();
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestManager.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestManager.java
index 5bf2d15ff5114..91012efc39b71 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestManager.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestManager.java
@@ -831,6 +831,10 @@ private RntbdRequestRecord addPendingRequestRecord(final ChannelHandlerContext c
             if (pendingRequestTimeout.get() != null) {
                 pendingRequestTimeout.get().cancel();
             }
+
+            if (record.isCancelled()) {
+                this.timestamps.cancellation();
+            }
         });
 
         return record;
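To close the loop on the bookkeeping at the end of this diff: `RntbdRequestManager` increments the per-channel cancellation counter whenever a pending request record completes cancelled, and `Timestamps.channelReadCompleted()` resets it, so `cancellationCount()` always means "cancellations since the last successful read". A stripped-down sketch of that pattern using only the JDK (the `ChannelStats` class is illustrative, not SDK code):

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

// Illustrative stand-in for the Timestamps cancellation bookkeeping; a field
// updater over a volatile int keeps the hot path lock- and allocation-free.
final class ChannelStats {
    private static final AtomicIntegerFieldUpdater<ChannelStats> CANCELLATION_COUNT =
        AtomicIntegerFieldUpdater.newUpdater(ChannelStats.class, "cancellationCount");

    private volatile int cancellationCount;

    // Mirrors RntbdRequestManager: called when a pending request record is cancelled.
    void onRequestCancelled() {
        CANCELLATION_COUNT.incrementAndGet(this);
    }

    // Mirrors Timestamps.channelReadCompleted(): a successful read clears the streak.
    void onChannelReadCompleted() {
        CANCELLATION_COUNT.set(this, 0);
    }

    int cancellationsSinceLastRead() {
        return CANCELLATION_COUNT.get(this);
    }
}
```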