diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index e91d48dd73ef..7f96e3b10c2b 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -30,6 +30,7 @@ The list below covers the major changes between 7.0.0-rc2 and master only. - Deprecate test flags, `generate` and `update_expected`, in favor of `data`. {pull}15292[15292] - Python 3 is required now to run python tests and tools. {pull}14798[14798] - The type `memqueue.Broker` is no longer exported; instead of `memqueue.NewBroker`, call `memqueue.NewQueue` (which provides the same public interface). {pull}16667[16667] +- `queue.Eventer` has been renamed to `queue.ACKListener` {pull}16691[16691] ==== Bugfixes diff --git a/libbeat/monitoring/report/elasticsearch/elasticsearch.go b/libbeat/monitoring/report/elasticsearch/elasticsearch.go index 965c13da6577..c755feec120d 100644 --- a/libbeat/monitoring/report/elasticsearch/elasticsearch.go +++ b/libbeat/monitoring/report/elasticsearch/elasticsearch.go @@ -171,11 +171,11 @@ func makeReporter(beat beat.Info, settings report.Settings, cfg *common.Config) clients = append(clients, client) } - queueFactory := func(e queue.Eventer) (queue.Queue, error) { + queueFactory := func(ackListener queue.ACKListener) (queue.Queue, error) { return memqueue.NewQueue(log, memqueue.Settings{ - Eventer: e, - Events: 20, + ACKListener: ackListener, + Events: 20, }), nil } diff --git a/libbeat/publisher/pipeline/client_test.go b/libbeat/publisher/pipeline/client_test.go index c7b83835c9cb..3d84ee538816 100644 --- a/libbeat/publisher/pipeline/client_test.go +++ b/libbeat/publisher/pipeline/client_test.go @@ -33,7 +33,7 @@ func TestClient(t *testing.T) { makePipeline := func(settings Settings, qu queue.Queue) *Pipeline { p, err := New(beat.Info{}, Monitors{}, - func(_ queue.Eventer) (queue.Queue, error) { + func(_ queue.ACKListener) (queue.Queue, error) { return qu, nil }, outputs.Group{}, diff --git a/libbeat/publisher/pipeline/module.go b/libbeat/publisher/pipeline/module.go index 60da37f1d456..570676aeee0e 100644 --- a/libbeat/publisher/pipeline/module.go +++ b/libbeat/publisher/pipeline/module.go @@ -166,7 +166,7 @@ func loadOutput( func createQueueBuilder( config common.ConfigNamespace, monitors Monitors, -) (func(queue.Eventer) (queue.Queue, error), error) { +) (func(queue.ACKListener) (queue.Queue, error), error) { queueType := defaultQueueType if b := config.Name(); b != "" { queueType = b @@ -187,7 +187,7 @@ func createQueueBuilder( monitoring.NewString(queueReg, "name").Set(queueType) } - return func(eventer queue.Eventer) (queue.Queue, error) { - return queueFactory(eventer, monitors.Logger, queueConfig) + return func(ackListener queue.ACKListener) (queue.Queue, error) { + return queueFactory(ackListener, monitors.Logger, queueConfig) }, nil } diff --git a/libbeat/publisher/pipeline/pipeline.go b/libbeat/publisher/pipeline/pipeline.go index 688b7315c748..8c2067733382 100644 --- a/libbeat/publisher/pipeline/pipeline.go +++ b/libbeat/publisher/pipeline/pipeline.go @@ -136,7 +136,7 @@ type waitCloser struct { events sync.WaitGroup } -type queueFactory func(queue.Eventer) (queue.Queue, error) +type queueFactory func(queue.ACKListener) (queue.Queue, error) // New create a new Pipeline instance from a queue instance and a set of outputs. // The new pipeline will take ownership of queue and outputs. On Close, the diff --git a/libbeat/publisher/queue/memqueue/ackloop.go b/libbeat/publisher/queue/memqueue/ackloop.go index e8126733de4d..0c804824539a 100644 --- a/libbeat/publisher/queue/memqueue/ackloop.go +++ b/libbeat/publisher/queue/memqueue/ackloop.go @@ -112,8 +112,8 @@ func (l *ackLoop) handleBatchSig() int { } if count > 0 { - if e := l.broker.eventer; e != nil { - e.OnACK(count) + if listener := l.broker.ackListener; listener != nil { + listener.OnACK(count) } // report acks to waiting clients diff --git a/libbeat/publisher/queue/memqueue/broker.go b/libbeat/publisher/queue/memqueue/broker.go index 443ff3542f42..1446182b4b64 100644 --- a/libbeat/publisher/queue/memqueue/broker.go +++ b/libbeat/publisher/queue/memqueue/broker.go @@ -55,7 +55,7 @@ type broker struct { acks chan int scheduledACKs chan chanList - eventer queue.Eventer + ackListener queue.ACKListener // wait group for worker shutdown wg sync.WaitGroup @@ -63,7 +63,7 @@ type broker struct { } type Settings struct { - Eventer queue.Eventer + ACKListener queue.ACKListener Events int FlushMinEvents int FlushTimeout time.Duration @@ -87,7 +87,9 @@ func init() { queue.RegisterType("mem", create) } -func create(eventer queue.Eventer, logger *logp.Logger, cfg *common.Config) (queue.Queue, error) { +func create( + ackListener queue.ACKListener, logger *logp.Logger, cfg *common.Config, +) (queue.Queue, error) { config := defaultConfig if err := cfg.Unpack(&config); err != nil { return nil, err @@ -98,7 +100,7 @@ func create(eventer queue.Eventer, logger *logp.Logger, cfg *common.Config) (que } return NewQueue(logger, Settings{ - Eventer: eventer, + ACKListener: ackListener, Events: config.Events, FlushMinEvents: config.FlushMinEvents, FlushTimeout: config.FlushTimeout, @@ -152,7 +154,7 @@ func NewQueue( waitOnClose: settings.WaitOnClose, - eventer: settings.Eventer, + ackListener: settings.ACKListener, } var eventLoop interface { diff --git a/libbeat/publisher/queue/queue.go b/libbeat/publisher/queue/queue.go index eca5c0c499d6..9cda2b25f884 100644 --- a/libbeat/publisher/queue/queue.go +++ b/libbeat/publisher/queue/queue.go @@ -27,11 +27,11 @@ import ( ) // Factory for creating a queue used by a pipeline instance. -type Factory func(Eventer, *logp.Logger, *common.Config) (Queue, error) +type Factory func(ACKListener, *logp.Logger, *common.Config) (Queue, error) -// Eventer listens to special events to be send by queue implementations. -type Eventer interface { - OnACK(int) // number of consecutively published messages, acked by producers +// ACKListener listens to special events to be send by queue implementations. +type ACKListener interface { + OnACK(eventCount int) // number of consecutively published events acked by producers } // Queue is responsible for accepting, forwarding and ACKing events. diff --git a/libbeat/publisher/queue/spool/inbroker.go b/libbeat/publisher/queue/spool/inbroker.go index 784880ea847e..54e895bc1fd7 100644 --- a/libbeat/publisher/queue/spool/inbroker.go +++ b/libbeat/publisher/queue/spool/inbroker.go @@ -27,8 +27,8 @@ import ( ) type inBroker struct { - ctx *spoolCtx - eventer queue.Eventer + ctx *spoolCtx + ackListener queue.ACKListener // active state handler state func(*inBroker) bool @@ -73,7 +73,7 @@ const ( func newInBroker( ctx *spoolCtx, - eventer queue.Eventer, + ackListener queue.ACKListener, qu *pq.Queue, codec codecID, flushTimeout time.Duration, @@ -90,9 +90,9 @@ func newInBroker( } b := &inBroker{ - ctx: ctx, - eventer: eventer, - state: (*inBroker).stateEmpty, + ctx: ctx, + ackListener: ackListener, + state: (*inBroker).stateEmpty, // API events: make(chan pushRequest, inEventChannelSize), @@ -134,8 +134,8 @@ func (b *inBroker) onFlush(n uint) { return } - if b.eventer != nil { - b.eventer.OnACK(int(n)) + if b.ackListener != nil { + b.ackListener.OnACK(int(n)) } b.ctx.logger.Debug("inbroker: flushed events:", n) b.bufferedEvents -= n diff --git a/libbeat/publisher/queue/spool/module.go b/libbeat/publisher/queue/spool/module.go index 6e76d8033065..7d778b5057ca 100644 --- a/libbeat/publisher/queue/spool/module.go +++ b/libbeat/publisher/queue/spool/module.go @@ -39,7 +39,9 @@ func init() { queue.RegisterType("spool", create) } -func create(eventer queue.Eventer, logp *logp.Logger, cfg *common.Config) (queue.Queue, error) { +func create( + ackListener queue.ACKListener, logp *logp.Logger, cfg *common.Config, +) (queue.Queue, error) { cfgwarn.Beta("Spooling to disk is beta") config := defaultConfig() @@ -63,7 +65,7 @@ func create(eventer queue.Eventer, logp *logp.Logger, cfg *common.Config) (queue } return NewSpool(log, path, Settings{ - Eventer: eventer, + ACKListener: ackListener, Mode: config.File.Permissions, WriteBuffer: uint(config.Write.BufferSize), WriteFlushTimeout: config.Write.FlushTimeout, diff --git a/libbeat/publisher/queue/spool/spool.go b/libbeat/publisher/queue/spool/spool.go index fe3d260a9b7c..d5ac5f7e3679 100644 --- a/libbeat/publisher/queue/spool/spool.go +++ b/libbeat/publisher/queue/spool/spool.go @@ -64,7 +64,7 @@ type Settings struct { // buffer be flushed and reset to its original size. WriteBuffer uint - Eventer queue.Eventer + ACKListener queue.ACKListener WriteFlushTimeout time.Duration WriteFlushEvents uint @@ -134,7 +134,8 @@ func NewSpool(logger logger, path string, settings Settings) (*Spool, error) { if inFlushTimeout < minInFlushTimeout { inFlushTimeout = minInFlushTimeout } - inBroker, err := newInBroker(inCtx, settings.Eventer, queue, settings.Codec, + inBroker, err := newInBroker( + inCtx, settings.ACKListener, queue, settings.Codec, inFlushTimeout, settings.WriteFlushEvents) if err != nil { return nil, err