diff --git a/NOTICE.txt b/NOTICE.txt index b016100b8a7e..c33815560d2b 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -10825,11 +10825,11 @@ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLI -------------------------------------------------------------------------------- Dependency : github.com/magefile/mage -Version: v1.9.0 +Version: v1.10.0 Licence type (autodetected): Apache-2.0 -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/magefile/mage@v1.9.0/LICENSE: +Contents of probable licence file $GOMODCACHE/github.com/magefile/mage@v1.10.0/LICENSE: Apache License Version 2.0, January 2004 @@ -11019,7 +11019,7 @@ Contents of probable licence file $GOMODCACHE/github.com/magefile/mage@v1.9.0/LI same "printed page" as the copyright notice for easier identification within third-party archives. - Copyright {yyyy} {name of copyright owner} + Copyright 2017 the Mage authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 
diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 4f3c5b64647e..324cc5189d83 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -312,8 +312,21 @@ func (fb *Filebeat) Run(b *beat.Beat) error { } finishedLogger := newFinishedLogger(wgEvents) + registryMigrator := registrar.NewMigrator(config.Registry) + if err := registryMigrator.Run(); err != nil { + logp.Err("Failed to migrate registry file: %+v", err) + return err + } + + stateStore, err := openStateStore(b.Info, logp.NewLogger("filebeat"), config.Registry) + if err != nil { + logp.Err("Failed to open state store: %+v", err) + return err + } + defer stateStore.Close() + // Setup registrar to persist state - registrar, err := registrar.New(config.Registry, finishedLogger) + registrar, err := registrar.New(stateStore, finishedLogger, config.Registry.FlushTimeout) if err != nil { logp.Err("Could not init registrar: %v", err) return err diff --git a/filebeat/beater/store.go b/filebeat/beater/store.go new file mode 100644 index 000000000000..6db8dc37ee6f --- /dev/null +++ b/filebeat/beater/store.go @@ -0,0 +1,63 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package beater + +import ( + "time" + + "github.com/elastic/beats/v7/filebeat/config" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/paths" + "github.com/elastic/beats/v7/libbeat/statestore" + "github.com/elastic/beats/v7/libbeat/statestore/backend/memlog" +) + +type filebeatStore struct { + registry *statestore.Registry + storeName string + cleanInterval time.Duration +} + +func openStateStore(info beat.Info, logger *logp.Logger, cfg config.Registry) (*filebeatStore, error) { + memlog, err := memlog.New(logger, memlog.Settings{ + Root: paths.Resolve(paths.Data, cfg.Path), + FileMode: cfg.Permissions, + }) + if err != nil { + return nil, err + } + + return &filebeatStore{ + registry: statestore.NewRegistry(memlog), + storeName: info.Beat, + cleanInterval: cfg.CleanInterval, + }, nil +} + +func (s *filebeatStore) Close() { + s.registry.Close() +} + +func (s *filebeatStore) Access() (*statestore.Store, error) { + return s.registry.Get(s.storeName) +} + +func (s *filebeatStore) CleanupInterval() time.Duration { + return s.cleanInterval +} diff --git a/filebeat/config/config.go b/filebeat/config/config.go index db10fa20ba6e..75289d2f0c94 100644 --- a/filebeat/config/config.go +++ b/filebeat/config/config.go @@ -51,18 +51,20 @@ type Config struct { } type Registry struct { - Path string `config:"path"` - Permissions os.FileMode `config:"file_permissions"` - FlushTimeout time.Duration `config:"flush"` - MigrateFile string `config:"migrate_file"` + Path string `config:"path"` + Permissions os.FileMode `config:"file_permissions"` + FlushTimeout time.Duration `config:"flush"` + CleanInterval time.Duration `config:"cleanup_interval"` + MigrateFile string `config:"migrate_file"` } var ( DefaultConfig = Config{ Registry: Registry{ - Path: "registry", - Permissions: 0600, - MigrateFile: "", + Path: "registry", + Permissions: 0600, + MigrateFile: "", + CleanInterval: 5 * time.Minute, }, 
ShutdownTimeout: 0, OverwritePipelines: false, diff --git a/filebeat/registrar/migrate.go b/filebeat/registrar/migrate.go index 64d2b4579a8c..4a76771878af 100644 --- a/filebeat/registrar/migrate.go +++ b/filebeat/registrar/migrate.go @@ -26,23 +26,57 @@ import ( "github.com/pkg/errors" + "github.com/elastic/beats/v7/filebeat/config" + "github.com/elastic/beats/v7/filebeat/input/file" helper "github.com/elastic/beats/v7/libbeat/common/file" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/paths" + "github.com/elastic/beats/v7/libbeat/statestore" + "github.com/elastic/beats/v7/libbeat/statestore/backend/memlog" ) +type registryVersion string + const ( - legacyVersion = "<legacy>" - currentVersion = "0" + noRegistry registryVersion = "" + legacyVersion = "<legacy>" + version0 = "0" + version1 = "1" ) -func ensureCurrent(home, migrateFile string, perm os.FileMode) error { +const currentVersion = version1 + +type Migrator struct { + dataPath string + migrateFile string + permissions os.FileMode +} + +func NewMigrator(cfg config.Registry) *Migrator { + path := paths.Resolve(paths.Data, cfg.Path) + migrateFile := cfg.MigrateFile + if migrateFile != "" { + migrateFile = paths.Resolve(paths.Data, migrateFile) + } + + return &Migrator{ + dataPath: path, + migrateFile: migrateFile, + permissions: cfg.Permissions, + } +} + +// Run checks the on disk registry version and updates +// old on disk and data layouts to the current supported storage format. 
+func (m *Migrator) Run() error { + migrateFile := m.migrateFile if migrateFile == "" { - if isFile(home) { - migrateFile = home + if isFile(m.dataPath) { + migrateFile = m.dataPath } } - fbRegHome := filepath.Join(home, "filebeat") + fbRegHome := filepath.Join(m.dataPath, "filebeat") version, err := readVersion(fbRegHome, migrateFile) if err != nil { return err @@ -50,26 +84,52 @@ func ensureCurrent(home, migrateFile string, perm os.FileMode) error { logp.Debug("registrar", "Registry type '%v' found", version) - switch version { - case legacyVersion: - return migrateLegacy(home, fbRegHome, migrateFile, perm) - case currentVersion: - return nil - case "": - backupFile := migrateFile + ".bak" - if isFile(backupFile) { - return migrateLegacy(home, fbRegHome, backupFile, perm) + for { + switch version { + case legacyVersion: + // first migrate to version0 style registry and go from there + if err := m.updateToVersion0(fbRegHome, migrateFile); err != nil { + return err + } + fallthrough + case version0: + return m.updateToVersion1(fbRegHome) + + case currentVersion: + return nil + + case noRegistry: + // check if we've been in the middle of a migration from the legacy + // format to the current version. If so continue with + // the migration and try again. + backupFile := migrateFile + ".bak" + if isFile(backupFile) { + migrateFile = backupFile + version = legacyVersion + break + } + + // postpone registry creation, until we open and configure it. + return nil + default: + return fmt.Errorf("registry file version %v not supported", version) } - return initRegistry(fbRegHome, perm) - default: - return fmt.Errorf("registry file version %v not supported", version) } } -func migrateLegacy(home, regHome, migrateFile string, perm os.FileMode) error { +// updateToVersion0 converts a single old registry file to a version 0 style registry. +// The version 0 registry is a directory, that contains two files: meta.json, and data.json. 
+// The meta.json reports the current storage version, and the data.json +// contains a JSON array with all entries known to the registrar. +// +// The data representation has changed multiple times before version 0 was introduced. The update function tries +// to fix the state, allowing us to migrate from older Beats registry files. +// NOTE: The oldest known filebeat registry file format this was tested with is from Filebeat 6.3. +// Support for older Filebeat versions is best effort. +func (m *Migrator) updateToVersion0(regHome, migrateFile string) error { logp.Info("Migrate registry file to registry directory") - if home == migrateFile { + if m.dataPath == migrateFile { backupFile := migrateFile + ".bak" if isFile(migrateFile) { logp.Info("Move registry file to backup file: %v", backupFile) @@ -83,7 +143,7 @@ func migrateLegacy(home, regHome, migrateFile string, perm os.FileMode) error { } } - if err := initRegistry(regHome, perm); err != nil { + if err := initVersion0Registry(regHome, m.permissions); err != nil { return err } @@ -99,7 +159,7 @@ func migrateLegacy(home, regHome, migrateFile string, perm os.FileMode) error { return nil } -func initRegistry(regHome string, perm os.FileMode) error { +func initVersion0Registry(regHome string, perm os.FileMode) error { if !isDir(regHome) { logp.Info("No registry home found. Create: %v", regHome) if err := os.MkdirAll(regHome, 0750); err != nil { @@ -119,31 +179,112 @@ func initRegistry(regHome string, perm os.FileMode) error { return nil } -func readVersion(regHome, migrateFile string) (string, error) { +// updateToVersion1 updates the filebeat registry from version 0 to version 1 +// only. Version 1 is based on the implementation of version 1 in +// libbeat/statestore/backend/memlog. 
+func (m *Migrator) updateToVersion1(regHome string) error { + logp.Info("Migrate registry version 0 to version 1") + + origDataFile := filepath.Join(regHome, "data.json") + if !isFile(origDataFile) { + return fmt.Errorf("missing original data file at: %v", origDataFile) + } + + // read states from file and ensure file is closed immediately. + states, err := func() ([]file.State, error) { + origIn, err := os.Open(origDataFile) + if err != nil { + return nil, errors.Wrap(err, "failed to open original data file") + } + defer origIn.Close() + + var states []file.State + decoder := json.NewDecoder(origIn) + if err := decoder.Decode(&states); err != nil { + return nil, errors.Wrapf(err, "Error decoding original data file '%v'", origDataFile) + } + return states, nil + }() + if err != nil { + return err + } + + states = resetStates(fixStates(states)) + + registryBackend, err := memlog.New(logp.NewLogger("migration"), memlog.Settings{ + Root: m.dataPath, + FileMode: m.permissions, + Checkpoint: func(_ uint64) bool { return true }, + IgnoreVersionCheck: true, + }) + if err != nil { + return errors.Wrap(err, "failed to create new registry backend") + } + + reg := statestore.NewRegistry(registryBackend) + defer reg.Close() + + store, err := reg.Get("filebeat") + if err != nil { + return errors.Wrap(err, "failed to open filebeat registry store") + } + defer store.Close() + + if err := writeStates(store, states); err != nil { + return errors.Wrap(err, "failed to migrate registry states") + } + + if err := os.Remove(origDataFile); err != nil { + return errors.Wrapf(err, "migration complete but failed to remove original data file: %v", origDataFile) + } + + if err := ioutil.WriteFile(filepath.Join(regHome, "meta.json"), []byte(`{"version": "1"}`), m.permissions); err != nil { + return fmt.Errorf("failed to update the meta.json file: %w", err) + } + + return nil +} + +func writeMeta(path string, version string, perm os.FileMode) error { + logp.Info("Write registry meta file with 
version: %v", version) + doc := struct{ Version string }{version} + body, err := json.Marshal(doc) + if err != nil { + panic(err) // must not fail + } + + if err = safeWriteFile(path+".tmp", body, perm); err != nil { + return errors.Wrap(err, "failed writing registry meta.json") + } + + return helper.SafeFileRotate(path, path+".tmp") +} + +func readVersion(regHome, migrateFile string) (registryVersion, error) { if isFile(migrateFile) { return legacyVersion, nil } if !isDir(regHome) { - return "", nil + return noRegistry, nil } metaFile := filepath.Join(regHome, "meta.json") if !isFile(metaFile) { - return "", nil + return noRegistry, nil } tmp, err := ioutil.ReadFile(metaFile) if err != nil { - return "", errors.Wrap(err, "failed to open meta file") + return noRegistry, errors.Wrap(err, "failed to open meta file") } meta := struct{ Version string }{} if err := json.Unmarshal(tmp, &meta); err != nil { - return "", errors.Wrap(err, "failed reading meta file") + return noRegistry, errors.Wrap(err, "failed reading meta file") } - return meta.Version, nil + return registryVersion(meta.Version), nil } func isDir(path string) bool { @@ -185,3 +326,98 @@ func safeWriteFile(path string, data []byte, perm os.FileMode) error { } return err } + +// fixStates cleans up the registry states when updating from an older version +// of filebeat potentially writing invalid entries. +func fixStates(states []file.State) []file.State { + if len(states) == 0 { + return states + } + + // we use a map of states here, so to identify and merge duplicate entries. 
+ idx := map[string]*file.State{} + for i := range states { + state := &states[i] + fixState(state) + + id := state.ID() + old, exists := idx[id] + if !exists { + idx[id] = state + } else { + mergeStates(old, state) // overwrite the entry in 'old' + } + } + + if len(idx) == len(states) { + return states + } + + i := 0 + newStates := make([]file.State, len(idx)) + for _, state := range idx { + newStates[i] = *state + i++ + } + return newStates +} + +// fixState updates a read state to fulfill required invariants: +// - "Meta" must be nil if len(Meta) == 0 +func fixState(st *file.State) { + if len(st.Meta) == 0 { + st.Meta = nil + } +} + +// resetStates sets all states to finished and disables TTL on restart. +// For all states covered by an input, TTL will be overwritten with the input value +func resetStates(states []file.State) []file.State { + for key, state := range states { + state.Finished = true + // Set ttl to -2 to easily spot which states are not managed by an input + state.TTL = -2 + states[key] = state + } + return states +} + +// mergeStates merges 2 states by trying to determine the 'newer' state. +// The st state is overwritten with the updated fields. +func mergeStates(st, other *file.State) { + st.Finished = st.Finished || other.Finished + if st.Offset < other.Offset { // always select the higher offset + st.Offset = other.Offset + } + + // update file meta-data. As these are updated concurrently by the + // inputs, select the newer state based on the update timestamp. 
+ var meta, metaOld, metaNew map[string]string + if st.Timestamp.Before(other.Timestamp) { + st.Source = other.Source + st.Timestamp = other.Timestamp + st.TTL = other.TTL + st.FileStateOS = other.FileStateOS + + metaOld, metaNew = st.Meta, other.Meta + } else { + metaOld, metaNew = other.Meta, st.Meta + } + + if len(metaOld) == 0 || len(metaNew) == 0 { + meta = metaNew + } else { + meta = map[string]string{} + for k, v := range metaOld { + meta[k] = v + } + for k, v := range metaNew { + meta[k] = v + } + } + + if len(meta) == 0 { + meta = nil + } + st.Meta = meta +} diff --git a/filebeat/registrar/registrar.go b/filebeat/registrar/registrar.go index 61b9c2608e1a..118f7c276db0 100644 --- a/filebeat/registrar/registrar.go +++ b/filebeat/registrar/registrar.go @@ -18,41 +18,47 @@ package registrar import ( - "encoding/json" "fmt" - "io" - "os" - "path/filepath" + "strings" "sync" "time" - "github.com/elastic/beats/v7/filebeat/config" + "github.com/pkg/errors" + "github.com/elastic/beats/v7/filebeat/input/file" - helper "github.com/elastic/beats/v7/libbeat/common/file" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/monitoring" - "github.com/elastic/beats/v7/libbeat/paths" + "github.com/elastic/beats/v7/libbeat/statestore" ) type Registrar struct { - Channel chan []file.State - out successLogger - done chan struct{} - registryFile string // Path to the Registry File - fileMode os.FileMode // Permissions to apply on the Registry File - wg sync.WaitGroup - - states *file.States // Map with all file paths inside and the corresponding state - gcRequired bool // gcRequired is set if registry state needs to be gc'ed before the next write - gcEnabled bool // gcEnabled indicates the registry contains some state that can be gc'ed in the future - flushTimeout time.Duration + log *logp.Logger + + // registrar event input and output + Channel chan []file.State + out successLogger bufferedStateUpdates int + + // shutdown handling + done chan 
struct{} + wg sync.WaitGroup + + // state storage + states *file.States // Map with all file paths inside and the corresponding state + store *statestore.Store // Store keeps states in memory and on disk + flushTimeout time.Duration + + gcEnabled, gcRequired bool } type successLogger interface { Published(n int) bool } +type StateStore interface { + Access() (*statestore.Store, error) +} + var ( statesUpdate = monitoring.NewInt(nil, "registrar.states.update") statesCleanup = monitoring.NewInt(nil, "registrar.states.cleanup") @@ -62,69 +68,27 @@ var ( registrySuccess = monitoring.NewInt(nil, "registrar.writes.success") ) +const fileStatePrefix = "filebeat::logs::" + // New creates a new Registrar instance, updating the registry file on // `file.State` updates. New fails if the file can not be opened or created. -func New(cfg config.Registry, out successLogger) (*Registrar, error) { - home := paths.Resolve(paths.Data, cfg.Path) - migrateFile := cfg.MigrateFile - if migrateFile != "" { - migrateFile = paths.Resolve(paths.Data, migrateFile) - } - - err := ensureCurrent(home, migrateFile, cfg.Permissions) +func New(stateStore StateStore, out successLogger, flushTimeout time.Duration) (*Registrar, error) { + store, err := stateStore.Access() if err != nil { return nil, err } - dataFile := filepath.Join(home, "filebeat", "data.json") r := &Registrar{ - registryFile: dataFile, - fileMode: cfg.Permissions, - done: make(chan struct{}), - states: file.NewStates(), + log: logp.NewLogger("registrar"), Channel: make(chan []file.State, 1), - flushTimeout: cfg.FlushTimeout, out: out, + done: make(chan struct{}), wg: sync.WaitGroup{}, + states: file.NewStates(), + store: store, + flushTimeout: flushTimeout, } - return r, r.Init() -} - -// Init sets up the Registrar and make sure the registry file is setup correctly -func (r *Registrar) Init() error { - // The registry file is opened in the data path - r.registryFile = paths.Resolve(paths.Data, r.registryFile) - - // Create 
directory if it does not already exist. - registryPath := filepath.Dir(r.registryFile) - err := os.MkdirAll(registryPath, 0750) - if err != nil { - return fmt.Errorf("Failed to created registry file dir %s: %v", registryPath, err) - } - - // Check if files exists - fileInfo, err := os.Lstat(r.registryFile) - if os.IsNotExist(err) { - logp.Info("No registry file found under: %s. Creating a new registry file.", r.registryFile) - // No registry exists yet, write empty state to check if registry can be written - return r.writeRegistry() - } - if err != nil { - return err - } - - // Check if regular file, no dir, no symlink - if !fileInfo.Mode().IsRegular() { - // Special error message for directory - if fileInfo.IsDir() { - return fmt.Errorf("Registry file path must be a file. %s is a directory.", r.registryFile) - } - return fmt.Errorf("Registry file path is not a regular file: %s", r.registryFile) - } - - logp.Debug("registrar", "Registry file set to: %s", r.registryFile) - - return nil + return r, nil } // GetStates return the registrar states @@ -135,132 +99,17 @@ func (r *Registrar) GetStates() []file.State { // loadStates fetches the previous reading state from the configure RegistryFile file // The default file is `registry` in the data path. 
func (r *Registrar) loadStates() error { - f, err := os.Open(r.registryFile) + states, err := readStatesFrom(r.store) if err != nil { - return err + return errors.Wrap(err, "can not load filebeat registry state") } - defer f.Close() - - logp.Info("Loading registrar data from %s", r.registryFile) - - states, err := readStatesFrom(f) - if err != nil { - return err - } r.states.SetStates(states) - logp.Info("States Loaded from registrar: %+v", len(states)) + r.log.Infof("States Loaded from registrar: %+v", len(states)) return nil } -func readStatesFrom(in io.Reader) ([]file.State, error) { - states := []file.State{} - decoder := json.NewDecoder(in) - if err := decoder.Decode(&states); err != nil { - return nil, fmt.Errorf("Error decoding states: %s", err) - } - - states = fixStates(states) - states = resetStates(states) - return states, nil -} - -// fixStates cleans up the registry states when updating from an older version -// of filebeat potentially writing invalid entries. -func fixStates(states []file.State) []file.State { - if len(states) == 0 { - return states - } - - // we use a map of states here, so to identify and merge duplicate entries. - idx := map[string]*file.State{} - for i := range states { - state := &states[i] - fixState(state) - - id := state.ID() - old, exists := idx[id] - if !exists { - idx[id] = state - } else { - mergeStates(old, state) // overwrite the entry in 'old' - } - } - - if len(idx) == len(states) { - return states - } - - i := 0 - newStates := make([]file.State, len(idx)) - for _, state := range idx { - newStates[i] = *state - i++ - } - return newStates -} - -// fixState updates a read state to fullfil required invariantes: -// - "Meta" must be nil if len(Meta) == 0 -func fixState(st *file.State) { - if len(st.Meta) == 0 { - st.Meta = nil - } -} - -// mergeStates merges 2 states by trying to determine the 'newer' state. -// The st state is overwritten with the updated fields. 
-func mergeStates(st, other *file.State) { - st.Finished = st.Finished || other.Finished - if st.Offset < other.Offset { // always select the higher offset - st.Offset = other.Offset - } - - // update file meta-data. As these are updated concurrently by the - // inputs, select the newer state based on the update timestamp. - var meta, metaOld, metaNew map[string]string - if st.Timestamp.Before(other.Timestamp) { - st.Source = other.Source - st.Timestamp = other.Timestamp - st.TTL = other.TTL - st.FileStateOS = other.FileStateOS - - metaOld, metaNew = st.Meta, other.Meta - } else { - metaOld, metaNew = other.Meta, st.Meta - } - - if len(metaOld) == 0 || len(metaNew) == 0 { - meta = metaNew - } else { - meta = map[string]string{} - for k, v := range metaOld { - meta[k] = v - } - for k, v := range metaNew { - meta[k] = v - } - } - - if len(meta) == 0 { - meta = nil - } - st.Meta = meta -} - -// resetStates sets all states to finished and disable TTL on restart -// For all states covered by an input, TTL will be overwritten with the input value -func resetStates(states []file.State) []file.State { - for key, state := range states { - state.Finished = true - // Set ttl to -2 to easily spot which states are not managed by a input - state.TTL = -2 - states[key] = state - } - return states -} - func (r *Registrar) Start() error { // Load the previous log file locations now, for use in input err := r.loadStates() @@ -269,45 +118,100 @@ func (r *Registrar) Start() error { } r.wg.Add(1) - go r.Run() + go func() { + defer r.wg.Done() + r.Run() + }() return nil } +// Stop stops the registry. It waits until Run function finished. 
+func (r *Registrar) Stop() { + r.log.Info("Stopping Registrar") + defer r.log.Info("Registrar stopped") + + close(r.done) + r.wg.Wait() +} + func (r *Registrar) Run() { - logp.Debug("registrar", "Starting Registrar") - // Writes registry on shutdown + r.log.Debug("Starting Registrar") + defer r.log.Debug("Stopping Registrar") + + defer r.store.Close() + defer func() { - r.writeRegistry() - r.wg.Done() + writeStates(r.store, r.states.GetStates()) }() var ( timer *time.Timer flushC <-chan time.Time + + directIn chan []file.State + collectIn chan []file.State ) + if r.flushTimeout <= 0 { + directIn = r.Channel + } else { + collectIn = r.Channel + } + for { select { case <-r.done: - logp.Info("Ending Registrar") + r.log.Info("Ending Registrar") return - case <-flushC: - flushC = nil - timer.Stop() - r.flushRegistry() - case states := <-r.Channel: + + case states := <-directIn: + // no flush timeout configured. Directly update registry r.onEvents(states) - if r.flushTimeout <= 0 { - r.flushRegistry() - } else if flushC == nil { + r.commitStateUpdates() + + case states := <-collectIn: + // flush timeout configured. Only update internal state and track pending + // updates to be written to registry. + r.onEvents(states) + if flushC == nil && len(states) > 0 { timer = time.NewTimer(r.flushTimeout) flushC = timer.C } + + case <-flushC: + timer.Stop() + r.commitStateUpdates() + + flushC = nil + timer = nil } } } +func (r *Registrar) commitStateUpdates() { + // First clean up states + r.gcStates() + states := r.states.GetStates() + statesCurrent.Set(int64(len(states))) + + registryWrites.Inc() + + if err := writeStates(r.store, states); err != nil { + r.log.Errorf("Error writing registrar state to statestore: %v", err) + registryFails.Inc() + } else { + r.log.Debugf("Registry file updated. 
%d active states.", len(states)) + registrySuccess.Inc() + } + + if r.out != nil { + r.out.Published(r.bufferedStateUpdates) + } + r.bufferedStateUpdates = 0 + +} + // onEvents processes events received from the publisher pipeline func (r *Registrar) onEvents(states []file.State) { r.processEventStates(states) @@ -323,7 +227,7 @@ func (r *Registrar) onEvents(states []file.State) { } } - logp.Debug("registrar", "Registrar state updates processed. Count: %v", len(states)) + r.log.Debugf("Registrar state updates processed. Count: %v", len(states)) // new set of events received -> mark state registry ready for next // cleanup phase in case gc'able events are stored in the registry. @@ -340,10 +244,13 @@ func (r *Registrar) gcStates() { } beforeCount := r.states.Count() - cleanedStates, pendingClean := r.states.Cleanup() + cleanedStates, pendingClean := r.states.CleanupWith(func(id string) { + // TODO: report error + r.store.Remove(fileStatePrefix + id) + }) statesCleanup.Add(int64(cleanedStates)) - logp.Debug("registrar", + r.log.Debugf( "Registrar states cleaned up. Before: %d, After: %d, Pending: %d", beforeCount, beforeCount-cleanedStates, pendingClean) @@ -353,7 +260,7 @@ func (r *Registrar) gcStates() { // processEventStates gets the states from the events and writes them to the registrar state func (r *Registrar) processEventStates(states []file.State) { - logp.Debug("registrar", "Processing %d events", len(states)) + r.log.Debugf("Processing %d events", len(states)) ts := time.Now() for i := range states { @@ -362,75 +269,43 @@ func (r *Registrar) processEventStates(states []file.State) { } } -// Stop stops the registry. It waits until Run function finished. 
-func (r *Registrar) Stop() { - logp.Info("Stopping Registrar") - close(r.done) - r.wg.Wait() -} - -func (r *Registrar) flushRegistry() { - if err := r.writeRegistry(); err != nil { - logp.Err("Writing of registry returned error: %v. Continuing...", err) - } - - if r.out != nil { - r.out.Published(r.bufferedStateUpdates) - } - r.bufferedStateUpdates = 0 -} +func readStatesFrom(store *statestore.Store) ([]file.State, error) { + var states []file.State -// writeRegistry writes the new json registry file to disk. -func (r *Registrar) writeRegistry() error { - // First clean up states - r.gcStates() - states := r.states.GetStates() - statesCurrent.Set(int64(len(states))) + err := store.Each(func(key string, dec statestore.ValueDecoder) (bool, error) { + if !strings.HasPrefix(key, fileStatePrefix) { + return true, nil + } - registryWrites.Inc() + // try to decode. Ignore faulty/incompatible values. + var st file.State + if err := dec.Decode(&st); err != nil { + // XXX: Do we want to log here? In case we start to store other + // state types in the registry, then this operation will likely fail + // quite often, producing some false-positives in the logs... + return true, nil + } - tempfile, err := writeTmpFile(r.registryFile, r.fileMode, states) - if err != nil { - registryFails.Inc() - return err - } + st.Id = key[len(fileStatePrefix):] + states = append(states, st) + return true, nil + }) - err = helper.SafeFileRotate(r.registryFile, tempfile) if err != nil { - registryFails.Inc() - return err + return nil, err } - logp.Debug("registrar", "Registry file updated. 
%d states written.", len(states)) - registrySuccess.Inc() - - return nil + states = fixStates(states) + states = resetStates(states) + return states, nil } -func writeTmpFile(baseName string, perm os.FileMode, states []file.State) (string, error) { - logp.Debug("registrar", "Write registry file: %s (%v)", baseName, len(states)) - - tempfile := baseName + ".new" - f, err := os.OpenFile(tempfile, os.O_RDWR|os.O_CREATE|os.O_TRUNC|os.O_SYNC, perm) - if err != nil { - logp.Err("Failed to create tempfile (%s) for writing: %s", tempfile, err) - return "", err - } - - defer f.Close() - - encoder := json.NewEncoder(f) - - if err := encoder.Encode(states); err != nil { - logp.Err("Error when encoding the states: %s", err) - return "", err - } - - // Commit the changes to storage to avoid corrupt registry files - if err = f.Sync(); err != nil { - logp.Err("Error when syncing new registry file contents: %s", err) - return "", err +func writeStates(store *statestore.Store, states []file.State) error { + for i := range states { + key := fileStatePrefix + states[i].ID() + if err := store.Set(key, states[i]); err != nil { + return err + } } - - return tempfile, nil + return nil } diff --git a/filebeat/registrar/registrar_test.go b/filebeat/registrar/registrar_test.go deleted file mode 100644 index 58d5d5acff81..000000000000 --- a/filebeat/registrar/registrar_test.go +++ /dev/null @@ -1,189 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package registrar - -import ( - "reflect" - "sort" - "strings" - "testing" - "time" - - "github.com/stretchr/testify/assert" - - "github.com/elastic/beats/v7/filebeat/input/file" -) - -func TestRegistrarRead(t *testing.T) { - type testCase struct { - input string - expected []file.State - } - - zone := time.FixedZone("+0000", 0) - - cases := map[string]testCase{ - "ok registry with one entry": testCase{ - input: `[ - { - "type": "log", - "source": "test.log", - "offset": 10, - "timestamp": "2018-07-16T10:45:01+00:00", - "ttl": -1, - "meta": null - } - ]`, - expected: []file.State{ - { - Type: "log", - Source: "test.log", - Timestamp: time.Date(2018, time.July, 16, 10, 45, 01, 0, zone), - Offset: 10, - TTL: -2, // loader always resets states - }, - }, - }, - - "load config without meta": testCase{ - input: `[ - { - "type": "log", - "source": "test.log", - "offset": 10, - "timestamp": "2018-07-16T10:45:01+00:00", - "ttl": -1 - } - ]`, - expected: []file.State{ - { - Type: "log", - Source: "test.log", - Timestamp: time.Date(2018, time.July, 16, 10, 45, 01, 0, zone), - Offset: 10, - TTL: -2, // loader always resets states - }, - }, - }, - - "load config with empty meta": testCase{ - input: `[ - { - "type": "log", - "source": "test.log", - "offset": 10, - "timestamp": "2018-07-16T10:45:01+00:00", - "ttl": -1, - "meta": {} - } - ]`, - expected: []file.State{ - { - Type: "log", - Source: "test.log", - Timestamp: time.Date(2018, time.July, 16, 10, 45, 01, 0, zone), - Offset: 10, - TTL: -2, // loader always resets states - }, - }, - }, - - "requires 
merge without meta-data": testCase{ - input: `[ - { - "type": "log", - "source": "test.log", - "offset": 100, - "timestamp": "2018-07-16T10:45:01+00:00", - "ttl": -1, - "meta": {} - }, - { - "type": "log", - "source": "test.log", - "offset": 10, - "timestamp": "2018-07-16T10:45:10+00:00", - "ttl": -1, - "meta": null - } - ]`, - expected: []file.State{ - { - Type: "log", - Source: "test.log", - Timestamp: time.Date(2018, time.July, 16, 10, 45, 10, 0, zone), - Offset: 100, - TTL: -2, // loader always resets states - Meta: nil, - }, - }, - }, - } - - matchState := func(t *testing.T, i int, expected, actual file.State) { - check := func(name string, a, b interface{}) { - if !reflect.DeepEqual(a, b) { - t.Errorf("State %v: %v mismatch (expected=%v, actual=%v)", i, name, a, b) - } - } - - check("id", expected.ID(), actual.ID()) - check("source", expected.Source, actual.Source) - check("offset", expected.Offset, actual.Offset) - check("ttl", expected.TTL, actual.TTL) - check("meta", expected.Meta, actual.Meta) - check("type", expected.Type, actual.Type) - - if t1, t2 := expected.Timestamp, actual.Timestamp; !t1.Equal(t2) { - t.Errorf("State %v: timestamp mismatch (expected=%v, actual=%v)", i, t1, t2) - } - } - - for name, test := range cases { - test := test - t.Run(name, func(t *testing.T) { - in := strings.NewReader(test.input) - - states, err := readStatesFrom(in) - if !assert.NoError(t, err) { - return - } - - actual := sortedStates(states) - expected := sortedStates(test.expected) - if len(actual) != len(expected) { - t.Errorf("expected %v state, but registrar did load %v states", - len(expected), len(actual)) - return - } - - for i := range expected { - matchState(t, i, expected[i], actual[i]) - } - }) - } -} - -func sortedStates(states []file.State) []file.State { - tmp := make([]file.State, len(states)) - copy(tmp, states) - sort.Slice(tmp, func(i, j int) bool { - return tmp[i].ID() < tmp[j].ID() - }) - return tmp -} diff --git a/filebeat/tests/system/filebeat.py 
b/filebeat/tests/system/filebeat.py index 39c145184769..ca8a86073600 100644 --- a/filebeat/tests/system/filebeat.py +++ b/filebeat/tests/system/filebeat.py @@ -8,7 +8,7 @@ from beat.beat import TestCase, TimeoutError, REGEXP_TYPE -default_registry_file = 'registry/filebeat/data.json' +default_registry_path = 'registry/filebeat' class BaseTest(TestCase): @@ -105,16 +105,17 @@ class Registry: def __init__(self, home, name=None): if not name: - name = default_registry_file + name = default_registry_path self.path = os.path.join(home, name) + self._meta_path = os.path.join(self.path, "meta.json") + self._log_path = os.path.join(self.path, "log.json") + self._active_path = os.path.join(self.path, "active.dat") def exists(self): - return os.path.isfile(self.path) + return os.path.isfile(self._log_path) def load(self, filter=None): - with open(self.path) as f: - entries = json.load(f) - + entries = self._read_registry() if filter: entries = [x for x in entries if filter(x)] return entries @@ -124,6 +125,32 @@ def count(self, filter=None): return 0 return len(self.load(filter=filter)) + def _read_registry(self): + data = {} + data_file_path = None + if os.path.isfile(self._active_path): + with open(self._active_path) as f: + data_file_path = f.read().strip() + if data_file_path and os.path.isfile(data_file_path): + with open(data_file_path) as f: + data = dict((x['_key'], x) for x in json.load(f)) + + with open(self._log_path) as f: + try: + iter_objs = (json.loads(line) for line in f) + for action, entry in zip(iter_objs, iter_objs): + if action['op'] == 'remove': + if entry['k'] in data: + del data[entry['k']] + elif action['op'] == 'set': + v = entry['v'] + v['_key'] = entry['k'] + data[entry['k']] = v + except ValueError: # log file was incomplete, ignore mid writes + pass + + return list(data.values()) + class LogState: def __init__(self, path): diff --git a/filebeat/tests/system/test_harvester.py b/filebeat/tests/system/test_harvester.py index 
2c706b2a3c4e..be2e4f42b8ff 100644 --- a/filebeat/tests/system/test_harvester.py +++ b/filebeat/tests/system/test_harvester.py @@ -109,8 +109,7 @@ def test_close_removed(self): # Make sure state is written self.wait_until( - lambda: self.log_contains_count( - "Write registry file") > 1, + lambda: len(self.get_registry()) > 0, max_timeout=10) # Wait until error shows up on windows @@ -595,10 +594,9 @@ def test_symlink_rotated(self): filebeat = self.start_beat() - # Make sure state is written + # Make sure some state is written self.wait_until( - lambda: self.log_contains_count( - "Write registry file") > 1, + lambda: len(self.get_registry()) > 0, max_timeout=10) # Make sure symlink is skipped diff --git a/filebeat/tests/system/test_registrar.py b/filebeat/tests/system/test_registrar.py index ed2e04d2c07c..5dda351a5f96 100644 --- a/filebeat/tests/system/test_registrar.py +++ b/filebeat/tests/system/test_registrar.py @@ -40,7 +40,7 @@ def test_registrar_file_content(self): testfile.close() filebeat = self.start_beat() - count = self.log_contains_count("states written") + count = self.log_contains_count("Registry file updated") self.wait_until( lambda: self.output_has(lines=5), @@ -48,7 +48,7 @@ def test_registrar_file_content(self): # Make sure states written appears one more time self.wait_until( - lambda: self.log_contains("states written") > count, + lambda: self.log_contains("Registry file updated") > count, max_timeout=10) # wait until the registry file exist. Needed to avoid a race between @@ -68,7 +68,7 @@ def test_registrar_file_content(self): "offset": iterations * line_len, }, record) self.assertTrue("FileStateOS" in record) - self.assertIsNone(record["meta"]) + self.assertTrue("meta" not in record) file_state_os = record["FileStateOS"] if os.name == "nt": @@ -150,10 +150,10 @@ def test_custom_registry_file_location(self): # wait until the registry file exist. Needed to avoid a race between # the logging and actual writing the file. 
Seems to happen on Windows. self.wait_until( - lambda: self.has_registry("a/b/c/registry/filebeat/data.json"), + lambda: self.has_registry("a/b/c/registry/filebeat"), max_timeout=1) filebeat.check_kill_and_wait() - assert self.has_registry("a/b/c/registry/filebeat/data.json") + assert self.has_registry("a/b/c/registry/filebeat") def test_registry_file_default_permissions(self): """ @@ -166,7 +166,7 @@ def test_registry_file_default_permissions(self): raise SkipTest registry_home = "a/b/c/registry" - registry_file = os.path.join(registry_home, "filebeat/data.json") + registry_path = os.path.join(registry_home, "filebeat") self.render_config_template( path=os.path.abspath(self.working_dir) + "/log/*", @@ -183,11 +183,11 @@ def test_registry_file_default_permissions(self): # wait until the registry file exist. Needed to avoid a race between # the logging and actual writing the file. Seems to happen on Windows. self.wait_until( - lambda: self.has_registry(registry_file), + lambda: self.has_registry(registry_path), max_timeout=1) filebeat.check_kill_and_wait() - self.assertEqual(self.file_permissions(registry_file), "0o600") + self.assertEqual(self.file_permissions(os.path.join(registry_path, "log.json")), "0o600") def test_registry_file_custom_permissions(self): """ @@ -200,7 +200,7 @@ def test_registry_file_custom_permissions(self): raise SkipTest registry_home = "a/b/c/registry" - registry_file = os.path.join(registry_home, "filebeat/data.json") + registry_path = os.path.join(registry_home, "filebeat") self.render_config_template( path=os.path.abspath(self.working_dir) + "/log/*", @@ -218,11 +218,11 @@ def test_registry_file_custom_permissions(self): # wait until the registry file exist. Needed to avoid a race between # the logging and actual writing the file. Seems to happen on Windows. 
self.wait_until( - lambda: self.has_registry(registry_file), + lambda: self.has_registry(registry_path), max_timeout=1) filebeat.check_kill_and_wait() - self.assertEqual(self.file_permissions(registry_file), "0o640") + self.assertEqual(self.file_permissions(os.path.join(registry_path, "log.json")), "0o640") def test_registry_file_update_permissions(self): """ @@ -235,7 +235,7 @@ def test_registry_file_update_permissions(self): raise SkipTest registry_home = "a/b/c/registry_x" - registry_file = os.path.join(registry_home, "filebeat/data.json") + registry_path = os.path.join(registry_home, "filebeat") self.render_config_template( path=os.path.abspath(self.working_dir) + "/log/*", @@ -252,11 +252,11 @@ def test_registry_file_update_permissions(self): # wait until the registry file exist. Needed to avoid a race between # the logging and actual writing the file. Seems to happen on Windows. self.wait_until( - lambda: self.has_registry(registry_file), + lambda: self.has_registry(registry_path), max_timeout=1) filebeat.check_kill_and_wait() - self.assertEqual(self.file_permissions(registry_file), "0o600") + self.assertEqual(self.file_permissions(os.path.join(registry_path, "log.json")), "0o600") self.render_config_template( path=os.path.abspath(self.working_dir) + "/log/*", @@ -271,7 +271,7 @@ def test_registry_file_update_permissions(self): # wait until the registry file exist. Needed to avoid a race between # the logging and actual writing the file. Seems to happen on Windows. 
self.wait_until( - lambda: self.has_registry(registry_file), + lambda: self.has_registry(registry_path), max_timeout=1) # Wait a moment to make sure registry is completely written @@ -279,7 +279,7 @@ def test_registry_file_update_permissions(self): filebeat.check_kill_and_wait() - self.assertEqual(self.file_permissions(registry_file), "0o640") + self.assertEqual(self.file_permissions(os.path.join(registry_path, "log.json")), "0o640") def test_rotating_file(self): """ @@ -380,7 +380,7 @@ def test_rotating_file_inode(self): # Wait until rotation is detected self.wait_until( lambda: self.log_contains_count( - "Registry file updated. 1 states written") >= 1, + "Registry file updated. 1 active states") >= 1, max_timeout=10) data = self.get_registry() @@ -476,7 +476,7 @@ def test_restart_continue(self): filebeat.check_kill_and_wait() # Store first registry file - registry_file = "registry/filebeat/data.json" + registry_file = "registry/filebeat/log.json" shutil.copyfile( self.working_dir + "/" + registry_file, self.working_dir + "/registry.first", @@ -577,7 +577,7 @@ def test_rotating_file_with_restart(self): filebeat.check_kill_and_wait() # Store first registry file - registry_file = "registry/filebeat/data.json" + registry_file = "registry/filebeat/log.json" shutil.copyfile( self.working_dir + "/" + registry_file, self.working_dir + "/registry.first", @@ -680,7 +680,7 @@ def test_state_after_rotation(self): self.wait_until( lambda: self.log_contains_count( - "Registry file updated. 2 states written.") >= 1, + "Registry file updated. 2 active states.") >= 1, max_timeout=15) time.sleep(1) @@ -756,7 +756,7 @@ def test_state_after_rotation_ignore_older(self): self.wait_until( lambda: self.log_contains_count( - "Registry file updated. 2 states written.") >= 1, + "Registry file updated. 
2 active states.") >= 1, max_timeout=15) # Wait a moment to make sure registry is completely written @@ -858,15 +858,11 @@ def test_clean_removed(self): self.wait_until(lambda: self.output_has(lines=3), max_timeout=10) # Make sure all states are cleaned up - self.wait_until(self.logs.nextCheck(re.compile("Registrar.*After: 1"))) - + self.wait_until(lambda: self.registry.count() == 1) filebeat.check_kill_and_wait() - # Check that the first to files were removed from the registry - data = self.registry.load() - assert len(data) == 1 - # Make sure the last file in the registry is the correct one and has the correct offset + data = self.registry.load() assert data[0]["offset"] == self.input_logs.size(file2) @unittest.skipIf(os.name == 'nt', 'flaky test https://github.com/elastic/beats/issues/10606') @@ -899,12 +895,7 @@ def test_clean_removed_with_clean_inactive(self): # Wait until registry file is created self.wait_until( - self.logs.nextCheck("Registry file updated. 2 states written."), - max_timeout=15) - - count = self.registry.count() - print("registry size: {}".format(count)) - assert count == 2 + lambda: len(self.get_registry()) == 2, max_timeout=10) self.input_logs.remove(file1) @@ -918,10 +909,8 @@ def test_clean_removed_with_clean_inactive(self): # wait until next gc and until registry file has been updated self.wait_until(self.logs.check("Before: 1, After: 1, Pending: 1")) - self.wait_until(self.logs.nextCheck("Registry file updated. 
1 states written.")) - count = self.registry.count() - print("registry size after remove: {}".format(count)) - assert count == 1 + self.wait_until( + lambda: len(self.get_registry()) == 1, max_timeout=10) filebeat.check_kill_and_wait() @@ -932,71 +921,6 @@ def test_clean_removed_with_clean_inactive(self): # Make sure the last file in the registry is the correct one and has the correct offset assert data[0]["offset"] == self.input_logs.size(file2) - def test_symlink_failure(self): - """ - Test that filebeat does not start if a symlink is set as registry file - """ - self.render_config_template( - path=os.path.abspath(self.working_dir) + "/log/*", - ) - os.mkdir(self.working_dir + "/log/") - - testfile_path = self.working_dir + "/log/test.log" - with open(testfile_path, 'w') as testfile: - testfile.write("Hello World\n") - - registry_file = self.working_dir + "/registry/filebeat/data.json" - link_to_file = self.working_dir + "registry.data" - os.makedirs(self.working_dir + "/registry/filebeat") - - with open(link_to_file, 'w') as f: - f.write("[]") - - if os.name == "nt": - import win32file # pylint: disable=import-error - win32file.CreateSymbolicLink(registry_file, link_to_file, 0) - else: - os.symlink(link_to_file, registry_file) - - filebeat = self.start_beat() - - # Make sure states written appears one more time - self.wait_until( - lambda: self.log_contains("Exiting: Registry file path is not a regular file"), - max_timeout=10) - - filebeat.check_kill_and_wait(exit_code=1) - - def test_invalid_state(self): - """ - Test that filebeat fails starting if invalid state in registry - """ - - self.render_config_template( - path=os.path.abspath(self.working_dir) + "/log/*", - ) - os.mkdir(self.working_dir + "/log/") - registry_file = self.working_dir + "/registry" - - testfile_path = self.working_dir + "/log/test.log" - with open(testfile_path, 'w') as testfile: - testfile.write("Hello World\n") - - registry_file_path = self.working_dir + "/registry" - with 
open(registry_file_path, 'w') as registry_file: - # Write invalid state - registry_file.write("Hello World") - - filebeat = self.start_beat() - - # Make sure states written appears one more time - self.wait_until( - lambda: self.log_contains( - "Exiting: Could not start registrar: Error loading state"), - max_timeout=10) - - filebeat.check_kill_and_wait(exit_code=1) - def test_restart_state(self): """ Make sure that states are rewritten correctly on restart and cleaned @@ -1122,15 +1046,12 @@ def test_restart_state_reset_ttl(self): lambda: self.output_has(lines=1), max_timeout=30) - self.wait_until( - lambda: self.log_contains("Registry file updated. 1 states written.", - logfile="filebeat.log"), max_timeout=10) + self.wait_until(lambda: self.registry.count() == 1, max_timeout=10) filebeat.check_kill_and_wait() # Check that ttl > 0 was set because of clean_inactive data = self.get_registry() - assert len(data) == 1 assert data[0]["ttl"] == 20 * 1000 * 1000 * 1000 # New config file which does not match the existing clean_inactive @@ -1180,16 +1101,12 @@ def test_restart_state_reset_ttl_with_space(self): self.wait_until( lambda: self.output_has(lines=1), max_timeout=30) - - self.wait_until( - lambda: self.log_contains("Registry file updated. 1 states written.", - logfile="filebeat.log"), max_timeout=10) + self.wait_until(lambda: self.registry.count() == 1) filebeat.check_kill_and_wait() # Check that ttl > 0 was set because of clean_inactive data = self.get_registry() - assert len(data) == 1 assert data[0]["ttl"] == 20 * 1000 * 1000 * 1000 # new config file with other clean_inactive @@ -1297,8 +1214,7 @@ def test_ignore_older_state(self): # Make sure state is written self.wait_until( - lambda: self.log_contains("Registry file updated. 
1 states written."), - max_timeout=10) + lambda: len(self.get_registry()) == 1, max_timeout=10) filebeat.check_kill_and_wait() @@ -1347,8 +1263,7 @@ def test_ignore_older_state_clean_inactive(self): # Make sure state is written self.wait_until( - lambda: self.log_contains("Registry file updated. 0 states written."), - max_timeout=10) + lambda: len(self.get_registry()) == 0, max_timeout=10) filebeat.check_kill_and_wait() diff --git a/filebeat/tests/system/test_registrar_upgrade.py b/filebeat/tests/system/test_registrar_upgrade.py index 84d5ac8b4ff7..7a077caea510 100644 --- a/filebeat/tests/system/test_registrar_upgrade.py +++ b/filebeat/tests/system/test_registrar_upgrade.py @@ -94,4 +94,6 @@ def validate_if_registry_is_moved_under_folder(self): migrated_registry_dir = os.path.abspath(self.working_dir + "/registry") assert os.path.isdir(migrated_registry_dir) assert os.path.isdir(migrated_registry_dir + "/filebeat") - assert os.path.isfile(migrated_registry_dir + "/filebeat/data.json") + assert os.path.isfile(migrated_registry_dir + "/filebeat/log.json") + assert os.path.isfile(migrated_registry_dir + "/filebeat/1.json") + assert os.path.isfile(migrated_registry_dir + "/filebeat/active.dat") diff --git a/go.mod b/go.mod index f21c09c48881..60183e550147 100644 --- a/go.mod +++ b/go.mod @@ -107,7 +107,7 @@ require ( github.com/jstemmer/go-junit-report v0.9.1 github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect github.com/lib/pq v1.1.2-0.20190507191818-2ff3cb3adc01 - github.com/magefile/mage v1.9.0 + github.com/magefile/mage v1.10.0 github.com/mailru/easyjson v0.7.1 // indirect github.com/mattn/go-colorable v0.0.8 github.com/mattn/go-ieproxy v0.0.0-20191113090002-7c0f6868bffe // indirect diff --git a/go.sum b/go.sum index 848a4b465daf..cdce365cbb38 100644 --- a/go.sum +++ b/go.sum @@ -484,6 +484,8 @@ github.com/lib/pq v1.1.2-0.20190507191818-2ff3cb3adc01 h1:EPw7R3OAyxHBCyl0oqh3lU github.com/lib/pq v1.1.2-0.20190507191818-2ff3cb3adc01/go.mod 
h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/magefile/mage v1.9.0 h1:t3AU2wNwehMCW97vuqQLtw6puppWXHO+O2MHo5a50XE= github.com/magefile/mage v1.9.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= +github.com/magefile/mage v1.10.0 h1:3HiXzCUY12kh9bIuyXShaVe529fJfyqoVM42o/uom2g= +github.com/magefile/mage v1.10.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= github.com/mailru/easyjson v0.0.0-20160728113105-d5b7844b561a/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.7.1 h1:mdxE1MF9o53iCb2Ghj1VfWvh7ZOwHpnVG/xwXrV90U8= github.com/mailru/easyjson v0.7.1/go.mod h1:KAzv3t3aY1NaHWoQz1+4F1ccyAH66Jk7yos7ldAVICs= diff --git a/libbeat/common/file/file_other.go b/libbeat/common/file/file_other.go index 894183cee320..599108f480b3 100644 --- a/libbeat/common/file/file_other.go +++ b/libbeat/common/file/file_other.go @@ -26,8 +26,8 @@ import ( ) type StateOS struct { - Inode uint64 `json:"inode,"` - Device uint64 `json:"device,"` + Inode uint64 `json:"inode," struct:"inode"` + Device uint64 `json:"device," struct:"device"` } // GetOSState returns the FileStateOS for non windows systems diff --git a/libbeat/common/file/file_windows.go b/libbeat/common/file/file_windows.go index c5a4d6ef3641..1a9ac6e1c769 100644 --- a/libbeat/common/file/file_windows.go +++ b/libbeat/common/file/file_windows.go @@ -29,9 +29,9 @@ import ( ) type StateOS struct { - IdxHi uint64 `json:"idxhi,"` - IdxLo uint64 `json:"idxlo,"` - Vol uint64 `json:"vol,"` + IdxHi uint64 `json:"idxhi," struct:"idxhi"` + IdxLo uint64 `json:"idxlo," struct:"idxlo"` + Vol uint64 `json:"vol," struct:"vol"` } var ( diff --git a/libbeat/statestore/backend/memlog/util_darwin.go b/libbeat/statestore/backend/memlog/util_darwin.go index 1aaf64ab42a8..21861d471afb 100644 --- a/libbeat/statestore/backend/memlog/util_darwin.go +++ b/libbeat/statestore/backend/memlog/util_darwin.go @@ -19,17 +19,20 @@ package memlog import ( "os" + "syscall" "golang.org/x/sys/unix" ) +var 
errno0 = syscall.Errno(0) + // syncFile implements the fsync operation for darwin. On darwin fsync is not // reliable, instead the fcntl syscall with F_FULLFSYNC must be used. func syncFile(f *os.File) error { for { _, err := unix.FcntlInt(f.Fd(), unix.F_FULLFSYNC, 0) - err = normalizeIOError(err) - if err == nil || isIOError(err) { + rootCause := errorRootCause(err) + if err == nil || isIOError(rootCause) { return err } @@ -44,3 +47,28 @@ func syncFile(f *os.File) error { return err } } + +func isIOError(err error) bool { + return err == unix.EIO || + // space/quota + err == unix.ENOSPC || err == unix.EDQUOT || err == unix.EFBIG || + // network + err == unix.ECONNRESET || err == unix.ENETDOWN || err == unix.ENETUNREACH +} + +// errorRootCause follows the Unwrap chain of err and returns the innermost +// error, or nil if that error is nil or errno 0 (i.e. it indicates no error). +func errorRootCause(err error) error { + for err != nil { + u, ok := err.(interface{ Unwrap() error }) + if !ok { + break + } + err = u.Unwrap() + } + + if err == nil || err == errno0 { + return nil + } + return err +}