Skip to content

Commit

Permalink
Add kubernetes-based backend (#250)
Browse files Browse the repository at this point in the history
Signed-off-by: Micah Hausler <[email protected]>

## Description

The new backend uses the Kubernetes CRD types in Tinkerbell core. Boots could use a bit of a refactor/organization in how the backends work, but for now I've tried to fit in to the existing structure. 

## Why is this needed

This is the boots side of work for tinkerbell/proposals#46

Fixes: #

## How Has This Been Tested?

Tested with real bare metal workers using an image built from this branch. I will add E2E tests in a follow up PR, or can add to this branch

## How are existing users impacted? What migration steps/scripts do we need?

No impact to existing users

## Checklist:

I have:

- [ ] updated the documentation and/or roadmap (if required)
- [ ] added unit or e2e tests
- [ ] provided instructions on how to upgrade
  • Loading branch information
mergify[bot] authored May 5, 2022
2 parents 1138ed3 + 4d162dd commit 3f3b97c
Show file tree
Hide file tree
Showing 12 changed files with 901 additions and 43 deletions.
135 changes: 135 additions & 0 deletions client/kubernetes/cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package kubernetes

import (
"context"
"fmt"

"github.com/tinkerbell/tink/pkg/apis/core/v1alpha1"
"k8s.io/apimachinery/pkg/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/cluster"
)

const (
WorkflowWorkerNonTerminalStateIndex = ".status.state.nonTerminalWorker"
HardwareMACAddrIndex = ".spec.interfaces.dhcp.mac"
HardwareIPAddrIndex = ".spec.interfaces.dhcp.ip"
)

// NewCluster returns a controller-runtime cluster.Cluster with the Tinkerbell runtime
// scheme registered, and indexers for:
// * Hardware by MAC address
// * Hardware by IP address
// * Workflows by worker address
//
// Callers must instantiate the client-side cache by calling Start() before use.
func NewCluster(config *rest.Config) (cluster.Cluster, error) {
runtimescheme := runtime.NewScheme()

err := clientgoscheme.AddToScheme(runtimescheme)
if err != nil {
return nil, err
}

err = v1alpha1.AddToScheme(runtimescheme)
if err != nil {
return nil, err
}

c, err := cluster.New(config, func(o *cluster.Options) {
o.Scheme = runtimescheme
})

if err != nil {
return nil, err
}
indexers := []struct {
obj client.Object
field string
extractValue client.IndexerFunc
}{
{
&v1alpha1.Workflow{},
WorkflowWorkerNonTerminalStateIndex,
workflowWorkerNonTerminalStateIndexFunc,
},
{
&v1alpha1.Hardware{},
HardwareIPAddrIndex,
hardwareIPIndexFunc,
},
{
&v1alpha1.Hardware{},
HardwareMACAddrIndex,
hardwareMacIndexFunc,
},
}
for _, indexer := range indexers {
if err := c.GetFieldIndexer().IndexField(
context.Background(),
indexer.obj,
indexer.field,
indexer.extractValue,
); err != nil {
return nil, fmt.Errorf("failed to setup %s indexer, %w", indexer.field, err)
}
}

return c, nil
}

// TODO micahhausler: make the following index functions public in tinkerbell/tink import from there

// workflowWorkerNonTerminalStateIndexFunc func indexes workflow by worker for non terminal workflows.
func workflowWorkerNonTerminalStateIndexFunc(obj client.Object) []string {
wf, ok := obj.(*v1alpha1.Workflow)
if !ok {
return nil
}

resp := []string{}
if !(wf.Status.State == v1alpha1.WorkflowStateRunning || wf.Status.State == v1alpha1.WorkflowStatePending) {
return resp
}
for _, task := range wf.Status.Tasks {
if task.WorkerAddr != "" {
resp = append(resp, task.WorkerAddr)
}
}

return resp
}

// hardwareMacIndexFunc returns a list of mac addresses from a hardware.
func hardwareMacIndexFunc(obj client.Object) []string {
hw, ok := obj.(*v1alpha1.Hardware)
if !ok {
return nil
}
resp := []string{}
for _, iface := range hw.Spec.Interfaces {
if iface.DHCP != nil && iface.DHCP.MAC != "" {
resp = append(resp, iface.DHCP.MAC)
}
}

return resp
}

// hardwareIPIndexFunc returns a list of mac addresses from a hardware.
func hardwareIPIndexFunc(obj client.Object) []string {
hw, ok := obj.(*v1alpha1.Hardware)
if !ok {
return nil
}
resp := []string{}
for _, iface := range hw.Spec.Interfaces {
if iface.DHCP != nil && iface.DHCP.IP != nil && iface.DHCP.IP.Address != "" {
resp = append(resp, iface.DHCP.IP.Address)
}
}

return resp
}
137 changes: 137 additions & 0 deletions client/kubernetes/hardware_finder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package kubernetes

import (
"context"
"net"

"github.com/packethost/pkg/log"
"github.com/pkg/errors"
"github.com/tinkerbell/boots/client"
"github.com/tinkerbell/tink/pkg/apis/core/v1alpha1"
"github.com/tinkerbell/tink/pkg/controllers"
"github.com/tinkerbell/tink/pkg/convert"
"github.com/tinkerbell/tink/protos/workflow"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
crclient "sigs.k8s.io/controller-runtime/pkg/client"
)

// Finder is a type that looks up hardware and workflows from Kubernetes
type Finder struct {
clientFunc func() crclient.Client
cacheStarter func(context.Context) error
logger log.Logger
namespace string
}

// NewFinder returns a HardwareFinder that discovers hardware from Kubernetes.
//
// Callers must instantiate the client-side cache by calling Start() before use.
func NewFinder(logger log.Logger, k8sAPI, kubeconfig string) (*Finder, error) {
ccfg := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
&clientcmd.ClientConfigLoadingRules{ExplicitPath: kubeconfig},
&clientcmd.ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: k8sAPI}})

