From 7dac29e5e51362e45c695060ab95bf3b13721be9 Mon Sep 17 00:00:00 2001 From: ruflin Date: Mon, 5 Sep 2016 15:33:07 +0200 Subject: [PATCH] Cleanup Filebeat publishers * Separate async and sync publisher into own files * Add Publish interface method which can be used to manually trigger publish (see https://github.com/elastic/beats/pull/2456) * Add getDataEvents function * Rename publish to publisher package --- filebeat/beater/filebeat.go | 4 +- .../publish.go => publisher/async.go} | 142 ++++-------------- filebeat/publisher/publisher.go | 41 +++++ .../publisher_test.go} | 2 +- filebeat/publisher/sync.go | 83 ++++++++++ 5 files changed, 154 insertions(+), 118 deletions(-) rename filebeat/{publish/publish.go => publisher/async.go} (63%) create mode 100644 filebeat/publisher/publisher.go rename filebeat/{publish/publish_test.go => publisher/publisher_test.go} (98%) create mode 100644 filebeat/publisher/sync.go diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 5b0374b482f..eba0506caf8 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -10,7 +10,7 @@ import ( cfg "github.com/elastic/beats/filebeat/config" "github.com/elastic/beats/filebeat/crawler" "github.com/elastic/beats/filebeat/input" - "github.com/elastic/beats/filebeat/publish" + "github.com/elastic/beats/filebeat/publisher" "github.com/elastic/beats/filebeat/registrar" "github.com/elastic/beats/filebeat/spooler" ) @@ -54,7 +54,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error { publisherChan := make(chan []*input.Event, 1) // Publishes event to output - publisher := publish.New(config.PublishAsync, + publisher := publisher.New(config.PublishAsync, publisherChan, registrar.Channel, b.Publisher) // Init and Start spooler: Harvesters dump events into the spooler. diff --git a/filebeat/publish/publish.go b/filebeat/publisher/async.go similarity index 63% rename from filebeat/publish/publish.go rename to filebeat/publisher/async.go index bf811505a82..e372b6e0800 100644 --- a/filebeat/publish/publish.go +++ b/filebeat/publisher/async.go @@ -1,31 +1,16 @@ -package publish +package publisher import ( - "expvar" + "errors" "sync" "sync/atomic" "time" "github.com/elastic/beats/filebeat/input" - "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/publisher" ) -type LogPublisher interface { - Start() - Stop() -} - -type syncLogPublisher struct { - pub publisher.Publisher - client publisher.Client - in, out chan []*input.Event - - done chan struct{} - wg sync.WaitGroup -} - type asyncLogPublisher struct { pub publisher.Publisher client publisher.Client @@ -64,84 +49,6 @@ const ( batchCanceled ) -var ( - eventsSent = expvar.NewInt("publish.events") -) - -func New( - async bool, - in, out chan []*input.Event, - pub publisher.Publisher, -) LogPublisher { - if async { - return newAsyncLogPublisher(in, out, pub) - } - return newSyncLogPublisher(in, out, pub) -} - -func newSyncLogPublisher( - in, out chan []*input.Event, - pub publisher.Publisher, -) *syncLogPublisher { - return &syncLogPublisher{ - in: in, - out: out, - pub: pub, - done: make(chan struct{}), - } -} - -func (p *syncLogPublisher) Start() { - p.client = p.pub.Connect() - - p.wg.Add(1) - go func() { - defer p.wg.Done() - - logp.Info("Start sending events to output") - - for { - var events []*input.Event - select { - case <-p.done: - return - case events = <-p.in: - } - - pubEvents := make([]common.MapStr, 0, len(events)) - for _, event := range events { - // Only send event with bytes read. 0 Bytes means state update only - if event.HasData() { - pubEvents = append(pubEvents, event.ToMapStr()) - } - } - - ok := p.client.PublishEvents(pubEvents, publisher.Sync, publisher.Guaranteed) - if !ok { - // PublishEvents will only returns false, if p.client has been closed. - logp.Debug("publish", "Shutting down publisher") - return - } - - logp.Debug("publish", "Events sent: %d", len(events)) - eventsSent.Add(int64(len(events))) - - // Tell the registrar that we've successfully sent these events - select { - case <-p.done: - return - case p.out <- events: - } - } - }() -} - -func (p *syncLogPublisher) Stop() { - p.client.Close() - close(p.done) - p.wg.Wait() -} - func newAsyncLogPublisher( in, out chan []*input.Event, pub publisher.Publisher, @@ -168,35 +75,40 @@ func (p *asyncLogPublisher) Start() { ticker := time.NewTicker(defaultGCTimeout) for { + err := p.Publish() + if err != nil { + return + } + select { case <-p.done: return - case events := <-p.in: - - pubEvents := make([]common.MapStr, 0, len(events)) - for _, event := range events { - if event.HasData() { - pubEvents = append(pubEvents, event.ToMapStr()) - } - } - - batch := &eventsBatch{ - flag: 0, - events: events, - } - p.client.PublishEvents(pubEvents, - publisher.Signal(batch), publisher.Guaranteed) - - p.active.append(batch) - case <-ticker.C: - } + p.collect() - p.collect() + } } }() } +func (p *asyncLogPublisher) Publish() error { + select { + case <-p.done: + return errors.New("async publisher stopped") + case events := <-p.in: + + batch := &eventsBatch{ + flag: 0, + events: events, + } + p.client.PublishEvents(getDataEvents(events), publisher.Signal(batch), publisher.Guaranteed) + + p.active.append(batch) + p.collect() + } + return nil +} + func (p *asyncLogPublisher) Stop() { p.client.Close() close(p.done) diff --git a/filebeat/publisher/publisher.go b/filebeat/publisher/publisher.go new file mode 100644 index 00000000000..be6c03b3102 --- /dev/null +++ b/filebeat/publisher/publisher.go @@ -0,0 +1,41 @@ +package publisher + +import ( + "expvar" + + "github.com/elastic/beats/filebeat/input" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/publisher" +) + +var ( + eventsSent = expvar.NewInt("publish.events") +) + +type LogPublisher interface { + Start() + Stop() + Publish() error +} + +func New( + async bool, + in, out chan []*input.Event, + pub publisher.Publisher, +) LogPublisher { + if async { + return newAsyncLogPublisher(in, out, pub) + } + return newSyncLogPublisher(in, out, pub) +} + +// getDataEvents returns all events which contain data (not only state updates) +func getDataEvents(events []*input.Event) []common.MapStr { + dataEvents := make([]common.MapStr, 0, len(events)) + for _, event := range events { + if event.HasData() { + dataEvents = append(dataEvents, event.ToMapStr()) + } + } + return dataEvents +} diff --git a/filebeat/publish/publish_test.go b/filebeat/publisher/publisher_test.go similarity index 98% rename from filebeat/publish/publish_test.go rename to filebeat/publisher/publisher_test.go index 3b131ab9241..e739154a0bd 100644 --- a/filebeat/publish/publish_test.go +++ b/filebeat/publisher/publisher_test.go @@ -1,6 +1,6 @@ // +build !integration -package publish +package publisher import ( "fmt" diff --git a/filebeat/publisher/sync.go b/filebeat/publisher/sync.go new file mode 100644 index 00000000000..e2d385e1e57 --- /dev/null +++ b/filebeat/publisher/sync.go @@ -0,0 +1,83 @@ +package publisher + +import ( + "errors" + "sync" + + "github.com/elastic/beats/filebeat/input" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/publisher" +) + +type syncLogPublisher struct { + pub publisher.Publisher + client publisher.Client + in, out chan []*input.Event + + done chan struct{} + wg sync.WaitGroup +} + +func newSyncLogPublisher( + in, out chan []*input.Event, + pub publisher.Publisher, +) *syncLogPublisher { + return &syncLogPublisher{ + in: in, + out: out, + pub: pub, + done: make(chan struct{}), + } +} + +func (p *syncLogPublisher) Start() { + p.client = p.pub.Connect() + + p.wg.Add(1) + go func() { + defer p.wg.Done() + + logp.Info("Start sending events to output") + + for { + err := p.Publish() + if err != nil { + logp.Debug("publisher", "Shutting down sync publisher") + return + } + } + }() +} + +func (p *syncLogPublisher) Publish() error { + var events []*input.Event + select { + case <-p.done: + return errors.New("publishing was stopped") + case events = <-p.in: + } + + ok := p.client.PublishEvents(getDataEvents(events), publisher.Sync, publisher.Guaranteed) + if !ok { + // PublishEvents will only returns false, if p.client has been closed. + return errors.New("publisher didn't published events") + } + + logp.Debug("publish", "Events sent: %d", len(events)) + eventsSent.Add(int64(len(events))) + + // Tell the registrar that we've successfully sent these events + select { + case <-p.done: + return errors.New("publishing was stopped") + case p.out <- events: + } + + return nil +} + +func (p *syncLogPublisher) Stop() { + p.client.Close() + close(p.done) + p.wg.Wait() +}