From c075d2011c8b55585404b52c1d358e0cbb39c355 Mon Sep 17 00:00:00 2001 From: apolcyn Date: Fri, 24 Jun 2022 11:34:16 -0700 Subject: [PATCH] interop client: provide new flag, --soak_min_time_ms_between_rpcs (#5421) --- interop/client/client.go | 5 +++-- interop/test_utils.go | 4 +++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/interop/client/client.go b/interop/client/client.go index 3cfedfcb6542..1e3a46a75745 100644 --- a/interop/client/client.go +++ b/interop/client/client.go @@ -68,6 +68,7 @@ var ( soakMaxFailures = flag.Int("soak_max_failures", 0, "The number of iterations in soak tests that are allowed to fail (either due to non-OK status code or exceeding the per-iteration max acceptable latency).") soakPerIterationMaxAcceptableLatencyMs = flag.Int("soak_per_iteration_max_acceptable_latency_ms", 1000, "The number of milliseconds a single iteration in the two soak tests (rpc_soak and channel_soak) should take.") soakOverallTimeoutSeconds = flag.Int("soak_overall_timeout_seconds", 10, "The overall number of seconds after which a soak test should stop and fail, if the desired number of iterations have not yet completed.") + soakMinTimeMsBetweenRPCs = flag.Int("soak_min_time_ms_between_rpcs", 0, "The minimum time in milliseconds between consecutive RPCs in a soak test (rpc_soak or channel_soak), useful for limiting QPS") tlsServerName = flag.String("server_host_override", "", "The server name used to verify the hostname returned by TLS handshake if it is not empty. Otherwise, --server_host is used.") testCase = flag.String("test_case", "large_unary", `Configure different test cases. Valid options are: @@ -301,10 +302,10 @@ func main() { interop.DoPickFirstUnary(tc) logger.Infoln("PickFirstUnary done") case "rpc_soak": - interop.DoSoakTest(tc, serverAddr, opts, false /* resetChannel */, *soakIterations, *soakMaxFailures, time.Duration(*soakPerIterationMaxAcceptableLatencyMs)*time.Millisecond, time.Now().Add(time.Duration(*soakOverallTimeoutSeconds)*time.Second)) + interop.DoSoakTest(tc, serverAddr, opts, false /* resetChannel */, *soakIterations, *soakMaxFailures, time.Duration(*soakPerIterationMaxAcceptableLatencyMs)*time.Millisecond, time.Duration(*soakMinTimeMsBetweenRPCs)*time.Millisecond, time.Now().Add(time.Duration(*soakOverallTimeoutSeconds)*time.Second)) logger.Infoln("RpcSoak done") case "channel_soak": - interop.DoSoakTest(tc, serverAddr, opts, true /* resetChannel */, *soakIterations, *soakMaxFailures, time.Duration(*soakPerIterationMaxAcceptableLatencyMs)*time.Millisecond, time.Now().Add(time.Duration(*soakOverallTimeoutSeconds)*time.Second)) + interop.DoSoakTest(tc, serverAddr, opts, true /* resetChannel */, *soakIterations, *soakMaxFailures, time.Duration(*soakPerIterationMaxAcceptableLatencyMs)*time.Millisecond, time.Duration(*soakMinTimeMsBetweenRPCs)*time.Millisecond, time.Now().Add(time.Duration(*soakOverallTimeoutSeconds)*time.Second)) logger.Infoln("ChannelSoak done") default: logger.Fatal("Unsupported test case: ", *testCase) diff --git a/interop/test_utils.go b/interop/test_utils.go index 975a290bc6e6..dc841a110604 100644 --- a/interop/test_utils.go +++ b/interop/test_utils.go @@ -716,7 +716,7 @@ func doOneSoakIteration(ctx context.Context, tc testgrpc.TestServiceClient, rese // DoSoakTest runs large unary RPCs in a loop for a configurable number of times, with configurable failure thresholds. // If resetChannel is false, then each RPC will be performed on tc. Otherwise, each RPC will be performed on a new // stub that is created with the provided server address and dial options. -func DoSoakTest(tc testgrpc.TestServiceClient, serverAddr string, dopts []grpc.DialOption, resetChannel bool, soakIterations int, maxFailures int, perIterationMaxAcceptableLatency time.Duration, overallDeadline time.Time) { +func DoSoakTest(tc testgrpc.TestServiceClient, serverAddr string, dopts []grpc.DialOption, resetChannel bool, soakIterations int, maxFailures int, perIterationMaxAcceptableLatency time.Duration, minTimeBetweenRPCs time.Duration, overallDeadline time.Time) { start := time.Now() ctx, cancel := context.WithDeadline(context.Background(), overallDeadline) defer cancel() @@ -733,6 +733,7 @@ func DoSoakTest(tc testgrpc.TestServiceClient, serverAddr string, dopts []grpc.D if time.Now().After(overallDeadline) { break } + earliestNextStart := time.After(minTimeBetweenRPCs) iterationsDone++ var p peer.Peer latency, err := doOneSoakIteration(ctx, tc, resetChannel, serverAddr, dopts, []grpc.CallOption{grpc.Peer(&p)}) @@ -749,6 +750,7 @@ func DoSoakTest(tc testgrpc.TestServiceClient, serverAddr string, dopts []grpc.D continue } fmt.Fprintf(os.Stderr, "soak iteration: %d elapsed_ms: %d peer: %s succeeded\n", i, latencyMs, p.Addr.String()) + <-earliestNextStart } var b bytes.Buffer h.Print(&b)