Skip to content

Commit

Permalink
functional-tester/tester: add randomize delay
Browse files Browse the repository at this point in the history
Signed-off-by: Gyuho Lee <[email protected]>
  • Loading branch information
gyuho committed Apr 5, 2018
1 parent 3a93f1c commit ea4effc
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 28 deletions.
43 changes: 35 additions & 8 deletions tools/functional-tester/tester/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,11 @@ func newCluster(lg *zap.Logger, fpath string) (*Cluster, error) {
}
}

if clus.Tester.DelayLatencyMs <= clus.Tester.DelayLatencyMsRv {
return nil, fmt.Errorf("delay latency %d ms must be greater than delay latency random variable %d ms", clus.Tester.DelayLatencyMs, clus.Tester.DelayLatencyMsRv)
if clus.Tester.DelayLatencyMs <= clus.Tester.DelayLatencyMsRv*5 {
return nil, fmt.Errorf("delay latency %d ms must be greater than 5x of delay latency random variable %d ms", clus.Tester.DelayLatencyMs, clus.Tester.DelayLatencyMsRv)
}
if clus.Tester.UpdatedDelayLatencyMs == 0 {
clus.Tester.UpdatedDelayLatencyMs = clus.Tester.DelayLatencyMs
}

for _, v := range clus.Tester.FailureCases {
Expand Down Expand Up @@ -303,17 +306,29 @@ func (clus *Cluster) updateFailures() {
clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxAll(clus))

case "DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER":
clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxOneFollower(clus))
clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxOneFollower(clus, false))
case "RANDOM_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER":
clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxOneFollower(clus, true))
case "DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT":
clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxOneFollowerUntilTriggerSnapshot())
clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxOneFollowerUntilTriggerSnapshot(clus, false))
case "RANDOM_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT":
clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxOneFollowerUntilTriggerSnapshot(clus, true))
case "DELAY_PEER_PORT_TX_RX_LEADER":
clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxLeader(clus))
clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxLeader(clus, false))
case "RANDOM_DELAY_PEER_PORT_TX_RX_LEADER":
clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxLeader(clus, true))
case "DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT":
clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxLeaderUntilTriggerSnapshot())
clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxLeaderUntilTriggerSnapshot(clus, false))
case "RANDOM_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT":
clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxLeaderUntilTriggerSnapshot(clus, true))
case "DELAY_PEER_PORT_TX_RX_QUORUM":
clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxQuorum(clus))
clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxQuorum(clus, false))
case "RANDOM_DELAY_PEER_PORT_TX_RX_QUORUM":
clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxQuorum(clus, true))
case "DELAY_PEER_PORT_TX_RX_ALL":
clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxAll(clus))
clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxAll(clus, false))
case "RANDOM_DELAY_PEER_PORT_TX_RX_ALL":
clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxAll(clus, true))

case "NO_FAIL_WITH_STRESS":
clus.failures = append(clus.failures, newFailureNoFailWithStress(clus))
Expand All @@ -340,6 +355,18 @@ func (clus *Cluster) failureStrings() (fs []string) {
return fs
}

// UpdateDelayLatencyMs updates delay latency with random value
// within election timeout.
func (clus *Cluster) UpdateDelayLatencyMs() {
rand.Seed(time.Now().UnixNano())
clus.Tester.UpdatedDelayLatencyMs = uint32(rand.Int63n(clus.Members[0].Etcd.ElectionTimeoutMs))

minLatRv := clus.Tester.DelayLatencyMsRv + clus.Tester.DelayLatencyMsRv/5
if clus.Tester.UpdatedDelayLatencyMs <= minLatRv {
clus.Tester.UpdatedDelayLatencyMs += minLatRv
}
}

