Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adjust MicroBatchSize dynamically based on throttling rate in BulkExecutor #22290

Merged
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
af621d7
Temp snapshot
FabianMeiswinkel Jun 12, 2021
cddd9ba
Adjusting MicroBatchSize dynamically in BulkExecutor.java
FabianMeiswinkel Jun 15, 2021
b35008a
Making sure Bulk Request 429 bubble up to the BulkExecutor so they ar…
FabianMeiswinkel Jun 15, 2021
5d084a7
Adjusting targeted bulk throttling retry rate to be a range
FabianMeiswinkel Jun 16, 2021
e586150
Reducing lock contention in PartitionScopeThresholds.java
FabianMeiswinkel Jun 16, 2021
2c80e70
Adding unit test coverage for dynamically changing micro batch size i…
FabianMeiswinkel Jun 16, 2021
81f13c9
Adjusting log level in PartitionScopeThresholds
FabianMeiswinkel Jun 16, 2021
7df608d
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java …
FabianMeiswinkel Jun 16, 2021
953cb75
Moving new API to V4_17_0 Beta annotation
FabianMeiswinkel Jun 16, 2021
74b679a
Adding missing copyright header
FabianMeiswinkel Jun 16, 2021
d8bc67b
Removing 408 special-casing
FabianMeiswinkel Jun 16, 2021
7fb889c
Reacting to code review feedback
FabianMeiswinkel Jun 16, 2021
c9dd8df
Reacting to code review feedback
FabianMeiswinkel Jun 16, 2021
94afa94
Reenabling Direct tests
FabianMeiswinkel Jun 16, 2021
fd6fb01
Fixing a bug in the new buffering logic causing 400-BadRequest when t…
FabianMeiswinkel Jun 18, 2021
f342a90
Fixing type
FabianMeiswinkel Jun 18, 2021
b93d87a
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-java in…
FabianMeiswinkel Jun 18, 2021
83b6be3
Fixes for merge conflicts
FabianMeiswinkel Jun 18, 2021
ba971bc
Dummy
FabianMeiswinkel Jun 22, 2021
9938ad8
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-java in…
FabianMeiswinkel Jun 22, 2021
f20f814
Update BulkWriter.scala
FabianMeiswinkel Jun 22, 2021
fd5de14
Update BulkProcessingThresholds.java
FabianMeiswinkel Jun 22, 2021
ca6fc50
Reverting BridgeInternal changes
FabianMeiswinkel Jun 22, 2021
986d925
Update BridgeInternal.java
FabianMeiswinkel Jun 22, 2021
b99113b
Update BulkProcessingOptionsTest.java
FabianMeiswinkel Jun 22, 2021
62bfdba
Triggering flush on completion of input flux
FabianMeiswinkel Jun 22, 2021
74a1d2d
Self-code review feedback :-)
FabianMeiswinkel Jun 22, 2021
424d4d2
Update BulkProcessingThresholds.java
FabianMeiswinkel Jun 22, 2021
e169843
Fixing Nullref in BulkWriterTest
FabianMeiswinkel Jun 22, 2021
63b1704
Making FlushBuffersItemOperation a singleton
FabianMeiswinkel Jun 22, 2021
1cfb88e
Fixing build break
FabianMeiswinkel Jun 22, 2021
3eb575f
Fixing test failure
FabianMeiswinkel Jun 22, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,17 @@ package com.azure.cosmos.spark
import com.azure.cosmos.implementation.guava25.base.Preconditions
import com.azure.cosmos.models.PartitionKey
import com.azure.cosmos.spark.BulkWriter.DefaultMaxPendingOperationPerCore
import com.azure.cosmos.{BulkItemRequestOptions, BulkOperations, CosmosAsyncContainer, CosmosBulkOperationResponse, CosmosException, CosmosItemOperation}
import com.azure.cosmos.
{
BulkItemRequestOptions,
BulkOperations,
BulkProcessingOptions,
BulkProcessingThresholds,
CosmosAsyncContainer,
CosmosBulkOperationResponse,
CosmosException,
CosmosItemOperation
}
import com.fasterxml.jackson.databind.node.ObjectNode
import reactor.core.Disposable
import reactor.core.publisher.Sinks
Expand Down Expand Up @@ -64,7 +74,11 @@ class BulkWriter(container: CosmosAsyncContainer,

private val subscriptionDisposable: Disposable = {
val bulkOperationResponseFlux: SFlux[CosmosBulkOperationResponse[Object]] =
container.processBulkOperations[Object](bulkInputEmitter.asFlux()).asScala
container
.processBulkOperations[Object](
bulkInputEmitter.asFlux(),
new BulkProcessingOptions[Object](null, BulkWriter.bulkProcessingThresholds))
.asScala

bulkOperationResponseFlux.subscribe(
resp => {
Expand All @@ -77,7 +91,7 @@ class BulkWriter(container: CosmosAsyncContainer,

if (resp.getException != null) {
Option(resp.getException) match {
case Some(cosmosException: CosmosException) => {
case Some(cosmosException: CosmosException) =>
logDebug(s"encountered ${cosmosException.getStatusCode}")
if (shouldIgnore(cosmosException)) {
logDebug(s"for itemId=[${context.itemId}], partitionKeyValue=[${context.partitionKeyValue}], " +
Expand Down Expand Up @@ -108,7 +122,6 @@ class BulkWriter(container: CosmosAsyncContainer,
captureIfFirstFailure(cosmosException)
cancelWork()
}
}
case _ =>
logWarning(s"unexpected failure: itemId=[${context.itemId}], partitionKeyValue=[${context.partitionKeyValue}], " +
s"encountered , attemptNumber=${context.attemptNumber}, exceptionMessage=${resp.getException.getMessage}", resp.getException)
Expand Down Expand Up @@ -156,7 +169,7 @@ class BulkWriter(container: CosmosAsyncContainer,

semaphore.acquire()
val cnt = totalScheduledMetrics.getAndIncrement()
logDebug(s"total scheduled ${cnt}")
logDebug(s"total scheduled $cnt")

scheduleWriteInternal(partitionKeyValue, objectNode, OperationContext(getId(objectNode), partitionKeyValue, getETag(objectNode), 1))
}
Expand Down Expand Up @@ -194,7 +207,7 @@ class BulkWriter(container: CosmosAsyncContainer,

// the caller has to ensure that after invoking this method scheduleWrite doesn't get invoked
override def flushAndClose(): Unit = {
this.synchronized{
this.synchronized {
try {
if (closed.get()) {
// scalastyle:off return
Expand All @@ -206,6 +219,11 @@ class BulkWriter(container: CosmosAsyncContainer,

logInfo(s"completed so far ${totalSuccessfulIngestionMetrics.get()}, pending tasks ${activeOperations.size}")

// Closing the input flux will cause the BulkExecutor to flush all buffered
// item operations and start a timer that would flush regularly
logInfo("invoking bulkInputEmitter.onComplete()")
bulkInputEmitter.tryEmitComplete()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if there is a specific reason why this is moved before the lock section?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as discussed offline, I think we might be having a deadlock due to retries


// error handling, if there is any error and the subscription is cancelled
// the remaining tasks will not be processed hence we never reach activeTasks = 0
// once we do error handling we should think how to cover the scenario.
Expand All @@ -218,9 +236,7 @@ class BulkWriter(container: CosmosAsyncContainer,
lock.unlock()
}

logInfo("invoking bulkInputEmitter.onComplete()")
semaphore.release(activeTasks.get())
bulkInputEmitter.tryEmitComplete()

// which error to report?
if (errorCaptureFirstException.get() != null) {
Expand All @@ -233,7 +249,7 @@ class BulkWriter(container: CosmosAsyncContainer,
assume(semaphore.availablePermits() == maxPendingOperations)

logInfo(s"flushAndClose completed with no error. " +
s"totalSuccessfulIngestionMetrics=${totalSuccessfulIngestionMetrics.get()}, totalScheduled=${totalScheduledMetrics}")
s"totalSuccessfulIngestionMetrics=${totalSuccessfulIngestionMetrics.get()}, totalScheduled=$totalScheduledMetrics")
assume(totalScheduledMetrics.get() == totalSuccessfulIngestionMetrics.get)
} finally {
closed.set(true)
Expand All @@ -252,7 +268,7 @@ class BulkWriter(container: CosmosAsyncContainer,
}
}

private def captureIfFirstFailure(throwable: Throwable) = {
private def captureIfFirstFailure(throwable: Throwable): Unit = {
logError("capture failure", throwable)
lock.lock()
try {
Expand Down Expand Up @@ -309,10 +325,12 @@ private object BulkWriter {
// hence we want 2MB/ 1KB items per partition to be buffered
// 2 * 1024 * 167 items should get buffered on a 16 CPU core VM
// so per CPU core we want (2 * 1024 * 167 / 16) max items to be buffered
val DefaultMaxPendingOperationPerCore = 2 * 1024 * 167 / 16
val DefaultMaxPendingOperationPerCore: Int = 2 * 1024 * 167 / 16

val emitFailureHandler: EmitFailureHandler =
(signalType, emitResult) => if (emitResult.equals(EmitResult.FAIL_NON_SERIALIZED)) true else false
(_, emitResult) => if (emitResult.equals(EmitResult.FAIL_NON_SERIALIZED)) true else false

val bulkProcessingThresholds = new BulkProcessingThresholds[Object]()
}

//scalastyle:on multiple.string.literals
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.azure.cosmos.implementation.StoredProcedureResponse;
import com.azure.cosmos.implementation.TracerProvider;
import com.azure.cosmos.implementation.Warning;
import com.azure.cosmos.implementation.batch.PartitionScopeThresholds;
import com.azure.cosmos.implementation.directconnectivity.StoreResponse;
import com.azure.cosmos.implementation.directconnectivity.StoreResult;
import com.azure.cosmos.implementation.directconnectivity.Uri;
Expand Down Expand Up @@ -817,4 +818,11 @@ public static RetryContext getRetryContext(CosmosDiagnostics cosmosDiagnostics)
return null;
}
}

@Warning(value = INTERNAL_USE_ONLY_WARNING)
public static <TContext> ConcurrentMap<String, PartitionScopeThresholds<TContext>> getPartitionScopeThresholds(
FabianMeiswinkel marked this conversation as resolved.
Show resolved Hide resolved
BulkProcessingThresholds<TContext> thresholds) {

return thresholds.getPartitionScopeThresholds();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,29 @@ public final class BulkProcessingOptions<TContext> {

private int maxMicroBatchSize = BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST;
private int maxMicroBatchConcurrency = BatchRequestResponseConstants.DEFAULT_MAX_MICRO_BATCH_CONCURRENCY;
private Duration maxMicroBatchInterval = Duration.ofMillis(BatchRequestResponseConstants.DEFAULT_MAX_MICRO_BATCH_INTERVAL_IN_MILLISECONDS);
private double maxMicroBatchRetryRate = BatchRequestResponseConstants.DEFAULT_MAX_MICRO_BATCH_RETRY_RATE;
private double minMicroBatchRetryRate = BatchRequestResponseConstants.DEFAULT_MIN_MICRO_BATCH_RETRY_RATE;
private Duration maxMicroBatchInterval = Duration.ofMillis(
BatchRequestResponseConstants.DEFAULT_MAX_MICRO_BATCH_INTERVAL_IN_MILLISECONDS);
private final TContext batchContext;
private final BulkProcessingThresholds<TContext> thresholds;

public BulkProcessingOptions(TContext batchContext) {
@Beta(value = Beta.SinceVersion.V4_17_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
public BulkProcessingOptions(TContext batchContext, BulkProcessingThresholds<TContext> thresholds) {
this.batchContext = batchContext;
if (thresholds == null) {
this.thresholds = new BulkProcessingThresholds<>();
} else {
this.thresholds = thresholds;
}
}

@Beta(value = Beta.SinceVersion.V4_9_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
public BulkProcessingOptions(TContext batchContext) {
this(batchContext, null);
}

@Beta(value = Beta.SinceVersion.V4_9_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
public BulkProcessingOptions() {
this(null);
}
Expand All @@ -35,19 +51,17 @@ public int getMaxMicroBatchSize() {
}

/**
* The batching size for bulk operations. This value determines number of operations executed in one request.
* There is an upper limit on both number of operations and sum of size of operations. Any overflow is internally
* retried(without keeping any count).
*
* Always good to select a value such that there is less un-necessary retry as much as possible.
* For eg. If max operation count supported is 100, and user passes the value to be 120, 20 operations will be retried
* all the time(without any execution i.e. in the client logic). So it's better to choose a value of 100 in this case.
* The maximum batching size for bulk operations. This value determines number of operations executed in one
* request. There is an upper limit on both number of operations and sum of size of operations. Any overflow is
* internally retried.
*
* Another instance is: Currently we support a max limit of 200KB, and user select batch size to be 100 and individual
* documents are of size 20KB, approximately 90 operations will always be retried. So it's better to choose a batch
* size of 10 here if user is aware of there workload. If sizes are totally unknown and user cannot put a number on it
* then retries are handled, so no issues as such.
*
* If the retry rate exceeds `getMaxTargetedMicroBatchRetryRate` the micro batch size gets dynamically reduced at runtime
*
* @param maxMicroBatchSize batching size.
*
* @return the bulk processing options.
Expand Down Expand Up @@ -94,8 +108,50 @@ public BulkProcessingOptions<TContext> setMaxMicroBatchInterval(Duration maxMicr
return this;
}

/**
 * Gets the maximum targeted retry rate of batch requests that is treated as acceptable.
 * If the observed retry rate is higher than this threshold the micro batch size will be
 * dynamically reduced over time.
 *
 * @return the maximum targeted micro batch retry rate.
 */
@Beta(value = Beta.SinceVersion.V4_17_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
public double getMaxTargetedMicroBatchRetryRate() {
return this.maxMicroBatchRetryRate;
}

/**
 * The acceptable retry rate bandwidth. This value determines how aggressively the actual micro batch size
 * gets reduced or increased if the number of retries (for example due to 429 - Throttling or because the total
 * request size exceeds the payload limit) is higher or lower than the targeted range.
 *
 * @param minRetryRate minimum targeted retry rate of batch requests. If the retry rate is
 *                     lower than this threshold the micro batch size will be dynamically increased over time
 * @param maxRetryRate maximum retry rate of batch requests that is treated as acceptable. If the retry rate is
 *                     higher than this threshold the micro batch size will be dynamically reduced over time
 *
 * @return the bulk processing options.
 *
 * @throws IllegalArgumentException when {@code minRetryRate} is negative or exceeds {@code maxRetryRate}.
 */
@Beta(value = Beta.SinceVersion.V4_17_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
public BulkProcessingOptions<TContext> setTargetedMicroBatchRetryRate(double minRetryRate, double maxRetryRate) {
    // Fix: this guard validates minRetryRate, but the original message incorrectly
    // referred to maxRetryRate.
    if (minRetryRate < 0) {
        throw new IllegalArgumentException("The minRetryRate must not be a negative value");
    }

    if (minRetryRate > maxRetryRate) {
        throw new IllegalArgumentException("The minRetryRate must not exceed the maxRetryRate");
    }

    this.maxMicroBatchRetryRate = maxRetryRate;
    this.minMicroBatchRetryRate = minRetryRate;
    return this;
}

/**
 * Gets the minimum targeted retry rate of batch requests. If the observed retry rate is
 * lower than this threshold the micro batch size will be dynamically increased over time.
 *
 * @return the minimum targeted micro batch retry rate.
 */
@Beta(value = Beta.SinceVersion.V4_17_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
public double getMinTargetedMicroBatchRetryRate() {
return this.minMicroBatchRetryRate;
}

/**
 * Gets the caller-provided context associated with this set of bulk operations.
 * May be {@code null} - the no-arg constructor passes {@code null}.
 *
 * @return the batch context, or {@code null} when none was provided.
 */
@Beta(value = Beta.SinceVersion.V4_9_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
public TContext getBatchContext() {
return batchContext;
}

/**
 * Gets the thresholds holding per-partition bulk processing state. Never {@code null} -
 * the constructor substitutes a fresh instance when {@code null} is passed in.
 *
 * @return the bulk processing thresholds.
 */
@Beta(value = Beta.SinceVersion.V4_17_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
public BulkProcessingThresholds<TContext> getThresholds() {
return this.thresholds;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos;
FabianMeiswinkel marked this conversation as resolved.
Show resolved Hide resolved

import com.azure.cosmos.implementation.batch.PartitionScopeThresholds;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class BulkProcessingThresholds<TContext> {
FabianMeiswinkel marked this conversation as resolved.
Show resolved Hide resolved
private final ConcurrentMap<String, PartitionScopeThresholds<TContext>> partitionScopeThresholds;

public BulkProcessingThresholds() {
this.partitionScopeThresholds = new ConcurrentHashMap<>();
}

ConcurrentMap<String, PartitionScopeThresholds<TContext>> getPartitionScopeThresholds() {
return this.partitionScopeThresholds;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ public ClientRetryPolicy(DiagnosticsClientContext diagnosticsClientContext,
this.throttlingRetry = new ResourceThrottleRetryPolicy(
throttlingRetryOptions.getMaxRetryAttemptsOnThrottledRequests(),
throttlingRetryOptions.getMaxRetryWaitTime(),
BridgeInternal.getRetryContext(this.getCosmosDiagnostics()));
BridgeInternal.getRetryContext(this.getCosmosDiagnostics()),
false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.implementation;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;
import static com.azure.cosmos.implementation.guava27.Strings.lenientFormat;

/**
 * {@link ThreadFactory} producing daemon threads named
 * {@code cosmos-daemon-<prefix>[<n>]}, where {@code n} increases monotonically per factory
 * instance so individual threads can be identified in thread dumps.
 */
public class CosmosDaemonThreadFactory implements ThreadFactory {
    private static final String NAME_TEMPLATE = "cosmos-daemon-%s[%s]";
    private final String namePrefix;
    private final AtomicInteger threadCount;

    public CosmosDaemonThreadFactory(String namePrefix) {
        checkNotNull(namePrefix, "Argument namePrefix must not be null.");
        this.namePrefix = namePrefix;
        this.threadCount = new AtomicInteger(0);
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(
            r,
            lenientFormat(NAME_TEMPLATE, this.namePrefix, this.threadCount.incrementAndGet()));
        // Daemon threads do not keep the JVM alive after the client shuts down.
        thread.setDaemon(true);
        return thread;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,9 @@ public static class SubStatusCodes {

// Client generated offer not configured exception
public static final int OFFER_NOT_CONFIGURED = 10004;

// Client generated request rate too large exception
public static final int THROUGHPUT_CONTROL_BULK_REQUEST_RATE_TOO_LARGE = 10005;
}

public static class HeaderValues {
Expand Down
Loading