Skip to content

Commit

Permalink
Cherry-pick #19633 to 7.x: Update filebeat registrar to use statestore (
Browse files Browse the repository at this point in the history
  • Loading branch information
Steffen Siering authored Jul 8, 2020
1 parent a36910a commit 399c026
Show file tree
Hide file tree
Showing 16 changed files with 602 additions and 630 deletions.
6 changes: 3 additions & 3 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10825,11 +10825,11 @@ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLI

--------------------------------------------------------------------------------
Dependency : github.com/magefile/mage
Version: v1.9.0
Version: v1.10.0
Licence type (autodetected): Apache-2.0
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/magefile/mage@v1.9.0/LICENSE:
Contents of probable licence file $GOMODCACHE/github.com/magefile/mage@v1.10.0/LICENSE:

Apache License
Version 2.0, January 2004
Expand Down Expand Up @@ -11019,7 +11019,7 @@ Contents of probable licence file $GOMODCACHE/github.com/magefile/[email protected]/LI
same "printed page" as the copyright notice for easier
identification within third-party archives.

Copyright {yyyy} {name of copyright owner}
Copyright 2017 the Mage authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down
15 changes: 14 additions & 1 deletion filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,21 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
}
finishedLogger := newFinishedLogger(wgEvents)

registryMigrator := registrar.NewMigrator(config.Registry)
if err := registryMigrator.Run(); err != nil {
logp.Err("Failed to migrate registry file: %+v", err)
return err
}

stateStore, err := openStateStore(b.Info, logp.NewLogger("filebeat"), config.Registry)
if err != nil {
logp.Err("Failed to open state store: %+v", err)
return err
}
defer stateStore.Close()

// Setup registrar to persist state
registrar, err := registrar.New(config.Registry, finishedLogger)
registrar, err := registrar.New(stateStore, finishedLogger, config.Registry.FlushTimeout)
if err != nil {
logp.Err("Could not init registrar: %v", err)
return err
Expand Down
63 changes: 63 additions & 0 deletions filebeat/beater/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package beater

import (
"time"

"github.com/elastic/beats/v7/filebeat/config"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/paths"
"github.com/elastic/beats/v7/libbeat/statestore"
"github.com/elastic/beats/v7/libbeat/statestore/backend/memlog"
)

type filebeatStore struct {
registry *statestore.Registry
storeName string
cleanInterval time.Duration
}

func openStateStore(info beat.Info, logger *logp.Logger, cfg config.Registry) (*filebeatStore, error) {
memlog, err := memlog.New(logger, memlog.Settings{
Root: paths.Resolve(paths.Data, cfg.Path),
FileMode: cfg.Permissions,
})
if err != nil {
return nil, err
}

return &filebeatStore{
registry: statestore.NewRegistry(memlog),
storeName: info.Beat,
cleanInterval: cfg.CleanInterval,
}, nil
}

func (s *filebeatStore) Close() {
s.registry.Close()
}

func (s *filebeatStore) Access() (*statestore.Store, error) {
return s.registry.Get(s.storeName)
}

func (s *filebeatStore) CleanupInterval() time.Duration {
return s.cleanInterval
}
16 changes: 9 additions & 7 deletions filebeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,20 @@ type Config struct {
}

type Registry struct {
Path string `config:"path"`
Permissions os.FileMode `config:"file_permissions"`
FlushTimeout time.Duration `config:"flush"`
MigrateFile string `config:"migrate_file"`
Path string `config:"path"`
Permissions os.FileMode `config:"file_permissions"`
FlushTimeout time.Duration `config:"flush"`
CleanInterval time.Duration `config:"cleanup_interval"`
MigrateFile string `config:"migrate_file"`
}

var (
DefaultConfig = Config{
Registry: Registry{
Path: "registry",
Permissions: 0600,
MigrateFile: "",
Path: "registry",
Permissions: 0600,
MigrateFile: "",
CleanInterval: 5 * time.Minute,
},
ShutdownTimeout: 0,
OverwritePipelines: false,
Expand Down
Loading

0 comments on commit 399c026

Please sign in to comment.