diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java index c64b795b6d5c..41078d6b54b5 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java @@ -91,6 +91,10 @@ public class Configs { "COSMOS.DEFAULT_SESSION_TOKEN_MISMATCH_MAXIMUM_BACKOFF_TIME_IN_MILLISECONDS"; private static final int DEFAULT_SESSION_TOKEN_MISMATCH_MAXIMUM_BACKOFF_TIME_IN_MILLISECONDS = 50; + // Whether to process the response on a different thread + private static final String DEFAULT_SWITCH_OFF_IO_THREAD_FOR_RESPONSE_NAME = "COSMOS.SWITCH_OFF_IO_THREAD_FOR_RESPONSE"; + private static final boolean DEFAULT_SWITCH_OFF_IO_THREAD_FOR_RESPONSE = false; + public Configs() { this.sslContext = sslContextInit(); } @@ -252,6 +256,12 @@ public static int getSessionTokenMismatchMaximumBackoffTimeInMs() { DEFAULT_SESSION_TOKEN_MISMATCH_MAXIMUM_BACKOFF_TIME_IN_MILLISECONDS); } + public static boolean shouldSwitchOffIOThreadForResponse() { + return getJVMConfigAsBoolean( + DEFAULT_SWITCH_OFF_IO_THREAD_FOR_RESPONSE_NAME, + DEFAULT_SWITCH_OFF_IO_THREAD_FOR_RESPONSE); + } + private static int getJVMConfigAsInt(String propName, int defaultValue) { String propValue = System.getProperty(propName); return getIntValue(propValue, defaultValue); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosSchedulers.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosSchedulers.java index e01014e88b91..b2f4026df29a 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosSchedulers.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosSchedulers.java @@ -8,6 +8,8 @@ public class CosmosSchedulers { private final static String COSMOS_PARALLEL_THREAD_NAME = "cosmos-parallel"; + private final static String TRANSPORT_RESPONSE_BOUNDED_ELASTIC_THREAD_NAME = "transport-response-bounded-elastic"; + private final static int TTL_FOR_SCHEDULER_WORKER_IN_SECONDS = 60; // same as BoundedElasticScheduler.DEFAULT_TTL_SECONDS // Using a custom parallel scheduler to be able to schedule retries etc. // without being vulnerable to scenarios where applications abuse the @@ -16,4 +18,13 @@ public class CosmosSchedulers { COSMOS_PARALLEL_THREAD_NAME, Schedulers.DEFAULT_POOL_SIZE, true); + + // Custom bounded elastic scheduler to switch off IO thread to process response. + public final static Scheduler TRANSPORT_RESPONSE_BOUNDED_ELASTIC = Schedulers.newBoundedElastic( + Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE, + Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, + TRANSPORT_RESPONSE_BOUNDED_ELASTIC_THREAD_NAME, + TTL_FOR_SCHEDULER_WORKER_IN_SECONDS, + true + ); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/TransportClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/TransportClient.java index 5a5d40998fc8..14dc94e5f7dd 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/TransportClient.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/TransportClient.java @@ -3,6 +3,8 @@ package com.azure.cosmos.implementation.directconnectivity; +import com.azure.cosmos.implementation.Configs; +import com.azure.cosmos.implementation.CosmosSchedulers; import com.azure.cosmos.implementation.RxDocumentServiceRequest; import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; import com.azure.cosmos.implementation.throughputControl.ThroughputControlStore; @@ -15,6 +17,7 @@ // signature for backwards compatibility purposes. @SuppressWarnings("try") public abstract class TransportClient implements AutoCloseable { + private final boolean switchOffIOThreadForResponse = Configs.shouldSwitchOffIOThreadForResponse(); private ThroughputControlStore throughputControlStore; public void enableThroughputControl(ThroughputControlStore throughputControlStore) { @@ -26,16 +29,29 @@ public Mono invokeResourceOperationAsync(Uri physicalAddress, RxD if (StringUtils.isEmpty(request.requestContext.resourcePhysicalAddress)) { request.requestContext.resourcePhysicalAddress = physicalAddress.toString(); } + if (this.throughputControlStore != null) { - return this.throughputControlStore.processRequest( - request, - Mono.defer(() -> this.invokeStoreAsync(physicalAddress, request))); + return this.invokeStoreWithThroughputControlAsync(physicalAddress, request); } - return this.invokeStoreAsync(physicalAddress, request); + return this.invokeStoreInternalAsync(physicalAddress, request); } protected abstract Mono invokeStoreAsync( Uri physicalAddress, RxDocumentServiceRequest request); + + private Mono invokeStoreWithThroughputControlAsync(Uri physicalAddress, RxDocumentServiceRequest request) { + return this.throughputControlStore.processRequest( + request, + Mono.defer(() -> this.invokeStoreInternalAsync(physicalAddress, request))); + } + + private Mono invokeStoreInternalAsync(Uri physicalAddress, RxDocumentServiceRequest request) { + if (switchOffIOThreadForResponse) { + return this.invokeStoreAsync(physicalAddress, request).publishOn(CosmosSchedulers.TRANSPORT_RESPONSE_BOUNDED_ELASTIC); + } + + return this.invokeStoreAsync(physicalAddress, request); + } }