From b20c96c57246fc2fe41e7d6d6daab8f01c9b48ba Mon Sep 17 00:00:00 2001
From: Michal Pristas <michal.pristas@gmail.com>
Date: Mon, 5 Oct 2020 19:32:12 +0200
Subject: [PATCH] [Ingest Manager] Send updating state (#21461) (#21530)

[Ingest Manager] Send updating state (#21461)
---
 x-pack/elastic-agent/CHANGELOG.next.asciidoc  |  1 +
 .../pkg/agent/application/managed_mode.go     |  4 +-
 .../pkg/agent/application/upgrade/upgrade.go  | 77 +++++++++++++++++--
 x-pack/elastic-agent/pkg/core/state/state.go  |  2 +
 x-pack/elastic-agent/pkg/reporter/reporter.go |  6 ++
 5 files changed, 81 insertions(+), 9 deletions(-)

diff --git a/x-pack/elastic-agent/CHANGELOG.next.asciidoc b/x-pack/elastic-agent/CHANGELOG.next.asciidoc
index 2ba08864ae85..278a9ea9cf45 100644
--- a/x-pack/elastic-agent/CHANGELOG.next.asciidoc
+++ b/x-pack/elastic-agent/CHANGELOG.next.asciidoc
@@ -28,3 +28,4 @@
 - Add support for EQL based condition on inputs {pull}20994[20994]
 - Send `fleet.host.id` to Endpoint Security {pull}21042[21042]
 - Add `install` and `uninstall` subcommands {pull}21206[21206]
+- Send updating state {pull}21461[21461]
diff --git a/x-pack/elastic-agent/pkg/agent/application/managed_mode.go b/x-pack/elastic-agent/pkg/agent/application/managed_mode.go
index 34e1242d1e0f..66a252203fc6 100644
--- a/x-pack/elastic-agent/pkg/agent/application/managed_mode.go
+++ b/x-pack/elastic-agent/pkg/agent/application/managed_mode.go
@@ -204,11 +204,13 @@ func newManaged(
 	}
 
 	managedApplication.upgrader = upgrade.NewUpgrader(
+		agentInfo,
 		cfg.Settings.DownloadConfig,
 		log,
 		[]context.CancelFunc{managedApplication.cancelCtxFn},
 		reexec,
-		acker)
+		acker,
+		combinedReporter)
 
 	actionDispatcher.MustRegister(
 		&fleetapi.ActionPolicyChange{},
diff --git a/x-pack/elastic-agent/pkg/agent/application/upgrade/upgrade.go b/x-pack/elastic-agent/pkg/agent/application/upgrade/upgrade.go
index cac36ef7922a..7aacf77ba634 100644
--- a/x-pack/elastic-agent/pkg/agent/application/upgrade/upgrade.go
+++ b/x-pack/elastic-agent/pkg/agent/application/upgrade/upgrade.go
@@ -20,6 +20,7 @@ import (
 	"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/install"
 	"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact"
 	"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
+	"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/state"
 	"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi"
 	"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/release"
 )
@@ -33,11 +34,13 @@ const (
 
 // Upgrader performs an upgrade
 type Upgrader struct {
+	agentInfo   *info.AgentInfo
 	settings    *artifact.Config
 	log         *logger.Logger
 	closers     []context.CancelFunc
 	reexec      reexecManager
 	acker       acker
+	reporter    stateReporter
 	upgradeable bool
 }
 
@@ -50,14 +53,19 @@ type acker interface {
 	Commit(ctx context.Context) error
 }
 
+type stateReporter interface {
+	OnStateChange(id string, name string, s state.State)
+}
+
 // NewUpgrader creates an upgrader which is capable of performing upgrade operation
-func NewUpgrader(settings *artifact.Config, log *logger.Logger, closers []context.CancelFunc, reexec reexecManager, a acker) *Upgrader {
+func NewUpgrader(agentInfo *info.AgentInfo, settings *artifact.Config, log *logger.Logger, closers []context.CancelFunc, reexec reexecManager, a acker, r stateReporter) *Upgrader {
 	return &Upgrader{
 		settings:    settings,
 		log:         log,
 		closers:     closers,
 		reexec:      reexec,
 		acker:       a,
+		reporter:    r,
 		upgradeable: getUpgradable(),
 	}
 }
@@ -68,13 +76,22 @@ func (u *Upgrader) Upgradeable() bool {
 }
 
 // Upgrade upgrades running agent
-func (u *Upgrader) Upgrade(ctx context.Context, a *fleetapi.ActionUpgrade) error {
+func (u *Upgrader) Upgrade(ctx context.Context, a *fleetapi.ActionUpgrade) (err error) {
+	// report failed
+	defer func() {
+		if err != nil {
+			u.reportFailure(ctx, a, err)
+		}
+	}()
+
 	if !u.upgradeable {
 		return fmt.Errorf(
 			"cannot be upgraded; must be installed with install sub-command and " +
 				"running under control of the systems supervisor")
 	}
 
+	u.reportUpdating(a.Version)
+
 	sourceURI, err := u.sourceURI(a.Version, a.SourceURI)
 	archivePath, err := u.downloadArtifact(ctx, a.Version, sourceURI)
 	if err != nil {
@@ -91,7 +108,10 @@ func (u *Upgrader) Upgrade(ctx context.Context, a *fleetapi.ActionUpgrade) error
 	}
 
 	if strings.HasPrefix(release.Commit(), newHash) {
-		return errors.New("upgrading to same version")
+		// not an error
+		u.ackAction(ctx, a)
+		u.log.Warn("upgrading to same version")
+		return nil
 	}
 
 	if err := copyActionStore(newHash); err != nil {
@@ -132,11 +152,7 @@ func (u *Upgrader) Ack(ctx context.Context) error {
 		return nil
 	}
 
-	if err := u.acker.Ack(ctx, marker.Action); err != nil {
-		return err
-	}
-
-	if err := u.acker.Commit(ctx); err != nil {
+	if err := u.ackAction(ctx, marker.Action); err != nil {
 		return err
 	}
 
@@ -148,6 +164,7 @@ func (u *Upgrader) Ack(ctx context.Context) error {
 
 	return ioutil.WriteFile(markerFile, markerBytes, 0600)
 }
+
 func (u *Upgrader) sourceURI(version, retrievedURI string) (string, error) {
 	if strings.HasSuffix(version, "-SNAPSHOT") && retrievedURI == "" {
 		return "", errors.New("snapshot upgrade requires source uri", errors.TypeConfig)
@@ -159,6 +176,50 @@ func (u *Upgrader) sourceURI(version, retrievedURI string) (string, error) {
 	return u.settings.SourceURI, nil
 }
 
+// ackAction is used for successful updates, it was either updated successfully or to the same version
+// so we need to remove updating state and get prevent from receiving same update action again.
+func (u *Upgrader) ackAction(ctx context.Context, action fleetapi.Action) error {
+	if err := u.acker.Ack(ctx, action); err != nil {
+		return err
+	}
+
+	if err := u.acker.Commit(ctx); err != nil {
+		return err
+	}
+
+	u.reporter.OnStateChange(
+		"",
+		agentName,
+		state.State{Status: state.Running},
+	)
+
+	return nil
+}
+
+// report failure is used when update process fails. action is acked so it won't be received again
+// and state is changed to FAILED
+func (u *Upgrader) reportFailure(ctx context.Context, action fleetapi.Action, err error) {
+	// ack action
+	u.acker.Ack(ctx, action)
+
+	// report failure
+	u.reporter.OnStateChange(
+		"",
+		agentName,
+		state.State{Status: state.Failed, Message: err.Error()},
+	)
+}
+
+// reportUpdating sets state of agent to updating.
+func (u *Upgrader) reportUpdating(version string) {
+	// report failure
+	u.reporter.OnStateChange(
+		"",
+		agentName,
+		state.State{Status: state.Updating, Message: fmt.Sprintf("Update to version '%s' started", version)},
+	)
+}
+
 func rollbackInstall(hash string) {
 	os.RemoveAll(filepath.Join(paths.Data(), fmt.Sprintf("%s-%s", agentName, hash)))
 }
diff --git a/x-pack/elastic-agent/pkg/core/state/state.go b/x-pack/elastic-agent/pkg/core/state/state.go
index 6b7c8bd53dec..670cdc2a2f23 100644
--- a/x-pack/elastic-agent/pkg/core/state/state.go
+++ b/x-pack/elastic-agent/pkg/core/state/state.go
@@ -30,6 +30,8 @@ const (
 	Crashed
 	// Restarting is status describing application is restarting.
 	Restarting
+	// Updating is status describing application is updating.
+	Updating
 )
 
 // State wraps the process state and application status.
diff --git a/x-pack/elastic-agent/pkg/reporter/reporter.go b/x-pack/elastic-agent/pkg/reporter/reporter.go
index c36708a837f7..3b128841b2a1 100644
--- a/x-pack/elastic-agent/pkg/reporter/reporter.go
+++ b/x-pack/elastic-agent/pkg/reporter/reporter.go
@@ -38,6 +38,8 @@ const (
 	EventSubTypeFailed = "FAILED"
 	// EventSubTypeStopping is an event type indicating application is stopping.
 	EventSubTypeStopping = "STOPPING"
+	// EventSubTypeUpdating is an event type indicating update process in progress.
+	EventSubTypeUpdating = "UPDATING"
 )
 
 type agentInfo interface {
@@ -127,6 +129,10 @@ func generateRecord(agentID string, id string, name string, s state.State) event
 	case state.Restarting:
 		subType = EventSubTypeStarting
 		subTypeText = "RESTARTING"
+	case state.Updating:
+		subType = EventSubTypeUpdating
+		subTypeText = EventSubTypeUpdating
+
 	}
 
 	err := errors.New(