From 26dcb8956fb39f6c1566eff8d74a903460a1f767 Mon Sep 17 00:00:00 2001 From: Micah Hausler Date: Tue, 22 Mar 2022 18:31:34 +0000 Subject: [PATCH] Add running workflow for worker index to cache * Add tests on index functions Signed-off-by: Micah Hausler --- pkg/controllers/manager.go | 29 +++++- pkg/controllers/manager_test.go | 170 ++++++++++++++++++++++++++++++++ 2 files changed, 197 insertions(+), 2 deletions(-) create mode 100644 pkg/controllers/manager_test.go diff --git a/pkg/controllers/manager.go b/pkg/controllers/manager.go index 9571c4c17..506e8465f 100644 --- a/pkg/controllers/manager.go +++ b/pkg/controllers/manager.go @@ -17,7 +17,8 @@ import ( ) const ( - WorkerAddr = "status.tasks.workeraddr" + WorkerAddrIndex = "status.tasks.workeraddr" + NonTerminalStateIndex = "status.state.non_terminal_worker" ) var ( @@ -83,9 +84,14 @@ func NewManager(config *rest.Config, options controllerruntime.Options) (Manager }{ { &v1alpha1.Workflow{}, - WorkerAddr, + WorkerAddrIndex, wokerIndexFunc, }, + { + &v1alpha1.Workflow{}, + NonTerminalStateIndex, + nonTerminalStateForWorkerIndexFunc, + }, } for _, indexer := range indexers { if err := m.GetFieldIndexer().IndexField( @@ -132,3 +138,22 @@ func wokerIndexFunc(obj client.Object) []string { } return resp } + +// nonTerminalStateForWorkerIndexFunc func indexes workflow by worker for non terminal workflows. +func nonTerminalStateForWorkerIndexFunc(obj client.Object) []string { + wf, ok := obj.(*v1alpha1.Workflow) + if !ok { + return nil + } + + resp := []string{} + if !(wf.Status.State == "STATE_RUNNING" || wf.Status.State == "STATE_PENDING") { + return resp + } + for _, task := range wf.Status.Tasks { + if task.WorkerAddr != "" { + resp = append(resp, task.WorkerAddr) + } + } + return resp +} diff --git a/pkg/controllers/manager_test.go b/pkg/controllers/manager_test.go new file mode 100644 index 000000000..785b075df --- /dev/null +++ b/pkg/controllers/manager_test.go @@ -0,0 +1,170 @@ +package controllers + +import ( + "reflect" + "testing" + + "github.com/tinkerbell/tink/pkg/apis/core/v1alpha1" + + "sigs.k8s.io/controller-runtime/pkg/client" +) + +func TestWokerIndexFunc(t *testing.T) { + cases := []struct { + name string + input client.Object + want []string + }{ + { + "non workflow", + &v1alpha1.Hardware{}, + nil, + }, + { + "empty workflow", + &v1alpha1.Workflow{ + Status: v1alpha1.WorkflowStatus{ + State: "", + Tasks: []v1alpha1.Task{}, + }, + }, + []string{}, + }, + { + "pending workflow", + &v1alpha1.Workflow{ + Status: v1alpha1.WorkflowStatus{ + State: "STATE_PENDING", + Tasks: []v1alpha1.Task{ + { + WorkerAddr: "worker1", + }, + }, + }, + }, + []string{"worker1"}, + }, + { + "running workflow", + &v1alpha1.Workflow{ + Status: v1alpha1.WorkflowStatus{ + State: "STATE_RUNNING", + Tasks: []v1alpha1.Task{ + { + WorkerAddr: "worker1", + }, + { + WorkerAddr: "worker2", + }, + }, + }, + }, + []string{"worker1", "worker2"}, + }, + { + "complete workflow", + &v1alpha1.Workflow{ + Status: v1alpha1.WorkflowStatus{ + State: "STATE_SUCCESS", + Tasks: []v1alpha1.Task{ + { + WorkerAddr: "worker1", + }, + }, + }, + }, + []string{"worker1"}, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + got := wokerIndexFunc(tc.input) + if !reflect.DeepEqual(tc.want, got) { + t.Errorf("Unexpected response: wanted %#v, got %#v", tc.want, got) + } + }) + } +} + +func TestNonTerminalStateForWorkerIndexFunc(t *testing.T) { + cases := []struct { + name string + input client.Object + want []string + }{ + { + "non workflow", + &v1alpha1.Hardware{}, + nil, + }, + { + "empty workflow status", + &v1alpha1.Workflow{ + Status: v1alpha1.WorkflowStatus{ + State: "", + Tasks: []v1alpha1.Task{ + { + WorkerAddr: "worker1", + }, + }, + }, + }, + []string{}, + }, + { + "pending workflow", + &v1alpha1.Workflow{ + Status: v1alpha1.WorkflowStatus{ + State: "STATE_PENDING", + Tasks: []v1alpha1.Task{ + { + WorkerAddr: "worker1", + }, + }, + }, + }, + []string{"worker1"}, + }, + { + "running workflow", + &v1alpha1.Workflow{ + Status: v1alpha1.WorkflowStatus{ + State: "STATE_RUNNING", + Tasks: []v1alpha1.Task{ + { + WorkerAddr: "worker1", + }, + { + WorkerAddr: "worker2", + }, + }, + }, + }, + []string{"worker1", "worker2"}, + }, + { + "complete workflow", + &v1alpha1.Workflow{ + Status: v1alpha1.WorkflowStatus{ + State: "STATE_SUCCESS", + Tasks: []v1alpha1.Task{ + { + WorkerAddr: "worker1", + }, + }, + }, + }, + []string{}, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + got := nonTerminalStateForWorkerIndexFunc(tc.input) + if !reflect.DeepEqual(tc.want, got) { + t.Errorf("Unexpected response: wanted %#v, got %#v", tc.want, got) + } + }) + } +}