From b9d158a82c694a8a0e0b869dd67c91ddcac506bc Mon Sep 17 00:00:00 2001 From: Aleksandr Maus Date: Wed, 3 Mar 2021 15:50:23 -0500 Subject: [PATCH] Fix global checkpoint based monitoring (#116) The index monitoring got broken dues to replacing the original .fleet-actions and .fleet-policies indices with aliases. This change uses the first index global checkpoint value received from stats and returns the error if there are more than two indices for alias for any reason. --- internal/pkg/monitor/global_checkpoint.go | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/internal/pkg/monitor/global_checkpoint.go b/internal/pkg/monitor/global_checkpoint.go index 31eb73ea9..89f0e8025 100644 --- a/internal/pkg/monitor/global_checkpoint.go +++ b/internal/pkg/monitor/global_checkpoint.go @@ -7,11 +7,16 @@ package monitor import ( "context" "encoding/json" + "errors" + "fmt" + "github.com/elastic/fleet-server/v7/internal/pkg/es" "github.com/elastic/go-elasticsearch/v8" ) +var ErrGlobalCheckpoint = errors.New("global checkpoint error") + type shard struct { SeqNo struct { GlobalCheckpoint int64 `json:"global_checkpoint"` @@ -49,7 +54,21 @@ func queryGlobalCheckpoint(ctx context.Context, es *elasticsearch.Client, index return } - if stats, ok := sres.IndexStats[index]; ok { + if len(sres.IndexStats) > 1 { + indices := make([]string, 0, len(sres.IndexStats)) + for k := range sres.IndexStats { + indices = append(indices, k) + } + return seqno, fmt.Errorf("more than one indices found %v, %w", indices, ErrGlobalCheckpoint) + } + + if len(sres.IndexStats) > 0 { + // Grab the first and only index stats + var stats indexStats + for _, stats = range sres.IndexStats { + break + } + if shards, ok := stats.Shards["0"]; ok { if len(shards) > 0 { seqno = shards[0].SeqNo.GlobalCheckpoint