Adjust MicroBatchSize dynamically based on throttling rate in BulkExecutor #22290
Conversation
…e accounted for in dynamic MicroBatchSize adjustment
sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/BulkProcessingOptions.java
…into users/fabianm/dynamicMicroBatchSize
sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/BulkProcessingThresholds.java
@@ -14,7 +14,9 @@
 public static final int MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST = 100;

 public static final int DEFAULT_MAX_MICRO_BATCH_INTERVAL_IN_MILLISECONDS = 100;
-public static final int DEFAULT_MAX_MICRO_BATCH_CONCURRENCY = 2;
+public static final int DEFAULT_MAX_MICRO_BATCH_CONCURRENCY = 1;
Why change the concurrency here?
Because a concurrency of 2 is pretty much always wrong. Even a single client with 4 cores can easily saturate the 10,000 RU/s of a physical partition.
So this change was intentional - but I'm leaving the comment open for others to chime in as well.
If the concurrency should always be 1, should we remove the setMaxMicroBatchConcurrency public API from BulkProcessingOptions?
The only scenario where I think a higher maxConcurrency makes sense is a web service accepting requests that each result in multiple documents to ingest - so where a call to processBulkOperations would only contain, say, a couple dozen or a few hundred documents. I can imagine that latency might be better with higher concurrency there, but this is an edge case. Customers can still modify the concurrency, but a default of 1 seems to meet most scenarios better. See the sketch below.
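For that edge case, a minimal sketch of the override, assuming the BulkProcessingOptions surface named in this thread (package locations follow the azure-cosmos 4.x bulk beta and may differ):

```java
import com.azure.cosmos.BulkProcessingOptions;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosItemOperation;
import reactor.core.publisher.Flux;

class BulkConcurrencyOverride {
    // container and operations are assumed to be constructed elsewhere
    static void ingest(CosmosAsyncContainer container, Flux<CosmosItemOperation> operations) {
        BulkProcessingOptions<Object> options = new BulkProcessingOptions<>();
        // Raise the concurrency above the new default of 1 only for small,
        // latency-sensitive payloads like the web-service scenario above.
        options.setMaxMicroBatchConcurrency(2);
        container.processBulkOperations(operations, options).blockLast();
    }
}
```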
sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BulkExecutor.java
...re-cosmos/src/main/java/com/azure/cosmos/implementation/batch/FlushBuffersItemOperation.java
sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BulkExecutor.java
...ure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/PartitionScopeThresholds.java
...ure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/PartitionScopeThresholds.java
…to users/fabianm/dynamicMicroBatchSize
This PR changes how we determine the micro-batch size when using bulk execution (CosmosAsyncContainer.processBulkOperations). Instead of relying on a user-provided static micro-batch size, the size gets dynamically adjusted based on the percentage of throttled operations (either because the entire batch request is throttled, or because the batch request is partially successful with some operations being throttled). To be able to do this in the BulkExecutor, I had to change the behavior of the ClientRetryPolicy's ResourceThrottlingRetryPolicy to allow 429s to bubble up to the BulkExecutor - so that the BulkExecutor's ResourceThrottlingRetryPolicy can trigger the retry and account for the throttled operations that way.
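As an illustration of the idea only - the PR's actual heuristic lives in PartitionScopeThresholds, and the names and thresholds below are invented:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of throttling-rate-based micro-batch sizing; not the
// PR's real constants or algorithm.
class MicroBatchSizer {
    private static final int MAX_BATCH_SIZE = 100;        // Cosmos DB batch request limit
    private static final double HIGH_THROTTLE_RATE = 0.1; // invented threshold
    private final AtomicInteger microBatchSize = new AtomicInteger(MAX_BATCH_SIZE);
    private final AtomicLong totalOps = new AtomicLong();
    private final AtomicLong throttledOps = new AtomicLong();

    // Called once per completed batch request; 'operations' is assumed > 0.
    void recordBatchResult(int operations, int throttled) {
        long total = totalOps.addAndGet(operations);
        long limited = throttledOps.addAndGet(throttled);
        double throttleRate = (double) limited / total;
        if (throttleRate > HIGH_THROTTLE_RATE) {
            // Shrink aggressively while many operations are seeing 429s.
            microBatchSize.updateAndGet(s -> Math.max(1, s / 2));
        } else if (throttled == 0) {
            // Grow back carefully once throttling subsides.
            microBatchSize.updateAndGet(s -> Math.min(MAX_BATCH_SIZE, s + 1));
        }
    }

    int currentMicroBatchSize() {
        return microBatchSize.get();
    }
}
```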
The riskiest change is that Reactor's bufferTimeout operator doesn't allow specifying a dynamic maxBufferSize - so I had to switch to another operator (bufferUntil) and implement a custom timer-triggered mechanism to flush the buffers and drain the remaining operations from them. This timer-based mechanism is only triggered after the input Flux (the user-provided Flux of operations to be executed) has been closed.
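A simplified sketch of that mechanism, with a plain sentinel object standing in for FlushBuffersItemOperation and an AtomicInteger standing in for the dynamically adjusted size (names here are illustrative, not the PR's code; in the real executor the buffers are scoped per physical partition, which is why an explicit flush is needed):

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

class DynamicBufferingSketch {
    static final Object FLUSH = new Object(); // stand-in for FlushBuffersItemOperation

    static Flux<List<Object>> microBatches(Flux<Object> operations,
                                           AtomicInteger dynamicBatchSize) {
        Sinks.Many<Object> flushSink = Sinks.many().multicast().onBackpressureBuffer();
        AtomicInteger inBuffer = new AtomicInteger();

        Flux<Object> source = operations
            // Once the user's Flux completes, start a timer that pushes a flush
            // sentinel so partially filled buffers still get drained.
            .doOnComplete(() ->
                Flux.interval(Duration.ofMillis(100))
                    .take(1)
                    .subscribe(tick -> {
                        flushSink.tryEmitNext(FLUSH);
                        flushSink.tryEmitComplete();
                    }))
            .mergeWith(flushSink.asFlux());

        return source
            .bufferUntil(op -> {
                // Close the buffer on a flush sentinel, or once it reaches the
                // current dynamic size; bufferTimeout cannot do the latter.
                if (op == FLUSH || inBuffer.incrementAndGet() >= dynamicBatchSize.get()) {
                    inBuffer.set(0);
                    return true;
                }
                return false;
            })
            .map(buffer -> { buffer.remove(FLUSH); return buffer; }) // drop the sentinel
            .filter(buffer -> !buffer.isEmpty());
    }
}
```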
From my initial tests with the Spark end-to-end samples, this approach works very well. It significantly reduces the percentage of throttled requests when no client throughput control is enabled; with client throughput control, it helps reduce the micro-batch size so that the achievable throughput is as expected, while also allowing the throughput to be limited reasonably well.