From 5a7a290cddf6bf90ac1b7b841f78fbf33ae017ec Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Tue, 20 Jun 2023 18:06:27 -0400 Subject: [PATCH 1/3] fix: abstract the resource a batch is using --- gax-java/gax/clirr-ignored-differences.xml | 6 ++ .../api/gax/batching/BatchResource.java | 53 ++++++++++++++ .../google/api/gax/batching/BatcherImpl.java | 64 +++++++++-------- .../api/gax/batching/BatchingDescriptor.java | 20 ++++++ .../gax/batching/DefaultBatchResource.java | 71 +++++++++++++++++++ .../api/gax/batching/BatcherImplTest.java | 8 ++- 6 files changed, 191 insertions(+), 31 deletions(-) create mode 100644 gax-java/gax/src/main/java/com/google/api/gax/batching/BatchResource.java create mode 100644 gax-java/gax/src/main/java/com/google/api/gax/batching/DefaultBatchResource.java diff --git a/gax-java/gax/clirr-ignored-differences.xml b/gax-java/gax/clirr-ignored-differences.xml index a0516a55d4..b4674bf8c0 100644 --- a/gax-java/gax/clirr-ignored-differences.xml +++ b/gax-java/gax/clirr-ignored-differences.xml @@ -13,4 +13,10 @@ *setWaitTimeout* com.google.api.gax.rpc.ServerStreamingCallSettings$Builder + + + 7012 + com/google/api/gax/batching/BatchingDescriptor + * + diff --git a/gax-java/gax/src/main/java/com/google/api/gax/batching/BatchResource.java b/gax-java/gax/src/main/java/com/google/api/gax/batching/BatchResource.java new file mode 100644 index 0000000000..97919b8131 --- /dev/null +++ b/gax-java/gax/src/main/java/com/google/api/gax/batching/BatchResource.java @@ -0,0 +1,53 @@ +/* + * Copyright 2023 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.batching; + +import com.google.api.core.InternalApi; + +/** + * Represent the resource used by a batch including element and byte. It can also be extended to + * other things to determine if adding a new element needs to be flow controlled or if the current + * batch needs to be flushed. + */ +@InternalApi("For google-cloud-java client use only.") +public interface BatchResource { + + /** Adds the additional resource. */ + BatchResource add(BatchResource resource); + + /** Returns the element count of this resource. */ + long getElementCount(); + + /** Returns the byte count of this resource. */ + long getByteCount(); + + /** Returns true if the resource is empty. */ + boolean isEmpty(); +} diff --git a/gax-java/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java b/gax-java/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java index e46f61babd..e2d3fd9cad 100644 --- a/gax-java/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java +++ b/gax-java/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java @@ -95,6 +95,10 @@ public class BatcherImpl private final FlowController flowController; private final ApiCallContext callContext; + private final long elementThreshold; + + private final long bytesThreshold; + /** * @param batchingDescriptor a {@link BatchingDescriptor} for transforming individual elements * into wrappers request and response @@ -192,7 +196,7 @@ public BatcherImpl( + "#maxOutstandingRequestBytes must be greater or equal to requestByteThreshold"); } this.flowController = flowController; - currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings, batcherStats); + currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batcherStats); if (batchingSettings.getDelayThreshold() != null) { long delay = batchingSettings.getDelayThreshold().toMillis(); PushCurrentBatchRunnable runnable = @@ -204,6 +208,11 @@ public BatcherImpl( } currentBatcherReference = new BatcherReference(this); this.callContext = callContext; + + Long elementCountThreshold = batchingSettings.getElementCountThreshold(); + this.elementThreshold = elementCountThreshold == null ? 0 : elementCountThreshold; + Long requestByteThreshold = batchingSettings.getRequestByteThreshold(); + this.bytesThreshold = requestByteThreshold == null ? 0 : requestByteThreshold; } /** {@inheritDoc} */ @@ -213,7 +222,7 @@ public ApiFuture add(ElementT element) { // will only be done from a single calling thread. Preconditions.checkState(closeFuture == null, "Cannot add elements on a closed batcher"); - long bytesSize = batchingDescriptor.countBytes(element); + BatchResource newResource = batchingDescriptor.createResource(element); // This is not the optimal way of throttling. It does not send out partial batches, which // means that the Batcher might not use up all the resources allowed by FlowController. @@ -232,7 +241,7 @@ public ApiFuture add(ElementT element) { // defer it till we decide on if refactoring FlowController is necessary. Stopwatch stopwatch = Stopwatch.createStarted(); try { - flowController.reserve(1, bytesSize); + flowController.reserve(newResource.getElementCount(), newResource.getByteCount()); } catch (FlowControlException e) { // This exception will only be thrown if the FlowController is set to ThrowException behavior throw FlowControlRuntimeException.fromFlowControlException(e); @@ -241,12 +250,15 @@ public ApiFuture add(ElementT element) { SettableApiFuture result = SettableApiFuture.create(); synchronized (elementLock) { - currentOpenBatch.add(element, result, throttledTimeMs); - } + if (!currentOpenBatch.isEmpty() + && batchingDescriptor.shouldFlush( + currentOpenBatch.resource.add(newResource), elementThreshold, bytesThreshold)) { + sendOutstanding(); + } - if (currentOpenBatch.hasAnyThresholdReached()) { - sendOutstanding(); + currentOpenBatch.add(element, newResource, result, throttledTimeMs); } + return result; } @@ -267,7 +279,7 @@ public void sendOutstanding() { return; } accumulatedBatch = currentOpenBatch; - currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings, batcherStats); + currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batcherStats); } // This check is for old clients that instantiated the batcher without ApiCallContext @@ -291,7 +303,9 @@ public void sendOutstanding() { @Override public void onSuccess(ResponseT response) { try { - flowController.release(accumulatedBatch.elementCounter, accumulatedBatch.byteCounter); + flowController.release( + accumulatedBatch.resource.getElementCount(), + accumulatedBatch.resource.getByteCount()); accumulatedBatch.onBatchSuccess(response); } finally { onBatchCompletion(); @@ -301,7 +315,9 @@ public void onSuccess(ResponseT response) { @Override public void onFailure(Throwable throwable) { try { - flowController.release(accumulatedBatch.elementCounter, accumulatedBatch.byteCounter); + flowController.release( + accumulatedBatch.resource.getElementCount(), + accumulatedBatch.resource.getByteCount()); accumulatedBatch.onBatchFailure(throwable); } finally { onBatchCompletion(); @@ -412,35 +428,31 @@ private static class Batch { private final BatchingRequestBuilder builder; private final List> entries; private final BatchingDescriptor descriptor; - private final BatcherStats batcherStats; - private final long elementThreshold; - private final long bytesThreshold; - private long elementCounter = 0; - private long byteCounter = 0; + private final BatcherStats batcherStats; private long totalThrottledTimeMs = 0; + private BatchResource resource; private Batch( RequestT prototype, BatchingDescriptor descriptor, - BatchingSettings batchingSettings, BatcherStats batcherStats) { this.descriptor = descriptor; this.builder = descriptor.newRequestBuilder(prototype); this.entries = new ArrayList<>(); - Long elementCountThreshold = batchingSettings.getElementCountThreshold(); - this.elementThreshold = elementCountThreshold == null ? 0 : elementCountThreshold; - Long requestByteThreshold = batchingSettings.getRequestByteThreshold(); - this.bytesThreshold = requestByteThreshold == null ? 0 : requestByteThreshold; this.batcherStats = batcherStats; + this.resource = descriptor.createEmptyResource(); } - void add(ElementT element, SettableApiFuture result, long throttledTimeMs) { + void add( + ElementT element, + BatchResource newResource, + SettableApiFuture result, + long throttledTimeMs) { builder.add(element); entries.add(BatchEntry.create(element, result)); - elementCounter++; - byteCounter += descriptor.countBytes(element); totalThrottledTimeMs += throttledTimeMs; + resource = resource.add(newResource); } void onBatchSuccess(ResponseT response) { @@ -464,11 +476,7 @@ void onBatchFailure(Throwable throwable) { } boolean isEmpty() { - return elementCounter == 0; - } - - boolean hasAnyThresholdReached() { - return elementCounter >= elementThreshold || byteCounter >= bytesThreshold; + return resource.isEmpty(); } } diff --git a/gax-java/gax/src/main/java/com/google/api/gax/batching/BatchingDescriptor.java b/gax-java/gax/src/main/java/com/google/api/gax/batching/BatchingDescriptor.java index d81068a98f..de48ce1da6 100644 --- a/gax-java/gax/src/main/java/com/google/api/gax/batching/BatchingDescriptor.java +++ b/gax-java/gax/src/main/java/com/google/api/gax/batching/BatchingDescriptor.java @@ -96,4 +96,24 @@ public interface BatchingDescriptor maxElementThreshold + || resource.getByteCount() > maxBytesThreshold; + } } diff --git a/gax-java/gax/src/main/java/com/google/api/gax/batching/DefaultBatchResource.java b/gax-java/gax/src/main/java/com/google/api/gax/batching/DefaultBatchResource.java new file mode 100644 index 0000000000..75fc6180a2 --- /dev/null +++ b/gax-java/gax/src/main/java/com/google/api/gax/batching/DefaultBatchResource.java @@ -0,0 +1,71 @@ +/* + * Copyright 2023 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.batching; + +import com.google.common.base.Preconditions; + +/** + * The default implementation of {@link BatchResource} which tracks the elementCount and byteCount. + */ +final class DefaultBatchResource implements BatchResource { + + private long elementCount; + private long byteCount; + + DefaultBatchResource(long elementCount, long byteCount) { + this.elementCount = elementCount; + this.byteCount = byteCount; + } + + @Override + public BatchResource add(BatchResource resource) { + Preconditions.checkArgument( + resource instanceof DefaultBatchResource, + "BatchResource needs to be an instance of DefaultBatchResource"); + this.elementCount += ((DefaultBatchResource) resource).elementCount; + this.byteCount += ((DefaultBatchResource) resource).byteCount; + return this; + } + + @Override + public long getElementCount() { + return elementCount; + } + + @Override + public long getByteCount() { + return byteCount; + } + + @Override + public boolean isEmpty() { + return elementCount == 0; + } +} diff --git a/gax-java/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java b/gax-java/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java index 1f009c2276..26fba42f4d 100644 --- a/gax-java/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java +++ b/gax-java/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java @@ -93,7 +93,7 @@ public class BatcherImplTest { BatchingSettings.newBuilder() .setElementCountThreshold(1000L) .setRequestByteThreshold(1000L) - .setDelayThreshold(Duration.ofSeconds(1)) + .setDelayThreshold(Duration.ofSeconds(1000)) .build(); @After @@ -376,6 +376,7 @@ public void testWhenThresholdIsDisabled() throws Exception { .build(); underTest = createDefaultBatcherImpl(settings, null); Future result = underTest.add(2); + underTest.add(3); assertThat(result.isDone()).isTrue(); assertThat(result.get()).isEqualTo(4); } @@ -895,7 +896,7 @@ public void run() { // Mockito recommends using verify() as the ONLY way to interact with Argument // captors - otherwise it may incur in unexpected behaviour - Mockito.verify(callContext).withOption(key.capture(), value.capture()); + Mockito.verify(callContext, Mockito.timeout(100)).withOption(key.capture(), value.capture()); // Verify that throttled time is recorded in ApiCallContext assertThat(key.getValue()).isSameInstanceAs(Batcher.THROTTLED_TIME_KEY); @@ -1012,8 +1013,9 @@ private void testElementTriggers(BatchingSettings settings) throws Exception { underTest = createDefaultBatcherImpl(settings, null); Future result = underTest.add(4); assertThat(result.isDone()).isFalse(); - // After this element is added, the batch triggers sendOutstanding(). Future anotherResult = underTest.add(5); + // After this element is added, the batch triggers sendOutstanding(). + underTest.add(6); // Both the elements should be resolved now. assertThat(result.isDone()).isTrue(); assertThat(result.get()).isEqualTo(16); From 3b85e00357fa24bc0f8c23103cccafdcf9d71b34 Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Thu, 22 Jun 2023 13:37:21 -0400 Subject: [PATCH 2/3] address comments --- .../api/gax/batching/BatchResource.java | 3 -- .../google/api/gax/batching/BatcherImpl.java | 10 +++--- .../api/gax/batching/BatchingDescriptor.java | 4 +-- .../gax/batching/DefaultBatchResource.java | 34 +++++++------------ 4 files changed, 19 insertions(+), 32 deletions(-) diff --git a/gax-java/gax/src/main/java/com/google/api/gax/batching/BatchResource.java b/gax-java/gax/src/main/java/com/google/api/gax/batching/BatchResource.java index 97919b8131..7c693268a8 100644 --- a/gax-java/gax/src/main/java/com/google/api/gax/batching/BatchResource.java +++ b/gax-java/gax/src/main/java/com/google/api/gax/batching/BatchResource.java @@ -47,7 +47,4 @@ public interface BatchResource { /** Returns the byte count of this resource. */ long getByteCount(); - - /** Returns true if the resource is empty. */ - boolean isEmpty(); } diff --git a/gax-java/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java b/gax-java/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java index e2d3fd9cad..3673f690c5 100644 --- a/gax-java/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java +++ b/gax-java/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java @@ -95,8 +95,9 @@ public class BatcherImpl private final FlowController flowController; private final ApiCallContext callContext; + // If element threshold or bytes threshold is 0, it means that it'll always flush every element + // without batching private final long elementThreshold; - private final long bytesThreshold; /** @@ -250,9 +251,8 @@ public ApiFuture add(ElementT element) { SettableApiFuture result = SettableApiFuture.create(); synchronized (elementLock) { - if (!currentOpenBatch.isEmpty() - && batchingDescriptor.shouldFlush( - currentOpenBatch.resource.add(newResource), elementThreshold, bytesThreshold)) { + if (batchingDescriptor.shouldFlush( + currentOpenBatch.resource.add(newResource), elementThreshold, bytesThreshold)) { sendOutstanding(); } @@ -476,7 +476,7 @@ void onBatchFailure(Throwable throwable) { } boolean isEmpty() { - return resource.isEmpty(); + return resource.getElementCount() == 0; } } diff --git a/gax-java/gax/src/main/java/com/google/api/gax/batching/BatchingDescriptor.java b/gax-java/gax/src/main/java/com/google/api/gax/batching/BatchingDescriptor.java index de48ce1da6..3dc8c92bb4 100644 --- a/gax-java/gax/src/main/java/com/google/api/gax/batching/BatchingDescriptor.java +++ b/gax-java/gax/src/main/java/com/google/api/gax/batching/BatchingDescriptor.java @@ -99,12 +99,12 @@ public interface BatchingDescriptor Date: Fri, 23 Jun 2023 10:23:32 -0400 Subject: [PATCH 3/3] address comments --- gax-java/gax/clirr-ignored-differences.xml | 6 ----- gax-java/gax/pom.xml | 1 + .../api/gax/batching/BatchResource.java | 6 +++++ .../google/api/gax/batching/BatcherImpl.java | 8 +++--- .../api/gax/batching/BatchingDescriptor.java | 17 ++++--------- .../gax/batching/DefaultBatchResource.java | 25 +++++++++++++++---- .../api/gax/batching/BatcherImplTest.java | 24 ++++++++++++++++++ 7 files changed, 61 insertions(+), 26 deletions(-) diff --git a/gax-java/gax/clirr-ignored-differences.xml b/gax-java/gax/clirr-ignored-differences.xml index b4674bf8c0..a0516a55d4 100644 --- a/gax-java/gax/clirr-ignored-differences.xml +++ b/gax-java/gax/clirr-ignored-differences.xml @@ -13,10 +13,4 @@ *setWaitTimeout* com.google.api.gax.rpc.ServerStreamingCallSettings$Builder - - - 7012 - com/google/api/gax/batching/BatchingDescriptor - * - diff --git a/gax-java/gax/pom.xml b/gax-java/gax/pom.xml index 72726c9653..7aab5640ba 100644 --- a/gax-java/gax/pom.xml +++ b/gax-java/gax/pom.xml @@ -88,6 +88,7 @@ com/google/api/gax/rpc/RequestUrlParamsEncoder + com/google/api/gax/batching/BatchingDescriptor diff --git a/gax-java/gax/src/main/java/com/google/api/gax/batching/BatchResource.java b/gax-java/gax/src/main/java/com/google/api/gax/batching/BatchResource.java index 7c693268a8..91cd70c0d4 100644 --- a/gax-java/gax/src/main/java/com/google/api/gax/batching/BatchResource.java +++ b/gax-java/gax/src/main/java/com/google/api/gax/batching/BatchResource.java @@ -47,4 +47,10 @@ public interface BatchResource { /** Returns the byte count of this resource. */ long getByteCount(); + + /** + * Checks if the current {@link BatchResource} should be flushed based on the maxElementThreshold + * and maxBytesThreshold. + */ + boolean shouldFlush(long maxElementThreshold, long maxBytesThreshold); } diff --git a/gax-java/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java b/gax-java/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java index 3673f690c5..415e43610a 100644 --- a/gax-java/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java +++ b/gax-java/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java @@ -251,8 +251,10 @@ public ApiFuture add(ElementT element) { SettableApiFuture result = SettableApiFuture.create(); synchronized (elementLock) { - if (batchingDescriptor.shouldFlush( - currentOpenBatch.resource.add(newResource), elementThreshold, bytesThreshold)) { + if (currentOpenBatch + .resource + .add(newResource) + .shouldFlush(elementThreshold, bytesThreshold)) { sendOutstanding(); } @@ -451,8 +453,8 @@ void add( long throttledTimeMs) { builder.add(element); entries.add(BatchEntry.create(element, result)); - totalThrottledTimeMs += throttledTimeMs; resource = resource.add(newResource); + totalThrottledTimeMs += throttledTimeMs; } void onBatchSuccess(ResponseT response) { diff --git a/gax-java/gax/src/main/java/com/google/api/gax/batching/BatchingDescriptor.java b/gax-java/gax/src/main/java/com/google/api/gax/batching/BatchingDescriptor.java index 3dc8c92bb4..41861c7903 100644 --- a/gax-java/gax/src/main/java/com/google/api/gax/batching/BatchingDescriptor.java +++ b/gax-java/gax/src/main/java/com/google/api/gax/batching/BatchingDescriptor.java @@ -99,21 +99,14 @@ public interface BatchingDescriptor maxElementThreshold - || resource.getByteCount() > maxBytesThreshold; + return DefaultBatchResource.builder().setElementCount(0).setByteCount(0).build(); } } diff --git a/gax-java/gax/src/main/java/com/google/api/gax/batching/DefaultBatchResource.java b/gax-java/gax/src/main/java/com/google/api/gax/batching/DefaultBatchResource.java index 394054d3d1..e5ce99960a 100644 --- a/gax-java/gax/src/main/java/com/google/api/gax/batching/DefaultBatchResource.java +++ b/gax-java/gax/src/main/java/com/google/api/gax/batching/DefaultBatchResource.java @@ -38,8 +38,8 @@ @AutoValue abstract class DefaultBatchResource implements BatchResource { - static DefaultBatchResource create(long elementCount, long byteCount) { - return new AutoValue_DefaultBatchResource(elementCount, byteCount); + static DefaultBatchResource.Builder builder() { + return new AutoValue_DefaultBatchResource.Builder(); } @Override @@ -48,9 +48,10 @@ public BatchResource add(BatchResource resource) { resource instanceof DefaultBatchResource, "Expect an instance of DefaultBatchResource, got " + resource.getClass()); DefaultBatchResource defaultResource = (DefaultBatchResource) resource; - return new AutoValue_DefaultBatchResource( - getElementCount() + defaultResource.getElementCount(), - getByteCount() + defaultResource.getByteCount()); + return new AutoValue_DefaultBatchResource.Builder() + .setElementCount(getElementCount() + defaultResource.getElementCount()) + .setByteCount(getByteCount() + defaultResource.getByteCount()) + .build(); } @Override @@ -58,4 +59,18 @@ public BatchResource add(BatchResource resource) { @Override public abstract long getByteCount(); + + @Override + public boolean shouldFlush(long maxElementThreshold, long maxBytesThreshold) { + return getElementCount() > maxElementThreshold || getByteCount() > maxBytesThreshold; + } + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setElementCount(long elementCount); + + abstract Builder setByteCount(long byteCount); + + abstract DefaultBatchResource build(); + } } diff --git a/gax-java/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java b/gax-java/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java index 26fba42f4d..47454563a1 100644 --- a/gax-java/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java +++ b/gax-java/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java @@ -1009,6 +1009,30 @@ public ApiFuture futureCall(Object o, ApiCallContext apiCallContext) { Assert.assertThrows(RuntimeException.class, batcher::close); } + @Test + public void testDefaultShouldFlush() { + BatchResource resource = + DefaultBatchResource.builder().setElementCount(2).setByteCount(2).build(); + + assertThat(resource.shouldFlush(2, 2)).isFalse(); + assertThat(resource.shouldFlush(1, 1)).isTrue(); + } + + @Test + public void testDefaultBatchResourceAdd() { + BatchResource resource = + DefaultBatchResource.builder().setElementCount(1).setByteCount(1).build(); + + BatchResource newResource = + resource.add(DefaultBatchResource.builder().setElementCount(1).setByteCount(1).build()); + + // Make sure add doesn't modify the old object + assertThat(resource.getElementCount()).isEqualTo(1); + assertThat(resource.getByteCount()).isEqualTo(1); + assertThat(newResource.getElementCount()).isEqualTo(2); + assertThat(newResource.getByteCount()).isEqualTo(2); + } + private void testElementTriggers(BatchingSettings settings) throws Exception { underTest = createDefaultBatcherImpl(settings, null); Future result = underTest.add(4);