Skip to content

Commit

Permalink
Channel health check improvement for cancelled requests (#36225)
Browse files Browse the repository at this point in the history
* Attempt to reproduce Walmart issue.

* Added a cancellation gate to RntbdClientChannelHealthChecker.

* Refactorings.

* Refactorings.

* Added tests.

* Refactorings.

* Fixed replica validation to open only 1 connection to replica.

* Attempt at fixing tests.

* Attempt at fixing tests.

* Attempt at fixing tests.

* Updated CHANGELOG.md.

* Refactorings.

* Refactorings.

* Reacting to review comments.

---------

Co-authored-by: Azure SDK Bot <[email protected]>
  • Loading branch information
jeet1995 and azure-sdk authored Aug 4, 2023
1 parent d464be7 commit e618ba1
Show file tree
Hide file tree
Showing 8 changed files with 340 additions and 34 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ public void transitTimeoutOnWriteTests(boolean withFailureReason) throws Interru
Future<String> healthyResult = healthChecker.isHealthyWithFailureReason(channelMock).sync();
assertThat(healthyResult.isSuccess()).isTrue();
assertThat(healthyResult.getNow()).isNotEqualTo(RntbdConstants.RntbdHealthCheckResults.SuccessValue);
assertThat(healthyResult.getNow().contains("health check failed due to transit timeout on write threshold hit"));
assertThat(healthyResult.getNow()).contains("health check failed due to transit timeout on write threshold hit");
} else {
Future<Boolean> healthyResult = healthChecker.isHealthy(channelMock).sync();
assertThat(healthyResult.isSuccess()).isTrue();
Expand Down Expand Up @@ -307,4 +307,109 @@ public void transitTimeoutOnWrite_HighCPULoadTests(boolean withFailureReason) th
}
}
}

/**
 * Verifies that the health checker reports a channel as unhealthy when the snapshot's
 * cancellation count equals {@code config.cancellationCountSinceLastReadThreshold()}, the last
 * channel read is five times older than {@code config.nonRespondingChannelReadDelayTimeLimitInNanos()},
 * and the CPU is reported as not being over the timeout-detection-disable threshold.
 *
 * @param withFailureReason when {@code true} the check goes through
 *                          {@code isHealthyWithFailureReason} and the failure message is asserted;
 *                          otherwise {@code isHealthy} is used and a {@code false} result is asserted.
 * @throws InterruptedException if waiting on the health check future is interrupted.
 */
@Test(groups = { "unit" }, dataProvider = "isHealthyWithReasonArgs")
public void cancellationPronenessOfChannel_Test(boolean withFailureReason) throws InterruptedException {
    SslContext sslContextMock = Mockito.mock(SslContext.class);

    RntbdEndpoint.Config config = new RntbdEndpoint.Config(
        new RntbdTransportClient.Options.Builder(ConnectionPolicy.getDefaultPolicy()).build(),
        sslContextMock,
        LogLevel.INFO);

    RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config);
    Channel channelMock = Mockito.mock(Channel.class);
    ChannelPipeline channelPipelineMock = Mockito.mock(ChannelPipeline.class);
    RntbdRequestManager rntbdRequestManagerMock = Mockito.mock(RntbdRequestManager.class);
    RntbdClientChannelHealthChecker.Timestamps timestampsMock =
        Mockito.mock(RntbdClientChannelHealthChecker.Timestamps.class);

    // This is a real event loop, not a mock; it must be shut down so that any executor
    // thread it starts does not outlive the test.
    SingleThreadEventLoop eventLoop = new DefaultEventLoop();

    try {
        Mockito.when(channelMock.pipeline()).thenReturn(channelPipelineMock);
        Mockito.when(channelPipelineMock.get(RntbdRequestManager.class)).thenReturn(rntbdRequestManagerMock);
        Mockito.when(channelMock.eventLoop()).thenReturn(eventLoop);
        Mockito.when(rntbdRequestManagerMock.snapshotTimestamps()).thenReturn(timestampsMock);

        // The health check write is stubbed to complete successfully right away.
        ChannelPromise defaultChannelPromise = new DefaultChannelPromise(channelMock);
        defaultChannelPromise.setSuccess();
        Mockito.when(channelMock.writeAndFlush(RntbdHealthCheckRequest.MESSAGE)).thenReturn(defaultChannelPromise);

        // Last read lies well beyond the non-responding read delay limit; write and write-attempt
        // share one timestamp slightly in the future.
        Instant current = Instant.now();
        Instant lastChannelReadTime = current.minusNanos(5 * config.nonRespondingChannelReadDelayTimeLimitInNanos());
        Instant lastChannelWriteTime = current.plusSeconds(1);

        Mockito.when(timestampsMock.lastChannelReadTime()).thenReturn(lastChannelReadTime);
        Mockito.when(timestampsMock.lastChannelWriteTime()).thenReturn(lastChannelWriteTime);
        Mockito.when(timestampsMock.lastChannelWriteAttemptTime()).thenReturn(lastChannelWriteTime);
        Mockito.when(timestampsMock.transitTimeoutCount()).thenReturn(0);
        Mockito.when(timestampsMock.cancellationCount()).thenReturn(config.cancellationCountSinceLastReadThreshold());

        try (MockedStatic<CpuMemoryMonitor> cpuMemoryMonitorMock = Mockito.mockStatic(CpuMemoryMonitor.class)) {
            CpuLoadHistory cpuLoadHistoryMock = Mockito.mock(CpuLoadHistory.class);
            cpuMemoryMonitorMock.when(CpuMemoryMonitor::getCpuLoad).thenReturn(cpuLoadHistoryMock);
            Mockito.when(cpuLoadHistoryMock.isCpuOverThreshold(config.timeoutDetectionDisableCPUThreshold())).thenReturn(false);

            if (withFailureReason) {
                Future<String> healthyResult = healthChecker.isHealthyWithFailureReason(channelMock).sync();
                assertThat(healthyResult.isSuccess()).isTrue();
                assertThat(healthyResult.getNow()).isNotEqualTo(RntbdConstants.RntbdHealthCheckResults.SuccessValue);
                assertThat(healthyResult.getNow()).contains("health check failed due to channel being cancellation prone");
            } else {
                Future<Boolean> healthyResult = healthChecker.isHealthy(channelMock).sync();
                assertThat(healthyResult.isSuccess()).isTrue();
                assertThat(healthyResult.getNow()).isFalse();
            }
        }
    } finally {
        eventLoop.shutdownGracefully();
    }
}

