Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
libbeat/publisher/pipeline: fix Client.Close
Browse files Browse the repository at this point in the history
Set the "closing" variable to true, not false,
upon signalling the waiter to close.
axw committed Jul 14, 2020
1 parent ac89845 commit e7b42d8
Showing 2 changed files with 93 additions and 1 deletion.
2 changes: 1 addition & 1 deletion libbeat/publisher/pipeline/client.go
Original file line number Diff line number Diff line change
@@ -271,7 +271,7 @@ func (w *clientCloseWaiter) signalClose() {
return
}

w.closing.Store(false)
w.closing.Store(true)
if w.events.Load() == 0 {
w.finishClose()
return
92 changes: 92 additions & 0 deletions libbeat/publisher/pipeline/client_test.go
Original file line number Diff line number Diff line change
@@ -21,11 +21,14 @@ import (
"context"
"sync"
"testing"
"time"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
"github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue"
"github.com/elastic/beats/v7/libbeat/tests/resources"
)

@@ -113,3 +116,92 @@ func TestClient(t *testing.T) {
}
})
}

func TestClientWaitClose(t *testing.T) {
routinesChecker := resources.NewGoroutinesChecker()
defer routinesChecker.Check(t)

makePipeline := func(settings Settings, qu queue.Queue) *Pipeline {
p, err := New(beat.Info{},
Monitors{},
func(queue.ACKListener) (queue.Queue, error) { return qu, nil },
outputs.Group{},
settings,
)
if err != nil {
panic(err)
}

return p
}
if testing.Verbose() {
logp.TestingSetup()
}

q := memqueue.NewQueue(logp.L(), memqueue.Settings{Events: 1})
pipeline := makePipeline(Settings{}, q)
defer pipeline.Close()

t.Run("WaitClose blocks", func(t *testing.T) {
client, err := pipeline.ConnectWith(beat.ClientConfig{
WaitClose: 500 * time.Millisecond,
})
if err != nil {
t.Fatal(err)
}
defer client.Close()

// Send an event which never gets acknowledged.
client.Publish(beat.Event{})

closed := make(chan struct{})
go func() {
defer close(closed)
client.Close()
}()

select {
case <-closed:
t.Fatal("expected Close to wait for event acknowledgement")
case <-time.After(100 * time.Millisecond):
}

select {
case <-closed:
case <-time.After(10 * time.Second):
t.Fatal("expected Close to stop waiting after WaitClose elapses")
}
})

t.Run("ACKing events unblocks WaitClose", func(t *testing.T) {
client, err := pipeline.ConnectWith(beat.ClientConfig{
WaitClose: time.Minute,
})
if err != nil {
t.Fatal(err)
}
defer client.Close()

// Send an event which gets acknowledged immediately.
client.Publish(beat.Event{})
output := newMockClient(func(batch publisher.Batch) error {
batch.ACK()
return nil
})
defer output.Close()
pipeline.output.Set(outputs.Group{Clients: []outputs.Client{output}})
defer pipeline.output.Set(outputs.Group{})

closed := make(chan struct{})
go func() {
defer close(closed)
client.Close()
}()

select {
case <-closed:
case <-time.After(10 * time.Second):
t.Fatal("expected Close to stop waiting after event acknowledgement")
}
})
}

0 comments on commit e7b42d8

Please sign in to comment.