config, err := ccfg.ClientConfig()
if err != nil {
return nil, err
}

namespace, _, err := ccfg.Namespace()
if err != nil {
return nil, errors.WithStack(err)
}

cluster, err := NewCluster(config)
if err != nil {
return nil, errors.WithStack(err)
}

return &Finder{
clientFunc: cluster.GetClient,
cacheStarter: cluster.Start,
logger: logger,
namespace: namespace,
}, nil
}

// Start instantiates the client-side cache
func (f *Finder) Start(ctx context.Context) error {
return f.cacheStarter(ctx)
}

// ByIP returns a Discoverer for a particular IP.
func (f *Finder) ByIP(ctx context.Context, ip net.IP) (client.Discoverer, error) {
hardwareList := &v1alpha1.HardwareList{}

err := f.clientFunc().List(ctx, hardwareList, &crclient.MatchingFields{
controllers.HardwareIPAddrIndex: ip.String(),
})
if err != nil {
return nil, errors.Wrap(err, "failed listing hardware")
}

if len(hardwareList.Items) == 0 {
return nil, errors.New("no hardware found")
}

if len(hardwareList.Items) > 1 {
return nil, errors.Errorf("got %d hardware for ip %s, expected only 1", len(hardwareList.Items), ip)
}

return NewK8sDiscoverer(&hardwareList.Items[0]), nil
}

// ByMAC returns a Discoverer for a particular MAC address.
func (f *Finder) ByMAC(ctx context.Context, mac net.HardwareAddr, _ net.IP, _ string) (client.Discoverer, error) {
hardwareList := &v1alpha1.HardwareList{}

err := f.clientFunc().List(ctx, hardwareList, &crclient.MatchingFields{
controllers.HardwareMACAddrIndex: mac.String(),
})
if err != nil {
return nil, errors.Wrap(err, "failed listing hardware")
}

if len(hardwareList.Items) == 0 {
return nil, errors.New("no hardware found")
}

if len(hardwareList.Items) > 1 {
return nil, errors.Errorf("got %d hardware for mac %s, expected only 1", len(hardwareList.Items), mac)
}

return NewK8sDiscoverer(&hardwareList.Items[0]), nil
}

// HasActiveWorkflow finds if an active workflow exists for a particular hardware ID.
func (f *Finder) HasActiveWorkflow(ctx context.Context, hwID client.HardwareID) (bool, error) {
if hwID == "" {
return false, errors.New("missing hardware id")
}

stored := &v1alpha1.WorkflowList{}
err := f.clientFunc().List(ctx, stored, &crclient.MatchingFields{
controllers.WorkflowWorkerNonTerminalStateIndex: hwID.String(),
})
if err != nil {
return false, errors.Wrap(err, "failed to list workflows")
}

wfContexts := []*workflow.WorkflowContext{}
for _, wf := range stored.Items {
wfContexts = append(wfContexts, convert.WorkflowToWorkflowContext(&wf))
}

wcl := &workflow.WorkflowContextList{
WorkflowContexts: wfContexts,
}

for _, wf := range (*wcl).WorkflowContexts {
if wf.CurrentActionState == workflow.State_STATE_PENDING || wf.CurrentActionState == workflow.State_STATE_RUNNING {
return true, nil
}
}

return false, nil
}
Loading

0 comments on commit 3f3b97c

Please sign in to comment.