Skip to content

Commit

Permalink
[Ingest Manager] Unroll on unauthorised (elastic#19722) (elastic#19888)
Browse files Browse the repository at this point in the history
[Ingest Manager] Unroll on unauthorised (elastic#19722)
  • Loading branch information
michalpristas authored Jul 14, 2020
1 parent d26d033 commit c6d44a1
Show file tree
Hide file tree
Showing 9 changed files with 182 additions and 56 deletions.
46 changes: 23 additions & 23 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16506,6 +16506,29 @@ Contents of probable licence file $GOMODCACHE/gopkg.in/[email protected]/LICENSE:
limitations under the License.


--------------------------------------------------------------------------------
Dependency : gotest.tools
Version: v2.2.0+incompatible
Licence type (autodetected): Apache-2.0
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/[email protected]+incompatible/LICENSE:

Copyright 2018 gotest.tools authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.


--------------------------------------------------------------------------------
Dependency : howett.net/plist
Version: v0.0.0-20181124034731-591f970eefbb
Expand Down Expand Up @@ -39672,29 +39695,6 @@ See the License for the specific language governing permissions and
limitations under the License.


--------------------------------------------------------------------------------
Dependency : gotest.tools
Version: v2.2.0+incompatible
Licence type (autodetected): Apache-2.0
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/[email protected]+incompatible/LICENSE:

Copyright 2018 gotest.tools authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.


--------------------------------------------------------------------------------
Dependency : honnef.co/go/tools
Version: v0.0.1-2019.2.3
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ require (
gopkg.in/jcmturner/gokrb5.v7 v7.5.0
gopkg.in/mgo.v2 v2.0.0-20160818020120-3f83fa500528
gopkg.in/yaml.v2 v2.3.0
gotest.tools v2.2.0+incompatible
howett.net/plist v0.0.0-20181124034731-591f970eefbb
k8s.io/api v0.18.3
k8s.io/apimachinery v0.18.3
Expand Down
1 change: 1 addition & 0 deletions x-pack/elastic-agent/pkg/agent/application/action_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ var _ actionConfigChangeSerializer = actionConfigChangeSerializer(fleetapi.Actio
type actionUnenrollSerializer struct {
ActionID string `yaml:"action_id"`
ActionType string `yaml:"action_type"`
IsDetected bool `yaml:"is_detected"`
}

// Add a guards between the serializer structs and the original struct.
Expand Down
50 changes: 38 additions & 12 deletions x-pack/elastic-agent/pkg/agent/application/fleet_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/scheduler"
)

const maxUnauthCounter int = 6

// Default Configuration for the Fleet Gateway.
var defaultGatewaySettings = &fleetGatewaySettings{
Duration: 1 * time.Second, // time between successful calls
Expand Down Expand Up @@ -59,18 +61,19 @@ type fleetAcker interface {
// call the API to send the events and will receive actions to be executed locally.
// The only supported action for now is a "ActionPolicyChange".
type fleetGateway struct {
bgContext context.Context
log *logger.Logger
dispatcher dispatcher
client clienter
scheduler scheduler.Scheduler
backoff backoff.Backoff
settings *fleetGatewaySettings
agentInfo agentInfo
reporter fleetReporter
done chan struct{}
wg sync.WaitGroup
acker fleetAcker
bgContext context.Context
log *logger.Logger
dispatcher dispatcher
client clienter
scheduler scheduler.Scheduler
backoff backoff.Backoff
settings *fleetGatewaySettings
agentInfo agentInfo
reporter fleetReporter
done chan struct{}
wg sync.WaitGroup
acker fleetAcker
unauthCounter int
}

func newFleetGateway(
Expand Down Expand Up @@ -203,6 +206,21 @@ func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse,
}

resp, err := cmd.Execute(ctx, req)
if isUnauth(err) {
f.unauthCounter++

if f.shouldUnroll() {
f.log.Warnf("retrieved unauthorized for '%d' times. Unrolling.", f.unauthCounter)
return &fleetapi.CheckinResponse{
Actions: []fleetapi.Action{&fleetapi.ActionUnenroll{ActionID: "", ActionType: "UNENROLL", IsDetected: true}},
Success: true,
}, nil
}

return nil, err
}

f.unauthCounter = 0
if err != nil {
return nil, err
}
Expand All @@ -212,6 +230,14 @@ func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse,
return resp, nil
}

func (f *fleetGateway) shouldUnroll() bool {
return f.unauthCounter >= maxUnauthCounter
}

func isUnauth(err error) bool {
return errors.Is(err, fleetapi.ErrInvalidAPIKey)
}

func (f *fleetGateway) Start() {
f.wg.Add(1)
go func(wg *sync.WaitGroup) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ import (
// After running Unenroll agent is in idle state, non managed non standalone.
// For it to be operational again it needs to be either enrolled or reconfigured.
type handlerUnenroll struct {
log *logger.Logger
emitter emitterFunc
dispatcher programsDispatcher
closers []context.CancelFunc
log *logger.Logger
emitter emitterFunc
dispatcher programsDispatcher
closers []context.CancelFunc
actionStore *actionStore
}

func (h *handlerUnenroll) Handle(ctx context.Context, a action, acker fleetAcker) error {
Expand All @@ -33,24 +34,26 @@ func (h *handlerUnenroll) Handle(ctx context.Context, a action, acker fleetAcker
noPrograms := make(map[routingKey][]program.Program)
h.dispatcher.Dispatch(a.ID(), noPrograms)

if err := acker.Ack(ctx, action); err != nil {
return err
}

// commit all acks before quitting.
if err := acker.Commit(ctx); err != nil {
return err
if !action.IsDetected {
// ACK only events comming from fleet
if err := acker.Ack(ctx, action); err != nil {
return err
}

// commit all acks before quitting.
if err := acker.Commit(ctx); err != nil {
return err
}
} else if h.actionStore != nil {
// backup action for future start to avoid starting fleet gateway loop
h.actionStore.Add(a)
h.actionStore.Save()
}

// close fleet gateway loop
for _, c := range h.closers {
c()
}

// clean action store
// if err := os.Remove(info.AgentActionStoreFile()); err != nil && !os.IsNotExist(err) {
// return errors.New(err, "failed to clear action store")
// }

return nil
}
9 changes: 5 additions & 4 deletions x-pack/elastic-agent/pkg/agent/application/managed_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,11 @@ func newManaged(
actionDispatcher.MustRegister(
&fleetapi.ActionUnenroll{},
&handlerUnenroll{
log: log,
emitter: emit,
dispatcher: router,
closers: []context.CancelFunc{managedApplication.cancelCtxFn},
log: log,
emitter: emit,
dispatcher: router,
closers: []context.CancelFunc{managedApplication.cancelCtxFn},
actionStore: actionStore,
},
)

Expand Down
47 changes: 46 additions & 1 deletion x-pack/elastic-agent/pkg/agent/errors/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,27 @@

package errors

import "github.com/pkg/errors"
import (
goerrors "errors"
"reflect"

"github.com/pkg/errors"
)

// As is just a helper so user dont have to use multiple imports for errors.
func As(err error, target interface{}) bool {
return goerrors.As(err, target)
}

// Is is just a helper so user dont have to use multiple imports for errors.
func Is(err, target error) bool {
return goerrors.Is(err, target)
}

// Unwrap is just a helper so user dont have to use multiple imports for errors.
func Unwrap(err error) error {
return goerrors.Unwrap(err)
}

// MetaRecord is a entry of metadata enhancing an error.
type MetaRecord struct {
Expand Down Expand Up @@ -101,6 +121,31 @@ func (e agentError) Meta() map[string]interface{} {
return resultingMeta
}

// Equal compares errors and evaluates if they are the same or not.
// Agent error is not comparable due to included map so we need to
// do the heavy lifting ourselves.
func (e agentError) Equal(target error) bool {
targetErr, ok := target.(agentError)
if !ok {
return false
}

return errors.Is(e.err, targetErr.err) &&
e.errType == targetErr.errType &&
e.msg == targetErr.msg &&
reflect.DeepEqual(e.meta, targetErr.meta)

}

// Is checks whether agent err is an err.
func (e agentError) Is(target error) bool {
if agentErr, ok := target.(agentError); ok {
return e.Equal(agentErr)
}

return goerrors.Is(e.err, target)
}

// Check it implements Error
var _ Error = agentError{}

Expand Down
48 changes: 48 additions & 0 deletions x-pack/elastic-agent/pkg/agent/errors/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,60 @@ package errors

import (
"fmt"
"io"
"strings"
"testing"

"github.com/pkg/errors"
"gotest.tools/assert"
)

func TestErrorsIs(t *testing.T) {
type testCase struct {
id string
actualErr error
expectedErr error
expectedMatch bool
}

simpleErr := io.ErrNoProgress
simpleWrap := errors.Wrap(simpleErr, "wrapping %w")
agentErr := New()
nestedSimple := New(simpleErr)
nestedWrap := New(simpleWrap)
agentInErr := errors.Wrap(nestedWrap, "wrapping %w")

tt := []testCase{
{"simple wrap", simpleWrap, simpleErr, true},
{"simple mismatch", simpleWrap, errors.New("sample"), false},

{"direct nested - root check", nestedSimple, simpleErr, true},
{"direct nested - mismatch", nestedSimple, errors.New("sample"), false},
{"direct nested - comparing agent errors", nestedSimple, agentErr, false},

{"deep nested - root check", New(nestedSimple), simpleErr, true},
{"deep nested - mismatch", New(nestedSimple), errors.New("sample"), false},
{"deep nested - comparing agent errors", New(nestedSimple), agentErr, false},

{"nested wrap - wrap check", New(nestedWrap), simpleWrap, true},
{"nested wrap - root", New(nestedWrap), simpleErr, true},

{"comparing agent errors", New(agentErr), agentErr, true},

{"agent in error", agentInErr, nestedWrap, true},
{"agent in error wrap", agentInErr, simpleWrap, true},
{"agent in error root", agentInErr, simpleErr, true},
{"agent in error nil check", agentInErr, nil, false},
}

for _, tc := range tt {
t.Run(tc.id, func(t *testing.T) {
match := Is(tc.actualErr, tc.expectedErr)
assert.Equal(t, tc.expectedMatch, match)
})
}
}

func TestErrorsWrap(t *testing.T) {
ce := New("custom error", TypePath, M("k", "v"))
ew := errors.Wrap(ce, "wrapper")
Expand Down
1 change: 1 addition & 0 deletions x-pack/elastic-agent/pkg/fleetapi/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ func (a *ActionConfigChange) ID() string {
type ActionUnenroll struct {
ActionID string
ActionType string
IsDetected bool
}

func (a *ActionUnenroll) String() string {
Expand Down

0 comments on commit c6d44a1

Please sign in to comment.