diff --git a/cmd/fleet/handleCheckin.go b/cmd/fleet/handleCheckin.go index 10c7a74a5..f074eedac 100644 --- a/cmd/fleet/handleCheckin.go +++ b/cmd/fleet/handleCheckin.go @@ -246,7 +246,7 @@ func convertActions(agentId string, actions []model.Action) ([]ActionResp, strin Data: []byte(action.Data), Id: action.ActionId, Type: action.Type, - InputId: action.InputId, + InputType: action.InputType, }) } diff --git a/cmd/fleet/main.go b/cmd/fleet/main.go index a197ec178..730ab9f9e 100644 --- a/cmd/fleet/main.go +++ b/cmd/fleet/main.go @@ -474,13 +474,20 @@ func (f *FleetServer) runServer(ctx context.Context, cfg *config.Config) (err er // Initial indices bootstrapping, needed for agents actions development // TODO: remove this after the indices bootstrapping logic implemented in ES plugin - err = esboot.EnsureESIndices(ctx, es) - if err != nil { - return err - } - err = migrate.Migrate(ctx, sv, bulker) - if err != nil { - return err + bootFlag := env.GetStr( + "FLEET_ES_BOOT", + "", + ) + if bootFlag == "1" { + log.Debug().Msg("FLEET_ES_BOOT is set to true, perform bootstrap") + err = esboot.EnsureESIndices(ctx, es) + if err != nil { + return err + } + err = migrate.Migrate(ctx, log.Logger, sv, bulker) + if err != nil { + return err + } } // Replacing to errgroup context diff --git a/cmd/fleet/schema.go b/cmd/fleet/schema.go index 77371b6c4..dc2de3b5c 100644 --- a/cmd/fleet/schema.go +++ b/cmd/fleet/schema.go @@ -123,7 +123,7 @@ type ActionResp struct { Data json.RawMessage `json:"data"` Id string `json:"id"` Type string `json:"type"` - InputId string `json:"input_id"` + InputType string `json:"input_type"` } type Event struct { diff --git a/internal/pkg/coordinator/monitor.go b/internal/pkg/coordinator/monitor.go index 239e6ef8d..272d76f2d 100644 --- a/internal/pkg/coordinator/monitor.go +++ b/internal/pkg/coordinator/monitor.go @@ -6,6 +6,7 @@ package coordinator import ( "context" + "errors" "net" "os" "runtime" @@ -188,6 +189,10 @@ func (m *monitorT) ensureLeadership(ctx context.Context) error { leaders := map[string]model.PolicyLeader{} policies, err := dl.QueryLatestPolicies(ctx, m.bulker, dl.WithIndexName(m.policiesIndex)) if err != nil { + if errors.Is(err, es.ErrIndexNotFound) { + m.log.Debug().Str("index", m.policiesIndex).Msg(es.ErrIndexNotFound.Error()) + return nil + } return err } if len(policies) > 0 { @@ -197,6 +202,10 @@ func (m *monitorT) ensureLeadership(ctx context.Context) error { } leaders, err = dl.SearchPolicyLeaders(ctx, m.bulker, ids, dl.WithIndexName(m.leadersIndex)) if err != nil { + if errors.Is(err, es.ErrIndexNotFound) { + m.log.Debug().Str("index", m.leadersIndex).Msg(es.ErrIndexNotFound.Error()) + return nil + } return err } } diff --git a/internal/pkg/es/mapping.go b/internal/pkg/es/mapping.go index dadae8870..91dd14e9a 100644 --- a/internal/pkg/es/mapping.go +++ b/internal/pkg/es/mapping.go @@ -24,7 +24,7 @@ const ( "expiration": { "type": "date" }, - "input_id": { + "input_type": { "type": "keyword" }, "@timestamp": { diff --git a/internal/pkg/esboot/bootstrap.go b/internal/pkg/esboot/bootstrap.go index 614d500d6..9906a8f52 100644 --- a/internal/pkg/esboot/bootstrap.go +++ b/internal/pkg/esboot/bootstrap.go @@ -6,8 +6,8 @@ package esboot import ( "context" - "github.com/elastic/fleet-server/v7/internal/pkg/es" + "github.com/elastic/fleet-server/v7/internal/pkg/es" "github.com/elastic/go-elasticsearch/v8" ) @@ -20,6 +20,8 @@ type indexConfig struct { } var indexConfigs = map[string]indexConfig{ + // Commenting out the boostrapping for now here, just in case if it needs to be "enabled" again. + // Will remove all the boostrapping code completely later once all is fully integrated ".fleet-actions": {mapping: es.MappingAction}, ".fleet-actions-results": {mapping: es.MappingActionResult, datastream: true}, ".fleet-agents": {mapping: es.MappingAgent}, diff --git a/internal/pkg/migrate/migrate.go b/internal/pkg/migrate/migrate.go index dbcc8cb83..f18a4bde0 100644 --- a/internal/pkg/migrate/migrate.go +++ b/internal/pkg/migrate/migrate.go @@ -7,10 +7,14 @@ package migrate import ( "context" "encoding/json" + "errors" + "github.com/elastic/fleet-server/v7/internal/pkg/bulk" "github.com/elastic/fleet-server/v7/internal/pkg/dl" + "github.com/elastic/fleet-server/v7/internal/pkg/es" "github.com/elastic/fleet-server/v7/internal/pkg/model" "github.com/elastic/fleet-server/v7/internal/pkg/saved" + "github.com/rs/zerolog" ) type enrollmentApiKey struct { @@ -29,11 +33,11 @@ type enrollmentApiKey struct { // This is for development only (1 instance of fleet) // Not safe for multiple instances of fleet // Initially needed to migrate the enrollment-api-keys that kibana creates -func Migrate(ctx context.Context, sv saved.CRUD, bulker bulk.Bulk) error { - return MigrateEnrollmentAPIKeys(ctx, sv, bulker) +func Migrate(ctx context.Context, log zerolog.Logger, sv saved.CRUD, bulker bulk.Bulk) error { + return MigrateEnrollmentAPIKeys(ctx, log, sv, bulker) } -func MigrateEnrollmentAPIKeys(ctx context.Context, sv saved.CRUD, bulker bulk.Bulk) error { +func MigrateEnrollmentAPIKeys(ctx context.Context, log zerolog.Logger, sv saved.CRUD, bulker bulk.Bulk) error { // Query all enrollment keys from the new schema raw, err := dl.RenderAllEnrollmentAPIKeysQuery(1000) @@ -42,12 +46,21 @@ func MigrateEnrollmentAPIKeys(ctx context.Context, sv saved.CRUD, bulker bulk.Bu } var recs []model.EnrollmentApiKey + var resHits []es.HitT res, err := bulker.Search(ctx, []string{dl.FleetEnrollmentAPIKeys}, raw, bulk.WithRefresh()) if err != nil { - return err + if errors.Is(err, es.ErrIndexNotFound) { + log.Debug().Str("index", dl.FleetEnrollmentAPIKeys).Msg(es.ErrIndexNotFound.Error()) + // Continue with migration if the .fleet-enrollment-api-keys index is not found + err = nil + } else { + return err + } + } else { + resHits = res.Hits } - for _, hit := range res.Hits { + for _, hit := range resHits { var rec model.EnrollmentApiKey err := json.Unmarshal(hit.Source, &rec) if err != nil { diff --git a/internal/pkg/model/schema.go b/internal/pkg/model/schema.go index 8f66bee5e..1d9c7e101 100644 --- a/internal/pkg/model/schema.go +++ b/internal/pkg/model/schema.go @@ -45,8 +45,8 @@ type Action struct { // The action expiration date/time Expiration string `json:"expiration,omitempty"` - // The input identifier the actions should be routed to. - InputId string `json:"input_id,omitempty"` + // The input type the actions should be routed to. + InputType string `json:"input_type,omitempty"` // Date/time the action was created Timestamp string `json:"@timestamp,omitempty"` diff --git a/internal/pkg/monitor/monitor.go b/internal/pkg/monitor/monitor.go index 78df63db9..bdea51fce 100644 --- a/internal/pkg/monitor/monitor.go +++ b/internal/pkg/monitor/monitor.go @@ -8,6 +8,7 @@ import ( "bytes" "context" "encoding/json" + "errors" "sync/atomic" "time" @@ -291,7 +292,15 @@ func (m *simpleMonitorT) search(ctx context.Context, tmpl *dsl.Tmpl, params map[ } if res.IsError() { - return nil, es.TranslateError(res.StatusCode, esres.Error) + err = es.TranslateError(res.StatusCode, esres.Error) + } + + if err != nil { + if errors.Is(err, es.ErrIndexNotFound) { + m.log.Debug().Str("index", m.index).Msg(es.ErrIndexNotFound.Error()) + return nil, nil + } + return nil, err } return esres.Hits.Hits, nil diff --git a/internal/pkg/policy/monitor.go b/internal/pkg/policy/monitor.go index 52724a231..e647bb612 100644 --- a/internal/pkg/policy/monitor.go +++ b/internal/pkg/policy/monitor.go @@ -18,6 +18,7 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/bulk" "github.com/elastic/fleet-server/v7/internal/pkg/dl" + "github.com/elastic/fleet-server/v7/internal/pkg/es" "github.com/elastic/fleet-server/v7/internal/pkg/model" "github.com/elastic/fleet-server/v7/internal/pkg/monitor" ) @@ -125,6 +126,10 @@ LOOP: func (m *monitorT) process(ctx context.Context) error { policies, err := m.policyF(ctx, m.bulker, dl.WithIndexName(m.policiesIndex)) if err != nil { + if errors.Is(err, es.ErrIndexNotFound) { + m.log.Debug().Str("index", m.policiesIndex).Msg(es.ErrIndexNotFound.Error()) + return nil + } return err } if len(policies) == 0 { diff --git a/internal/pkg/testing/actions.go b/internal/pkg/testing/actions.go index 2bea9a4dc..63e66dd87 100644 --- a/internal/pkg/testing/actions.go +++ b/internal/pkg/testing/actions.go @@ -9,12 +9,13 @@ package testing import ( "context" "encoding/json" + "testing" + "time" + "github.com/elastic/fleet-server/v7/internal/pkg/bulk" "github.com/elastic/fleet-server/v7/internal/pkg/es" "github.com/elastic/fleet-server/v7/internal/pkg/model" "github.com/elastic/fleet-server/v7/internal/pkg/rnd" - "testing" - "time" "github.com/gofrs/uuid" "github.com/rs/xid" @@ -60,7 +61,7 @@ func CreateRandomActions(min, max int) ([]model.Action, error) { Timestamp: r.Time(now, 2, 5, time.Second, rnd.TimeBefore).Format(time.RFC3339), Expiration: r.Time(now, 12, 25, time.Minute, rnd.TimeAfter).Format(time.RFC3339), Type: "APP_ACTION", - InputId: "osquery", + InputType: "osquery", Agents: aid, Data: data, } diff --git a/model/schema.json b/model/schema.json index 686c23124..caa3254ca 100644 --- a/model/schema.json +++ b/model/schema.json @@ -33,8 +33,8 @@ "description": "The action type. APP_ACTION is the value for the actions that suppose to be routed to the endpoints/beats.", "type": "string" }, - "input_id": { - "description": "The input identifier the actions should be routed to.", + "input_type": { + "description": "The input type the actions should be routed to.", "type": "string" }, "agents": {