Skip to content

Commit

Permalink
feat(mgr) disable non-leader status
Browse files Browse the repository at this point in the history
Refactor the status utility to provide a LeaderElectionRunnable
implementation.

Run the status utility via mgr.Add() so that it only runs as the leader.

Add a new --election-namespace flag to allow leader election outside a
cluster. Integration tests require this when election is enabled.
  • Loading branch information
Travis Raines committed Dec 14, 2021
1 parent 503a507 commit 60f8a12
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 27 deletions.
73 changes: 55 additions & 18 deletions internal/ctrlutils/ingress-status.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,48 @@ type statusInfo struct {
hostname string
}

// StatusUpdater is a Kubebuilder controller that updates resource statuses
type StatusUpdater interface {
// NeedLeaderElection indicates if the proxy must only be run when it is controlled by a leader controller
NeedLeaderElection() bool

// Start starts the proxy update loop
Start(ctx context.Context) error

// PullConfigUpdate starts the config update routine
PullConfigUpdate(ctx context.Context)
}

type StandardStatusUpdater struct {
kongConfig sendconfig.Kong
log logr.Logger
kubeConfig *rest.Config
publishService string
publishAddresses []string
}

func (s *StandardStatusUpdater) NeedLeaderElection() bool {
return true
}

func (s *StandardStatusUpdater) Start(ctx context.Context) error {
go s.PullConfigUpdate(ctx)

return nil
}

func NewStandardStatusUpdater(kongConfig sendconfig.Kong, log logr.Logger, kubeConfig *rest.Config,
publishService string, publishAddresses []string) StatusUpdater {
updater := &StandardStatusUpdater{
kongConfig: kongConfig,
log: log,
kubeConfig: kubeConfig,
publishService: publishService,
publishAddresses: publishAddresses,
}
return updater
}

