Skip to content

Commit

Permalink
Refactor reconcilers and controller manager
Browse files Browse the repository at this point in the history
Signed-off-by: Chris Doherty <[email protected]>
  • Loading branch information
chrisdoherty4 committed Mar 20, 2023
1 parent 5126bd0 commit acdffd3
Show file tree
Hide file tree
Showing 11 changed files with 287 additions and 482 deletions.
72 changes: 51 additions & 21 deletions cmd/tink-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,17 @@ import (
"os"
"strings"

"github.com/go-logr/logr"
"github.com/go-logr/zapr"
"github.com/packethost/pkg/log"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"github.com/spf13/viper"
"github.com/tinkerbell/tink/internal/controller"
"github.com/tinkerbell/tink/internal/workflow"
"go.uber.org/zap"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
ctrl "sigs.k8s.io/controller-runtime"
)

// version is set at build time.
Expand All @@ -22,41 +25,64 @@ var version = "devel"
// DaemonConfig represents all the values you can configure as part of the tink-server.
// You can change the configuration via environment variable, or file, or command flags.
type DaemonConfig struct {
K8sAPI string
Kubeconfig string // only applies to out of cluster
K8sAPI string
Kubeconfig string // only applies to out of cluster
MetricsAddr string
ProbeAddr string
EnableLeaderElection bool
}

func (c *DaemonConfig) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&c.K8sAPI, "kubernetes", "", "The Kubernetes API URL, used for in-cluster client construction.")
fs.StringVar(&c.K8sAPI, "kubernetes", "",
"The Kubernetes API URL, used for in-cluster client construction.")
fs.StringVar(&c.Kubeconfig, "kubeconfig", "", "Absolute path to the kubeconfig file")
fs.StringVar(&c.MetricsAddr, "metrics-bind-address", ":8080",
"The address the metric endpoint binds to.")
fs.StringVar(&c.ProbeAddr, "health-probe-bind-address", ":8081",
"The address the probe endpoint binds to.")
fs.BoolVar(&c.EnableLeaderElection, "leader-elect", false,
"Enable leader election for controller manager. "+
"Enabling this will ensure there is only one active controller manager.")
}

func main() {
// Init the packet logger as its used throughout the codebase.
logger, err := log.Init("github.com/tinkerbell/tink")
if err != nil {
panic(err)
}
defer logger.Close()

config := &DaemonConfig{}

cmd := NewRootCommand(config, logger)
cmd := NewRootCommand()
if err := cmd.ExecuteContext(context.Background()); err != nil {
defer os.Exit(1)
fmt.Fprint(os.Stderr, err.Error())
os.Exit(1)
}
}

