Kubevirt: Defer eve reboot/shutdown/update until drain completes #4494

Open · wants to merge 2 commits into base: master
1 change: 1 addition & 0 deletions docs/CONFIG-PROPERTIES.md
@@ -66,6 +66,7 @@
| goroutine.leak.detection.check.window.minutes | integer (minutes) | 10 | Interval in minutes for which the leak analysis is performed. It should contain at least 10 measurements, so no less than 10 × goroutine.leak.detection.check.interval.minutes. |
| goroutine.leak.detection.keep.stats.hours | integer (hours) | 24 | Amount of hours to keep the stats for leak detection. We keep more stats than the check window to be able to react to settings with a bigger check window via configuration. |
| goroutine.leak.detection.cooldown.minutes | integer (minutes) | 5 | Cooldown period in minutes after the leak detection is triggered. During this period, no stack traces are collected; only warning messages are logged. |
| kubevirt.drain.timeout | integer (hours) | 24 | Time in hours to allow Kubernetes to drain a node. |

## Log levels

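For orientation (not part of this diff), a minimal, self-contained sketch of how a consumer might turn the configured kubevirt.drain.timeout hours into a deadline for a drain attempt; the helper name and wiring are assumptions, not EVE code:

```go
// Illustrative sketch only: converting the kubevirt.drain.timeout value
// (hours) into a context deadline. Helper names here are hypothetical.
package main

import (
	"context"
	"fmt"
	"time"
)

// drainContext returns a context that expires after the configured number of
// hours (24 by default, per the table above).
func drainContext(parent context.Context, drainTimeoutHours uint32) (context.Context, context.CancelFunc) {
	return context.WithTimeout(parent, time.Duration(drainTimeoutHours)*time.Hour)
}

func main() {
	ctx, cancel := drainContext(context.Background(), 24)
	defer cancel()
	deadline, _ := ctx.Deadline()
	fmt.Println("drain must finish by:", deadline)
}
```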
7 changes: 7 additions & 0 deletions pkg/pillar/cmd/baseosmgr/baseosmgr.go
@@ -50,6 +50,9 @@ type baseOsMgrContext struct {
subContentTreeStatus pubsub.Subscription
subNodeAgentStatus pubsub.Subscription
subZedAgentStatus pubsub.Subscription
subNodeDrainStatus pubsub.Subscription
pubNodeDrainRequest pubsub.Publication
deferredBaseOsID string
rebootReason string // From last reboot
rebootTime time.Time // From last reboot
rebootImage string // Image from which the last reboot happened
@@ -103,6 +106,7 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar
initializeNodeAgentHandles(ps, &ctx)
initializeZedagentHandles(ps, &ctx)
initializeVolumemgrHandles(ps, &ctx)
initializeNodeDrainHandles(ps, &ctx)

// publish initial zboot partition status
updateAndPublishZbootStatusAll(&ctx)
@@ -157,6 +161,9 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar
case change := <-ctx.subZedAgentStatus.MsgChan():
ctx.subZedAgentStatus.ProcessChange(change)

case change := <-ctx.subNodeDrainStatus.MsgChan():
ctx.subNodeDrainStatus.ProcessChange(change)

case res := <-ctx.worker.MsgChan():
res.Process(&ctx, true)

11 changes: 11 additions & 0 deletions pkg/pillar/cmd/baseosmgr/handlebaseos.go
@@ -32,6 +32,17 @@ func baseOsHandleStatusUpdateUUID(ctx *baseOsMgrContext, id string) {
return
}

// We want to wait to drain until we're sure we actually have a usable image locally:
// the EVE base OS image is downloaded, verified, available, and, most importantly, activated.
// Before the node downtime/reboot is initiated, see if we need to defer the operation.
if ((status.State == types.LOADED) || (status.State == types.INSTALLED)) && config.Activate && !status.Activated {
log.Tracef("baseOsHandleStatusUpdateUUID() image just activated id:%s config:%v status:%v state:%s", id, config, status, status.State)
deferUpdate := shouldDeferForNodeDrain(ctx, id, config, status)
if deferUpdate {
return
}
}

// handle the change event for this base os config
baseOsHandleStatusUpdate(ctx, config, status)
}
136 changes: 136 additions & 0 deletions pkg/pillar/cmd/baseosmgr/handlenodedrain.go
@@ -0,0 +1,136 @@
// Copyright (c) 2025 Zededa, Inc.
// SPDX-License-Identifier: Apache-2.0

package baseosmgr

import (
"github.com/lf-edge/eve/pkg/pillar/kubeapi"
"github.com/lf-edge/eve/pkg/pillar/pubsub"
"github.com/lf-edge/eve/pkg/pillar/types"
)

func handleNodeDrainStatusCreate(ctxArg interface{}, key string,
configArg interface{}) {
handleNodeDrainStatusImpl(ctxArg, key, configArg, nil)
}

func handleNodeDrainStatusModify(ctxArg interface{}, key string,
configArg interface{}, oldConfigArg interface{}) {
handleNodeDrainStatusImpl(ctxArg, key, configArg, oldConfigArg)
}

func handleNodeDrainStatusImpl(ctxArg interface{}, _ string,
configArg interface{}, _ interface{}) {
newStatus, ok := configArg.(kubeapi.NodeDrainStatus)
if !ok {
log.Errorf("handleNodeDrainStatusImpl invalid type in configArg: %v", configArg)
return
}
ctx, ok := ctxArg.(*baseOsMgrContext)
if !ok {
log.Errorf("handleNodeDrainStatusImpl invalid type in ctxArg: %v", ctxArg)
return
}

if newStatus.RequestedBy != kubeapi.UPDATE {
return
}

log.Functionf("handleNodeDrainStatusImpl to:%v", newStatus)
if (newStatus.Status == kubeapi.FAILEDCORDON) ||
(newStatus.Status == kubeapi.FAILEDDRAIN) {
log.Errorf("handleNodeDrainStatusImpl nodedrain-step:drain-failed-handler unpublish NodeDrainRequest due to NodeDrainStatus:%v", newStatus)
if err := ctx.pubNodeDrainRequest.Unpublish("global"); err != nil {
log.Errorf("Unable to remove NodeDrainRequest object:%v", err)
}
}
if newStatus.Status == kubeapi.COMPLETE {
id := ctx.deferredBaseOsID
if id != "" {
log.Noticef("handleNodeDrainStatusImpl nodedrain-step:drain-complete-handler, continuing baseosstatus update id:%s", id)
baseOsHandleStatusUpdateUUID(ctx, id)
}
}
}

func handleNodeDrainStatusDelete(_ interface{}, _ string,
_ interface{}) {
log.Function("handleNodeDrainStatusDelete")
}

func initializeNodeDrainHandles(ps *pubsub.PubSub, ctx *baseOsMgrContext) {
subNodeDrainStatus, err := ps.NewSubscription(pubsub.SubscriptionOptions{
AgentName: "zedkube",
MyAgentName: agentName,
TopicImpl: kubeapi.NodeDrainStatus{},
Persistent: false,
Activate: false,
Ctx: ctx,
CreateHandler: handleNodeDrainStatusCreate,
ModifyHandler: handleNodeDrainStatusModify,
DeleteHandler: handleNodeDrainStatusDelete,
WarningTime: warningTime,
ErrorTime: errorTime,
})
if err != nil {
log.Fatalf("initNodeDrainPubSub subNodeDrainStatus err:%v", err)
return
}
if err := subNodeDrainStatus.Activate(); err != nil {
log.Fatalf("initNodeDrainPubSub can't activate sub:%v", err)
}

pubNodeDrainRequest, err := ps.NewPublication(
pubsub.PublicationOptions{
AgentName: agentName,
TopicType: kubeapi.NodeDrainRequest{},
})
if err != nil {
log.Fatalf("initNodeDrainPubSub pubNodeDrainRequest err:%v", err)
}
ctx.subNodeDrainStatus = subNodeDrainStatus
ctx.pubNodeDrainRequest = pubNodeDrainRequest
}

// shouldDeferForNodeDrain will return true if this BaseOsStatus update will be handled later
func shouldDeferForNodeDrain(ctx *baseOsMgrContext, id string, config *types.BaseOsConfig, status *types.BaseOsStatus) bool {
drainStatus := kubeapi.GetNodeDrainStatus(ctx.subNodeDrainStatus, log)
if drainStatus.Status == kubeapi.NOTSUPPORTED {
return false
}
if drainStatus.Status == kubeapi.UNKNOWN {
log.Error("shouldDeferForNodeDrain EARLY boot request, zedkube not up yet")
return false
}

log.Noticef("shouldDeferForNodeDrain drainCheck id:%s state:%d baseOsConfig:%v baseOsStatus:%v drainStatus:%d",
id, status.State, config, status, drainStatus.Status)
// To allow switching baseos version mid-drain, keep this general to all
// cases of: restarting-failed-drain, starting-fresh-drain
ctx.deferredBaseOsID = id

if drainStatus.Status == kubeapi.NOTREQUESTED ||
drainStatus.Status == kubeapi.FAILEDCORDON ||
drainStatus.Status == kubeapi.FAILEDDRAIN {
log.Noticef("shouldDeferForNodeDrain nodedrain-step:request requester:eve-os-update ctx:%s", id)
err := kubeapi.RequestNodeDrain(ctx.pubNodeDrainRequest, kubeapi.UPDATE, id)
if err != nil {
log.Errorf("shouldDeferForNodeDrain: can't request node drain: %v", err)
}
return true
}
if drainStatus.Status == kubeapi.REQUESTED ||
drainStatus.Status == kubeapi.STARTING ||
drainStatus.Status == kubeapi.CORDONED ||
drainStatus.Status == kubeapi.DRAINRETRYING {
log.Functionf("shouldDeferForNodeDrain drain in-progress or in error, still defer")
return true
}

if drainStatus.Status != kubeapi.COMPLETE {
log.Errorf("shouldDeferForNodeDrain unhanded NodeDrainStatus:%v", drainStatus)
}

log.Noticef("shouldDeferForNodeDrain nodedrain-step:handle-complete requester:eve-os-update ctx:%s", id)
return false
}
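To summarize the branching above, here is a standalone sketch of the defer decision keyed on the drain state. The enum below is a local stand-in that mirrors the kubeapi constants used in this file; it is for illustration only and not the real kubeapi types:

```go
package main

import "fmt"

// drainState mirrors the kubeapi drain status constants referenced above;
// a local stand-in for illustration, not the real kubeapi type.
type drainState int

const (
	notSupported drainState = iota
	unknown
	notRequested
	requested
	starting
	cordoned
	failedCordon
	drainRetrying
	failedDrain
	complete
)

// shouldDefer mirrors shouldDeferForNodeDrain: every state except
// NOTSUPPORTED, UNKNOWN and COMPLETE defers the base OS update, and the
// idle/failed states additionally (re)issue a NodeDrainRequest.
func shouldDefer(s drainState) (deferUpdate, requestDrain bool) {
	switch s {
	case notSupported, unknown, complete:
		return false, false
	case notRequested, failedCordon, failedDrain:
		return true, true
	default: // requested, starting, cordoned, drainRetrying
		return true, false
	}
}

func main() {
	d, r := shouldDefer(failedDrain)
	fmt.Println(d, r) // true true: re-request the drain and keep deferring
}
```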
8 changes: 8 additions & 0 deletions pkg/pillar/cmd/diag/diag.go
@@ -69,6 +69,7 @@ type diagContext struct {
appInstanceSummary types.AppInstanceSummary
subAppInstanceStatus pubsub.Subscription
subDownloaderStatus pubsub.Subscription
subNodeDrainStatus pubsub.Subscription
zedcloudMetrics *zedcloud.AgentMetrics
gotBC bool
gotDNS bool
@@ -381,6 +382,8 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar
ctx.subDownloaderStatus = subDownloaderStatus
subDownloaderStatus.Activate()

initDrainSub(ps, &ctx)

cloudPingMetricPub, err := ps.NewPublication(
pubsub.PublicationOptions{
AgentName: agentName,
@@ -429,6 +432,9 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar

case change := <-subDownloaderStatus.MsgChan():
subDownloaderStatus.ProcessChange(change)

case change := <-ctx.subNodeDrainStatus.MsgChan():
ctx.subNodeDrainStatus.ProcessChange(change)
}
// Is this the first time we have all the info to print?
if !gotAll && ctx.gotBC && ctx.gotDNS && ctx.gotDPCList {
@@ -1063,6 +1069,8 @@ func printOutput(ctx *diagContext, caller string) {
ds.Name, ds.ImageSha256, ds.Progress, ds.TotalSize)
}
}

printNodeDrainStatus(ctx)
ctx.ph.Flush()
}

82 changes: 82 additions & 0 deletions pkg/pillar/cmd/diag/handlenodedrain.go
@@ -0,0 +1,82 @@
// Copyright (c) 2025 Zededa, Inc.
// SPDX-License-Identifier: Apache-2.0

package diag

import (
"github.com/lf-edge/eve/pkg/pillar/kubeapi"
"github.com/lf-edge/eve/pkg/pillar/pubsub"
)

func initDrainSub(ps *pubsub.PubSub, ctx *diagContext) {
subNodeDrainStatus, err := ps.NewSubscription(pubsub.SubscriptionOptions{
AgentName: "zedkube",
MyAgentName: agentName,
TopicImpl: kubeapi.NodeDrainStatus{},
Persistent: false,
Activate: true,
Ctx: ctx,
CreateHandler: handleNodeDrainStatusCreate,
ModifyHandler: handleNodeDrainStatusModify,
DeleteHandler: handleNodeDrainStatusDelete,
WarningTime: warningTime,
ErrorTime: errorTime,
})
if err != nil {
log.Fatal(err)
}
ctx.subNodeDrainStatus = subNodeDrainStatus
ctx.subNodeDrainStatus.Activate()
}

func handleNodeDrainStatusCreate(ctxArg interface{}, key string,
configArg interface{}) {
handleNodeDrainStatusImpl(ctxArg, key, configArg, nil)
}

func handleNodeDrainStatusModify(ctxArg interface{}, key string,
configArg interface{}, oldConfigArg interface{}) {
handleNodeDrainStatusImpl(ctxArg, key, configArg, oldConfigArg)
}

func handleNodeDrainStatusImpl(ctxArg interface{}, _ string,
_ interface{}, _ interface{}) {
ctx := ctxArg.(*diagContext)
triggerPrintOutput(ctx, "NodeDrain")
}

func printNodeDrainStatus(ctx *diagContext) {
items := ctx.subNodeDrainStatus.GetAll()
for _, item := range items {
nds := item.(kubeapi.NodeDrainStatus)

sev := ""
switch nds.Status {
case kubeapi.UNKNOWN:
case kubeapi.NOTSUPPORTED:
// not kubevirt-EVE or not clustered, skipping unnecessary logging
case kubeapi.NOTREQUESTED:
fallthrough
case kubeapi.REQUESTED:
fallthrough
case kubeapi.STARTING:
fallthrough
case kubeapi.CORDONED:
sev = "INFO"
break
case kubeapi.FAILEDCORDON:
sev = "ERROR"
case kubeapi.DRAINRETRYING:
sev = "WARNING"
case kubeapi.FAILEDDRAIN:
sev = "ERROR"
case kubeapi.COMPLETE:
sev = "INFO"
}
ctx.ph.Print("%s: Node Drain -> %s\n", sev, nds.Status.String())
}
}

func handleNodeDrainStatusDelete(ctxArg interface{}, key string,
statusArg interface{}) {
}
79 changes: 79 additions & 0 deletions pkg/pillar/cmd/nodeagent/handlenodedrain.go
@@ -0,0 +1,79 @@
// Copyright (c) 2025 Zededa, Inc.
// SPDX-License-Identifier: Apache-2.0

package nodeagent

import (
"github.com/lf-edge/eve/pkg/pillar/kubeapi"
"github.com/lf-edge/eve/pkg/pillar/pubsub"
)

func handleNodeDrainStatusCreate(ctxArg interface{}, key string,
configArg interface{}) {
handleNodeDrainStatusImpl(ctxArg, key, configArg, nil)
}

func handleNodeDrainStatusModify(ctxArg interface{}, key string,
configArg interface{}, oldConfigArg interface{}) {
handleNodeDrainStatusImpl(ctxArg, key, configArg, oldConfigArg)
}

func handleNodeDrainStatusImpl(ctxArg interface{}, _ string,
configArg interface{}, _ interface{}) {
ctx, ok := ctxArg.(*nodeagentContext)
if !ok {
log.Errorf("handleNodeDrainStatusImpl invalid type in ctxArg:%v", ctxArg)
return
}
newStatus, ok := configArg.(kubeapi.NodeDrainStatus)
if !ok {
log.Errorf("handleNodeDrainStatusImpl invalid type in configArg:%v", configArg)
return
}

if newStatus.RequestedBy != kubeapi.DEVICEOP {
return
}

log.Noticef("handleNodeDrainStatusImpl to:%v", newStatus)
// NodeDrainStatus Failures here should keep drainInProgress set.

Contributor:
If there is something catastrophic happening to a cluster (e.g., two out of three nodes die due to hardware failures), is there a way to recover from that and make the single remaining node usable, e.g., as a one-node cluster or as a standalone device?

It seems like a reboot might not be usable to recover since (based on this comment) you can't reboot until the drain has succeeded, and that is presumably impossible if the two other nodes died while you were trying to drain.
What happens when the drain timer fires?

Is there a recovery case which does not involve a truck roll and a reinstall of EVE in such a case?

Contributor Author:
If only a single cluster node is usable then etcd will be unable to form a quorum and k3s won't be able to reach the ready state.

If the cluster has degraded to the point where the local Kubernetes node is not running, then applications are already unavailable and the zedkube drain handler should simply skip requesting a drain. There is already a handler that skips the drain for one type of Kubernetes outage and returns complete, but I do see how this needs to be expanded to handle more outage cases and to show more clearly that zedkube allows these controller operations as recovery mechanisms, not just maintenance.

The key here is to appropriately debounce the intermittent Kubernetes API issues we could see due to timeouts. This PR uses a constant value of 180 seconds, but I'm going to move it to a config property to allow some tuning.
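As an aside, a minimal sketch of the kind of debounce described here, assuming a 180-second window; the type and method below are purely illustrative and not code from this PR:

```go
package main

import (
	"fmt"
	"time"
)

// apiDownDebounce reports an outage only after the API has been unreachable
// continuously for at least `window` (e.g. the 180 seconds mentioned above).
// Hypothetical helper for illustration only.
type apiDownDebounce struct {
	window    time.Duration
	downSince time.Time // zero value means "currently reachable"
}

func (d *apiDownDebounce) observe(reachable bool, now time.Time) bool {
	if reachable {
		d.downSince = time.Time{}
		return false
	}
	if d.downSince.IsZero() {
		d.downSince = now
	}
	return now.Sub(d.downSince) >= d.window
}

func main() {
	d := &apiDownDebounce{window: 180 * time.Second}
	t0 := time.Now()
	fmt.Println(d.observe(false, t0))                    // false: API just became unreachable
	fmt.Println(d.observe(false, t0.Add(3*time.Minute))) // true: down for the whole window
}
```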

Contributor:
I was thinking of a different sequence.

  1. Cluster is running with three nodes.
  2. Controller (or LOC) requests a device reboot or EVE update of node X
  3. Drain starts
  4. Node Y fails/disappears from the cluster for any reason (hardware, software crash, network partition)

In such a case, should we proceed with the device reboot/update of node X, or pause until Y is back in the cluster?

// As this will set DrainInProgress on NodeAgentStatus and keep zedagent from allowing
// the deferred operation to continue.
if (newStatus.Status >= kubeapi.REQUESTED) && (newStatus.Status < kubeapi.COMPLETE) {
log.Noticef("handleNodeDrainStatusImpl nodedrain-step:drain-inprogress-handler NodeDrainStatus:%v", newStatus)
ctx.drainInProgress = true
publishNodeAgentStatus(ctx)
}
if newStatus.Status == kubeapi.COMPLETE {
log.Notice("handleNodeDrainStatusImpl nodedrain-step:drain-complete-handler notify zedagent")
ctx.drainInProgress = false
publishNodeAgentStatus(ctx)
}
}

func handleNodeDrainStatusDelete(_ interface{}, _ string,
_ interface{}) {
log.Functionf("handleNodeDrainStatusDelete")
}

func initNodeDrainPubSub(ps *pubsub.PubSub, ctx *nodeagentContext) {
subNodeDrainStatus, err := ps.NewSubscription(pubsub.SubscriptionOptions{
AgentName: "zedkube",
MyAgentName: agentName,
TopicImpl: kubeapi.NodeDrainStatus{},
Persistent: false,
Activate: false,
Ctx: ctx,
CreateHandler: handleNodeDrainStatusCreate,
ModifyHandler: handleNodeDrainStatusModify,
DeleteHandler: handleNodeDrainStatusDelete,
WarningTime: warningTime,
ErrorTime: errorTime,
})
if err != nil {
log.Fatalf("initNodeDrainPubSub subNodeDrainStatus err:%v", err)
return
}
if err := subNodeDrainStatus.Activate(); err != nil {
log.Fatalf("initNodeDrainPubSub activate err:%v", err)
}
ctx.subNodeDrainStatus = subNodeDrainStatus
}
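For context, a minimal sketch of the gate described in the comments above on the zedagent side; the struct and function names are assumptions for illustration, not code from this PR:

```go
package main

import "fmt"

// nodeAgentStatus stands in for the published NodeAgentStatus; only the field
// relevant to this sketch is shown, and its name is assumed.
type nodeAgentStatus struct {
	DrainInProgress bool
}

// canRunDeferredDeviceOp mirrors the gating described above: while a drain is
// in progress, the deferred reboot/shutdown/update waits.
func canRunDeferredDeviceOp(s nodeAgentStatus) bool {
	return !s.DrainInProgress
}

func main() {
	fmt.Println(canRunDeferredDeviceOp(nodeAgentStatus{DrainInProgress: true}))  // false: keep waiting
	fmt.Println(canRunDeferredDeviceOp(nodeAgentStatus{DrainInProgress: false})) // true: proceed
}
```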