From 4c741077d614093d08665e9ddd83fb0e332b7881 Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Fri, 23 Jun 2023 11:59:55 -0400 Subject: [PATCH] fix: abstract batch resource and add method to determine if batch should be flushed (#1790) --- gax-java/gax/pom.xml | 1 + .../api/gax/batching/BatchResource.java | 56 ++++++++++++++ .../google/api/gax/batching/BatcherImpl.java | 66 +++++++++------- .../api/gax/batching/BatchingDescriptor.java | 13 ++++ .../gax/batching/DefaultBatchResource.java | 76 +++++++++++++++++++ .../api/gax/batching/BatcherImplTest.java | 32 +++++++- 6 files changed, 213 insertions(+), 31 deletions(-) create mode 100644 gax-java/gax/src/main/java/com/google/api/gax/batching/BatchResource.java create mode 100644 gax-java/gax/src/main/java/com/google/api/gax/batching/DefaultBatchResource.java diff --git a/gax-java/gax/pom.xml b/gax-java/gax/pom.xml index 8fb0bf47e7..df1ea44bb8 100644 --- a/gax-java/gax/pom.xml +++ b/gax-java/gax/pom.xml @@ -88,6 +88,7 @@ com/google/api/gax/rpc/RequestUrlParamsEncoder + com/google/api/gax/batching/BatchingDescriptor diff --git a/gax-java/gax/src/main/java/com/google/api/gax/batching/BatchResource.java b/gax-java/gax/src/main/java/com/google/api/gax/batching/BatchResource.java new file mode 100644 index 0000000000..91cd70c0d4 --- /dev/null +++ b/gax-java/gax/src/main/java/com/google/api/gax/batching/BatchResource.java @@ -0,0 +1,56 @@ +/* + * Copyright 2023 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.batching; + +import com.google.api.core.InternalApi; + +/** + * Represent the resource used by a batch including element and byte. It can also be extended to + * other things to determine if adding a new element needs to be flow controlled or if the current + * batch needs to be flushed. + */ +@InternalApi("For google-cloud-java client use only.") +public interface BatchResource { + + /** Adds the additional resource. */ + BatchResource add(BatchResource resource); + + /** Returns the element count of this resource. */ + long getElementCount(); + + /** Returns the byte count of this resource. */ + long getByteCount(); + + /** + * Checks if the current {@link BatchResource} should be flushed based on the maxElementThreshold + * and maxBytesThreshold. + */ + boolean shouldFlush(long maxElementThreshold, long maxBytesThreshold); +} diff --git a/gax-java/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java b/gax-java/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java index e46f61babd..415e43610a 100644 --- a/gax-java/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java +++ b/gax-java/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java @@ -95,6 +95,11 @@ public class BatcherImpl private final FlowController flowController; private final ApiCallContext callContext; + // If element threshold or bytes threshold is 0, it means that it'll always flush every element + // without batching + private final long elementThreshold; + private final long bytesThreshold; + /** * @param batchingDescriptor a {@link BatchingDescriptor} for transforming individual elements * into wrappers request and response @@ -192,7 +197,7 @@ public BatcherImpl( + "#maxOutstandingRequestBytes must be greater or equal to requestByteThreshold"); } this.flowController = flowController; - currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings, batcherStats); + currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batcherStats); if (batchingSettings.getDelayThreshold() != null) { long delay = batchingSettings.getDelayThreshold().toMillis(); PushCurrentBatchRunnable runnable = @@ -204,6 +209,11 @@ public BatcherImpl( } currentBatcherReference = new BatcherReference(this); this.callContext = callContext; + + Long elementCountThreshold = batchingSettings.getElementCountThreshold(); + this.elementThreshold = elementCountThreshold == null ? 0 : elementCountThreshold; + Long requestByteThreshold = batchingSettings.getRequestByteThreshold(); + this.bytesThreshold = requestByteThreshold == null ? 0 : requestByteThreshold; } /** {@inheritDoc} */ @@ -213,7 +223,7 @@ public ApiFuture add(ElementT element) { // will only be done from a single calling thread. Preconditions.checkState(closeFuture == null, "Cannot add elements on a closed batcher"); - long bytesSize = batchingDescriptor.countBytes(element); + BatchResource newResource = batchingDescriptor.createResource(element); // This is not the optimal way of throttling. It does not send out partial batches, which // means that the Batcher might not use up all the resources allowed by FlowController. @@ -232,7 +242,7 @@ public ApiFuture add(ElementT element) { // defer it till we decide on if refactoring FlowController is necessary. Stopwatch stopwatch = Stopwatch.createStarted(); try { - flowController.reserve(1, bytesSize); + flowController.reserve(newResource.getElementCount(), newResource.getByteCount()); } catch (FlowControlException e) { // This exception will only be thrown if the FlowController is set to ThrowException behavior throw FlowControlRuntimeException.fromFlowControlException(e); @@ -241,12 +251,16 @@ public ApiFuture add(ElementT element) { SettableApiFuture result = SettableApiFuture.create(); synchronized (elementLock) { - currentOpenBatch.add(element, result, throttledTimeMs); - } + if (currentOpenBatch + .resource + .add(newResource) + .shouldFlush(elementThreshold, bytesThreshold)) { + sendOutstanding(); + } - if (currentOpenBatch.hasAnyThresholdReached()) { - sendOutstanding(); + currentOpenBatch.add(element, newResource, result, throttledTimeMs); } + return result; } @@ -267,7 +281,7 @@ public void sendOutstanding() { return; } accumulatedBatch = currentOpenBatch; - currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings, batcherStats); + currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batcherStats); } // This check is for old clients that instantiated the batcher without ApiCallContext @@ -291,7 +305,9 @@ public void sendOutstanding() { @Override public void onSuccess(ResponseT response) { try { - flowController.release(accumulatedBatch.elementCounter, accumulatedBatch.byteCounter); + flowController.release( + accumulatedBatch.resource.getElementCount(), + accumulatedBatch.resource.getByteCount()); accumulatedBatch.onBatchSuccess(response); } finally { onBatchCompletion(); @@ -301,7 +317,9 @@ public void onSuccess(ResponseT response) { @Override public void onFailure(Throwable throwable) { try { - flowController.release(accumulatedBatch.elementCounter, accumulatedBatch.byteCounter); + flowController.release( + accumulatedBatch.resource.getElementCount(), + accumulatedBatch.resource.getByteCount()); accumulatedBatch.onBatchFailure(throwable); } finally { onBatchCompletion(); @@ -412,34 +430,30 @@ private static class Batch { private final BatchingRequestBuilder builder; private final List> entries; private final BatchingDescriptor descriptor; - private final BatcherStats batcherStats; - private final long elementThreshold; - private final long bytesThreshold; - private long elementCounter = 0; - private long byteCounter = 0; + private final BatcherStats batcherStats; private long totalThrottledTimeMs = 0; + private BatchResource resource; private Batch( RequestT prototype, BatchingDescriptor descriptor, - BatchingSettings batchingSettings, BatcherStats batcherStats) { this.descriptor = descriptor; this.builder = descriptor.newRequestBuilder(prototype); this.entries = new ArrayList<>(); - Long elementCountThreshold = batchingSettings.getElementCountThreshold(); - this.elementThreshold = elementCountThreshold == null ? 0 : elementCountThreshold; - Long requestByteThreshold = batchingSettings.getRequestByteThreshold(); - this.bytesThreshold = requestByteThreshold == null ? 0 : requestByteThreshold; this.batcherStats = batcherStats; + this.resource = descriptor.createEmptyResource(); } - void add(ElementT element, SettableApiFuture result, long throttledTimeMs) { + void add( + ElementT element, + BatchResource newResource, + SettableApiFuture result, + long throttledTimeMs) { builder.add(element); entries.add(BatchEntry.create(element, result)); - elementCounter++; - byteCounter += descriptor.countBytes(element); + resource = resource.add(newResource); totalThrottledTimeMs += throttledTimeMs; } @@ -464,11 +478,7 @@ void onBatchFailure(Throwable throwable) { } boolean isEmpty() { - return elementCounter == 0; - } - - boolean hasAnyThresholdReached() { - return elementCounter >= elementThreshold || byteCounter >= bytesThreshold; + return resource.getElementCount() == 0; } } diff --git a/gax-java/gax/src/main/java/com/google/api/gax/batching/BatchingDescriptor.java b/gax-java/gax/src/main/java/com/google/api/gax/batching/BatchingDescriptor.java index d81068a98f..41861c7903 100644 --- a/gax-java/gax/src/main/java/com/google/api/gax/batching/BatchingDescriptor.java +++ b/gax-java/gax/src/main/java/com/google/api/gax/batching/BatchingDescriptor.java @@ -96,4 +96,17 @@ public interface BatchingDescriptor maxElementThreshold || getByteCount() > maxBytesThreshold; + } + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setElementCount(long elementCount); + + abstract Builder setByteCount(long byteCount); + + abstract DefaultBatchResource build(); + } +} diff --git a/gax-java/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java b/gax-java/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java index 1f009c2276..47454563a1 100644 --- a/gax-java/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java +++ b/gax-java/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java @@ -93,7 +93,7 @@ public class BatcherImplTest { BatchingSettings.newBuilder() .setElementCountThreshold(1000L) .setRequestByteThreshold(1000L) - .setDelayThreshold(Duration.ofSeconds(1)) + .setDelayThreshold(Duration.ofSeconds(1000)) .build(); @After @@ -376,6 +376,7 @@ public void testWhenThresholdIsDisabled() throws Exception { .build(); underTest = createDefaultBatcherImpl(settings, null); Future result = underTest.add(2); + underTest.add(3); assertThat(result.isDone()).isTrue(); assertThat(result.get()).isEqualTo(4); } @@ -895,7 +896,7 @@ public void run() { // Mockito recommends using verify() as the ONLY way to interact with Argument // captors - otherwise it may incur in unexpected behaviour - Mockito.verify(callContext).withOption(key.capture(), value.capture()); + Mockito.verify(callContext, Mockito.timeout(100)).withOption(key.capture(), value.capture()); // Verify that throttled time is recorded in ApiCallContext assertThat(key.getValue()).isSameInstanceAs(Batcher.THROTTLED_TIME_KEY); @@ -1008,12 +1009,37 @@ public ApiFuture futureCall(Object o, ApiCallContext apiCallContext) { Assert.assertThrows(RuntimeException.class, batcher::close); } + @Test + public void testDefaultShouldFlush() { + BatchResource resource = + DefaultBatchResource.builder().setElementCount(2).setByteCount(2).build(); + + assertThat(resource.shouldFlush(2, 2)).isFalse(); + assertThat(resource.shouldFlush(1, 1)).isTrue(); + } + + @Test + public void testDefaultBatchResourceAdd() { + BatchResource resource = + DefaultBatchResource.builder().setElementCount(1).setByteCount(1).build(); + + BatchResource newResource = + resource.add(DefaultBatchResource.builder().setElementCount(1).setByteCount(1).build()); + + // Make sure add doesn't modify the old object + assertThat(resource.getElementCount()).isEqualTo(1); + assertThat(resource.getByteCount()).isEqualTo(1); + assertThat(newResource.getElementCount()).isEqualTo(2); + assertThat(newResource.getByteCount()).isEqualTo(2); + } + private void testElementTriggers(BatchingSettings settings) throws Exception { underTest = createDefaultBatcherImpl(settings, null); Future result = underTest.add(4); assertThat(result.isDone()).isFalse(); - // After this element is added, the batch triggers sendOutstanding(). Future anotherResult = underTest.add(5); + // After this element is added, the batch triggers sendOutstanding(). + underTest.add(6); // Both the elements should be resolved now. assertThat(result.isDone()).isTrue(); assertThat(result.get()).isEqualTo(16);