Skip to content

Commit

Permalink
resovled conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
michalpristas committed Nov 23, 2021
2 parents 63888ba + 690e0be commit c1daf50
Show file tree
Hide file tree
Showing 45 changed files with 11,597 additions and 9,847 deletions.
42 changes: 20 additions & 22 deletions .mergify.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pull_request_rules:
conditions:
- merged
- base=master
- label=v8.0.0
- label=backport-v8.0.0
actions:
backport:
assignees:
Expand All @@ -31,7 +31,7 @@ pull_request_rules:
conditions:
- merged
- base=master
- label=v7.16.0
- label=backport-v7.16.0
actions:
backport:
assignees:
Expand All @@ -45,7 +45,7 @@ pull_request_rules:
conditions:
- merged
- base=master
- label=v7.15.0
- label=backport-v7.15.0
actions:
backport:
assignees:
Expand All @@ -55,48 +55,46 @@ pull_request_rules:
labels:
- "backport"
title: "[{{ destination_branch }}](backport #{{ number }}) {{ title }}"
- name: backport patches to 7.14 branch
conditions:
- merged
- base=master
- label=v7.14.0
actions:
backport:
assignees:
- "{{ author }}"
branches:
- "7.14"
labels:
- "backport"
title: "[{{ destination_branch }}](backport #{{ number }}) {{ title }}"
- name: notify the backport policy
conditions:
- -label~=(^v\d|backport-skip)
- -label~=^backport
- base=master
actions:
comment:
message: |
This pull request does not have a backport label. Could you fix it @{{author}}? 🙏
To fixup this pull request, you need to add the backport labels for the needed
branches, such as:
* `v/d./d./d` is the label to automatically backport to the `7./d` branch. `/d` is the digit
* `backport-v/d./d./d` is the label to automatically backport to the `7./d` branch. `/d` is the digit
**NOTE**: `backport-skip` has been added to this pull request.
label:
add:
- backport-skip
- name: remove backport-skip label
conditions:
- label~=^v\d
- label~=backport-v
actions:
label:
remove:
- backport-skip
- name: automatic merge for 7\. branches when CI passes
- name: notify the backport has not been merged yet
conditions:
- -merged
- -closed
- author=mergify[bot]
- "#check-success>0"
- schedule=Mon-Mon 06:00-10:00[Europe/Paris]
- "#assignee>=1"
actions:
comment:
message: |
This pull request has not been merged yet. Could you please review and merge it @{{ assignee | join(', @') }}? 🙏
- name: automatic merge for 7\. or 8\. branches when CI passes
conditions:
- check-success=fleet-server/pr-merge
- check-success=CLA
- base~=^7\.
- base~=^(7|8)\.
- label=backport
- author=mergify[bot]
actions:
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ clean: ## - Clean up build artifacts
.PHONY: generate
generate: ## - Generate schema models
@printf "${CMD_COLOR_ON} Installing module for go generate\n${CMD_COLOR_OFF}"
env GOBIN=${GOBIN} go install github.com/aleksmaus/generate/cmd/schema-generate@latest
env GOBIN=${GOBIN} go install github.com/aleksmaus/generate/cmd/schema-generate@5672148f3c31d78bbd0124583bc20133f2e18f37
@printf "${CMD_COLOR_ON} Running go generate\n${CMD_COLOR_OFF}"
env PATH="${GOBIN}:${PATH}" go generate ./...

Expand Down
20,046 changes: 10,340 additions & 9,706 deletions NOTICE.txt

Large diffs are not rendered by default.

