Skip to content

Commit

Permalink
Add running workflow for worker index to cache
Browse files Browse the repository at this point in the history
* Add tests on index functions

Signed-off-by: Micah Hausler <[email protected]>
  • Loading branch information
micahhausler committed Mar 22, 2022
1 parent e1e2af6 commit 58a3754
Show file tree
Hide file tree
Showing 2 changed files with 196 additions and 2 deletions.
29 changes: 27 additions & 2 deletions pkg/controllers/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ import (
)

const (
WorkerAddr = "status.tasks.workeraddr"
WorkerAddrIndex = "status.tasks.workeraddr"
NonTerminalStateIndex = "status.state.non_terminal_worker"
)

var (
Expand Down Expand Up @@ -83,9 +84,14 @@ func NewManager(config *rest.Config, options controllerruntime.Options) (Manager
}{
{
&v1alpha1.Workflow{},
WorkerAddr,
WorkerAddrIndex,
wokerIndexFunc,
},
{
&v1alpha1.Workflow{},
NonTerminalStateIndex,
nonTerminalStateForWorkerIndexFunc,
},
}
for _, indexer := range indexers {
if err := m.GetFieldIndexer().IndexField(
Expand Down Expand Up @@ -132,3 +138,22 @@ func wokerIndexFunc(obj client.Object) []string {
}
return resp
}

// nonTerminalStateForWorkerIndexFunc func indexes workflow by worker for non terminal workflows.
func nonTerminalStateForWorkerIndexFunc(obj client.Object) []string {
wf, ok := obj.(*v1alpha1.Workflow)
if !ok {
return nil
}

resp := []string{}
if !(wf.Status.State == "STATE_RUNNING" || wf.Status.State == "STATE_PENDING") {
return resp
}
for _, task := range wf.Status.Tasks {
if task.WorkerAddr != "" {
resp = append(resp, task.WorkerAddr)
}
}
return resp
}
169 changes: 169 additions & 0 deletions pkg/controllers/manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
package controllers

import (
"reflect"
"testing"

"github.com/tinkerbell/tink/pkg/apis/core/v1alpha1"
"sigs.k8s.io/controller-runtime/pkg/client"
)

func TestWokerIndexFunc(t *testing.T) {
cases := []struct {
name string
input client.Object
want []string
}{
{
"non workflow",
&v1alpha1.Hardware{},
nil,
},
{
"empty workflow",
&v1alpha1.Workflow{
Status: v1alpha1.WorkflowStatus{
State: "",
Tasks: []v1alpha1.Task{},
},
},
[]string{},
},
{
"pending workflow",
&v1alpha1.Workflow{
Status: v1alpha1.WorkflowStatus{
State: "STATE_PENDING",
Tasks: []v1alpha1.Task{
{
WorkerAddr: "worker1",
},
},
},
},
[]string{"worker1"},
},
{
"running workflow",
&v1alpha1.Workflow{
Status: v1alpha1.WorkflowStatus{
State: "STATE_RUNNING",
Tasks: []v1alpha1.Task{
{
WorkerAddr: "worker1",
},
{
WorkerAddr: "worker2",
},
},
},
},
[]string{"worker1", "worker2"},
},
{
"complete workflow",
&v1alpha1.Workflow{
Status: v1alpha1.WorkflowStatus{
State: "STATE_SUCCESS",
Tasks: []v1alpha1.Task{
{
WorkerAddr: "worker1",
},
},
},
},
[]string{"worker1"},
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
got := wokerIndexFunc(tc.input)
if !reflect.DeepEqual(tc.want, got) {
t.Errorf("Unexpected response: wanted %#v, got %#v", tc.want, got)
}
})
}
}

func TestNonTerminalStateForWorkerIndexFunc(t *testing.T) {
cases := []struct {
name string
input client.Object
want []string
}{
{
"non workflow",
&v1alpha1.Hardware{},
nil,
},
{
"empty workflow status",
&v1alpha1.Workflow{
Status: v1alpha1.WorkflowStatus{
State: "",
Tasks: []v1alpha1.Task{
{
WorkerAddr: "worker1",
},
},
},
},
[]string{},
},
{
"pending workflow",
&v1alpha1.Workflow{
Status: v1alpha1.WorkflowStatus{
State: "STATE_PENDING",
Tasks: []v1alpha1.Task{
{
WorkerAddr: "worker1",
},
},
},
},
[]string{"worker1"},
},
{
"running workflow",
&v1alpha1.Workflow{
Status: v1alpha1.WorkflowStatus{
State: "STATE_RUNNING",
Tasks: []v1alpha1.Task{
{
WorkerAddr: "worker1",
},
{
WorkerAddr: "worker2",
},
},
},
},
[]string{"worker1", "worker2"},
},
{
"complete workflow",
&v1alpha1.Workflow{
Status: v1alpha1.WorkflowStatus{
State: "STATE_SUCCESS",
Tasks: []v1alpha1.Task{
{
WorkerAddr: "worker1",
},
},
},
},
[]string{},
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
got := nonTerminalStateForWorkerIndexFunc(tc.input)
if !reflect.DeepEqual(tc.want, got) {
t.Errorf("Unexpected response: wanted %#v, got %#v", tc.want, got)
}
})
}
}

0 comments on commit 58a3754

Please sign in to comment.