(Bug) recover when token expires
When the sveltos-agent runs in the management cluster, it receives the managed cluster's
kubeconfig from a Secret. These kubeconfigs can expire (e.g., GKE tokens have a maximum
lifespan of 48 hours).

Sveltos includes a mechanism to proactively renew these tokens. The SveltosCluster controller
can be configured to periodically refresh tokens before they expire, preventing disruptions.

However, prior to this change, the sveltos-agent, when deployed in the management cluster,
lacked the ability to retrieve an updated kubeconfig. Consequently, upon kubeconfig expiration,
the agent encountered numerous authorization errors, effectively ceasing operation.

This pull request addresses this issue by implementing a mechanism to detect kubeconfig expiration.
Upon detection, the sveltos-agent retrieves a fresh, valid kubeconfig. This triggers a restart of
the controller-runtime manager (and all associated controllers) as well as the evaluation manager,
ensuring continued operation.
gianlucam76 committed Jan 22, 2025
1 parent 267a044 commit 4537ba6
Showing 7 changed files with 329 additions and 240 deletions.
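The diff below implements this in cmd/main.go and in the evaluation manager. As rough orientation only (not the actual sveltos-agent code), here is a minimal, self-contained Go sketch of the same pattern: run the workload under a cancellable context, probe the API server in the background with the current credentials, and cancel the context once they are rejected so the outer loop rebuilds everything with a fresh kubeconfig. fetchKubeconfig and runWorkload are hypothetical stand-ins for the real helpers shown in the diff (getManagedClusterRestConfig, the controller-runtime manager, the evaluation manager).

package main

import (
	"context"
	"log"
	"time"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

// fetchKubeconfig is a placeholder: the real agent reads the managed cluster's
// kubeconfig from a Secret in the management cluster. Here it simply returns
// the in-cluster config so the sketch compiles and runs.
func fetchKubeconfig() (*rest.Config, error) {
	return rest.InClusterConfig()
}

// runWorkload is a placeholder for starting the controller-runtime manager and
// the evaluation manager; it just blocks until the context is cancelled.
func runWorkload(ctx context.Context, _ *rest.Config) error {
	<-ctx.Done()
	return ctx.Err()
}

// cancelOnAuthFailure probes the API server periodically with the current
// credentials and cancels the context once they are rejected.
func cancelOnAuthFailure(ctx context.Context, cancel context.CancelFunc, cfg *rest.Config) {
	clientset, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		cancel()
		return
	}
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			_, err := clientset.Discovery().ServerVersion()
			if apierrors.IsUnauthorized(err) || apierrors.IsForbidden(err) {
				log.Printf("kubeconfig rejected (%v): cancelling so everything restarts", err)
				cancel()
				return
			}
		}
	}
}

func main() {
	ctx := context.Background()
	for {
		// Each iteration rebuilds the client config and the workload, so an
		// expired token is replaced by whatever the renewed Secret now holds.
		runCtx, cancel := context.WithCancel(ctx)
		cfg, err := fetchKubeconfig()
		if err != nil {
			log.Fatalf("unable to get kubeconfig: %v", err)
		}
		go cancelOnAuthFailure(runCtx, cancel, cfg)
		if err := runWorkload(runCtx, cfg); err != nil {
			log.Printf("workload stopped: %v", err)
		}
		cancel()
	}
}

An alternative would be to simply exit and let Kubernetes restart the pod; the commit instead restarts the controller-runtime manager and the evaluation manager in-process, which is what the outer for loop and restartIfNeeded in the diff do.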
174 changes: 112 additions & 62 deletions cmd/main.go
@@ -39,10 +39,12 @@ import (
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/klog/v2"
"k8s.io/klog/v2/textlogger"
"k8s.io/utils/ptr"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/config"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
@@ -51,6 +53,7 @@ import (

libsveltosv1beta1 "github.com/projectsveltos/libsveltos/api/v1beta1"
"github.com/projectsveltos/libsveltos/lib/clusterproxy"
"github.com/projectsveltos/libsveltos/lib/k8s_utils"
"github.com/projectsveltos/libsveltos/lib/logsettings"
logs "github.com/projectsveltos/libsveltos/lib/logsettings"
libsveltosset "github.com/projectsveltos/libsveltos/lib/set"
@@ -102,60 +105,80 @@ func main() {
ctrl.SetLogger(klog.Background())

ctx := ctrl.SetupSignalHandler()
registerForLogSettings(ctx, ctrl.Log.WithName("log-setter"))

// If the agent is running in the management cluster, the token to access the managed
// cluster can expire and be renewed. In such a case, we need to create a new manager and
// restart all controllers and the evaluation manager.
// A context with a Done channel is created; if we detect the token has expired (by
// getting unauthorized errors), the context is cancelled, which terminates the manager
// and the evaluation manager.
for {
ctxWithCancel, cancel := context.WithCancel(ctx)
restConfig := ctrl.GetConfigOrDie()
if deployedCluster != managedCluster {
// if sveltos-agent is running in the management cluster, get the kubeconfig
// of the managed cluster
restConfig = getManagedClusterRestConfig(ctxWithCancel, restConfig, ctrl.Log.WithName("get-kubeconfig"))
}
restConfig.QPS = restConfigQPS
restConfig.Burst = restConfigBurst

ctrlOptions := ctrl.Options{
Scheme: scheme,
Metrics: getDiagnosticsOptions(),
HealthProbeBindAddress: healthAddr,
WebhookServer: webhook.NewServer(
webhook.Options{
Port: webhookPort,
}),
Cache: cache.Options{
SyncPeriod: &syncPeriod,
},
Controller: config.Controller{
// This is needed to avoid "controller with name xyz already exists" errors when the manager is recreated
SkipNameValidation: ptr.To(true),
},
}

restConfig := ctrl.GetConfigOrDie()
if deployedCluster != managedCluster {
// if sveltos-agent is running in the management cluster, get the kubeconfig
// of the managed cluster
restConfig = getManagedClusterRestConfig(ctx, restConfig, ctrl.Log.WithName("get-kubeconfig"))
}
restConfig.QPS = restConfigQPS
restConfig.Burst = restConfigBurst
mgr, err := ctrl.NewManager(restConfig, ctrlOptions)
if err != nil {
setupLog.Error(err, "unable to start manager")
os.Exit(1)
}

logsettings.RegisterForLogSettings(ctx,
libsveltosv1beta1.ComponentClassifierAgent, ctrl.Log.WithName("log-setter"),
restConfig)
doSendReports := true
sendReports := controllers.SendReports // send reports by default
if runMode == noReports {
sendReports = controllers.DoNotSendReports
doSendReports = false
}

ctrlOptions := ctrl.Options{
Scheme: scheme,
Metrics: getDiagnosticsOptions(),
HealthProbeBindAddress: healthAddr,
WebhookServer: webhook.NewServer(
webhook.Options{
Port: webhookPort,
}),
Cache: cache.Options{
SyncPeriod: &syncPeriod,
},
}
const intervalInSecond = int64(3)
evaluation.InitializeManager(ctxWithCancel, mgr.GetLogger(),
mgr.GetConfig(), mgr.GetClient(), clusterNamespace, clusterName, version,
libsveltosv1beta1.ClusterType(clusterType), intervalInSecond, doSendReports)

mgr, err := ctrl.NewManager(restConfig, ctrlOptions)
if err != nil {
setupLog.Error(err, "unable to start manager")
os.Exit(1)
}
go startControllers(ctxWithCancel, mgr, sendReports)

doSendReports := true
sendReports := controllers.SendReports // send reports by default
if runMode == noReports {
sendReports = controllers.DoNotSendReports
doSendReports = false
}
//+kubebuilder:scaffold:builder

const intervalInSecond = int64(3)
evaluation.InitializeManager(ctx, mgr.GetLogger(),
mgr.GetConfig(), mgr.GetClient(), clusterNamespace, clusterName, version,
libsveltosv1beta1.ClusterType(clusterType), intervalInSecond, doSendReports)
if deployedCluster != managedCluster {
// if sveltos-agent is running in the management cluster, get the kubeconfig
// of the managed cluster
go restartIfNeeded(ctxWithCancel, cancel, restConfig, ctrl.Log.WithName("restarter"))
}

go startControllers(ctx, mgr, sendReports)
//+kubebuilder:scaffold:builder
setupChecks(mgr)

setupChecks(mgr)
setupLog.Info("starting manager")
if err := mgr.Start(ctxWithCancel); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)
}

setupLog.Info("starting manager")
if err := mgr.Start(ctx); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)
evaluationMgr := evaluation.GetManager()
evaluationMgr.Reset()
}
}

@@ -179,7 +202,7 @@ func initFlags(fs *pflag.FlagSet) {
&deployedCluster,
"current-cluster",
managedCluster,
"Indicate whether drift-detection-manager was deployed in the managed or the management cluster. "+
"Indicate whether sveltos-agent was deployed in the managed or the management cluster. "+
"Possible options are managed-cluster or management-cluster.",
)

@@ -255,18 +278,19 @@ func startClassifierReconciler(ctx context.Context, mgr manager.Manager, sendRep

if isPresent {
setupLog.V(logs.LogInfo).Info("start classifier/node controller")
if err = (&controllers.NodeReconciler{
nodeReconciler := &controllers.NodeReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Config: mgr.GetConfig(),
}).SetupWithManager(mgr); err != nil {
}
if err := nodeReconciler.SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Node")
os.Exit(1)
}

// Do not change order. ClassifierReconciler initializes classification manager.
// NodeReconciler uses classification manager.
if err = (&controllers.ClassifierReconciler{
classifierReconciler := &controllers.ClassifierReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
RunMode: sendReports,
@@ -276,7 +300,8 @@
ClusterNamespace: clusterNamespace,
ClusterName: clusterName,
ClusterType: libsveltosv1beta1.ClusterType(clusterType),
}).SetupWithManager(mgr); err != nil {
}
if err := classifierReconciler.SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Classifier")
os.Exit(1)
}
@@ -295,7 +320,7 @@ func startHealthCheckReconciler(ctx context.Context, mgr manager.Manager, sendRe

if isPresent {
setupLog.V(logs.LogInfo).Info("start healthCheck/healthCheckReport controllers")
if err = (&controllers.HealthCheckReconciler{
healthCheckReconciler := &controllers.HealthCheckReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
RunMode: sendReports,
@@ -304,15 +329,17 @@
ClusterNamespace: clusterNamespace,
ClusterName: clusterName,
ClusterType: libsveltosv1beta1.ClusterType(clusterType),
}).SetupWithManager(mgr); err != nil {
}
if err := healthCheckReconciler.SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "HealthCheck")
os.Exit(1)
}

if err = (&controllers.HealthCheckReportReconciler{
healthCheckReportReconciler := &controllers.HealthCheckReportReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
}
if err := healthCheckReportReconciler.SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "HealthCheckReport")
os.Exit(1)
}
@@ -332,7 +359,7 @@ func startEventSourceReconciler(ctx context.Context, mgr manager.Manager, sendRe

if isPresent {
setupLog.V(logs.LogInfo).Info("start eventSource/eventReport controllers")
if err = (&controllers.EventSourceReconciler{
eventSourceReconciler := &controllers.EventSourceReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
RunMode: sendReports,
@@ -341,15 +368,17 @@
ClusterNamespace: clusterNamespace,
ClusterName: clusterName,
ClusterType: libsveltosv1beta1.ClusterType(clusterType),
}).SetupWithManager(mgr); err != nil {
}
if err := eventSourceReconciler.SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "EventSource")
os.Exit(1)
}

if err = (&controllers.EventReportReconciler{
eventReportReconciler := &controllers.EventReportReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
}
if err := eventReportReconciler.SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "EventReport")
os.Exit(1)
}
@@ -369,7 +398,7 @@ func startReloaderReconciler(ctx context.Context, mgr manager.Manager, sendRepor

if isPresent {
setupLog.V(logs.LogInfo).Info("start reloader controllers")
if err = (&controllers.ReloaderReconciler{
reloaderReconciler := &controllers.ReloaderReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
RunMode: sendReports,
@@ -378,7 +407,8 @@
ClusterNamespace: clusterNamespace,
ClusterName: clusterName,
ClusterType: libsveltosv1beta1.ClusterType(clusterType),
}).SetupWithManager(mgr); err != nil {
}
if err := reloaderReconciler.SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Reloader")
os.Exit(1)
}
@@ -416,7 +446,7 @@ func getManagedClusterRestConfig(ctx context.Context, cfg *rest.Config, logger l
logger = logger.WithValues("cluster", fmt.Sprintf("%s:%s/%s", clusterType, clusterNamespace, clusterName))
logger.V(logsettings.LogInfo).Info("get secret with kubeconfig")

// When running in the management cluster, drift-detection-manager will need
// When running in the management cluster, sveltos-agent will need
// to access Secret and Cluster/SveltosCluster (to verify existence)
s := runtime.NewScheme()
if err := clientgoscheme.AddToScheme(s); err != nil {
@@ -435,7 +465,7 @@
panic(1)
}

// In this mode, drift-detection-manager is running in the management cluster.
// In this mode, sveltos-agent is running in the management cluster.
// It accesses the managed cluster from here.
var currentCfg *rest.Config
currentCfg, err = clusterproxy.GetKubernetesRestConfig(ctx, c, clusterNamespace, clusterName, "", "",
@@ -483,3 +513,23 @@ func getDiagnosticsOptions() metricsserver.Options {
FilterProvider: filters.WithAuthenticationAndAuthorization,
}
}

func restartIfNeeded(ctx context.Context, cancel context.CancelFunc, restConfig *rest.Config, logger logr.Logger) {
for {
const interval = 10 * time.Second
time.Sleep(interval)
_, err := k8s_utils.GetKubernetesVersion(ctx, restConfig, logger)
if apierrors.IsUnauthorized(err) || apierrors.IsForbidden(err) {
logger.V(logs.LogInfo).Info(fmt.Sprintf("IsUnauthorized/IsForbidden %v. Cancel context", err))
cancel()
break
}
}
}

func registerForLogSettings(ctx context.Context, logger logr.Logger) {
restConfig := ctrl.GetConfigOrDie()
logsettings.RegisterForLogSettings(ctx,
libsveltosv1beta1.ComponentClassifierAgent, logger,
restConfig)
}
76 changes: 41 additions & 35 deletions pkg/evaluation/classifier_evaluation.go
@@ -57,44 +57,50 @@ func (m *manager) evaluateClassifiers(ctx context.Context, wg *sync.WaitGroup) {
var once sync.Once

for {
// Sleep before next evaluation
time.Sleep(m.interval)

m.log.V(logs.LogDebug).Info("Evaluating Classifiers")
m.mu.Lock()
// Copy queue content. That is the only operation that needs to be
// done in a mutex-protected section
jobQueueCopy := make([]string, len(m.classifierJobQueue))
i := 0
for k := range m.classifierJobQueue {
jobQueueCopy[i] = k
i++
}
// Reset current queue
m.classifierJobQueue = make(map[string]bool)
m.mu.Unlock()

failedEvaluations := make([]string, 0)

for i := range jobQueueCopy {
m.log.V(logs.LogDebug).Info(fmt.Sprintf("Evaluating Classifier %s", jobQueueCopy[i]))
err := m.evaluateClassifierInstance(ctx, jobQueueCopy[i])
if err != nil {
m.log.V(logs.LogInfo).Error(err,
fmt.Sprintf("failed to evaluate classifier %s", jobQueueCopy[i]))
failedEvaluations = append(failedEvaluations, jobQueueCopy[i])
select {
case <-ctx.Done():
m.log.V(logs.LogInfo).Info("Context canceled. Exiting evaluation.")
return // Exit the goroutine
default:
// Sleep before next evaluation
time.Sleep(m.interval)

m.log.V(logs.LogDebug).Info("Evaluating Classifiers")
m.mu.Lock()
// Copy queue content. That is the only operation that needs to be
// done in a mutex-protected section
jobQueueCopy := make([]string, len(m.classifierJobQueue))
i := 0
for k := range m.classifierJobQueue {
jobQueueCopy[i] = k
i++
}
// Reset current queue
m.classifierJobQueue = make(map[string]bool)
m.mu.Unlock()

failedEvaluations := make([]string, 0)

for i := range jobQueueCopy {
m.log.V(logs.LogDebug).Info(fmt.Sprintf("Evaluating Classifier %s", jobQueueCopy[i]))
err := m.evaluateClassifierInstance(ctx, jobQueueCopy[i])
if err != nil {
m.log.V(logs.LogInfo).Error(err,
fmt.Sprintf("failed to evaluate classifier %s", jobQueueCopy[i]))
failedEvaluations = append(failedEvaluations, jobQueueCopy[i])
}
}
}

// Re-queue all Classifiers whose evaluation failed
for i := range failedEvaluations {
m.log.V(logs.LogDebug).Info(fmt.Sprintf("requeuing Classifier %s for evaluation", failedEvaluations[i]))
m.EvaluateClassifier(failedEvaluations[i])
}
// Re-queue all Classifiers whose evaluation failed
for i := range failedEvaluations {
m.log.V(logs.LogDebug).Info(fmt.Sprintf("requeuing Classifier %s for evaluation", failedEvaluations[i]))
m.EvaluateClassifier(failedEvaluations[i])
}

once.Do(func() {
wg.Done()
})
once.Do(func() {
wg.Done()
})
}
}
}

