Skip to content

Commit

Permalink
Add running workflow for worker index to cache
Browse files Browse the repository at this point in the history
* Add tests on index functions
* Add constants for action states

Signed-off-by: Micah Hausler <[email protected]>
  • Loading branch information
micahhausler committed Apr 4, 2022
1 parent 07cf28c commit 90101b6
Show file tree
Hide file tree
Showing 2 changed files with 311 additions and 5 deletions.
100 changes: 95 additions & 5 deletions pkg/controllers/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ import (
)

const (
WorkerAddr = "status.tasks.workeraddr"
WorkflowWorkerAddrIndex = ".status.tasks.workerAddr"
WorkflowWorkerNonTerminalStateIndex = ".status.state.nonTerminalWorker"
WorkflowStateIndex = ".status.state"
HardwareMACAddrIndex = ".spec.interfaces.dhcp.mac"
HardwareIPAddrIndex = ".spec.interfaces.dhcp.ip"
)

var (
Expand Down Expand Up @@ -49,6 +53,14 @@ func GetControllerOptions() controllerruntime.Options {
}
}

// GetNamespacedControllerOptions returns a set of options used by the Tink controller.
// These options include leader election enabled.
func GetNamespacedControllerOptions(namespace string) controllerruntime.Options {
opts := GetControllerOptions()
opts.Namespace = namespace
return opts
}

