Skip to content

Commit

Permalink
Kubevirt: Defer eve reboot/shutdown/update until drain completes
Browse files Browse the repository at this point in the history
As a part of kubevirt-eve we have multiple cluster nodes each
hosting app workloads and volume replicas.  This implements defer
for eve mgmt operations which will result in unavailability of storage
replicas.  An example:

1. Node 1 outage and recovers.
2. Before volumes complete rebuilding on node 1: Node 2 outage and recovery.
3. Volumes begin rebuilding replicas on nodes 1 and 2.
4. User initiated request to reboot/shutdown/update eve-os on node3.
5. That config request is set to defer until replicas are rebuilt on the other nodes.

Signed-off-by: Andrew Durbin <[email protected]>
  • Loading branch information
andrewd-zededa committed Dec 23, 2024
1 parent 0769a20 commit 432f137
Show file tree
Hide file tree
Showing 17 changed files with 989 additions and 1 deletion.
1 change: 1 addition & 0 deletions docs/CONFIG-PROPERTIES.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
| goroutine.leak.detection.check.window.minutes | integer (minutes) | 10 | Interval in minutes for which the leak analysis is performed. It should contain at least 10 measurements, so no less than 10 × goroutine.leak.detection.check.interval.minutes. |
| goroutine.leak.detection.keep.stats.hours | integer (hours) | 24 | Amount of hours to keep the stats for leak detection. We keep more stats than the check window to be able to react to settings with a bigger check window via configuration. |
| goroutine.leak.detection.cooldown.minutes | integer (minutes) | 5 | Cooldown period in minutes after the leak detection is triggered. During this period, no stack traces are collected; only warning messages are logged. |
| kubevirt.drain.timeout | integer (hours) | 24 | Number of hours to allow Kubernetes to drain a node. |

## Log levels

Expand Down
8 changes: 8 additions & 0 deletions pkg/pillar/cmd/baseosmgr/baseosmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ type baseOsMgrContext struct {
subContentTreeStatus pubsub.Subscription
subNodeAgentStatus pubsub.Subscription
subZedAgentStatus pubsub.Subscription
subNodeDrainStatus pubsub.Subscription
pubNodeDrainRequest pubsub.Publication
deferredBaseOsID string
rebootReason string // From last reboot
rebootTime time.Time // From last reboot
rebootImage string // Image from which the last reboot happened
Expand Down Expand Up @@ -136,6 +139,8 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar
}
log.Functionf("user containerd ready")

initNodeDrainPubSub(ps, &ctx)

