diff --git a/tests/linearizability/client.go b/tests/linearizability/client.go index ca2da56df8f..4a4e0c675fd 100644 --- a/tests/linearizability/client.go +++ b/tests/linearizability/client.go @@ -18,19 +18,16 @@ import ( "context" "time" - "github.com/anishathalye/porcupine" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" ) type recordingClient struct { - client clientv3.Client - id int - - operations []porcupine.Operation + client clientv3.Client + history *appendableHistory } -func NewClient(endpoints []string, id int) (*recordingClient, error) { +func NewClient(endpoints []string, ids idProvider) (*recordingClient, error) { cc, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, Logger: zap.NewNop(), @@ -41,9 +38,8 @@ func NewClient(endpoints []string, id int) (*recordingClient, error) { return nil, err } return &recordingClient{ - client: *cc, - id: id, - operations: []porcupine.Operation{}, + client: *cc, + history: newAppendableHistory(ids), }, nil } @@ -58,17 +54,7 @@ func (c *recordingClient) Get(ctx context.Context, key string) error { if err != nil { return err } - var readData string - if len(resp.Kvs) == 1 { - readData = string(resp.Kvs[0].Value) - } - c.operations = append(c.operations, porcupine.Operation{ - ClientId: c.id, - Input: etcdRequest{op: Get, key: key}, - Call: callTime.UnixNano(), - Output: etcdResponse{getData: readData, revision: resp.Header.Revision}, - Return: returnTime.UnixNano(), - }) + c.history.AppendGet(key, callTime, returnTime, resp) return nil } @@ -76,36 +62,14 @@ func (c *recordingClient) Put(ctx context.Context, key, value string) error { callTime := time.Now() resp, err := c.client.Put(ctx, key, value) returnTime := time.Now() - var revision int64 - if resp != nil && resp.Header != nil { - revision = resp.Header.Revision - } - c.operations = append(c.operations, porcupine.Operation{ - ClientId: c.id, - Input: etcdRequest{op: Put, key: key, putData: value}, - Call: callTime.UnixNano(), - Output: etcdResponse{err: err, revision: revision}, - Return: returnTime.UnixNano(), - }) - return nil + c.history.AppendPut(key, value, callTime, returnTime, resp, err) + return err } func (c *recordingClient) Delete(ctx context.Context, key string) error { callTime := time.Now() resp, err := c.client.Delete(ctx, key) returnTime := time.Now() - var revision int64 - var deleted int64 - if resp != nil && resp.Header != nil { - revision = resp.Header.Revision - deleted = resp.Deleted - } - c.operations = append(c.operations, porcupine.Operation{ - ClientId: c.id, - Input: etcdRequest{op: Delete, key: key}, - Call: callTime.UnixNano(), - Output: etcdResponse{revision: revision, deleted: deleted, err: err}, - Return: returnTime.UnixNano(), - }) + c.history.AppendDelete(key, callTime, returnTime, resp, err) return nil } diff --git a/tests/linearizability/history.go b/tests/linearizability/history.go new file mode 100644 index 00000000000..388bccd0066 --- /dev/null +++ b/tests/linearizability/history.go @@ -0,0 +1,148 @@ +// Copyright 2022 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package linearizability + +import ( + "time" + + "github.com/anishathalye/porcupine" + clientv3 "go.etcd.io/etcd/client/v3" +) + +type appendableHistory struct { + // id of the next write operation. If needed a new id might be requested from idProvider. + id int + idProvider idProvider + + history +} + +func newAppendableHistory(ids idProvider) *appendableHistory { + return &appendableHistory{ + id: ids.ClientId(), + idProvider: ids, + history: history{ + successful: []porcupine.Operation{}, + failed: []porcupine.Operation{}, + }, + } +} + +func (h *appendableHistory) AppendGet(key string, start, end time.Time, resp *clientv3.GetResponse) { + var readData string + if len(resp.Kvs) == 1 { + readData = string(resp.Kvs[0].Value) + } + h.successful = append(h.successful, porcupine.Operation{ + ClientId: h.id, + Input: EtcdRequest{Op: Get, Key: key}, + Call: start.UnixNano(), + Output: EtcdResponse{GetData: readData, Revision: resp.Header.Revision}, + Return: end.UnixNano(), + }) +} + +func (h *appendableHistory) AppendPut(key, value string, start, end time.Time, resp *clientv3.PutResponse, err error) { + request := EtcdRequest{Op: Put, Key: key, PutData: value} + if err != nil { + h.appendFailed(request, start, err) + return + } + var revision int64 + if resp != nil && resp.Header != nil { + revision = resp.Header.Revision + } + h.successful = append(h.successful, porcupine.Operation{ + ClientId: h.id, + Input: EtcdRequest{Op: Put, Key: key, PutData: value}, + Call: start.UnixNano(), + Output: EtcdResponse{Err: err, Revision: revision}, + Return: end.UnixNano(), + }) +} + +func (h *appendableHistory) AppendDelete(key string, start, end time.Time, resp *clientv3.DeleteResponse, err error) { + request := EtcdRequest{Op: Delete, Key: key} + if err != nil { + h.appendFailed(request, start, err) + return + } + var revision int64 + var deleted int64 + if resp != nil && resp.Header != nil { + revision = resp.Header.Revision + deleted = resp.Deleted + } + h.successful = append(h.successful, porcupine.Operation{ + ClientId: h.id, + Input: request, + Call: start.UnixNano(), + Output: EtcdResponse{Revision: revision, Deleted: deleted, Err: err}, + Return: end.UnixNano(), + }) +} + +func (h *appendableHistory) appendFailed(request EtcdRequest, start time.Time, err error) { + h.failed = append(h.failed, porcupine.Operation{ + ClientId: h.id, + Input: request, + Call: start.UnixNano(), + Output: EtcdResponse{Err: err}, + Return: 0, // For failed writes we don't know when request has really finished. + }) + // Operations of single client needs to be sequential. + // As we don't know return time of failed operations, all new writes need to be done with new client id. + h.id = h.idProvider.ClientId() +} + +type history struct { + successful []porcupine.Operation + // failed requests are kept separate as we don't know return time of failed operations. + // Based on https://github.com/anishathalye/porcupine/issues/10 + failed []porcupine.Operation +} + +func (h history) Merge(h2 history) history { + result := history{ + successful: make([]porcupine.Operation, 0, len(h.successful)+len(h2.successful)), + failed: make([]porcupine.Operation, 0, len(h.failed)+len(h2.failed)), + } + result.successful = append(result.successful, h.successful...) + result.successful = append(result.successful, h2.successful...) + result.failed = append(result.failed, h.failed...) + result.failed = append(result.failed, h2.failed...) + return result +} + +func (h history) Operations() []porcupine.Operation { + operations := make([]porcupine.Operation, 0, len(h.successful)+len(h.failed)) + var maxTime int64 + for _, op := range h.successful { + operations = append(operations, op) + if op.Return > maxTime { + maxTime = op.Return + } + } + // Failed requests don't have a known return time. + // We simulate Infinity by using return time of latest successfully request. + for _, op := range h.failed { + if op.Call > maxTime { + continue + } + op.Return = maxTime + 1 + operations = append(operations, op) + } + return operations +} diff --git a/tests/linearizability/id.go b/tests/linearizability/id.go new file mode 100644 index 00000000000..4e8fa381765 --- /dev/null +++ b/tests/linearizability/id.go @@ -0,0 +1,40 @@ +// Copyright 2022 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package linearizability + +import "sync/atomic" + +type idProvider interface { + ClientId() int + RequestId() int +} + +func newIdProvider() idProvider { + return &atomicProvider{} +} + +type atomicProvider struct { + clientId atomic.Int64 + requestId atomic.Int64 +} + +func (id *atomicProvider) ClientId() int { + // Substract one as ClientId should start from zero. + return int(id.clientId.Add(1) - 1) +} + +func (id *atomicProvider) RequestId() int { + return int(id.requestId.Add(1)) +} diff --git a/tests/linearizability/linearizability_test.go b/tests/linearizability/linearizability_test.go index 70ac4cb4a7d..78780e700c8 100644 --- a/tests/linearizability/linearizability_test.go +++ b/tests/linearizability/linearizability_test.go @@ -140,10 +140,12 @@ type FailpointConfig struct { waitBetweenTriggers time.Duration } -func simulateTraffic(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, config trafficConfig) (operations []porcupine.Operation) { +func simulateTraffic(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, config trafficConfig) []porcupine.Operation { mux := sync.Mutex{} endpoints := clus.EndpointsV3() + ids := newIdProvider() + h := history{} limiter := rate.NewLimiter(rate.Limit(config.maximalQPS), 200) startTime := time.Now() @@ -151,7 +153,7 @@ func simulateTraffic(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessClu for i := 0; i < config.clientCount; i++ { wg.Add(1) endpoints := []string{endpoints[i%len(endpoints)]} - c, err := NewClient(endpoints, i) + c, err := NewClient(endpoints, ids) if err != nil { t.Fatal(err) } @@ -159,14 +161,15 @@ func simulateTraffic(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessClu defer wg.Done() defer c.Close() - config.traffic.Run(ctx, c, limiter) + config.traffic.Run(ctx, c, limiter, ids) mux.Lock() - operations = append(operations, c.operations...) + h = h.Merge(c.history.history) mux.Unlock() }(c) } wg.Wait() endTime := time.Now() + operations := h.Operations() t.Logf("Recorded %d operations", len(operations)) qps := float64(len(operations)) / float64(endTime.Sub(startTime)) * float64(time.Second) diff --git a/tests/linearizability/model.go b/tests/linearizability/model.go index 6c0617222d8..ec83d59f1e1 100644 --- a/tests/linearizability/model.go +++ b/tests/linearizability/model.go @@ -29,24 +29,24 @@ const ( Delete Operation = "delete" ) -type etcdRequest struct { - op Operation - key string - putData string +type EtcdRequest struct { + Op Operation + Key string + PutData string } -type etcdResponse struct { - getData string - revision int64 - deleted int64 - err error +type EtcdResponse struct { + GetData string + Revision int64 + Deleted int64 + Err error } type EtcdState struct { Key string Value string LastRevision int64 - FailedWrites map[string]struct{} + FailedWrite *EtcdRequest } var etcdModel = porcupine.Model{ @@ -57,7 +57,7 @@ var etcdModel = porcupine.Model{ if err != nil { panic(err) } - ok, state := step(state, in.(etcdRequest), out.(etcdResponse)) + ok, state := step(state, in.(EtcdRequest), out.(EtcdResponse)) data, err := json.Marshal(state) if err != nil { panic(err) @@ -65,26 +65,26 @@ var etcdModel = porcupine.Model{ return ok, string(data) }, DescribeOperation: func(in, out interface{}) string { - request := in.(etcdRequest) - response := out.(etcdResponse) - switch request.op { + request := in.(EtcdRequest) + response := out.(EtcdResponse) + switch request.Op { case Get: - if response.err != nil { - return fmt.Sprintf("get(%q) -> %q", request.key, response.err) + if response.Err != nil { + return fmt.Sprintf("get(%q) -> %q", request.Key, response.Err) } else { - return fmt.Sprintf("get(%q) -> %q, rev: %d", request.key, response.getData, response.revision) + return fmt.Sprintf("get(%q) -> %q, rev: %d", request.Key, response.GetData, response.Revision) } case Put: - if response.err != nil { - return fmt.Sprintf("put(%q, %q) -> %s", request.key, request.putData, response.err) + if response.Err != nil { + return fmt.Sprintf("put(%q, %q) -> %s", request.Key, request.PutData, response.Err) } else { - return fmt.Sprintf("put(%q, %q) -> ok, rev: %d", request.key, request.putData, response.revision) + return fmt.Sprintf("put(%q, %q) -> ok, rev: %d", request.Key, request.PutData, response.Revision) } case Delete: - if response.err != nil { - return fmt.Sprintf("delete(%q) -> %s", request.key, response.err) + if response.Err != nil { + return fmt.Sprintf("delete(%q) -> %s", request.Key, response.Err) } else { - return fmt.Sprintf("delete(%q) -> ok, rev: %d deleted:%d", request.key, response.revision, response.deleted) + return fmt.Sprintf("delete(%q) -> ok, rev: %d deleted:%d", request.Key, response.Revision, response.Deleted) } default: return "" @@ -92,17 +92,17 @@ var etcdModel = porcupine.Model{ }, } -func step(state EtcdState, request etcdRequest, response etcdResponse) (bool, EtcdState) { - if request.key == "" { +func step(state EtcdState, request EtcdRequest, response EtcdResponse) (bool, EtcdState) { + if request.Key == "" { panic("invalid request") } if state.Key == "" { return true, initState(request, response) } - if state.Key != request.key { + if state.Key != request.Key { panic("Multiple keys not supported") } - switch request.op { + switch request.Op { case Get: return stepGet(state, request, response) case Put: @@ -114,24 +114,23 @@ func step(state EtcdState, request etcdRequest, response etcdResponse) (bool, Et } } -func initState(request etcdRequest, response etcdResponse) EtcdState { +func initState(request EtcdRequest, response EtcdResponse) EtcdState { state := EtcdState{ - Key: request.key, - LastRevision: response.revision, - FailedWrites: map[string]struct{}{}, + Key: request.Key, + LastRevision: response.Revision, } - switch request.op { + switch request.Op { case Get: - state.Value = response.getData + state.Value = response.GetData case Put: - if response.err == nil { - state.Value = request.putData + if response.Err == nil { + state.Value = request.PutData } else { - state.FailedWrites[request.putData] = struct{}{} + state.FailedWrite = &request } case Delete: - if response.err != nil { - state.FailedWrites[""] = struct{}{} + if response.Err != nil { + state.FailedWrite = &request } default: panic("Unknown operation") @@ -139,55 +138,76 @@ func initState(request etcdRequest, response etcdResponse) EtcdState { return state } -func stepGet(state EtcdState, request etcdRequest, response etcdResponse) (bool, EtcdState) { - if state.Value == response.getData && state.LastRevision <= response.revision { +func stepGet(state EtcdState, request EtcdRequest, response EtcdResponse) (bool, EtcdState) { + if state.Value == response.GetData && state.LastRevision == response.Revision { + state.FailedWrite = nil return true, state } - _, ok := state.FailedWrites[response.getData] - if ok && state.LastRevision < response.revision { - state.Value = response.getData - state.LastRevision = response.revision - delete(state.FailedWrites, response.getData) - return true, state + if state.FailedWrite != nil && state.LastRevision < response.Revision { + var ok bool + switch state.FailedWrite.Op { + case Get: + panic("Expected write") + case Put: + ok = response.GetData == state.FailedWrite.PutData + case Delete: + ok = response.GetData == "" + default: + panic("Unknown operation") + } + if ok { + state.Value = response.GetData + state.LastRevision = response.Revision + state.FailedWrite = nil + return true, state + } } return false, state } -func stepPut(state EtcdState, request etcdRequest, response etcdResponse) (bool, EtcdState) { - if response.err != nil { - state.FailedWrites[request.putData] = struct{}{} +func stepPut(state EtcdState, request EtcdRequest, response EtcdResponse) (bool, EtcdState) { + if response.Err != nil { + state.FailedWrite = &request return true, state } - if state.LastRevision >= response.revision { + if response.Revision <= state.LastRevision { + return false, state + } + if response.Revision != state.LastRevision+1 && state.FailedWrite == nil { return false, state } - state.Value = request.putData - state.LastRevision = response.revision + state.Value = request.PutData + state.LastRevision = response.Revision + state.FailedWrite = nil return true, state } -func stepDelete(state EtcdState, request etcdRequest, response etcdResponse) (bool, EtcdState) { - if response.err != nil { - state.FailedWrites[""] = struct{}{} +func stepDelete(state EtcdState, request EtcdRequest, response EtcdResponse) (bool, EtcdState) { + if response.Err != nil { + state.FailedWrite = &request return true, state } - deleteSucceeded := response.deleted != 0 + // revision should never decrease + if response.Revision < state.LastRevision { + return false, state + } + deleteSucceeded := response.Deleted != 0 keySet := state.Value != "" - //non-existent key cannot be deleted. - if deleteSucceeded != keySet { + // non-existent key cannot be deleted. + if deleteSucceeded != keySet && state.FailedWrite == nil { return false, state } - //if key was deleted, response revision should go up - if deleteSucceeded && state.LastRevision >= response.revision { + //if key was deleted, response revision should increase + if deleteSucceeded && (response.Revision != state.LastRevision+1 || !keySet) && (state.FailedWrite == nil || response.Revision < state.LastRevision+2) { return false, state } //if key was not deleted, response revision should not change - if !deleteSucceeded && state.LastRevision != response.revision { + if !deleteSucceeded && state.LastRevision != response.Revision && state.FailedWrite == nil { return false, state } state.Value = "" - state.LastRevision = response.revision + state.LastRevision = response.Revision return true, state } diff --git a/tests/linearizability/model_test.go b/tests/linearizability/model_test.go index ca65a146b46..65aca563050 100644 --- a/tests/linearizability/model_test.go +++ b/tests/linearizability/model_test.go @@ -27,91 +27,195 @@ func TestModel(t *testing.T) { { name: "First Get can start from non-empty value and non-zero revision", operations: []testOperation{ - {req: etcdRequest{op: Get, key: "key"}, resp: etcdResponse{getData: "2", revision: 42}}, + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "2", Revision: 42}}, }, }, { name: "First Put can start from non-zero revision", operations: []testOperation{ - {req: etcdRequest{op: Put, key: "key", putData: "2"}, resp: etcdResponse{revision: 42}}, + {req: EtcdRequest{Op: Put, Key: "key", PutData: "2"}, resp: EtcdResponse{Revision: 42}}, }, }, { - name: "Get response data should match PUT", + name: "First delete can start from non-zero revision", operations: []testOperation{ - {req: etcdRequest{op: Put, key: "key", putData: "1"}, resp: etcdResponse{revision: 1}}, - {req: etcdRequest{op: Get, key: "key"}, resp: etcdResponse{getData: "2", revision: 1}, failure: true}, - {req: etcdRequest{op: Get, key: "key"}, resp: etcdResponse{getData: "1", revision: 1}}, + {req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Revision: 42}}, }, }, { - name: "Get response revision should be equal or greater then put", + name: "Get response data should match put", operations: []testOperation{ - {req: etcdRequest{op: Put, key: "key"}, resp: etcdResponse{revision: 2}}, - {req: etcdRequest{op: Get, key: "key"}, resp: etcdResponse{revision: 1}, failure: true}, - {req: etcdRequest{op: Get, key: "key"}, resp: etcdResponse{revision: 2}}, - {req: etcdRequest{op: Get, key: "key"}, resp: etcdResponse{revision: 4}}, + {req: EtcdRequest{Op: Put, Key: "key", PutData: "1"}, resp: EtcdResponse{Revision: 1}}, + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "2", Revision: 1}, failure: true}, + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "2", Revision: 2}, failure: true}, + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "1", Revision: 1}}, }, }, { - name: "Put bumps revision", + name: "Get revision should be equal to put", operations: []testOperation{ - {req: etcdRequest{op: Put, key: "key", putData: "1"}, resp: etcdResponse{revision: 1}}, - {req: etcdRequest{op: Put, key: "key", putData: "2"}, resp: etcdResponse{revision: 1}, failure: true}, - {req: etcdRequest{op: Put, key: "key", putData: "2"}, resp: etcdResponse{revision: 2}}, + {req: EtcdRequest{Op: Put, Key: "key"}, resp: EtcdResponse{Revision: 2}}, + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{Revision: 1}, failure: true}, + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{Revision: 3}, failure: true}, + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{Revision: 2}}, }, }, { - name: "Put can fail and be lost", + name: "Put must increase revision by 1", operations: []testOperation{ - {req: etcdRequest{op: Put, key: "key", putData: "1"}, resp: etcdResponse{revision: 1}}, - {req: etcdRequest{op: Put, key: "key", putData: "2"}, resp: etcdResponse{err: errors.New("failed")}}, - {req: etcdRequest{op: Put, key: "key", putData: "3"}, resp: etcdResponse{revision: 2}}, + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{Revision: 1}}, + {req: EtcdRequest{Op: Put, Key: "key", PutData: "1"}, resp: EtcdResponse{Revision: 1}, failure: true}, + {req: EtcdRequest{Op: Put, Key: "key", PutData: "1"}, resp: EtcdResponse{Revision: 3}, failure: true}, + {req: EtcdRequest{Op: Put, Key: "key", PutData: "2"}, resp: EtcdResponse{Revision: 2}}, }, }, { - name: "Put can fail but bump revision", + name: "Put can fail and be lost", operations: []testOperation{ - {req: etcdRequest{op: Put, key: "key", putData: "1"}, resp: etcdResponse{revision: 1}}, - {req: etcdRequest{op: Put, key: "key", putData: "2"}, resp: etcdResponse{err: errors.New("failed")}}, - {req: etcdRequest{op: Put, key: "key", putData: "3"}, resp: etcdResponse{revision: 3}}, + {req: EtcdRequest{Op: Put, Key: "key", PutData: "1"}, resp: EtcdResponse{Revision: 1}}, + {req: EtcdRequest{Op: Put, Key: "key", PutData: "2"}, resp: EtcdResponse{Err: errors.New("failed")}}, + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "1", Revision: 1}}, + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "2", Revision: 1}, failure: true}, + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "1", Revision: 2}, failure: true}, + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "2", Revision: 2}, failure: true}, }, }, { - name: "Put can fail but be persisted and bump revision", + name: "Put can fail but be persisted and increase revision before put", operations: []testOperation{ - {req: etcdRequest{op: Put, key: "key", putData: "1"}, resp: etcdResponse{revision: 1}}, - {req: etcdRequest{op: Put, key: "key", putData: "2"}, resp: etcdResponse{err: errors.New("failed")}}, - {req: etcdRequest{op: Get, key: "key"}, resp: etcdResponse{getData: "2", revision: 1}, failure: true}, - {req: etcdRequest{op: Get, key: "key"}, resp: etcdResponse{getData: "2", revision: 2}}, + // One failed request, one persisted. + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{Revision: 1}}, + {req: EtcdRequest{Op: Put, Key: "key", PutData: "2"}, resp: EtcdResponse{Err: errors.New("failed")}}, + {req: EtcdRequest{Op: Put, Key: "key", PutData: "3"}, resp: EtcdResponse{Revision: 3}}, + // Two failed request, two persisted. + {req: EtcdRequest{Op: Put, Key: "key", PutData: "4"}, resp: EtcdResponse{Err: errors.New("failed")}}, + {req: EtcdRequest{Op: Put, Key: "key", PutData: "5"}, resp: EtcdResponse{Err: errors.New("failed")}}, + {req: EtcdRequest{Op: Put, Key: "key", PutData: "6"}, resp: EtcdResponse{Revision: 6}}, }, }, { - name: "Put can fail but be persisted later", + name: "Put can fail but be persisted and increase revision before get", operations: []testOperation{ - {req: etcdRequest{op: Put, key: "key", putData: "1"}, resp: etcdResponse{err: errors.New("failed")}}, - {req: etcdRequest{op: Put, key: "key", putData: "2"}, resp: etcdResponse{revision: 2}}, - {req: etcdRequest{op: Get, key: "key"}, resp: etcdResponse{getData: "2", revision: 2}}, - {req: etcdRequest{op: Get, key: "key"}, resp: etcdResponse{getData: "1", revision: 3}}, + // One failed request, one persisted. + {req: EtcdRequest{Op: Put, Key: "key", PutData: "1"}, resp: EtcdResponse{Revision: 1}}, + {req: EtcdRequest{Op: Put, Key: "key", PutData: "2"}, resp: EtcdResponse{Err: errors.New("failed")}}, + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "3", Revision: 2}, failure: true}, + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "2", Revision: 1}, failure: true}, + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "2", Revision: 2}}, + // Two failed request, two persisted. + {req: EtcdRequest{Op: Put, Key: "key", PutData: "3"}, resp: EtcdResponse{Err: errors.New("failed")}}, + {req: EtcdRequest{Op: Put, Key: "key", PutData: "4"}, resp: EtcdResponse{Err: errors.New("failed")}}, + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "3", Revision: 3}, failure: true}, + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "3", Revision: 4}, failure: true}, + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "4", Revision: 4}}, }, }, { - name: "Put can fail but bump revision later", + name: "Put can fail but be persisted and increase revision before delete", operations: []testOperation{ - {req: etcdRequest{op: Put, key: "key", putData: "1"}, resp: etcdResponse{err: errors.New("failed")}}, - {req: etcdRequest{op: Put, key: "key", putData: "2"}, resp: etcdResponse{revision: 2}}, - {req: etcdRequest{op: Get, key: "key"}, resp: etcdResponse{getData: "2", revision: 2}}, - {req: etcdRequest{op: Put, key: "key", putData: "3"}, resp: etcdResponse{revision: 4}}, + // One failed request, one persisted. + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{Revision: 1}}, + {req: EtcdRequest{Op: Put, Key: "key", PutData: "2"}, resp: EtcdResponse{Err: errors.New("failed")}}, + {req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Deleted: 1, Revision: 1}, failure: true}, + {req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Deleted: 1, Revision: 2}, failure: true}, + {req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Deleted: 1, Revision: 3}}, + // Two failed request, two persisted. + {req: EtcdRequest{Op: Put, Key: "key", PutData: "4"}, resp: EtcdResponse{Revision: 4}}, + {req: EtcdRequest{Op: Put, Key: "key", PutData: "5"}, resp: EtcdResponse{Err: errors.New("failed")}}, + {req: EtcdRequest{Op: Put, Key: "key", PutData: "6"}, resp: EtcdResponse{Err: errors.New("failed")}}, + {req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Deleted: 1, Revision: 7}}, + // Two failed request, one persisted. + {req: EtcdRequest{Op: Put, Key: "key", PutData: "8"}, resp: EtcdResponse{Revision: 8}}, + {req: EtcdRequest{Op: Put, Key: "key", PutData: "9"}, resp: EtcdResponse{Err: errors.New("failed")}}, + {req: EtcdRequest{Op: Put, Key: "key", PutData: "10"}, resp: EtcdResponse{Err: errors.New("failed")}}, + {req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Deleted: 1, Revision: 10}}, }, }, { name: "Delete only increases revision on success", operations: []testOperation{ - {req: etcdRequest{op: Put, key: "key", putData: "1"}, resp: etcdResponse{revision: 1}}, - {req: etcdRequest{op: Delete, key: "key"}, resp: etcdResponse{deleted: 1, revision: 1}, failure: true}, - {req: etcdRequest{op: Delete, key: "key"}, resp: etcdResponse{deleted: 1, revision: 2}}, - {req: etcdRequest{op: Delete, key: "key"}, resp: etcdResponse{deleted: 0, revision: 3}, failure: true}, - {req: etcdRequest{op: Delete, key: "key"}, resp: etcdResponse{deleted: 0, revision: 2}}, + {req: EtcdRequest{Op: Put, Key: "key", PutData: "1"}, resp: EtcdResponse{Revision: 1}}, + {req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Deleted: 1, Revision: 1}, failure: true}, + {req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Deleted: 1, Revision: 2}}, + {req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Deleted: 0, Revision: 3}, failure: true}, + {req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Deleted: 0, Revision: 2}}, + }, + }, + { + name: "Delete clears value", + operations: []testOperation{ + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "1", Revision: 1}}, + {req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Deleted: 1, Revision: 2}}, + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "1", Revision: 1}, failure: true}, + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "1", Revision: 2}, failure: true}, + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{Revision: 2}}, + }, + }, + { + name: "Delete can fail and be lost before get", + operations: []testOperation{ + {req: EtcdRequest{Op: Put, Key: "key", PutData: "1"}, resp: EtcdResponse{Revision: 1}}, + {req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Err: errors.New("failed")}}, + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "1", Revision: 1}}, + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{Revision: 2}, failure: true}, + }, + }, + { + name: "Delete can fail and be lost before delete", + operations: []testOperation{ + {req: EtcdRequest{Op: Put, Key: "key", PutData: "1"}, resp: EtcdResponse{Revision: 1}}, + {req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Err: errors.New("failed")}}, + {req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Deleted: 1, Revision: 1}, failure: true}, + {req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Deleted: 1, Revision: 2}}, + }, + }, + { + name: "Delete can fail and be lost before put", + operations: []testOperation{ + {req: EtcdRequest{Op: Put, Key: "key", PutData: "1"}, resp: EtcdResponse{Revision: 1}}, + {req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Err: errors.New("failed")}}, + {req: EtcdRequest{Op: Put, Key: "key", PutData: "2"}, resp: EtcdResponse{Revision: 2}}, + }, + }, + { + name: "Delete can fail but be persisted before get", + operations: []testOperation{ + // One failed request, one persisted. + {req: EtcdRequest{Op: Put, Key: "key", PutData: "1"}, resp: EtcdResponse{Revision: 1}}, + {req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Err: errors.New("failed")}}, + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{Revision: 2}}, + // Two failed request, one persisted. + {req: EtcdRequest{Op: Put, Key: "key", PutData: "3"}, resp: EtcdResponse{Revision: 3}}, + {req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Err: errors.New("failed")}}, + {req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Err: errors.New("failed")}}, + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{Revision: 4}}, + }, + }, + { + name: "Delete can fail but be persisted before put", + operations: []testOperation{ + // One failed request, one persisted. + {req: EtcdRequest{Op: Put, Key: "key", PutData: "1"}, resp: EtcdResponse{Revision: 1}}, + {req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Err: errors.New("failed")}}, + {req: EtcdRequest{Op: Put, Key: "key", PutData: "3"}, resp: EtcdResponse{Revision: 3}}, + // Two failed request, one persisted. + {req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Err: errors.New("failed")}}, + {req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Err: errors.New("failed")}}, + {req: EtcdRequest{Op: Put, Key: "key", PutData: "5"}, resp: EtcdResponse{Revision: 5}}, + }, + }, + { + name: "Delete can fail but be persisted before delete", + operations: []testOperation{ + // One failed request, one persisted. + {req: EtcdRequest{Op: Put, Key: "key", PutData: "1"}, resp: EtcdResponse{Revision: 1}}, + {req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Err: errors.New("failed")}}, + {req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Revision: 2}}, + {req: EtcdRequest{Op: Put, Key: "key", PutData: "3"}, resp: EtcdResponse{Revision: 3}}, + // Two failed request, one persisted. + {req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Err: errors.New("failed")}}, + {req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Err: errors.New("failed")}}, + {req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Revision: 4}}, }, }, } @@ -131,7 +235,7 @@ func TestModel(t *testing.T) { } type testOperation struct { - req etcdRequest - resp etcdResponse + req EtcdRequest + resp EtcdResponse failure bool } diff --git a/tests/linearizability/traffic.go b/tests/linearizability/traffic.go index f1507466e52..413f3b8e2e5 100644 --- a/tests/linearizability/traffic.go +++ b/tests/linearizability/traffic.go @@ -28,7 +28,7 @@ var ( ) type Traffic interface { - Run(ctx context.Context, c *recordingClient, limiter *rate.Limiter) + Run(ctx context.Context, c *recordingClient, limiter *rate.Limiter, ids idProvider) } type readWriteSingleKey struct { @@ -41,12 +41,9 @@ type opChance struct { chance int } -func (t readWriteSingleKey) Run(ctx context.Context, c *recordingClient, limiter *rate.Limiter) { - maxOperationsPerClient := 1000000 - minId := maxOperationsPerClient * c.id - maxId := maxOperationsPerClient * (c.id + 1) +func (t readWriteSingleKey) Run(ctx context.Context, c *recordingClient, limiter *rate.Limiter, ids idProvider) { - for writeId := minId; writeId < maxId; { + for { select { case <-ctx.Done(): return @@ -58,10 +55,8 @@ func (t readWriteSingleKey) Run(ctx context.Context, c *recordingClient, limiter continue } // Provide each write with unique id to make it easier to validate operation history. - t.Write(ctx, c, limiter, writeId) - writeId++ + t.Write(ctx, c, limiter, ids.RequestId()) } - return } func (t readWriteSingleKey) Read(ctx context.Context, c *recordingClient, limiter *rate.Limiter) error {