Allow filebeat to only run once (#2456)
* Allow filebeat to only run once

When the `-once` flag is used, filebeat starts all configured prospectors and harvesters and runs each prospector until all of its harvesters are closed. It is recommended to use the flag in combination with `close_eof`, so that harvesters close as soon as the end of a file is reached. By default, harvesters are only closed after `close_inactive` expires.
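
A hypothetical invocation showing how the flag is meant to be used; `-e` (log to stderr) and `-c` (config path) are standard filebeat options, and `close_eof` is assumed to be enabled on the prospectors as recommended above:

# Run filebeat once over the configured paths; the process exits after
# every harvester has reached EOF and its events have been published.
filebeat -e -c filebeat.yml -once

Without `close_eof`, the run still terminates, but only after each idle harvester is closed by `close_inactive`.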
ruflin authored and Steffen Siering committed Sep 16, 2016
1 parent d37c77d commit 6b7df0c
Showing 6 changed files with 191 additions and 80 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
@@ -117,6 +117,7 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha5...master[Check the HEAD d
*Filebeat*
- Introduce close_timeout harvester options {issue}1926[1926]
- Strip BOM from first message in case of BOM files {issue}2351[2351]
- Add command line option -once to run filebeat only once and then close {pull}2456[2456]


- Add harvester_limit option {pull}2417[2417]
90 changes: 55 additions & 35 deletions filebeat/beater/filebeat.go
@@ -1,6 +1,7 @@
package beater

import (
"flag"
"fmt"
"sync"

@@ -15,11 +16,12 @@ import (
"github.com/elastic/beats/filebeat/spooler"
)

var once = flag.Bool("once", false, "Run filebeat only once until all harvesters reach EOF")

// Filebeat is a beater object. Contains all objects needed to run the beat
type Filebeat struct {
config *cfg.Config
sigWait *signalWait
done chan struct{}
config *cfg.Config
done chan struct{}
}

// New creates a new Filebeat pointer instance.
@@ -33,9 +35,8 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
}

fb := &Filebeat{
done: make(chan struct{}),
sigWait: newSignalWait(),
config: &config,
done: make(chan struct{}),
config: &config,
}
return fb, nil
}
@@ -45,13 +46,12 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
var err error
config := fb.config

var wgEvents *sync.WaitGroup // count active events for waiting on shutdown
var finishedLogger publisher.SuccessLogger
waitFinished := newSignalWait()
waitEvents := newSignalWait()

if fb.config.ShutdownTimeout > 0 {
wgEvents = &sync.WaitGroup{}
finishedLogger = newFinishedLogger(wgEvents)
}
// count active events for waiting on shutdown
wgEvents := &sync.WaitGroup{}
finishedLogger := newFinishedLogger(wgEvents)

// Setup registrar to persist state
registrar, err := registrar.New(config.RegistryFile, finishedLogger)
@@ -60,13 +60,14 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
return err
}

// Channel from harvesters to spooler
successLogger := newRegistrarLogger(registrar)
// Make sure all events that were published make it to the registrar
registrarChannel := newRegistrarLogger(registrar)

// Channel from spooler to harvester
publisherChan := newPublisherChannel()

// Publishes event to output
publisher := publisher.New(config.PublishAsync,
publisherChan.ch, successLogger, b.Publisher)
publisher := publisher.New(config.PublishAsync, publisherChan.ch, registrarChannel, b.Publisher)

// Init and Start spooler: Harvesters dump events into the spooler.
spooler, err := spooler.New(config, publisherChan)
@@ -75,9 +76,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
return err
}

crawler, err := crawler.New(
newSpoolerOutlet(fb.done, spooler, wgEvents),
config.Prospectors)
crawler, err := crawler.New(newSpoolerOutlet(fb.done, spooler, wgEvents), config.Prospectors)
if err != nil {
logp.Err("Could not init crawler: %v", err)
return err
@@ -98,30 +97,45 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
// Start publisher
publisher.Start()
// Stopping publisher (might potentially drop items)
defer publisher.Stop()
defer successLogger.Close()
defer func() {
// Close the registrar logger first to make sure no more events arrive at the registrar
// registrarChannel must be closed first to potentially unblock (pretty unlikely) the publisher
registrarChannel.Close()
publisher.Stop()
}()

// Starting spooler
spooler.Start()

// Stopping spooler will flush items
defer func() {
// With harvesters being stopped, optionally wait for all enqueued events being
// published and written by registrar before continuing shutdown.
fb.sigWait.Wait()
// Wait for all events to be processed or timeout
waitEvents.Wait()

// continue shutdown
// Closes publisher so no further events can be sent
publisherChan.Close()
// Stopping spooler
spooler.Stop()
}()

err = crawler.Start(registrar.GetStates())
err = crawler.Start(registrar.GetStates(), *once)
if err != nil {
return err
}
// Blocks progressing. As soon as channel is closed, all defer statements come into play

<-fb.done
// If run once, add crawler completion check as alternative to done signal
if *once {
runOnce := func() {
logp.Info("Running filebeat once. Waiting for completion ...")
crawler.WaitForCompletion()
logp.Info("All data collection completed. Shutting down.")
}
waitFinished.Add(runOnce)
}

// Add done channel to wait for shutdown signal
waitFinished.AddChan(fb.done)
waitFinished.Wait()

// Stop crawler -> stop prospectors -> stop harvesters
// Note: waiting for crawlers to stop here in order to install wgEvents.Wait
@@ -130,14 +144,20 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
crawler.Stop()

timeout := fb.config.ShutdownTimeout
if timeout > 0 {
logp.Info("Shutdown output timer started. Waiting for max %v.", timeout)

// Wait for either timeout or all events having been ACKed by outputs.
fb.sigWait.Add(withLog(wgEvents.Wait,
// Checks if on shutdown it should wait for all events to be published
waitPublished := fb.config.ShutdownTimeout > 0 || *once
if waitPublished {
// Wait for registrar to finish writing registry
waitEvents.Add(withLog(wgEvents.Wait,
"Continue shutdown: All enqueued events being published."))
fb.sigWait.Add(withLog(waitDuration(timeout),
"Continue shutdown: Time out waiting for events being published."))
// Wait for either timeout or all events having been ACKed by outputs.
if fb.config.ShutdownTimeout > 0 {
logp.Info("Shutdown output timer started. Waiting for max %v.", timeout)
waitEvents.Add(withLog(waitDuration(timeout),
"Continue shutdown: Time out waiting for events being published."))
} else {
waitEvents.AddChan(fb.done)
}
}

return nil
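
The rewritten shutdown logic above leans on a small `signalWait` helper (`newSignalWait`, `Add`, `AddChan`, `Wait`) plus the `withLog` and `waitDuration` wrappers, none of which appear in this diff. Below is a minimal sketch of such a helper, inferred purely from the call sites; the actual implementation in filebeat may differ:

package beater

import (
	"time"

	"github.com/elastic/beats/libbeat/logp"
)

// signalWait fans in any number of completion signals; Wait returns as
// soon as the first registered signal fires.
type signalWait struct {
	count   int           // number of registered signals
	signals chan struct{} // each signal sends exactly once
}

func newSignalWait() *signalWait {
	// Buffer generously so late signals never block their goroutines.
	return &signalWait{signals: make(chan struct{}, 8)}
}

// Add runs fn in a goroutine and signals when it returns.
func (s *signalWait) Add(fn func()) {
	s.count++
	go func() {
		fn()
		s.signals <- struct{}{}
	}()
}

// AddChan treats the closing of c as a completion signal.
func (s *signalWait) AddChan(c <-chan struct{}) {
	s.Add(func() { <-c })
}

// Wait blocks until the first signal fires; with no registered
// signals it returns immediately.
func (s *signalWait) Wait() {
	if s.count == 0 {
		return
	}
	<-s.signals
}

// withLog logs msg once fn completes.
func withLog(fn func(), msg string) func() {
	return func() {
		fn()
		logp.Info("%v", msg)
	}
}

// waitDuration turns a timeout into a waitable function.
func waitDuration(d time.Duration) func() {
	return func() { time.Sleep(d) }
}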
10 changes: 7 additions & 3 deletions filebeat/crawler/crawler.go
@@ -29,7 +29,7 @@ func New(out prospector.Outlet, prospectorConfigs []*common.Config) (*Crawler, e
}, nil
}

func (c *Crawler) Start(states file.States) error {
func (c *Crawler) Start(states file.States, once bool) error {

logp.Info("Loading Prospectors: %v", len(c.prospectorConfigs))

@@ -54,7 +54,7 @@ func (c *Crawler) Start(states file.States) error {
logp.Debug("crawler", "Prospector %v stopped", id)
}()
logp.Debug("crawler", "Starting prospector %v", id)
prospector.Run()
prospector.Run(once)
}(i, p)
}

@@ -76,6 +76,10 @@ func (c *Crawler) Stop() {
c.wg.Add(1)
go stopProspector(p)
}
c.wg.Wait()
c.WaitForCompletion()
logp.Info("Crawler stopped")
}

func (c *Crawler) WaitForCompletion() {
c.wg.Wait()
}
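
Splitting `WaitForCompletion` out of `Stop` gives callers two shutdown paths. A short sketch of both, with error handling elided (the `registrar` value comes from the beater, as shown in filebeat.go above):

// Run-once mode: start the prospectors with once=true, then block
// until every prospector goroutine, and with it every harvester,
// has returned.
if err := crawler.Start(registrar.GetStates(), true); err != nil {
	return err
}
crawler.WaitForCompletion()

// Long-running mode: Stop signals every prospector to shut down and
// then reuses WaitForCompletion to wait on the same WaitGroup.
crawler.Stop()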
24 changes: 20 additions & 4 deletions filebeat/prospector/prospector.go
@@ -29,6 +29,7 @@ type Prospector struct {
done chan struct{}
states *file.States
wg sync.WaitGroup
channelWg sync.WaitGroup // Separate waitgroup for channels as not stopped on completion
harvesterCounter uint64
}

@@ -50,6 +51,7 @@ func NewProspector(cfg *common.Config, states file.States, outlet Outlet) (*Pros
done: make(chan struct{}),
states: states.Copy(),
wg: sync.WaitGroup{},
channelWg: sync.WaitGroup{},
}

if err := cfg.Unpack(&prospector.config); err != nil {
@@ -101,16 +103,22 @@ func (p *Prospector) Init() error {
}

// Starts scanning through all the file paths and fetch the related files. Start a harvester for each file
func (p *Prospector) Run() {
func (p *Prospector) Run(once bool) {

logp.Info("Starting prospector of type: %v", p.config.InputType)
p.wg.Add(2)
defer p.wg.Done()

// This waitgroup is not needed if run only once
// Waitgroup has to be added here to prevent panic in case Stop is called immediately afterwards
if !once {
// Add waitgroup to make sure prospectors finished
p.wg.Add(1)
defer p.wg.Done()
}
// Open channel to receive events from harvester and forward them to spooler
// Here potential filtering can happen
p.channelWg.Add(1)
go func() {
defer p.wg.Done()
defer p.channelWg.Done()
for {
select {
case <-p.done:
@@ -128,6 +136,13 @@
// Initial prospector run
p.prospectorer.Run()

// Shuts down after the first complete scan of all prospectors
// As all harvesters are part of the prospector waitgroup, this waits for the closing of all harvesters
if once {
p.wg.Wait()
return
}

for {
select {
case <-p.done:
@@ -162,6 +177,7 @@ func (p *Prospector) updateState(event *input.Event) error {
func (p *Prospector) Stop() {
logp.Info("Stopping Prospector")
close(p.done)
p.channelWg.Wait()
p.wg.Wait()
}

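
The separate `channelWg` exists because the event-forwarding goroutine must keep draining until `Stop` closes `done`, while `wg` only tracks the scan and harvester work that a run-once pass waits on; a single WaitGroup would either tear the forwarder down too early or deadlock the once path. A stripped-down sketch of the pattern, with the harvester and outlet details stubbed out (`miniProspector` and its fields are illustrative, not filebeat API):

package prospector

import "sync"

// miniProspector demonstrates the two-WaitGroup split used above.
type miniProspector struct {
	done      chan struct{}
	wg        sync.WaitGroup // scan + harvesters; waited on by run-once
	channelWg sync.WaitGroup // forwarder; only stopped via done
	events    chan int       // stand-in for the harvester event channel
}

func (p *miniProspector) Run(once bool) {
	if !once {
		// Long-running mode: register with wg so Stop waits for Run too.
		p.wg.Add(1)
		defer p.wg.Done()
	}

	// Forwarder: drains events until done is closed. Tracked by
	// channelWg so a completed scan does not tear it down.
	p.channelWg.Add(1)
	go func() {
		defer p.channelWg.Done()
		for {
			select {
			case <-p.done:
				return
			case ev := <-p.events:
				_ = ev // forwarded to the spooler in the real code
			}
		}
	}()

	// ... initial scan runs here; harvesters register on p.wg ...

	if once {
		// Run-once: return as soon as all harvesters have finished.
		p.wg.Wait()
		return
	}

	// Long-running: keep rescanning until done is closed (elided).
	<-p.done
}

func (p *miniProspector) Stop() {
	close(p.done)
	p.channelWg.Wait()
	p.wg.Wait()
}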
31 changes: 19 additions & 12 deletions filebeat/spooler/spooler.go
@@ -18,12 +18,12 @@ const channelSize = 16
type Spooler struct {
Channel chan *input.Event // Channel is the input to the Spooler.
config spoolerConfig
exit chan struct{} // Channel used to signal shutdown.
output Output // batch event output on flush
spool []*input.Event // Events being held by the Spooler.
wg sync.WaitGroup // WaitGroup used to control the shutdown.
}

// Output is the interface through which the spooler sends batched events via Send
type Output interface {
Send(events []*input.Event) bool
}
@@ -45,7 +45,6 @@ func New(
idleTimeout: config.IdleTimeout,
spoolSize: config.SpoolSize,
},
exit: make(chan struct{}),
output: out,
spool: make([]*input.Event, 0, config.SpoolSize),
}, nil
@@ -63,6 +62,7 @@ func (s *Spooler) run() {
logp.Info("Starting spooler: spool_size: %v; idle_timeout: %s",
s.config.spoolSize, s.config.idleTimeout)

defer s.flush()
defer s.wg.Done()

timer := time.NewTimer(s.config.idleTimeout)
@@ -97,6 +97,7 @@
// flushed to the publisher. The method should only be invoked one time after
// Start has been invoked.
func (s *Spooler) Stop() {

logp.Info("Stopping spooler")

// Signal to the run method that it should stop.
@@ -123,16 +124,22 @@ func (s *Spooler) queue(event *input.Event) bool {
}

// flush flushes all events to the publisher.
func (s *Spooler) flush() {
if len(s.spool) > 0 {
// copy buffer
tmpCopy := make([]*input.Event, len(s.spool))
copy(tmpCopy, s.spool)

// clear buffer
s.spool = s.spool[:0]
func (s *Spooler) flush() int {

// send batched events to output
s.output.Send(tmpCopy)
count := len(s.spool)
if count == 0 {
return 0
}

// copy buffer
tmpCopy := make([]*input.Event, count)
copy(tmpCopy, s.spool)

// clear buffer
s.spool = s.spool[:0]

// send batched events to output
s.output.Send(tmpCopy)

return count
}
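
Two behavioral changes here: `run` now flushes on every exit path (`defer s.flush()`), so a shutdown cannot strand a partial batch in the spool, and `flush` reports how many events it sent, which makes the final drain observable. A hypothetical caller (not part of this commit):

// Hypothetical: log the size of the final drain during shutdown.
if n := s.flush(); n > 0 {
	logp.Info("spooler: flushed %d trailing events", n)
}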