From 5e7292181e321a8464979a56545c546f62c6da21 Mon Sep 17 00:00:00 2001 From: annie-mac Date: Fri, 27 Aug 2021 14:54:09 -0700 Subject: [PATCH 1/7] fix --- .../azure/cosmos/implementation/CosmosSchedulers.java | 7 +++++++ .../directconnectivity/TransportClient.java | 9 +++++++-- .../directconnectivity/rntbd/RntbdServiceEndpoint.java | 3 ++- 3 files changed, 16 insertions(+), 3 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosSchedulers.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosSchedulers.java index e01014e88b91..c2820b0ddd16 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosSchedulers.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosSchedulers.java @@ -8,6 +8,7 @@ public class CosmosSchedulers { private final static String COSMOS_PARALLEL_THREAD_NAME = "cosmos-parallel"; + private final static String TRANSPORT_CLIENT_BOUNDED_ELASTIC_THREAD_NAME = "transport-client-bounded-elastic"; // Using a custom parallel scheduler to be able to schedule retries etc. // without being vulnerable to scenarios where applications abuse the @@ -16,4 +17,10 @@ public class CosmosSchedulers { COSMOS_PARALLEL_THREAD_NAME, Schedulers.DEFAULT_POOL_SIZE, true); + + public final static Scheduler TRANSPORT_CLIENT_BOUNDED_ELASTIC = Schedulers.newBoundedElastic( + Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE, + Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, + TRANSPORT_CLIENT_BOUNDED_ELASTIC_THREAD_NAME + ); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/TransportClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/TransportClient.java index 5a5d40998fc8..d6bb43ebdc5c 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/TransportClient.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/TransportClient.java @@ -3,10 +3,13 @@ package com.azure.cosmos.implementation.directconnectivity; +import com.azure.cosmos.implementation.CosmosSchedulers; import com.azure.cosmos.implementation.RxDocumentServiceRequest; import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; import com.azure.cosmos.implementation.throughputControl.ThroughputControlStore; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; // We suppress the "try" warning here because the close() method's signature // allows it to throw InterruptedException which is strongly advised against @@ -29,10 +32,12 @@ public Mono invokeResourceOperationAsync(Uri physicalAddress, RxD if (this.throughputControlStore != null) { return this.throughputControlStore.processRequest( request, - Mono.defer(() -> this.invokeStoreAsync(physicalAddress, request))); + Mono.defer( + () -> this.invokeStoreAsync(physicalAddress, request).publishOn(CosmosSchedulers.TRANSPORT_CLIENT_BOUNDED_ELASTIC))); } - return this.invokeStoreAsync(physicalAddress, request); + return this.invokeStoreAsync(physicalAddress, request) + .publishOn(CosmosSchedulers.TRANSPORT_CLIENT_BOUNDED_ELASTIC); } protected abstract Mono invokeStoreAsync( diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdServiceEndpoint.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdServiceEndpoint.java index 1aad216eae79..8cd3880a1f9f 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdServiceEndpoint.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdServiceEndpoint.java @@ -379,7 +379,8 @@ private RntbdRequestRecord writeWhenConnected( assert channel != null : "impossible"; this.releaseToPool(channel); requestRecord.channelTaskQueueLength(RntbdUtils.tryGetExecutorTaskQueueSize(channel.eventLoop())); - channel.write(requestRecord.stage(RntbdRequestRecord.Stage.PIPELINED)); + channel.write(requestRecord.stage(RntbdRequestRecord.Stage.PIPELINED)) + .addListener(ignore -> this.releaseToPool(channel)); return requestRecord; } From 9c8a61513e6ed5b801497d1158b415a2725c8f06 Mon Sep 17 00:00:00 2001 From: annie-mac Date: Mon, 30 Aug 2021 12:22:56 -0700 Subject: [PATCH 2/7] Swith off IO thread for response processing --- .../azure/cosmos/implementation/Configs.java | 10 +++++++ .../implementation/CosmosSchedulers.java | 1 + .../directconnectivity/TransportClient.java | 27 +++++++++++++------ 3 files changed, 30 insertions(+), 8 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java index c64b795b6d5c..41078d6b54b5 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java @@ -91,6 +91,10 @@ public class Configs { "COSMOS.DEFAULT_SESSION_TOKEN_MISMATCH_MAXIMUM_BACKOFF_TIME_IN_MILLISECONDS"; private static final int DEFAULT_SESSION_TOKEN_MISMATCH_MAXIMUM_BACKOFF_TIME_IN_MILLISECONDS = 50; + // Whether to process the response on a different thread + private static final String DEFAULT_SWITCH_OFF_IO_THREAD_FOR_RESPONSE_NAME = "COSMOS.SWITCH_OFF_IO_THREAD_FOR_RESPONSE"; + private static final boolean DEFAULT_SWITCH_OFF_IO_THREAD_FOR_RESPONSE = false; + public Configs() { this.sslContext = sslContextInit(); } @@ -252,6 +256,12 @@ public static int getSessionTokenMismatchMaximumBackoffTimeInMs() { DEFAULT_SESSION_TOKEN_MISMATCH_MAXIMUM_BACKOFF_TIME_IN_MILLISECONDS); } + public static boolean shouldSwitchOffIOThreadForResponse() { + return getJVMConfigAsBoolean( + DEFAULT_SWITCH_OFF_IO_THREAD_FOR_RESPONSE_NAME, + DEFAULT_SWITCH_OFF_IO_THREAD_FOR_RESPONSE); + } + private static int getJVMConfigAsInt(String propName, int defaultValue) { String propValue = System.getProperty(propName); return getIntValue(propValue, defaultValue); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosSchedulers.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosSchedulers.java index c2820b0ddd16..9861f82ba66a 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosSchedulers.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosSchedulers.java @@ -18,6 +18,7 @@ public class CosmosSchedulers { Schedulers.DEFAULT_POOL_SIZE, true); + // Custom bounded elastic scheduler to switch off IO thread as soon as possible. public final static Scheduler TRANSPORT_CLIENT_BOUNDED_ELASTIC = Schedulers.newBoundedElastic( Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE, Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/TransportClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/TransportClient.java index d6bb43ebdc5c..78c1bc329f70 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/TransportClient.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/TransportClient.java @@ -3,13 +3,12 @@ package com.azure.cosmos.implementation.directconnectivity; +import com.azure.cosmos.implementation.Configs; import com.azure.cosmos.implementation.CosmosSchedulers; import com.azure.cosmos.implementation.RxDocumentServiceRequest; import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; import com.azure.cosmos.implementation.throughputControl.ThroughputControlStore; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Scheduler; -import reactor.core.scheduler.Schedulers; // We suppress the "try" warning here because the close() method's signature // allows it to throw InterruptedException which is strongly advised against @@ -18,6 +17,7 @@ // signature for backwards compatibility purposes. @SuppressWarnings("try") public abstract class TransportClient implements AutoCloseable { + private static final boolean switchOffIOThreadForResponse = Configs.shouldSwitchOffIOThreadForResponse(); private ThroughputControlStore throughputControlStore; public void enableThroughputControl(ThroughputControlStore throughputControlStore) { @@ -29,18 +29,29 @@ public Mono invokeResourceOperationAsync(Uri physicalAddress, RxD if (StringUtils.isEmpty(request.requestContext.resourcePhysicalAddress)) { request.requestContext.resourcePhysicalAddress = physicalAddress.toString(); } + if (this.throughputControlStore != null) { - return this.throughputControlStore.processRequest( - request, - Mono.defer( - () -> this.invokeStoreAsync(physicalAddress, request).publishOn(CosmosSchedulers.TRANSPORT_CLIENT_BOUNDED_ELASTIC))); + return this.invokeStoreWithThroughputControlAsync(physicalAddress, request); } - return this.invokeStoreAsync(physicalAddress, request) - .publishOn(CosmosSchedulers.TRANSPORT_CLIENT_BOUNDED_ELASTIC); + return this.invokeStoreInternalAsync(physicalAddress, request); } protected abstract Mono invokeStoreAsync( Uri physicalAddress, RxDocumentServiceRequest request); + + private Mono invokeStoreWithThroughputControlAsync(Uri physicalAddress, RxDocumentServiceRequest request) { + return this.throughputControlStore.processRequest( + request, + Mono.defer(() -> this.invokeStoreInternalAsync(physicalAddress, request))); + } + + private Mono invokeStoreInternalAsync(Uri physicalAddress, RxDocumentServiceRequest request) { + if (switchOffIOThreadForResponse) { + return this.invokeStoreAsync(physicalAddress, request).publishOn(CosmosSchedulers.TRANSPORT_CLIENT_BOUNDED_ELASTIC); + } + + return this.invokeStoreAsync(physicalAddress, request); + } } From b2e83e586ee5d73192d2929af69e80812bca64fa Mon Sep 17 00:00:00 2001 From: annie-mac Date: Mon, 30 Aug 2021 12:28:34 -0700 Subject: [PATCH 3/7] rename --- .../com/azure/cosmos/implementation/CosmosSchedulers.java | 8 ++++---- .../directconnectivity/TransportClient.java | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosSchedulers.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosSchedulers.java index 9861f82ba66a..9845e7c3bb72 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosSchedulers.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosSchedulers.java @@ -8,7 +8,7 @@ public class CosmosSchedulers { private final static String COSMOS_PARALLEL_THREAD_NAME = "cosmos-parallel"; - private final static String TRANSPORT_CLIENT_BOUNDED_ELASTIC_THREAD_NAME = "transport-client-bounded-elastic"; + private final static String TRANSPORT_RESPONSE_BOUNDED_ELASTIC_THREAD_NAME = "transport-response-bounded-elastic"; // Using a custom parallel scheduler to be able to schedule retries etc. // without being vulnerable to scenarios where applications abuse the @@ -18,10 +18,10 @@ public class CosmosSchedulers { Schedulers.DEFAULT_POOL_SIZE, true); - // Custom bounded elastic scheduler to switch off IO thread as soon as possible. - public final static Scheduler TRANSPORT_CLIENT_BOUNDED_ELASTIC = Schedulers.newBoundedElastic( + // Custom bounded elastic scheduler to switch off IO thread to process response. + public final static Scheduler TRANSPORT_RESPONSE_BOUNDED_ELASTIC = Schedulers.newBoundedElastic( Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE, Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, - TRANSPORT_CLIENT_BOUNDED_ELASTIC_THREAD_NAME + TRANSPORT_RESPONSE_BOUNDED_ELASTIC_THREAD_NAME ); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/TransportClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/TransportClient.java index 78c1bc329f70..0a24e93ef0b2 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/TransportClient.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/TransportClient.java @@ -49,7 +49,7 @@ private Mono invokeStoreWithThroughputControlAsync(Uri physicalAd private Mono invokeStoreInternalAsync(Uri physicalAddress, RxDocumentServiceRequest request) { if (switchOffIOThreadForResponse) { - return this.invokeStoreAsync(physicalAddress, request).publishOn(CosmosSchedulers.TRANSPORT_CLIENT_BOUNDED_ELASTIC); + return this.invokeStoreAsync(physicalAddress, request).publishOn(CosmosSchedulers.TRANSPORT_RESPONSE_BOUNDED_ELASTIC); } return this.invokeStoreAsync(physicalAddress, request); From 498e5ee2976e36e712b580a93275d9566d6d73f8 Mon Sep 17 00:00:00 2001 From: annie-mac Date: Mon, 30 Aug 2021 12:33:31 -0700 Subject: [PATCH 4/7] refactor --- .../implementation/directconnectivity/TransportClient.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/TransportClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/TransportClient.java index 0a24e93ef0b2..2cbefe0f94fc 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/TransportClient.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/TransportClient.java @@ -17,9 +17,13 @@ // signature for backwards compatibility purposes. @SuppressWarnings("try") public abstract class TransportClient implements AutoCloseable { - private static final boolean switchOffIOThreadForResponse = Configs.shouldSwitchOffIOThreadForResponse(); + private final boolean switchOffIOThreadForResponse; private ThroughputControlStore throughputControlStore; + public TransportClient() { + this.switchOffIOThreadForResponse = Configs.shouldSwitchOffIOThreadForResponse(); + } + public void enableThroughputControl(ThroughputControlStore throughputControlStore) { this.throughputControlStore = throughputControlStore; } From adaf9049a8138c5df9e179f383ea0d4137ca277e Mon Sep 17 00:00:00 2001 From: annie-mac Date: Mon, 30 Aug 2021 12:45:32 -0700 Subject: [PATCH 5/7] resolve comments' --- .../directconnectivity/rntbd/RntbdServiceEndpoint.java | 1 - 1 file changed, 1 deletion(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdServiceEndpoint.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdServiceEndpoint.java index 8cd3880a1f9f..604f2ac4b6fd 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdServiceEndpoint.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdServiceEndpoint.java @@ -377,7 +377,6 @@ private RntbdRequestRecord writeWhenConnected( if (connected.isSuccess()) { final Channel channel = (Channel) connected.getNow(); assert channel != null : "impossible"; - this.releaseToPool(channel); requestRecord.channelTaskQueueLength(RntbdUtils.tryGetExecutorTaskQueueSize(channel.eventLoop())); channel.write(requestRecord.stage(RntbdRequestRecord.Stage.PIPELINED)) .addListener(ignore -> this.releaseToPool(channel)); From be752f985adb83eec3f7e5d825d987d10f09bf02 Mon Sep 17 00:00:00 2001 From: annie-mac Date: Thu, 2 Sep 2021 08:19:15 -0700 Subject: [PATCH 6/7] revert releaseToPool change --- .../com/azure/cosmos/implementation/CosmosSchedulers.java | 5 ++++- .../implementation/directconnectivity/TransportClient.java | 6 +----- .../directconnectivity/rntbd/RntbdServiceEndpoint.java | 4 ++-- 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosSchedulers.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosSchedulers.java index 9845e7c3bb72..fbcbd8836699 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosSchedulers.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosSchedulers.java @@ -9,6 +9,7 @@ public class CosmosSchedulers { private final static String COSMOS_PARALLEL_THREAD_NAME = "cosmos-parallel"; private final static String TRANSPORT_RESPONSE_BOUNDED_ELASTIC_THREAD_NAME = "transport-response-bounded-elastic"; + private final int TTL_FOR_SCHEDULER_WORKER = 60; // same as BoundedElasticScheduler.DEFAULT_TTL_SECONDS // Using a custom parallel scheduler to be able to schedule retries etc. // without being vulnerable to scenarios where applications abuse the @@ -22,6 +23,8 @@ public class CosmosSchedulers { public final static Scheduler TRANSPORT_RESPONSE_BOUNDED_ELASTIC = Schedulers.newBoundedElastic( Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE, Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, - TRANSPORT_RESPONSE_BOUNDED_ELASTIC_THREAD_NAME + TRANSPORT_RESPONSE_BOUNDED_ELASTIC_THREAD_NAME, + 60, + true ); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/TransportClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/TransportClient.java index 2cbefe0f94fc..14dc94e5f7dd 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/TransportClient.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/TransportClient.java @@ -17,13 +17,9 @@ // signature for backwards compatibility purposes. @SuppressWarnings("try") public abstract class TransportClient implements AutoCloseable { - private final boolean switchOffIOThreadForResponse; + private final boolean switchOffIOThreadForResponse = Configs.shouldSwitchOffIOThreadForResponse(); private ThroughputControlStore throughputControlStore; - public TransportClient() { - this.switchOffIOThreadForResponse = Configs.shouldSwitchOffIOThreadForResponse(); - } - public void enableThroughputControl(ThroughputControlStore throughputControlStore) { this.throughputControlStore = throughputControlStore; } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdServiceEndpoint.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdServiceEndpoint.java index 604f2ac4b6fd..1aad216eae79 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdServiceEndpoint.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdServiceEndpoint.java @@ -377,9 +377,9 @@ private RntbdRequestRecord writeWhenConnected( if (connected.isSuccess()) { final Channel channel = (Channel) connected.getNow(); assert channel != null : "impossible"; + this.releaseToPool(channel); requestRecord.channelTaskQueueLength(RntbdUtils.tryGetExecutorTaskQueueSize(channel.eventLoop())); - channel.write(requestRecord.stage(RntbdRequestRecord.Stage.PIPELINED)) - .addListener(ignore -> this.releaseToPool(channel)); + channel.write(requestRecord.stage(RntbdRequestRecord.Stage.PIPELINED)); return requestRecord; } From bd2d17876a472378ee1989244fa64c031e27befb Mon Sep 17 00:00:00 2001 From: annie-mac Date: Thu, 2 Sep 2021 08:45:55 -0700 Subject: [PATCH 7/7] fix --- .../com/azure/cosmos/implementation/CosmosSchedulers.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosSchedulers.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosSchedulers.java index fbcbd8836699..b2f4026df29a 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosSchedulers.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosSchedulers.java @@ -9,7 +9,7 @@ public class CosmosSchedulers { private final static String COSMOS_PARALLEL_THREAD_NAME = "cosmos-parallel"; private final static String TRANSPORT_RESPONSE_BOUNDED_ELASTIC_THREAD_NAME = "transport-response-bounded-elastic"; - private final int TTL_FOR_SCHEDULER_WORKER = 60; // same as BoundedElasticScheduler.DEFAULT_TTL_SECONDS + private final static int TTL_FOR_SCHEDULER_WORKER_IN_SECONDS = 60; // same as BoundedElasticScheduler.DEFAULT_TTL_SECONDS // Using a custom parallel scheduler to be able to schedule retries etc. // without being vulnerable to scenarios where applications abuse the @@ -24,7 +24,7 @@ public class CosmosSchedulers { Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE, Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, TRANSPORT_RESPONSE_BOUNDED_ELASTIC_THREAD_NAME, - 60, + TTL_FOR_SCHEDULER_WORKER_IN_SECONDS, true ); }