test: add a test for fix #1477 and a few other tests to increase coverage (#1499)

Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
- [ ] Make sure to open an issue as a [bug/issue](https://togithub.com/googleapis/java-bigtable/issues/new/choose) before writing your code!  That way we can discuss the change, evaluate designs, and agree on the general idea
- [ ] Ensure the tests and linter pass
- [ ] Code coverage does not decrease (if any source code was changed)
- [ ] Appropriate docs were updated (if necessary)

Fixes #<issue_number_goes_here> ☕️

If you write sample code, please follow the [samples format](https://togithub.com/GoogleCloudPlatform/java-docs-samples/blob/main/SAMPLE_FORMAT.md).
mutianf authored Nov 7, 2022
1 parent 1f8cfd7 commit 6d446fc
Showing 1 changed file with 106 additions and 2 deletions.
@@ -25,13 +25,18 @@

import com.google.api.client.util.Lists;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.Batcher;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.rpc.ClientContext;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.StreamController;
import com.google.api.gax.tracing.SpanName;
import com.google.bigtable.v2.BigtableGrpc;
import com.google.bigtable.v2.MutateRowRequest;
import com.google.bigtable.v2.MutateRowResponse;
import com.google.bigtable.v2.MutateRowsRequest;
import com.google.bigtable.v2.MutateRowsResponse;
import com.google.bigtable.v2.ReadRowsRequest;
import com.google.bigtable.v2.ReadRowsResponse;
import com.google.bigtable.v2.ResponseParams;
@@ -40,6 +45,7 @@
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStub;
import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStubSettings;
import com.google.cloud.bigtable.stats.StatsRecorderWrapper;
@@ -106,6 +112,8 @@ public class BuiltinMetricsTracerTest {
@Captor private ArgumentCaptor<String> zone;
@Captor private ArgumentCaptor<String> cluster;

private int batchElementCount = 2;

@Before
public void setUp() throws Exception {
// Add an interceptor to add server-timing in headers
@@ -150,6 +158,22 @@ public void sendHeaders(Metadata headers) {
.mutateRowSettings()
.retrySettings()
.setInitialRetryDelay(Duration.ofMillis(200));

stubSettingsBuilder
.bulkMutateRowsSettings()
.setBatchingSettings(
            // Each batch contains 2 mutations, flow control allows at most 1 batch in flight, and
            // auto-flush is disabled by setting the delay threshold to 1 hour.
BatchingSettings.newBuilder()
.setElementCountThreshold((long) batchElementCount)
.setRequestByteThreshold(1000L)
.setDelayThreshold(Duration.ofHours(1))
.setFlowControlSettings(
FlowControlSettings.newBuilder()
.setMaxOutstandingElementCount((long) batchElementCount)
.setMaxOutstandingRequestBytes(1000L)
.build())
.build());
stubSettingsBuilder.setTracerFactory(mockFactory);

EnhancedBigtableStubSettings stubSettings = stubSettingsBuilder.build();
@@ -163,7 +187,7 @@ public void tearDown() {
}

@Test
public void testOperationLatencies() {
public void testReadRowsOperationLatencies() {
when(mockFactory.newTracer(any(), any(), any()))
.thenAnswer(
(Answer<BuiltinMetricsTracer>)
@@ -179,8 +203,15 @@ public void testOperationLatencies() {
long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);

verify(statsRecorderWrapper).putOperationLatencies(operationLatency.capture());
    // Verify that recordOperation is only called once.
verify(statsRecorderWrapper)
.recordOperation(status.capture(), tableId.capture(), zone.capture(), cluster.capture());

assertThat(operationLatency.getValue()).isIn(Range.closed(SERVER_LATENCY, elapsed));
assertThat(status.getAllValues()).containsExactly("OK");
assertThat(tableId.getAllValues()).containsExactly(TABLE_ID);
assertThat(zone.getAllValues()).containsExactly(ZONE);
assertThat(cluster.getAllValues()).containsExactly(CLUSTER);
}

@Test
@@ -198,6 +229,10 @@ public void testGfeMetrics() {

Lists.newArrayList(stub.readRowsCallable().call(Query.create(TABLE_ID)));

    // Verify that recordAttempt is called once per attempt.
verify(statsRecorderWrapper, times(fakeService.getAttemptCounter().get()))
.recordAttempt(status.capture(), tableId.capture(), zone.capture(), cluster.capture());

// The request was retried and gfe latency is only recorded in the retry attempt
verify(statsRecorderWrapper).putGfeLatencies(gfeLatency.capture());
assertThat(gfeLatency.getValue()).isEqualTo(FAKE_SERVER_TIMING);
@@ -206,6 +241,11 @@
verify(statsRecorderWrapper, times(fakeService.getAttemptCounter().get()))
.putGfeMissingHeaders(gfeMissingHeaders.capture());
assertThat(gfeMissingHeaders.getValue()).isEqualTo(1);

assertThat(status.getAllValues()).containsExactly("UNAVAILABLE", "OK");
assertThat(tableId.getAllValues()).containsExactly(TABLE_ID, TABLE_ID);
assertThat(zone.getAllValues()).containsExactly("global", ZONE);
assertThat(cluster.getAllValues()).containsExactly("unspecified", CLUSTER);
}

@Test
@@ -255,6 +295,8 @@ public void onComplete() {

verify(statsRecorderWrapper).putApplicationLatencies(applicationLatency.capture());
verify(statsRecorderWrapper).putOperationLatencies(operationLatency.capture());
verify(statsRecorderWrapper)
.recordOperation(status.capture(), tableId.capture(), zone.capture(), cluster.capture());

assertThat(counter.get()).isEqualTo(fakeService.getResponseCounter().get());
assertThat(applicationLatency.getValue()).isAtLeast(APPLICATION_LATENCY * counter.get());
@@ -287,6 +329,8 @@ public void testReadRowsApplicationLatencyWithManualFlowControl() throws Exception {

verify(statsRecorderWrapper).putApplicationLatencies(applicationLatency.capture());
verify(statsRecorderWrapper).putOperationLatencies(operationLatency.capture());
verify(statsRecorderWrapper)
.recordOperation(status.capture(), tableId.capture(), zone.capture(), cluster.capture());

// For manual flow control, the last application latency shouldn't count, because at that point
// the server already sent back all the responses.
@@ -324,7 +368,7 @@ public void testRetryCount() {
}

@Test
public void testMutateRowAttempts() {
public void testMutateRowAttemptsTagValues() {
when(mockFactory.newTracer(any(), any(), any()))
.thenReturn(
new BuiltinMetricsTracer(
@@ -343,6 +387,55 @@ public void testMutateRowAttempts() {
assertThat(zone.getAllValues()).containsExactly("global", "global", ZONE);
assertThat(cluster.getAllValues()).containsExactly("unspecified", "unspecified", CLUSTER);
assertThat(status.getAllValues()).containsExactly("UNAVAILABLE", "UNAVAILABLE", "OK");
assertThat(tableId.getAllValues()).containsExactly(TABLE_ID, TABLE_ID, TABLE_ID);
}

@Test
public void testReadRowsAttemptsTagValues() {
when(mockFactory.newTracer(any(), any(), any()))
.thenReturn(
new BuiltinMetricsTracer(
OperationType.ServerStreaming,
SpanName.of("Bigtable", "ReadRows"),
statsRecorderWrapper));

Lists.newArrayList(stub.readRowsCallable().call(Query.create("fake-table")).iterator());

    // Use a timeout to reduce flakiness of this test. BasicRetryingFuture marks the attempt as
    // succeeded and sets the response, which calls complete() in AbstractFuture, which in turn
    // calls releaseWaiters(). onOperationCompletion() is invoked by the TracerFinisher after the
    // readRows call returns, so there's a race between when the call returns and when record() is
    // called in onOperationCompletion().
verify(statsRecorderWrapper, timeout(50).times(fakeService.getAttemptCounter().get()))
.recordAttempt(status.capture(), tableId.capture(), zone.capture(), cluster.capture());
assertThat(zone.getAllValues()).containsExactly("global", ZONE);
assertThat(cluster.getAllValues()).containsExactly("unspecified", CLUSTER);
assertThat(status.getAllValues()).containsExactly("UNAVAILABLE", "OK");
}

@Test
public void testClientBlockingLatencies() throws InterruptedException {
when(mockFactory.newTracer(any(), any(), any()))
.thenReturn(
new BuiltinMetricsTracer(
OperationType.Unary, SpanName.of("Bigtable", "MutateRows"), statsRecorderWrapper));
try (Batcher<RowMutationEntry, Void> batcher = stub.newMutateRowsBatcher(TABLE_ID, null)) {
for (int i = 0; i < 6; i++) {
batcher.add(RowMutationEntry.create("key").setCell("f", "q", "v"));
}

int expectedNumRequests = 6 / batchElementCount;
ArgumentCaptor<Long> throttledTime = ArgumentCaptor.forClass(Long.class);
verify(statsRecorderWrapper, times(expectedNumRequests))
.putBatchRequestThrottled(throttledTime.capture());

// Adding the first 2 elements should not get throttled since the batch is empty
assertThat(throttledTime.getAllValues().get(0)).isEqualTo(0);
      // After the first request is sent, the batcher will block on add() because of the server
      // latency, so the blocking latency should be roughly the server latency.
assertThat(throttledTime.getAllValues().get(1)).isAtLeast(SERVER_LATENCY - 10);
assertThat(throttledTime.getAllValues().get(2)).isAtLeast(SERVER_LATENCY - 10);
}
}

private static class FakeService extends BigtableGrpc.BigtableImplBase {
@@ -413,6 +506,17 @@ public void mutateRow(
responseObserver.onCompleted();
}

@Override
public void mutateRows(
MutateRowsRequest request, StreamObserver<MutateRowsResponse> responseObserver) {
try {
Thread.sleep(SERVER_LATENCY);
      } catch (InterruptedException e) {
        // Ignored: the fake service sleeps only to simulate server-side latency.
      }
responseObserver.onNext(MutateRowsResponse.getDefaultInstance());
responseObserver.onCompleted();
}

public AtomicInteger getAttemptCounter() {
return attemptCounter;
}
