Skip to content

Commit

Permalink
Update filebeat registrar to use statestore (#19633)
Browse files Browse the repository at this point in the history
Update filebeat registrar, initialization and testing to use the
statestore as new persistent storage backend between restarts.

This PR swaps out the storage backend, adds support for migrating old registry files to the statestore, and updates existing tests in filebeat. The migration support extends the already existing migration support, supporting old registry files from the 6.x release as well (system tests start at 6.3).
  • Loading branch information
Steffen Siering authored Jul 6, 2020
1 parent a57e390 commit 5c719cf
Show file tree
Hide file tree
Showing 13 changed files with 596 additions and 626 deletions.
15 changes: 14 additions & 1 deletion filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,21 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
}
finishedLogger := newFinishedLogger(wgEvents)

registryMigrator := registrar.NewMigrator(config.Registry)
if err := registryMigrator.Run(); err != nil {
logp.Err("Failed to migrate registry file: %+v", err)
return err
}

stateStore, err := openStateStore(b.Info, logp.NewLogger("filebeat"), config.Registry)
if err != nil {
logp.Err("Failed to open state store: %+v", err)
return err
}
defer stateStore.Close()

// Setup registrar to persist state
registrar, err := registrar.New(config.Registry, finishedLogger)
registrar, err := registrar.New(stateStore, finishedLogger, config.Registry.FlushTimeout)
if err != nil {
logp.Err("Could not init registrar: %v", err)
return err
Expand Down
63 changes: 63 additions & 0 deletions filebeat/beater/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package beater

import (
"time"

"github.com/elastic/beats/v7/filebeat/config"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/paths"
"github.com/elastic/beats/v7/libbeat/statestore"
"github.com/elastic/beats/v7/libbeat/statestore/backend/memlog"
)

type filebeatStore struct {
registry *statestore.Registry
storeName string
cleanInterval time.Duration
}

func openStateStore(info beat.Info, logger *logp.Logger, cfg config.Registry) (*filebeatStore, error) {
memlog, err := memlog.New(logger, memlog.Settings{
Root: paths.Resolve(paths.Data, cfg.Path),
FileMode: cfg.Permissions,
})
if err != nil {
return nil, err
}

return &filebeatStore{
registry: statestore.NewRegistry(memlog),
storeName: info.Beat,
cleanInterval: cfg.CleanInterval,
}, nil
}

func (s *filebeatStore) Close() {
s.registry.Close()
}

func (s *filebeatStore) Access() (*statestore.Store, error) {
return s.registry.Get(s.storeName)
}

func (s *filebeatStore) CleanupInterval() time.Duration {
return s.cleanInterval
}
16 changes: 9 additions & 7 deletions filebeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,20 @@ type Config struct {
}

type Registry struct {
Path string `config:"path"`
Permissions os.FileMode `config:"file_permissions"`
FlushTimeout time.Duration `config:"flush"`
MigrateFile string `config:"migrate_file"`
Path string `config:"path"`
Permissions os.FileMode `config:"file_permissions"`
FlushTimeout time.Duration `config:"flush"`
CleanInterval time.Duration `config:"cleanup_interval"`
MigrateFile string `config:"migrate_file"`
}

var (
DefaultConfig = Config{
Registry: Registry{
Path: "registry",
Permissions: 0600,
MigrateFile: "",
Path: "registry",
Permissions: 0600,
MigrateFile: "",
CleanInterval: 5 * time.Minute,
},
ShutdownTimeout: 0,
OverwritePipelines: false,
Expand Down
Loading

0 comments on commit 5c719cf

Please sign in to comment.