Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup Filebeat publishers #2463

Merged
merged 1 commit into from
Sep 5, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
cfg "github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/crawler"
"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/filebeat/publish"
"github.com/elastic/beats/filebeat/publisher"
"github.com/elastic/beats/filebeat/registrar"
"github.com/elastic/beats/filebeat/spooler"
)
Expand Down Expand Up @@ -54,7 +54,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
publisherChan := make(chan []*input.Event, 1)

// Publishes event to output
publisher := publish.New(config.PublishAsync,
publisher := publisher.New(config.PublishAsync,
publisherChan, registrar.Channel, b.Publisher)

// Init and Start spooler: Harvesters dump events into the spooler.
Expand Down
142 changes: 27 additions & 115 deletions filebeat/publish/publish.go → filebeat/publisher/async.go
Original file line number Diff line number Diff line change
@@ -1,31 +1,16 @@
package publish
package publisher

import (
"expvar"
"errors"
"sync"
"sync/atomic"
"time"

"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/publisher"
)

type LogPublisher interface {
Start()
Stop()
}

type syncLogPublisher struct {
pub publisher.Publisher
client publisher.Client
in, out chan []*input.Event

done chan struct{}
wg sync.WaitGroup
}

type asyncLogPublisher struct {
pub publisher.Publisher
client publisher.Client
Expand Down Expand Up @@ -64,84 +49,6 @@ const (
batchCanceled
)

var (
eventsSent = expvar.NewInt("publish.events")
)

func New(
async bool,
in, out chan []*input.Event,
pub publisher.Publisher,
) LogPublisher {
if async {
return newAsyncLogPublisher(in, out, pub)
}
return newSyncLogPublisher(in, out, pub)
}

func newSyncLogPublisher(
in, out chan []*input.Event,
pub publisher.Publisher,
) *syncLogPublisher {
return &syncLogPublisher{
in: in,
out: out,
pub: pub,
done: make(chan struct{}),
}
}

func (p *syncLogPublisher) Start() {
p.client = p.pub.Connect()

p.wg.Add(1)
go func() {
defer p.wg.Done()

logp.Info("Start sending events to output")

for {
var events []*input.Event
select {
case <-p.done:
return
case events = <-p.in:
}

pubEvents := make([]common.MapStr, 0, len(events))
for _, event := range events {
// Only send event with bytes read. 0 Bytes means state update only
if event.HasData() {
pubEvents = append(pubEvents, event.ToMapStr())
}
}

ok := p.client.PublishEvents(pubEvents, publisher.Sync, publisher.Guaranteed)
if !ok {
// PublishEvents will only returns false, if p.client has been closed.
logp.Debug("publish", "Shutting down publisher")
return
}

logp.Debug("publish", "Events sent: %d", len(events))
eventsSent.Add(int64(len(events)))

// Tell the registrar that we've successfully sent these events
select {
case <-p.done:
return
case p.out <- events:
}
}
}()
}

func (p *syncLogPublisher) Stop() {
p.client.Close()
close(p.done)
p.wg.Wait()
}

func newAsyncLogPublisher(
in, out chan []*input.Event,
pub publisher.Publisher,
Expand All @@ -168,35 +75,40 @@ func (p *asyncLogPublisher) Start() {
ticker := time.NewTicker(defaultGCTimeout)

for {
err := p.Publish()
if err != nil {
return
}

select {
case <-p.done:
return
case events := <-p.in:

pubEvents := make([]common.MapStr, 0, len(events))
for _, event := range events {
if event.HasData() {
pubEvents = append(pubEvents, event.ToMapStr())
}
}

batch := &eventsBatch{
flag: 0,
events: events,
}
p.client.PublishEvents(pubEvents,
publisher.Signal(batch), publisher.Guaranteed)

p.active.append(batch)

case <-ticker.C:
}
p.collect()

p.collect()
}
}
}()
}

func (p *asyncLogPublisher) Publish() error {
select {
case <-p.done:
return errors.New("async publisher stopped")
case events := <-p.in:

batch := &eventsBatch{
flag: 0,
events: events,
}
p.client.PublishEvents(getDataEvents(events), publisher.Signal(batch), publisher.Guaranteed)

p.active.append(batch)
p.collect()
}
return nil
}

func (p *asyncLogPublisher) Stop() {
p.client.Close()
close(p.done)
Expand Down
41 changes: 41 additions & 0 deletions filebeat/publisher/publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package publisher

import (
"expvar"

"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/publisher"
)

var (
eventsSent = expvar.NewInt("publish.events")
)

type LogPublisher interface {
Start()
Stop()
Publish() error
}

func New(
async bool,
in, out chan []*input.Event,
pub publisher.Publisher,
) LogPublisher {
if async {
return newAsyncLogPublisher(in, out, pub)
}
return newSyncLogPublisher(in, out, pub)
}

// getDataEvents returns all events which contain data (not only state updates)
func getDataEvents(events []*input.Event) []common.MapStr {
dataEvents := make([]common.MapStr, 0, len(events))
for _, event := range events {
if event.HasData() {
dataEvents = append(dataEvents, event.ToMapStr())
}
}
return dataEvents
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// +build !integration

package publish
package publisher

import (
"fmt"
Expand Down
83 changes: 83 additions & 0 deletions filebeat/publisher/sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package publisher

import (
"errors"
"sync"

"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/publisher"
)

type syncLogPublisher struct {
pub publisher.Publisher
client publisher.Client
in, out chan []*input.Event

done chan struct{}
wg sync.WaitGroup
}

func newSyncLogPublisher(
in, out chan []*input.Event,
pub publisher.Publisher,
) *syncLogPublisher {
return &syncLogPublisher{
in: in,
out: out,
pub: pub,
done: make(chan struct{}),
}
}

func (p *syncLogPublisher) Start() {
p.client = p.pub.Connect()

p.wg.Add(1)
go func() {
defer p.wg.Done()

logp.Info("Start sending events to output")

for {
err := p.Publish()
if err != nil {
logp.Debug("publisher", "Shutting down sync publisher")
return
}
}
}()
}

func (p *syncLogPublisher) Publish() error {
var events []*input.Event
select {
case <-p.done:
return errors.New("publishing was stopped")
case events = <-p.in:
}

ok := p.client.PublishEvents(getDataEvents(events), publisher.Sync, publisher.Guaranteed)
if !ok {
// PublishEvents will only returns false, if p.client has been closed.
return errors.New("publisher didn't published events")
}

logp.Debug("publish", "Events sent: %d", len(events))
eventsSent.Add(int64(len(events)))

// Tell the registrar that we've successfully sent these events
select {
case <-p.done:
return errors.New("publishing was stopped")
case p.out <- events:
}

return nil
}

func (p *syncLogPublisher) Stop() {
p.client.Close()
close(p.done)
p.wg.Wait()
}