// start the forever loop for event handling
for {
select {
Expand All @@ -157,6 +162,9 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar
case change := <-ctx.subZedAgentStatus.MsgChan():
ctx.subZedAgentStatus.ProcessChange(change)

case change := <-ctx.subNodeDrainStatus.MsgChan():
ctx.subNodeDrainStatus.ProcessChange(change)

case res := <-ctx.worker.MsgChan():
res.Process(&ctx, true)

Expand Down
11 changes: 11 additions & 0 deletions pkg/pillar/cmd/baseosmgr/handlebaseos.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,17 @@ func baseOsHandleStatusUpdateUUID(ctx *baseOsMgrContext, id string) {
return
}

// We want to wait to drain until we're sure we actually have a usable image locally.
// The EVE baseos image is downloaded, verified, available, and most importantly has been activated.
// Before the node downtime/reboot is initiated, see if we need to defer the operation.
if ((status.State == types.LOADED) || (status.State == types.INSTALLED)) && config.Activate && !status.Activated {
log.Tracef("baseOsHandleStatusUpdateUUID() image just activated id:%s config:%v status:%v state:%s", id, config, status, status.State)
deferUpdate := shouldDeferForNodeDrain(ctx, id, config, status)
if deferUpdate {
return
}
}

// handle the change event for this base os config
baseOsHandleStatusUpdate(ctx, config, status)
}
Expand Down
128 changes: 128 additions & 0 deletions pkg/pillar/cmd/baseosmgr/handlenodedrain.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// Copyright (c) 2024 Zededa, Inc.
// SPDX-License-Identifier: Apache-2.0

package baseosmgr

import (
"github.com/lf-edge/eve/pkg/pillar/kubeapi"
"github.com/lf-edge/eve/pkg/pillar/pubsub"
"github.com/lf-edge/eve/pkg/pillar/types"
)

// handleNodeDrainStatusCreate is the create callback for NodeDrainStatus;
// it simply delegates to the shared impl handler.
func handleNodeDrainStatusCreate(ctxArg interface{}, key string, configArg interface{}) {
	handleNodeDrainStatusImpl(ctxArg, key, configArg, nil)
}

// handleNodeDrainStatusModify is the modify callback for NodeDrainStatus;
// it simply delegates to the shared impl handler.
func handleNodeDrainStatusModify(ctxArg interface{}, key string, configArg interface{}, oldConfigArg interface{}) {
	handleNodeDrainStatusImpl(ctxArg, key, configArg, oldConfigArg)
}

// handleNodeDrainStatusImpl reacts to NodeDrainStatus changes published by
// zedkube. Only statuses requested on behalf of an eve-os update
// (kubeapi.UPDATE) are handled here. A cordon/drain failure unpublishes the
// outstanding NodeDrainRequest so a later attempt can start fresh; a COMPLETE
// status resumes the deferred baseos update, if one was recorded.
func handleNodeDrainStatusImpl(ctxArg interface{}, _ string,
	configArg interface{}, _ interface{}) {
	newStatus, ok := configArg.(kubeapi.NodeDrainStatus)
	if !ok {
		log.Fatalf("handleNodeDrainStatusImpl invalid type in configArg: %v", configArg)
	}
	ctx, ok := ctxArg.(*baseOsMgrContext)
	if !ok {
		log.Fatalf("handleNodeDrainStatusImpl invalid type in ctxArg: %v", ctxArg)
	}

	// Drains triggered by device operations (reboot/shutdown) are handled elsewhere.
	if newStatus.RequestedBy != kubeapi.UPDATE {
		return
	}

	log.Functionf("handleNodeDrainStatusImpl to:%v", newStatus)
	switch newStatus.Status {
	case kubeapi.FAILEDCORDON, kubeapi.FAILEDDRAIN:
		log.Errorf("handleNodeDrainStatusImpl nodedrain-step:drain-failed-handler unpublish NodeDrainRequest due to NodeDrainStatus:%v", newStatus)
		if err := ctx.pubNodeDrainRequest.Unpublish("global"); err != nil {
			log.Errorf("Unable to remove NodeDrainRequest object:%v", err)
		}
	case kubeapi.COMPLETE:
		if id := ctx.deferredBaseOsID; id != "" {
			log.Noticef("handleNodeDrainStatusImpl nodedrain-step:drain-complete-handler, continuing baseosstatus update id:%s", id)
			baseOsHandleStatusUpdateUUID(ctx, id)
		}
	}
}

// handleNodeDrainStatusDelete is the delete callback for NodeDrainStatus.
// Nothing to clean up; the deletion is only logged.
func handleNodeDrainStatusDelete(_ interface{}, _ string, _ interface{}) {
	log.Function("handleNodeDrainStatusDelete")
}

// initNodeDrainPubSub wires up the drain-related pubsub channels for baseosmgr:
// a subscription to NodeDrainStatus published by zedkube, and a publication of
// NodeDrainRequest used to ask zedkube to drain this node before an update.
func initNodeDrainPubSub(ps *pubsub.PubSub, ctx *baseOsMgrContext) {
	sub, err := ps.NewSubscription(pubsub.SubscriptionOptions{
		AgentName:     "zedkube",
		MyAgentName:   agentName,
		TopicImpl:     kubeapi.NodeDrainStatus{},
		Persistent:    false,
		Activate:      false,
		Ctx:           ctx,
		CreateHandler: handleNodeDrainStatusCreate,
		ModifyHandler: handleNodeDrainStatusModify,
		DeleteHandler: handleNodeDrainStatusDelete,
		WarningTime:   warningTime,
		ErrorTime:     errorTime,
	})
	if err != nil {
		log.Fatalf("initNodeDrainPubSub subNodeDrainStatus err:%v", err)
		return
	}
	if err := sub.Activate(); err != nil {
		log.Fatalf("initNodeDrainPubSub can't activate sub:%v", err)
	}
	ctx.subNodeDrainStatus = sub

	pub, err := ps.NewPublication(
		pubsub.PublicationOptions{
			AgentName: agentName,
			TopicType: kubeapi.NodeDrainRequest{},
		})
	if err != nil {
		log.Fatalf("initNodeDrainPubSub pubNodeDrainRequest err:%v", err)
		return
	}
	ctx.pubNodeDrainRequest = pub
}

// shouldDeferForNodeDrain returns true if this BaseOsStatus update must wait
// for a kubernetes node drain to complete. When no drain has been requested
// yet it records the deferred baseos id and publishes a NodeDrainRequest;
// handleNodeDrainStatusImpl resumes the deferred update once the drain
// reaches COMPLETE. A drain already in progress (or retrying after an error)
// keeps the update deferred.
func shouldDeferForNodeDrain(ctx *baseOsMgrContext, id string, config *types.BaseOsConfig, status *types.BaseOsStatus) bool {
	drainStatus := kubeapi.GetNodeDrainStatus(ctx.subNodeDrainStatus, log)
	if drainStatus.Status == kubeapi.UNKNOWN {
		// zedkube has not published any drain state yet; don't block the update.
		log.Error("shouldDeferForNodeDrain EARLY boot request, zedkube not up yet")
		return false
	}

	log.Noticef("shouldDeferForNodeDrain drainCheck id:%s state:%d baseOsConfig:%v baseOsStatus:%v drainStatus:%d",
		id, status.State, config, status, drainStatus.Status)
	switch drainStatus.Status {
	case kubeapi.NOTREQUESTED:
		// First pass: remember which baseos update is waiting, then kick off the drain.
		ctx.deferredBaseOsID = id
		log.Noticef("shouldDeferForNodeDrain nodedrain-step:request requester:eve-os-update ctx:%s", id)
		err := kubeapi.RequestNodeDrain(ctx.pubNodeDrainRequest, kubeapi.UPDATE, id)
		if err != nil {
			log.Errorf("shouldDeferForNodeDrain: can't request node drain: %v", err)
		}
		return true
	case kubeapi.REQUESTED, kubeapi.STARTING, kubeapi.CORDONED,
		kubeapi.FAILEDCORDON, kubeapi.DRAINRETRYING, kubeapi.FAILEDDRAIN:
		// Drain still underway or in an error/retry state: keep deferring.
		log.Functionf("shouldDeferForNodeDrain drain in-progress or in error, still defer")
		return true
	case kubeapi.COMPLETE:
		// Drain finished; fall through and let the update proceed.
	default:
		// Fixed typo: was "unhanded".
		log.Errorf("shouldDeferForNodeDrain unhandled NodeDrainStatus:%v", drainStatus)
	}
	log.Noticef("shouldDeferForNodeDrain nodedrain-step:handle-complete requester:eve-os-update ctx:%s", id)
	return false
}
6 changes: 6 additions & 0 deletions pkg/pillar/cmd/diag/diag.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type diagContext struct {
appInstanceSummary types.AppInstanceSummary
subAppInstanceStatus pubsub.Subscription
subDownloaderStatus pubsub.Subscription
subNodeDrainStatus pubsub.Subscription
zedcloudMetrics *zedcloud.AgentMetrics
gotBC bool
gotDNS bool
Expand Down Expand Up @@ -381,6 +382,8 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar
ctx.subDownloaderStatus = subDownloaderStatus
subDownloaderStatus.Activate()

initDrainSub(ps, &ctx)

cloudPingMetricPub, err := ps.NewPublication(
pubsub.PublicationOptions{
AgentName: agentName,
Expand Down Expand Up @@ -429,6 +432,9 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar

case change := <-subDownloaderStatus.MsgChan():
subDownloaderStatus.ProcessChange(change)

case change := <-ctx.subNodeDrainStatus.MsgChan():
ctx.subNodeDrainStatus.ProcessChange(change)
}
// Is this the first time we have all the info to print?
if !gotAll && ctx.gotBC && ctx.gotDNS && ctx.gotDPCList {
Expand Down
60 changes: 60 additions & 0 deletions pkg/pillar/cmd/diag/handlenodedrain.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package diag

import (
"time"

"github.com/lf-edge/eve/pkg/pillar/kubeapi"
"github.com/lf-edge/eve/pkg/pillar/pubsub"
)

// initDrainSub subscribes diag to the NodeDrainStatus objects published by
// zedkube so drain progress can be surfaced on the console.
//
// Fix: the original passed Activate: true in the options AND then called
// Activate() again on the subscription — a redundant double activation whose
// second error was implicitly relied upon. Activate exactly once, explicitly,
// and check the error (consistent with the other agents' drain subscriptions).
func initDrainSub(ps *pubsub.PubSub, ctx *diagContext) {
	subNodeDrainStatus, err := ps.NewSubscription(pubsub.SubscriptionOptions{
		AgentName:     "zedkube",
		MyAgentName:   agentName,
		TopicImpl:     kubeapi.NodeDrainStatus{},
		Persistent:    false,
		Activate:      false,
		Ctx:           ctx,
		CreateHandler: handleNodeDrainStatusCreate,
		ModifyHandler: handleNodeDrainStatusModify,
		DeleteHandler: handleNodeDrainStatusDelete,
		WarningTime:   warningTime,
		ErrorTime:     errorTime,
	})
	if err != nil {
		log.Fatal(err)
	}
	ctx.subNodeDrainStatus = subNodeDrainStatus
	if err := ctx.subNodeDrainStatus.Activate(); err != nil {
		log.Fatalf("initDrainSub can't activate sub:%v", err)
	}
}

// handleNodeDrainStatusCreate is the create callback for NodeDrainStatus;
// it delegates to the shared impl handler.
func handleNodeDrainStatusCreate(ctxArg interface{}, key string, configArg interface{}) {
	handleNodeDrainStatusImpl(ctxArg, key, configArg, nil)
}

// handleNodeDrainStatusModify is the modify callback for NodeDrainStatus;
// it delegates to the shared impl handler.
func handleNodeDrainStatusModify(ctxArg interface{}, key string, configArg interface{}, oldConfigArg interface{}) {
	handleNodeDrainStatusImpl(ctxArg, key, configArg, oldConfigArg)
}

// handleNodeDrainStatusImpl is the common create/modify handler: it prints
// any interesting NodeDrainStatus transition to the diag output.
//
// Fix: use checked type assertions with a clear fatal message instead of
// bare assertions that panic opaquely, consistent with the handlers in
// baseosmgr and nodeagent. Unused parameters are anonymized.
func handleNodeDrainStatusImpl(ctxArg interface{}, _ string,
	configArg interface{}, _ interface{}) {
	ctx, ok := ctxArg.(*diagContext)
	if !ok {
		log.Fatalf("handleNodeDrainStatusImpl invalid type in ctxArg: %v", ctxArg)
	}
	newStatus, ok := configArg.(kubeapi.NodeDrainStatus)
	if !ok {
		log.Fatalf("handleNodeDrainStatusImpl invalid type in configArg: %v", configArg)
	}
	printNodeDrainStatus(ctx, newStatus)
}

// printNodeDrainStatus writes a timestamped drain transition to the diag
// output. Statuses below REQUESTED are suppressed: only the transitions tied
// to lengthy operations or errors are worth showing.
func printNodeDrainStatus(ctx *diagContext, newStatus kubeapi.NodeDrainStatus) {
	if newStatus.Status < kubeapi.REQUESTED {
		// Just print the transitions which are linked to lengthy operations or errors
		return
	}
	ts := time.Now().Format(time.RFC3339Nano)
	ctx.ph.Print("INFO: Node Drain -> %s at %v\n", newStatus.Status.String(), ts)
	ctx.ph.Flush()
}

// handleNodeDrainStatusDelete is the delete callback for NodeDrainStatus.
// Deletions need no action in diag.
func handleNodeDrainStatusDelete(_ interface{}, _ string, _ interface{}) {
}
76 changes: 76 additions & 0 deletions pkg/pillar/cmd/nodeagent/handlenodedrain.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package nodeagent

import (
"github.com/lf-edge/eve/pkg/pillar/kubeapi"
"github.com/lf-edge/eve/pkg/pillar/pubsub"
)

// handleNodeDrainStatusCreateNA is the create callback for NodeDrainStatus;
// it delegates to the shared impl handler.
func handleNodeDrainStatusCreateNA(ctxArg interface{}, key string, configArg interface{}) {
	handleNodeDrainStatusImplNA(ctxArg, key, configArg, nil)
}

// handleNodeDrainStatusModifyNA is the modify callback for NodeDrainStatus;
// it delegates to the shared impl handler.
func handleNodeDrainStatusModifyNA(ctxArg interface{}, key string, configArg interface{}, oldConfigArg interface{}) {
	handleNodeDrainStatusImplNA(ctxArg, key, configArg, oldConfigArg)
}

// handleNodeDrainStatusImplNA tracks drain progress for device operations
// (reboot/shutdown). While a drain requested by DEVICEOP is between REQUESTED
// and COMPLETE, drainInProgress is set on nodeagent's context and republished
// so zedagent holds back the deferred operation; COMPLETE clears it.
func handleNodeDrainStatusImplNA(ctxArg interface{}, _ string,
	configArg interface{}, _ interface{}) {
	ctx, ok := ctxArg.(*nodeagentContext)
	if !ok {
		log.Fatalf("handleNodeDrainStatusImplNA invalid type in ctxArg:%v", ctxArg)
	}
	newStatus, ok := configArg.(kubeapi.NodeDrainStatus)
	if !ok {
		log.Fatalf("handleNodeDrainStatusImplNA invalid type in configArg:%v", configArg)
	}

	// Drains requested for eve-os updates are baseosmgr's concern.
	if newStatus.RequestedBy != kubeapi.DEVICEOP {
		return
	}

	log.Noticef("handleNodeDrainStatusImplNA to:%v", newStatus)
	// NodeDrainStatus Failures here should keep drainInProgress set.
	// As this will set DrainInProgress on NodeAgentStatus and keep zedagent from allowing
	// the deferred operation to continue.
	switch {
	case newStatus.Status >= kubeapi.REQUESTED && newStatus.Status < kubeapi.COMPLETE:
		log.Noticef("handleNodeDrainStatusImplNA nodedrain-step:drain-inprogress-handler NodeDrainStatus:%v", newStatus)
		ctx.drainInProgress = true
		publishNodeAgentStatus(ctx)
	case newStatus.Status == kubeapi.COMPLETE:
		log.Notice("handleNodeDrainStatusImplNA nodedrain-step:drain-complete-handler notify zedagent")
		ctx.drainInProgress = false
		publishNodeAgentStatus(ctx)
	}
}

// handleNodeDrainStatusDeleteNA is the delete callback for NodeDrainStatus.
// Nothing to clean up; the deletion is only logged.
func handleNodeDrainStatusDeleteNA(_ interface{}, _ string, _ interface{}) {
	log.Functionf("handleNodeDrainStatusDeleteNA")
}

// initNodeDrainPubSub subscribes nodeagent to NodeDrainStatus published by
// zedkube so device operations can be deferred while a drain is in progress.
func initNodeDrainPubSub(ps *pubsub.PubSub, ctx *nodeagentContext) {
	sub, err := ps.NewSubscription(pubsub.SubscriptionOptions{
		AgentName:     "zedkube",
		MyAgentName:   agentName,
		TopicImpl:     kubeapi.NodeDrainStatus{},
		Persistent:    false,
		Activate:      false,
		Ctx:           ctx,
		CreateHandler: handleNodeDrainStatusCreateNA,
		ModifyHandler: handleNodeDrainStatusModifyNA,
		DeleteHandler: handleNodeDrainStatusDeleteNA,
		WarningTime:   warningTime,
		ErrorTime:     errorTime,
	})
	if err != nil {
		log.Fatalf("initNodeDrainPubSub subNodeDrainStatus err:%v", err)
		return
	}
	if err := sub.Activate(); err != nil {
		log.Fatalf("initNodeDrainPubSub activate err:%v", err)
	}
	ctx.subNodeDrainStatus = sub
}
9 changes: 9 additions & 0 deletions pkg/pillar/cmd/nodeagent/nodeagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type nodeagentContext struct {
subDomainStatus pubsub.Subscription
subVaultStatus pubsub.Subscription
subVolumeMgrStatus pubsub.Subscription
subNodeDrainStatus pubsub.Subscription
pubZbootConfig pubsub.Publication
pubNodeAgentStatus pubsub.Publication
curPart string
Expand Down Expand Up @@ -115,6 +116,8 @@ type nodeagentContext struct {
configGetSuccess bool // got config from controller success
vaultmgrReported bool // got reports from vaultmgr
hvTypeKube bool // image is kubernetes cluster type
drainInProgress bool
drainComplete bool

// Some constants.. Declared here as variables to enable unit tests
minRebootDelay uint32
Expand Down Expand Up @@ -364,6 +367,8 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar
ctxPtr.subZedAgentStatus = subZedAgentStatus
subZedAgentStatus.Activate()

initNodeDrainPubSub(ps, ctxPtr)

log.Functionf("zedbox event loop")
for {
select {
Expand All @@ -388,6 +393,9 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar
case change := <-subVolumeMgrStatus.MsgChan():
subVolumeMgrStatus.ProcessChange(change)

case change := <-ctxPtr.subNodeDrainStatus.MsgChan():
ctxPtr.subNodeDrainStatus.ProcessChange(change)

case <-ctxPtr.stillRunning.C:
}
ps.StillRunning(agentName, warningTime, errorTime)
Expand Down Expand Up @@ -722,6 +730,7 @@ func publishNodeAgentStatus(ctxPtr *nodeagentContext) {
LocalMaintenanceMode: ctxPtr.maintMode,
LocalMaintenanceModeReason: ctxPtr.maintModeReason,
HVTypeKube: ctxPtr.hvTypeKube,
DrainInProgress: ctxPtr.drainInProgress,
}
ctxPtr.lastLock.Unlock()
pub.Publish(agentName, status)
Expand Down
Loading

0 comments on commit 432f137

Please sign in to comment.