/**
 * Verifies that the health checker still reports a channel as healthy when the snapshot's
 * cancellation count equals {@code config.cancellationCountSinceLastReadThreshold()} but the CPU
 * is reported as being over {@code config.timeoutDetectionDisableCPUThreshold()}.
 *
 * @param withFailureReason when {@code true} the check goes through
 *                          {@code isHealthyWithFailureReason} and the success value is asserted;
 *                          otherwise {@code isHealthy} is used and a {@code true} result is asserted.
 * @throws InterruptedException if waiting on the health check future is interrupted.
 */
@Test(groups = { "unit" }, dataProvider = "isHealthyWithReasonArgs")
public void cancellationPronenessOfChannelWithHighCpuLoad_Test(boolean withFailureReason) throws InterruptedException {
    SslContext sslContextMock = Mockito.mock(SslContext.class);

    RntbdEndpoint.Config config = new RntbdEndpoint.Config(
        new RntbdTransportClient.Options.Builder(ConnectionPolicy.getDefaultPolicy()).build(),
        sslContextMock,
        LogLevel.INFO);

    RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config);
    Channel channelMock = Mockito.mock(Channel.class);
    ChannelPipeline channelPipelineMock = Mockito.mock(ChannelPipeline.class);
    RntbdRequestManager rntbdRequestManagerMock = Mockito.mock(RntbdRequestManager.class);
    RntbdClientChannelHealthChecker.Timestamps timestampsMock =
        Mockito.mock(RntbdClientChannelHealthChecker.Timestamps.class);

    // This is a real event loop, not a mock; it must be shut down so that any executor
    // thread it starts does not outlive the test.
    SingleThreadEventLoop eventLoop = new DefaultEventLoop();

    try {
        Mockito.when(channelMock.pipeline()).thenReturn(channelPipelineMock);
        Mockito.when(channelPipelineMock.get(RntbdRequestManager.class)).thenReturn(rntbdRequestManagerMock);
        Mockito.when(channelMock.eventLoop()).thenReturn(eventLoop);
        Mockito.when(rntbdRequestManagerMock.snapshotTimestamps()).thenReturn(timestampsMock);

        // The health check write is stubbed to complete successfully right away.
        ChannelPromise defaultChannelPromise = new DefaultChannelPromise(channelMock);
        defaultChannelPromise.setSuccess();
        Mockito.when(channelMock.writeAndFlush(RntbdHealthCheckRequest.MESSAGE)).thenReturn(defaultChannelPromise);

        // Last read is exactly one read-delay limit in the past; write and write-attempt share
        // one timestamp slightly in the future.
        Instant current = Instant.now();
        Instant lastChannelReadTime = current.minusNanos(config.nonRespondingChannelReadDelayTimeLimitInNanos());
        Instant lastChannelWriteTime = current.plusSeconds(1);

        Mockito.when(timestampsMock.lastChannelReadTime()).thenReturn(lastChannelReadTime);
        Mockito.when(timestampsMock.lastChannelWriteTime()).thenReturn(lastChannelWriteTime);
        Mockito.when(timestampsMock.lastChannelWriteAttemptTime()).thenReturn(lastChannelWriteTime);
        Mockito.when(timestampsMock.transitTimeoutCount()).thenReturn(0);
        Mockito.when(timestampsMock.cancellationCount()).thenReturn(config.cancellationCountSinceLastReadThreshold());

        try (MockedStatic<CpuMemoryMonitor> cpuMemoryMonitorMock = Mockito.mockStatic(CpuMemoryMonitor.class)) {
            CpuLoadHistory cpuLoadHistoryMock = Mockito.mock(CpuLoadHistory.class);
            cpuMemoryMonitorMock.when(CpuMemoryMonitor::getCpuLoad).thenReturn(cpuLoadHistoryMock);
            // CPU is reported as over the threshold for this scenario.
            Mockito.when(cpuLoadHistoryMock.isCpuOverThreshold(config.timeoutDetectionDisableCPUThreshold())).thenReturn(true);

            if (withFailureReason) {
                Future<String> healthyResult = healthChecker.isHealthyWithFailureReason(channelMock).sync();
                assertThat(healthyResult.isSuccess()).isTrue();
                assertThat(healthyResult.getNow()).isEqualTo(RntbdConstants.RntbdHealthCheckResults.SuccessValue);
            } else {
                Future<Boolean> healthyResult = healthChecker.isHealthy(channelMock).sync();
                assertThat(healthyResult.isSuccess()).isTrue();
                assertThat(healthyResult.getNow()).isTrue();
            }
        }
    } finally {
        eventLoop.shutdownGracefully();
    }
}

}
5 changes: 4 additions & 1 deletion sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@
#### Features Added

