Skip to content

Commit

Permalink
feat: add Batcher#close(timeout) and Batcher#cancelOutstanding (#3141)
Browse files Browse the repository at this point in the history
There have been reports of batcher.close() hanging every once in awhile.
Currently it is impossible to debug because we dont expose any internal
state to analyze.

This PR adds 2 additional methods that should help in diagnosing issues:
1. close(timeout) will try to close the batcher, but if any of the
underlying batch operations fail, the exception message will contain a
wealth of information describing the underlying state of operations as
provided by #3140
2. cancelOutstanding this allows for remediation for close(timeout)
throwing an exception.

The intended usecase is dataflow connector's FinishBundle:

```java
try {
  batcher.close(Duration.ofMinutes(1));
} catch(TimeoutException e) {
   // log details why the batch failed to close with the help of #3140
   logger.error(e);
   batcher.cancelOutstanding();
  batcher.close(Duration.ofMinutes(1));
}
```

Example exception message:

> Exception in thread "main"
com.google.api.gax.batching.BatchingException: Timed out trying to close
batcher after PT1S. Batch request prototype:
com.google.cloud.bigtable.data.v2.models.BulkMutation@2bac9ba.
Outstanding batches:
Batch{operation=CallbackChainRetryingFuture{super=null,
latestCompletedAttemptResult=ImmediateFailedFuture@6a9d5dff[status=FAILURE,
cause=[com.google.cloud.bigtable.data.v2.models.MutateRowsException:
Some mutations failed to apply]], attemptResult=null,
attemptSettings=TimedAttemptSettings{globalSettings=RetrySettings{totalTimeout=PT10M,
initialRetryDelay=PT0.01S, retryDelayMultiplier=2.0, maxRetryDelay=PT1M,
maxAttempts=0, jittered=true, initialRpcTimeout=PT1M,
rpcTimeoutMultiplier=1.0, maxRpcTimeout=PT1M}, retryDelay=PT1.28S,
rpcTimeout=PT1M, randomizedRetryDelay=PT0.877S, attemptCount=8,
overallAttemptCount=8, firstAttemptStartTimeNanos=646922035424541}},
elements=com.google.cloud.bigtable.data.v2.models.RowMutationEntry@7a344b65}

Co-authored-by: Blake Li <[email protected]>
  • Loading branch information
2 people authored and ldetmer committed Sep 17, 2024
1 parent 9dc2410 commit c865b65
Show file tree
Hide file tree
Showing 4 changed files with 228 additions and 13 deletions.
6 changes: 6 additions & 0 deletions gax-java/gax/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,10 @@
<className>com/google/api/gax/tracing/MetricsTracer</className>
<field>*</field>
</difference>
<!-- Ignore method additions to an InternalExtensionOnly interface-->
<difference>
<differenceType>7012</differenceType>
<className>com/google/api/gax/batching/Batcher</className>
<method>*</method>
</difference>
</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.InternalExtensionOnly;
import com.google.api.gax.rpc.ApiCallContext;
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;

/**
* Represents a batching context where individual elements will be accumulated and flushed in a
Expand Down Expand Up @@ -77,13 +80,25 @@ public interface Batcher<ElementT, ElementResultT> extends AutoCloseable {
*/
void sendOutstanding();

/** Cancels all outstanding batch RPCs. */
void cancelOutstanding();

/**
* Closes this Batcher by preventing new elements from being added, and then flushing the existing
* elements.
* Closes this Batcher by preventing new elements from being added, then flushing the existing
* elements and waiting for all the outstanding work to be resolved.
*/
@Override
void close() throws InterruptedException;

/**
* Closes this Batcher by preventing new elements from being added, then flushing the existing
* elements and waiting for all the outstanding work to be resolved. If all of the outstanding
* work has not been resolved, then a {@link BatchingException} will be thrown with details of the
* remaining work. The batcher will remain in a closed state and will not allow additional
* elements to be added.
*/
void close(@Nullable Duration timeout) throws InterruptedException, TimeoutException;