func newStatusConfig(kubeConfig *rest.Config) (statusConfig, error) {
client, err := clientset.NewForConfig(kubeConfig)
if err != nil {
Expand Down Expand Up @@ -82,54 +124,49 @@ func newStatusConfig(kubeConfig *rest.Config) (statusConfig, error) {
}

// PullConfigUpdate is a dedicated function that process ingress/customer resource status update after configuration is updated within kong.
func PullConfigUpdate(
func (s *StandardStatusUpdater) PullConfigUpdate(
ctx context.Context,
kongConfig sendconfig.Kong,
log logr.Logger,
kubeConfig *rest.Config,
publishService string,
publishAddresses []string,
) {
cfg := statusConfig{ready: false}
status := statusInfo{ready: false}
var wg sync.WaitGroup
var err error
for {
select {
case updateDone := <-kongConfig.ConfigDone:
case updateDone := <-s.kongConfig.ConfigDone:
if !cfg.ready {
cfg, err = newStatusConfig(kubeConfig)
cfg, err = newStatusConfig(s.kubeConfig)
if err != nil {
log.Error(err, "failed to initialize status updater")
s.log.Error(err, "failed to initialize status updater")
}
}
if !status.ready {
status, err = runningAddresses(ctx, kubeConfig, publishService, publishAddresses)
status, err = runningAddresses(ctx, s.kubeConfig, s.publishService, s.publishAddresses)
if err != nil {
if errors.Is(err, errLBNotReady) {
// Separate this into a debug log since it's expected in environments that cannot provision
// LoadBalancers (which we request by default), and will spam logs otherwise.
log.V(util.DebugLevel).Info("LoadBalancer type Service for the Kong proxy has no IPs", "service", publishService)
s.log.V(util.DebugLevel).Info("LoadBalancer type Service for the Kong proxy has no IPs", "service", s.publishService)
} else {
log.Error(err, "failed to look up status info for Kong proxy Service", "service", publishService)
s.log.Error(err, "failed to look up status info for Kong proxy Service", "service", s.publishService)
}
}
}

if cfg.ready && status.ready {
log.V(util.DebugLevel).Info("data-plane updates completed, updating resource statuses")
s.log.V(util.DebugLevel).Info("data-plane updates completed, updating resource statuses")
wg.Add(1)
go func() {
if err := UpdateStatuses(ctx, &updateDone, log, cfg.client, cfg.kicClient, &wg, status.ips,
status.hostname, kubeConfig, cfg.kubernetesVersion); err != nil {
log.Error(err, "failed to update resource statuses")
if err := UpdateStatuses(ctx, &updateDone, s.log, cfg.client, cfg.kicClient, &wg, status.ips,
status.hostname, s.kubeConfig, cfg.kubernetesVersion); err != nil {
s.log.Error(err, "failed to update resource statuses")
}
}()
} else {
log.V(util.DebugLevel).Info("config or publish service information unavailable, skipping status update")
s.log.V(util.DebugLevel).Info("config or publish service information unavailable, skipping status update")
}
case <-ctx.Done():
log.Info("stop status update channel.")
s.log.Info("stop status update channel.")
wg.Wait()
return
}
Expand Down
16 changes: 9 additions & 7 deletions internal/manager/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,14 @@ type Config struct {
KongCustomEntitiesSecret string

// Kubernetes configurations
KubeconfigPath string
IngressClassName string
EnableLeaderElection bool
LeaderElectionID string
Concurrency int
FilterTags []string
WatchNamespaces []string
KubeconfigPath string
IngressClassName string
EnableLeaderElection bool
LeaderElectionNamespace string
LeaderElectionID string
Concurrency int
FilterTags []string
WatchNamespaces []string

// Ingress status
PublishService string
Expand Down Expand Up @@ -132,6 +133,7 @@ func (c *Config) FlagSet() *pflag.FlagSet {
flagSet.StringVar(&c.IngressClassName, "ingress-class", annotations.DefaultIngressClass, `Name of the ingress class to route through this controller.`)
flagSet.BoolVar(&c.EnableLeaderElection, "leader-elect", false, "Enable leader election for controller manager. Enabling this will ensure there is only one active controller manager.")
flagSet.StringVar(&c.LeaderElectionID, "election-id", "5b374a9e.konghq.com", `Election id to use for status update.`)
flagSet.StringVar(&c.LeaderElectionNamespace, "election-namespace", "", `Leader election namespace to use when running outside a cluster`)
flagSet.StringSliceVar(&c.FilterTags, "kong-admin-filter-tag", []string{"managed-by-ingress-controller"}, "The tag used to manage and filter entities in Kong. This flag can be specified multiple times to specify multiple tags. This setting will be silently ignored if the Kong instance has no tags support.")
flagSet.IntVar(&c.Concurrency, "kong-admin-concurrency", 10, "Max number of concurrent requests sent to Kong's Admin API.")
flagSet.StringSliceVar(&c.WatchNamespaces, "watch-namespace", nil,
Expand Down
3 changes: 1 addition & 2 deletions internal/manager/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/healthz"
gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"

"github.com/kong/kubernetes-ingress-controller/v2/internal/ctrlutils"
"github.com/kong/kubernetes-ingress-controller/v2/internal/metadata"
"github.com/kong/kubernetes-ingress-controller/v2/internal/mgrutils"
"github.com/kong/kubernetes-ingress-controller/v2/internal/util"
Expand Down Expand Up @@ -136,7 +135,7 @@ func Run(ctx context.Context, c *Config, diagnostic util.ConfigDumpDiagnostic) e

if c.UpdateStatus {
setupLog.Info("Starting resource status updater")
go ctrlutils.PullConfigUpdate(ctx, kongConfig, logger, kubeconfig, c.PublishService, c.PublishStatusAddress)
setupStatusUpdater(mgr, kongConfig, logger, kubeconfig, c.PublishService, c.PublishStatusAddress)
} else {
setupLog.Info("WARNING: status updates were disabled, resources like Ingress objects will not receive updates to their statuses.")
}
Expand Down
12 changes: 12 additions & 0 deletions internal/manager/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ import (
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"

"github.com/kong/kubernetes-ingress-controller/v2/internal/admission"
"github.com/kong/kubernetes-ingress-controller/v2/internal/ctrlutils"
"github.com/kong/kubernetes-ingress-controller/v2/internal/proxy"
"github.com/kong/kubernetes-ingress-controller/v2/internal/sendconfig"
"github.com/kong/kubernetes-ingress-controller/v2/internal/util"
Expand Down Expand Up @@ -98,6 +100,10 @@ func setupControllerOptions(logger logr.Logger, c *Config, scheme *runtime.Schem
controllerOpts.NewCache = cache.MultiNamespacedCacheBuilder(append(c.WatchNamespaces, requiredCacheNamespaces...))
}

if len(c.LeaderElectionNamespace) > 0 {
controllerOpts.LeaderElectionNamespace = c.LeaderElectionNamespace
}

return controllerOpts, nil
}

Expand Down Expand Up @@ -209,3 +215,9 @@ func setupAdmissionServer(ctx context.Context, managerConfig *Config, managerCli
}()
return nil
}

func setupStatusUpdater(mgr manager.Manager, kongConfig sendconfig.Kong, log logr.Logger, kubeConfig *rest.Config,
publishService string, publishAddresses []string) error {
updater := ctrlutils.NewStandardStatusUpdater(kongConfig, log, kubeConfig, publishService, publishAddresses)
return mgr.Add(updater)
}
1 change: 1 addition & 0 deletions test/integration/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func TestMain(m *testing.M) {
"--log-level=trace",
"--debug-log-reduce-redundancy",
"--feature-gates=Gateway=true",
fmt.Sprintf("--election-namespace=%s", kongAddon.Namespace()),
}
allControllerArgs := append(standardControllerArgs, extraControllerArgs...)
exitOnErr(testutils.DeployControllerManagerForCluster(ctx, env.Cluster(), allControllerArgs...))
Expand Down

0 comments on commit 60f8a12

Please sign in to comment.