diff --git a/internal/pkg/dl/actions.go b/internal/pkg/dl/actions.go index 15b617241..9a2b34fe7 100644 --- a/internal/pkg/dl/actions.go +++ b/internal/pkg/dl/actions.go @@ -15,6 +15,8 @@ import ( const ( FieldAgents = "agents" FieldExpiration = "expiration" + + maxAgentActionsFetchSize = 100 ) var ( @@ -44,6 +46,8 @@ func prepareFindAgentActions() *dsl.Tmpl { filter.Terms(FieldAgents, tmpl.Bind(FieldAgents), nil) + // Select more actions per agent since the agents array is not loaded + root.Size(maxAgentActionsFetchSize) root.Source().Excludes(FieldAgents) tmpl.MustResolve(root) diff --git a/internal/pkg/model/schema.go b/internal/pkg/model/schema.go index 1d9c7e101..b4016fd60 100644 --- a/internal/pkg/model/schema.go +++ b/internal/pkg/model/schema.go @@ -51,7 +51,7 @@ type Action struct { // Date/time the action was created Timestamp string `json:"@timestamp,omitempty"` - // The action type. APP_ACTION is the value for the actions that suppose to be routed to the endpoints/beats. + // The action type. INPUT_ACTION is the value for the actions that suppose to be routed to the endpoints/beats. Type string `json:"type,omitempty"` } diff --git a/internal/pkg/monitor/monitor.go b/internal/pkg/monitor/monitor.go index bdea51fce..a9c726899 100644 --- a/internal/pkg/monitor/monitor.go +++ b/internal/pkg/monitor/monitor.go @@ -22,9 +22,12 @@ import ( ) const ( - defaultCheckInterval = 1 // check every second for the new action - defaultSeqNo = int64(-1) // the _seq_no in elasticsearch start with 0 + defaultCheckInterval = 1 * time.Second // check every second for the new action + defaultSeqNo = int64(-1) // the _seq_no in elasticsearch start with 0 defaultWithExpiration = false + defaultFetchSize = 10 + + tightLoopCheckInterval = 50 * time.Millisecond // when we get a full page (fetchSize) of documents, use this interval to repeatedly poll for more records ) const ( @@ -80,6 +83,7 @@ type simpleMonitorT struct { index string checkInterval time.Duration withExpiration bool + fetchSize int checkpoint int64 // index global checkpoint @@ -98,8 +102,9 @@ func NewSimple(index string, cli *elasticsearch.Client, opts ...Option) (SimpleM m := &simpleMonitorT{ index: index, cli: cli, - checkInterval: defaultCheckInterval * time.Second, + checkInterval: defaultCheckInterval, withExpiration: defaultWithExpiration, + fetchSize: defaultFetchSize, checkpoint: defaultSeqNo, outCh: make(chan []es.HitT, 1), } @@ -199,29 +204,39 @@ func (m *simpleMonitorT) Run(ctx context.Context) (err error) { for { select { case <-t.C: + interval := m.checkInterval + hits, err := m.check(ctx) if err != nil { m.log.Error().Err(err).Msg("failed checking new documents") } else { - m.notify(ctx, hits) + count := m.notify(ctx, hits) + + // Change check interval if fetched the full page (m.fetchSize) of documents + if count == m.fetchSize { + m.log.Debug().Int("count", count).Dur("wait_next_check", interval).Msg("tight loop check") + interval = tightLoopCheckInterval + } } - t.Reset(m.checkInterval) + t.Reset(interval) case <-ctx.Done(): return ctx.Err() } } } -func (m *simpleMonitorT) notify(ctx context.Context, hits []es.HitT) { +func (m *simpleMonitorT) notify(ctx context.Context, hits []es.HitT) int { sz := len(hits) if sz > 0 { select { case m.outCh <- hits: maxVal := hits[sz-1].SeqNo m.storeCheckpoint(maxVal) + return sz case <-ctx.Done(): } } + return 0 } func (m *simpleMonitorT) check(ctx context.Context) ([]es.HitT, error) { @@ -322,6 +337,7 @@ func (m *simpleMonitorT) prepareCheckQuery() (tmpl *dsl.Tmpl, err error) { // Prepares full documents query func (m *simpleMonitorT) prepareQuery() (tmpl *dsl.Tmpl, err error) { tmpl, root := m.prepareCommon(true) + root.Size(uint64(m.fetchSize)) root.Sort().SortOrder(fieldSeqNo, dsl.SortAscend) if err := tmpl.Resolve(root); err != nil { diff --git a/model/schema.json b/model/schema.json index caa3254ca..ea0517e40 100644 --- a/model/schema.json +++ b/model/schema.json @@ -30,7 +30,7 @@ "format": "date-time" }, "type": { - "description": "The action type. APP_ACTION is the value for the actions that suppose to be routed to the endpoints/beats.", + "description": "The action type. INPUT_ACTION is the value for the actions that suppose to be routed to the endpoints/beats.", "type": "string" }, "input_type": {