From 1cb3251b500ed14ac3504fdd72578bfc9a4b1348 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Wed, 7 Jul 2021 16:48:49 +0000 Subject: [PATCH] Policy changes can be dropped during policy rollouts to large number of agents. Aggregates changes to avoid missing updates. (#525) (cherry picked from commit 1fc4b1f758c68858071568267f567c952ffe189e) Co-authored-by: Sean Cunningham --- cmd/fleet/server.go | 3 +- internal/pkg/monitor/subscription_monitor.go | 9 ++-- internal/pkg/policy/monitor.go | 44 +++++++++++++++++++- 3 files changed, 49 insertions(+), 7 deletions(-) diff --git a/cmd/fleet/server.go b/cmd/fleet/server.go index 22a596f42..bb9960889 100644 --- a/cmd/fleet/server.go +++ b/cmd/fleet/server.go @@ -12,6 +12,7 @@ import ( "net/http" "github.com/elastic/fleet-server/v7/internal/pkg/config" + "github.com/elastic/fleet-server/v7/internal/pkg/logger" "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" "github.com/julienschmidt/httprouter" @@ -142,7 +143,7 @@ type stubLogger struct { } func (s *stubLogger) Write(p []byte) (n int, err error) { - log.Error().Bytes("msg", p).Send() + log.Error().Bytes(logger.EcsMessage, p).Send() return len(p), nil } diff --git a/internal/pkg/monitor/subscription_monitor.go b/internal/pkg/monitor/subscription_monitor.go index ec5c71821..02e43c4d8 100644 --- a/internal/pkg/monitor/subscription_monitor.go +++ b/internal/pkg/monitor/subscription_monitor.go @@ -145,10 +145,11 @@ func (m *monitorT) notify(ctx context.Context, hits []es.HitT) { select { case s.c <- hits: case <-lc.Done(): - err := ctx.Err() - if err == context.DeadlineExceeded { - log.Err(err).Str("ctx", "subscription monitor").Dur("timeout", m.subTimeout).Msg("dropped notification") - } + log.Error(). + Err(lc.Err()). + Str("ctx", "subscription monitor"). + Dur("timeout", m.subTimeout). + Msg("dropped notification") } }(s) } diff --git a/internal/pkg/policy/monitor.go b/internal/pkg/policy/monitor.go index 49befe528..598acc849 100644 --- a/internal/pkg/policy/monitor.go +++ b/internal/pkg/policy/monitor.go @@ -101,6 +101,13 @@ func (m *monitorT) Run(ctx context.Context) error { s := m.monitor.Subscribe() defer m.monitor.Unsubscribe(s) + // The output from m.monitor times out if we don't pull data off quickly enough. + // Rollout can take a while; append upates here until we can attend to it. + // TODO: This is a workaround for 7.14. Rollout strategy will be reconsidered for 7.15. + subCtx, cfunc := context.WithCancel(ctx) + defer cfunc() + outputCh := m.monitorOutputChannel(subCtx, s.Output()) + close(m.startCh) LOOP: for { @@ -111,7 +118,7 @@ LOOP: if err := m.process(ctx); err != nil { return err } - case hits := <-s.Output(): + case hits := <-outputCh: policies := make([]model.Policy, len(hits)) for i, hit := range hits { err := hit.Unmarshal(&policies[i]) @@ -128,6 +135,35 @@ LOOP: return nil } +// Aggegates changes from the output channel until the main loop can process. +func (m *monitorT) monitorOutputChannel(ctx context.Context, outputCh <-chan []es.HitT) chan []es.HitT { + localOutputCh := make(chan []es.HitT) + + go func() { + + var hits []es.HitT + var outCh chan []es.HitT + + for { + select { + case <-ctx.Done(): + m.log.Info().Msg("Exit policy monitor local") + return + case nHits := <-outputCh: + m.log.Info().Int("nHits", len(nHits)).Msg("Received hits on local monitor") + hits = append(hits, nHits...) + outCh = localOutputCh + case outCh <- hits: + m.log.Info().Int("nHits", len(hits)).Msg("Hits dispatched to main loop") + outCh = nil + hits = nil + } + } + }() + + return localOutputCh +} + func (m *monitorT) waitStart(ctx context.Context) (err error) { select { case <-ctx.Done(): @@ -189,7 +225,11 @@ func (m *monitorT) groupByLatest(policies []model.Policy) map[string]model.Polic } func (m *monitorT) rollout(ctx context.Context, policy model.Policy) error { - zlog := m.log.With().Str("policyId", policy.PolicyId).Logger() + zlog := m.log.With(). + Str("policyId", policy.PolicyId). + Int64("revisionIdx", policy.RevisionIdx). + Int64("coordinatorIdx", policy.CoordinatorIdx). + Logger() pp, err := NewParsedPolicy(policy) if err != nil {