From eb40daeb24db63d3f3a33e16a86adfafb1e6e3e7 Mon Sep 17 00:00:00 2001 From: Micah Hausler Date: Tue, 22 Mar 2022 18:31:34 +0000 Subject: [PATCH] Add running workflow for worker index to cache * Add tests on index functions * Add constants for action states Signed-off-by: Micah Hausler --- pkg/controllers/manager.go | 100 ++++++++- pkg/controllers/manager_test.go | 216 ++++++++++++++++++++ pkg/controllers/workflow/controller_test.go | 16 +- 3 files changed, 319 insertions(+), 13 deletions(-) create mode 100644 pkg/controllers/manager_test.go diff --git a/pkg/controllers/manager.go b/pkg/controllers/manager.go index 9571c4c17..cbc4729f3 100644 --- a/pkg/controllers/manager.go +++ b/pkg/controllers/manager.go @@ -17,7 +17,11 @@ import ( ) const ( - WorkerAddr = "status.tasks.workeraddr" + WorkflowWorkerAddrIndex = ".status.tasks.workerAddr" + WorkflowWorkerNonTerminalStateIndex = ".status.state.nonTerminalWorker" + WorkflowStateIndex = ".status.state" + HardwareMACAddrIndex = ".spec.interfaces.dhcp.mac" + HardwareIPAddrIndex = ".spec.interfaces.dhcp.ip" ) var ( @@ -49,6 +53,14 @@ func GetControllerOptions() controllerruntime.Options { } } +// GetNamespacedControllerOptions returns a set of options used by the Tink controller. +// These options include leader election enabled. +func GetNamespacedControllerOptions(namespace string) controllerruntime.Options { + opts := GetControllerOptions() + opts.Namespace = namespace + return opts +} + // GetServerOptions returns a set of options used by the Tink API. // These options include leader election disabled. func GetServerOptions() controllerruntime.Options { @@ -83,8 +95,28 @@ func NewManager(config *rest.Config, options controllerruntime.Options) (Manager }{ { &v1alpha1.Workflow{}, - WorkerAddr, - wokerIndexFunc, + WorkflowWorkerAddrIndex, + workflowWorkerAddrIndexFunc, + }, + { + &v1alpha1.Workflow{}, + WorkflowWorkerNonTerminalStateIndex, + workflowWorkerNonTerminalStateIndexFunc, + }, + { + &v1alpha1.Workflow{}, + WorkflowStateIndex, + workflowStateIndexFunc, + }, + { + &v1alpha1.Hardware{}, + HardwareIPAddrIndex, + hardwareIPIndexFunc, + }, + { + &v1alpha1.Hardware{}, + HardwareMACAddrIndex, + hardwareMacIndexFunc, }, } for _, indexer := range indexers { @@ -118,13 +150,32 @@ func (m *GenericControllerManager) RegisterControllers(ctx context.Context, cont return m } -// workerIndex func returns a list of worker addresses from a workflow. -func wokerIndexFunc(obj client.Object) []string { +// workflowWorkerAddrIndexFunc func returns a list of worker addresses from a workflow. +func workflowWorkerAddrIndexFunc(obj client.Object) []string { + wf, ok := obj.(*v1alpha1.Workflow) + if !ok { + return nil + } + resp := []string{} + for _, task := range wf.Status.Tasks { + if task.WorkerAddr != "" { + resp = append(resp, task.WorkerAddr) + } + } + return resp +} + +// workflowWorkerNonTerminalStateIndexFunc func indexes workflow by worker for non terminal workflows. +func workflowWorkerNonTerminalStateIndexFunc(obj client.Object) []string { wf, ok := obj.(*v1alpha1.Workflow) if !ok { return nil } + resp := []string{} + if !(wf.Status.State == v1alpha1.WorkflowStateRunning || wf.Status.State == v1alpha1.WorkflowStatePending) { + return resp + } for _, task := range wf.Status.Tasks { if task.WorkerAddr != "" { resp = append(resp, task.WorkerAddr) @@ -132,3 +183,42 @@ func wokerIndexFunc(obj client.Object) []string { } return resp } + +// workflowStateIndexFunc func indexes workflow by worker for non terminal workflows. +func workflowStateIndexFunc(obj client.Object) []string { + wf, ok := obj.(*v1alpha1.Workflow) + if !ok { + return nil + } + return []string{string(wf.Status.State)} +} + +// hardwareMacIndexFunc returns a list of mac addresses from a hardware. +func hardwareMacIndexFunc(obj client.Object) []string { + hw, ok := obj.(*v1alpha1.Hardware) + if !ok { + return nil + } + resp := []string{} + for _, iface := range hw.Spec.Interfaces { + if iface.DHCP != nil && iface.DHCP.MAC != "" { + resp = append(resp, iface.DHCP.MAC) + } + } + return resp +} + +// hardwareIPIndexFunc returns a list of mac addresses from a hardware. +func hardwareIPIndexFunc(obj client.Object) []string { + hw, ok := obj.(*v1alpha1.Hardware) + if !ok { + return nil + } + resp := []string{} + for _, iface := range hw.Spec.Interfaces { + if iface.DHCP != nil && iface.DHCP.IP != nil && iface.DHCP.IP.Address != "" { + resp = append(resp, iface.DHCP.IP.Address) + } + } + return resp +} diff --git a/pkg/controllers/manager_test.go b/pkg/controllers/manager_test.go new file mode 100644 index 000000000..594ad4407 --- /dev/null +++ b/pkg/controllers/manager_test.go @@ -0,0 +1,216 @@ +package controllers + +import ( + "reflect" + "testing" + + "github.com/tinkerbell/tink/pkg/apis/core/v1alpha1" + + "sigs.k8s.io/controller-runtime/pkg/client" +) + +func TestWorkflowIndexFuncs(t *testing.T) { + cases := []struct { + name string + input client.Object + wantAddrs []string + wantStateAddrs []string + wantStates []string + }{ + { + "non workflow", + &v1alpha1.Hardware{}, + nil, + nil, + nil, + }, + { + "empty workflow", + &v1alpha1.Workflow{ + Status: v1alpha1.WorkflowStatus{ + State: "", + Tasks: []v1alpha1.Task{}, + }, + }, + []string{}, + []string{}, + []string{""}, + }, + { + "pending workflow", + &v1alpha1.Workflow{ + Status: v1alpha1.WorkflowStatus{ + State: v1alpha1.WorkflowStatePending, + Tasks: []v1alpha1.Task{ + { + WorkerAddr: "worker1", + }, + }, + }, + }, + []string{"worker1"}, + []string{"worker1"}, + []string{string(v1alpha1.WorkflowStatePending)}, + }, + { + "running workflow", + &v1alpha1.Workflow{ + Status: v1alpha1.WorkflowStatus{ + State: v1alpha1.WorkflowStateRunning, + Tasks: []v1alpha1.Task{ + { + WorkerAddr: "worker1", + }, + { + WorkerAddr: "worker2", + }, + }, + }, + }, + []string{"worker1", "worker2"}, + []string{"worker1", "worker2"}, + []string{string(v1alpha1.WorkflowStateRunning)}, + }, + { + "complete workflow", + &v1alpha1.Workflow{ + Status: v1alpha1.WorkflowStatus{ + State: v1alpha1.WorkflowStateSuccess, + Tasks: []v1alpha1.Task{ + { + WorkerAddr: "worker1", + }, + }, + }, + }, + []string{"worker1"}, + []string{}, + []string{string(v1alpha1.WorkflowStateSuccess)}, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + gotAddr := workflowWorkerAddrIndexFunc(tc.input) + if !reflect.DeepEqual(tc.wantAddrs, gotAddr) { + t.Errorf("Unexpected wokerAddr response: wanted %#v, got %#v", tc.wantAddrs, gotAddr) + } + gotStateAddrs := workflowWorkerNonTerminalStateIndexFunc(tc.input) + if !reflect.DeepEqual(tc.wantStateAddrs, gotStateAddrs) { + t.Errorf("Unexpected non-terminating workflow response: wanted %#v, got %#v", tc.wantStateAddrs, gotStateAddrs) + } + gotStates := workflowStateIndexFunc(tc.input) + if !reflect.DeepEqual(tc.wantStates, gotStates) { + t.Errorf("Unexpected workflow state response: wanted %#v, got %#v", tc.wantStates, gotStates) + } + }) + } +} + +func TestHardwareIndexFunc(t *testing.T) { + cases := []struct { + name string + input client.Object + wantMac []string + wantIP []string + }{ + { + "non hardware", + &v1alpha1.Workflow{}, + nil, + nil, + }, + { + "empty hardware dhcp", + &v1alpha1.Hardware{ + Spec: v1alpha1.HardwareSpec{ + Interfaces: []v1alpha1.Interface{ + { + DHCP: nil, + }, + }, + }, + }, + []string{}, + []string{}, + }, + { + "blank mac and ip", + &v1alpha1.Hardware{ + Spec: v1alpha1.HardwareSpec{ + Interfaces: []v1alpha1.Interface{ + { + DHCP: &v1alpha1.DHCP{ + MAC: "", + IP: &v1alpha1.IP{ + Address: "", + }, + }, + }, + }, + }, + }, + []string{}, + []string{}, + }, + { + "single ip and mac", + &v1alpha1.Hardware{ + Spec: v1alpha1.HardwareSpec{ + Interfaces: []v1alpha1.Interface{ + { + DHCP: &v1alpha1.DHCP{ + MAC: "3c:ec:ef:4c:4f:54", + IP: &v1alpha1.IP{ + Address: "172.16.10.100", + }, + }, + }, + }, + }, + }, + []string{"3c:ec:ef:4c:4f:54"}, + []string{"172.16.10.100"}, + }, + { + "double ip and mac", + &v1alpha1.Hardware{ + Spec: v1alpha1.HardwareSpec{ + Interfaces: []v1alpha1.Interface{ + { + DHCP: &v1alpha1.DHCP{ + MAC: "3c:ec:ef:4c:4f:54", + IP: &v1alpha1.IP{ + Address: "172.16.10.100", + }, + }, + }, + { + DHCP: &v1alpha1.DHCP{ + MAC: "3c:ec:ef:4c:4f:55", + IP: &v1alpha1.IP{ + Address: "172.16.10.101", + }, + }, + }, + }, + }, + }, + []string{"3c:ec:ef:4c:4f:54", "3c:ec:ef:4c:4f:55"}, + []string{"172.16.10.100", "172.16.10.101"}, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + gotMac := hardwareMacIndexFunc(tc.input) + if !reflect.DeepEqual(tc.wantMac, gotMac) { + t.Errorf("Unexpected response: wanted %#v, got %#v", tc.wantMac, gotMac) + } + gotIPs := hardwareIPIndexFunc(tc.input) + if !reflect.DeepEqual(tc.wantIP, gotIPs) { + t.Errorf("Unexpected response: wanted %#v, got %#v", tc.wantIP, gotIPs) + } + }) + } +} diff --git a/pkg/controllers/workflow/controller_test.go b/pkg/controllers/workflow/controller_test.go index 0b3ce9c0e..9b0defd02 100644 --- a/pkg/controllers/workflow/controller_test.go +++ b/pkg/controllers/workflow/controller_test.go @@ -177,7 +177,7 @@ func TestReconcile(t *testing.T) { }, }, Status: v1alpha1.WorkflowStatus{ - State: "STATE_PENDING", + State: v1alpha1.WorkflowStatePending, GlobalTimeout: 1800, Tasks: []v1alpha1.Task{ { @@ -199,7 +199,7 @@ func TestReconcile(t *testing.T) { "DEST_DISK": "/dev/nvme0n1", "IMG_URL": "http://10.1.1.11:8080/debian-10-openstack-amd64.raw.gz", }, - Status: "STATE_PENDING", + Status: v1alpha1.WorkflowStatePending, }, }, }, @@ -304,7 +304,7 @@ tasks: }, }, Status: v1alpha1.WorkflowStatus{ - State: "STATE_PENDING", + State: v1alpha1.WorkflowStatePending, GlobalTimeout: 1800, Tasks: []v1alpha1.Task{ { @@ -326,7 +326,7 @@ tasks: "DEST_DISK": "/dev/nvme0n1", "IMG_URL": "http://10.1.1.11:8080/debian-10-openstack-amd64.raw.gz", }, - Status: "STATE_PENDING", + Status: v1alpha1.WorkflowStatePending, }, }, }, @@ -538,7 +538,7 @@ tasks: }, }, Status: v1alpha1.WorkflowStatus{ - State: "STATE_RUNNING", + State: v1alpha1.WorkflowStateRunning, GlobalTimeout: 600, Tasks: []v1alpha1.Task{ { @@ -559,7 +559,7 @@ tasks: "DEST_DISK": "/dev/nvme0n1", "IMG_URL": "http://10.1.1.11:8080/debian-10-openstack-amd64.raw.gz", }, - Status: "STATE_RUNNING", + Status: v1alpha1.WorkflowStateRunning, StartedAt: TestTime.MetaV1BeforeSec(601), }, }, @@ -624,7 +624,7 @@ tasks: }, }, Status: v1alpha1.WorkflowStatus{ - State: "STATE_TIMEOUT", + State: v1alpha1.WorkflowStateTimeout, GlobalTimeout: 600, Tasks: []v1alpha1.Task{ { @@ -645,7 +645,7 @@ tasks: "DEST_DISK": "/dev/nvme0n1", "IMG_URL": "http://10.1.1.11:8080/debian-10-openstack-amd64.raw.gz", }, - Status: "STATE_TIMEOUT", + Status: v1alpha1.WorkflowStateTimeout, StartedAt: TestTime.MetaV1BeforeSec(601), Seconds: 601, Message: "Action timed out",