Skip to content

Commit

Permalink
Implement proper stopping for harvesters when not filebeat shutdown
Browse files Browse the repository at this point in the history
There are two options for stopping a harvester or a prospector. Either the harvester and prospector finish sending all events and stop themself or they are killed because the output is blocking.

In case of shutting down filebeat without using `shutdown_timeout` filebeat is expected to shut down as fast as possible. This means channels are directly closed and the events are not passed through to the registry.

In case of dynamic prospector reloading, prospectors and harvesters must be stopped properly as otherwise no new harvester for the same file can be started.

So far the following changes were made:

* Introduce harvester tracking in prospector to better control / manage the harvesters
* Use `beatDone` channel coming from the beat itself for direct shutdown as this channel is closed when the beat is stopped.
* Introduce more done channels in prospector to make shutdown more fine grained
* Add system tests to verify new behaviour

TODO
* Make sure all events are drained from channel to not guarantee that state updates will be persisted.
  • Loading branch information
ruflin committed Feb 8, 2017
1 parent f006365 commit b6af35f
Show file tree
Hide file tree
Showing 8 changed files with 203 additions and 43 deletions.
2 changes: 1 addition & 1 deletion filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
return err
}

crawler, err := crawler.New(newSpoolerOutlet(fb.done, spooler, wgEvents), config.Prospectors, *once)
crawler, err := crawler.New(newSpoolerOutlet(fb.done, spooler, wgEvents), config.Prospectors, fb.done, *once)
if err != nil {
logp.Err("Could not init crawler: %v", err)
return err
Expand Down
8 changes: 5 additions & 3 deletions filebeat/crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,17 @@ type Crawler struct {
wg sync.WaitGroup
reloader *cfgfile.Reloader
once bool
beatDone chan struct{}
}

func New(out prospector.Outlet, prospectorConfigs []*common.Config, once bool) (*Crawler, error) {
func New(out prospector.Outlet, prospectorConfigs []*common.Config, beatDone chan struct{}, once bool) (*Crawler, error) {

return &Crawler{
out: out,
prospectors: map[uint64]*prospector.Prospector{},
prospectorConfigs: prospectorConfigs,
once: once,
beatDone: beatDone,
}, nil
}

Expand All @@ -47,7 +49,7 @@ func (c *Crawler) Start(r *registrar.Registrar, reloaderConfig *common.Config) e
logp.Warn("BETA feature dynamic configuration reloading is enabled.")

c.reloader = cfgfile.NewReloader(reloaderConfig)
factory := prospector.NewFactory(c.out, r)
factory := prospector.NewFactory(c.out, r, c.beatDone)
go func() {
c.reloader.Run(factory)
}()
Expand All @@ -62,7 +64,7 @@ func (c *Crawler) startProspector(config *common.Config, states []file.State) er
if !config.Enabled() {
return nil
}
p, err := prospector.NewProspector(config, c.out)
p, err := prospector.NewProspector(config, c.out, c.beatDone)
if err != nil {
return fmt.Errorf("Error in initing prospector: %s", err)
}
Expand Down
12 changes: 9 additions & 3 deletions filebeat/harvester/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"fmt"
"sync"

"github.com/satori/go.uuid"

"github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/harvester/encoding"
"github.com/elastic/beats/filebeat/harvester/source"
Expand All @@ -40,24 +42,28 @@ type Harvester struct {
fileReader *LogFile
encodingFactory encoding.EncodingFactory
encoding encoding.Encoding
prospectorDone chan struct{}
once sync.Once
done chan struct{}
wg sync.WaitGroup
ID uuid.UUID
beatDone chan struct{}
}

func NewHarvester(
cfg *common.Config,
state file.State,
prospectorChan chan *input.Event,
done chan struct{},
beatDone chan struct{},
) (*Harvester, error) {

h := &Harvester{
config: defaultConfig,
state: state,
prospectorChan: prospectorChan,
prospectorDone: done,
done: make(chan struct{}),
ID: uuid.NewV4(),
wg: sync.WaitGroup{},
beatDone: beatDone,
}

if err := cfg.Unpack(&h.config); err != nil {
Expand Down
28 changes: 20 additions & 8 deletions filebeat/harvester/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,20 @@ func (h *Harvester) Harvest(r reader.Reader) {

harvesterStarted.Add(1)
harvesterRunning.Add(1)
defer harvesterRunning.Add(-1)

// Makes sure file is properly closed when the harvester is stopped
defer h.close()
h.wg.Add(1)
defer func() {
defer
// Channel to stop internal harvester routines
h.stop()
// Makes sure file is properly closed when the harvester is stopped
h.close()

harvesterRunning.Add(-1)

// Channel to stop internal harvester routines
defer h.stop()
// Marks harvester stopping completed
h.wg.Done()
}()

// Closes reader after timeout or when done channel is closed
// This routine is also responsible to properly stop the reader
Expand All @@ -74,8 +81,6 @@ func (h *Harvester) Harvest(r reader.Reader) {
// Applies when timeout is reached
case <-closeTimeout:
logp.Info("Closing harvester because close_timeout was reached.")
// Required for shutdown when hanging inside reader
case <-h.prospectorDone:
// Required when reader loop returns and reader finished
case <-h.done:
}
Expand Down Expand Up @@ -156,12 +161,19 @@ func (h *Harvester) Harvest(r reader.Reader) {
}
}

// stop is intended for internal use and closed the done channel to stop execution
func (h *Harvester) stop() {
h.once.Do(func() {
close(h.done)
})
}

// Stop stops harvester and waits for completion
func (h *Harvester) Stop() {
h.stop()
h.wg.Wait()
}

// sendEvent sends event to the spooler channel
// Return false if event was not sent
func (h *Harvester) sendEvent(event *input.Event) bool {
Expand All @@ -184,7 +196,7 @@ func (h *Harvester) sendStateUpdate() {
event := input.NewEvent(h.state)

select {
case <-h.prospectorDone:
case <-h.beatDone:
case h.prospectorChan <- event: // ship the new event downstream
}
}
Expand Down
6 changes: 4 additions & 2 deletions filebeat/prospector/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,20 @@ import (
type Factory struct {
outlet Outlet
registrar *registrar.Registrar
beatDone chan struct{}
}

func NewFactory(outlet Outlet, registrar *registrar.Registrar) *Factory {
func NewFactory(outlet Outlet, registrar *registrar.Registrar, beatDone chan struct{}) *Factory {
return &Factory{
outlet: outlet,
registrar: registrar,
beatDone: beatDone,
}
}

func (r *Factory) Create(c *common.Config) (cfgfile.Runner, error) {

p, err := NewProspector(c, r.outlet)
p, err := NewProspector(c, r.outlet, r.beatDone)
if err != nil {
logp.Err("Error creating prospector: %s", err)
return nil, err
Expand Down
126 changes: 101 additions & 25 deletions filebeat/prospector/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/mitchellh/hashstructure"
"github.com/satori/go.uuid"

cfg "github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/harvester"
Expand All @@ -30,12 +31,17 @@ type Prospector struct {
prospectorer Prospectorer
outlet Outlet
harvesterChan chan *input.Event
done chan struct{}
channelDone chan struct{}
runDone chan struct{}
runWg sync.WaitGroup
states *file.States
wg sync.WaitGroup
channelWg sync.WaitGroup // Separate waitgroup for channels as not stopped on completion
id uint64
Once bool
harvesters map[uuid.UUID]*harvester.Harvester
harvestersMutex sync.Mutex
beatDone chan struct{}
}

type Prospectorer interface {
Expand All @@ -47,17 +53,21 @@ type Outlet interface {
OnEvent(event *input.Event) bool
}

func NewProspector(cfg *common.Config, outlet Outlet) (*Prospector, error) {
func NewProspector(cfg *common.Config, outlet Outlet, beatDone chan struct{}) (*Prospector, error) {
prospector := &Prospector{
cfg: cfg,
config: defaultConfig,
outlet: outlet,
harvesterChan: make(chan *input.Event),
done: make(chan struct{}),
channelDone: make(chan struct{}),
wg: sync.WaitGroup{},
runDone: make(chan struct{}),
runWg: sync.WaitGroup{},
states: &file.States{},
channelWg: sync.WaitGroup{},
Once: false,
harvesters: map[uuid.UUID]*harvester.Harvester{},
beatDone: beatDone,
}

var err error
Expand Down Expand Up @@ -115,31 +125,28 @@ func (p *Prospector) Start() {
logp.Info("Starting prospector of type: %v; id: %v ", p.config.InputType, p.ID())

if p.Once {
// If only run once, waiting for completion of prospector / harvesters
defer p.wg.Wait()
// If only run once, waiting for completion of scan and prospector stopping
defer func() {
// Wait until run is finished
p.runWg.Wait()
// Wait until all harvesters are stopped
p.wg.Wait()
}()
}

// Add waitgroup to make sure prospectors finished
p.wg.Add(1)

go func() {
defer p.wg.Done()
p.Run()
}()

}

// Starts scanning through all the file paths and fetch the related files. Start a harvester for each file
func (p *Prospector) Run() {

// Open channel to receive events from harvester and forward them to spooler
// Here potential filtering can happen
p.channelWg.Add(1)
go func() {
defer p.channelWg.Done()
for {
select {
case <-p.done:

case <-p.channelDone:
// TODO: Introduce same magic here as we have for shutdown_timeout to drain channel
logp.Info("Prospector channel stopped")
return
case <-p.beatDone:
logp.Info("Prospector channel stopped")
return
case event := <-p.harvesterChan:
Expand All @@ -151,6 +158,18 @@ func (p *Prospector) Run() {
}
}()

// Add waitgroup to make sure prospectors finished
p.runWg.Add(1)
go func() {
defer p.runWg.Done()
p.Run()
}()

}

// Starts scanning through all the file paths and fetch the related files. Start a harvester for each file
func (p *Prospector) Run() {

// Initial prospector run
p.prospectorer.Run()

Expand All @@ -161,7 +180,7 @@ func (p *Prospector) Run() {

for {
select {
case <-p.done:
case <-p.runDone:
logp.Info("Prospector ticker stopped")
return
case <-time.After(p.config.ScanFrequency):
Expand Down Expand Up @@ -195,10 +214,46 @@ func (p *Prospector) updateState(event *input.Event) error {
return nil
}

// Stop stops the prospector and with it all harvesters
//
// The shutdown order is as follwoing
// - stop run and scanning
// - wait until last scan finishes to make sure no new harvesters are added
// - stop harvesters
// - wait until all harvester finished
// - stop communication channel
// - wait on internal waitgroup to make sure all prospector go routines are stopped
func (p *Prospector) Stop() {
logp.Info("Stopping Prospector: %v", p.ID())
close(p.done)

// Stop scanning and wait for completion
close(p.runDone)
p.runWg.Wait()

wg := sync.WaitGroup{}

// Stop all harvesters
// In case the beatDone channel is closed, this will not wait for completion
// Otherwise Stop will wait until output is complete
p.harvestersMutex.Lock()
for _, hv := range p.harvesters {
wg.Add(1)
go func(h *harvester.Harvester) {
defer wg.Done()
h.Stop()
}(hv)
}
p.harvestersMutex.Unlock()

// Waits on stopping all harvesters to make sure all events made it into the channel
wg.Wait()

// Wait for completion of sending events
// TODO: If not beatDone, it should be waited until channel is drained -> how?
close(p.channelDone)
p.channelWg.Wait()

// Makes sure all prospector go routines are stopped
p.wg.Wait()
}

Expand All @@ -209,7 +264,7 @@ func (p *Prospector) createHarvester(state file.State) (*harvester.Harvester, er
p.cfg,
state,
p.harvesterChan,
p.done,
p.beatDone,
)

return h, err
Expand Down Expand Up @@ -246,19 +301,40 @@ func (p *Prospector) startHarvester(state file.State, offset int64) error {
return err
}

p.wg.Add(1)
// startHarvester is not run concurrently, but atomic operations are need for the decrementing of the counter
// inside the following go routine
atomic.AddUint64(&p.harvesterCounter, 1)
p.wg.Add(1)
go func() {

p.addHarvester(h)
defer func() {
atomic.AddUint64(&p.harvesterCounter, ^uint64(0))
p.removeHarvester(h)
p.wg.Done()
}()
atomic.AddUint64(&p.harvesterCounter, ^uint64(0))

}()
// Starts harvester and picks the right type. In case type is not set, set it to defeault (log)
h.Harvest(reader)
}()

return nil
}

func (p *Prospector) addHarvester(h *harvester.Harvester) {
p.harvestersMutex.Lock()
defer p.harvestersMutex.Unlock()
p.harvesters[h.ID] = h
}

func (p *Prospector) removeHarvester(h *harvester.Harvester) {
p.harvestersMutex.Lock()
defer p.harvestersMutex.Unlock()
delete(p.harvesters, h.ID)
}

func (p *Prospector) getHarvesters() map[uuid.UUID]*harvester.Harvester {
p.harvestersMutex.Lock()
defer p.harvestersMutex.Unlock()
return p.harvesters
}
Loading

0 comments on commit b6af35f

Please sign in to comment.