Skip to content

Commit

Permalink
functional-test: add advance network failure cases
Browse files Browse the repository at this point in the history
add more network failures such as packet corruption, reordering, loss, and network partition.

resolve etcd-io#5614
  • Loading branch information
fanminshi committed Dec 1, 2016
1 parent aea9c66 commit 69b7117
Show file tree
Hide file tree
Showing 9 changed files with 457 additions and 18 deletions.
82 changes: 79 additions & 3 deletions pkg/netutil/isolate_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,82 @@ func RecoverPort(port int) error {
return err
}

// SetPacketCorruption corrupts packets at p%
func SetPacketCorruption(p int) error {
if p < 0 || p > 100 {
return fmt.Errorf("packets corruption percentage must be between 0 and 100 but got %v", p)
}
ifce, err := GetDefaultInterface()
if err != nil {
return err
}
cmdStr1 := fmt.Sprintf("sudo tc qdisc add dev %s root handle 1:1 netem corrupt %d%%", ifce, p)
cmdStr2 := fmt.Sprintf("sudo tc qdisc add dev %s parent 1:1 handle 10:1 netem corrupt %d%%", ifce, p)
cmdStrs := []string{cmdStr1, cmdStr2}

for _, cmdStr := range cmdStrs {
_, err = exec.Command("/bin/sh", "-c", cmdStr).Output()
if err != nil {
return err
}
}

return nil

}

// SetPacketReordering reorders packets. rp% of packets (with a correlation of cp%) gets send immediately. The rest will be delayed for ms millisecond
func SetPacketReordering(rp int, cp int, ms int) error {
ifce, err := GetDefaultInterface()
if err != nil {
return err
}
cmdStr1 := fmt.Sprintf("sudo tc qdisc add dev %s root handle 1:1 netem delay %dms reorder %d%% %d%%", ifce, ms, rp, cp)
cmdStr2 := fmt.Sprintf("sudo tc qdisc add dev %s parent 1:1 handle 10:1 delay %dms reorder %d%% %d%%", ifce, ms, rp, cp)
cmdStrs := []string{cmdStr1, cmdStr2}

for _, cmdStr := range cmdStrs {
_, err = exec.Command("/bin/sh", "-c", cmdStr).Output()
if err != nil {
return err
}
}

return nil

}

// SetPackLoss randomly drop packet at p% probability
func SetPackLoss(p int) error {
ifce, err := GetDefaultInterface()
if err != nil {
return err
}
cmdStr := fmt.Sprintf("sudo tc qdisc add dev %s root netem loss %d%%", ifce, p)
_, err = exec.Command("/bin/sh", "-c", cmdStr).Output()
if err != nil {
return err
}

return nil
}

// SetPartitioning sets a very long delay of ms scale with random variations rv to isolate this node
func SetPartitioning(ms int, rv int) error {
ifce, err := GetDefaultInterface()
if err != nil {
return err
}

cmdStr := fmt.Sprintf("sudo tc qdisc add dev %s root netem delay %dms %dms distribution normal", ifce, ms, rv)
_, err = exec.Command("/bin/sh", "-c", cmdStr).Output()
if err != nil {
return err
}

return nil
}

