From 5775b0535b004b1d7526eaf962aede6fd1d314b3 Mon Sep 17 00:00:00 2001 From: Aleksandr Maus Date: Wed, 20 Jan 2021 10:42:14 -0500 Subject: [PATCH 1/4] Make fleet server components resilent if the indices do not exist --- internal/pkg/coordinator/monitor.go | 13 +++++++++++-- internal/pkg/esboot/bootstrap.go | 17 +++++++++-------- internal/pkg/migrate/migrate.go | 14 ++++++++++++-- internal/pkg/monitor/monitor.go | 10 +++++++++- internal/pkg/policy/monitor.go | 6 ++++++ 5 files changed, 47 insertions(+), 13 deletions(-) diff --git a/internal/pkg/coordinator/monitor.go b/internal/pkg/coordinator/monitor.go index 239e6ef8d..e88c82c07 100644 --- a/internal/pkg/coordinator/monitor.go +++ b/internal/pkg/coordinator/monitor.go @@ -6,6 +6,7 @@ package coordinator import ( "context" + "errors" "net" "os" "runtime" @@ -188,7 +189,11 @@ func (m *monitorT) ensureLeadership(ctx context.Context) error { leaders := map[string]model.PolicyLeader{} policies, err := dl.QueryLatestPolicies(ctx, m.bulker, dl.WithIndexName(m.policiesIndex)) if err != nil { - return err + if errors.Is(err, es.ErrIndexNotFound) { + err = nil + } else { + return err + } } if len(policies) > 0 { ids := make([]string, len(policies)) @@ -197,7 +202,11 @@ func (m *monitorT) ensureLeadership(ctx context.Context) error { } leaders, err = dl.SearchPolicyLeaders(ctx, m.bulker, ids, dl.WithIndexName(m.leadersIndex)) if err != nil { - return err + if errors.Is(err, es.ErrIndexNotFound) { + err = nil + } else { + return err + } } } diff --git a/internal/pkg/esboot/bootstrap.go b/internal/pkg/esboot/bootstrap.go index 614d500d6..c000f965d 100644 --- a/internal/pkg/esboot/bootstrap.go +++ b/internal/pkg/esboot/bootstrap.go @@ -6,7 +6,6 @@ package esboot import ( "context" - "github.com/elastic/fleet-server/v7/internal/pkg/es" "github.com/elastic/go-elasticsearch/v8" ) @@ -20,13 +19,15 @@ type indexConfig struct { } var indexConfigs = map[string]indexConfig{ - ".fleet-actions": {mapping: es.MappingAction}, - ".fleet-actions-results": {mapping: es.MappingActionResult, datastream: true}, - ".fleet-agents": {mapping: es.MappingAgent}, - ".fleet-enrollment-api-keys": {mapping: es.MappingEnrollmentApiKey}, - ".fleet-policies": {mapping: es.MappingPolicy}, - ".fleet-policies-leader": {mapping: es.MappingPolicyLeader}, - ".fleet-servers": {mapping: es.MappingServer}, + // Commenting out the boostrapping for now here, just in case if it needs to be "enabled" again. + // Will remove all the boostrapping code completely later once all is fully integrated + // ".fleet-actions": {mapping: es.MappingAction}, + // ".fleet-actions-results": {mapping: es.MappingActionResult, datastream: true}, + // ".fleet-agents": {mapping: es.MappingAgent}, + // ".fleet-enrollment-api-keys": {mapping: es.MappingEnrollmentApiKey}, + // ".fleet-policies": {mapping: es.MappingPolicy}, + // ".fleet-policies-leader": {mapping: es.MappingPolicyLeader}, + // ".fleet-servers": {mapping: es.MappingServer}, } // Bootstrap creates .fleet-actions data stream diff --git a/internal/pkg/migrate/migrate.go b/internal/pkg/migrate/migrate.go index dbcc8cb83..4eed7d7ed 100644 --- a/internal/pkg/migrate/migrate.go +++ b/internal/pkg/migrate/migrate.go @@ -7,8 +7,11 @@ package migrate import ( "context" "encoding/json" + "errors" + "github.com/elastic/fleet-server/v7/internal/pkg/bulk" "github.com/elastic/fleet-server/v7/internal/pkg/dl" + "github.com/elastic/fleet-server/v7/internal/pkg/es" "github.com/elastic/fleet-server/v7/internal/pkg/model" "github.com/elastic/fleet-server/v7/internal/pkg/saved" ) @@ -42,12 +45,19 @@ func MigrateEnrollmentAPIKeys(ctx context.Context, sv saved.CRUD, bulker bulk.Bu } var recs []model.EnrollmentApiKey + var resHits []es.HitT res, err := bulker.Search(ctx, []string{dl.FleetEnrollmentAPIKeys}, raw, bulk.WithRefresh()) if err != nil { - return err + if errors.Is(err, es.ErrIndexNotFound) { + err = nil + } else { + return err + } + } else { + resHits = res.Hits } - for _, hit := range res.Hits { + for _, hit := range resHits { var rec model.EnrollmentApiKey err := json.Unmarshal(hit.Source, &rec) if err != nil { diff --git a/internal/pkg/monitor/monitor.go b/internal/pkg/monitor/monitor.go index 78df63db9..a6f28936b 100644 --- a/internal/pkg/monitor/monitor.go +++ b/internal/pkg/monitor/monitor.go @@ -8,6 +8,7 @@ import ( "bytes" "context" "encoding/json" + "errors" "sync/atomic" "time" @@ -291,7 +292,14 @@ func (m *simpleMonitorT) search(ctx context.Context, tmpl *dsl.Tmpl, params map[ } if res.IsError() { - return nil, es.TranslateError(res.StatusCode, esres.Error) + err = es.TranslateError(res.StatusCode, esres.Error) + } + + if err != nil { + if errors.Is(err, es.ErrIndexNotFound) { + return nil, nil + } + return nil, err } return esres.Hits.Hits, nil diff --git a/internal/pkg/policy/monitor.go b/internal/pkg/policy/monitor.go index 52724a231..9e2b10635 100644 --- a/internal/pkg/policy/monitor.go +++ b/internal/pkg/policy/monitor.go @@ -18,6 +18,7 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/bulk" "github.com/elastic/fleet-server/v7/internal/pkg/dl" + "github.com/elastic/fleet-server/v7/internal/pkg/es" "github.com/elastic/fleet-server/v7/internal/pkg/model" "github.com/elastic/fleet-server/v7/internal/pkg/monitor" ) @@ -125,6 +126,11 @@ LOOP: func (m *monitorT) process(ctx context.Context) error { policies, err := m.policyF(ctx, m.bulker, dl.WithIndexName(m.policiesIndex)) if err != nil { + if errors.Is(err, es.ErrIndexNotFound) { + err = nil + } else { + err = nil + } return err } if len(policies) == 0 { From 06a7d379439cc3f62ff8cedbca86d2ded4b2756b Mon Sep 17 00:00:00 2001 From: Aleksandr Maus Date: Thu, 21 Jan 2021 10:14:12 -0500 Subject: [PATCH 2/4] Address code review comments --- cmd/fleet/main.go | 2 +- internal/pkg/coordinator/monitor.go | 12 ++++++------ internal/pkg/migrate/migrate.go | 9 ++++++--- internal/pkg/monitor/monitor.go | 1 + internal/pkg/policy/monitor.go | 5 ++--- 5 files changed, 16 insertions(+), 13 deletions(-) diff --git a/cmd/fleet/main.go b/cmd/fleet/main.go index a197ec178..be73843f0 100644 --- a/cmd/fleet/main.go +++ b/cmd/fleet/main.go @@ -478,7 +478,7 @@ func (f *FleetServer) runServer(ctx context.Context, cfg *config.Config) (err er if err != nil { return err } - err = migrate.Migrate(ctx, sv, bulker) + err = migrate.Migrate(ctx, log.Logger, sv, bulker) if err != nil { return err } diff --git a/internal/pkg/coordinator/monitor.go b/internal/pkg/coordinator/monitor.go index e88c82c07..272d76f2d 100644 --- a/internal/pkg/coordinator/monitor.go +++ b/internal/pkg/coordinator/monitor.go @@ -190,10 +190,10 @@ func (m *monitorT) ensureLeadership(ctx context.Context) error { policies, err := dl.QueryLatestPolicies(ctx, m.bulker, dl.WithIndexName(m.policiesIndex)) if err != nil { if errors.Is(err, es.ErrIndexNotFound) { - err = nil - } else { - return err + m.log.Debug().Str("index", m.policiesIndex).Msg(es.ErrIndexNotFound.Error()) + return nil } + return err } if len(policies) > 0 { ids := make([]string, len(policies)) @@ -203,10 +203,10 @@ func (m *monitorT) ensureLeadership(ctx context.Context) error { leaders, err = dl.SearchPolicyLeaders(ctx, m.bulker, ids, dl.WithIndexName(m.leadersIndex)) if err != nil { if errors.Is(err, es.ErrIndexNotFound) { - err = nil - } else { - return err + m.log.Debug().Str("index", m.leadersIndex).Msg(es.ErrIndexNotFound.Error()) + return nil } + return err } } diff --git a/internal/pkg/migrate/migrate.go b/internal/pkg/migrate/migrate.go index 4eed7d7ed..f18a4bde0 100644 --- a/internal/pkg/migrate/migrate.go +++ b/internal/pkg/migrate/migrate.go @@ -14,6 +14,7 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/es" "github.com/elastic/fleet-server/v7/internal/pkg/model" "github.com/elastic/fleet-server/v7/internal/pkg/saved" + "github.com/rs/zerolog" ) type enrollmentApiKey struct { @@ -32,11 +33,11 @@ type enrollmentApiKey struct { // This is for development only (1 instance of fleet) // Not safe for multiple instances of fleet // Initially needed to migrate the enrollment-api-keys that kibana creates -func Migrate(ctx context.Context, sv saved.CRUD, bulker bulk.Bulk) error { - return MigrateEnrollmentAPIKeys(ctx, sv, bulker) +func Migrate(ctx context.Context, log zerolog.Logger, sv saved.CRUD, bulker bulk.Bulk) error { + return MigrateEnrollmentAPIKeys(ctx, log, sv, bulker) } -func MigrateEnrollmentAPIKeys(ctx context.Context, sv saved.CRUD, bulker bulk.Bulk) error { +func MigrateEnrollmentAPIKeys(ctx context.Context, log zerolog.Logger, sv saved.CRUD, bulker bulk.Bulk) error { // Query all enrollment keys from the new schema raw, err := dl.RenderAllEnrollmentAPIKeysQuery(1000) @@ -49,6 +50,8 @@ func MigrateEnrollmentAPIKeys(ctx context.Context, sv saved.CRUD, bulker bulk.Bu res, err := bulker.Search(ctx, []string{dl.FleetEnrollmentAPIKeys}, raw, bulk.WithRefresh()) if err != nil { if errors.Is(err, es.ErrIndexNotFound) { + log.Debug().Str("index", dl.FleetEnrollmentAPIKeys).Msg(es.ErrIndexNotFound.Error()) + // Continue with migration if the .fleet-enrollment-api-keys index is not found err = nil } else { return err diff --git a/internal/pkg/monitor/monitor.go b/internal/pkg/monitor/monitor.go index a6f28936b..bdea51fce 100644 --- a/internal/pkg/monitor/monitor.go +++ b/internal/pkg/monitor/monitor.go @@ -297,6 +297,7 @@ func (m *simpleMonitorT) search(ctx context.Context, tmpl *dsl.Tmpl, params map[ if err != nil { if errors.Is(err, es.ErrIndexNotFound) { + m.log.Debug().Str("index", m.index).Msg(es.ErrIndexNotFound.Error()) return nil, nil } return nil, err diff --git a/internal/pkg/policy/monitor.go b/internal/pkg/policy/monitor.go index 9e2b10635..e647bb612 100644 --- a/internal/pkg/policy/monitor.go +++ b/internal/pkg/policy/monitor.go @@ -127,9 +127,8 @@ func (m *monitorT) process(ctx context.Context) error { policies, err := m.policyF(ctx, m.bulker, dl.WithIndexName(m.policiesIndex)) if err != nil { if errors.Is(err, es.ErrIndexNotFound) { - err = nil - } else { - err = nil + m.log.Debug().Str("index", m.policiesIndex).Msg(es.ErrIndexNotFound.Error()) + return nil } return err } From fe4fdc5a96a3b82bb1879a9a29a986591c5ddb0d Mon Sep 17 00:00:00 2001 From: Aleksandr Maus Date: Mon, 25 Jan 2021 13:45:33 -0500 Subject: [PATCH 3/4] Hide old bootstrap logic behind FLEET_ES_BOOT environment variable --- cmd/fleet/main.go | 21 ++++++++++++++------- internal/pkg/esboot/bootstrap.go | 15 ++++++++------- 2 files changed, 22 insertions(+), 14 deletions(-) diff --git a/cmd/fleet/main.go b/cmd/fleet/main.go index be73843f0..730ab9f9e 100644 --- a/cmd/fleet/main.go +++ b/cmd/fleet/main.go @@ -474,13 +474,20 @@ func (f *FleetServer) runServer(ctx context.Context, cfg *config.Config) (err er // Initial indices bootstrapping, needed for agents actions development // TODO: remove this after the indices bootstrapping logic implemented in ES plugin - err = esboot.EnsureESIndices(ctx, es) - if err != nil { - return err - } - err = migrate.Migrate(ctx, log.Logger, sv, bulker) - if err != nil { - return err + bootFlag := env.GetStr( + "FLEET_ES_BOOT", + "", + ) + if bootFlag == "1" { + log.Debug().Msg("FLEET_ES_BOOT is set to true, perform bootstrap") + err = esboot.EnsureESIndices(ctx, es) + if err != nil { + return err + } + err = migrate.Migrate(ctx, log.Logger, sv, bulker) + if err != nil { + return err + } } // Replacing to errgroup context diff --git a/internal/pkg/esboot/bootstrap.go b/internal/pkg/esboot/bootstrap.go index c000f965d..9906a8f52 100644 --- a/internal/pkg/esboot/bootstrap.go +++ b/internal/pkg/esboot/bootstrap.go @@ -7,6 +7,7 @@ package esboot import ( "context" + "github.com/elastic/fleet-server/v7/internal/pkg/es" "github.com/elastic/go-elasticsearch/v8" ) @@ -21,13 +22,13 @@ type indexConfig struct { var indexConfigs = map[string]indexConfig{ // Commenting out the boostrapping for now here, just in case if it needs to be "enabled" again. // Will remove all the boostrapping code completely later once all is fully integrated - // ".fleet-actions": {mapping: es.MappingAction}, - // ".fleet-actions-results": {mapping: es.MappingActionResult, datastream: true}, - // ".fleet-agents": {mapping: es.MappingAgent}, - // ".fleet-enrollment-api-keys": {mapping: es.MappingEnrollmentApiKey}, - // ".fleet-policies": {mapping: es.MappingPolicy}, - // ".fleet-policies-leader": {mapping: es.MappingPolicyLeader}, - // ".fleet-servers": {mapping: es.MappingServer}, + ".fleet-actions": {mapping: es.MappingAction}, + ".fleet-actions-results": {mapping: es.MappingActionResult, datastream: true}, + ".fleet-agents": {mapping: es.MappingAgent}, + ".fleet-enrollment-api-keys": {mapping: es.MappingEnrollmentApiKey}, + ".fleet-policies": {mapping: es.MappingPolicy}, + ".fleet-policies-leader": {mapping: es.MappingPolicyLeader}, + ".fleet-servers": {mapping: es.MappingServer}, } // Bootstrap creates .fleet-actions data stream From 3951f20a9fcecef8170f392bdb35dc6dc88a522e Mon Sep 17 00:00:00 2001 From: Aleksandr Maus Date: Wed, 27 Jan 2021 16:22:19 -0500 Subject: [PATCH 4/4] Rename action input_id to input_type --- cmd/fleet/handleCheckin.go | 2 +- cmd/fleet/schema.go | 2 +- internal/pkg/es/mapping.go | 2 +- internal/pkg/model/schema.go | 4 ++-- internal/pkg/testing/actions.go | 7 ++++--- model/schema.json | 4 ++-- 6 files changed, 11 insertions(+), 10 deletions(-) diff --git a/cmd/fleet/handleCheckin.go b/cmd/fleet/handleCheckin.go index 10c7a74a5..f074eedac 100644 --- a/cmd/fleet/handleCheckin.go +++ b/cmd/fleet/handleCheckin.go @@ -246,7 +246,7 @@ func convertActions(agentId string, actions []model.Action) ([]ActionResp, strin Data: []byte(action.Data), Id: action.ActionId, Type: action.Type, - InputId: action.InputId, + InputType: action.InputType, }) } diff --git a/cmd/fleet/schema.go b/cmd/fleet/schema.go index 77371b6c4..dc2de3b5c 100644 --- a/cmd/fleet/schema.go +++ b/cmd/fleet/schema.go @@ -123,7 +123,7 @@ type ActionResp struct { Data json.RawMessage `json:"data"` Id string `json:"id"` Type string `json:"type"` - InputId string `json:"input_id"` + InputType string `json:"input_type"` } type Event struct { diff --git a/internal/pkg/es/mapping.go b/internal/pkg/es/mapping.go index dadae8870..91dd14e9a 100644 --- a/internal/pkg/es/mapping.go +++ b/internal/pkg/es/mapping.go @@ -24,7 +24,7 @@ const ( "expiration": { "type": "date" }, - "input_id": { + "input_type": { "type": "keyword" }, "@timestamp": { diff --git a/internal/pkg/model/schema.go b/internal/pkg/model/schema.go index 8f66bee5e..1d9c7e101 100644 --- a/internal/pkg/model/schema.go +++ b/internal/pkg/model/schema.go @@ -45,8 +45,8 @@ type Action struct { // The action expiration date/time Expiration string `json:"expiration,omitempty"` - // The input identifier the actions should be routed to. - InputId string `json:"input_id,omitempty"` + // The input type the actions should be routed to. + InputType string `json:"input_type,omitempty"` // Date/time the action was created Timestamp string `json:"@timestamp,omitempty"` diff --git a/internal/pkg/testing/actions.go b/internal/pkg/testing/actions.go index 2bea9a4dc..63e66dd87 100644 --- a/internal/pkg/testing/actions.go +++ b/internal/pkg/testing/actions.go @@ -9,12 +9,13 @@ package testing import ( "context" "encoding/json" + "testing" + "time" + "github.com/elastic/fleet-server/v7/internal/pkg/bulk" "github.com/elastic/fleet-server/v7/internal/pkg/es" "github.com/elastic/fleet-server/v7/internal/pkg/model" "github.com/elastic/fleet-server/v7/internal/pkg/rnd" - "testing" - "time" "github.com/gofrs/uuid" "github.com/rs/xid" @@ -60,7 +61,7 @@ func CreateRandomActions(min, max int) ([]model.Action, error) { Timestamp: r.Time(now, 2, 5, time.Second, rnd.TimeBefore).Format(time.RFC3339), Expiration: r.Time(now, 12, 25, time.Minute, rnd.TimeAfter).Format(time.RFC3339), Type: "APP_ACTION", - InputId: "osquery", + InputType: "osquery", Agents: aid, Data: data, } diff --git a/model/schema.json b/model/schema.json index 686c23124..caa3254ca 100644 --- a/model/schema.json +++ b/model/schema.json @@ -33,8 +33,8 @@ "description": "The action type. APP_ACTION is the value for the actions that suppose to be routed to the endpoints/beats.", "type": "string" }, - "input_id": { - "description": "The input identifier the actions should be routed to.", + "input_type": { + "description": "The input type the actions should be routed to.", "type": "string" }, "agents": {