From 9cba4cf022781bff574bfbb59a4ebd65e4484331 Mon Sep 17 00:00:00 2001 From: Chris Doherty Date: Sun, 19 Mar 2023 18:09:30 -0500 Subject: [PATCH] Refactor reconcilers and controller manager Signed-off-by: Chris Doherty --- cmd/tink-controller/main.go | 80 ++++--- internal/controller/manager.go | 222 ++---------------- internal/controller/manager_test.go | 216 ----------------- internal/controller/types.go | 25 -- internal/e2e/tink_suite_test.go | 29 ++- internal/server/index.go | 30 +++ internal/server/index_test.go | 88 +++++++ internal/server/kubernetes_api.go | 41 +++- internal/server/kubernetes_api_workflow.go | 3 +- .../workflow/{controller.go => reconciler.go} | 54 ++--- ...{controller_test.go => reconciler_test.go} | 8 +- 11 files changed, 275 insertions(+), 521 deletions(-) delete mode 100644 internal/controller/manager_test.go delete mode 100644 internal/controller/types.go create mode 100644 internal/server/index.go create mode 100644 internal/server/index_test.go rename internal/workflow/{controller.go => reconciler.go} (77%) rename internal/workflow/{controller_test.go => reconciler_test.go} (99%) diff --git a/cmd/tink-controller/main.go b/cmd/tink-controller/main.go index ae7ce9b94..e95eb5556 100644 --- a/cmd/tink-controller/main.go +++ b/cmd/tink-controller/main.go @@ -6,58 +6,78 @@ import ( "os" "strings" + "github.com/go-logr/logr" + "github.com/go-logr/zapr" "github.com/packethost/pkg/log" "github.com/spf13/cobra" "github.com/spf13/pflag" "github.com/spf13/viper" "github.com/tinkerbell/tink/internal/controller" - "github.com/tinkerbell/tink/internal/workflow" + "go.uber.org/zap" "k8s.io/client-go/tools/clientcmd" clientcmdapi "k8s.io/client-go/tools/clientcmd/api" + ctrl "sigs.k8s.io/controller-runtime" ) // version is set at build time. var version = "devel" -// DaemonConfig represents all the values you can configure as part of the tink-server. -// You can change the configuration via environment variable, or file, or command flags. -type DaemonConfig struct { - K8sAPI string - Kubeconfig string // only applies to out of cluster +type Config struct { + K8sAPI string + Kubeconfig string // only applies to out of cluster + MetricsAddr string + ProbeAddr string + EnableLeaderElection bool } -func (c *DaemonConfig) AddFlags(fs *pflag.FlagSet) { - fs.StringVar(&c.K8sAPI, "kubernetes", "", "The Kubernetes API URL, used for in-cluster client construction.") +func (c *Config) AddFlags(fs *pflag.FlagSet) { + fs.StringVar(&c.K8sAPI, "kubernetes", "", + "The Kubernetes API URL, used for in-cluster client construction.") fs.StringVar(&c.Kubeconfig, "kubeconfig", "", "Absolute path to the kubeconfig file") + fs.StringVar(&c.MetricsAddr, "metrics-bind-address", ":8080", + "The address the metric endpoint binds to.") + fs.StringVar(&c.ProbeAddr, "health-probe-bind-address", ":8081", + "The address the probe endpoint binds to.") + fs.BoolVar(&c.EnableLeaderElection, "leader-elect", false, + "Enable leader election for controller manager. "+ + "Enabling this will ensure there is only one active controller manager.") } func main() { + // Init the packet logger as its used throughout the codebase. logger, err := log.Init("github.com/tinkerbell/tink") if err != nil { panic(err) } - defer logger.Close() - config := &DaemonConfig{} - - cmd := NewRootCommand(config, logger) + cmd := NewRootCommand() if err := cmd.ExecuteContext(context.Background()); err != nil { - defer os.Exit(1) + logger.Close() + fmt.Fprint(os.Stderr, err.Error()) + os.Exit(1) } + logger.Close() } -func NewRootCommand(config *DaemonConfig, logger log.Logger) *cobra.Command { +func NewRootCommand() *cobra.Command { + config := &Config{} + zapLogger, err := zap.NewProduction() + if err != nil { + panic(err) + } + logger := zapr.NewLogger(zapLogger) + cmd := &cobra.Command{ Use: "tink-controller", PreRunE: func(cmd *cobra.Command, args []string) error { viper, err := createViper(logger) if err != nil { - return err + return fmt.Errorf("config init: %w", err) } return applyViper(viper, cmd) }, RunE: func(cmd *cobra.Command, args []string) error { - logger.Info("starting controller version " + version) + logger.Info("Starting controller version " + version) ccfg := clientcmd.NewNonInteractiveDeferredLoadingClientConfig( &clientcmd.ClientConfigLoadingRules{ExplicitPath: config.Kubeconfig}, @@ -72,24 +92,29 @@ func NewRootCommand(config *DaemonConfig, logger log.Logger) *cobra.Command { if err != nil { return err } - options := controller.GetControllerOptions() - options.LeaderElectionNamespace = namespace - manager, err := controller.NewManager(cfg, options) + + options := ctrl.Options{ + Logger: logger, + LeaderElection: config.EnableLeaderElection, + LeaderElectionID: "tink.tinkerbell.org", + LeaderElectionNamespace: namespace, + MetricsBindAddress: config.MetricsAddr, + HealthProbeBindAddress: config.ProbeAddr, + } + + mgr, err := controller.NewManager(cfg, options) if err != nil { - return err + return fmt.Errorf("controller manager: %w", err) } - return manager.RegisterControllers( - cmd.Context(), - workflow.NewController(manager.GetClient()), - ).Start(cmd.Context()) + return mgr.Start(cmd.Context()) }, } config.AddFlags(cmd.Flags()) return cmd } -func createViper(logger log.Logger) (*viper.Viper, error) { +func createViper(logger logr.Logger) (*viper.Viper, error) { v := viper.New() v.AutomaticEnv() v.SetConfigName("tink-controller") @@ -100,12 +125,11 @@ func createViper(logger log.Logger) (*viper.Viper, error) { // If a config file is found, read it in. if err := v.ReadInConfig(); err != nil { if _, ok := err.(viper.ConfigFileNotFoundError); !ok { - logger.With("configFile", v.ConfigFileUsed()).Error(err, "could not load config file") - return nil, err + return nil, fmt.Errorf("loading config file: %w", err) } logger.Info("no config file found") } else { - logger.With("configFile", v.ConfigFileUsed()).Info("loaded config file") + logger.Info("loaded config file", "configFile", v.ConfigFileUsed()) } return v, nil diff --git a/internal/controller/manager.go b/internal/controller/manager.go index d549be6d4..bc0d01157 100644 --- a/internal/controller/manager.go +++ b/internal/controller/manager.go @@ -1,225 +1,53 @@ package controller import ( - "context" "fmt" - "github.com/go-logr/zapr" "github.com/tinkerbell/tink/api/v1alpha1" + "github.com/tinkerbell/tink/internal/workflow" "k8s.io/apimachinery/pkg/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" - "knative.dev/pkg/logging" - controllerruntime "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" + ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/healthz" - "sigs.k8s.io/controller-runtime/pkg/manager" ) -const ( - WorkflowWorkerAddrIndex = ".status.tasks.workerAddr" - WorkflowWorkerNonTerminalStateIndex = ".status.state.nonTerminalWorker" - WorkflowStateIndex = ".status.state" - HardwareMACAddrIndex = ".spec.interfaces.dhcp.mac" - HardwareIPAddrIndex = ".spec.interfaces.dhcp.ip" +var schemeBuilder = runtime.NewSchemeBuilder( + clientgoscheme.AddToScheme, + v1alpha1.AddToScheme, ) -var ( - runtimescheme = runtime.NewScheme() - options = Options{} -) - -func init() { - _ = clientgoscheme.AddToScheme(runtimescheme) - _ = v1alpha1.AddToScheme(runtimescheme) -} - -// Options for running this binary. -type Options struct { - MetricsPort int - HealthProbePort int -} - -// GetControllerOptions returns a set of options used by the Tink controller. -// These options include leader election enabled. -func GetControllerOptions() controllerruntime.Options { - return controllerruntime.Options{ - Logger: zapr.NewLogger(logging.FromContext(context.Background()).Desugar()), - LeaderElection: true, - LeaderElectionID: "tink-leader-election", - Scheme: runtimescheme, - MetricsBindAddress: fmt.Sprintf(":%d", options.MetricsPort), - HealthProbeBindAddress: fmt.Sprintf(":%d", options.HealthProbePort), - } +// DefaultScheme returns a scheme with all the types necessary for the tink controller. +func DefaultScheme() *runtime.Scheme { + s := runtime.NewScheme() + _ = schemeBuilder.AddToScheme(s) + return s } -// GetNamespacedControllerOptions returns a set of options used by the Tink controller. -// These options include leader election enabled. -func GetNamespacedControllerOptions(namespace string) controllerruntime.Options { - opts := GetControllerOptions() - opts.Namespace = namespace - return opts -} - -// GetServerOptions returns a set of options used by the Tink API. -// These options include leader election disabled. -func GetServerOptions() controllerruntime.Options { - return controllerruntime.Options{ - Logger: zapr.NewLogger(logging.FromContext(context.Background()).Desugar()), - LeaderElection: false, - Scheme: runtimescheme, - MetricsBindAddress: fmt.Sprintf(":%d", options.MetricsPort), - HealthProbeBindAddress: fmt.Sprintf(":%d", options.HealthProbePort), +// NewManager creates a new controller manager with tink controller controllers pre-registered. +// If opts.Scheme is nil, DefaultScheme() is used. +func NewManager(cfg *rest.Config, opts ctrl.Options) (ctrl.Manager, error) { + if opts.Scheme == nil { + opts.Scheme = DefaultScheme() } -} -// NewManagerOrDie instantiates a controller manager. -func NewManagerOrDie(config *rest.Config, options controllerruntime.Options) Manager { - m, err := NewManager(config, options) + mgr, err := ctrl.NewManager(cfg, opts) if err != nil { - panic(err) + return nil, fmt.Errorf("controller manager: %w", err) } - return m -} -// NewManager instantiates a controller manager. -func NewManager(config *rest.Config, options controllerruntime.Options) (Manager, error) { - m, err := controllerruntime.NewManager(config, options) - if err != nil { - return nil, err - } - indexers := []struct { - obj client.Object - field string - extractValue client.IndexerFunc - }{ - { - &v1alpha1.Workflow{}, - WorkflowWorkerAddrIndex, - WorkflowWorkerAddrIndexFunc, - }, - { - &v1alpha1.Workflow{}, - WorkflowWorkerNonTerminalStateIndex, - WorkflowWorkerNonTerminalStateIndexFunc, - }, - { - &v1alpha1.Workflow{}, - WorkflowStateIndex, - WorkflowStateIndexFunc, - }, - { - &v1alpha1.Hardware{}, - HardwareIPAddrIndex, - HardwareIPIndexFunc, - }, - { - &v1alpha1.Hardware{}, - HardwareMACAddrIndex, - HardwareMacIndexFunc, - }, + if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { + return nil, fmt.Errorf("set up health check: %w", err) } - for _, indexer := range indexers { - if err := m.GetFieldIndexer().IndexField( - context.Background(), - indexer.obj, - indexer.field, - indexer.extractValue, - ); err != nil { - return nil, fmt.Errorf("failed to setup %s indexer, %w", indexer.field, err) - } - } - return &GenericControllerManager{Manager: m}, nil -} -// GenericControllerManager is a manager.Manager that allows for registering of controllers. -type GenericControllerManager struct { - manager.Manager -} - -// RegisterControllers registers a set of controllers to the controller manager. -func (m *GenericControllerManager) RegisterControllers(ctx context.Context, controllers ...Controller) Manager { - for _, c := range controllers { - if err := c.Register(ctx, m); err != nil { - panic(err) - } + if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil { + return nil, fmt.Errorf("set up ready check: %w", err) } - if err := m.AddHealthzCheck("healthz", healthz.Ping); err != nil { - panic(fmt.Sprintf("Failed to add readiness probe, %s", err.Error())) - } - return m -} -// WorkflowWorkerAddrIndexFunc func returns a list of worker addresses from a workflow. -func WorkflowWorkerAddrIndexFunc(obj client.Object) []string { - wf, ok := obj.(*v1alpha1.Workflow) - if !ok { - return nil - } - resp := []string{} - for _, task := range wf.Status.Tasks { - if task.WorkerAddr != "" { - resp = append(resp, task.WorkerAddr) - } - } - return resp -} - -// WorkflowWorkerNonTerminalStateIndexFunc func returns a list of worker addresses for workflows -// in a running or pending state. -func WorkflowWorkerNonTerminalStateIndexFunc(obj client.Object) []string { - wf, ok := obj.(*v1alpha1.Workflow) - if !ok { - return nil - } - - resp := []string{} - if !(wf.Status.State == v1alpha1.WorkflowStateRunning || wf.Status.State == v1alpha1.WorkflowStatePending) { - return resp - } - for _, task := range wf.Status.Tasks { - if task.WorkerAddr != "" { - resp = append(resp, task.WorkerAddr) - } - } - return resp -} - -// WorkflowStateIndexFunc func returns the workflow state. -func WorkflowStateIndexFunc(obj client.Object) []string { - wf, ok := obj.(*v1alpha1.Workflow) - if !ok { - return nil - } - return []string{string(wf.Status.State)} -} - -// HardwareMacIndexFunc returns a list of mac addresses from a hardware. -func HardwareMacIndexFunc(obj client.Object) []string { - hw, ok := obj.(*v1alpha1.Hardware) - if !ok { - return nil - } - resp := []string{} - for _, iface := range hw.Spec.Interfaces { - if iface.DHCP != nil && iface.DHCP.MAC != "" { - resp = append(resp, iface.DHCP.MAC) - } + err = workflow.NewReconciler(mgr.GetClient()).SetupWithManager(mgr) + if err != nil { + return nil, fmt.Errorf("setup workflow reconciler: %w", err) } - return resp -} -// HardwareIPIndexFunc returns a list of IP addresses from a hardware. -func HardwareIPIndexFunc(obj client.Object) []string { - hw, ok := obj.(*v1alpha1.Hardware) - if !ok { - return nil - } - resp := []string{} - for _, iface := range hw.Spec.Interfaces { - if iface.DHCP != nil && iface.DHCP.IP != nil && iface.DHCP.IP.Address != "" { - resp = append(resp, iface.DHCP.IP.Address) - } - } - return resp + return mgr, nil } diff --git a/internal/controller/manager_test.go b/internal/controller/manager_test.go deleted file mode 100644 index 1ac71af1d..000000000 --- a/internal/controller/manager_test.go +++ /dev/null @@ -1,216 +0,0 @@ -package controller - -import ( - "reflect" - "testing" - - "github.com/tinkerbell/tink/api/v1alpha1" - - "sigs.k8s.io/controller-runtime/pkg/client" -) - -func TestWorkflowIndexFuncs(t *testing.T) { - cases := []struct { - name string - input client.Object - wantAddrs []string - wantStateAddrs []string - wantStates []string - }{ - { - "non workflow", - &v1alpha1.Hardware{}, - nil, - nil, - nil, - }, - { - "empty workflow", - &v1alpha1.Workflow{ - Status: v1alpha1.WorkflowStatus{ - State: "", - Tasks: []v1alpha1.Task{}, - }, - }, - []string{}, - []string{}, - []string{""}, - }, - { - "pending workflow", - &v1alpha1.Workflow{ - Status: v1alpha1.WorkflowStatus{ - State: v1alpha1.WorkflowStatePending, - Tasks: []v1alpha1.Task{ - { - WorkerAddr: "worker1", - }, - }, - }, - }, - []string{"worker1"}, - []string{"worker1"}, - []string{string(v1alpha1.WorkflowStatePending)}, - }, - { - "running workflow", - &v1alpha1.Workflow{ - Status: v1alpha1.WorkflowStatus{ - State: v1alpha1.WorkflowStateRunning, - Tasks: []v1alpha1.Task{ - { - WorkerAddr: "worker1", - }, - { - WorkerAddr: "worker2", - }, - }, - }, - }, - []string{"worker1", "worker2"}, - []string{"worker1", "worker2"}, - []string{string(v1alpha1.WorkflowStateRunning)}, - }, - { - "complete workflow", - &v1alpha1.Workflow{ - Status: v1alpha1.WorkflowStatus{ - State: v1alpha1.WorkflowStateSuccess, - Tasks: []v1alpha1.Task{ - { - WorkerAddr: "worker1", - }, - }, - }, - }, - []string{"worker1"}, - []string{}, - []string{string(v1alpha1.WorkflowStateSuccess)}, - }, - } - - for _, tc := range cases { - t.Run(tc.name, func(t *testing.T) { - gotAddr := WorkflowWorkerAddrIndexFunc(tc.input) - if !reflect.DeepEqual(tc.wantAddrs, gotAddr) { - t.Errorf("Unexpected wokerAddr response: wanted %#v, got %#v", tc.wantAddrs, gotAddr) - } - gotStateAddrs := WorkflowWorkerNonTerminalStateIndexFunc(tc.input) - if !reflect.DeepEqual(tc.wantStateAddrs, gotStateAddrs) { - t.Errorf("Unexpected non-terminating workflow response: wanted %#v, got %#v", tc.wantStateAddrs, gotStateAddrs) - } - gotStates := WorkflowStateIndexFunc(tc.input) - if !reflect.DeepEqual(tc.wantStates, gotStates) { - t.Errorf("Unexpected workflow state response: wanted %#v, got %#v", tc.wantStates, gotStates) - } - }) - } -} - -func TestHardwareIndexFunc(t *testing.T) { - cases := []struct { - name string - input client.Object - wantMac []string - wantIP []string - }{ - { - "non hardware", - &v1alpha1.Workflow{}, - nil, - nil, - }, - { - "empty hardware dhcp", - &v1alpha1.Hardware{ - Spec: v1alpha1.HardwareSpec{ - Interfaces: []v1alpha1.Interface{ - { - DHCP: nil, - }, - }, - }, - }, - []string{}, - []string{}, - }, - { - "blank mac and ip", - &v1alpha1.Hardware{ - Spec: v1alpha1.HardwareSpec{ - Interfaces: []v1alpha1.Interface{ - { - DHCP: &v1alpha1.DHCP{ - MAC: "", - IP: &v1alpha1.IP{ - Address: "", - }, - }, - }, - }, - }, - }, - []string{}, - []string{}, - }, - { - "single ip and mac", - &v1alpha1.Hardware{ - Spec: v1alpha1.HardwareSpec{ - Interfaces: []v1alpha1.Interface{ - { - DHCP: &v1alpha1.DHCP{ - MAC: "3c:ec:ef:4c:4f:54", - IP: &v1alpha1.IP{ - Address: "172.16.10.100", - }, - }, - }, - }, - }, - }, - []string{"3c:ec:ef:4c:4f:54"}, - []string{"172.16.10.100"}, - }, - { - "double ip and mac", - &v1alpha1.Hardware{ - Spec: v1alpha1.HardwareSpec{ - Interfaces: []v1alpha1.Interface{ - { - DHCP: &v1alpha1.DHCP{ - MAC: "3c:ec:ef:4c:4f:54", - IP: &v1alpha1.IP{ - Address: "172.16.10.100", - }, - }, - }, - { - DHCP: &v1alpha1.DHCP{ - MAC: "3c:ec:ef:4c:4f:55", - IP: &v1alpha1.IP{ - Address: "172.16.10.101", - }, - }, - }, - }, - }, - }, - []string{"3c:ec:ef:4c:4f:54", "3c:ec:ef:4c:4f:55"}, - []string{"172.16.10.100", "172.16.10.101"}, - }, - } - - for _, tc := range cases { - t.Run(tc.name, func(t *testing.T) { - gotMac := HardwareMacIndexFunc(tc.input) - if !reflect.DeepEqual(tc.wantMac, gotMac) { - t.Errorf("Unexpected response: wanted %#v, got %#v", tc.wantMac, gotMac) - } - gotIPs := HardwareIPIndexFunc(tc.input) - if !reflect.DeepEqual(tc.wantIP, gotIPs) { - t.Errorf("Unexpected response: wanted %#v, got %#v", tc.wantIP, gotIPs) - } - }) - } -} diff --git a/internal/controller/types.go b/internal/controller/types.go deleted file mode 100644 index 7f0ea07c3..000000000 --- a/internal/controller/types.go +++ /dev/null @@ -1,25 +0,0 @@ -package controller - -import ( - "context" - - "sigs.k8s.io/controller-runtime/pkg/manager" - "sigs.k8s.io/controller-runtime/pkg/reconcile" -) - -// Controller is an interface implemented by Karpenter custom resources. -type Controller interface { - // Reconcile hands a hydrated kubernetes resource to the controller for - // reconciliation. Any changes made to the resource's status are persisted - // after Reconcile returns, even if it returns an error. - reconcile.Reconciler - - // Register will register the controller with the manager - Register(context.Context, manager.Manager) error -} - -// Manager manages a set of controllers and webhooks. -type Manager interface { - manager.Manager - RegisterControllers(context.Context, ...Controller) Manager -} diff --git a/internal/e2e/tink_suite_test.go b/internal/e2e/tink_suite_test.go index ca6d68cd0..a73f466d6 100644 --- a/internal/e2e/tink_suite_test.go +++ b/internal/e2e/tink_suite_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/go-logr/zapr" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "github.com/packethost/pkg/log" @@ -14,9 +15,9 @@ import ( "github.com/tinkerbell/tink/internal/controller" "github.com/tinkerbell/tink/internal/grpcserver" "github.com/tinkerbell/tink/internal/server" - "github.com/tinkerbell/tink/internal/workflow" + "go.uber.org/zap" "k8s.io/client-go/kubernetes/scheme" - + ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/envtest" ) @@ -39,7 +40,6 @@ var _ = BeforeSuite(func() { ctx, cancel = context.WithCancel(context.TODO()) var err error - // Create Test Tink API gRPC Server logger, err = log.Init("github.com/tinkerbell/tink/tests") Expect(err).NotTo(HaveOccurred()) @@ -66,14 +66,11 @@ var _ = BeforeSuite(func() { Expect(err).NotTo(HaveOccurred()) Expect(k8sClient).NotTo(BeNil()) - // database, err := db.NewK8sDatabaseFromREST(cfg, logger) - // Expect(err).NotTo(HaveOccurred()) errCh := make(chan error, 2) - tinkServer := server.NewKubeBackedServerFromREST(logger, - cfg, - "default", - ) + tinkServer, err := server.NewKubeBackedServerFromREST(logger, cfg, "default") + Expect(err).To(Succeed()) + serverAddr, err = grpcserver.SetupGRPC( ctx, tinkServer, @@ -84,19 +81,25 @@ var _ = BeforeSuite(func() { logger.Info("HTTP server: ", fmt.Sprintf("%+v", serverAddr)) // Start the controller - options := controller.GetControllerOptions() - options.LeaderElectionNamespace = "default" + zapLogger, err := zap.NewDevelopment() + Expect(err).To(Succeed()) + + options := ctrl.Options{ + Logger: zapr.NewLogger(zapLogger), + } + manager, err := controller.NewManager(cfg, options) Expect(err).NotTo(HaveOccurred()) + go func() { - err := manager.RegisterControllers(ctx, workflow.NewController(manager.GetClient())).Start(ctx) + err := manager.Start(ctx) Expect(err).To(BeNil()) }() }) var _ = AfterSuite(func() { By("Cancelling the context") - cancel() + By("stopping the test environment") err := testEnv.Stop() Expect(err).NotTo(HaveOccurred()) diff --git a/internal/server/index.go b/internal/server/index.go new file mode 100644 index 000000000..9a39f5694 --- /dev/null +++ b/internal/server/index.go @@ -0,0 +1,30 @@ +package server + +import ( + "github.com/tinkerbell/tink/api/v1alpha1" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// workflowByNonTerminalState is the index name for retrieving workflows in a non-terminal state. +const workflowByNonTerminalState = ".status.state.nonTerminalWorker" + +// workflowByNonTerminalStateFunc inspects obj - which must be a Workflow - for a Pending or +// Running state. If in either Pending or Running it returns a list of worker addresses. +func workflowByNonTerminalStateFunc(obj client.Object) []string { + wf, ok := obj.(*v1alpha1.Workflow) + if !ok { + return nil + } + + resp := []string{} + if !(wf.Status.State == v1alpha1.WorkflowStateRunning || wf.Status.State == v1alpha1.WorkflowStatePending) { + return resp + } + for _, task := range wf.Status.Tasks { + if task.WorkerAddr != "" { + resp = append(resp, task.WorkerAddr) + } + } + + return resp +} diff --git a/internal/server/index_test.go b/internal/server/index_test.go new file mode 100644 index 000000000..3a661bdcc --- /dev/null +++ b/internal/server/index_test.go @@ -0,0 +1,88 @@ +package server + +import ( + "reflect" + "testing" + + "github.com/tinkerbell/tink/api/v1alpha1" + + "sigs.k8s.io/controller-runtime/pkg/client" +) + +func TestWorkflowIndexFuncs(t *testing.T) { + cases := []struct { + name string + input client.Object + wantStateAddrs []string + }{ + { + "non workflow", + &v1alpha1.Hardware{}, + nil, + }, + { + "empty workflow", + &v1alpha1.Workflow{ + Status: v1alpha1.WorkflowStatus{ + State: "", + Tasks: []v1alpha1.Task{}, + }, + }, + []string{}, + }, + { + "pending workflow", + &v1alpha1.Workflow{ + Status: v1alpha1.WorkflowStatus{ + State: v1alpha1.WorkflowStatePending, + Tasks: []v1alpha1.Task{ + { + WorkerAddr: "worker1", + }, + }, + }, + }, + []string{"worker1"}, + }, + { + "running workflow", + &v1alpha1.Workflow{ + Status: v1alpha1.WorkflowStatus{ + State: v1alpha1.WorkflowStateRunning, + Tasks: []v1alpha1.Task{ + { + WorkerAddr: "worker1", + }, + { + WorkerAddr: "worker2", + }, + }, + }, + }, + []string{"worker1", "worker2"}, + }, + { + "complete workflow", + &v1alpha1.Workflow{ + Status: v1alpha1.WorkflowStatus{ + State: v1alpha1.WorkflowStateSuccess, + Tasks: []v1alpha1.Task{ + { + WorkerAddr: "worker1", + }, + }, + }, + }, + []string{}, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + gotStateAddrs := workflowByNonTerminalStateFunc(tc.input) + if !reflect.DeepEqual(tc.wantStateAddrs, gotStateAddrs) { + t.Errorf("Unexpected non-terminating workflow response: wanted %#v, got %#v", tc.wantStateAddrs, gotStateAddrs) + } + }) + } +} diff --git a/internal/server/kubernetes_api.go b/internal/server/kubernetes_api.go index 8242ed455..8c9ff9f78 100644 --- a/internal/server/kubernetes_api.go +++ b/internal/server/kubernetes_api.go @@ -2,16 +2,21 @@ package server import ( "context" + "fmt" "time" + "github.com/go-logr/zapr" "github.com/packethost/pkg/log" + "github.com/tinkerbell/tink/api/v1alpha1" "github.com/tinkerbell/tink/internal/controller" "github.com/tinkerbell/tink/internal/proto" + "go.uber.org/zap" "google.golang.org/grpc" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" clientcmdapi "k8s.io/client-go/tools/clientcmd/api" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/cluster" ) // +kubebuilder:rbac:groups=tinkerbell.org,resources=hardware;hardware/status,verbs=get;list;watch @@ -45,27 +50,45 @@ func NewKubeBackedServer(logger log.Logger, kubeconfig, apiserver, namespace str if err != nil { return nil, err } - return NewKubeBackedServerFromREST(logger, cfg, namespace), nil + + return NewKubeBackedServerFromREST(logger, cfg, namespace) } // NewKubeBackedServerFromREST returns a server that implements the Workflow // server interface with the given Kubernetes rest client and namespace. -func NewKubeBackedServerFromREST(logger log.Logger, config *rest.Config, namespace string) *KubernetesBackedServer { - options := controller.GetServerOptions() - options.Namespace = namespace - manager := controller.NewManagerOrDie(config, options) +func NewKubeBackedServerFromREST(logger log.Logger, config *rest.Config, namespace string) (*KubernetesBackedServer, error) { + clstr, err := cluster.New(config, func(opts *cluster.Options) { + opts.Scheme = controller.DefaultScheme() + opts.Logger = zapr.NewLogger(zap.NewNop()) + opts.Namespace = namespace + }) + if err != nil { + return nil, fmt.Errorf("init client: %w", err) + } + + err = clstr.GetFieldIndexer().IndexField( + context.Background(), + &v1alpha1.Workflow{}, + workflowByNonTerminalState, + workflowByNonTerminalStateFunc, + ) + if err != nil { + return nil, fmt.Errorf("setup %s index: %w", workflowByNonTerminalState, err) + } + go func() { - err := manager.Start(context.Background()) + err := clstr.Start(context.Background()) if err != nil { - logger.Error(err, "Error starting manager") + logger.Error(err, "Error starting cluster") } }() + return &KubernetesBackedServer{ logger: logger, - ClientFunc: manager.GetClient, + ClientFunc: clstr.GetClient, namespace: namespace, nowFunc: time.Now, - } + }, nil } // KubernetesBackedServer is a server that implements a workflow API. diff --git a/internal/server/kubernetes_api_workflow.go b/internal/server/kubernetes_api_workflow.go index 691a33cd3..f3fa7ba34 100644 --- a/internal/server/kubernetes_api_workflow.go +++ b/internal/server/kubernetes_api_workflow.go @@ -5,7 +5,6 @@ import ( "github.com/pkg/errors" "github.com/tinkerbell/tink/api/v1alpha1" - "github.com/tinkerbell/tink/internal/controller" "github.com/tinkerbell/tink/internal/proto" "github.com/tinkerbell/tink/internal/workflow" "google.golang.org/grpc/codes" @@ -38,7 +37,7 @@ func getWorkflowContext(wf v1alpha1.Workflow) *proto.WorkflowContext { func (s *KubernetesBackedServer) getCurrentAssignedNonTerminalWorkflowsForWorker(ctx context.Context, workerID string) ([]v1alpha1.Workflow, error) { stored := &v1alpha1.WorkflowList{} err := s.ClientFunc().List(ctx, stored, &client.MatchingFields{ - controller.WorkflowWorkerNonTerminalStateIndex: workerID, + workflowByNonTerminalState: workerID, }) if err != nil { return nil, err diff --git a/internal/workflow/controller.go b/internal/workflow/reconciler.go similarity index 77% rename from internal/workflow/controller.go rename to internal/workflow/reconciler.go index d2a2e6350..13df986c2 100644 --- a/internal/workflow/controller.go +++ b/internal/workflow/reconciler.go @@ -10,22 +10,22 @@ import ( "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" "knative.dev/pkg/ptr" - controllerruntime "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" + ctrl "sigs.k8s.io/controller-runtime" + ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" ) -// Controller is a type for managing Workflows. -type Controller struct { - kubeClient client.Client - nowFunc func() time.Time +// Reconciler is a type for managing Workflows. +type Reconciler struct { + client ctrlclient.Client + nowFunc func() time.Time } -func NewController(kubeClient client.Client) *Controller { - return &Controller{ - kubeClient: kubeClient, - nowFunc: time.Now, +func NewReconciler(client ctrlclient.Client) *Reconciler { + return &Reconciler{ + client: client, + nowFunc: time.Now, } } @@ -33,12 +33,12 @@ func NewController(kubeClient client.Client) *Controller { // +kubebuilder:rbac:groups=tinkerbell.org,resources=templates;templates/status,verbs=get;list;watch;update;patch // +kubebuilder:rbac:groups=tinkerbell.org,resources=workflows;workflows/status,verbs=get;list;watch;update;patch;delete -func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { - logger := controllerruntime.LoggerFrom(ctx) +func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { + logger := ctrl.LoggerFrom(ctx) logger.Info("Reconciling") stored := &v1alpha1.Workflow{} - if err := c.kubeClient.Get(ctx, req.NamespacedName, stored); err != nil { + if err := r.client.Get(ctx, req.NamespacedName, stored); err != nil { if errors.IsNotFound(err) { return reconcile.Result{}, nil } @@ -55,25 +55,25 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco ) switch wflow.Status.State { case "": - resp, err = c.processNewWorkflow(ctx, logger, wflow) + resp, err = r.processNewWorkflow(ctx, logger, wflow) case v1alpha1.WorkflowStateRunning: - resp = c.processRunningWorkflow(ctx, wflow) + resp = r.processRunningWorkflow(ctx, wflow) default: return resp, nil } // Patch any changes, regardless of errors if !equality.Semantic.DeepEqual(wflow, stored) { - if perr := c.kubeClient.Status().Patch(ctx, wflow, client.MergeFrom(stored)); perr != nil { + if perr := r.client.Status().Patch(ctx, wflow, ctrlclient.MergeFrom(stored)); perr != nil { err = fmt.Errorf("error patching workflow %s, %w", wflow.Name, perr) } } return resp, err } -func (c *Controller) processNewWorkflow(ctx context.Context, logger logr.Logger, stored *v1alpha1.Workflow) (reconcile.Result, error) { +func (r *Reconciler) processNewWorkflow(ctx context.Context, logger logr.Logger, stored *v1alpha1.Workflow) (reconcile.Result, error) { tpl := &v1alpha1.Template{} - if err := c.kubeClient.Get(ctx, client.ObjectKey{Name: stored.Spec.TemplateRef, Namespace: stored.Namespace}, tpl); err != nil { + if err := r.client.Get(ctx, ctrlclient.ObjectKey{Name: stored.Spec.TemplateRef, Namespace: stored.Namespace}, tpl); err != nil { if errors.IsNotFound(err) { // Throw an error to raise awareness and take advantage of immediate requeue. logger.Error(err, "error getting Template object in processNewWorkflow function") @@ -92,7 +92,7 @@ func (c *Controller) processNewWorkflow(ctx context.Context, logger logr.Logger, } var hardware v1alpha1.Hardware - err := c.kubeClient.Get(ctx, client.ObjectKey{Name: stored.Spec.HardwareRef, Namespace: stored.Namespace}, &hardware) + err := r.client.Get(ctx, ctrlclient.ObjectKey{Name: stored.Spec.HardwareRef, Namespace: stored.Namespace}, &hardware) if err != nil && !errors.IsNotFound(err) { logger.Error(err, "error getting Hardware object in processNewWorkflow function") return reconcile.Result{}, err @@ -139,9 +139,9 @@ func toTemplateHardwareData(hardware v1alpha1.Hardware) templateHardwareData { return contract } -func (c *Controller) processRunningWorkflow(_ context.Context, stored *v1alpha1.Workflow) reconcile.Result { +func (r *Reconciler) processRunningWorkflow(_ context.Context, stored *v1alpha1.Workflow) reconcile.Result { // Check for global timeout expiration - if c.nowFunc().After(stored.GetStartTime().Add(time.Duration(stored.Status.GlobalTimeout) * time.Second)) { + if r.nowFunc().After(stored.GetStartTime().Add(time.Duration(stored.Status.GlobalTimeout) * time.Second)) { stored.Status.State = v1alpha1.WorkflowStateTimeout } @@ -150,11 +150,11 @@ func (c *Controller) processRunningWorkflow(_ context.Context, stored *v1alpha1. for ai, action := range task.Actions { // A running workflow task action has timed out if action.Status == v1alpha1.WorkflowStateRunning && action.StartedAt != nil && - c.nowFunc().After(action.StartedAt.Add(time.Duration(action.Timeout)*time.Second)) { + r.nowFunc().After(action.StartedAt.Add(time.Duration(action.Timeout)*time.Second)) { // Set fields on the timed out action stored.Status.Tasks[ti].Actions[ai].Status = v1alpha1.WorkflowStateTimeout stored.Status.Tasks[ti].Actions[ai].Message = "Action timed out" - stored.Status.Tasks[ti].Actions[ai].Seconds = int64(c.nowFunc().Sub(action.StartedAt.Time).Seconds()) + stored.Status.Tasks[ti].Actions[ai].Seconds = int64(r.nowFunc().Sub(action.StartedAt.Time).Seconds()) // Mark the workflow as timed out stored.Status.State = v1alpha1.WorkflowStateTimeout } @@ -164,9 +164,9 @@ func (c *Controller) processRunningWorkflow(_ context.Context, stored *v1alpha1. return reconcile.Result{} } -func (c *Controller) Register(_ context.Context, m manager.Manager) error { - return controllerruntime. - NewControllerManagedBy(m). +func (r *Reconciler) SetupWithManager(mgr manager.Manager) error { + return ctrl. + NewControllerManagedBy(mgr). For(&v1alpha1.Workflow{}). - Complete(c) + Complete(r) } diff --git a/internal/workflow/controller_test.go b/internal/workflow/reconciler_test.go similarity index 99% rename from internal/workflow/controller_test.go rename to internal/workflow/reconciler_test.go index 5b783ee3a..71a5375eb 100644 --- a/internal/workflow/controller_test.go +++ b/internal/workflow/reconciler_test.go @@ -905,9 +905,9 @@ tasks: if tc.seedWorkflow != nil { kc = kc.WithObjects(tc.seedWorkflow) } - controller := &Controller{ - kubeClient: kc.Build(), - nowFunc: TestTime.Now, + controller := &Reconciler{ + client: kc.Build(), + nowFunc: TestTime.Now, } t.Run(tc.name, func(t *testing.T) { @@ -929,7 +929,7 @@ tasks: // Don't return, also check the modified object } wflow := &v1alpha1.Workflow{} - err := controller.kubeClient.Get( + err := controller.client.Get( context.Background(), client.ObjectKey{Name: tc.wantWflow.Name, Namespace: tc.wantWflow.Namespace}, wflow)