Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add running workflow for worker index to cache #599

Merged
merged 1 commit into from
Apr 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 95 additions & 5 deletions pkg/controllers/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ import (
)

const (
WorkerAddr = "status.tasks.workeraddr"
WorkflowWorkerAddrIndex = ".status.tasks.workerAddr"
WorkflowWorkerNonTerminalStateIndex = ".status.state.nonTerminalWorker"
WorkflowStateIndex = ".status.state"
HardwareMACAddrIndex = ".spec.interfaces.dhcp.mac"
HardwareIPAddrIndex = ".spec.interfaces.dhcp.ip"
)

var (
Expand Down Expand Up @@ -49,6 +53,14 @@ func GetControllerOptions() controllerruntime.Options {
}
}

// GetNamespacedControllerOptions returns a set of options used by the Tink controller.
// These options include leader election enabled.
func GetNamespacedControllerOptions(namespace string) controllerruntime.Options {
opts := GetControllerOptions()
opts.Namespace = namespace
return opts
}

// GetServerOptions returns a set of options used by the Tink API.
// These options include leader election disabled.
func GetServerOptions() controllerruntime.Options {
Expand Down Expand Up @@ -83,8 +95,28 @@ func NewManager(config *rest.Config, options controllerruntime.Options) (Manager
}{
{
&v1alpha1.Workflow{},
WorkerAddr,
wokerIndexFunc,
WorkflowWorkerAddrIndex,
workflowWorkerAddrIndexFunc,
},
{
&v1alpha1.Workflow{},
WorkflowWorkerNonTerminalStateIndex,
workflowWorkerNonTerminalStateIndexFunc,
},
{
&v1alpha1.Workflow{},
WorkflowStateIndex,
workflowStateIndexFunc,
},
{
&v1alpha1.Hardware{},
HardwareIPAddrIndex,
hardwareIPIndexFunc,
},
{
&v1alpha1.Hardware{},
HardwareMACAddrIndex,
hardwareMacIndexFunc,
},
}
for _, indexer := range indexers {
Expand Down Expand Up @@ -118,17 +150,75 @@ func (m *GenericControllerManager) RegisterControllers(ctx context.Context, cont
return m
}

// workerIndex func returns a list of worker addresses from a workflow.
func wokerIndexFunc(obj client.Object) []string {
// workflowWorkerAddrIndexFunc func returns a list of worker addresses from a workflow.
func workflowWorkerAddrIndexFunc(obj client.Object) []string {
wf, ok := obj.(*v1alpha1.Workflow)
if !ok {
return nil
}
resp := []string{}
for _, task := range wf.Status.Tasks {
if task.WorkerAddr != "" {
resp = append(resp, task.WorkerAddr)
}
}
return resp
}

// workflowWorkerNonTerminalStateIndexFunc func indexes workflow by worker for non terminal workflows.
func workflowWorkerNonTerminalStateIndexFunc(obj client.Object) []string {
wf, ok := obj.(*v1alpha1.Workflow)
if !ok {
return nil
}

resp := []string{}
if !(wf.Status.State == v1alpha1.WorkflowStateRunning || wf.Status.State == v1alpha1.WorkflowStatePending) {
return resp
}
for _, task := range wf.Status.Tasks {
if task.WorkerAddr != "" {
resp = append(resp, task.WorkerAddr)
}
}
return resp
}

// workflowStateIndexFunc func indexes workflow by worker for non terminal workflows.
func workflowStateIndexFunc(obj client.Object) []string {
wf, ok := obj.(*v1alpha1.Workflow)
if !ok {
return nil
}
return []string{string(wf.Status.State)}
}

// hardwareMacIndexFunc returns a list of mac addresses from a hardware.
func hardwareMacIndexFunc(obj client.Object) []string {
hw, ok := obj.(*v1alpha1.Hardware)
if !ok {
return nil
}
resp := []string{}
for _, iface := range hw.Spec.Interfaces {
if iface.DHCP != nil && iface.DHCP.MAC != "" {
resp = append(resp, iface.DHCP.MAC)
}
}
return resp
}

// hardwareIPIndexFunc returns a list of mac addresses from a hardware.
func hardwareIPIndexFunc(obj client.Object) []string {
hw, ok := obj.(*v1alpha1.Hardware)
if !ok {
return nil
}
resp := []string{}
for _, iface := range hw.Spec.Interfaces {
if iface.DHCP != nil && iface.DHCP.IP != nil && iface.DHCP.IP.Address != "" {
resp = append(resp, iface.DHCP.IP.Address)
}
}
return resp
}
216 changes: 216 additions & 0 deletions pkg/controllers/manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
package controllers

import (
"reflect"
"testing"

"github.com/tinkerbell/tink/pkg/apis/core/v1alpha1"

"sigs.k8s.io/controller-runtime/pkg/client"
)

func TestWorkflowIndexFuncs(t *testing.T) {
cases := []struct {
name string
input client.Object
wantAddrs []string
wantStateAddrs []string
wantStates []string
}{
{
"non workflow",
&v1alpha1.Hardware{},
nil,
nil,
nil,
},
{
"empty workflow",
&v1alpha1.Workflow{
Status: v1alpha1.WorkflowStatus{
State: "",
Tasks: []v1alpha1.Task{},
},
},
[]string{},
[]string{},
[]string{""},
},
{
"pending workflow",
&v1alpha1.Workflow{
Status: v1alpha1.WorkflowStatus{
State: v1alpha1.WorkflowStatePending,
Tasks: []v1alpha1.Task{
{
WorkerAddr: "worker1",
},
},
},
},
[]string{"worker1"},
[]string{"worker1"},
[]string{string(v1alpha1.WorkflowStatePending)},
},
{
"running workflow",
&v1alpha1.Workflow{
Status: v1alpha1.WorkflowStatus{
State: v1alpha1.WorkflowStateRunning,
Tasks: []v1alpha1.Task{
{
WorkerAddr: "worker1",
},
{
WorkerAddr: "worker2",
},
},
},
},
[]string{"worker1", "worker2"},
[]string{"worker1", "worker2"},
[]string{string(v1alpha1.WorkflowStateRunning)},
},
{
"complete workflow",
&v1alpha1.Workflow{
Status: v1alpha1.WorkflowStatus{
State: v1alpha1.WorkflowStateSuccess,
Tasks: []v1alpha1.Task{
{
WorkerAddr: "worker1",
},
},
},
},
[]string{"worker1"},
[]string{},
[]string{string(v1alpha1.WorkflowStateSuccess)},
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
gotAddr := workflowWorkerAddrIndexFunc(tc.input)
if !reflect.DeepEqual(tc.wantAddrs, gotAddr) {
t.Errorf("Unexpected wokerAddr response: wanted %#v, got %#v", tc.wantAddrs, gotAddr)
}
gotStateAddrs := workflowWorkerNonTerminalStateIndexFunc(tc.input)
if !reflect.DeepEqual(tc.wantStateAddrs, gotStateAddrs) {
t.Errorf("Unexpected non-terminating workflow response: wanted %#v, got %#v", tc.wantStateAddrs, gotStateAddrs)
}
gotStates := workflowStateIndexFunc(tc.input)
if !reflect.DeepEqual(tc.wantStates, gotStates) {
t.Errorf("Unexpected workflow state response: wanted %#v, got %#v", tc.wantStates, gotStates)
}
})
}
}

func TestHardwareIndexFunc(t *testing.T) {
cases := []struct {
name string
input client.Object
wantMac []string
wantIP []string
}{
{
"non hardware",
&v1alpha1.Workflow{},
nil,
nil,
},
{
"empty hardware dhcp",
&v1alpha1.Hardware{
Spec: v1alpha1.HardwareSpec{
Interfaces: []v1alpha1.Interface{
{
DHCP: nil,
},
},
},
},
[]string{},
[]string{},
},
{
"blank mac and ip",
&v1alpha1.Hardware{
Spec: v1alpha1.HardwareSpec{
Interfaces: []v1alpha1.Interface{
{
DHCP: &v1alpha1.DHCP{
MAC: "",
IP: &v1alpha1.IP{
Address: "",
},
},
},
},
},
},
[]string{},
[]string{},
},
{
"single ip and mac",
&v1alpha1.Hardware{
Spec: v1alpha1.HardwareSpec{
Interfaces: []v1alpha1.Interface{
{
DHCP: &v1alpha1.DHCP{
MAC: "3c:ec:ef:4c:4f:54",
IP: &v1alpha1.IP{
Address: "172.16.10.100",
},
},
},
},
},
},
[]string{"3c:ec:ef:4c:4f:54"},
[]string{"172.16.10.100"},
},
{
"double ip and mac",
&v1alpha1.Hardware{
Spec: v1alpha1.HardwareSpec{
Interfaces: []v1alpha1.Interface{
{
DHCP: &v1alpha1.DHCP{
MAC: "3c:ec:ef:4c:4f:54",
IP: &v1alpha1.IP{
Address: "172.16.10.100",
},
},
},
{
DHCP: &v1alpha1.DHCP{
MAC: "3c:ec:ef:4c:4f:55",
IP: &v1alpha1.IP{
Address: "172.16.10.101",
},
},
},
},
},
},
[]string{"3c:ec:ef:4c:4f:54", "3c:ec:ef:4c:4f:55"},
[]string{"172.16.10.100", "172.16.10.101"},
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
gotMac := hardwareMacIndexFunc(tc.input)
if !reflect.DeepEqual(tc.wantMac, gotMac) {
t.Errorf("Unexpected response: wanted %#v, got %#v", tc.wantMac, gotMac)
}
gotIPs := hardwareIPIndexFunc(tc.input)
if !reflect.DeepEqual(tc.wantIP, gotIPs) {
t.Errorf("Unexpected response: wanted %#v, got %#v", tc.wantIP, gotIPs)
}
})
}
}
Loading