Skip to content

Commit

Permalink
Merge pull request #14880 from serathius/linearizability-failed
Browse files Browse the repository at this point in the history
Improve support for failed requests in linearizability tests
  • Loading branch information
serathius authored Dec 6, 2022
2 parents e6ef3c0 + 5ff9202 commit a4c6d1b
Show file tree
Hide file tree
Showing 7 changed files with 439 additions and 165 deletions.
54 changes: 9 additions & 45 deletions tests/linearizability/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,16 @@ import (
"context"
"time"

"github.com/anishathalye/porcupine"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)

type recordingClient struct {
client clientv3.Client
id int

operations []porcupine.Operation
client clientv3.Client
history *appendableHistory
}

func NewClient(endpoints []string, id int) (*recordingClient, error) {
func NewClient(endpoints []string, ids idProvider) (*recordingClient, error) {
cc, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
Logger: zap.NewNop(),
Expand All @@ -41,9 +38,8 @@ func NewClient(endpoints []string, id int) (*recordingClient, error) {
return nil, err
}
return &recordingClient{
client: *cc,
id: id,
operations: []porcupine.Operation{},
client: *cc,
history: newAppendableHistory(ids),
}, nil
}

Expand All @@ -58,54 +54,22 @@ func (c *recordingClient) Get(ctx context.Context, key string) error {
if err != nil {
return err
}
var readData string
if len(resp.Kvs) == 1 {
readData = string(resp.Kvs[0].Value)
}
c.operations = append(c.operations, porcupine.Operation{
ClientId: c.id,
Input: etcdRequest{op: Get, key: key},
Call: callTime.UnixNano(),
Output: etcdResponse{getData: readData, revision: resp.Header.Revision},
Return: returnTime.UnixNano(),
})
c.history.AppendGet(key, callTime, returnTime, resp)
return nil
}

func (c *recordingClient) Put(ctx context.Context, key, value string) error {
callTime := time.Now()
resp, err := c.client.Put(ctx, key, value)
returnTime := time.Now()
var revision int64
if resp != nil && resp.Header != nil {
revision = resp.Header.Revision
}
c.operations = append(c.operations, porcupine.Operation{
ClientId: c.id,
Input: etcdRequest{op: Put, key: key, putData: value},
Call: callTime.UnixNano(),
Output: etcdResponse{err: err, revision: revision},
Return: returnTime.UnixNano(),
})
return nil
c.history.AppendPut(key, value, callTime, returnTime, resp, err)
return err
}

func (c *recordingClient) Delete(ctx context.Context, key string) error {
callTime := time.Now()
resp, err := c.client.Delete(ctx, key)
returnTime := time.Now()
var revision int64
var deleted int64
if resp != nil && resp.Header != nil {
revision = resp.Header.Revision
deleted = resp.Deleted
}
c.operations = append(c.operations, porcupine.Operation{
ClientId: c.id,
Input: etcdRequest{op: Delete, key: key},
Call: callTime.UnixNano(),
Output: etcdResponse{revision: revision, deleted: deleted, err: err},
Return: returnTime.UnixNano(),
})
c.history.AppendDelete(key, callTime, returnTime, resp, err)
return nil
}
148 changes: 148 additions & 0 deletions tests/linearizability/history.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
// Copyright 2022 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package linearizability

import (
"time"

"github.com/anishathalye/porcupine"
clientv3 "go.etcd.io/etcd/client/v3"
)

type appendableHistory struct {
// id of the next write operation. If needed a new id might be requested from idProvider.
id int
idProvider idProvider

history
}

func newAppendableHistory(ids idProvider) *appendableHistory {
return &appendableHistory{
id: ids.ClientId(),
idProvider: ids,
history: history{
successful: []porcupine.Operation{},
failed: []porcupine.Operation{},
},
}
}

func (h *appendableHistory) AppendGet(key string, start, end time.Time, resp *clientv3.GetResponse) {
var readData string
if len(resp.Kvs) == 1 {
readData = string(resp.Kvs[0].Value)
}
h.successful = append(h.successful, porcupine.Operation{
ClientId: h.id,
Input: EtcdRequest{Op: Get, Key: key},
Call: start.UnixNano(),
Output: EtcdResponse{GetData: readData, Revision: resp.Header.Revision},
Return: end.UnixNano(),
})
}

func (h *appendableHistory) AppendPut(key, value string, start, end time.Time, resp *clientv3.PutResponse, err error) {
request := EtcdRequest{Op: Put, Key: key, PutData: value}
if err != nil {
h.appendFailed(request, start, err)
return
}
var revision int64
if resp != nil && resp.Header != nil {
revision = resp.Header.Revision
}
h.successful = append(h.successful, porcupine.Operation{
ClientId: h.id,
Input: EtcdRequest{Op: Put, Key: key, PutData: value},
Call: start.UnixNano(),
Output: EtcdResponse{Err: err, Revision: revision},
Return: end.UnixNano(),
})
}

func (h *appendableHistory) AppendDelete(key string, start, end time.Time, resp *clientv3.DeleteResponse, err error) {
request := EtcdRequest{Op: Delete, Key: key}
if err != nil {
h.appendFailed(request, start, err)
return
}
var revision int64
var deleted int64
if resp != nil && resp.Header != nil {
revision = resp.Header.Revision
deleted = resp.Deleted
}
h.successful = append(h.successful, porcupine.Operation{
ClientId: h.id,
Input: request,
Call: start.UnixNano(),
Output: EtcdResponse{Revision: revision, Deleted: deleted, Err: err},
Return: end.UnixNano(),
})
}

func (h *appendableHistory) appendFailed(request EtcdRequest, start time.Time, err error) {
h.failed = append(h.failed, porcupine.Operation{
ClientId: h.id,
Input: request,
Call: start.UnixNano(),
Output: EtcdResponse{Err: err},
Return: 0, // For failed writes we don't know when request has really finished.
})
// Operations of single client needs to be sequential.
// As we don't know return time of failed operations, all new writes need to be done with new client id.
h.id = h.idProvider.ClientId()
}

type history struct {
successful []porcupine.Operation
// failed requests are kept separate as we don't know return time of failed operations.
// Based on https://github.com/anishathalye/porcupine/issues/10
failed []porcupine.Operation
}

func (h history) Merge(h2 history) history {
result := history{
successful: make([]porcupine.Operation, 0, len(h.successful)+len(h2.successful)),
failed: make([]porcupine.Operation, 0, len(h.failed)+len(h2.failed)),
}
result.successful = append(result.successful, h.successful...)
result.successful = append(result.successful, h2.successful...)
result.failed = append(result.failed, h.failed...)
result.failed = append(result.failed, h2.failed...)
return result
}

func (h history) Operations() []porcupine.Operation {
operations := make([]porcupine.Operation, 0, len(h.successful)+len(h.failed))
var maxTime int64
for _, op := range h.successful {
operations = append(operations, op)
if op.Return > maxTime {
maxTime = op.Return
}
}
// Failed requests don't have a known return time.
// We simulate Infinity by using return time of latest successfully request.
for _, op := range h.failed {
if op.Call > maxTime {
continue
}
op.Return = maxTime + 1
operations = append(operations, op)
}
return operations
}
40 changes: 40 additions & 0 deletions tests/linearizability/id.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2022 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package linearizability

import "sync/atomic"

type idProvider interface {
ClientId() int
RequestId() int
}

func newIdProvider() idProvider {
return &atomicProvider{}
}

type atomicProvider struct {
clientId atomic.Int64
requestId atomic.Int64
}

func (id *atomicProvider) ClientId() int {
// Substract one as ClientId should start from zero.
return int(id.clientId.Add(1) - 1)
}

func (id *atomicProvider) RequestId() int {
return int(id.requestId.Add(1))
}
11 changes: 7 additions & 4 deletions tests/linearizability/linearizability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,33 +140,36 @@ type FailpointConfig struct {
waitBetweenTriggers time.Duration
}

func simulateTraffic(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, config trafficConfig) (operations []porcupine.Operation) {
func simulateTraffic(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, config trafficConfig) []porcupine.Operation {
mux := sync.Mutex{}
endpoints := clus.EndpointsV3()

ids := newIdProvider()
h := history{}
limiter := rate.NewLimiter(rate.Limit(config.maximalQPS), 200)

startTime := time.Now()
wg := sync.WaitGroup{}
for i := 0; i < config.clientCount; i++ {
wg.Add(1)
endpoints := []string{endpoints[i%len(endpoints)]}
c, err := NewClient(endpoints, i)
c, err := NewClient(endpoints, ids)
if err != nil {
t.Fatal(err)
}
go func(c *recordingClient) {
defer wg.Done()
defer c.Close()

config.traffic.Run(ctx, c, limiter)
config.traffic.Run(ctx, c, limiter, ids)
mux.Lock()
operations = append(operations, c.operations...)
h = h.Merge(c.history.history)
mux.Unlock()
}(c)
}
wg.Wait()
endTime := time.Now()
operations := h.Operations()
t.Logf("Recorded %d operations", len(operations))

qps := float64(len(operations)) / float64(endTime.Sub(startTime)) * float64(time.Second)
Expand Down
Loading

0 comments on commit a4c6d1b

Please sign in to comment.