18 changes: 15 additions & 3 deletions cmd/fleet/handleAck.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type AckT struct {
func NewAckT(cfg *config.Server, bulker bulk.Bulk, cache cache.Cache) *AckT {
log.Info().
Interface("limits", cfg.Limits.AckLimit).
Msg("Ack install limits")
Msg("Setting config ack_limits")

return &AckT{
cfg: cfg,
Expand Down Expand Up @@ -259,6 +259,18 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag
return nil
}

sz := len(agent.DefaultApiKeyHistory)
if sz > 0 {
ids := make([]string, sz)
for i := 0; i < sz; i++ {
ids[i] = agent.DefaultApiKeyHistory[i].Id
}
log.Info().Strs("ids", ids).Msg("Invalidate old API keys")
if err := ack.bulk.ApiKeyInvalidate(ctx, ids...); err != nil {
log.Info().Err(err).Strs("ids", ids).Msg("Failed to invalidate API keys")
}
}

body := makeUpdatePolicyBody(
agent.PolicyId,
currRev,
Expand Down Expand Up @@ -360,7 +372,7 @@ func _getAPIKeyIDs(agent *model.Agent) []string {
//
// WARNING: This assumes the input data is sanitized.

const kUpdatePolicyPrefix = `{"script":{"lang":"painless","source":"if (ctx._source.policy_id == params.id) {ctx._source.` +
const kUpdatePolicyPrefix = `{"script":{"lang":"painless","source":"if (ctx._source.policy_id == params.id) {ctx._source.remove('default_api_key_history');ctx._source.` +
dl.FieldPolicyRevisionIdx +
` = params.rev;ctx._source.` +
dl.FieldPolicyCoordinatorIdx +
Expand All @@ -371,7 +383,7 @@ const kUpdatePolicyPrefix = `{"script":{"lang":"painless","source":"if (ctx._sou
func makeUpdatePolicyBody(policyId string, newRev, coordIdx int64) []byte {

var buf bytes.Buffer
buf.Grow(384)
buf.Grow(410)

// Not pretty, but fast.
buf.WriteString(kUpdatePolicyPrefix)
Expand Down
35 changes: 32 additions & 3 deletions cmd/fleet/handleCheckin.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ import (
"compress/gzip"
"context"
"encoding/json"
"fmt"
"math/rand"
"net/http"
"reflect"
"strings"
"time"

"github.com/elastic/fleet-server/v7/internal/pkg/action"
Expand Down Expand Up @@ -476,10 +478,16 @@ func processPolicy(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, a
dl.FieldDefaultApiKeyId: defaultOutputApiKey.Id,
dl.FieldPolicyOutputPermissionsHash: pp.Default.Role.Sha2,
}
if agent.DefaultApiKeyId != "" {
fields[dl.FieldDefaultApiKeyHistory] = model.DefaultApiKeyHistoryItems{
Id: agent.DefaultApiKeyId,
RetiredAt: time.Now().UTC().Format(time.RFC3339),
}
}

// Using painless script to append the old keys to the history
body, err := renderUpdatePainlessScript(fields)

body, err := json.Marshal(map[string]interface{}{
"doc": fields,
})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -509,6 +517,27 @@ func processPolicy(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, a
return &resp, nil
}

func renderUpdatePainlessScript(fields map[string]interface{}) ([]byte, error) {
var source strings.Builder
for field := range fields {
if field == dl.FieldDefaultApiKeyHistory {
source.WriteString(fmt.Sprint("if (ctx._source.", field, "==null) {ctx._source.", field, "=new ArrayList();} ctx._source.", field, ".add(params.", field, ");"))
} else {
source.WriteString(fmt.Sprint("ctx._source.", field, "=", "params.", field, ";"))
}
}

body, err := json.Marshal(map[string]interface{}{
"script": map[string]interface{}{
"lang": "painless",
"source": source.String(),
"params": fields,
},
})

return body, err
}

// Return Serializable policy injecting the apikey into the output field.
// This avoids reallocation of each section of the policy by duping
// the map object and only replacing the targeted section.
Expand Down
3 changes: 2 additions & 1 deletion cmd/fleet/handleChecking_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ package fleet

import (
"encoding/json"
"testing"

"github.com/elastic/fleet-server/v7/internal/pkg/model"
"github.com/stretchr/testify/assert"
"testing"
)

func TestConvertActionsEmpty(t *testing.T) {
Expand Down
69 changes: 46 additions & 23 deletions cmd/fleet/handleEnroll.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/elastic/fleet-server/v7/internal/pkg/limit"
"github.com/elastic/fleet-server/v7/internal/pkg/logger"
"github.com/elastic/fleet-server/v7/internal/pkg/model"
"github.com/elastic/fleet-server/v7/internal/pkg/rollback"
"github.com/elastic/fleet-server/v7/internal/pkg/sqn"

"github.com/gofrs/uuid"
Expand Down Expand Up @@ -56,7 +57,7 @@ func NewEnrollerT(verCon version.Constraints, cfg *config.Server, bulker bulk.Bu

log.Info().
Interface("limits", cfg.Limits.EnrollLimit).
Msg("Enroller install limits")
Msg("Setting config enroll_limit")

return &EnrollerT{
verCon: verCon,
Expand Down Expand Up @@ -84,7 +85,25 @@ func (rt Router) handleEnroll(w http.ResponseWriter, r *http.Request, ps httprou
Str("mod", kEnrollMod).
Logger()

resp, err := rt.et.handleEnroll(&zlog, w, r)
// Error in the scope for deferred rolback function check
var err error

// Initialize rollback/cleanup for enrollment
// This deletes all the artifacts that were created during enrollment
rb := rollback.New(zlog)
defer func() {
if err != nil {
zlog.Error().Err(err).Msg("perform rollback on enrollment failure")
// Using the router context for the rollback
err = rb.Rollback(rt.ctx)
if err != nil {
zlog.Error().Err(err).Msg("rollback error on enrollment failure")
}
}
}()

var resp *EnrollResponse
resp, err = rt.et.handleEnroll(rb, &zlog, w, r)

if err != nil {
cntEnroll.IncError(err)
Expand All @@ -96,8 +115,8 @@ func (rt Router) handleEnroll(w http.ResponseWriter, r *http.Request, ps httprou
Int64(EcsEventDuration, time.Since(start).Nanoseconds()).
Msg("fail enroll")

if err := resp.Write(w); err != nil {
zlog.Error().Err(err).Msg("fail writing error response")
if rerr := resp.Write(w); rerr != nil {
zlog.Error().Err(rerr).Msg("fail writing error response")
}
return
}
Expand All @@ -108,13 +127,10 @@ func (rt Router) handleEnroll(w http.ResponseWriter, r *http.Request, ps httprou
Err(err).
Int64(EcsEventDuration, time.Since(start).Nanoseconds()).
Msg("fail write response")

// Remove ghost artifacts; agent will never receive the paylod
rt.et.wipeGhosts(r.Context(), zlog, resp)
}
}

func (et *EnrollerT) handleEnroll(zlog *zerolog.Logger, w http.ResponseWriter, r *http.Request) (*EnrollResponse, error) {
func (et *EnrollerT) handleEnroll(rb *rollback.Rollback, zlog *zerolog.Logger, w http.ResponseWriter, r *http.Request) (*EnrollResponse, error) {

limitF, err := et.limit.Acquire()
if err != nil {
Expand All @@ -141,10 +157,10 @@ func (et *EnrollerT) handleEnroll(zlog *zerolog.Logger, w http.ResponseWriter, r
dfunc := cntEnroll.IncStart()
defer dfunc()

return et.processRequest(*zlog, w, r, key.Id, ver)
return et.processRequest(rb, *zlog, w, r, key.Id, ver)
}

func (et *EnrollerT) processRequest(zlog zerolog.Logger, w http.ResponseWriter, r *http.Request, enrollmentApiKeyId, ver string) (*EnrollResponse, error) {
func (et *EnrollerT) processRequest(rb *rollback.Rollback, zlog zerolog.Logger, w http.ResponseWriter, r *http.Request, enrollmentApiKeyId, ver string) (*EnrollResponse, error) {

// Validate that an enrollment record exists for a key with this id.
erec, err := et.fetchEnrollmentKeyRecord(r.Context(), enrollmentApiKeyId)
Expand All @@ -169,10 +185,10 @@ func (et *EnrollerT) processRequest(zlog zerolog.Logger, w http.ResponseWriter,

cntEnroll.bodyIn.Add(readCounter.Count())

return et._enroll(r.Context(), zlog, req, erec.PolicyId, ver)
return et._enroll(r.Context(), rb, zlog, req, erec.PolicyId, ver)
}

func (et *EnrollerT) _enroll(ctx context.Context, zlog zerolog.Logger, req *EnrollRequest, policyId, ver string) (*EnrollResponse, error) {
func (et *EnrollerT) _enroll(ctx context.Context, rb *rollback.Rollback, zlog zerolog.Logger, req *EnrollRequest, policyId, ver string) (*EnrollResponse, error) {

if req.SharedId != "" {
// TODO: Support pre-existing install
Expand Down Expand Up @@ -201,6 +217,11 @@ func (et *EnrollerT) _enroll(ctx context.Context, zlog zerolog.Logger, req *Enro
return nil, err
}

// Register invalidate API key function for enrollment error rollback
rb.Register("invalidate API key", func(ctx context.Context) error {
return invalidateApiKey(ctx, zlog, et.bulker, accessApiKey.Id)
})

agentData := model.Agent{
Active: true,
PolicyId: policyId,
Expand All @@ -217,10 +238,14 @@ func (et *EnrollerT) _enroll(ctx context.Context, zlog zerolog.Logger, req *Enro

err = createFleetAgent(ctx, et.bulker, agentId, agentData)
if err != nil {
invalidateApiKey(ctx, zlog, et.bulker, accessApiKey.Id)
return nil, err
}

// Register delete fleet agent for enrollment error rollback
rb.Register("delete agent", func(ctx context.Context) error {
return deleteAgent(ctx, zlog, et.bulker, agentId)
})

resp := EnrollResponse{
Action: "created",
Item: EnrollResponseItem{
Expand All @@ -243,17 +268,15 @@ func (et *EnrollerT) _enroll(ctx context.Context, zlog zerolog.Logger, req *Enro
return &resp, nil
}

// Remove the ghost artifacts from Elastic; the agent record and the accessApiKey.
func (et *EnrollerT) wipeGhosts(ctx context.Context, zlog zerolog.Logger, resp *EnrollResponse) {
zlog = zlog.With().Str(LogAgentId, resp.Item.ID).Logger()
func deleteAgent(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, agentID string) error {
zlog = zlog.With().Str(LogAgentId, agentID).Logger()

if err := et.bulker.Delete(ctx, dl.FleetAgents, resp.Item.ID); err != nil {
zlog.Error().Err(err).Msg("ghost agent record failed to delete")
} else {
zlog.Info().Msg("ghost agent record deleted")
if err := bulker.Delete(ctx, dl.FleetAgents, agentID); err != nil {
zlog.Error().Err(err).Msg("agent record failed to delete")
return err
}

invalidateApiKey(ctx, zlog, et.bulker, resp.Item.AccessApiKeyId)
zlog.Info().Msg("agent record deleted")
return nil
}

func invalidateApiKey(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, apikeyId string) error {
Expand Down Expand Up @@ -323,7 +346,7 @@ func writeResponse(zlog zerolog.Logger, w http.ResponseWriter, resp *EnrollRespo
Str(LogAccessApiKeyId, resp.Item.AccessApiKeyId).
Int(EcsHttpResponseBodyBytes, numWritten).
Int64(EcsEventDuration, time.Since(start).Nanoseconds()).
Msg("success enroll")
Msg("Elastic Agent successfully enrolled")

return nil
}
Expand Down
Loading

0 comments on commit c1daf50

Please sign in to comment.