-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix filebeat registry meta being nil vs empty #7632
Changes from all commits
9b63165
beace65
6ab9e60
78b26df
62007fd
f74762d
56bbdf1
453e982
43bd3ad
12778ab
064fdfa
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,7 @@ package registrar | |
import ( | ||
"encoding/json" | ||
"fmt" | ||
"io" | ||
"os" | ||
"path/filepath" | ||
"sync" | ||
|
@@ -132,20 +133,111 @@ func (r *Registrar) loadStates() error { | |
|
||
logp.Info("Loading registrar data from %s", r.registryFile) | ||
|
||
decoder := json.NewDecoder(f) | ||
states := []file.State{} | ||
err = decoder.Decode(&states) | ||
states, err := readStatesFrom(f) | ||
if err != nil { | ||
return fmt.Errorf("Error decoding states: %s", err) | ||
return err | ||
} | ||
|
||
states = resetStates(states) | ||
r.states.SetStates(states) | ||
logp.Info("States Loaded from registrar: %+v", len(states)) | ||
|
||
return nil | ||
} | ||
|
||
func readStatesFrom(in io.Reader) ([]file.State, error) { | ||
states := []file.State{} | ||
decoder := json.NewDecoder(in) | ||
if err := decoder.Decode(&states); err != nil { | ||
return nil, fmt.Errorf("Error decoding states: %s", err) | ||
} | ||
|
||
states = fixStates(states) | ||
states = resetStates(states) | ||
return states, nil | ||
} | ||
|
||
// fixStates cleans up the regsitry states when updating from an older version | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we be more specific in the comment here to mention between which versions this happened? We might not remember in a few months. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As changes happen in master it's normally difficult to keep track of beats versions :) This kind of information should not be burried somewhere in the code, but we should have a separate changelog with schema changes in registrar directory. Or just put it at the top of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agree it's tricky to do this in master but in this case it was fix for a very specific version. The reason I would like to have it here is in case we modify it in the future we also know which versions to test with. |
||
// of filebeat potentially writing invalid entries. | ||
func fixStates(states []file.State) []file.State { | ||
if len(states) == 0 { | ||
return states | ||
} | ||
|
||
// we use a map of states here, so to identify and merge duplicate entries. | ||
idx := map[string]*file.State{} | ||
for i := range states { | ||
state := &states[i] | ||
fixState(state) | ||
|
||
id := state.ID() | ||
old, exists := idx[id] | ||
if !exists { | ||
idx[id] = state | ||
} else { | ||
mergeStates(old, state) // overwrite the entry in 'old' | ||
} | ||
} | ||
|
||
if len(idx) == len(states) { | ||
return states | ||
} | ||
|
||
i := 0 | ||
newStates := make([]file.State, len(idx)) | ||
for _, state := range idx { | ||
newStates[i] = *state | ||
i++ | ||
} | ||
return newStates | ||
} | ||
|
||
// fixState updates a read state to fullfil required invariantes: | ||
// - "Meta" must be nil if len(Meta) == 0 | ||
func fixState(st *file.State) { | ||
if len(st.Meta) == 0 { | ||
st.Meta = nil | ||
} | ||
} | ||
|
||
// mergeStates merges 2 states by trying to determine the 'newer' state. | ||
// The st state is overwritten with the updated fields. | ||
func mergeStates(st, other *file.State) { | ||
st.Finished = st.Finished || other.Finished | ||
if st.Offset < other.Offset { // always select the higher offset | ||
st.Offset = other.Offset | ||
} | ||
|
||
// update file meta-data. As these are updated concurrently by the | ||
// prospectors, select the newer state based on the update timestamp. | ||
var meta, metaOld, metaNew map[string]string | ||
if st.Timestamp.Before(other.Timestamp) { | ||
st.Source = other.Source | ||
st.Timestamp = other.Timestamp | ||
st.TTL = other.TTL | ||
st.FileStateOS = other.FileStateOS | ||
|
||
metaOld, metaNew = st.Meta, other.Meta | ||
} else { | ||
metaOld, metaNew = other.Meta, st.Meta | ||
} | ||
|
||
if len(metaOld) == 0 || len(metaNew) == 0 { | ||
meta = metaNew | ||
} else { | ||
meta = map[string]string{} | ||
for k, v := range metaOld { | ||
meta[k] = v | ||
} | ||
for k, v := range metaNew { | ||
meta[k] = v | ||
} | ||
} | ||
|
||
if len(meta) == 0 { | ||
meta = nil | ||
} | ||
st.Meta = meta | ||
} | ||
|
||
// resetStates sets all states to finished and disable TTL on restart | ||
// For all states covered by an input, TTL will be overwritten with the input value | ||
func resetStates(states []file.State) []file.State { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This happened only when going from 6.3.0 to 6.3.1?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, meta was introduce in 6.3.1.