diff --git a/pkg/internal/tests/frozen_time.go b/internal/tests/frozen_time.go similarity index 100% rename from pkg/internal/tests/frozen_time.go rename to internal/tests/frozen_time.go diff --git a/pkg/apis/core/v1alpha1/workflow_methods.go b/pkg/apis/core/v1alpha1/workflow_methods.go index b2cad6045..899a0d596 100644 --- a/pkg/apis/core/v1alpha1/workflow_methods.go +++ b/pkg/apis/core/v1alpha1/workflow_methods.go @@ -30,3 +30,88 @@ func (w *Workflow) GetStartTime() *metav1.Time { } return nil } + +type taskInfo struct { + CurrentWorker string + CurrentTask string + CurrentTaskIndex int + CurrentAction string + CurrentActionIndex int + CurrentActionState WorkflowState + TotalNumberOfActions int +} + +// helper function for task info. +func (w *Workflow) getTaskActionInfo() taskInfo { + var ( + found bool + taskIndex = -1 + actionIndex int + actionTaskIndex int + actionCount int + ) + for ti, task := range w.Status.Tasks { + actionCount += len(task.Actions) + if found { + continue + } + INNER: + for ai, action := range task.Actions { + // Find the first non-successful action + switch action.Status { + case WorkflowStateSuccess: + actionIndex++ + continue + case WorkflowStatePending, WorkflowStateRunning, WorkflowStateFailed, WorkflowStateTimeout: + taskIndex = ti + actionTaskIndex = ai + found = true + break INNER + } + } + } + + ti := taskInfo{ + TotalNumberOfActions: actionCount, + CurrentActionIndex: actionIndex, + } + if taskIndex >= 0 { + ti.CurrentWorker = w.Status.Tasks[taskIndex].WorkerAddr + ti.CurrentTask = w.Status.Tasks[taskIndex].Name + ti.CurrentTaskIndex = taskIndex + } + if taskIndex >= 0 && actionIndex >= 0 { + ti.CurrentAction = w.Status.Tasks[taskIndex].Actions[actionTaskIndex].Name + ti.CurrentActionState = w.Status.Tasks[taskIndex].Actions[actionTaskIndex].Status + } + + return ti +} + +func (w *Workflow) GetCurrentWorker() string { + return w.getTaskActionInfo().CurrentWorker +} + +func (w *Workflow) GetCurrentTask() string { + return w.getTaskActionInfo().CurrentTask +} + +func (w *Workflow) GetCurrentTaskIndex() int { + return w.getTaskActionInfo().CurrentTaskIndex +} + +func (w *Workflow) GetCurrentAction() string { + return w.getTaskActionInfo().CurrentAction +} + +func (w *Workflow) GetCurrentActionIndex() int { + return w.getTaskActionInfo().CurrentActionIndex +} + +func (w *Workflow) GetCurrentActionState() WorkflowState { + return w.getTaskActionInfo().CurrentActionState +} + +func (w *Workflow) GetTotalNumberOfActions() int { + return w.getTaskActionInfo().TotalNumberOfActions +} diff --git a/pkg/apis/core/v1alpha1/workflow_test.go b/pkg/apis/core/v1alpha1/workflow_test.go index 95af57dce..b4081458f 100644 --- a/pkg/apis/core/v1alpha1/workflow_test.go +++ b/pkg/apis/core/v1alpha1/workflow_test.go @@ -4,7 +4,7 @@ import ( "testing" "time" - "github.com/tinkerbell/tink/pkg/internal/tests" + "github.com/tinkerbell/tink/internal/tests" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -89,7 +89,7 @@ func TestGetStartTime(t *testing.T) { }, Spec: WorkflowSpec{}, Status: WorkflowStatus{ - State: "STATE_RUNNING", + State: WorkflowStateRunning, GlobalTimeout: 600, Tasks: []Task{ { @@ -105,7 +105,7 @@ func TestGetStartTime(t *testing.T) { "DEST_DISK": "/dev/nvme0n1", "IMG_URL": "http://10.1.1.11:8080/debian-10-openstack-amd64.raw.gz", }, - Status: "STATE_SUCCESS", + Status: WorkflowStateSuccess, StartedAt: TestNow.MetaV1Now(), Seconds: 20, }, @@ -118,7 +118,7 @@ func TestGetStartTime(t *testing.T) { "DEST_DISK": "/dev/nvme0n1", "IMG_URL": "http://10.1.1.11:8080/debian-10-openstack-amd64.raw.gz", }, - Status: "STATE_RUNNING", + Status: WorkflowStateRunning, StartedAt: TestNow.MetaV1AfterSec(21), }, }, @@ -141,7 +141,7 @@ func TestGetStartTime(t *testing.T) { }, Spec: WorkflowSpec{}, Status: WorkflowStatus{ - State: "STATE_PENDING", + State: WorkflowStatePending, GlobalTimeout: 600, Tasks: []Task{ { @@ -157,7 +157,7 @@ func TestGetStartTime(t *testing.T) { "DEST_DISK": "/dev/nvme0n1", "IMG_URL": "http://10.1.1.11:8080/debian-10-openstack-amd64.raw.gz", }, - Status: "STATE_PENDING", + Status: WorkflowStatePending, StartedAt: nil, }, }, @@ -180,3 +180,231 @@ func TestGetStartTime(t *testing.T) { }) } } + +func TestWorkflowMethods(t *testing.T) { + cases := []struct { + name string + wf *Workflow + want taskInfo + }{ + { + "Empty wflow", + &Workflow{ + ObjectMeta: metav1.ObjectMeta{ + Name: "debian", + Namespace: "default", + }, + }, + taskInfo{}, + }, + { + "invalid workflow", + &Workflow{ + TypeMeta: metav1.TypeMeta{ + Kind: "Workflow", + APIVersion: "tinkerbell.org/v1alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "debian", + Namespace: "default", + }, + Spec: WorkflowSpec{}, + Status: WorkflowStatus{ + State: WorkflowStateRunning, + GlobalTimeout: 600, + Tasks: []Task{ + { + Name: "empty task", + // WorkerAddr: "", // intentionally not set + Actions: []Action{ + { + Name: "empty action", + Status: WorkflowStateFailed, + }, + }, + }, + { + Name: "os-installation", + WorkerAddr: "3c:ec:ef:4c:4f:54", + Actions: []Action{ + { + Name: "stream-debian-image", + Image: "quay.io/tinkerbell-actions/image2disk:v1.0.0", + Timeout: 60, + Environment: map[string]string{ + "COMPRESSED": "true", + "DEST_DISK": "/dev/nvme0n1", + "IMG_URL": "http://10.1.1.11:8080/debian-10-openstack-amd64.raw.gz", + }, + Status: WorkflowStateSuccess, + StartedAt: TestNow.MetaV1Now(), + Seconds: 20, + }, + { + Name: "stream-debian-image", + Image: "quay.io/tinkerbell-actions/image2disk:v1.0.0", + Timeout: 60, + Environment: map[string]string{ + "COMPRESSED": "true", + "DEST_DISK": "/dev/nvme0n1", + "IMG_URL": "http://10.1.1.11:8080/debian-10-openstack-amd64.raw.gz", + }, + Status: WorkflowStateRunning, + StartedAt: TestNow.MetaV1AfterSec(21), + }, + }, + }, + }, + }, + }, + taskInfo{ + TotalNumberOfActions: 3, + CurrentTaskIndex: 0, + CurrentTask: "empty task", + CurrentWorker: "", + CurrentAction: "empty action", + CurrentActionState: WorkflowStateFailed, + CurrentActionIndex: 0, + }, + }, + { + "Running workflow", + &Workflow{ + TypeMeta: metav1.TypeMeta{ + Kind: "Workflow", + APIVersion: "tinkerbell.org/v1alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "debian", + Namespace: "default", + }, + Spec: WorkflowSpec{}, + Status: WorkflowStatus{ + State: WorkflowStateRunning, + GlobalTimeout: 600, + Tasks: []Task{ + { + Name: "bmc-manage", + WorkerAddr: "pbnj", + Actions: []Action{ + { + Name: "configure-pxe", + Image: "quay.io/tinkerbell-actions/pbnj:v1.0.0", + Timeout: 20, + Status: WorkflowStateSuccess, + StartedAt: TestNow.MetaV1BeforeSec(15), + Seconds: 15, + }, + }, + }, + { + Name: "os-installation", + WorkerAddr: "3c:ec:ef:4c:4f:54", + Actions: []Action{ + { + Name: "stream-debian-image", + Image: "quay.io/tinkerbell-actions/image2disk:v1.0.0", + Timeout: 60, + Environment: map[string]string{ + "COMPRESSED": "true", + "DEST_DISK": "/dev/nvme0n1", + "IMG_URL": "http://10.1.1.11:8080/debian-10-openstack-amd64.raw.gz", + }, + Status: WorkflowStateSuccess, + StartedAt: TestNow.MetaV1Now(), + Seconds: 20, + }, + { + Name: "write-file", + Image: "quay.io/tinkerbell-actions/writefile:v1.0.0", + Timeout: 60, + Environment: map[string]string{ + "COMPRESSED": "true", + "DEST_DISK": "/dev/nvme0n1", + "IMG_URL": "http://10.1.1.11:8080/debian-10-openstack-amd64.raw.gz", + }, + Status: WorkflowStateRunning, + StartedAt: TestNow.MetaV1AfterSec(21), + }, + }, + }, + }, + }, + }, + taskInfo{ + TotalNumberOfActions: 3, + CurrentTaskIndex: 1, + CurrentTask: "os-installation", + CurrentWorker: "3c:ec:ef:4c:4f:54", + CurrentAction: "write-file", + CurrentActionState: WorkflowStateRunning, + CurrentActionIndex: 2, + }, + }, + { + "Pending workflow", + &Workflow{ + TypeMeta: metav1.TypeMeta{ + Kind: "Workflow", + APIVersion: "tinkerbell.org/v1alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "debian", + Namespace: "default", + }, + Spec: WorkflowSpec{}, + Status: WorkflowStatus{ + State: WorkflowStatePending, + GlobalTimeout: 600, + Tasks: []Task{ + { + Name: "os-installation", + WorkerAddr: "3c:ec:ef:4c:4f:54", + Actions: []Action{ + { + Name: "stream-debian-image", + Image: "quay.io/tinkerbell-actions/image2disk:v1.0.0", + Timeout: 60, + Environment: map[string]string{ + "COMPRESSED": "true", + "DEST_DISK": "/dev/nvme0n1", + "IMG_URL": "http://10.1.1.11:8080/debian-10-openstack-amd64.raw.gz", + }, + Status: WorkflowStatePending, + }, + { + Name: "write-file", + Image: "quay.io/tinkerbell-actions/writefile:v1.0.0", + Timeout: 60, + Environment: map[string]string{ + "COMPRESSED": "true", + "DEST_DISK": "/dev/nvme0n1", + "IMG_URL": "http://10.1.1.11:8080/debian-10-openstack-amd64.raw.gz", + }, + Status: WorkflowStatePending, + }, + }, + }, + }, + }, + }, + taskInfo{ + TotalNumberOfActions: 2, + CurrentTaskIndex: 0, + CurrentTask: "os-installation", + CurrentWorker: "3c:ec:ef:4c:4f:54", + CurrentAction: "stream-debian-image", + CurrentActionState: WorkflowStatePending, + CurrentActionIndex: 0, + }, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + got := tc.wf.getTaskActionInfo() + if got != tc.want { + t.Errorf("Got \n\t%#v\nwanted:\n\t%#v", got, tc.want) + } + }) + } +} diff --git a/pkg/apis/core/v1alpha1/workflow_types.go b/pkg/apis/core/v1alpha1/workflow_types.go index bfcb311c1..c4b3d6688 100644 --- a/pkg/apis/core/v1alpha1/workflow_types.go +++ b/pkg/apis/core/v1alpha1/workflow_types.go @@ -4,6 +4,16 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +type WorkflowState string + +const ( + WorkflowStatePending = WorkflowState("STATE_PENDING") + WorkflowStateRunning = WorkflowState("STATE_RUNNING") + WorkflowStateFailed = WorkflowState("STATE_FAILED") + WorkflowStateTimeout = WorkflowState("STATE_TIMEOUT") + WorkflowStateSuccess = WorkflowState("STATE_SUCCESS") +) + // WorkflowSpec defines the desired state of Workflow. type WorkflowSpec struct { // Name of the Template associated with this workflow. @@ -16,7 +26,7 @@ type WorkflowSpec struct { // WorkflowStatus defines the observed state of Workflow. type WorkflowStatus struct { // State is the state of the workflow in Tinkerbell. - State string `json:"state,omitempty"` + State WorkflowState `json:"state,omitempty"` // GlobalTimeout represents the max execution time GlobalTimeout int64 `json:"globalTimeout,omitempty"` @@ -43,7 +53,7 @@ type Action struct { Volumes []string `json:"volumes,omitempty"` Pid string `json:"pid,omitempty"` Environment map[string]string `json:"environment,omitempty"` - Status string `json:"status,omitempty"` + Status WorkflowState `json:"status,omitempty"` StartedAt *metav1.Time `json:"startedAt,omitempty"` Seconds int64 `json:"seconds,omitempty"` Message string `json:"message,omitempty"` diff --git a/pkg/controllers/workflow/controller.go b/pkg/controllers/workflow/controller.go index ce30c8b3f..2641f2a17 100644 --- a/pkg/controllers/workflow/controller.go +++ b/pkg/controllers/workflow/controller.go @@ -8,7 +8,6 @@ import ( "github.com/tinkerbell/tink/pkg/apis/core/v1alpha1" "github.com/tinkerbell/tink/pkg/controllers" "github.com/tinkerbell/tink/pkg/convert" - protoworkflow "github.com/tinkerbell/tink/protos/workflow" tinkworkflow "github.com/tinkerbell/tink/workflow" "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" @@ -52,8 +51,10 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco switch wflow.Status.State { case "": resp, err = c.processNewWorkflow(ctx, wflow) - case protoworkflow.State_name[int32(protoworkflow.State_STATE_RUNNING)]: + case v1alpha1.WorkflowStateRunning: resp = c.processRunningWorkflow(ctx, wflow) + default: + return resp, nil } // Patch any changes, regardless of errors @@ -82,29 +83,28 @@ func (c *Controller) processNewWorkflow(ctx context.Context, stored *v1alpha1.Wo // populate Task and Action data stored.Status = *convert.WorkflowYAMLToStatus(tinkWf) - stored.Status.State = protoworkflow.State_name[int32(protoworkflow.State_STATE_PENDING)] + stored.Status.State = v1alpha1.WorkflowStatePending return reconcile.Result{}, nil } func (c *Controller) processRunningWorkflow(_ context.Context, stored *v1alpha1.Workflow) reconcile.Result { // Check for global timeout expiration if c.nowFunc().After(stored.GetStartTime().Add(time.Duration(stored.Status.GlobalTimeout) * time.Second)) { - stored.Status.State = protoworkflow.State_name[int32(protoworkflow.State_STATE_TIMEOUT)] + stored.Status.State = v1alpha1.WorkflowStateTimeout } // check for any running actions that may have timed out for ti, task := range stored.Status.Tasks { for ai, action := range task.Actions { // A running workflow task action has timed out - if action.Status == protoworkflow.State_name[int32(protoworkflow.State_STATE_RUNNING)] && - action.StartedAt != nil && + if action.Status == v1alpha1.WorkflowStateRunning && action.StartedAt != nil && c.nowFunc().After(action.StartedAt.Add(time.Duration(action.Timeout)*time.Second)) { // Set fields on the timed out action - stored.Status.Tasks[ti].Actions[ai].Status = protoworkflow.State_name[int32(protoworkflow.State_STATE_TIMEOUT)] + stored.Status.Tasks[ti].Actions[ai].Status = v1alpha1.WorkflowStateTimeout stored.Status.Tasks[ti].Actions[ai].Message = "Action timed out" stored.Status.Tasks[ti].Actions[ai].Seconds = int64(c.nowFunc().Sub(action.StartedAt.Time).Seconds()) // Mark the workflow as timed out - stored.Status.State = protoworkflow.State_name[int32(protoworkflow.State_STATE_TIMEOUT)] + stored.Status.State = v1alpha1.WorkflowStateTimeout } } } diff --git a/pkg/controllers/workflow/controller_test.go b/pkg/controllers/workflow/controller_test.go index 688a6151e..0b3ce9c0e 100644 --- a/pkg/controllers/workflow/controller_test.go +++ b/pkg/controllers/workflow/controller_test.go @@ -6,8 +6,8 @@ import ( "testing" "github.com/google/go-cmp/cmp" + "github.com/tinkerbell/tink/internal/tests" "github.com/tinkerbell/tink/pkg/apis/core/v1alpha1" - "github.com/tinkerbell/tink/pkg/internal/tests" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" diff --git a/pkg/convert/template_test.go b/pkg/convert/template_test.go index 74f016216..23a2c996b 100644 --- a/pkg/convert/template_test.go +++ b/pkg/convert/template_test.go @@ -4,8 +4,8 @@ import ( "testing" "github.com/google/go-cmp/cmp" + "github.com/tinkerbell/tink/internal/tests" "github.com/tinkerbell/tink/pkg/apis/core/v1alpha1" - "github.com/tinkerbell/tink/pkg/internal/tests" prototemplate "github.com/tinkerbell/tink/protos/template" "google.golang.org/protobuf/testing/protocmp" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" diff --git a/pkg/convert/workflow.go b/pkg/convert/workflow.go index d552251b1..d94c69039 100644 --- a/pkg/convert/workflow.go +++ b/pkg/convert/workflow.go @@ -11,6 +11,21 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +func WorkflowToWorkflowContext(wf *v1alpha1.Workflow) *protoworkflow.WorkflowContext { + if wf == nil { + return nil + } + return &protoworkflow.WorkflowContext{ + WorkflowId: wf.GetName(), + CurrentWorker: wf.GetCurrentWorker(), + CurrentTask: wf.GetCurrentTask(), + CurrentAction: wf.GetCurrentAction(), + CurrentActionIndex: int64(wf.GetCurrentActionIndex()), + CurrentActionState: protoworkflow.State(protoworkflow.State_value[string(wf.GetCurrentActionState())]), + TotalNumberOfActions: int64(wf.GetTotalNumberOfActions()), + } +} + func WorkflowYAMLToStatus(wf *workflow.Workflow) *v1alpha1.WorkflowStatus { if wf == nil { return nil @@ -25,7 +40,7 @@ func WorkflowYAMLToStatus(wf *workflow.Workflow) *v1alpha1.WorkflowStatus { Timeout: action.Timeout, Command: action.Command, Volumes: action.Volumes, - Status: protoworkflow.State_name[int32(protoworkflow.State_STATE_PENDING)], + Status: v1alpha1.WorkflowState(protoworkflow.State_name[int32(protoworkflow.State_STATE_PENDING)]), Environment: action.Environment, Pid: action.Pid, }) @@ -48,7 +63,7 @@ func WorkflowCRDToProto(w *v1alpha1.Workflow) *protoworkflow.Workflow { if w == nil { return nil } - v, ok := protoworkflow.State_value[w.Status.State] + v, ok := protoworkflow.State_value[string(w.Status.State)] state := protoworkflow.State(v) if !ok { state = protoworkflow.State_STATE_PENDING @@ -123,7 +138,7 @@ func WorkflowProtoToCRD(w *protoworkflow.Workflow) *v1alpha1.Workflow { } if v, ok := protoworkflow.State_name[int32(w.State)]; ok { - resp.Status.State = v + resp.Status.State = v1alpha1.WorkflowState(v) } return resp } diff --git a/pkg/convert/workflow_test.go b/pkg/convert/workflow_test.go index b8d048c49..be781b010 100644 --- a/pkg/convert/workflow_test.go +++ b/pkg/convert/workflow_test.go @@ -1,6 +1,7 @@ package convert import ( + "reflect" "testing" "time" @@ -13,6 +14,82 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +func TestWorkflowToWorkflowContext(t *testing.T) { + cases := []struct { + name string + input *v1alpha1.Workflow + want *protoworkflow.WorkflowContext + }{ + { + "nil workflow", + nil, + nil, + }, + { + "empty workflow", + &v1alpha1.Workflow{}, + &protoworkflow.WorkflowContext{ + WorkflowId: "", + CurrentWorker: "", + CurrentTask: "", + CurrentAction: "", + CurrentActionIndex: 0, + CurrentActionState: 0, + TotalNumberOfActions: 0, + }, + }, + { + "running workflow", + &v1alpha1.Workflow{ + TypeMeta: metav1.TypeMeta{}, + ObjectMeta: metav1.ObjectMeta{ + Name: "wf1", + Namespace: "default", + }, + Spec: v1alpha1.WorkflowSpec{}, + Status: v1alpha1.WorkflowStatus{ + State: "STATE_RUNNING", + GlobalTimeout: 600, + Tasks: []v1alpha1.Task{ + { + Name: "task1", + WorkerAddr: "worker1", + Actions: []v1alpha1.Action{ + { + Name: "action1", + Status: "STATE_SUCCESS", + }, + { + Name: "action2", + Status: "STATE_RUNNING", + }, + }, + }, + }, + }, + }, + &protoworkflow.WorkflowContext{ + WorkflowId: "wf1", + CurrentWorker: "worker1", + CurrentTask: "task1", + CurrentAction: "action2", + CurrentActionIndex: 1, + CurrentActionState: protoworkflow.State_STATE_RUNNING, + TotalNumberOfActions: 2, + }, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + got := WorkflowToWorkflowContext(tc.input) + if !reflect.DeepEqual(got, tc.want) { + t.Errorf("Unexpedted response: wanted\n\t%#v\ngot\n\t%#v", tc.want, got) + } + }) + } +} + func TestWorkflowCRDToProto(t *testing.T) { cases := []struct { name string