From 6d446fc002184f197076acab7aaf34e5c519320d Mon Sep 17 00:00:00 2001
From: Mattie Fu
Date: Mon, 7 Nov 2022 13:16:15 -0500
Subject: [PATCH] test: add a test for fix #1477 and a few other tests to increase coverage (#1499)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../metrics/BuiltinMetricsTracerTest.java | 108 +++++++++++++++++-
 1 file changed, 106 insertions(+), 2 deletions(-)

diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java
index 8828724f24..8e2bc1564a 100644
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java
@@ -25,6 +25,9 @@
 import com.google.api.client.util.Lists;
 import com.google.api.core.SettableApiFuture;
+import com.google.api.gax.batching.Batcher;
+import com.google.api.gax.batching.BatchingSettings;
+import com.google.api.gax.batching.FlowControlSettings;
 import com.google.api.gax.rpc.ClientContext;
 import com.google.api.gax.rpc.ResponseObserver;
 import com.google.api.gax.rpc.StreamController;
@@ -32,6 +35,8 @@
 import com.google.bigtable.v2.BigtableGrpc;
 import com.google.bigtable.v2.MutateRowRequest;
 import com.google.bigtable.v2.MutateRowResponse;
+import com.google.bigtable.v2.MutateRowsRequest;
+import com.google.bigtable.v2.MutateRowsResponse;
 import com.google.bigtable.v2.ReadRowsRequest;
 import com.google.bigtable.v2.ReadRowsResponse;
 import com.google.bigtable.v2.ResponseParams;
@@ -40,6 +45,7 @@
 import com.google.cloud.bigtable.data.v2.models.Query;
 import com.google.cloud.bigtable.data.v2.models.Row;
 import com.google.cloud.bigtable.data.v2.models.RowMutation;
+import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
 import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStub;
 import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStubSettings;
 import com.google.cloud.bigtable.stats.StatsRecorderWrapper;
