Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce actions fetching interval if the full page of action documents was fetched #82

Merged
merged 2 commits into from
Feb 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions internal/pkg/dl/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
const (
FieldAgents = "agents"
FieldExpiration = "expiration"

maxAgentActionsFetchSize = 100
)

var (
Expand Down Expand Up @@ -44,6 +46,8 @@ func prepareFindAgentActions() *dsl.Tmpl {

filter.Terms(FieldAgents, tmpl.Bind(FieldAgents), nil)

// Select more actions per agent since the agents array is not loaded
root.Size(maxAgentActionsFetchSize)
root.Source().Excludes(FieldAgents)

tmpl.MustResolve(root)
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/model/schema.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 22 additions & 6 deletions internal/pkg/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@ import (
)

const (
defaultCheckInterval = 1 // check every second for the new action
defaultSeqNo = int64(-1) // the _seq_no in elasticsearch start with 0
defaultCheckInterval = 1 * time.Second // check every second for the new action
defaultSeqNo = int64(-1) // the _seq_no in elasticsearch start with 0
defaultWithExpiration = false
defaultFetchSize = 10

tightLoopCheckInterval = 50 * time.Millisecond // when we get a full page (fetchSize) of documents, use this interval to repeatedly poll for more records
)

const (
Expand Down Expand Up @@ -80,6 +83,7 @@ type simpleMonitorT struct {
index string
checkInterval time.Duration
withExpiration bool
fetchSize int

checkpoint int64 // index global checkpoint

Expand All @@ -98,8 +102,9 @@ func NewSimple(index string, cli *elasticsearch.Client, opts ...Option) (SimpleM
m := &simpleMonitorT{
index: index,
cli: cli,
checkInterval: defaultCheckInterval * time.Second,
checkInterval: defaultCheckInterval,
withExpiration: defaultWithExpiration,
fetchSize: defaultFetchSize,
checkpoint: defaultSeqNo,
outCh: make(chan []es.HitT, 1),
}
Expand Down Expand Up @@ -199,29 +204,39 @@ func (m *simpleMonitorT) Run(ctx context.Context) (err error) {
for {
select {
case <-t.C:
interval := m.checkInterval

hits, err := m.check(ctx)
if err != nil {
m.log.Error().Err(err).Msg("failed checking new documents")
} else {
m.notify(ctx, hits)
count := m.notify(ctx, hits)

// Change check interval if fetched the full page (m.fetchSize) of documents
if count == m.fetchSize {
m.log.Debug().Int("count", count).Dur("wait_next_check", interval).Msg("tight loop check")
interval = tightLoopCheckInterval
}
}
t.Reset(m.checkInterval)
t.Reset(interval)
case <-ctx.Done():
return ctx.Err()
}
}
}

func (m *simpleMonitorT) notify(ctx context.Context, hits []es.HitT) {
func (m *simpleMonitorT) notify(ctx context.Context, hits []es.HitT) int {
sz := len(hits)
if sz > 0 {
select {
case m.outCh <- hits:
maxVal := hits[sz-1].SeqNo
m.storeCheckpoint(maxVal)
return sz
case <-ctx.Done():
}
}
return 0
}

func (m *simpleMonitorT) check(ctx context.Context) ([]es.HitT, error) {
Expand Down Expand Up @@ -322,6 +337,7 @@ func (m *simpleMonitorT) prepareCheckQuery() (tmpl *dsl.Tmpl, err error) {
// Prepares full documents query
func (m *simpleMonitorT) prepareQuery() (tmpl *dsl.Tmpl, err error) {
tmpl, root := m.prepareCommon(true)
root.Size(uint64(m.fetchSize))
root.Sort().SortOrder(fieldSeqNo, dsl.SortAscend)

if err := tmpl.Resolve(root); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion model/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
"format": "date-time"
},
"type": {
"description": "The action type. APP_ACTION is the value for the actions that suppose to be routed to the endpoints/beats.",
"description": "The action type. INPUT_ACTION is the value for the actions that suppose to be routed to the endpoints/beats.",
"type": "string"
},
"input_type": {
Expand Down