Skip to content

Commit

Permalink
Policy changes can be dropped during policy rollouts to large number …
Browse files Browse the repository at this point in the history
…of agents. Aggregates changes to avoid missing updates.

(cherry picked from commit 1fc4b1f)
  • Loading branch information
Sean Cunningham authored and mergify-bot committed Jul 7, 2021
1 parent c4020a4 commit 9b12677
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 7 deletions.
3 changes: 2 additions & 1 deletion cmd/fleet/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"net/http"

"github.com/elastic/fleet-server/v7/internal/pkg/config"
"github.com/elastic/fleet-server/v7/internal/pkg/logger"

"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
"github.com/julienschmidt/httprouter"
Expand Down Expand Up @@ -142,7 +143,7 @@ type stubLogger struct {
}

func (s *stubLogger) Write(p []byte) (n int, err error) {
log.Error().Bytes("msg", p).Send()
log.Error().Bytes(logger.EcsMessage, p).Send()
return len(p), nil
}

Expand Down
9 changes: 5 additions & 4 deletions internal/pkg/monitor/subscription_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,11 @@ func (m *monitorT) notify(ctx context.Context, hits []es.HitT) {
select {
case s.c <- hits:
case <-lc.Done():
err := ctx.Err()
if err == context.DeadlineExceeded {
log.Err(err).Str("ctx", "subscription monitor").Dur("timeout", m.subTimeout).Msg("dropped notification")
}
log.Error().
Err(lc.Err()).
Str("ctx", "subscription monitor").
Dur("timeout", m.subTimeout).
Msg("dropped notification")
}
}(s)
}
Expand Down
44 changes: 42 additions & 2 deletions internal/pkg/policy/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@ func (m *monitorT) Run(ctx context.Context) error {
s := m.monitor.Subscribe()
defer m.monitor.Unsubscribe(s)

// The output from m.monitor times out if we don't pull data off quickly enough.
// Rollout can take a while; append upates here until we can attend to it.
// TODO: This is a workaround for 7.14. Rollout strategy will be reconsidered for 7.15.
subCtx, cfunc := context.WithCancel(ctx)
defer cfunc()
outputCh := m.monitorOutputChannel(subCtx, s.Output())

close(m.startCh)
LOOP:
for {
Expand All @@ -111,7 +118,7 @@ LOOP:
if err := m.process(ctx); err != nil {
return err
}
case hits := <-s.Output():
case hits := <-outputCh:
policies := make([]model.Policy, len(hits))
for i, hit := range hits {
err := hit.Unmarshal(&policies[i])
Expand All @@ -128,6 +135,35 @@ LOOP:
return nil
}

// Aggegates changes from the output channel until the main loop can process.
func (m *monitorT) monitorOutputChannel(ctx context.Context, outputCh <-chan []es.HitT) chan []es.HitT {
localOutputCh := make(chan []es.HitT)

go func() {

var hits []es.HitT
var outCh chan []es.HitT

for {
select {
case <-ctx.Done():
m.log.Info().Msg("Exit policy monitor local")
return
case nHits := <-outputCh:
m.log.Info().Int("nHits", len(nHits)).Msg("Received hits on local monitor")
hits = append(hits, nHits...)
outCh = localOutputCh
case outCh <- hits:
m.log.Info().Int("nHits", len(hits)).Msg("Hits dispatched to main loop")
outCh = nil
hits = nil
}
}
}()

return localOutputCh
}

func (m *monitorT) waitStart(ctx context.Context) (err error) {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -189,7 +225,11 @@ func (m *monitorT) groupByLatest(policies []model.Policy) map[string]model.Polic
}

func (m *monitorT) rollout(ctx context.Context, policy model.Policy) error {
zlog := m.log.With().Str("policyId", policy.PolicyId).Logger()
zlog := m.log.With().
Str("policyId", policy.PolicyId).
Int64("revisionIdx", policy.RevisionIdx).
Int64("coordinatorIdx", policy.CoordinatorIdx).
Logger()

pp, err := NewParsedPolicy(policy)
if err != nil {
Expand Down

0 comments on commit 9b12677

Please sign in to comment.