Adjust MicroBatchSize dynamically based on throttling rate in BulkExecutor #22290

Merged

Conversation

FabianMeiswinkel (Member) commented Jun 15, 2021

This PR changes how the micro-batch size is determined when using bulk execution (CosmosAsyncContainer.processBulkOperations). Instead of relying on a user-provided static micro-batch size, the size is adjusted dynamically based on the percentage of throttled operations (either because the entire batch request is throttled or because the batch request is partially successful with some operations being throttled).
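For illustration, a minimal sketch of the adjustment idea follows (the class name, threshold, and halving/step-up rule are assumptions chosen for readability, not the actual BulkExecutor implementation):

    // Hypothetical helper, not the actual BulkExecutor code: shrink the micro-batch
    // size when the observed throttle rate is too high, grow it back when it is low.
    final class MicroBatchSizeTuner {
        private static final int MAX_MICRO_BATCH_SIZE = 100;           // mirrors MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST
        private static final double MAX_TOLERATED_THROTTLE_RATE = 0.1; // assumed threshold

        private int currentMicroBatchSize = MAX_MICRO_BATCH_SIZE;

        // Called after each batch response with the number of throttled (429) operations.
        void onBatchCompleted(int throttledOperations, int totalOperations) {
            if (totalOperations == 0) {
                return;
            }
            double throttleRate = (double) throttledOperations / totalOperations;
            if (throttleRate > MAX_TOLERATED_THROTTLE_RATE) {
                // Too many 429s - use smaller batches for subsequent requests.
                currentMicroBatchSize = Math.max(1, currentMicroBatchSize / 2);
            } else if (throttleRate == 0) {
                // No throttling observed - cautiously grow back towards the maximum.
                currentMicroBatchSize = Math.min(MAX_MICRO_BATCH_SIZE, currentMicroBatchSize + 1);
            }
        }

        int getCurrentMicroBatchSize() {
            return currentMicroBatchSize;
        }
    }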
To be able to do this in the BulkExecutor, I had to change the behavior of the ClientRetryPolicy's ResourceThrottlingRetryPolicy so that 429s bubble up to the BulkExecutor - this lets the BulkExecutor's own ResourceThrottlingRetryPolicy trigger the retry and account for the throttled operations.
The riskiest change is that Reactor's bufferTimeout operator doesn't allow specifying a dynamic maxBufferSize, so I had to switch to another operator (bufferUntil) and implement a custom timer-triggered mechanism to flush the buffers and drain the remaining operations from them. This timer-based mechanism is only triggered after the input Flux (the user-provided Flux of operations to be executed) has completed.
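Since bufferTimeout cannot take a dynamic maxBufferSize, the pattern looks roughly like the following self-contained Reactor sketch (FLUSH_SIGNAL, dynamicMaxBufferSize, and the demo numbers are assumptions; the actual BulkExecutor wiring differs):

    import java.time.Duration;
    import java.util.concurrent.atomic.AtomicInteger;
    import reactor.core.publisher.Flux;

    public final class BufferUntilWithTimerFlush {
        // Marker object merged into the stream to force a buffer flush.
        private static final Object FLUSH_SIGNAL = new Object();

        public static void main(String[] args) {
            AtomicInteger dynamicMaxBufferSize = new AtomicInteger(10); // can be adjusted at runtime
            AtomicInteger itemsInCurrentBuffer = new AtomicInteger();

            Flux<Object> operations = Flux.range(1, 95).map(i -> (Object) i);

            // Periodic flush markers so a partially filled buffer still gets drained;
            // in the PR this timer only kicks in after the input Flux has completed.
            Flux<Object> flushTimer = Flux.interval(Duration.ofMillis(100)).map(tick -> FLUSH_SIGNAL);

            Flux.merge(operations, flushTimer)
                .bufferUntil(item -> {
                    if (item == FLUSH_SIGNAL
                        || itemsInCurrentBuffer.incrementAndGet() >= dynamicMaxBufferSize.get()) {
                        itemsInCurrentBuffer.set(0); // close the current buffer
                        return true;
                    }
                    return false;
                })
                .map(buffer -> { buffer.remove(FLUSH_SIGNAL); return buffer; }) // drop the marker itself
                .filter(buffer -> !buffer.isEmpty())
                .take(10) // 9 full micro batches plus the timer-flushed remainder
                .doOnNext(batch -> System.out.println("micro batch of size " + batch.size()))
                .blockLast();
        }
    }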

In my initial tests with the Spark end-to-end samples this approach works very well. It significantly reduces the percentage of throttled requests when no client throughput control is enabled, and with client throughput control it helps reduce the micro-batch size so that the achievable throughput matches expectations while the overall throughput can still be limited reasonably well.

FabianMeiswinkel changed the title from "[DRAFT - do not review yet] Adjust MicroBatchSize dynamically based on throttling rate in BulkExecutor" to "Adjust MicroBatchSize dynamically based on throttling rate in BulkExecutor" on Jun 16, 2021
@@ -14,7 +14,9 @@
     public static final int MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST = 100;

     public static final int DEFAULT_MAX_MICRO_BATCH_INTERVAL_IN_MILLISECONDS = 100;
-    public static final int DEFAULT_MAX_MICRO_BATCH_CONCURRENCY = 2;
+    public static final int DEFAULT_MAX_MICRO_BATCH_CONCURRENCY = 1;
xinlian12 (Member) commented Jun 16, 2021

why change the concurrency here?

FabianMeiswinkel (Member, Author)

Because a concurrency of 2 is pretty much always wrong. Even a single client with 4 cores can easily saturate the 10,000 RU/s of a physical partition.

FabianMeiswinkel (Member, Author)

So this change was intentional - but I'm leaving the comment open for others to chime in as well.

Member

If the concurrency should always be 1, then should we remove the setMaxMicroBatchConcurrency public API from the bulk options?

FabianMeiswinkel (Member, Author)

The only scenario where I think a higher maxConcurrency makes sense is when you build a web service accepting requests whose payload results in multiple documents you want to ingest - where your call to processBulkOperations would only contain, say, a couple dozen or a couple hundred documents. I can imagine that latency might be better with higher concurrency - but this is an edge case. The concurrency can still be modified by customers, and 1 as the default seems to meet most scenarios better.
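For context, overriding the default from the caller's side looks roughly like this (a sketch assuming the bulk options type exposed at the time of this PR, here called BulkProcessingOptions; only setMaxMicroBatchConcurrency and processBulkOperations are taken from the discussion above):

    // Sketch only: assumes an already-built CosmosAsyncContainer 'container' and a
    // Flux<CosmosItemOperation> 'operations' prepared by the caller.
    BulkProcessingOptions<Object> bulkOptions = new BulkProcessingOptions<>();
    bulkOptions.setMaxMicroBatchConcurrency(2); // override the new default of 1, e.g. for small, latency-sensitive batches
    container.processBulkOperations(operations, bulkOptions)
        .subscribe();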

FabianMeiswinkel merged commit ddafb5f into Azure:main on Jun 23, 2021