@@ -106,6 +112,8 @@ public class BuiltinMetricsTracerTest {
   @Captor private ArgumentCaptor<String> zone;
   @Captor private ArgumentCaptor<String> cluster;

+  private int batchElementCount = 2;
+
   @Before
   public void setUp() throws Exception {
     // Add an interceptor to add server-timing in headers
@@ -150,6 +158,22 @@ public void sendHeaders(Metadata headers) {
         .mutateRowSettings()
         .retrySettings()
         .setInitialRetryDelay(Duration.ofMillis(200));
+
+    stubSettingsBuilder
+        .bulkMutateRowsSettings()
+        .setBatchingSettings(
+            // Each batch holds 2 mutations, the batcher allows 1 in-flight request, and auto
+            // flush is disabled by setting the delay threshold to 1 hour.
+            BatchingSettings.newBuilder()
+                .setElementCountThreshold((long) batchElementCount)
+                .setRequestByteThreshold(1000L)
+                .setDelayThreshold(Duration.ofHours(1))
+                .setFlowControlSettings(
+                    FlowControlSettings.newBuilder()
+                        .setMaxOutstandingElementCount((long) batchElementCount)
+                        .setMaxOutstandingRequestBytes(1000L)
+                        .build())
+                .build());

     stubSettingsBuilder.setTracerFactory(mockFactory);

     EnhancedBigtableStubSettings stubSettings = stubSettingsBuilder.build();
@@ -163,7 +187,7 @@ public void tearDown() {
   }

   @Test
-  public void testOperationLatencies() {
+  public void testReadRowsOperationLatencies() {
     when(mockFactory.newTracer(any(), any(), any()))
         .thenAnswer(
             (Answer)
@@ -179,8 +203,15 @@
     long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);

     verify(statsRecorderWrapper).putOperationLatencies(operationLatency.capture());
+    // Verify that recordOperation() is only called once
+    verify(statsRecorderWrapper)
+        .recordOperation(status.capture(), tableId.capture(), zone.capture(), cluster.capture());

     assertThat(operationLatency.getValue()).isIn(Range.closed(SERVER_LATENCY, elapsed));
+    assertThat(status.getAllValues()).containsExactly("OK");
+    assertThat(tableId.getAllValues()).containsExactly(TABLE_ID);
+    assertThat(zone.getAllValues()).containsExactly(ZONE);
+    assertThat(cluster.getAllValues()).containsExactly(CLUSTER);
   }

   @Test
@@ -198,6 +229,10 @@ public void testGfeMetrics() {

     Lists.newArrayList(stub.readRowsCallable().call(Query.create(TABLE_ID)));

+    // Verify that recordAttempt() is called once per attempt
+    verify(statsRecorderWrapper, times(fakeService.getAttemptCounter().get()))
+        .recordAttempt(status.capture(), tableId.capture(), zone.capture(), cluster.capture());
+
     // The request was retried and gfe latency is only recorded in the retry attempt
     verify(statsRecorderWrapper).putGfeLatencies(gfeLatency.capture());
     assertThat(gfeLatency.getValue()).isEqualTo(FAKE_SERVER_TIMING);
@@ -206,6 +241,11 @@
     verify(statsRecorderWrapper, times(fakeService.getAttemptCounter().get()))
         .putGfeMissingHeaders(gfeMissingHeaders.capture());
     assertThat(gfeMissingHeaders.getValue()).isEqualTo(1);
+
+    assertThat(status.getAllValues()).containsExactly("UNAVAILABLE", "OK");
+    assertThat(tableId.getAllValues()).containsExactly(TABLE_ID, TABLE_ID);
+    assertThat(zone.getAllValues()).containsExactly("global", ZONE);
+    assertThat(cluster.getAllValues()).containsExactly("unspecified", CLUSTER);
   }

   @Test
@@ -255,6 +295,8 @@ public void onComplete() {

     verify(statsRecorderWrapper).putApplicationLatencies(applicationLatency.capture());
     verify(statsRecorderWrapper).putOperationLatencies(operationLatency.capture());
+    verify(statsRecorderWrapper)
+        .recordOperation(status.capture(), tableId.capture(), zone.capture(), cluster.capture());

     assertThat(counter.get()).isEqualTo(fakeService.getResponseCounter().get());
     assertThat(applicationLatency.getValue()).isAtLeast(APPLICATION_LATENCY * counter.get());
@@ -287,6 +329,8 @@ public void testReadRowsApplicationLatencyWithManualFlowControl() throws Exception

     verify(statsRecorderWrapper).putApplicationLatencies(applicationLatency.capture());
     verify(statsRecorderWrapper).putOperationLatencies(operationLatency.capture());
+    verify(statsRecorderWrapper)
+        .recordOperation(status.capture(), tableId.capture(), zone.capture(), cluster.capture());

     // For manual flow control, the last application latency shouldn't count, because at that point
     // the server already sent back all the responses.
@@ -324,7 +368,7 @@ public void testRetryCount() {
   }

   @Test
-  public void testMutateRowAttempts() {
+  public void testMutateRowAttemptsTagValues() {
     when(mockFactory.newTracer(any(), any(), any()))
         .thenReturn(
             new BuiltinMetricsTracer(
@@ -343,6 +387,55 @@
     assertThat(zone.getAllValues()).containsExactly("global", "global", ZONE);
     assertThat(cluster.getAllValues()).containsExactly("unspecified", "unspecified", CLUSTER);
     assertThat(status.getAllValues()).containsExactly("UNAVAILABLE", "UNAVAILABLE", "OK");
+    assertThat(tableId.getAllValues()).containsExactly(TABLE_ID, TABLE_ID, TABLE_ID);
+  }
+
+  @Test
+  public void testReadRowsAttemptsTagValues() {
+    when(mockFactory.newTracer(any(), any(), any()))
+        .thenReturn(
+            new BuiltinMetricsTracer(
+                OperationType.ServerStreaming,
+                SpanName.of("Bigtable", "ReadRows"),
+                statsRecorderWrapper));
+
+    Lists.newArrayList(stub.readRowsCallable().call(Query.create("fake-table")).iterator());
+
+    // Set a timeout to reduce flakiness of this test. BasicRetryingFuture will mark the attempt
+    // succeeded and set the response, which calls complete() in AbstractFuture, which calls
+    // releaseWaiters(). onOperationComplete() is called in TracerFinisher, which runs after the
+    // readRows call has returned. So there's a race between when the call returns and when
+    // record() is called in onOperationCompletion().
+    verify(statsRecorderWrapper, timeout(50).times(fakeService.getAttemptCounter().get()))
+        .recordAttempt(status.capture(), tableId.capture(), zone.capture(), cluster.capture());
+    assertThat(zone.getAllValues()).containsExactly("global", ZONE);
+    assertThat(cluster.getAllValues()).containsExactly("unspecified", CLUSTER);
+    assertThat(status.getAllValues()).containsExactly("UNAVAILABLE", "OK");
+  }
+
+  @Test
+  public void testClientBlockingLatencies() throws InterruptedException {
+    when(mockFactory.newTracer(any(), any(), any()))
+        .thenReturn(
+            new BuiltinMetricsTracer(
+                OperationType.Unary, SpanName.of("Bigtable", "MutateRows"), statsRecorderWrapper));
+    try (Batcher<RowMutationEntry, Void> batcher = stub.newMutateRowsBatcher(TABLE_ID, null)) {
+      for (int i = 0; i < 6; i++) {
+        batcher.add(RowMutationEntry.create("key").setCell("f", "q", "v"));
+      }
+
+      int expectedNumRequests = 6 / batchElementCount;
+      ArgumentCaptor<Long> throttledTime = ArgumentCaptor.forClass(Long.class);
+      verify(statsRecorderWrapper, times(expectedNumRequests))
+          .putBatchRequestThrottled(throttledTime.capture());
+
+      // Adding the first 2 elements should not get throttled since the batcher is empty
+      assertThat(throttledTime.getAllValues().get(0)).isEqualTo(0);
+      // After the first request is sent, the batcher will block on add() because of the server
+      // latency, so the blocking latency should be around the server latency.
+      assertThat(throttledTime.getAllValues().get(1)).isAtLeast(SERVER_LATENCY - 10);
+      assertThat(throttledTime.getAllValues().get(2)).isAtLeast(SERVER_LATENCY - 10);
+    }
   }

   private static class FakeService extends BigtableGrpc.BigtableImplBase {
@@ -413,6 +506,17 @@ public void mutateRow(
       responseObserver.onCompleted();
     }

+    @Override
+    public void mutateRows(
+        MutateRowsRequest request, StreamObserver<MutateRowsResponse> responseObserver) {
+      try {
+        Thread.sleep(SERVER_LATENCY);
+      } catch (InterruptedException e) {
+      }
+      responseObserver.onNext(MutateRowsResponse.getDefaultInstance());
+      responseObserver.onCompleted();
+    }
+
     public AtomicInteger getAttemptCounter() {
       return attemptCounter;
     }