tests/robustness: Validate all etcd watches opened to etcd #15893

Merged · 1 commit · May 16, 2023
32 changes: 16 additions & 16 deletions tests/robustness/README.md
@@ -1,16 +1,16 @@
# etcd Robustness Testing

The purpose of etcd robustness tests is to validate that etcd upholds
[API guarantees] and [watch guarantees] under any condition or failure.
[KV API guarantees] and [watch API guarantees] under any condition or failure.

Robustness tests achieve that by comparing etcd cluster behavior against a simplified model.
Multiple tests encompass different etcd cluster setups, client traffic types, and failures experienced by the cluster.
During a single test, we create a cluster and inject failures while sending and recording client traffic.
Correctness is validated by running the collected history of client operations against the etcd model and a set of validators (see the sketch below).
Upon failure, tests generate a report that can be used to attribute whether the failure was caused by a bug in etcd or in the test framework.

[API guarantees]: https://etcd.io/docs/latest/learning/api_guarantees/
[watch guarantees]: https://etcd.io/docs/latest/learning/api/#watch-streams
[KV API guarantees]: https://etcd.io/docs/v3.6/learning/api_guarantees/#kv-apis
[watch API guarantees]: https://etcd.io/docs/v3.6/learning/api_guarantees/#watch-apis
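
The linearizability side of that validation is built on porcupine (`github.com/anishathalye/porcupine`), which the test code below imports. As a rough illustration of the mechanism, here is a minimal, self-contained sketch that checks a tiny hand-written history against a single-key register model; the model and history are invented for this example and are far simpler than the tests' actual etcd model.

```go
// Illustrative only: a toy single-key register model checked with
// porcupine, the linearizability checker the robustness tests import.
package main

import (
	"fmt"

	"github.com/anishathalye/porcupine"
)

type kvInput struct {
	op    string // "put" or "get"
	value int
}

func main() {
	register := porcupine.Model{
		// State is simply the last written value.
		Init: func() interface{} { return 0 },
		Step: func(state, input, output interface{}) (bool, interface{}) {
			in := input.(kvInput)
			if in.op == "put" {
				return true, in.value
			}
			// A get linearizes only if it observed the current state.
			return output.(int) == state.(int), state
		},
	}
	history := []porcupine.Operation{
		// Client 0 writes 1 during [1, 3]; client 1 reads 1 during [2, 4].
		{ClientId: 0, Input: kvInput{op: "put", value: 1}, Call: 1, Output: 0, Return: 3},
		{ClientId: 1, Input: kvInput{op: "get"}, Call: 2, Output: 1, Return: 4},
	}
	fmt.Println("linearizable:", porcupine.CheckOperations(register, history))
}
```

The real tests validate far richer histories (ranges, transactions, leases) against a nondeterministic etcd model, but the check reduces to the same question: does some sequential ordering of the recorded operations satisfy the model?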

## Running locally

@@ -41,15 +41,15 @@ is included in test logs. One of the log lines should look like:
```
logger.go:130: 2023-03-18T12:18:03.244+0100 INFO Saving member data dir {"member": "TestRobustnessIssue14370-test-0", "path": "/tmp/TestRobustness_Issue14370/TestRobustnessIssue14370-test-0"}
logger.go:130: 2023-03-18T12:18:03.244+0100 INFO Saving watch responses {"path": "/tmp/TestRobustness_Issue14370/TestRobustnessIssue14370-test-0/responses.json"}
logger.go:130: 2023-03-18T12:18:03.247+0100 INFO Saving watch events {"path": "/tmp/TestRobustness_Issue14370/TestRobustnessIssue14370-test-0/events.json"}
logger.go:130: 2023-03-18T12:18:03.248+0100 INFO Saving operation history {"path": "/tmp/TestRobustness_Issue14370/full-history.json"}
logger.go:130: 2023-03-18T12:18:03.252+0100 INFO Saving operation history {"path": "/tmp/TestRobustness_Issue14370/patched-history.json"}
logger.go:130: 2023-05-15T17:42:37.792+0200 INFO Saving operation history {"path": "/tmp/TestRobustness_ClusterOfSize3_Kubernetes/client-1/operations.json"}
logger.go:130: 2023-05-15T17:42:37.793+0200 INFO Saving watch responses {"path": "/tmp/TestRobustness_ClusterOfSize3_Kubernetes/client-2/watch.json"}
Comment on lines +44 to +45

Member: Is the change accidentally updated? :/

Member Author: No, it's intentional as file paths change.

logger.go:130: 2023-03-18T12:18:03.256+0100 INFO Saving visualization {"path": "/tmp/TestRobustness_Issue14370/history.html"}
```

The report includes multiple types of files:
* Member db files, which can be used to verify disk/memory corruption.
* Watch responses saved as JSON, which can be used to validate [watch guarantees].
* Operation history saved as both an HTML visualization and JSON, which can be used to validate [API guarantees].
* Watch responses saved as JSON, which can be used to validate [watch API guarantees].
* Operation history saved as both an HTML visualization and JSON, which can be used to validate [KV API guarantees].
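
For orientation, a saved report roughly takes the following shape; the member and client names are illustrative, based on the log lines and code in this PR:

```
/tmp/TestRobustness_ClusterOfSize3_Kubernetes/
├── server-TestRobustness...-test-0/   # member data dir, one per etcd member
├── client-1/
│   └── operations.json                # KV operation history for this client
├── client-2/
│   └── watch.json                     # watch responses for this client
└── history.html                       # visualization of the operation history
```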

### Example analysis of linearization issue

@@ -78,22 +78,22 @@ To reproduce the issue yourself, run `make test-robustness-issue15271`.
After a couple of tries, robustness tests should fail with the log `Broke watch guarantee` and save a report locally.

Watch issues are easiest to analyse by reading the recorded watch history.
Watch history is recorded for each member, separated into different subdirectories under `/tmp/TestRobustness_Issue15271/`.
Open `responses.json` for the member mentioned in the `Broke watch guarantee` log.
For example, for member `TestRobustnessIssue15271-test-1`, open `/tmp/TestRobustness_Issue15271/TestRobustnessIssue15271-test-1/responses.json`.
Watch history is recorded for each client, separated into different subdirectories under `/tmp/TestRobustness_Issue15271/`.
Open `watch.json` for the client mentioned in the `Broke watch guarantee` log.
For example, for client `14`, open `/tmp/TestRobustness_Issue15271/client-14/watch.json`.

Each line consists of a JSON blob corresponding to a single watch response observed by the client.
Look for lines with a `mod_revision` equal to the revision mentioned in the first `Broke watch guarantee` log.
You should see two lines where the `mod_revision` decreases, like the ones below:
```
{"Header":{"cluster_id":12951239930360520062,"member_id":16914881897345358027,"revision":2574,"raft_term":2},"Events":[{"kv":{"key":"Ng==","create_revision":2303,"mod_revision":2574,"version":46,"value":"Mjg5OA=="}}],"CompactRevision":0,"Canceled":false,"Created":false}
{"Header":{"cluster_id":12951239930360520062,"member_id":16914881897345358027,"revision":7708,"raft_term":2},"Events":[{"kv":{"key":"NQ==","create_revision":5,"mod_revision":91,"version":10,"value":"MTAy"}}, ... }
{"Events":[{"Op":{"Type":"put","Key":"5","WithPrefix":false,"Limit":0,"Value":{"Value":"357","Hash":0},"LeaseID":0},"Revision":335}],"IsProgressNotify":false,"Revision":335,"Time":1050415777}
{"Events":[{"Op":{"Type":"put","Key":"1","WithPrefix":false,"Limit":0,"Value":{"Value":"24","Hash":0},"LeaseID":0},"Revision":24}, ...
```

Up to the first line, the `mod_revision` of events within responses only increased, up to a value of `2574`.
However, the following line includes an event with `mod_revision` equal to `91`.
If you follow the `mod_revision` throughout the file, you should notice that the watch replayed revisions a second time.
This is incorrect and breaks the `Ordered` and `Unique` [watch guarantees].
Up to the first line, the `revision` of events within responses only increased, up to a value of `335`.
However, the following line includes an event with `revision` equal to `24`.
If you follow the `revision` throughout the file, you should notice that the watch replayed revisions a second time.
This is incorrect and breaks the `Ordered` and `Unique` [watch API guarantees].
This is consistent with the root cause of [#14370], where a member reconnecting to the cluster will incorrectly resend revisions.

[#15271]: https://github.com/etcd-io/etcd/issues/15271
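
Because each line of `watch.json` is a standalone JSON blob, the `Ordered`/`Unique` check described above can be scripted. Below is a hedged sketch that scans such a file and flags any event whose revision fails to increase; the struct shape is inferred only from the example lines above and may not match the robustness framework's real types.

```go
// Hedged sketch: scan a JSON-lines watch dump for replayed revisions.
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"log"
	"os"
)

type watchResponse struct {
	Revision int64 // revision in the response header (assumed field)
	Events   []struct {
		Revision int64 // revision the event was committed at (assumed field)
	}
}

func main() {
	f, err := os.Open("watch.json")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	scanner := bufio.NewScanner(f)
	scanner.Buffer(make([]byte, 1<<20), 1<<24) // responses can be very long lines
	var last int64
	for line := 1; scanner.Scan(); line++ {
		var resp watchResponse
		if err := json.Unmarshal(scanner.Bytes(), &resp); err != nil {
			log.Fatalf("line %d: %v", line, err)
		}
		for _, ev := range resp.Events {
			// A decrease breaks Ordered; a repeat breaks Unique
			// (txn events sharing a revision would need special-casing).
			if ev.Revision <= last {
				fmt.Printf("line %d: event revision %d after %d\n", line, ev.Revision, last)
			}
			last = ev.Revision
		}
	}
	if err := scanner.Err(); err != nil {
		log.Fatal(err)
	}
}
```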
7 changes: 7 additions & 0 deletions tests/robustness/identity/id.go
@@ -21,6 +21,8 @@ type Provider interface {
NewStreamId() int
// NewRequestId returns unique identification used to make write requests unique.
NewRequestId() int
// NewClientId returns unique identification for client and their reports.
NewClientId() int
}

func NewIdProvider() Provider {
@@ -30,6 +32,7 @@ func NewIdProvider() Provider {
type atomicProvider struct {
streamId atomic.Int64
requestId atomic.Int64
clientId atomic.Int64
}

func (id *atomicProvider) NewStreamId() int {
@@ -39,3 +42,7 @@ func (id *atomicProvider) NewStreamId() int {
func (id *atomicProvider) NewRequestId() int {
return int(id.requestId.Add(1))
}

func (id *atomicProvider) NewClientId() int {
return int(id.clientId.Add(1))
}
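
A minimal usage sketch of the provider added here; the concurrent callers are hypothetical, while the `identity` package path is taken from the import in `linearizability_test.go` below.

```go
package main

import (
	"fmt"
	"sync"

	"go.etcd.io/etcd/tests/v3/robustness/identity"
)

func main() {
	// The atomic counters make a single provider safe to share between
	// the traffic and watch goroutines.
	ids := identity.NewIdProvider()
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Each call returns a distinct id: 1, 2, 3 in some order.
			fmt.Println("client id:", ids.NewClientId())
		}()
	}
	wg.Wait()
}
```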
40 changes: 22 additions & 18 deletions tests/robustness/linearizability_test.go
@@ -19,13 +19,13 @@ import (
"testing"
"time"

"github.com/anishathalye/porcupine"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"golang.org/x/sync/errgroup"

"go.etcd.io/etcd/api/v3/version"
"go.etcd.io/etcd/tests/v3/framework/e2e"
"go.etcd.io/etcd/tests/v3/robustness/identity"
"go.etcd.io/etcd/tests/v3/robustness/model"
"go.etcd.io/etcd/tests/v3/robustness/traffic"
)
@@ -152,27 +152,25 @@ func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, s testSce
defer func() {
r.Report(t, panicked)
}()
r.operations, r.responses = s.run(ctx, t, lg, r.clus)
r.clientReports = s.run(ctx, t, lg, r.clus)
forcestopCluster(r.clus)

watchProgressNotifyEnabled := r.clus.Cfg.WatchProcessNotifyInterval != 0
validateWatchResponses(t, r.clus, r.responses, s.watch.requestProgress || watchProgressNotifyEnabled)

r.events = watchEvents(r.responses)
validateEventsMatch(t, r.events)

r.patchedOperations = patchOperationBasedOnWatchEvents(r.operations, longestHistory(r.events))
r.visualizeHistory = model.ValidateOperationHistoryAndReturnVisualize(t, lg, r.patchedOperations)
validateGotAtLeastOneProgressNotify(t, r.clientReports, s.watch.requestProgress || watchProgressNotifyEnabled)
r.visualizeHistory = validateCorrectness(t, lg, r.clientReports)

panicked = false
}

func (s testScenario) run(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) (operations []porcupine.Operation, responses [][]traffic.WatchResponse) {
func (s testScenario) run(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) (reports []traffic.ClientReport) {
g := errgroup.Group{}
var operationReport, watchReport []traffic.ClientReport
finishTraffic := make(chan struct{})

// baseTime is used to get monotonic clock reading when recording operations/watch events
// using baseTime time-measuring operation to get monotonic clock reading
// see https://github.com/golang/go/blob/master/src/time/time.go#L17
baseTime := time.Now()
ids := identity.NewIdProvider()
g.Go(func() error {
defer close(finishTraffic)
injectFailpoints(ctx, t, lg, clus, s.failpoint)
@@ -182,22 +180,22 @@ func (s testScenario) run(ctx context.Context, t *testing.T, lg *zap.Logger, clu
maxRevisionChan := make(chan int64, 1)
g.Go(func() error {
defer close(maxRevisionChan)
operations = traffic.SimulateTraffic(ctx, t, lg, clus, s.traffic, finishTraffic, baseTime)
maxRevisionChan <- operationsMaxRevision(operations)
operationReport = traffic.SimulateTraffic(ctx, t, lg, clus, s.traffic, finishTraffic, baseTime, ids)
maxRevisionChan <- operationsMaxRevision(operationReport)
return nil
})
g.Go(func() error {
responses = collectClusterWatchEvents(ctx, t, clus, maxRevisionChan, s.watch, baseTime)
watchReport = collectClusterWatchEvents(ctx, t, clus, maxRevisionChan, s.watch, baseTime, ids)
return nil
})
g.Wait()
return operations, responses
return append(operationReport, watchReport...)
}

func operationsMaxRevision(operations []porcupine.Operation) int64 {
func operationsMaxRevision(reports []traffic.ClientReport) int64 {
var maxRevision int64
for _, op := range operations {
revision := op.Output.(model.EtcdNonDeterministicResponse).Revision
for _, r := range reports {
revision := r.OperationHistory.MaxRevision()
if revision > maxRevision {
maxRevision = revision
}
@@ -212,3 +210,9 @@ func forcestopCluster(clus *e2e.EtcdProcessCluster) error {
}
return clus.ConcurrentStop()
}

func validateCorrectness(t *testing.T, lg *zap.Logger, reports []traffic.ClientReport) (visualize func(basepath string)) {
validateWatchCorrectness(t, reports)
operations := operationsFromClientReports(reports)
return model.ValidateOperationHistoryAndReturnVisualize(t, lg, operations)
}
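
`operationsFromClientReports` is referenced here but is not part of this diff. One plausible shape, assuming it only flattens each client's `OperationHistory` via the `Operations()` accessor shown below (the real helper may do more, e.g. renumber porcupine client ids), would be:

```go
// Hypothetical reconstruction for illustration, not the PR's actual code:
// flatten every client's recorded operations into a single porcupine
// history for model checking. Assumes the traffic and porcupine imports
// already present in this file.
func operationsFromClientReports(reports []traffic.ClientReport) []porcupine.Operation {
	operations := []porcupine.Operation{}
	for _, r := range reports {
		operations = append(operations, r.OperationHistory.Operations()...)
	}
	return operations
}
```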
15 changes: 15 additions & 0 deletions tests/robustness/model/history.go
@@ -508,6 +508,10 @@ func (h History) Merge(h2 History) History {
return result
}

func (h History) Len() int {
return len(h.successful) + len(h.failed)
}

func (h History) Operations() []porcupine.Operation {
operations := make([]porcupine.Operation, 0, len(h.successful)+len(h.failed))
var maxTime int64
@@ -530,3 +534,14 @@ func (h History) Operations() []porcupine.Operation {
}
return operations
}

func (h History) MaxRevision() int64 {
var maxRevision int64
for _, op := range h.successful {
Member: Do we need to handle a situation where len(h.successful) == 0?

Member Author (@serathius, May 16, 2023): We can maintain special meaning of revision=0 as it's in etcd. Having 0 is still correct as not all clients make KV operations.

revision := op.Output.(EtcdNonDeterministicResponse).Revision
if revision > maxRevision {
maxRevision = revision
}
}
return maxRevision
}
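
As the review thread above notes, an empty history deliberately yields `0`, matching etcd's convention that revision `0` means "no revision"; clients that only open watches never record KV operations. A tiny hedged test sketch of that contract (the zero-value `History` construction is an assumption; this would live alongside the `model` package's tests):

```go
func TestMaxRevisionEmptyHistory(t *testing.T) {
	// A watch-only client records no KV operations, so its history is
	// empty and MaxRevision reports 0.
	var h History
	if got := h.MaxRevision(); got != 0 {
		t.Errorf("MaxRevision() = %d, want 0", got)
	}
}
```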
48 changes: 27 additions & 21 deletions tests/robustness/report.go
@@ -16,8 +16,10 @@ package robustness

import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"sort"
"strings"
"testing"

@@ -29,13 +31,10 @@
)

type report struct {
lg *zap.Logger
clus *e2e.EtcdProcessCluster
responses [][]traffic.WatchResponse
events [][]watchEvent
operations []porcupine.Operation
patchedOperations []porcupine.Operation
visualizeHistory func(path string)
lg *zap.Logger
clus *e2e.EtcdProcessCluster
clientReports []traffic.ClientReport
visualizeHistory func(path string)
}

func testResultsDirectory(t *testing.T) string {
@@ -65,21 +64,28 @@ func testResultsDirectory(t *testing.T) string {
func (r *report) Report(t *testing.T, force bool) {
path := testResultsDirectory(t)
if t.Failed() || force {
for i, member := range r.clus.Procs {
memberDataDir := filepath.Join(path, member.Config().Name)
for _, member := range r.clus.Procs {
memberDataDir := filepath.Join(path, fmt.Sprintf("server-%s", member.Config().Name))
persistMemberDataDir(t, r.lg, member, memberDataDir)
if r.responses != nil {
persistWatchResponses(t, r.lg, filepath.Join(memberDataDir, "responses.json"), r.responses[i])
}
if r.events != nil {
persistWatchEvents(t, r.lg, filepath.Join(memberDataDir, "events.json"), r.events[i])
}
}
if r.operations != nil {
persistOperationHistory(t, r.lg, filepath.Join(path, "full-history.json"), r.operations)
}
if r.patchedOperations != nil {
persistOperationHistory(t, r.lg, filepath.Join(path, "patched-history.json"), r.patchedOperations)
if r.clientReports != nil {
sort.Slice(r.clientReports, func(i, j int) bool {
return r.clientReports[i].ClientId < r.clientReports[j].ClientId
})
for _, report := range r.clientReports {
clientDir := filepath.Join(path, fmt.Sprintf("client-%d", report.ClientId))
err := os.MkdirAll(clientDir, 0700)
if err != nil {
t.Fatal(err)
}
if len(report.Watch) != 0 {
persistWatchResponses(t, r.lg, filepath.Join(clientDir, "watch.json"), report.Watch)
}
operations := report.OperationHistory.Operations()
if len(operations) != 0 {
persistOperationHistory(t, r.lg, filepath.Join(clientDir, "operations.json"), operations)
}
}
}
}
if r.visualizeHistory != nil {
@@ -112,7 +118,7 @@ func persistWatchResponses(t *testing.T, lg *zap.Logger, path string, responses
}
}

func persistWatchEvents(t *testing.T, lg *zap.Logger, path string, events []watchEvent) {
func persistWatchEvents(t *testing.T, lg *zap.Logger, path string, events []traffic.TimedWatchEvent) {
lg.Info("Saving watch events", zap.String("path", path))
file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0755)
if err != nil {