Skip to content

Commit

Permalink
Merge pull request #82 from aleksmaus/improve/monitor_poll
Browse files Browse the repository at this point in the history
Reduce actions fetching interval if the full page of action documents was fetched
  • Loading branch information
aleksmaus authored Feb 3, 2021
2 parents 19fd842 + c61e2bd commit c50af7d
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 8 deletions.
4 changes: 4 additions & 0 deletions internal/pkg/dl/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
const (
FieldAgents = "agents"
FieldExpiration = "expiration"

maxAgentActionsFetchSize = 100
)

var (
Expand Down Expand Up @@ -44,6 +46,8 @@ func prepareFindAgentActions() *dsl.Tmpl {

filter.Terms(FieldAgents, tmpl.Bind(FieldAgents), nil)

// Select more actions per agent since the agents array is not loaded
root.Size(maxAgentActionsFetchSize)
root.Source().Excludes(FieldAgents)

tmpl.MustResolve(root)
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/model/schema.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 22 additions & 6 deletions internal/pkg/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@ import (
)

const (
defaultCheckInterval = 1 // check every second for the new action
defaultSeqNo = int64(-1) // the _seq_no in elasticsearch start with 0
defaultCheckInterval = 1 * time.Second // check every second for the new action
defaultSeqNo = int64(-1) // the _seq_no in elasticsearch start with 0
defaultWithExpiration = false
defaultFetchSize = 10

tightLoopCheckInterval = 50 * time.Millisecond // when we get a full page (fetchSize) of documents, use this interval to repeatedly poll for more records
)

const (
Expand Down Expand Up @@ -80,6 +83,7 @@ type simpleMonitorT struct {
index string
checkInterval time.Duration
withExpiration bool
fetchSize int

checkpoint int64 // index global checkpoint

Expand All @@ -98,8 +102,9 @@ func NewSimple(index string, cli *elasticsearch.Client, opts ...Option) (SimpleM
m := &simpleMonitorT{
index: index,
cli: cli,
checkInterval: defaultCheckInterval * time.Second,
checkInterval: defaultCheckInterval,
withExpiration: defaultWithExpiration,
fetchSize: defaultFetchSize,
checkpoint: defaultSeqNo,
outCh: make(chan []es.HitT, 1),
}
Expand Down Expand Up @@ -199,29 +204,39 @@ func (m *simpleMonitorT) Run(ctx context.Context) (err error) {
for {
select {
case <-t.C:
interval := m.checkInterval

hits, err := m.check(ctx)
if err != nil {
m.log.Error().Err(err).Msg("failed checking new documents")
} else {
m.notify(ctx, hits)
count := m.notify(ctx, hits)

// Change check interval if fetched the full page (m.fetchSize) of documents
if count == m.fetchSize {
m.log.Debug().Int("count", count).Dur("wait_next_check", interval).Msg("tight loop check")
interval = tightLoopCheckInterval
}
}
t.Reset(m.checkInterval)
t.Reset(interval)
case <-ctx.Done():
return ctx.Err()
}
}
}

func (m *simpleMonitorT) notify(ctx context.Context, hits []es.HitT) {
func (m *simpleMonitorT) notify(ctx context.Context, hits []es.HitT) int {
sz := len(hits)
if sz > 0 {
select {
case m.outCh <- hits:
maxVal := hits[sz-1].SeqNo
m.storeCheckpoint(maxVal)
return sz
case <-ctx.Done():
}
}
return 0
}

func (m *simpleMonitorT) check(ctx context.Context) ([]es.HitT, error) {
Expand Down Expand Up @@ -322,6 +337,7 @@ func (m *simpleMonitorT) prepareCheckQuery() (tmpl *dsl.Tmpl, err error) {
// Prepares full documents query
func (m *simpleMonitorT) prepareQuery() (tmpl *dsl.Tmpl, err error) {
tmpl, root := m.prepareCommon(true)
root.Size(uint64(m.fetchSize))
root.Sort().SortOrder(fieldSeqNo, dsl.SortAscend)

if err := tmpl.Resolve(root); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion model/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
"format": "date-time"
},
"type": {
"description": "The action type. APP_ACTION is the value for the actions that suppose to be routed to the endpoints/beats.",
"description": "The action type. INPUT_ACTION is the value for the actions that suppose to be routed to the endpoints/beats.",
"type": "string"
},
"input_type": {
Expand Down

0 comments on commit c50af7d

Please sign in to comment.