/**
* Closes this Batcher by preventing new elements from being added, and then sending outstanding
* elements. The returned future will be resolved when the last element completes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,24 +42,29 @@
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.Futures;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.SoftReference;
import java.lang.ref.WeakReference;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.StringJoiner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/**
Expand All @@ -86,7 +91,8 @@ public class BatcherImpl<ElementT, ElementResultT, RequestT, ResponseT>
private final BatcherReference currentBatcherReference;

private Batch<ElementT, ElementResultT, RequestT, ResponseT> currentOpenBatch;
private final AtomicInteger numOfOutstandingBatches = new AtomicInteger(0);
private final ConcurrentMap<Batch<ElementT, ElementResultT, RequestT, ResponseT>, Boolean>
outstandingBatches = new ConcurrentHashMap<>();
private final Object flushLock = new Object();
private final Object elementLock = new Object();
private final Future<?> scheduledFuture;
Expand Down Expand Up @@ -297,8 +303,10 @@ public void sendOutstanding() {
} catch (Exception ex) {
batchResponse = ApiFutures.immediateFailedFuture(ex);
}
accumulatedBatch.setResponseFuture(batchResponse);

outstandingBatches.put(accumulatedBatch, Boolean.TRUE);

numOfOutstandingBatches.incrementAndGet();
ApiFutures.addCallback(
batchResponse,
new ApiFutureCallback<ResponseT>() {
Expand All @@ -310,7 +318,7 @@ public void onSuccess(ResponseT response) {
accumulatedBatch.resource.getByteCount());
accumulatedBatch.onBatchSuccess(response);
} finally {
onBatchCompletion();
onBatchCompletion(accumulatedBatch);
}
}

Expand All @@ -322,18 +330,19 @@ public void onFailure(Throwable throwable) {
accumulatedBatch.resource.getByteCount());
accumulatedBatch.onBatchFailure(throwable);
} finally {
onBatchCompletion();
onBatchCompletion(accumulatedBatch);
}
}
},
directExecutor());
}

private void onBatchCompletion() {
private void onBatchCompletion(Batch<ElementT, ElementResultT, RequestT, ResponseT> batch) {
boolean shouldClose = false;

synchronized (flushLock) {
if (numOfOutstandingBatches.decrementAndGet() == 0) {
outstandingBatches.remove(batch);
if (outstandingBatches.isEmpty()) {
flushLock.notifyAll();
shouldClose = closeFuture != null;
}
Expand All @@ -349,22 +358,43 @@ private void onBatchCompletion() {
}

private void awaitAllOutstandingBatches() throws InterruptedException {
while (numOfOutstandingBatches.get() > 0) {
while (!outstandingBatches.isEmpty()) {
synchronized (flushLock) {
// Check again under lock to avoid racing with onBatchCompletion
if (numOfOutstandingBatches.get() == 0) {
if (outstandingBatches.isEmpty()) {
break;
}
flushLock.wait();
}
}
}

@Override
public void cancelOutstanding() {
for (Batch<?, ?, ?, ?> batch : outstandingBatches.keySet()) {
batch.cancel();
}
}
/** {@inheritDoc} */
@Override
public void close() throws InterruptedException {
try {
closeAsync().get();
close(null);
} catch (TimeoutException e) {
// should never happen with a null timeout
throw new IllegalStateException(
"Unexpected timeout exception when trying to close the batcher without a timeout", e);
}
}

@Override
public void close(@Nullable Duration timeout) throws InterruptedException, TimeoutException {
try {
if (timeout != null) {
closeAsync().get(timeout.toMillis(), TimeUnit.MILLISECONDS);
} else {
closeAsync().get();
}
} catch (ExecutionException e) {
// Original stacktrace of a batching exception is not useful, so rethrow the error with
// the caller stacktrace
Expand All @@ -374,6 +404,17 @@ public void close() throws InterruptedException {
} else {
throw new IllegalStateException("unexpected error closing the batcher", e.getCause());
}
} catch (TimeoutException e) {
StringJoiner batchesStr = new StringJoiner(",");
for (Batch<ElementT, ElementResultT, RequestT, ResponseT> batch :
outstandingBatches.keySet()) {
batchesStr.add(batch.toString());
}
String msg = "Timed out trying to close batcher after " + timeout + ".";
msg += " Batch request prototype: " + prototype + ".";
msg += " Outstanding batches: " + batchesStr;

throw new TimeoutException(msg);
}
}

Expand All @@ -393,7 +434,7 @@ public ApiFuture<Void> closeAsync() {
// prevent admission of new elements
closeFuture = SettableApiFuture.create();
// check if we can close immediately
closeImmediately = numOfOutstandingBatches.get() == 0;
closeImmediately = outstandingBatches.isEmpty();
}

// Clean up accounting
Expand Down Expand Up @@ -435,6 +476,8 @@ private static class Batch<ElementT, ElementResultT, RequestT, ResponseT> {
private long totalThrottledTimeMs = 0;
private BatchResource resource;

private volatile ApiFuture<ResponseT> responseFuture;

private Batch(
RequestT prototype,
BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT> descriptor,
Expand All @@ -457,6 +500,17 @@ void add(
totalThrottledTimeMs += throttledTimeMs;
}

void setResponseFuture(@Nonnull ApiFuture<ResponseT> responseFuture) {
Preconditions.checkNotNull(responseFuture);
this.responseFuture = responseFuture;
}

void cancel() {
if (this.responseFuture != null) {
this.responseFuture.cancel(true);
}
}

void onBatchSuccess(ResponseT response) {
try {
descriptor.splitResponse(response, entries);
Expand All @@ -480,6 +534,19 @@ void onBatchFailure(Throwable throwable) {
boolean isEmpty() {
return resource.getElementCount() == 0;
}

@Override
public String toString() {
StringJoiner elementsStr = new StringJoiner(",");
for (BatchEntry<ElementT, ElementResultT> entry : entries) {
elementsStr.add(
Optional.ofNullable(entry.getElement()).map(Object::toString).orElse("null"));
}
return MoreObjects.toStringHelper(this)
.add("responseFuture", responseFuture)
.add("elements", elementsStr)
.toString();
}
}

/**
Expand Down
Loading

0 comments on commit c865b65

Please sign in to comment.