From c820272933c887276003d52eeae8952690503be7 Mon Sep 17 00:00:00 2001 From: Dan Carley Date: Thu, 12 Jan 2023 10:37:26 +0000 Subject: [PATCH] Wait for results before exiting from signal MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously the attack command would not wait for in-flight requests to finish before exiting from an interrupt signal. In the case where all requests take longer than the attack duration then the output file will be empty and reporting on it will produce an obscure error: % echo "GET http://172.18.0.254/will/timeout" | time vegeta attack -rate 1 -duration 0 -timeout 10s -output vegeta.out & sleep 1 && pkill -2 vegeta && fg && vegeta report vegeta.out [1] 12347 12348 [1] + 12347 done echo "GET http://172.18.0.254/will/timeout" | 12348 running time vegeta attack -rate 1 -duration 0 -timeout 10s -output vegeta.out vegeta attack -rate 1 -duration 0 -timeout 10s -output vegeta.out 0.00s user 0.01s system 0% cpu 1.075 total 2023/01/11 21:35:50 encode: can't detect encoding of "vegeta.out" By omitting the return on the first call to `Stop()` we can use the results channel to block the exit until the attack has finished: % echo "GET http://172.18.0.254/will/timeout" | time ./vegeta attack -rate 1 -duration 0 -timeout 10s -output vegeta.out & sleep 1 && pkill -2 vegeta && fg && ./vegeta report vegeta.out [1] 12433 12434 [1] + 12433 done echo "GET http://172.18.0.254/will/timeout" | 12434 running time ./vegeta attack -rate 1 -duration 0 -timeout 10s -output vegeta.out ./vegeta attack -rate 1 -duration 0 -timeout 10s -output vegeta.out 0.00s user 0.01s system 0% cpu 11.012 total Requests [total, rate, throughput] 1, 1.00, 0.00 Duration [total, attack, wait] 10.001s, 0s, 10.001s Latencies [min, mean, 50, 90, 95, 99, max] 10.001s, 10.001s, 10.001s, 10.001s, 10.001s, 10.001s, 10.001s Bytes In [total, mean] 0, 0.00 Bytes Out [total, mean] 0, 0.00 Success [ratio] 0.00% Status Codes [code:count] 0:1 Error Set: Get "http://172.18.0.254/will/timeout": context deadline exceeded (Client.Timeout exceeded while awaiting headers) A subsequent interrupt signal (ie. `^C^C`) is honoured if you want to force an immediate exit: % echo "GET http://172.18.0.254/will/timeout" | time vegeta attack -rate 1 -duration 0 -timeout 10s -output vegeta.out & sleep 1 && pkill -2 vegeta && pkill -2 vegeta && fg [1] 12073 12074 vegeta attack -rate 1 -duration 0 -timeout 10s -output vegeta.out 0.00s user 0.01s system 1% cpu 1.057 total [1] + 12073 done echo "GET http://172.18.0.254/will/timeout" | 12074 done time vegeta attack -rate 1 -duration 0 -timeout 10s -output vegeta.out Testing this required a refactor of `attack()` in order to pass our own signal channel in. The diff is fortunately pretty simple though. Like most simple changes and async code, the majority of the changeset is testing it. Closes #611 Signed-off-by: Tomás Senart --- attack.go | 17 +++++-- attack_test.go | 130 +++++++++++++++++++++++++++++++++++++++++++++++++ lib/attack.go | 14 ++++-- 3 files changed, 154 insertions(+), 7 deletions(-) diff --git a/attack.go b/attack.go index 58ea54f8..63c8d55a 100644 --- a/attack.go +++ b/attack.go @@ -195,16 +195,27 @@ func attack(opts *attackOpts) (err error) { sig := make(chan os.Signal, 1) signal.Notify(sig, os.Interrupt) + return processAttack(atk, res, enc, sig) +} + +func processAttack( + atk *vegeta.Attacker, + res <-chan *vegeta.Result, + enc vegeta.Encoder, + sig <-chan os.Signal, +) error { for { select { case <-sig: - atk.Stop() - return nil + if stopSent := atk.Stop(); !stopSent { + // Exit immediately on second signal. + return nil + } case r, ok := <-res: if !ok { return nil } - if err = enc.Encode(r); err != nil { + if err := enc.Encode(r); err != nil { return err } } diff --git a/attack_test.go b/attack_test.go index 6745e3ab..967044c2 100644 --- a/attack_test.go +++ b/attack_test.go @@ -1,9 +1,18 @@ package main import ( + "bufio" + "bytes" + "io" "net/http" + "net/http/httptest" + "os" "reflect" + "sync" "testing" + "time" + + vegeta "github.com/tsenart/vegeta/v12/lib" ) func TestHeadersSet(t *testing.T) { @@ -26,3 +35,124 @@ func TestHeadersSet(t *testing.T) { } } } + +func decodeMetrics(buf bytes.Buffer) (vegeta.Metrics, error) { + var metrics vegeta.Metrics + dec := vegeta.NewDecoder(bufio.NewReader(&buf)) + + for { + var r vegeta.Result + if err := dec.Decode(&r); err != nil { + if err == io.EOF { + break + } + return metrics, err + } + metrics.Add(&r) + } + metrics.Close() + + return metrics, nil +} + +func TestAttackSignalOnce(t *testing.T) { + t.Parallel() + + const ( + signalDelay = 300 * time.Millisecond // Delay before stopping. + clientTimeout = 1 * time.Second // This, plus delay, is the max time for the attack. + serverTimeout = 2 * time.Second // Must be more than clientTimeout. + attackDuration = 10 * time.Second // The attack should never take this long. + ) + + server := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + time.Sleep(serverTimeout) // Server.Close() will block for this long on shutdown. + }), + ) + defer server.Close() + + tr := vegeta.NewStaticTargeter(vegeta.Target{Method: "GET", URL: server.URL}) + atk := vegeta.NewAttacker(vegeta.Timeout(clientTimeout)) + rate := vegeta.Rate{Freq: 10, Per: time.Second} // Every 100ms. + + var buf bytes.Buffer + writer := bufio.NewWriter(&buf) + enc := vegeta.NewEncoder(writer) + sig := make(chan os.Signal, 1) + res := atk.Attack(tr, rate, attackDuration, "") + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + processAttack(atk, res, enc, sig) + }() + + // Allow more than one request to have started before stopping. + time.Sleep(signalDelay) + sig <- os.Interrupt + wg.Wait() + writer.Flush() + + metrics, err := decodeMetrics(buf) + if err != nil { + t.Error(err) + } + if got, min := metrics.Requests, uint64(2); got < min { + t.Errorf("not enough requests recorded. got %+v, min: %+v", got, min) + } + if got, want := metrics.Success, 0.0; got != want { + t.Errorf("all requests should fail. got %+v, want: %+v", got, want) + } + if got, max := metrics.Duration, clientTimeout; got > max { + t.Errorf("attack duration too long. got %+v, max: %+v", got, max) + } + if got, want := metrics.Wait.Round(time.Second), clientTimeout; got != want { + t.Errorf("attack wait doesn't match timeout. got %+v, want: %+v", got, want) + } +} + +func TestAttackSignalTwice(t *testing.T) { + t.Parallel() + + const ( + attackDuration = 10 * time.Second // The attack should never take this long. + ) + + server := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}), + ) + defer server.Close() + + tr := vegeta.NewStaticTargeter(vegeta.Target{Method: "GET", URL: server.URL}) + atk := vegeta.NewAttacker() + rate := vegeta.Rate{Freq: 1, Per: time.Second} + + var buf bytes.Buffer + writer := bufio.NewWriter(&buf) + enc := vegeta.NewEncoder(writer) + sig := make(chan os.Signal, 1) + res := atk.Attack(tr, rate, attackDuration, "") + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + processAttack(atk, res, enc, sig) + }() + + // Exit as soon as possible. + sig <- os.Interrupt + sig <- os.Interrupt + wg.Wait() + writer.Flush() + + metrics, err := decodeMetrics(buf) + if err != nil { + t.Error(err) + } + if got, max := metrics.Duration, time.Second; got > max { + t.Errorf("attack duration too long. got %+v, max: %+v", got, max) + } +} diff --git a/lib/attack.go b/lib/attack.go index 0294c628..9a565c55 100644 --- a/lib/attack.go +++ b/lib/attack.go @@ -22,6 +22,7 @@ type Attacker struct { dialer *net.Dialer client http.Client stopch chan struct{} + stopOnce sync.Once workers uint64 maxWorkers uint64 maxBody int64 @@ -68,6 +69,7 @@ var ( func NewAttacker(opts ...func(*Attacker)) *Attacker { a := &Attacker{ stopch: make(chan struct{}), + stopOnce: sync.Once{}, workers: DefaultWorkers, maxWorkers: DefaultMaxWorkers, maxBody: DefaultMaxBody, @@ -325,13 +327,17 @@ func (a *Attacker) Attack(tr Targeter, p Pacer, du time.Duration, name string) < return results } -// Stop stops the current attack. -func (a *Attacker) Stop() { +// Stop stops the current attack. The return value indicates whether this call +// has signalled the attack to stop (`true` for the first call) or whether it +// was a noop because it has been previously signalled to stop (`false` for any +// subsequent calls). +func (a *Attacker) Stop() bool { select { case <-a.stopch: - return + return false default: - close(a.stopch) + a.stopOnce.Do(func() { close(a.stopch) }) + return true } }