#### Breaking Changes

* Gone exceptions that are not idempotent should not be retried because it is not known if they succeeded for sure. The handling of the exception in this case is left to the user. Fixed retrying write operations when a gone exception occurs in bulk mode. - See [PR 35838](https://github.com/Azure/azure-sdk-for-java/pull/35838)

#### Bugs Fixed
* Fixed retrying write operations when a gone exception occurs in bulk mode. - See [PR 35838](https://github.com/Azure/azure-sdk-for-java/pull/35838)
* Fixed request start time in the `CosmosDiagnostics` for individual request responses - See [PR 35705](https://github.com/Azure/azure-sdk-for-java/pull/35705)
* Fixed an issue where `ConnectionStateListener` tracked stale `Uris`, which caused the current `Uris` to not be marked unhealthy properly - See [PR 36067](https://github.com/Azure/azure-sdk-for-java/pull/36067)
* Gone exceptions that are not idempotent should not be retried because it is not known if they succeeded for sure. The handling of the exception in this case is left to the user. Fixed retrying write operations when a gone exception occurs in bulk mode. - See [PR 35838](https://github.com/Azure/azure-sdk-for-java/pull/35838)
* Fixed an issue to update the last unhealthy timestamp for a `Uri` instance only when transitioning to `Unhealthy` from a different health status - See [PR 36083](https://github.com/Azure/azure-sdk-for-java/pull/36083)
* Improved the channel health check flow to deem a channel unhealthy when it sees consecutive cancellations. - See [PR 36225](https://github.com/Azure/azure-sdk-for-java/pull/36225)
* Optimized the replica validation flow to validate replica health with `Unknown` health status only when the replica is
used by a container which is also part of the connection warm-up flow. - See [PR 36225](https://github.com/Azure/azure-sdk-for-java/pull/36225)

#### Other Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -979,16 +979,25 @@ private void validateReplicaAddresses(String collectionRid, AddressInformation[]
.stream()
.map(Uri::getURIAsString)
.collect(Collectors.toList()));

minConnectionsRequiredForEndpoint = this.connectionPolicy.getMinConnectionPoolSizePerEndpoint();
}

for (Uri addressToBeValidated : addressesNeedToValidation) {

// replica validation should be triggered for an address with Unknown health status only when
// the address is used by a container / collection which was part of the warm up flow
if (addressToBeValidated.getHealthStatus() == Uri.HealthStatus.Unknown
&& !this.proactiveOpenConnectionsProcessor.isCollectionRidUnderOpenConnectionsFlow(collectionRid)) {
continue;
}

Mono.fromFuture(this.proactiveOpenConnectionsProcessor
.submitOpenConnectionTaskOutsideLoop(
collectionRid,
this.serviceEndpoint,
addressToBeValidated,
this.connectionPolicy.getMinConnectionPoolSizePerEndpoint()))
minConnectionsRequiredForEndpoint))
.subscribeOn(CosmosSchedulers.OPEN_CONNECTIONS_BOUNDED_ELASTIC)
.subscribe();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,12 @@ public static final class Options {
@JsonProperty()
private final Duration timeoutDetectionOnWriteTimeLimit;

@JsonProperty()
private final Duration nonRespondingChannelReadDelayTimeLimit;

@JsonProperty()
private final int cancellationCountSinceLastReadThreshold;

/**
* Used during the open connections flow to determine the
* minimum no. of connections to keep open to a particular
Expand Down Expand Up @@ -667,6 +673,8 @@ private Options(final Builder builder) {
this.timeoutDetectionOnWriteThreshold = builder.timeoutDetectionOnWriteThreshold;
this.timeoutDetectionOnWriteTimeLimit = builder.timeoutDetectionOnWriteTimeLimit;
this.minConnectionPoolSizePerEndpoint = builder.minConnectionPoolSizePerEndpoint;
this.nonRespondingChannelReadDelayTimeLimit = builder.nonRespondingChannelReadDelayTimeLimit;
this.cancellationCountSinceLastReadThreshold = builder.cancellationCountSinceLastReadThreshold;

this.connectTimeout = builder.connectTimeout == null
? builder.tcpNetworkRequestTimeout
Expand Down Expand Up @@ -709,6 +717,8 @@ private Options(final ConnectionPolicy connectionPolicy) {
this.timeoutDetectionOnWriteTimeLimit = Duration.ofSeconds(6L);
this.preferTcpNative = true;
this.minConnectionPoolSizePerEndpoint = connectionPolicy.getMinConnectionPoolSizePerEndpoint();
this.cancellationCountSinceLastReadThreshold = 5;
this.nonRespondingChannelReadDelayTimeLimit = Duration.ofSeconds(60);
}

// endregion
Expand Down Expand Up @@ -841,6 +851,14 @@ public Duration timeoutDetectionOnWriteTimeLimit() {
return this.timeoutDetectionOnWriteTimeLimit;
}

/**
 * Gets the configured {@code nonRespondingChannelReadDelayTimeLimit} option.
 *
 * @return the value of the {@code nonRespondingChannelReadDelayTimeLimit} field.
 */
public Duration nonRespondingChannelReadDelayTimeLimit() {
    return nonRespondingChannelReadDelayTimeLimit;
}

/**
 * Gets the configured {@code cancellationCountSinceLastReadThreshold} option.
 *
 * @return the value of the {@code cancellationCountSinceLastReadThreshold} field.
 */
public int cancellationCountSinceLastReadThreshold() {
    return cancellationCountSinceLastReadThreshold;
}

/**
 * Gets the configured {@code minConnectionPoolSizePerEndpoint} option.
 *
 * @return the value of the {@code minConnectionPoolSizePerEndpoint} field.
 */
public int minConnectionPoolSizePerEndpoint() {
    return minConnectionPoolSizePerEndpoint;
}
Expand Down Expand Up @@ -1018,6 +1036,8 @@ public static class Builder {
private int timeoutDetectionOnWriteThreshold;
private Duration timeoutDetectionOnWriteTimeLimit;
private int minConnectionPoolSizePerEndpoint;
private Duration nonRespondingChannelReadDelayTimeLimit;
private int cancellationCountSinceLastReadThreshold;

// endregion

Expand Down Expand Up @@ -1060,6 +1080,8 @@ public Builder(ConnectionPolicy connectionPolicy) {
this.timeoutDetectionOnWriteThreshold = DEFAULT_OPTIONS.timeoutDetectionOnWriteThreshold;
this.timeoutDetectionOnWriteTimeLimit = DEFAULT_OPTIONS.timeoutDetectionOnWriteTimeLimit;
this.minConnectionPoolSizePerEndpoint = connectionPolicy.getMinConnectionPoolSizePerEndpoint();
this.nonRespondingChannelReadDelayTimeLimit = DEFAULT_OPTIONS.nonRespondingChannelReadDelayTimeLimit;
this.cancellationCountSinceLastReadThreshold = DEFAULT_OPTIONS.cancellationCountSinceLastReadThreshold;
}

// endregion
Expand Down
Loading

0 comments on commit e618ba1

Please sign in to comment.