func NewRootCommand(config *DaemonConfig, logger log.Logger) *cobra.Command {
func NewRootCommand() *cobra.Command {
config := &DaemonConfig{}
zapLogger, err := zap.NewProduction()
if err != nil {
panic(err)
}
logger := zapr.NewLogger(zapLogger)

cmd := &cobra.Command{
Use: "tink-controller",
PreRunE: func(cmd *cobra.Command, args []string) error {
viper, err := createViper(logger)
if err != nil {
return err
return fmt.Errorf("config init: %w", err)
}
return applyViper(viper, cmd)
},
RunE: func(cmd *cobra.Command, args []string) error {
logger, err := zap.NewProduction()
if err != nil {
return fmt.Errorf("init logger: %w", err)
}

logger.Info("starting controller version " + version)

ccfg := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
Expand All @@ -72,24 +98,29 @@ func NewRootCommand(config *DaemonConfig, logger log.Logger) *cobra.Command {
if err != nil {
return err
}
options := controller.GetControllerOptions()
options.LeaderElectionNamespace = namespace

options := ctrl.Options{
Logger: zapr.NewLogger(logger),
LeaderElection: true,
LeaderElectionID: "tink.tinkerbell.org",
LeaderElectionNamespace: namespace,
MetricsBindAddress: config.MetricsAddr,
HealthProbeBindAddress: config.ProbeAddr,
}

manager, err := controller.NewManager(cfg, options)
if err != nil {
return err
return fmt.Errorf("controller manager: %w", err)
}

return manager.RegisterControllers(
cmd.Context(),
workflow.NewController(manager.GetClient()),
).Start(cmd.Context())
return manager.Start(cmd.Context())
},
}
config.AddFlags(cmd.Flags())
return cmd
}

func createViper(logger log.Logger) (*viper.Viper, error) {
func createViper(logger logr.Logger) (*viper.Viper, error) {
v := viper.New()
v.AutomaticEnv()
v.SetConfigName("tink-controller")
Expand All @@ -100,12 +131,11 @@ func createViper(logger log.Logger) (*viper.Viper, error) {
// If a config file is found, read it in.
if err := v.ReadInConfig(); err != nil {
if _, ok := err.(viper.ConfigFileNotFoundError); !ok {
logger.With("configFile", v.ConfigFileUsed()).Error(err, "could not load config file")
return nil, err
return nil, fmt.Errorf("loading config file: %w", err)
}
logger.Info("no config file found")
} else {
logger.With("configFile", v.ConfigFileUsed()).Info("loaded config file")
logger.Info("loaded config file", "configFile", v.ConfigFileUsed())
}

return v, nil
Expand Down
221 changes: 25 additions & 196 deletions internal/controller/manager.go
Original file line number Diff line number Diff line change
@@ -1,225 +1,54 @@
package controller

import (
"context"
"fmt"

"github.com/go-logr/zapr"
"github.com/tinkerbell/tink/api/v1alpha1"
"github.com/tinkerbell/tink/internal/workflow"
"k8s.io/apimachinery/pkg/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"knative.dev/pkg/logging"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/manager"
)

const (
WorkflowWorkerAddrIndex = ".status.tasks.workerAddr"
WorkflowWorkerNonTerminalStateIndex = ".status.state.nonTerminalWorker"
WorkflowStateIndex = ".status.state"
HardwareMACAddrIndex = ".spec.interfaces.dhcp.mac"
HardwareIPAddrIndex = ".spec.interfaces.dhcp.ip"
var schemeBuilder = runtime.NewSchemeBuilder(
clientgoscheme.AddToScheme,
v1alpha1.AddToScheme,
)

var (
runtimescheme = runtime.NewScheme()
options = Options{}
)

func init() {
_ = clientgoscheme.AddToScheme(runtimescheme)
_ = v1alpha1.AddToScheme(runtimescheme)
}

// Options for running this binary.
type Options struct {
MetricsPort int
HealthProbePort int
}

// GetControllerOptions returns a set of options used by the Tink controller.
// These options include leader election enabled.
func GetControllerOptions() controllerruntime.Options {
return controllerruntime.Options{
Logger: zapr.NewLogger(logging.FromContext(context.Background()).Desugar()),
LeaderElection: true,
LeaderElectionID: "tink-leader-election",
Scheme: runtimescheme,
MetricsBindAddress: fmt.Sprintf(":%d", options.MetricsPort),
HealthProbeBindAddress: fmt.Sprintf(":%d", options.HealthProbePort),
}
// DefaultScheme returns a default scheme for the manager that includes Go client and Tink API
// schemes.
func DefaultScheme() *runtime.Scheme {
scheme := runtime.NewScheme()
_ = schemeBuilder.AddToScheme(scheme)
return scheme
}

// GetNamespacedControllerOptions returns a set of options used by the Tink controller.
// These options include leader election enabled.
func GetNamespacedControllerOptions(namespace string) controllerruntime.Options {
opts := GetControllerOptions()
opts.Namespace = namespace
return opts
}

// GetServerOptions returns a set of options used by the Tink API.
// These options include leader election disabled.
func GetServerOptions() controllerruntime.Options {
return controllerruntime.Options{
Logger: zapr.NewLogger(logging.FromContext(context.Background()).Desugar()),
LeaderElection: false,
Scheme: runtimescheme,
MetricsBindAddress: fmt.Sprintf(":%d", options.MetricsPort),
HealthProbeBindAddress: fmt.Sprintf(":%d", options.HealthProbePort),
// NewManager creates a controller-runtime manager with all controllers necessary for Tink
// pre-registered. If opts.Scheme is nil, DefaultScheme() is used.
func NewManager(config *rest.Config, opts ctrl.Options) (ctrl.Manager, error) {
if opts.Scheme == nil {
opts.Scheme = DefaultScheme()
}
}

// NewManagerOrDie instantiates a controller manager.
func NewManagerOrDie(config *rest.Config, options controllerruntime.Options) Manager {
m, err := NewManager(config, options)
if err != nil {
panic(err)
}
return m
}

// NewManager instantiates a controller manager.
func NewManager(config *rest.Config, options controllerruntime.Options) (Manager, error) {
m, err := controllerruntime.NewManager(config, options)
mgr, err := ctrl.NewManager(config, opts)
if err != nil {
return nil, err
}
indexers := []struct {
obj client.Object
field string
extractValue client.IndexerFunc
}{
{
&v1alpha1.Workflow{},
WorkflowWorkerAddrIndex,
WorkflowWorkerAddrIndexFunc,
},
{
&v1alpha1.Workflow{},
WorkflowWorkerNonTerminalStateIndex,
WorkflowWorkerNonTerminalStateIndexFunc,
},
{
&v1alpha1.Workflow{},
WorkflowStateIndex,
WorkflowStateIndexFunc,
},
{
&v1alpha1.Hardware{},
HardwareIPAddrIndex,
HardwareIPIndexFunc,
},
{
&v1alpha1.Hardware{},
HardwareMACAddrIndex,
HardwareMacIndexFunc,
},
}
for _, indexer := range indexers {
if err := m.GetFieldIndexer().IndexField(
context.Background(),
indexer.obj,
indexer.field,
indexer.extractValue,
); err != nil {
return nil, fmt.Errorf("failed to setup %s indexer, %w", indexer.field, err)
}
}
return &GenericControllerManager{Manager: m}, nil
}

// GenericControllerManager is a manager.Manager that allows for registering of controllers.
type GenericControllerManager struct {
manager.Manager
}

// RegisterControllers registers a set of controllers to the controller manager.
func (m *GenericControllerManager) RegisterControllers(ctx context.Context, controllers ...Controller) Manager {
for _, c := range controllers {
if err := c.Register(ctx, m); err != nil {
panic(err)
}
if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
return nil, fmt.Errorf("set up health check: %w", err)
}
if err := m.AddHealthzCheck("healthz", healthz.Ping); err != nil {
panic(fmt.Sprintf("Failed to add readiness probe, %s", err.Error()))
}
return m
}

// WorkflowWorkerAddrIndexFunc func returns a list of worker addresses from a workflow.
func WorkflowWorkerAddrIndexFunc(obj client.Object) []string {
wf, ok := obj.(*v1alpha1.Workflow)
if !ok {
return nil
}
resp := []string{}
for _, task := range wf.Status.Tasks {
if task.WorkerAddr != "" {
resp = append(resp, task.WorkerAddr)
}
}
return resp
}

// WorkflowWorkerNonTerminalStateIndexFunc func returns a list of worker addresses for workflows
// in a running or pending state.
func WorkflowWorkerNonTerminalStateIndexFunc(obj client.Object) []string {
wf, ok := obj.(*v1alpha1.Workflow)
if !ok {
return nil
}

resp := []string{}
if !(wf.Status.State == v1alpha1.WorkflowStateRunning || wf.Status.State == v1alpha1.WorkflowStatePending) {
return resp
}
for _, task := range wf.Status.Tasks {
if task.WorkerAddr != "" {
resp = append(resp, task.WorkerAddr)
}
}
return resp
}

// WorkflowStateIndexFunc func returns the workflow state.
func WorkflowStateIndexFunc(obj client.Object) []string {
wf, ok := obj.(*v1alpha1.Workflow)
if !ok {
return nil
if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
return nil, fmt.Errorf("set up ready check: %w", err)
}
return []string{string(wf.Status.State)}
}

// HardwareMacIndexFunc returns a list of mac addresses from a hardware.
func HardwareMacIndexFunc(obj client.Object) []string {
hw, ok := obj.(*v1alpha1.Hardware)
if !ok {
return nil
}
resp := []string{}
for _, iface := range hw.Spec.Interfaces {
if iface.DHCP != nil && iface.DHCP.MAC != "" {
resp = append(resp, iface.DHCP.MAC)
}
err = workflow.NewReconciler(mgr.GetClient()).SetupWithManager(mgr)
if err != nil {
return nil, fmt.Errorf("setup workflow reconciler: %w", err)
}
return resp
}

// HardwareIPIndexFunc returns a list of IP addresses from a hardware.
func HardwareIPIndexFunc(obj client.Object) []string {
hw, ok := obj.(*v1alpha1.Hardware)
if !ok {
return nil
}
resp := []string{}
for _, iface := range hw.Spec.Interfaces {
if iface.DHCP != nil && iface.DHCP.IP != nil && iface.DHCP.IP.Address != "" {
resp = append(resp, iface.DHCP.IP.Address)
}
}
return resp
return mgr, nil
}
Loading

0 comments on commit acdffd3

Please sign in to comment.