diff --git a/tests/linearizability/linearizability_test.go b/tests/linearizability/linearizability_test.go index 50df71725e8f..3b3c247dd3ee 100644 --- a/tests/linearizability/linearizability_test.go +++ b/tests/linearizability/linearizability_test.go @@ -82,51 +82,53 @@ func TestLinearizability(t *testing.T) { } for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { - failpoint := FailpointConfig{ + ctx := context.Background() + clus, err := e2e.NewEtcdProcessCluster(ctx, t, e2e.WithConfig(&tc.config)) + if err != nil { + t.Fatal(err) + } + defer clus.Close() + operations, events := testLinearizability(ctx, t, clus, FailpointConfig{ failpoint: tc.failpoint, count: 1, retries: 3, waitBetweenTriggers: waitBetweenFailpointTriggers, - } - traffic := trafficConfig{ + }, trafficConfig{ minimalQPS: minimalQPS, maximalQPS: maximalQPS, clientCount: 8, traffic: DefaultTraffic, - } - testLinearizability(context.Background(), t, tc.config, failpoint, traffic) + }) + validateEventsMatch(t, events) + checkOperationsAndPersistResults(t, operations, clus) }) } } -func testLinearizability(ctx context.Context, t *testing.T, config e2e.EtcdProcessClusterConfig, failpoint FailpointConfig, traffic trafficConfig) { - clus, err := e2e.NewEtcdProcessCluster(ctx, t, e2e.WithConfig(&config)) - if err != nil { - t.Fatal(err) - } - defer clus.Close() +func testLinearizability(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, failpoint FailpointConfig, traffic trafficConfig) (operations []porcupine.Operation, events [][]watchEvent) { + // Run multiple test components (traffic, failpoints, etc) in parallel and use canceling context to propagate stop signal. + wg := sync.WaitGroup{} + wg.Add(3) trafficCtx, trafficCancel := context.WithCancel(ctx) go func() { - defer trafficCancel() + defer wg.Done() triggerFailpoints(ctx, t, clus, failpoint) - // Wait second to collect traffic after triggering last failpoint. time.Sleep(time.Second) + trafficCancel() }() watchCtx, watchCancel := context.WithCancel(ctx) - var operations []porcupine.Operation go func() { - defer watchCancel() + defer wg.Done() operations = simulateTraffic(trafficCtx, t, clus, traffic) - // Wait second to collect watch events after all traffic was sent. time.Sleep(time.Second) + watchCancel() }() - events := watchClusterChanges(watchCtx, t, clus) - err = clus.Stop() - if err != nil { - t.Error(err) - } - validateEventsMatch(t, events) - checkOperationsAndPersistResults(t, operations, clus) + go func() { + defer wg.Done() + events = watchClusterChanges(watchCtx, t, clus) + }() + wg.Wait() + return operations, events } func triggerFailpoints(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, config FailpointConfig) {