// SetLatency adds latency in millisecond scale with random variations.
func SetLatency(ms, rv int) error {
ifce, err := GetDefaultInterface()
Expand All @@ -64,12 +140,12 @@ func SetLatency(ms, rv int) error {
return nil
}

// RemoveLatency resets latency configurations.
func RemoveLatency() error {
func ResetDefaultInterface() error {
ifce, err := GetDefaultInterface()
if err != nil {
return err
}
_, err = exec.Command("/bin/sh", "-c", fmt.Sprintf("sudo tc qdisc del dev %s root netem", ifce)).Output()
cmdStr := fmt.Sprintf("sudo tc qdisc del dev %s root netem", ifce)
_, err = exec.Command("/bin/sh", "-c", cmdStr).Output()
return err
}
10 changes: 10 additions & 0 deletions pkg/netutil/isolate_stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,16 @@ func DropPort(port int) error { return nil }

func RecoverPort(port int) error { return nil }

func SetPacketCorruption(cp int) error { return nil }

func SetPacketReordering(ms int, rp int, cp int) error { return nil }

func SetPackLoss(p int) error { return nil }

func SetPartitioning(ms int, rv int) error { return nil }

func SetLatency(ms, rv int) error { return nil }

func RemoveLatency() error { return nil }

func ResetDefaultInterface() error { return nil }
2 changes: 1 addition & 1 deletion tools/functional-tester/docker/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
FROM alpine
RUN apk update
RUN apk add -v iptables sudo
RUN apk --update add iptables bash iproute2
ADD bin/etcd-agent /
ADD bin/etcd /
ADD bin/etcd-tester /
Expand Down
7 changes: 1 addition & 6 deletions tools/functional-tester/docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,5 @@ tester:
- /etcd-tester
- -agent-endpoints
- "172.20.0.2:9027,172.20.0.3:9027,172.20.0.4:9027"
- -limit
- "1"
- -stress-key-count
- "1"
- -stress-key-size
- "1"


38 changes: 35 additions & 3 deletions tools/functional-tester/etcd-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,16 +191,48 @@ func (a *Agent) recoverPort(port int) error {
return netutil.RecoverPort(port)
}

func (a *Agent) setLatency(ms, rv int) error {
func (a *Agent) setPacketCorruption(cp int) error {
if !a.cfg.UseRoot {
return nil
}
return netutil.SetPacketCorruption(cp)
}

func (a *Agent) setPacketReordering(rp, cp, ms int) error {
if !a.cfg.UseRoot {
return nil
}
return netutil.SetPacketReordering(rp, cp, ms)
}

func (a *Agent) setPackLoss(p int) error {
if !a.cfg.UseRoot {
return nil
}
return netutil.SetPackLoss(p)
}

func (a *Agent) setPartitioning(ms, rv int) error {
if !a.cfg.UseRoot {
return nil
}
if ms == 0 {
return netutil.RemoveLatency()
return netutil.SetPartitioning(ms, rv)
}

func (a *Agent) setLatency(ms, rv int) error {
if !a.cfg.UseRoot {
return nil
}
return netutil.SetLatency(ms, rv)
}

func (a *Agent) resetDefaultInterface() error {
if !a.cfg.UseRoot {
return nil
}
return netutil.ResetDefaultInterface()
}

func (a *Agent) status() client.Status {
return client.Status{State: a.state}
}
Expand Down
50 changes: 49 additions & 1 deletion tools/functional-tester/etcd-agent/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,25 @@ type Agent interface {
DropPort(port int) error
// RecoverPort stops dropping all network packets at the given port.
RecoverPort(port int) error
// SetPacketCorruption corrupts packets at a probability of p%
SetPacketCorruption(p int) error
// RemovePacketCorruption removes packet corruption.
RemovePacketCorruption() error
// SetPacketReordering reorders packets. rp% of packets (with a correlation of cp%) gets send immediately. The rest will be delayed for ms millisecond
SetPacketReordering(rp, cp, ms int) error
// RemovePacketReordering removes reordering of packets
RemovePacketReordering() error
// SetPackLoss randomly drop packet at a probability of p%
SetPacketLoss(p int) error
// RemovePacketLoss removes dropping of packets
RemovePacketLoss() error
// SetPartitioning sets a very long network delay of ms scale with random variations rv to isolate a node
SetPartitioning(ms, rv int) error
// SetPartitioning removes partition of a node
RemovePartitioning() error
// SetLatency slows down network by introducing latency.
SetLatency(ms, rv int) error
// RemoveLatency removes latency introduced by SetLatency.
// RemoveLatency removes latency.
RemoveLatency() error
// Status returns the status of etcd on the agent
Status() (Status, error)
Expand Down Expand Up @@ -99,6 +115,38 @@ func (a *agent) RecoverPort(port int) error {
return a.rpcClient.Call("Agent.RPCRecoverPort", port, nil)
}

func (a *agent) SetPacketCorruption(p int) error {
return a.rpcClient.Call("Agent.RPCSetPacketCorruption", []int{p}, nil)
}

func (a *agent) RemovePacketCorruption() error {
return a.rpcClient.Call("Agent.RPCRemovePacketCorruption", struct{}{}, nil)
}

func (a *agent) SetPacketReordering(rp, cp, ms int) error {
return a.rpcClient.Call("Agent.RPCSetPacketReordering", []int{rp, cp, ms}, nil)
}

func (a *agent) RemovePacketReordering() error {
return a.rpcClient.Call("Agent.RPCRemovePacketCorruption", struct{}{}, nil)
}

func (a *agent) SetPacketLoss(p int) error {
return a.rpcClient.Call("Agent.RPCSetPacketLoss", []int{p}, nil)
}

func (a *agent) RemovePacketLoss() error {
return a.rpcClient.Call("Agent.RPCRemovePacketLoss", struct{}{}, nil)
}

func (a *agent) SetPartitioning(ms, rv int) error {
return a.rpcClient.Call("Agent.RPCSetPartitioning", []int{ms, rv}, nil)
}

func (a *agent) RemovePartitioning() error {
return a.rpcClient.Call("Agent.RPCRemovePartitioning", struct{}{}, nil)
}

func (a *agent) SetLatency(ms, rv int) error {
return a.rpcClient.Call("Agent.RPCSetLatency", []int{ms, rv}, nil)
}
Expand Down
86 changes: 85 additions & 1 deletion tools/functional-tester/etcd-agent/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,90 @@ func (a *Agent) RPCRecoverPort(port int, reply *struct{}) error {
return nil
}

func (a *Agent) RPCSetPacketCorruption(args []int, reply *struct{}) error {
if len(args) != 1 {
return fmt.Errorf("SetPacketCorruption needs 1 arg, got (%v)", args)
}
plog.Printf("set packet corruption at %d%%)", args[0])
err := a.setPacketCorruption(args[0])
if err != nil {
plog.Println("error setting packet corruption", err)
}
return nil
}

func (a *Agent) RPCRemovePacketCorruption(args struct{}, reply *struct{}) error {
plog.Println("removing packet corruption")
err := a.resetDefaultInterface()
if err != nil {
plog.Println("error removing packet corruption")
}
return nil
}

func (a *Agent) RPCSetPacketReordering(args []int, reply *struct{}) error {
if len(args) != 3 {
return fmt.Errorf("SetPacketReordering needs 3 args, got (%v)", args)
}
plog.Printf("SetPacketReordering reorders packets. %d%% of packets (with a correlation of %d%%) gets send immediately. The rest will be delayed for %d millisecond", args[0], args[1], args[2])
err := a.setPacketCorruption(args[0])
if err != nil {
plog.Println("error setting packet reordering", err)
}
return nil
}

func (a *Agent) RPCRemovePacketReordering(args struct{}, reply *struct{}) error {
plog.Println("removing packet reordering")
err := a.resetDefaultInterface()
if err != nil {
plog.Println("error removing packet reordering")
}
return nil
}

func (a *Agent) RPCSetPacketLoss(args []int, reply *struct{}) error {
if len(args) != 1 {
return fmt.Errorf("SetPacketLoss needs 1 arg, got (%v)", args)
}
plog.Printf("SetPackLoss randomly drop packet at %d probability", args[0])
err := a.setPackLoss(args[0])
if err != nil {
plog.Println("error setting packet loss", err)
}
return nil
}

func (a *Agent) RPCRemovePacketLoss(args struct{}, reply *struct{}) error {
plog.Println("removing packet loss")
err := a.resetDefaultInterface()
if err != nil {
plog.Println("error removing packet loss")
}
return nil
}

func (a *Agent) RPCSetPartitioning(args []int, reply *struct{}) error {
if len(args) != 1 {
return fmt.Errorf("SetPartitioning needs 2 args, got (%v)", args)
}
plog.Printf("SetPartitioning sets %dms (+/- %dms)", args[0], args[1])
err := a.setPartitioning(args[0], args[1])
if err != nil {
plog.Println("error setting packet loss", err)
}
return nil
}

func (a *Agent) RPCRemovePartitioning(args struct{}, reply *struct{}) error {
plog.Println("removing packet partitioning")
err := a.resetDefaultInterface()
if err != nil {
plog.Println("error removing packet partitioning")
}
return nil
}

func (a *Agent) RPCSetLatency(args []int, reply *struct{}) error {
if len(args) != 2 {
return fmt.Errorf("SetLatency needs two args, got (%v)", args)
Expand All @@ -118,7 +202,7 @@ func (a *Agent) RPCSetLatency(args []int, reply *struct{}) error {

func (a *Agent) RPCRemoveLatency(args struct{}, reply *struct{}) error {
plog.Println("removing latency")
err := a.setLatency(0, 0)
err := a.resetDefaultInterface()
if err != nil {
plog.Println("error removing latency")
}
Expand Down
Loading

0 comments on commit 69b7117

Please sign in to comment.