Skip to content

Commit

Permalink
another version
Browse files Browse the repository at this point in the history
  • Loading branch information
mutianf committed Jun 21, 2023
1 parent 1f3c451 commit fa2beab
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,23 @@

import com.google.api.core.InternalApi;

/** A class that determines if the current batch should be flushed. */
/**
* Represent the resource of a batch. Needs to have at least elementCount and byteCount. It's used
* to determine if adding a new element needs to be flow controlled, or if the current batch needs
* to be flushed.
*/
@InternalApi("For google-cloud-java client use only.")
public interface BatchFlusher<ElementT> {

/** Returns true if the batch will be full after adding the element. */
boolean willBeFull(ElementT element);
public interface BatchResource {

/** Increment element counter and byte counter. */
void add(ElementT element);
/** Adds the additional resource. */
BatchResource add(BatchResource resource);

/** Returns true if the batch is empty. */
boolean isEmpty();
/** Returns the element count of this resource. */
long getElementCount();

/** Get the element counter of the current batch. */
long getElementCounter();
/** Returns the byte count of this resource. */
long getByteCount();

/** Get the byte counter of the current batch. */
long getByteCounter();
/** Returns true if the resource is empty. */
boolean isEmpty();
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ public class BatcherImpl<ElementT, ElementResultT, RequestT, ResponseT>
private final FlowController flowController;
private final ApiCallContext callContext;

private final long elementThreshold;

private final long bytesThreshold;

/**
* @param batchingDescriptor a {@link BatchingDescriptor} for transforming individual elements
* into wrappers request and response
Expand Down Expand Up @@ -192,7 +196,7 @@ public BatcherImpl(
+ "#maxOutstandingRequestBytes must be greater or equal to requestByteThreshold");
}
this.flowController = flowController;
currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings, batcherStats);
currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batcherStats);
if (batchingSettings.getDelayThreshold() != null) {
long delay = batchingSettings.getDelayThreshold().toMillis();
PushCurrentBatchRunnable<ElementT, ElementResultT, RequestT, ResponseT> runnable =
Expand All @@ -204,6 +208,11 @@ public BatcherImpl(
}
currentBatcherReference = new BatcherReference(this);
this.callContext = callContext;

Long elementCountThreshold = batchingSettings.getElementCountThreshold();
this.elementThreshold = elementCountThreshold == null ? 0 : elementCountThreshold;
Long requestByteThreshold = batchingSettings.getRequestByteThreshold();
this.bytesThreshold = requestByteThreshold == null ? 0 : requestByteThreshold;
}

/** {@inheritDoc} */
Expand All @@ -213,7 +222,7 @@ public ApiFuture<ElementResultT> add(ElementT element) {
// will only be done from a single calling thread.
Preconditions.checkState(closeFuture == null, "Cannot add elements on a closed batcher");

long bytesSize = batchingDescriptor.countBytes(element);
BatchResource newResource = batchingDescriptor.createResource(element);

// This is not the optimal way of throttling. It does not send out partial batches, which
// means that the Batcher might not use up all the resources allowed by FlowController.
Expand All @@ -232,20 +241,22 @@ public ApiFuture<ElementResultT> add(ElementT element) {
// defer it till we decide on if refactoring FlowController is necessary.
Stopwatch stopwatch = Stopwatch.createStarted();
try {
flowController.reserve(1, bytesSize);
flowController.reserve(newResource.getElementCount(), newResource.getByteCount());
} catch (FlowControlException e) {
// This exception will only be thrown if the FlowController is set to ThrowException behavior
throw FlowControlRuntimeException.fromFlowControlException(e);
}
long throttledTimeMs = stopwatch.elapsed(TimeUnit.MILLISECONDS);

if (!currentOpenBatch.isEmpty() && currentOpenBatch.batchFlusher.willBeFull(element)) {
if (!currentOpenBatch.isEmpty()
&& batchingDescriptor.shouldFlush(
currentOpenBatch.resource.add(newResource), elementThreshold, bytesThreshold)) {
sendOutstanding();
}

SettableApiFuture<ElementResultT> result = SettableApiFuture.create();
synchronized (elementLock) {
currentOpenBatch.add(element, result, throttledTimeMs);
currentOpenBatch.add(element, newResource, result, throttledTimeMs);
}

return result;
Expand All @@ -268,7 +279,7 @@ public void sendOutstanding() {
return;
}
accumulatedBatch = currentOpenBatch;
currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings, batcherStats);
currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batcherStats);
}

// This check is for old clients that instantiated the batcher without ApiCallContext
Expand All @@ -293,8 +304,8 @@ public void sendOutstanding() {
public void onSuccess(ResponseT response) {
try {
flowController.release(
accumulatedBatch.batchFlusher.getElementCounter(),
accumulatedBatch.batchFlusher.getByteCounter());
accumulatedBatch.resource.getElementCount(),
accumulatedBatch.resource.getElementCount());
accumulatedBatch.onBatchSuccess(response);
} finally {
onBatchCompletion();
Expand All @@ -305,8 +316,8 @@ public void onSuccess(ResponseT response) {
public void onFailure(Throwable throwable) {
try {
flowController.release(
accumulatedBatch.batchFlusher.getElementCounter(),
accumulatedBatch.batchFlusher.getByteCounter());
accumulatedBatch.resource.getElementCount(),
accumulatedBatch.resource.getByteCount());
accumulatedBatch.onBatchFailure(throwable);
} finally {
onBatchCompletion();
Expand Down Expand Up @@ -418,31 +429,30 @@ private static class Batch<ElementT, ElementResultT, RequestT, ResponseT> {
private final List<BatchEntry<ElementT, ElementResultT>> entries;
private final BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT> descriptor;

private final BatchFlusher<ElementT> batchFlusher;
private final BatcherStats batcherStats;
private long totalThrottledTimeMs = 0;
private BatchResource resource;

private Batch(
RequestT prototype,
BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT> descriptor,
BatchingSettings batchingSettings,
BatcherStats batcherStats) {
this.descriptor = descriptor;
this.builder = descriptor.newRequestBuilder(prototype);
this.entries = new ArrayList<>();
Long elementCountThreshold = batchingSettings.getElementCountThreshold();
long elementThreshold = elementCountThreshold == null ? 0 : elementCountThreshold;
Long requestByteThreshold = batchingSettings.getRequestByteThreshold();
long bytesThreshold = requestByteThreshold == null ? 0 : requestByteThreshold;
this.batchFlusher = descriptor.newBatchFlusher(elementThreshold, bytesThreshold);
this.batcherStats = batcherStats;
this.resource = descriptor.createEmptyResource();
}

void add(ElementT element, SettableApiFuture<ElementResultT> result, long throttledTimeMs) {
void add(
ElementT element,
BatchResource newResource,
SettableApiFuture<ElementResultT> result,
long throttledTimeMs) {
builder.add(element);
entries.add(BatchEntry.create(element, result));
batchFlusher.add(element);
totalThrottledTimeMs += throttledTimeMs;
resource = resource.add(newResource);
}

void onBatchSuccess(ResponseT response) {
Expand All @@ -466,7 +476,7 @@ void onBatchFailure(Throwable throwable) {
}

boolean isEmpty() {
return batchFlusher.isEmpty();
return resource.isEmpty();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,23 @@ public interface BatchingDescriptor<ElementT, ElementResultT, RequestT, Response
/** Returns the size of the passed element object in bytes. */
long countBytes(ElementT element);

/** Creates a new {@link BatchFlusher} to check if the current batch should be flushed. */
default BatchFlusher<ElementT> newBatchFlusher(long elementThreshold, long bytesThreshold) {
return new BatchFlusherImpl<>(this, elementThreshold, bytesThreshold);
/** Creates a new {@link BatchResource} with ElementT. */
default BatchResource createResource(ElementT element) {
return new DefaultBatchResource(1, countBytes(element));
}

/** Create an empty {@link BatchResource}. */
default BatchResource createEmptyResource() {
return new DefaultBatchResource(0, 0);
}

/**
* Checks if the current {@link BatchResource} should be flushed based on the maxElementThreshold
* and maxBytesThreshold.
*/
default boolean shouldFlush(
BatchResource resource, long maxElementThreshold, long maxBytesThreshold) {
return resource.getElementCount() > maxElementThreshold
|| resource.getByteCount() > maxBytesThreshold;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,53 +29,41 @@
*/
package com.google.api.gax.batching;

import com.google.api.core.InternalApi;
import com.google.common.base.Preconditions;

/** Default implementation of {@link BatchFlusher}. */
@InternalApi("For google-cloud-java client use only.")
public class BatchFlusherImpl<ElementT> implements BatchFlusher<ElementT> {

private final long elementThreshold;
private final long bytesThreshold;

private final BatchingDescriptor<ElementT, ?, ?, ?> batchingDescriptor;

private long elementCounter = 0;
private long byteCounter = 0;
/**
* The default implementation of {@link BatchResource} which tracks the elementCount and byteCount.
*/
final class DefaultBatchResource implements BatchResource {

BatchFlusherImpl(
BatchingDescriptor<ElementT, ?, ?, ?> batchingDescriptor,
long elementThreshold,
long bytesThreshold) {
this.batchingDescriptor = batchingDescriptor;
this.elementThreshold = elementThreshold;
this.bytesThreshold = bytesThreshold;
}
private long elementCount = 0;
private long byteCount = 0;

@Override
public boolean willBeFull(ElementT element) {
return (elementCounter + 1 > elementThreshold)
|| (byteCounter + batchingDescriptor.countBytes(element) > bytesThreshold);
DefaultBatchResource(long elementCount, long byteCount) {
this.elementCount = elementCount;
this.byteCount = byteCount;
}

@Override
public void add(ElementT element) {
elementCounter++;
byteCounter += batchingDescriptor.countBytes(element);
public BatchResource add(BatchResource resource) {
Preconditions.checkArgument(resource instanceof DefaultBatchResource);
this.elementCount += ((DefaultBatchResource) resource).elementCount;
this.byteCount += ((DefaultBatchResource) resource).byteCount;
return this;
}

@Override
public boolean isEmpty() {
return elementCounter == 0;
public long getElementCount() {
return elementCount;
}

@Override
public long getElementCounter() {
return elementCounter;
public long getByteCount() {
return byteCount;
}

@Override
public long getByteCounter() {
return byteCounter;
public boolean isEmpty() {
return elementCount == 0;
}
}

0 comments on commit fa2beab

Please sign in to comment.