Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix filebeat registry meta being nil vs empty #7632

Merged
merged 11 commits into from
Jul 19, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ https://github.com/elastic/beats/compare/v6.2.3...master[Check the HEAD diff]
- Fix offset field pointing at end of a line. {issue}6514[6514]
- Fix an issue when parsing ISO8601 dates with timezone definition {issue}7367[7367]
- Fix Grok pattern of MongoDB module. {pull}7568[7568]
- Fix registry duplicates and log resending on upgrade. {issue}7634[7634]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This happened only when going from 6.3.0 to 6.3.1?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, meta was introduced in 6.3.1.


*Heartbeat*
- Fix race due to updates of a shared map that was not supposed to be shared between multiple go-routines. {issue}6616[6616]
Expand Down
3 changes: 3 additions & 0 deletions filebeat/input/docker/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ func NewInput(

// Add stream to meta to ensure different state per stream
if config.Containers.Stream != "all" {
if context.Meta == nil {
context.Meta = map[string]string{}
}
context.Meta["stream"] = config.Containers.Stream
}

Expand Down
7 changes: 5 additions & 2 deletions filebeat/input/file/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ type State struct {

// NewState creates a new file state
func NewState(fileInfo os.FileInfo, path string, t string, meta map[string]string) State {
if len(meta) == 0 {
meta = nil
}
return State{
Fileinfo: fileInfo,
Source: path,
Expand All @@ -60,7 +63,7 @@ func NewState(fileInfo os.FileInfo, path string, t string, meta map[string]strin
func (s *State) ID() string {
// Generate id on first request. This is needed as id is not set when converting back from json
if s.Id == "" {
if s.Meta == nil {
if len(s.Meta) == 0 {
s.Id = s.FileStateOS.String()
} else {
hashValue, _ := hashstructure.Hash(s.Meta, nil)
Expand Down Expand Up @@ -91,6 +94,6 @@ func (s *State) IsEqual(c *State) bool {
func (s *State) IsEmpty() bool {
return s.FileStateOS == file.StateOS{} &&
s.Source == "" &&
s.Meta == nil &&
len(s.Meta) == 0 &&
s.Timestamp.IsZero()
}
2 changes: 1 addition & 1 deletion filebeat/input/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func New(
Done: input.done,
BeatDone: input.beatDone,
DynamicFields: dynFields,
Meta: map[string]string{},
Meta: nil,
}
var ipt Input
ipt, err = f(conf, outlet, context)
Expand Down
11 changes: 10 additions & 1 deletion filebeat/input/log/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ func NewInput(
// can be forwarded correctly to the registrar.
stateOut := channel.CloseOnSignal(channel.SubOutlet(out), context.BeatDone)

meta := context.Meta
if len(meta) == 0 {
meta = nil
}

p := &Input{
config: defaultConfig,
cfg: cfg,
Expand All @@ -101,7 +106,7 @@ func NewInput(
stateOutlet: stateOut,
states: file.NewStates(),
done: context.Done,
meta: context.Meta,
meta: meta,
}

if err := cfg.Unpack(&p.config); err != nil {
Expand Down Expand Up @@ -687,6 +692,10 @@ func (p *Input) updateState(state file.State) error {
state.TTL = p.config.CleanInactive
}

if len(state.Meta) == 0 {
state.Meta = nil
}

// Update first internal state
p.states.Update(state)

Expand Down
104 changes: 98 additions & 6 deletions filebeat/registrar/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package registrar
import (
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"
"sync"
Expand Down Expand Up @@ -132,20 +133,111 @@ func (r *Registrar) loadStates() error {

logp.Info("Loading registrar data from %s", r.registryFile)

decoder := json.NewDecoder(f)
states := []file.State{}
err = decoder.Decode(&states)
states, err := readStatesFrom(f)
if err != nil {
return fmt.Errorf("Error decoding states: %s", err)
return err
}

states = resetStates(states)
r.states.SetStates(states)
logp.Info("States Loaded from registrar: %+v", len(states))

return nil
}

// readStatesFrom decodes a JSON array of file states from in, then passes the
// result through fixStates and resetStates before returning it. A decoding
// failure is reported as an error and no states are returned.
func readStatesFrom(in io.Reader) ([]file.State, error) {
	loaded := []file.State{}
	if err := json.NewDecoder(in).Decode(&loaded); err != nil {
		return nil, fmt.Errorf("Error decoding states: %s", err)
	}

	// Clean up the entries first, then reset them.
	return resetStates(fixStates(loaded)), nil
}

// fixStates cleans up the registry states when updating from an older version
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we be more specific in the comment here to mention between which versions this happened? We might not remember in a few months.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As changes happen in master it's normally difficult to keep track of beats versions :)

This kind of information should not be buried somewhere in the code; instead, we should have a separate changelog with schema changes in the registrar directory. Or just put it at the top of registrar.go. One big oversight is the missing versioning of the registry file schema itself. Knowing the version of the file + having a changelog would be helpful too (btw., the changelog should also contain a date).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree it's tricky to do this in master, but in this case it was a fix for a very specific version. The reason I would like to have it here is that, in case we modify it in the future, we also know which versions to test with.

// of filebeat potentially writing invalid entries.
//
// Every state is normalized in place via fixState. States that share the same
// ID are merged into their first occurrence via mergeStates. If no duplicates
// are found, the (normalized) input slice is returned unchanged; otherwise a
// new slice is returned that holds one entry per ID, in the order in which
// each ID first appeared in the input.
func fixStates(states []file.State) []file.State {
	if len(states) == 0 {
		return states
	}

	// idx maps a state ID to the first entry seen with that ID, so duplicate
	// entries can be identified and merged. unique records those first
	// entries in input order: ranging over the map instead would yield a
	// different, unspecified order on every run.
	idx := make(map[string]*file.State, len(states))
	unique := make([]*file.State, 0, len(states))
	for i := range states {
		state := &states[i]
		fixState(state)

		id := state.ID()
		if old, exists := idx[id]; exists {
			mergeStates(old, state) // overwrite the entry in 'old'
			continue
		}
		idx[id] = state
		unique = append(unique, state)
	}

	// No duplicates: the input slice already is the result.
	if len(unique) == len(states) {
		return states
	}

	newStates := make([]file.State, len(unique))
	for i, state := range unique {
		newStates[i] = *state
	}
	return newStates
}

// fixState updates a read state so that it fulfills the required invariants:
// - "Meta" must be nil if len(Meta) == 0
func fixState(st *file.State) {
	if len(st.Meta) > 0 {
		return
	}
	// Collapse an empty (but possibly non-nil) map to nil.
	st.Meta = nil
}

// mergeStates folds the state 'other' into 'st', trying to keep whichever
// information is 'newer'. Only st is modified.
//
//   - Finished becomes true if either state is finished.
//   - Offset becomes the larger of the two offsets.
//   - Source, Timestamp, TTL and FileStateOS are taken from other only if
//     other has the strictly later Timestamp.
//   - Meta is taken from the state with the later timestamp if either side has
//     no meta entries; otherwise both maps are combined, with entries of the
//     later state winning on key conflicts. An empty result is stored as nil.
func mergeStates(st, other *file.State) {
	if other.Finished {
		st.Finished = true
	}
	if other.Offset > st.Offset { // always select the higher offset
		st.Offset = other.Offset
	}

	// File meta-data is updated concurrently by the prospectors, so the
	// update timestamp decides which side counts as the newer one. By
	// default st is treated as the newer state.
	olderMeta, newerMeta := other.Meta, st.Meta
	if st.Timestamp.Before(other.Timestamp) {
		olderMeta, newerMeta = st.Meta, other.Meta

		st.Source = other.Source
		st.Timestamp = other.Timestamp
		st.TTL = other.TTL
		st.FileStateOS = other.FileStateOS
	}

	var merged map[string]string
	switch {
	case len(olderMeta) == 0 || len(newerMeta) == 0:
		merged = newerMeta
	default:
		merged = map[string]string{}
		for key, value := range olderMeta {
			merged[key] = value
		}
		// Applied second so that newer entries override older ones.
		for key, value := range newerMeta {
			merged[key] = value
		}
	}

	// Keep the invariant that an empty Meta is always nil.
	if len(merged) == 0 {
		merged = nil
	}
	st.Meta = merged
}

// resetStates sets all states to finished and disable TTL on restart
// For all states covered by an input, TTL will be overwritten with the input value
func resetStates(states []file.State) []file.State {
Expand Down
Loading