// GetServerOptions returns a set of options used by the Tink API.
// These options include leader election disabled.
func GetServerOptions() controllerruntime.Options {
Expand Down Expand Up @@ -83,8 +95,28 @@ func NewManager(config *rest.Config, options controllerruntime.Options) (Manager
}{
{
&v1alpha1.Workflow{},
WorkerAddr,
wokerIndexFunc,
WorkflowWorkerAddrIndex,
workflowWorkerAddrIndexFunc,
},
{
&v1alpha1.Workflow{},
WorkflowWorkerNonTerminalStateIndex,
workflowWorkerNonTerminalStateIndexFunc,
},
{
&v1alpha1.Workflow{},
WorkflowStateIndex,
workflowStateIndexFunc,
},
{
&v1alpha1.Hardware{},
HardwareIPAddrIndex,
hardwareIPIndexFunc,
},
{
&v1alpha1.Hardware{},
HardwareMACAddrIndex,
hardwareMacIndexFunc,
},
}
for _, indexer := range indexers {
Expand Down Expand Up @@ -118,17 +150,75 @@ func (m *GenericControllerManager) RegisterControllers(ctx context.Context, cont
return m
}

// workerIndex func returns a list of worker addresses from a workflow.
func wokerIndexFunc(obj client.Object) []string {
// workflowWorkerAddrIndexFunc func returns a list of worker addresses from a workflow.
func workflowWorkerAddrIndexFunc(obj client.Object) []string {
wf, ok := obj.(*v1alpha1.Workflow)
if !ok {
return nil
}
resp := []string{}
for _, task := range wf.Status.Tasks {
if task.WorkerAddr != "" {
resp = append(resp, task.WorkerAddr)
}
}
return resp
}

// workflowWorkerNonTerminalStateIndexFunc func indexes workflow by worker for non terminal workflows.
func workflowWorkerNonTerminalStateIndexFunc(obj client.Object) []string {
wf, ok := obj.(*v1alpha1.Workflow)
if !ok {
return nil
}

resp := []string{}
if !(wf.Status.State == "STATE_RUNNING" || wf.Status.State == "STATE_PENDING") {
return resp
}
for _, task := range wf.Status.Tasks {
if task.WorkerAddr != "" {
resp = append(resp, task.WorkerAddr)
}
}
return resp
}

// workflowStateIndexFunc func indexes workflow by worker for non terminal workflows.
func workflowStateIndexFunc(obj client.Object) []string {
wf, ok := obj.(*v1alpha1.Workflow)
if !ok {
return nil
}
return []string{wf.Status.State}
}

// hardwareMacIndexFunc returns a list of mac addresses from a hardware.
func hardwareMacIndexFunc(obj client.Object) []string {
hw, ok := obj.(*v1alpha1.Hardware)
if !ok {
return nil
}
resp := []string{}
for _, iface := range hw.Spec.Interfaces {
if iface.DHCP != nil && iface.DHCP.MAC != "" {
resp = append(resp, iface.DHCP.MAC)
}
}
return resp
}

// hardwareIPIndexFunc returns a list of mac addresses from a hardware.
func hardwareIPIndexFunc(obj client.Object) []string {
hw, ok := obj.(*v1alpha1.Hardware)
if !ok {
return nil
}
resp := []string{}
for _, iface := range hw.Spec.Interfaces {
if iface.DHCP != nil && iface.DHCP.IP != nil && iface.DHCP.IP.Address != "" {
resp = append(resp, iface.DHCP.IP.Address)
}
}
return resp
}
216 changes: 216 additions & 0 deletions pkg/controllers/manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
package controllers

import (
"reflect"
"testing"

"github.com/tinkerbell/tink/pkg/apis/core/v1alpha1"

"sigs.k8s.io/controller-runtime/pkg/client"
)

func TestWorkflowIndexFuncs(t *testing.T) {
cases := []struct {
name string
input client.Object
wantAddrs []string
wantStateAddrs []string
wantStates []string
}{
{
"non workflow",
&v1alpha1.Hardware{},
nil,
nil,
nil,
},
{
"empty workflow",
&v1alpha1.Workflow{
Status: v1alpha1.WorkflowStatus{
State: "",
Tasks: []v1alpha1.Task{},
},
},
[]string{},
[]string{},
[]string{""},
},
{
"pending workflow",
&v1alpha1.Workflow{
Status: v1alpha1.WorkflowStatus{
State: "STATE_PENDING",
Tasks: []v1alpha1.Task{
{
WorkerAddr: "worker1",
},
},
},
},
[]string{"worker1"},
[]string{"worker1"},
[]string{"STATE_PENDING"},
},
{
"running workflow",
&v1alpha1.Workflow{
Status: v1alpha1.WorkflowStatus{
State: "STATE_RUNNING",
Tasks: []v1alpha1.Task{
{
WorkerAddr: "worker1",
},
{
WorkerAddr: "worker2",
},
},
},
},
[]string{"worker1", "worker2"},
[]string{"worker1", "worker2"},
[]string{"STATE_RUNNING"},
},
{
"complete workflow",
&v1alpha1.Workflow{
Status: v1alpha1.WorkflowStatus{
State: "STATE_SUCCESS",
Tasks: []v1alpha1.Task{
{
WorkerAddr: "worker1",
},
},
},
},
[]string{"worker1"},
[]string{},
[]string{"STATE_SUCCESS"},
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
gotAddr := workflowWorkerAddrIndexFunc(tc.input)
if !reflect.DeepEqual(tc.wantAddrs, gotAddr) {
t.Errorf("Unexpected wokerAddr response: wanted %#v, got %#v", tc.wantAddrs, gotAddr)
}
gotStateAddrs := workflowWorkerNonTerminalStateIndexFunc(tc.input)
if !reflect.DeepEqual(tc.wantStateAddrs, gotStateAddrs) {
t.Errorf("Unexpected non-terminating workflow response: wanted %#v, got %#v", tc.wantStateAddrs, gotStateAddrs)
}
gotStates := workflowStateIndexFunc(tc.input)
if !reflect.DeepEqual(tc.wantStates, gotStates) {
t.Errorf("Unexpected workflow state response: wanted %#v, got %#v", tc.wantStates, gotStates)
}
})
}
}

func TestHardwareIndexFunc(t *testing.T) {
cases := []struct {
name string
input client.Object
wantMac []string
wantIP []string
}{
{
"non hardware",
&v1alpha1.Workflow{},
nil,
nil,
},
{
"empty hardware dhcp",
&v1alpha1.Hardware{
Spec: v1alpha1.HardwareSpec{
Interfaces: []v1alpha1.Interface{
{
DHCP: nil,
},
},
},
},
[]string{},
[]string{},
},
{
"blank mac and ip",
&v1alpha1.Hardware{
Spec: v1alpha1.HardwareSpec{
Interfaces: []v1alpha1.Interface{
{
DHCP: &v1alpha1.DHCP{
MAC: "",
IP: &v1alpha1.IP{
Address: "",
},
},
},
},
},
},
[]string{},
[]string{},
},
{
"single ip and mac",
&v1alpha1.Hardware{
Spec: v1alpha1.HardwareSpec{
Interfaces: []v1alpha1.Interface{
{
DHCP: &v1alpha1.DHCP{
MAC: "3c:ec:ef:4c:4f:54",
IP: &v1alpha1.IP{
Address: "172.16.10.100",
},
},
},
},
},
},
[]string{"3c:ec:ef:4c:4f:54"},
[]string{"172.16.10.100"},
},
{
"double ip and mac",
&v1alpha1.Hardware{
Spec: v1alpha1.HardwareSpec{
Interfaces: []v1alpha1.Interface{
{
DHCP: &v1alpha1.DHCP{
MAC: "3c:ec:ef:4c:4f:54",
IP: &v1alpha1.IP{
Address: "172.16.10.100",
},
},
},
{
DHCP: &v1alpha1.DHCP{
MAC: "3c:ec:ef:4c:4f:55",
IP: &v1alpha1.IP{
Address: "172.16.10.101",
},
},
},
},
},
},
[]string{"3c:ec:ef:4c:4f:54", "3c:ec:ef:4c:4f:55"},
[]string{"172.16.10.100", "172.16.10.101"},
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
gotMac := hardwareMacIndexFunc(tc.input)
if !reflect.DeepEqual(tc.wantMac, gotMac) {
t.Errorf("Unexpected response: wanted %#v, got %#v", tc.wantMac, gotMac)
}
gotIPs := hardwareIPIndexFunc(tc.input)
if !reflect.DeepEqual(tc.wantIP, gotIPs) {
t.Errorf("Unexpected response: wanted %#v, got %#v", tc.wantIP, gotIPs)
}
})
}
}

0 comments on commit 90101b6

Please sign in to comment.