Skip to content

Commit

Permalink
tests: Refactor how linearizability test components are run in parallel
Browse files Browse the repository at this point in the history
Signed-off-by: Marek Siarkowicz <[email protected]>
  • Loading branch information
serathius committed Jan 9, 2023
1 parent b2e1c91 commit 221d2bb
Showing 1 changed file with 25 additions and 23 deletions.
48 changes: 25 additions & 23 deletions tests/linearizability/linearizability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,51 +82,53 @@ func TestLinearizability(t *testing.T) {
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
failpoint := FailpointConfig{
ctx := context.Background()
clus, err := e2e.NewEtcdProcessCluster(ctx, t, e2e.WithConfig(&tc.config))
if err != nil {
t.Fatal(err)
}
defer clus.Close()
operations, events := testLinearizability(ctx, t, clus, FailpointConfig{
failpoint: tc.failpoint,
count: 1,
retries: 3,
waitBetweenTriggers: waitBetweenFailpointTriggers,
}
traffic := trafficConfig{
}, trafficConfig{
minimalQPS: minimalQPS,
maximalQPS: maximalQPS,
clientCount: 8,
traffic: DefaultTraffic,
}
testLinearizability(context.Background(), t, tc.config, failpoint, traffic)
})
validateEventsMatch(t, events)
checkOperationsAndPersistResults(t, operations, clus)
})
}
}

func testLinearizability(ctx context.Context, t *testing.T, config e2e.EtcdProcessClusterConfig, failpoint FailpointConfig, traffic trafficConfig) {
clus, err := e2e.NewEtcdProcessCluster(ctx, t, e2e.WithConfig(&config))
if err != nil {
t.Fatal(err)
}
defer clus.Close()
func testLinearizability(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, failpoint FailpointConfig, traffic trafficConfig) (operations []porcupine.Operation, events [][]watchEvent) {
// Run multiple test components (traffic, failpoints, etc) in parallel and use canceling context to propagate stop signal.
wg := sync.WaitGroup{}
wg.Add(3)
trafficCtx, trafficCancel := context.WithCancel(ctx)
go func() {
defer trafficCancel()
defer wg.Done()
triggerFailpoints(ctx, t, clus, failpoint)
// Wait second to collect traffic after triggering last failpoint.
time.Sleep(time.Second)
trafficCancel()
}()
watchCtx, watchCancel := context.WithCancel(ctx)
var operations []porcupine.Operation
go func() {
defer watchCancel()
defer wg.Done()
operations = simulateTraffic(trafficCtx, t, clus, traffic)
// Wait second to collect watch events after all traffic was sent.
time.Sleep(time.Second)
watchCancel()
}()
events := watchClusterChanges(watchCtx, t, clus)
err = clus.Stop()
if err != nil {
t.Error(err)
}
validateEventsMatch(t, events)
checkOperationsAndPersistResults(t, operations, clus)
go func() {
defer wg.Done()
events = watchClusterChanges(watchCtx, t, clus)
}()
wg.Wait()
return operations, events
}

func triggerFailpoints(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, config FailpointConfig) {
Expand Down

0 comments on commit 221d2bb

Please sign in to comment.