Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Increase the monitor throughput: increase the default fetch size, make it configurable. #171

Merged
merged 2 commits into from
Mar 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions cmd/fleet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ func (f *FleetServer) runServer(ctx context.Context, cfg *config.Config) (err er
g, ctx := errgroup.WithContext(ctx)

// Coordinator policy monitor
pim, err := monitor.New(dl.FleetPolicies, es)
pim, err := monitor.New(dl.FleetPolicies, es, monitor.WithFetchSize(cfg.Inputs[0].Monitor.FetchSize))
if err != nil {
return err
}
Expand All @@ -518,8 +518,7 @@ func (f *FleetServer) runServer(ctx context.Context, cfg *config.Config) (err er
var ad *action.Dispatcher
var tr *action.TokenResolver

// Behind the feature flag
am, err = monitor.NewSimple(dl.FleetActions, es, monitor.WithExpiration(true))
am, err = monitor.NewSimple(dl.FleetActions, es, monitor.WithExpiration(true), monitor.WithFetchSize(cfg.Inputs[0].Monitor.FetchSize))
if err != nil {
return err
}
Expand Down
12 changes: 12 additions & 0 deletions internal/pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ func TestConfig(t *testing.T) {
NumCounters: defaultCacheNumCounters,
MaxCost: defaultCacheMaxCost,
},
Monitor: Monitor{
FetchSize: defaultFetchSize,
},
},
},
Logging: Logging{
Expand Down Expand Up @@ -124,6 +127,9 @@ func TestConfig(t *testing.T) {
NumCounters: defaultCacheNumCounters,
MaxCost: defaultCacheMaxCost,
},
Monitor: Monitor{
FetchSize: defaultFetchSize,
},
},
},
Logging: Logging{
Expand Down Expand Up @@ -181,6 +187,9 @@ func TestConfig(t *testing.T) {
NumCounters: defaultCacheNumCounters,
MaxCost: defaultCacheMaxCost,
},
Monitor: Monitor{
FetchSize: defaultFetchSize,
},
},
},
Logging: Logging{
Expand Down Expand Up @@ -238,6 +247,9 @@ func TestConfig(t *testing.T) {
NumCounters: defaultCacheNumCounters,
MaxCost: defaultCacheMaxCost,
},
Monitor: Monitor{
FetchSize: defaultFetchSize,
},
},
},
Logging: Logging{
Expand Down
10 changes: 6 additions & 4 deletions internal/pkg/config/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,17 +88,19 @@ func (c *Server) BindAddress() string {

// Input is the input defined by Agent to run Fleet Server.
type Input struct {
Type string `config:"type"`
Policy Policy `config:"policy"`
Server Server `config:"server"`
Cache Cache `config:"cache"`
Type string `config:"type"`
Policy Policy `config:"policy"`
Server Server `config:"server"`
Cache Cache `config:"cache"`
Monitor Monitor `config:"monitor"`
}

// InitDefaults initializes the defaults for the configuration.
func (c *Input) InitDefaults() {
c.Type = "fleet-server"
c.Server.InitDefaults()
c.Cache.InitDefaults()
c.Monitor.InitDefaults()
}

// Validate ensures that the configuration is valid.
Expand Down
17 changes: 17 additions & 0 deletions internal/pkg/config/monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package config

const (
defaultFetchSize = 1000
)

type Monitor struct {
FetchSize int `config:"fetch_size"`
}

func (m *Monitor) InitDefaults() {
m.FetchSize = defaultFetchSize
}
20 changes: 18 additions & 2 deletions internal/pkg/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,16 @@ const (
defaultCheckInterval = 1 * time.Second // check every second for the new action
defaultSeqNo = int64(-1) // the _seq_no in elasticsearch start with 0
defaultWithExpiration = false
defaultFetchSize = 10

tightLoopCheckInterval = 50 * time.Millisecond // when we get a full page (fetchSize) of documents, use this interval to repeatedly poll for more records
// Making the default fetch size larger, in order to increase the throughput of the monitor.
// This is configurable as well, so can be adjusted based on the memory size of the container if needed.
// Seems like the usage of smaller actions, one or few agents in the action document would be more prevalent in the future.
// For example, as of now the current size of osquery action JSON document for 1000 agents is 40KB.
// Assuiming the worst case scenario of 1000 of document fetched, we are looking at 50MB slice.
// One action can be split up into multiple documents up to the 1000 agents per action if needed.
defaultFetchSize = 1000

tightLoopCheckInterval = 10 * time.Millisecond // when we get a full page (fetchSize) of documents, use this interval to repeatedly poll for more records
)

const (
Expand Down Expand Up @@ -130,6 +137,15 @@ func NewSimple(index string, cli *elasticsearch.Client, opts ...Option) (SimpleM
return m, nil
}

// WithCheckInterval sets a periodic check interval
func WithFetchSize(fetchSize int) Option {
return func(m SimpleMonitor) {
if fetchSize > 0 {
m.(*simpleMonitorT).fetchSize = fetchSize
}
}
}

// WithCheckInterval sets a periodic check interval
func WithCheckInterval(interval time.Duration) Option {
return func(m SimpleMonitor) {
Expand Down