Skip to content

Commit

Permalink
Merge pull request #1894 from ruflin/improve-namespacing
Browse files Browse the repository at this point in the history
Improve Filebeat organisation and cleanup
  • Loading branch information
Steffen Siering authored Jun 22, 2016
2 parents 8a35fdb + 6079993 commit a15d0ef
Show file tree
Hide file tree
Showing 13 changed files with 139 additions and 121 deletions.
90 changes: 44 additions & 46 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,15 @@ import (
cfg "github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/crawler"
"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/filebeat/publish"
"github.com/elastic/beats/filebeat/registrar"
"github.com/elastic/beats/filebeat/spooler"
)

// Filebeat is a beater object. Contains all objects needed to run the beat
type Filebeat struct {
FbConfig *cfg.Config
// Channel from harvesters to spooler
publisherChan chan []*input.FileEvent
spooler *Spooler
registrar *crawler.Registrar
crawler *crawler.Crawler
pub logPublisher
done chan struct{}
config *cfg.Config
done chan struct{}
}

// New creates a new Filebeat pointer instance.
Expand All @@ -32,73 +29,86 @@ func New() *Filebeat {
func (fb *Filebeat) Config(b *beat.Beat) error {

// Load Base config
err := b.RawConfig.Unpack(&fb.FbConfig)

err := b.RawConfig.Unpack(&fb.config)
if err != nil {
return fmt.Errorf("Error reading config file: %v", err)
}

// Check if optional config_dir is set to fetch additional prospector config files
fb.FbConfig.FetchConfigs()
fb.config.FetchConfigs()

return nil
}

// Setup applies the minimum required setup to a new Filebeat instance for use.
// It only prepares the shutdown channel that Run blocks on and Stop closes.
func (fb *Filebeat) Setup(b *beat.Beat) error {
	fb.done = make(chan struct{})
	return nil
}

// Run allows the beater to be run as a beat.
func (fb *Filebeat) Run(b *beat.Beat) error {

var err error

// Init channels
fb.publisherChan = make(chan []*input.FileEvent, 1)
config := fb.config.Filebeat

// Setup registrar to persist state
fb.registrar, err = crawler.NewRegistrar(fb.FbConfig.Filebeat.RegistryFile)
registrar, err := registrar.New(config.RegistryFile)
if err != nil {
logp.Err("Could not init registrar: %v", err)
return err
}

fb.crawler = &crawler.Crawler{
Registrar: fb.registrar,
// Channel from harvesters to spooler
publisherChan := make(chan []*input.FileEvent, 1)

// Publishes event to output
publisher := publish.New(config.PublishAsync,
publisherChan, registrar.Channel, b.Publisher.Connect())

// Init and Start spooler: Harvesters dump events into the spooler.
spooler, err := spooler.New(config, publisherChan)
if err != nil {
logp.Err("Could not init spooler: %v", err)
return err
}

// Load the previous log file locations now, for use in prospector
err = fb.registrar.LoadState()
crawler, err := crawler.New(spooler, config.Prospectors)
if err != nil {
logp.Err("Error loading state: %v", err)
logp.Err("Could not init crawler: %v", err)
return err
}

// Init and Start spooler: Harvesters dump events into the spooler.
fb.spooler = NewSpooler(fb.FbConfig.Filebeat, fb.publisherChan)
// The order of starting and stopping is important. Stopping is inverted to the starting order.
// The current order is: registrar, publisher, spooler, crawler
// That means, crawler is stopped first.

// Start the registrar
err = registrar.Start()
if err != nil {
logp.Err("Could not init spooler: %v", err)
return err
logp.Err("Could not start registrar: %v", err)
}
// Stopping registrar will write last state
defer registrar.Stop()

fb.registrar.Start()
fb.spooler.Start()
// Start publisher
publisher.Start()
// Stopping publisher (might potentially drop items)
defer publisher.Stop()

// Starting spooler
spooler.Start()
// Stopping spooler will flush items
defer spooler.Stop()

err = fb.crawler.Start(fb.FbConfig.Filebeat.Prospectors, fb.spooler.Channel)
err = crawler.Start(registrar.GetStates())
if err != nil {
return err
}
// Stop crawler -> stop prospectors -> stop harvesters
defer crawler.Stop()

// Publishes event to output
fb.pub = newPublisher(fb.FbConfig.Filebeat.PublishAsync,
fb.publisherChan, fb.registrar.Channel, b.Publisher.Connect())
fb.pub.Start()

// Blocks progressing
// Blocks progressing. As soon as channel is closed, all defer statements come into play
<-fb.done

return nil
Expand All @@ -114,18 +124,6 @@ func (fb *Filebeat) Stop() {

logp.Info("Stopping filebeat")

// Stop crawler -> stop prospectors -> stop harvesters
fb.crawler.Stop()

// Stopping spooler will flush items
fb.spooler.Stop()

// stopping publisher (might potentially drop items)
fb.pub.Stop()

// Stopping registrar will write last state
fb.registrar.Stop()

// Stop Filebeat
close(fb.done)
}
3 changes: 0 additions & 3 deletions filebeat/beater/filebeat_test.go

This file was deleted.

38 changes: 24 additions & 14 deletions filebeat/crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/filebeat/prospector"
"github.com/elastic/beats/filebeat/spooler"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
)
Expand All @@ -23,27 +24,32 @@ import (
*/

type Crawler struct {
// Registrar object to persist the state
Registrar *Registrar
prospectors []*prospector.Prospector
wg sync.WaitGroup
prospectors []*prospector.Prospector
wg sync.WaitGroup
spooler *spooler.Spooler
prospectorConfigs []*common.Config
}

func (c *Crawler) Start(prospectorConfigs []*common.Config, eventChan chan *input.FileEvent) error {
func New(spooler *spooler.Spooler, prospectorConfigs []*common.Config) (*Crawler, error) {

if len(prospectorConfigs) == 0 {
return fmt.Errorf("No prospectors defined. You must have at least one prospector defined in the config file.")
return nil, fmt.Errorf("No prospectors defined. You must have at least one prospector defined in the config file.")
}

logp.Info("Loading Prospectors: %v", len(prospectorConfigs))
return &Crawler{
spooler: spooler,
prospectorConfigs: prospectorConfigs,
}, nil
}

func (c *Crawler) Start(states input.States) error {

// Get existing states
states := *c.Registrar.state
logp.Info("Loading Prospectors: %v", len(c.prospectorConfigs))

// Prospect the globs/paths given on the command line and launch harvesters
for _, prospectorConfig := range prospectorConfigs {
for _, prospectorConfig := range c.prospectorConfigs {

prospector, err := prospector.NewProspector(prospectorConfig, states, eventChan)
prospector, err := prospector.NewProspector(prospectorConfig, states, c.spooler.Channel)
if err != nil {
return fmt.Errorf("Error in initing prospector: %s", err)
}
Expand All @@ -66,19 +72,23 @@ func (c *Crawler) Start(prospectorConfigs []*common.Config, eventChan chan *inpu
}(i, p)
}

logp.Info("All prospectors are initialised and running with %d states to persist", c.Registrar.state.Count())
logp.Info("All prospectors are initialised and running with %d states to persist", states.Count())

return nil
}

func (c *Crawler) Stop() {
logp.Info("Stopping Crawler")
stopProspector := func(p *prospector.Prospector) {
defer c.wg.Done()
p.Stop()
}

logp.Info("Stopping %v prospectors", len(c.prospectors))
for _, prospector := range c.prospectors {
for _, p := range c.prospectors {
// Stop prospectors in parallel
c.wg.Add(1)
go prospector.Stop(&c.wg)
go stopProspector(p)
}
c.wg.Wait()
logp.Info("Crawler stopped")
Expand Down
7 changes: 2 additions & 5 deletions filebeat/crawler/crawler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,14 @@ package crawler
import (
"testing"

"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/libbeat/common"
"github.com/stretchr/testify/assert"
)

func TestCrawlerStartError(t *testing.T) {
crawler := Crawler{}
channel := make(chan *input.FileEvent, 1)
func TestNewCrawlerNoProspectorsError(t *testing.T) {
prospectorConfigs := []*common.Config{}

error := crawler.Start(prospectorConfigs, channel)
_, error := New(nil, prospectorConfigs)

assert.Error(t, error)
}
2 changes: 1 addition & 1 deletion filebeat/harvester/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (h *Harvester) Harvest() {
ts, text, bytesRead, jsonFields, err := readLine(reader)
if err != nil {
if err == errFileTruncate {
logp.Info("File was truncated. Begin reading file from offset 0: %s", h.Path)
logp.Warn("File was truncated. Begin reading file from offset 0: %s", h.Path)
h.SetOffset(0)
return
}
Expand Down
20 changes: 1 addition & 19 deletions filebeat/prospector/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,10 @@ func (p *Prospector) Run() {
}
}

func (p *Prospector) Stop(wg *sync.WaitGroup) {
func (p *Prospector) Stop() {
logp.Info("Stopping Prospector")
close(p.done)
p.wg.Wait()
wg.Done()
}

// createHarvester creates a new harvester instance from the given state
Expand Down Expand Up @@ -169,20 +168,3 @@ func (p *Prospector) startHarvester(state input.FileState, offset int64) (*harve

return h, nil
}

// isIgnoreOlder reports whether the file referenced by state has not been
// modified within the configured ignore_older window. A zero IgnoreOlder
// disables the check, so the state is never considered too old.
func (p *Prospector) isIgnoreOlder(state input.FileState) bool {
	if p.config.IgnoreOlder == 0 {
		return false
	}
	return time.Since(state.Fileinfo.ModTime()) > p.config.IgnoreOlder
}
19 changes: 18 additions & 1 deletion filebeat/prospector/prospector_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (p *ProspectorLog) scan() {
// harvestNewFile harvest a new file
func (p *ProspectorLog) harvestNewFile(state input.FileState) {

if !p.Prospector.isIgnoreOlder(state) {
if !p.isIgnoreOlder(state) {
logp.Debug("prospector", "Start harvester for new file: %s", state.Source)
p.Prospector.startHarvester(state, 0)
} else {
Expand Down Expand Up @@ -176,3 +176,20 @@ func (p *ProspectorLog) isFileExcluded(file string) bool {
patterns := p.config.ExcludeFiles
return len(patterns) > 0 && harvester.MatchAnyRegexps(patterns, file)
}

// isIgnoreOlder checks if the given state reached ignore_older: it returns
// true when the file's last modification lies further in the past than the
// configured ignore_older duration. IgnoreOlder == 0 turns the check off.
func (p *ProspectorLog) isIgnoreOlder(state input.FileState) bool {
	limit := p.config.IgnoreOlder
	if limit == 0 {
		// ignore_older is disabled
		return false
	}

	age := time.Since(state.Fileinfo.ModTime())
	return age > limit
}
11 changes: 5 additions & 6 deletions filebeat/prospector/prospector_stdin.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,15 @@ import (
)

type ProspectorStdin struct {
Prospector *Prospector
harvester *harvester.Harvester
started bool
harvester *harvester.Harvester
started bool
}

// NewProspectorStdin creates a new stdin prospector
// This prospector contains one harvester which is reading from stdin
func NewProspectorStdin(p *Prospector) (*ProspectorStdin, error) {

prospectorer := &ProspectorStdin{
Prospector: p,
}
prospectorer := &ProspectorStdin{}

var err error

Expand Down
8 changes: 4 additions & 4 deletions filebeat/beater/publish.go → filebeat/publish/publish.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package beater
package publish

import (
"sync"
Expand All @@ -11,7 +11,7 @@ import (
"github.com/elastic/beats/libbeat/publisher"
)

type logPublisher interface {
type LogPublisher interface {
Start()
Stop()
}
Expand Down Expand Up @@ -61,11 +61,11 @@ const (
batchCanceled
)

func newPublisher(
func New(
async bool,
in, out chan []*input.FileEvent,
client publisher.Client,
) logPublisher {
) LogPublisher {
if async {
return newAsyncLogPublisher(in, out, client)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// +build !integration

package beater
package publish

import (
"fmt"
Expand Down Expand Up @@ -47,7 +47,7 @@ func TestPublisherModes(t *testing.T) {
regChan := make(chan []*input.FileEvent, len(test.order)+1)
client := pubtest.NewChanClient(0)

pub := newPublisher(test.async, pubChan, regChan, client)
pub := New(test.async, pubChan, regChan, client)
pub.Start()

var events [][]*input.FileEvent
Expand Down
Loading

0 comments on commit a15d0ef

Please sign in to comment.