Skip to content

Commit

Permalink
Harvester Cleanup
Browse files Browse the repository at this point in the history
* Rename FileEvent to Event
* Remove double information between FileEvent and State (Source, Fileinfo)
* Remove readLine function as not needed anymore
  • Loading branch information
ruflin committed Jul 25, 2016
1 parent b3ec2d9 commit ac4a155
Show file tree
Hide file tree
Showing 11 changed files with 77 additions and 95 deletions.
2 changes: 1 addition & 1 deletion filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
}

// Channel from harvesters to spooler
publisherChan := make(chan []*input.FileEvent, 1)
publisherChan := make(chan []*input.Event, 1)

// Publishes event to output
publisher := publish.New(config.PublishAsync,
Expand Down
4 changes: 2 additions & 2 deletions filebeat/harvester/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
type Harvester struct {
config harvesterConfig
state file.State
prospectorChan chan *input.FileEvent
prospectorChan chan *input.Event
file source.FileSource /* the file being watched */
done chan struct{}
encodingFactory encoding.EncodingFactory
Expand All @@ -37,7 +37,7 @@ type Harvester struct {
func NewHarvester(
cfg *common.Config,
state file.State,
prospectorChan chan *input.FileEvent,
prospectorChan chan *input.Event,
done chan struct{},
) (*Harvester, error) {

Expand Down
50 changes: 19 additions & 31 deletions filebeat/harvester/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ func (h *Harvester) Harvest() {

logp.Info("Harvester started for file: %s", h.state.Source)

r, err := h.newLineReader()
r, err := h.newLogFileReader()

if err != nil {
logp.Err("Stop Harvesting. Unexpected encoding line reader error: %s", err)
return
Expand All @@ -63,8 +64,7 @@ func (h *Harvester) Harvest() {
default:
}

// Partial lines return an error and are only read on completion
ts, text, bytesRead, jsonFields, err := readLine(r)
message, err := r.Next()
if err != nil {
switch err {
case reader.ErrFileTruncate:
Expand All @@ -83,16 +83,24 @@ func (h *Harvester) Harvest() {
return
}

// Update offset if complete line has been processed
h.state.Offset += int64(bytesRead)
// Update offset
h.state.Offset += int64(message.Bytes)

// Create state event
event := input.NewEvent(h.getState())

event := h.createEvent()
text := string(message.Content)

// Check if data should be added to event
if h.shouldExportLine(text) {
event.ReadTime = ts
event.Bytes = bytesRead
event.ReadTime = message.Ts
event.Bytes = message.Bytes
event.Text = &text
event.JSONFields = jsonFields
event.JSONFields = message.Fields
event.EventMetadata = h.config.EventMetadata
event.InputType = h.config.InputType
event.DocumentType = h.config.DocumentType
event.JSONConfig = h.config.JSON
}

// Always send event to update state, even if lines were skipped
Expand All @@ -103,29 +111,9 @@ func (h *Harvester) Harvest() {
}
}

// createEvent creates an empty event.
// By default the offset is set to 0, which means no bytes were read. This can be used to report the status
// of a harvester
func (h *Harvester) createEvent() *input.FileEvent {

event := &input.FileEvent{
EventMetadata: h.config.EventMetadata,
Source: h.state.Source,
InputType: h.config.InputType,
DocumentType: h.config.DocumentType,
Offset: h.state.Offset,
Bytes: 0,
Fileinfo: h.state.Fileinfo,
JSONConfig: h.config.JSON,
State: h.getState(),
}

return event
}

// sendEvent sends event to the spooler channel
// Return false if event was not sent
func (h *Harvester) sendEvent(event *input.FileEvent) bool {
func (h *Harvester) sendEvent(event *input.Event) bool {
select {
case <-h.done:
return false
Expand Down Expand Up @@ -297,7 +285,7 @@ func (h *Harvester) newLogFileReaderConfig() reader.LogFileConfig {
}
}

func (h *Harvester) newLineReader() (reader.Reader, error) {
func (h *Harvester) newLogFileReader() (reader.Reader, error) {

readerConfig := h.newLogFileReaderConfig()

Expand Down
2 changes: 1 addition & 1 deletion filebeat/harvester/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestReadLine(t *testing.T) {
h.encoding, err = h.encodingFactory(readFile)
assert.NoError(t, err)

r, err := h.newLineReader()
r, err := h.newLogFileReader()
assert.NoError(t, err)

// Read third line
Expand Down
20 changes: 8 additions & 12 deletions filebeat/input/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package input

import (
"fmt"
"os"
"time"

"github.com/elastic/beats/filebeat/harvester/reader"
Expand All @@ -11,34 +10,31 @@ import (
"github.com/elastic/beats/libbeat/logp"
)

// FileEvent is sent to the output and must contain all relevant information
type FileEvent struct {
// Event is sent to the output and must contain all relevant information
type Event struct {
common.EventMetadata
ReadTime time.Time
Source string
InputType string
DocumentType string
Offset int64
Bytes int
Text *string
Fileinfo os.FileInfo
JSONFields common.MapStr
JSONConfig *reader.JSONConfig
State file.State
}

func NewEvent(state file.State) *FileEvent {
return &FileEvent{
func NewEvent(state file.State) *Event {
return &Event{
State: state,
}
}

func (f *FileEvent) ToMapStr() common.MapStr {
func (f *Event) ToMapStr() common.MapStr {
event := common.MapStr{
common.EventMetadataKey: f.EventMetadata,
"@timestamp": common.Time(f.ReadTime),
"source": f.Source,
"offset": f.Offset, // Offset here is the offset before the starting char.
"source": f.State.Source,
"offset": f.State.Offset, // Offset here is the offset before the starting char.
"type": f.DocumentType,
"input_type": f.InputType,
}
Expand All @@ -56,7 +52,7 @@ func (f *FileEvent) ToMapStr() common.MapStr {
// respecting the KeysUnderRoot and OverwriteKeys configuration options.
// If MessageKey is defined, the Text value from the event always
// takes precedence.
func mergeJSONFields(f *FileEvent, event common.MapStr) {
func mergeJSONFields(f *Event, event common.MapStr) {

// The message key might have been modified by multiline
if len(f.JSONConfig.MessageKey) > 0 && f.Text != nil {
Expand Down
28 changes: 14 additions & 14 deletions filebeat/input/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,17 @@ import (
"github.com/stretchr/testify/assert"
)

func TestFileEventToMapStr(t *testing.T) {
func TestEventToMapStr(t *testing.T) {
// Test 'fields' is not present when it is nil.
event := FileEvent{}
event := Event{}
mapStr := event.ToMapStr()
_, found := mapStr["fields"]
assert.False(t, found)
}

func TestFileEventToMapStrJSON(t *testing.T) {
func TestEventToMapStrJSON(t *testing.T) {
type io struct {
Event FileEvent
Event Event
ExpectedItems common.MapStr
}

Expand All @@ -30,7 +30,7 @@ func TestFileEventToMapStrJSON(t *testing.T) {
tests := []io{
{
// by default, don't overwrite keys
Event: FileEvent{
Event: Event{
DocumentType: "test_type",
Text: &text,
JSONFields: common.MapStr{"type": "test", "text": "hello"},
Expand All @@ -43,7 +43,7 @@ func TestFileEventToMapStrJSON(t *testing.T) {
},
{
// overwrite keys if asked
Event: FileEvent{
Event: Event{
DocumentType: "test_type",
Text: &text,
JSONFields: common.MapStr{"type": "test", "text": "hello"},
Expand All @@ -56,7 +56,7 @@ func TestFileEventToMapStrJSON(t *testing.T) {
},
{
// without keys_under_root, put everything in a json key
Event: FileEvent{
Event: Event{
DocumentType: "test_type",
Text: &text,
JSONFields: common.MapStr{"type": "test", "text": "hello"},
Expand All @@ -69,7 +69,7 @@ func TestFileEventToMapStrJSON(t *testing.T) {
},
{
// when MessageKey is defined, the Text overwrites the value of that key
Event: FileEvent{
Event: Event{
DocumentType: "test_type",
Text: &text,
JSONFields: common.MapStr{"type": "test", "text": "hi"},
Expand All @@ -83,7 +83,7 @@ func TestFileEventToMapStrJSON(t *testing.T) {
{
// when @timestamp is in JSON and overwrite_keys is true, parse it
// in a common.Time
Event: FileEvent{
Event: Event{
ReadTime: now,
DocumentType: "test_type",
Text: &text,
Expand All @@ -98,7 +98,7 @@ func TestFileEventToMapStrJSON(t *testing.T) {
{
// when the parsing on @timestamp fails, leave the existing value and add an error key
// in a common.Time
Event: FileEvent{
Event: Event{
ReadTime: now,
DocumentType: "test_type",
Text: &text,
Expand All @@ -114,7 +114,7 @@ func TestFileEventToMapStrJSON(t *testing.T) {
{
// when the @timestamp has the wrong type, leave the existing value and add an error key
// in a common.Time
Event: FileEvent{
Event: Event{
ReadTime: now,
DocumentType: "test_type",
Text: &text,
Expand All @@ -129,7 +129,7 @@ func TestFileEventToMapStrJSON(t *testing.T) {
},
{
// if overwrite_keys is true, but the `type` key in json is not a string, ignore it
Event: FileEvent{
Event: Event{
DocumentType: "test_type",
Text: &text,
JSONFields: common.MapStr{"type": 42},
Expand All @@ -142,7 +142,7 @@ func TestFileEventToMapStrJSON(t *testing.T) {
},
{
// if overwrite_keys is true, but the `type` key in json is empty, ignore it
Event: FileEvent{
Event: Event{
DocumentType: "test_type",
Text: &text,
JSONFields: common.MapStr{"type": ""},
Expand All @@ -155,7 +155,7 @@ func TestFileEventToMapStrJSON(t *testing.T) {
},
{
// if overwrite_keys is true, but the `type` key in json starts with _, ignore it
Event: FileEvent{
Event: Event{
DocumentType: "test_type",
Text: &text,
JSONFields: common.MapStr{"type": "_type"},
Expand Down
8 changes: 4 additions & 4 deletions filebeat/prospector/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ type Prospector struct {
cfg *common.Config // Raw config
config prospectorConfig
prospectorer Prospectorer
spoolerChan chan *input.FileEvent
harvesterChan chan *input.FileEvent
spoolerChan chan *input.Event
harvesterChan chan *input.Event
done chan struct{}
states *file.States
wg sync.WaitGroup
Expand All @@ -29,12 +29,12 @@ type Prospectorer interface {
Run()
}

func NewProspector(cfg *common.Config, states file.States, spoolerChan chan *input.FileEvent) (*Prospector, error) {
func NewProspector(cfg *common.Config, states file.States, spoolerChan chan *input.Event) (*Prospector, error) {
prospector := &Prospector{
cfg: cfg,
config: defaultConfig,
spoolerChan: spoolerChan,
harvesterChan: make(chan *input.FileEvent),
harvesterChan: make(chan *input.Event),
done: make(chan struct{}),
states: states.Copy(),
wg: sync.WaitGroup{},
Expand Down
14 changes: 7 additions & 7 deletions filebeat/publish/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type LogPublisher interface {
type syncLogPublisher struct {
pub publisher.Publisher
client publisher.Client
in, out chan []*input.FileEvent
in, out chan []*input.Event

done chan struct{}
wg sync.WaitGroup
Expand All @@ -29,7 +29,7 @@ type syncLogPublisher struct {
type asyncLogPublisher struct {
pub publisher.Publisher
client publisher.Client
in, out chan []*input.FileEvent
in, out chan []*input.Event

// list of in-flight batches
active batchList
Expand All @@ -44,7 +44,7 @@ type asyncLogPublisher struct {
type eventsBatch struct {
next *eventsBatch
flag int32
events []*input.FileEvent
events []*input.Event
}

type batchList struct {
Expand All @@ -70,7 +70,7 @@ var (

func New(
async bool,
in, out chan []*input.FileEvent,
in, out chan []*input.Event,
pub publisher.Publisher,
) LogPublisher {
if async {
Expand All @@ -80,7 +80,7 @@ func New(
}

func newSyncLogPublisher(
in, out chan []*input.FileEvent,
in, out chan []*input.Event,
pub publisher.Publisher,
) *syncLogPublisher {
return &syncLogPublisher{
Expand All @@ -101,7 +101,7 @@ func (p *syncLogPublisher) Start() {
logp.Info("Start sending events to output")

for {
var events []*input.FileEvent
var events []*input.Event
select {
case <-p.done:
return
Expand Down Expand Up @@ -143,7 +143,7 @@ func (p *syncLogPublisher) Stop() {
}

func newAsyncLogPublisher(
in, out chan []*input.FileEvent,
in, out chan []*input.Event,
pub publisher.Publisher,
) *asyncLogPublisher {
return &asyncLogPublisher{
Expand Down
Loading

0 comments on commit ac4a155

Please sign in to comment.