func (clus *Cluster) shuffleFailures() {
rand.Seed(time.Now().UnixNano())
offset := rand.Intn(1000)
Expand Down
17 changes: 9 additions & 8 deletions tools/functional-tester/tester/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,15 @@ func Test_newCluster(t *testing.T) {
},
},
Tester: &rpcpb.Tester{
TesterNetwork: "tcp",
TesterAddr: "127.0.0.1:9028",
DelayLatencyMs: 5000,
DelayLatencyMsRv: 150,
RoundLimit: 1,
ExitOnFailure: true,
ConsistencyCheck: true,
EnablePprof: true,
TesterNetwork: "tcp",
TesterAddr: "127.0.0.1:9028",
DelayLatencyMs: 5000,
DelayLatencyMsRv: 500,
UpdatedDelayLatencyMs: 5000,
RoundLimit: 1,
ExitOnFailure: true,
ConsistencyCheck: true,
EnablePprof: true,
FailureCases: []string{
"KILL_ONE_FOLLOWER",
"KILL_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT",
Expand Down
1 change: 1 addition & 0 deletions tools/functional-tester/tester/cluster_tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func (clus *Cluster) doRound() error {
"round START",
zap.Int("round", clus.rd),
zap.Strings("failures", clus.failureStrings()),
zap.Int("total-failures", len(clus.failures)),
)
for i, fa := range clus.failures {
clus.cs = i
Expand Down
15 changes: 12 additions & 3 deletions tools/functional-tester/tester/failure.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,18 @@ type failureUntilSnapshot struct {
Failure
}

// all delay failure cases except the ones failing with latency
// greater than election timeout (trigger leader election and
// cluster keeps operating anyways)
var slowCases = map[rpcpb.FailureCase]bool{
rpcpb.FailureCase_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT: true,
rpcpb.FailureCase_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT: true,
rpcpb.FailureCase_RANDOM_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER: true,
rpcpb.FailureCase_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT: true,
rpcpb.FailureCase_RANDOM_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT: true,
rpcpb.FailureCase_RANDOM_DELAY_PEER_PORT_TX_RX_LEADER: true,
rpcpb.FailureCase_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT: true,
rpcpb.FailureCase_RANDOM_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT: true,
rpcpb.FailureCase_RANDOM_DELAY_PEER_PORT_TX_RX_QUORUM: true,
rpcpb.FailureCase_RANDOM_DELAY_PEER_PORT_TX_RX_ALL: true,
}

func (f *failureUntilSnapshot) Inject(clus *Cluster) error {
Expand Down Expand Up @@ -263,7 +272,7 @@ func (f *failureUntilSnapshot) Inject(clus *Cluster) error {
retries := int(snapshotCount) / 1000 * 3
if v, ok := slowCases[f.FailureCase()]; v && ok {
// slow network takes more retries
retries *= 2
retries *= 5
}

for i := 0; i < retries; i++ {
Expand Down
66 changes: 58 additions & 8 deletions tools/functional-tester/tester/failure_case_network_delay.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"time"

"github.com/coreos/etcd/tools/functional-tester/rpcpb"

"go.uber.org/zap"
)

const (
Expand All @@ -29,6 +31,12 @@ const (
)

func injectDelayPeerPortTxRx(clus *Cluster, idx int) error {
clus.lg.Info(
"injecting delay latency",
zap.Duration("latency", time.Duration(clus.Tester.UpdatedDelayLatencyMs)*time.Millisecond),
zap.Duration("latency-rv", time.Duration(clus.Tester.DelayLatencyMsRv)*time.Millisecond),
zap.String("endpoint", clus.Members[idx].EtcdClientEndpoint),
)
return clus.sendOperation(idx, rpcpb.Operation_DelayPeerPortTxRx)
}

Expand All @@ -38,76 +46,118 @@ func recoverDelayPeerPortTxRx(clus *Cluster, idx int) error {
return err
}

func newFailureDelayPeerPortTxRxOneFollower(clus *Cluster) Failure {
func newFailureDelayPeerPortTxRxOneFollower(clus *Cluster, random bool) Failure {
ff := failureByFunc{
failureCase: rpcpb.FailureCase_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER,
injectMember: injectDelayPeerPortTxRx,
recoverMember: recoverDelayPeerPortTxRx,
}

clus.Tester.UpdatedDelayLatencyMs = clus.Tester.DelayLatencyMs
if random {
clus.UpdateDelayLatencyMs()
ff.failureCase = rpcpb.FailureCase_RANDOM_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER
}

f := &failureFollower{ff, -1, -1}
return &failureDelay{
Failure: f,
delayDuration: clus.GetFailureDelayDuration(),
}
}

func newFailureDelayPeerPortTxRxOneFollowerUntilTriggerSnapshot() Failure {
func newFailureDelayPeerPortTxRxOneFollowerUntilTriggerSnapshot(clus *Cluster, random bool) Failure {
ff := failureByFunc{
failureCase: rpcpb.FailureCase_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT,
injectMember: injectDelayPeerPortTxRx,
recoverMember: recoverDelayPeerPortTxRx,
}

clus.Tester.UpdatedDelayLatencyMs = clus.Tester.DelayLatencyMs
if random {
clus.UpdateDelayLatencyMs()
ff.failureCase = rpcpb.FailureCase_RANDOM_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT
}

f := &failureFollower{ff, -1, -1}
return &failureUntilSnapshot{
failureCase: rpcpb.FailureCase_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT,
failureCase: ff.failureCase,
Failure: f,
}
}

func newFailureDelayPeerPortTxRxLeader(clus *Cluster) Failure {
func newFailureDelayPeerPortTxRxLeader(clus *Cluster, random bool) Failure {
ff := failureByFunc{
failureCase: rpcpb.FailureCase_DELAY_PEER_PORT_TX_RX_LEADER,
injectMember: injectDelayPeerPortTxRx,
recoverMember: recoverDelayPeerPortTxRx,
}

clus.Tester.UpdatedDelayLatencyMs = clus.Tester.DelayLatencyMs
if random {
clus.UpdateDelayLatencyMs()
ff.failureCase = rpcpb.FailureCase_RANDOM_DELAY_PEER_PORT_TX_RX_LEADER
}

f := &failureLeader{ff, -1, -1}
return &failureDelay{
Failure: f,
delayDuration: clus.GetFailureDelayDuration(),
}
}

func newFailureDelayPeerPortTxRxLeaderUntilTriggerSnapshot() Failure {
func newFailureDelayPeerPortTxRxLeaderUntilTriggerSnapshot(clus *Cluster, random bool) Failure {
ff := failureByFunc{
failureCase: rpcpb.FailureCase_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT,
injectMember: injectDelayPeerPortTxRx,
recoverMember: recoverDelayPeerPortTxRx,
}

clus.Tester.UpdatedDelayLatencyMs = clus.Tester.DelayLatencyMs
if random {
clus.UpdateDelayLatencyMs()
ff.failureCase = rpcpb.FailureCase_RANDOM_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT
}

f := &failureLeader{ff, -1, -1}
return &failureUntilSnapshot{
failureCase: rpcpb.FailureCase_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT,
failureCase: ff.failureCase,
Failure: f,
}
}

func newFailureDelayPeerPortTxRxQuorum(clus *Cluster) Failure {
func newFailureDelayPeerPortTxRxQuorum(clus *Cluster, random bool) Failure {
f := &failureQuorum{
failureCase: rpcpb.FailureCase_DELAY_PEER_PORT_TX_RX_QUORUM,
injectMember: injectDelayPeerPortTxRx,
recoverMember: recoverDelayPeerPortTxRx,
}

clus.Tester.UpdatedDelayLatencyMs = clus.Tester.DelayLatencyMs
if random {
clus.UpdateDelayLatencyMs()
f.failureCase = rpcpb.FailureCase_RANDOM_DELAY_PEER_PORT_TX_RX_QUORUM
}

return &failureDelay{
Failure: f,
delayDuration: clus.GetFailureDelayDuration(),
}
}

func newFailureDelayPeerPortTxRxAll(clus *Cluster) Failure {
func newFailureDelayPeerPortTxRxAll(clus *Cluster, random bool) Failure {
f := &failureAll{
failureCase: rpcpb.FailureCase_DELAY_PEER_PORT_TX_RX_ALL,
injectMember: injectDelayPeerPortTxRx,
recoverMember: recoverDelayPeerPortTxRx,
}

clus.Tester.UpdatedDelayLatencyMs = clus.Tester.DelayLatencyMs
if random {
clus.UpdateDelayLatencyMs()
f.failureCase = rpcpb.FailureCase_RANDOM_DELAY_PEER_PORT_TX_RX_ALL
}

return &failureDelay{
Failure: f,
delayDuration: clus.GetFailureDelayDuration(),
Expand Down
2 changes: 1 addition & 1 deletion tools/functional-tester/tester/local-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ tester-config:

# slow enough to trigger election
delay-latency-ms: 5000
delay-latency-ms-rv: 150
delay-latency-ms-rv: 500

round-limit: 1
exit-on-failure: true
Expand Down

0 comments on commit ea4effc

Please sign in to comment.