diff --git a/pkg/reconciler/v1alpha1/pipeline/resources/dag.go b/pkg/reconciler/v1alpha1/pipeline/resources/dag.go index 4c80c58e466..e969436114e 100644 --- a/pkg/reconciler/v1alpha1/pipeline/resources/dag.go +++ b/pkg/reconciler/v1alpha1/pipeline/resources/dag.go @@ -54,21 +54,21 @@ func (g *DAG) addPrevPipelineTask(prev *Node, next *Node) error { // Check if we are adding cycles. visited := map[string]bool{prev.Task.Name: true, next.Task.Name: true} path := []string{next.Task.Name, prev.Task.Name} - if err := visit(prev.Prev, path, visited); err != nil { + if err := visit(next.Task.Name, prev.Prev, path, visited); err != nil { return fmt.Errorf("cycle detected; %s ", err.Error()) } next.Prev = append(next.Prev, prev) return nil } -func visit(nodes []*Node, path []string, visited map[string]bool) error { +func visit(currentName string, nodes []*Node, path []string, visited map[string]bool) error { for _, n := range nodes { path = append(path, n.Task.Name) if _, ok := visited[n.Task.Name]; ok { return fmt.Errorf(getVisitedPath(path)) } - visited[n.Task.Name] = true - if err := visit(n.Prev, path, visited); err != nil { + visited[currentName+"."+n.Task.Name] = true + if err := visit(n.Task.Name, n.Prev, path, visited); err != nil { return err } } @@ -84,13 +84,56 @@ func getVisitedPath(path []string) string { return strings.Join(path, " -> ") } -//GetPreviousTasks return all the previous tasks for a PipelineTask in the DAG -func (g *DAG) GetPreviousTasks(pt string) []v1alpha1.PipelineTask { - v, ok := g.Nodes[pt] - if !ok { - return nil +// GetSchedulable returns a list of PipelineTask that can be scheduled, +// given a list of successfully finished task. +// If the list is empty, this returns all tasks in the DAG that nobody +// depends on. Else, it returns task which have all dependecies marked +// as done, and thus can be scheduled. +func (g *DAG) GetSchedulable(tasks ...string) []v1alpha1.PipelineTask { + d := []v1alpha1.PipelineTask{} + if len(tasks) == 0 { + // return node that have no previous tasks + for _, node := range g.Nodes { + if len(node.getPrevTasks()) == 0 { + d = append(d, node.Task) + } + } + } else { + tm := toMap(tasks...) + for name, node := range g.Nodes { + if _, ok := tm[name]; ok { + // skip done element + continue + } + if !isSchedulable(tm, node.getPrevTasks()) { + // skip non-schedulable element + continue + } + d = append(d, node.Task) + } + } + return d +} + +func isSchedulable(tm map[string]struct{}, prevs []v1alpha1.PipelineTask) bool { + if len(prevs) == 0 { + return false + } + collected := []string{} + for _, t := range prevs { + if _, ok := tm[t.Name]; ok { + collected = append(collected, t.Name) + } + } + return len(collected) == len(prevs) +} + +func toMap(t ...string) map[string]struct{} { + m := make(map[string]struct{}, len(t)) + for _, s := range t { + m[s] = struct{}{} } - return v.getPrevTasks() + return m } // Build returns a valid pipeline DAG. Returns error if the pipeline is invalid diff --git a/pkg/reconciler/v1alpha1/pipeline/resources/dag_test.go b/pkg/reconciler/v1alpha1/pipeline/resources/dag_test.go index e6c1024fc96..06fcab70f9f 100644 --- a/pkg/reconciler/v1alpha1/pipeline/resources/dag_test.go +++ b/pkg/reconciler/v1alpha1/pipeline/resources/dag_test.go @@ -17,6 +17,7 @@ limitations under the License. package pipeline import ( + "sort" "testing" "github.com/google/go-cmp/cmp" @@ -52,6 +53,30 @@ func TestBuild(t *testing.T) { Inputs: []v1alpha1.PipelineTaskInputResource{{From: []string{"z"}}}, }, } + dDependsOnA := v1alpha1.PipelineTask{ + Name: "d", + Resources: &v1alpha1.PipelineTaskResources{ + Inputs: []v1alpha1.PipelineTaskInputResource{{From: []string{"a"}}}, + }, + } + eDependsOnA := v1alpha1.PipelineTask{ + Name: "e", + Resources: &v1alpha1.PipelineTaskResources{ + Inputs: []v1alpha1.PipelineTaskInputResource{{From: []string{"a"}}}, + }, + } + fDependsOnDAndE := v1alpha1.PipelineTask{ + Name: "f", + Resources: &v1alpha1.PipelineTaskResources{ + Inputs: []v1alpha1.PipelineTaskInputResource{{From: []string{"d", "e"}}}, + }, + } + gDependOnF := v1alpha1.PipelineTask{ + Name: "g", + Resources: &v1alpha1.PipelineTaskResources{ + Inputs: []v1alpha1.PipelineTaskInputResource{{From: []string{"f"}}}, + }, + } selfLink := v1alpha1.PipelineTask{ Name: "a", Resources: &v1alpha1.PipelineTaskResources{ @@ -73,20 +98,22 @@ func TestBuild(t *testing.T) { shdErr bool expectedErr string }{ - {"linear-pipeline", - v1alpha1.PipelineSpec{Tasks: []v1alpha1.PipelineTask{a, b, c}}, - &DAG{ + { + name: "linear-pipeline", + spec: v1alpha1.PipelineSpec{Tasks: []v1alpha1.PipelineTask{a, b, c}}, + expectedDAG: &DAG{ Nodes: map[string]*Node{ "a": {Task: a}, "b": {Task: b}, "c": {Task: c}, }, }, - false, - ""}, - {"complex-pipeline", - v1alpha1.PipelineSpec{Tasks: []v1alpha1.PipelineTask{a, xDependsOnA, yDependsOnAB, zDependsOnX, b, c}}, - &DAG{ + shdErr: false, + expectedErr: "", + }, { + name: "complex-pipeline", + spec: v1alpha1.PipelineSpec{Tasks: []v1alpha1.PipelineTask{a, xDependsOnA, yDependsOnAB, zDependsOnX, b, c}}, + expectedDAG: &DAG{ Nodes: map[string]*Node{ "a": {Task: a}, "b": {Task: b}, @@ -96,28 +123,57 @@ func TestBuild(t *testing.T) { "z": {Task: zDependsOnX, Prev: []*Node{nodeX}}, }, }, - false, - ""}, - {"self-link", - v1alpha1.PipelineSpec{Tasks: []v1alpha1.PipelineTask{selfLink}}, - nil, - true, - ` "self-link" is invalid: : Internal error: cycle detected; task "a" depends on itself`}, - {"cycle-2", - v1alpha1.PipelineSpec{Tasks: []v1alpha1.PipelineTask{xDependsOnA, zDependsOnX, aDependsOnZ}}, - nil, - true, - ` "cycle-2" is invalid: : Internal error: cycle detected; a -> x -> z -> a `}, - {"duplicate-tasks", - v1alpha1.PipelineSpec{Tasks: []v1alpha1.PipelineTask{a, a}}, - nil, - true, - ` "duplicate-tasks" is invalid: spec.tasks.name: Duplicate value: "a"`}, - {"invalid-task-name", - v1alpha1.PipelineSpec{Tasks: []v1alpha1.PipelineTask{invalidTask}}, - nil, - true, - ` "invalid-task-name" is invalid: spec.tasks.name: Not found: "none"`}, + shdErr: false, + expectedErr: "", + }, { + name: "self-link", + spec: v1alpha1.PipelineSpec{Tasks: []v1alpha1.PipelineTask{selfLink}}, + expectedDAG: nil, + shdErr: true, + expectedErr: ` "self-link" is invalid: : Internal error: cycle detected; task "a" depends on itself`, + }, { + name: "cycle-2", + spec: v1alpha1.PipelineSpec{Tasks: []v1alpha1.PipelineTask{xDependsOnA, zDependsOnX, aDependsOnZ}}, + expectedDAG: nil, + shdErr: true, + expectedErr: ` "cycle-2" is invalid: : Internal error: cycle detected; a -> x -> z -> a `, + }, { + name: "duplicate-tasks", + spec: v1alpha1.PipelineSpec{Tasks: []v1alpha1.PipelineTask{a, a}}, + expectedDAG: nil, + shdErr: true, + expectedErr: ` "duplicate-tasks" is invalid: spec.tasks.name: Duplicate value: "a"`, + }, { + name: "invalid-task-name", + spec: v1alpha1.PipelineSpec{Tasks: []v1alpha1.PipelineTask{invalidTask}}, + expectedDAG: nil, + shdErr: true, + expectedErr: ` "invalid-task-name" is invalid: spec.tasks.name: Not found: "none"`, + }, { + // This test make sure we don't detect cycle (A -> B -> B -> …) when there is not + // The graph looks like the following. + // a + // / \ + // d e + // \ / + // f + // | + // g + // This means we "visit" a twice, from two different path ; but there is no cycle. + name: "no-cycle", + spec: v1alpha1.PipelineSpec{Tasks: []v1alpha1.PipelineTask{a, dDependsOnA, eDependsOnA, fDependsOnDAndE, gDependOnF}}, + expectedDAG: &DAG{ + Nodes: map[string]*Node{ + "a": {Task: a}, + "d": {Task: dDependsOnA, Prev: []*Node{{Task: a}}}, + "e": {Task: eDependsOnA, Prev: []*Node{{Task: a}}}, + "f": {Task: fDependsOnDAndE, Prev: []*Node{{Task: dDependsOnA, Prev: []*Node{{Task: a}}}, {Task: eDependsOnA, Prev: []*Node{{Task: a}}}}}, + "g": {Task: gDependOnF, Prev: []*Node{{Task: fDependsOnDAndE, Prev: []*Node{{Task: dDependsOnA, Prev: []*Node{{Task: a}}}, {Task: eDependsOnA, Prev: []*Node{{Task: a}}}}}}}, + }, + }, + shdErr: false, + expectedErr: "", + }, } for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { @@ -144,8 +200,15 @@ func TestBuild(t *testing.T) { } } -func TestGetPrevTasks(t *testing.T) { +func TestGetSchedulable(t *testing.T) { a := v1alpha1.PipelineTask{Name: "a"} + b := v1alpha1.PipelineTask{Name: "b"} + w := v1alpha1.PipelineTask{ + Name: "w", + Resources: &v1alpha1.PipelineTaskResources{ + Inputs: []v1alpha1.PipelineTaskInputResource{{From: []string{"b", "y"}}}, + }, + } x := v1alpha1.PipelineTask{ Name: "x", Resources: &v1alpha1.PipelineTaskResources{ @@ -158,28 +221,56 @@ func TestGetPrevTasks(t *testing.T) { Inputs: []v1alpha1.PipelineTaskInputResource{{From: []string{"x", "a"}}}, }, } + z := v1alpha1.PipelineTask{ + Name: "z", + Resources: &v1alpha1.PipelineTaskResources{ + Inputs: []v1alpha1.PipelineTaskInputResource{{From: []string{"x"}}}, + }, + } p := v1alpha1.Pipeline{ ObjectMeta: metav1.ObjectMeta{ Namespace: "namespace", Name: "test", }, Spec: v1alpha1.PipelineSpec{ - Tasks: []v1alpha1.PipelineTask{a, x, y}, + Tasks: []v1alpha1.PipelineTask{a, b, w, x, y, z}, }, } g, err := Build(&p) if err != nil { t.Fatalf("unexpected error %s", err) } - if d := cmp.Diff(g.GetPreviousTasks("a"), []v1alpha1.PipelineTask{}); d != "" { - t.Errorf("incorrect prev tasks for PipelineTask a. diff %s", d) + if d := cmp.Diff(sortPipelineTask(g.GetSchedulable()), []v1alpha1.PipelineTask{a, b}); d != "" { + t.Errorf("incorrect dependencees for no task. diff %s", d) + } + if d := cmp.Diff(sortPipelineTask(g.GetSchedulable("a")), []v1alpha1.PipelineTask{x}); d != "" { + t.Errorf("incorrect dependencees for no task. diff %s", d) + } + if d := cmp.Diff(sortPipelineTask(g.GetSchedulable("b")), []v1alpha1.PipelineTask{}); d != "" { + t.Errorf("incorrect dependencees for no task. diff %s", d) + } + if d := cmp.Diff(sortPipelineTask(g.GetSchedulable("a", "b")), []v1alpha1.PipelineTask{x}); d != "" { + t.Errorf("incorrect dependencees for no task. diff %s", d) } - if d := cmp.Diff(g.GetPreviousTasks("x"), []v1alpha1.PipelineTask{a}); d != "" { - t.Errorf("incorrect prev tasks for PipelineTask x. diff %s", d) + if d := cmp.Diff(sortPipelineTask(g.GetSchedulable("x")), []v1alpha1.PipelineTask{z}); d != "" { + t.Errorf("incorrect dependencees for no task. diff %s", d) } - if d := cmp.Diff(g.GetPreviousTasks("y"), []v1alpha1.PipelineTask{x, a}); d != "" { - t.Errorf("incorrect prev tasks for PipelineTask y. diff %s", d) + if d := cmp.Diff(sortPipelineTask(g.GetSchedulable("a", "x")), []v1alpha1.PipelineTask{y, z}); d != "" { + t.Errorf("incorrect dependencees for no task. diff %s", d) } + if d := cmp.Diff(sortPipelineTask(g.GetSchedulable("a", "x", "b")), []v1alpha1.PipelineTask{y, z}); d != "" { + t.Errorf("incorrect dependencees for no task. diff %s", d) + } + if d := cmp.Diff(sortPipelineTask(g.GetSchedulable("a", "x", "y")), []v1alpha1.PipelineTask{z}); d != "" { + t.Errorf("incorrect dependencees for no task. diff %s", d) + } +} + +func sortPipelineTask(tasks []v1alpha1.PipelineTask) []v1alpha1.PipelineTask { + sort.Slice(tasks, func(i, j int) bool { + return tasks[i].Name < tasks[j].Name + }) + return tasks } // hasErr returns true if err is not nil diff --git a/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun.go b/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun.go index f51dbce5070..d6a6a9a3775 100644 --- a/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun.go +++ b/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun.go @@ -28,6 +28,7 @@ import ( informers "github.com/knative/build-pipeline/pkg/client/informers/externalversions/pipeline/v1alpha1" listers "github.com/knative/build-pipeline/pkg/client/listers/pipeline/v1alpha1" "github.com/knative/build-pipeline/pkg/reconciler" + dag "github.com/knative/build-pipeline/pkg/reconciler/v1alpha1/pipeline/resources" "github.com/knative/build-pipeline/pkg/reconciler/v1alpha1/pipelinerun/config" "github.com/knative/build-pipeline/pkg/reconciler/v1alpha1/pipelinerun/resources" "github.com/knative/build-pipeline/pkg/reconciler/v1alpha1/taskrun" @@ -218,6 +219,18 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er p = p.DeepCopy() + d, err := dag.Build(p) + if err != nil { + // This Run has failed, so we need to mark it as failed and stop reconciling it + pr.Status.SetCondition(&duckv1alpha1.Condition{ + Type: duckv1alpha1.ConditionSucceeded, + Status: corev1.ConditionFalse, + Reason: ReasonInvalidBindings, + Message: fmt.Sprintf("PipelineRun %s's Pipeline DAG is invalid: %s", + fmt.Sprintf("%s/%s", pr.Namespace, pr.Name), err), + }) + return nil + } providedResources, err := resources.GetResourcesFromBindings(p, pr) if err != nil { // This Run has failed, so we need to mark it as failed and stop reconciling it @@ -254,7 +267,6 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er c.resourceLister.PipelineResources(pr.Namespace).Get, p.Spec.Tasks, providedResources, ) - if err != nil { // This Run has failed, so we need to mark it as failed and stop reconciling it switch err := err.(type) { @@ -285,7 +297,6 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er } return nil } - if err := resources.ValidateFrom(pipelineState); err != nil { // This Run has failed, so we need to mark it as failed and stop reconciling it pr.Status.SetCondition(&duckv1alpha1.Condition{ @@ -322,7 +333,7 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er return cancelPipelineRun(pr, pipelineState, c.PipelineClientSet) } - rprt := resources.GetNextTask(pr.Name, pipelineState, c.Logger) + rprts := resources.GetNextTasks(pr.Name, d, pipelineState, c.Logger) var as artifacts.ArtifactStorageInterface if as, err = artifacts.InitializeArtifactStorage(pr, c.KubeClientSet, c.Logger); err != nil { @@ -330,12 +341,14 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er return err } - if rprt != nil { - c.Logger.Infof("Creating a new TaskRun object %s", rprt.TaskRunName) - rprt.TaskRun, err = c.createTaskRun(c.Logger, rprt, pr, as.StorageBasePath(pr)) - if err != nil { - c.Recorder.Eventf(pr, corev1.EventTypeWarning, "TaskRunCreationFailed", "Failed to create TaskRun %q: %v", rprt.TaskRunName, err) - return fmt.Errorf("error creating TaskRun called %s for PipelineTask %s from PipelineRun %s: %s", rprt.TaskRunName, rprt.PipelineTask.Name, pr.Name, err) + for _, rprt := range rprts { + if rprt != nil { + c.Logger.Infof("Creating a new TaskRun object %s", rprt.TaskRunName) + rprt.TaskRun, err = c.createTaskRun(c.Logger, rprt, pr, as.StorageBasePath(pr)) + if err != nil { + c.Recorder.Eventf(pr, corev1.EventTypeWarning, "TaskRunCreationFailed", "Failed to create TaskRun %q: %v", rprt.TaskRunName, err) + return fmt.Errorf("error creating TaskRun called %s for PipelineTask %s from PipelineRun %s: %s", rprt.TaskRunName, rprt.PipelineTask.Name, pr.Name, err) + } } } diff --git a/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun_test.go b/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun_test.go index 3c0e078c79d..8d9084f9a08 100644 --- a/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun_test.go +++ b/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun_test.go @@ -232,11 +232,14 @@ func TestReconcile(t *testing.T) { t.Errorf("Expected reason %q but was %s", resources.ReasonRunning, condition.Reason) } - if len(reconciledRun.Status.TaskRuns) != 1 { - t.Errorf("Expected PipelineRun status to include only one TaskRun status item") + if len(reconciledRun.Status.TaskRuns) != 2 { + t.Errorf("Expected PipelineRun status to include only one TaskRun status item: %v", reconciledRun.Status.TaskRuns) } if _, exists := reconciledRun.Status.TaskRuns["test-pipeline-run-success-unit-test-1-9l9zj"]; exists == false { - t.Errorf("Expected PipelineRun status to include TaskRun status") + t.Error("Expected PipelineRun status to include TaskRun status") + } + if _, exists := reconciledRun.Status.TaskRuns["test-pipeline-run-success-unit-test-cluster-task-mssqb"]; exists == false { + t.Error("Expected PipelineRun status to include TaskRun status") } } diff --git a/pkg/reconciler/v1alpha1/pipelinerun/resources/pipelinerunresolution.go b/pkg/reconciler/v1alpha1/pipelinerun/resources/pipelinerunresolution.go index 53f63242461..808e70c26c9 100644 --- a/pkg/reconciler/v1alpha1/pipelinerun/resources/pipelinerunresolution.go +++ b/pkg/reconciler/v1alpha1/pipelinerun/resources/pipelinerunresolution.go @@ -21,15 +21,17 @@ import ( "strings" "time" - "github.com/knative/build-pipeline/pkg/apis/pipeline/v1alpha1" "github.com/knative/build-pipeline/pkg/names" - "github.com/knative/build-pipeline/pkg/reconciler/v1alpha1/taskrun/list" - "github.com/knative/build-pipeline/pkg/reconciler/v1alpha1/taskrun/resources" duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/knative/build-pipeline/pkg/apis/pipeline/v1alpha1" + dag "github.com/knative/build-pipeline/pkg/reconciler/v1alpha1/pipeline/resources" + "github.com/knative/build-pipeline/pkg/reconciler/v1alpha1/taskrun/list" + "github.com/knative/build-pipeline/pkg/reconciler/v1alpha1/taskrun/resources" ) const ( @@ -49,31 +51,28 @@ const ( ReasonTimedOut = "PipelineRunTimeout" ) -// GetNextTask returns the next Task for which a TaskRun should be created, -// or nil if no TaskRun should be created. -func GetNextTask(prName string, state []*ResolvedPipelineRunTask, logger *zap.SugaredLogger) *ResolvedPipelineRunTask { - for _, prtr := range state { - if prtr.TaskRun != nil { - c := prtr.TaskRun.Status.GetCondition(duckv1alpha1.ConditionSucceeded) - if c == nil { - logger.Infof("TaskRun %s doesn't have a condition so it is just starting and we shouldn't start more for PipelineRun %s", prtr.TaskRunName, prName) - return nil +func GetNextTasks(prName string, d *dag.DAG, state []*ResolvedPipelineRunTask, logger *zap.SugaredLogger) []*ResolvedPipelineRunTask { + tasks := []*ResolvedPipelineRunTask{} + done := []string{} + for _, t := range state { + if t.TaskRun != nil { + c := t.TaskRun.Status.GetCondition(duckv1alpha1.ConditionSucceeded) + if c != nil && c.Status == corev1.ConditionTrue { + done = append(done, t.PipelineTask.Name) } - switch c.Status { - case corev1.ConditionFalse: - logger.Infof("TaskRun %s has failed; we don't need to run PipelineRun %s", prtr.TaskRunName, prName) - return nil - case corev1.ConditionUnknown: - logger.Infof("TaskRun %s is still running so we shouldn't start more for PipelineRun %s", prtr.TaskRunName, prName) - return nil + } + } + toSchedule := d.GetSchedulable(done...) + for _, t := range state { + for _, s := range toSchedule { + if t.PipelineTask.Name == s.Name { + if t.TaskRun == nil { + tasks = append(tasks, t) + } } - } else { - logger.Infof("TaskRun %s should be started for PipelineRun %s", prtr.TaskRunName, prName) - return prtr } } - logger.Infof("No TaskRuns to start for PipelineRun %s", prName) - return nil + return tasks } // ResolvedPipelineRunTask contains a Task and its associated TaskRun, if it @@ -271,14 +270,15 @@ func GetPipelineConditionStatus(prName string, state []*ResolvedPipelineRunTask, if rprt.TaskRun == nil { logger.Infof("TaskRun %s doesn't have a Status, so PipelineRun %s isn't finished", rprt.TaskRunName, prName) allFinished = false - break + continue } c := rprt.TaskRun.Status.GetCondition(duckv1alpha1.ConditionSucceeded) if c == nil { logger.Infof("TaskRun %s doesn't have a condition, so PipelineRun %s isn't finished", rprt.TaskRunName, prName) allFinished = false - break + continue } + logger.Infof("TaskRun %s status : %v", rprt.TaskRunName, c.Status) // If any TaskRuns have failed, we should halt execution and consider the run failed if c.Status == corev1.ConditionFalse { logger.Infof("TaskRun %s has failed, so PipelineRun %s has failed", rprt.TaskRunName, prName) diff --git a/pkg/reconciler/v1alpha1/pipelinerun/resources/pipelinerunresolution_test.go b/pkg/reconciler/v1alpha1/pipelinerun/resources/pipelinerunresolution_test.go index 2e340e4ccd5..5ed7ee76b89 100644 --- a/pkg/reconciler/v1alpha1/pipelinerun/resources/pipelinerunresolution_test.go +++ b/pkg/reconciler/v1alpha1/pipelinerun/resources/pipelinerunresolution_test.go @@ -221,53 +221,6 @@ var allFinishedState = []*ResolvedPipelineRunTask{{ }, }} -func TestGetNextTask(t *testing.T) { - tcs := []struct { - name string - state []*ResolvedPipelineRunTask - expectedTask *ResolvedPipelineRunTask - }{ - { - name: "no-tasks-started", - state: noneStartedState, - expectedTask: noneStartedState[0], - }, - { - name: "one-task-started", - state: oneStartedState, - expectedTask: nil, - }, - { - name: "one-task-finished", - state: oneFinishedState, - expectedTask: oneFinishedState[1], - }, - { - name: "one-task-failed", - state: oneFailedState, - expectedTask: nil, - }, - { - name: "first-task-finished", - state: firstFinishedState, - expectedTask: firstFinishedState[1], - }, - { - name: "all-finished", - state: allFinishedState, - expectedTask: nil, - }, - } - for _, tc := range tcs { - t.Run(tc.name, func(t *testing.T) { - nextTask := GetNextTask("somepipelinerun", tc.state, zap.NewNop().Sugar()) - if d := cmp.Diff(nextTask, tc.expectedTask); d != "" { - t.Fatalf("Expected to indicate first task should be run, but different state returned: %s", d) - } - }) - } -} - func TestGetResourcesFromBindings(t *testing.T) { p := tb.Pipeline("pipelines", "namespace", tb.PipelineSpec( tb.PipelineDeclaredResource("git-resource", "git"), diff --git a/pkg/reconciler/v1alpha1/taskrun/resources/input_resource_test.go b/pkg/reconciler/v1alpha1/taskrun/resources/input_resource_test.go index 18cd96d2e5d..cfa33a2a69a 100644 --- a/pkg/reconciler/v1alpha1/taskrun/resources/input_resource_test.go +++ b/pkg/reconciler/v1alpha1/taskrun/resources/input_resource_test.go @@ -446,7 +446,7 @@ func TestAddResourceToBuild(t *testing.T) { wantErr: false, want: buildv1alpha1.BuildSpec{ Steps: []corev1.Container{{ - Name: "create-dir-gitspace-mz4c7", + Name: "create-dir-gitspace-0-0-mz4c7", Image: "override-with-bash-noop:latest", Args: []string{"-args", "mkdir -p /workspace/gitspace"}, }, { @@ -522,7 +522,7 @@ func TestAddResourceToBuild(t *testing.T) { wantErr: false, want: buildv1alpha1.BuildSpec{ Steps: []corev1.Container{{ - Name: "create-dir-workspace-mz4c7", + Name: "create-dir-workspace-0-0-mz4c7", Image: "override-with-bash-noop:latest", Args: []string{"-args", "mkdir -p /workspace/gcs-dir"}, }, { diff --git a/pkg/reconciler/v1alpha1/taskrun/resources/input_resources.go b/pkg/reconciler/v1alpha1/taskrun/resources/input_resources.go index 560e501d141..07f63e09af5 100644 --- a/pkg/reconciler/v1alpha1/taskrun/resources/input_resources.go +++ b/pkg/reconciler/v1alpha1/taskrun/resources/input_resources.go @@ -95,9 +95,10 @@ func AddInputResource( if as.GetType() == v1alpha1.ArtifactStoragePVCType { mountPVC = true - for _, ct := range cpContainers { + for j, ct := range cpContainers { ct.VolumeMounts = []corev1.VolumeMount{getPvcMount(pvcName)} - createAndCopyContainers := []corev1.Container{v1alpha1.CreateDirContainer(boundResource.Name, dPath), ct} + name := fmt.Sprintf("%s-%d-%d", boundResource.Name, i, j) + createAndCopyContainers := []corev1.Container{v1alpha1.CreateDirContainer(name, dPath), ct} copyStepsFromPrevTasks = append(copyStepsFromPrevTasks, createAndCopyContainers...) } } else { diff --git a/pkg/reconciler/v1alpha1/taskrun/taskrun_test.go b/pkg/reconciler/v1alpha1/taskrun/taskrun_test.go index 72bf1e99329..e5de4f0d7e7 100644 --- a/pkg/reconciler/v1alpha1/taskrun/taskrun_test.go +++ b/pkg/reconciler/v1alpha1/taskrun/taskrun_test.go @@ -405,7 +405,7 @@ func TestReconcile(t *testing.T) { tb.PodRestartPolicy(corev1.RestartPolicyNever), tb.PodContainer("nop", "override-with-nop:latest"), getCredentialsInitContainer("vr6ds"), - tb.PodInitContainer("build-step-create-dir-another-git-resource-78c5n", "override-with-bash-noop:latest", + tb.PodInitContainer("build-step-create-dir-another-git-resource-0-0-78c5n", "override-with-bash-noop:latest", tb.Args("-args", "mkdir -p /workspace/another-git-resource"), tb.WorkingDir(workspaceDir), tb.EnvVar("HOME", "/builder/home"), @@ -420,7 +420,7 @@ func TestReconcile(t *testing.T) { tb.VolumeMount("workspace", workspaceDir), tb.VolumeMount("home", "/builder/home"), ), - tb.PodInitContainer("build-step-create-dir-git-resource-mz4c7", "override-with-bash-noop:latest", + tb.PodInitContainer("build-step-create-dir-git-resource-0-0-mz4c7", "override-with-bash-noop:latest", tb.Args("-args", "mkdir -p /workspace/git-resource"), tb.WorkingDir(workspaceDir), tb.EnvVar("HOME", "/builder/home"), diff --git a/test/dag_test.go b/test/dag_test.go new file mode 100644 index 00000000000..80230c8ecc7 --- /dev/null +++ b/test/dag_test.go @@ -0,0 +1,214 @@ +// +build e2e + +/* +Copyright 2018 Knative Authors LLC +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package test + +import ( + "fmt" + "regexp" + "strconv" + "strings" + "testing" + "time" + + "github.com/knative/build-pipeline/pkg/apis/pipeline/v1alpha1" + tb "github.com/knative/build-pipeline/test/builder" + knativetest "github.com/knative/pkg/test" + "github.com/knative/pkg/test/logging" +) + +func TestDAGPipelineRun(t *testing.T) { + logger := logging.GetContextLogger(t.Name()) + c, namespace := setup(t, logger) + + knativetest.CleanupOnInterrupt(func() { tearDown(t, logger, c, namespace) }, logger) + defer tearDown(t, logger, c, namespace) + + echoTask := tb.Task("time-echo-task", namespace, tb.TaskSpec( + tb.TaskInputs( + tb.InputsResource("repo", v1alpha1.PipelineResourceTypeGit), + tb.InputsParam("filename", tb.ParamDescription("Name of the file to echo the time into")), + tb.InputsParam("sleep-sec", tb.ParamDescription("Number of seconds to sleep after echoing")), + ), + tb.TaskOutputs(tb.OutputsResource("repo", v1alpha1.PipelineResourceTypeGit)), + tb.Step("echo-time-into-file", "busybox", tb.Command("/bin/sh", "-c"), + tb.Args("pwd && mkdir -p /workspace/repo/folder && date +%s > /workspace/repo/folder/${inputs.params.filename} && sleep ${inputs.params.sleep-sec}"), + ), + )) + if _, err := c.TaskClient.Create(echoTask); err != nil { + t.Fatalf("Failed to create time echo Task: %s", err) + } + readTask := tb.Task("folder-read", namespace, tb.TaskSpec( + tb.TaskInputs( + tb.InputsResource("repo", v1alpha1.PipelineResourceTypeGit), + ), + tb.Step("read-all", "busybox", tb.Command("/bin/sh", "-c"), + tb.Args("cd /workspace/repo/folder && tail -n +1 -- *"), + ), + )) + if _, err := c.TaskClient.Create(readTask); err != nil { + t.Fatalf("Failed to create folder reader Task: %s", err) + } + repoResource := tb.PipelineResource("repo", namespace, tb.PipelineResourceSpec( + v1alpha1.PipelineResourceTypeGit, + tb.PipelineResourceSpecParam("Url", "https://github.com/githubtraining/example-basic"), + )) + if _, err := c.PipelineResourceClient.Create(repoResource); err != nil { + t.Fatalf("Failed to create simple repo PipelineResource: %s", err) + } + pipeline := tb.Pipeline("dag-pipeline", namespace, tb.PipelineSpec( + tb.PipelineDeclaredResource("repo", "repo"), + tb.PipelineTask("pipeline-task-3", "time-echo-task", + tb.PipelineTaskInputResource("repo", "repo", tb.From("pipeline-task-2-parallel-1", "pipeline-task-2-parallel-2")), + tb.PipelineTaskOutputResource("repo", "repo"), + tb.PipelineTaskParam("filename", "pipeline-task-3"), + tb.PipelineTaskParam("sleep-sec", "5"), + ), + tb.PipelineTask("pipeline-task-2-parallel-2", "time-echo-task", + tb.PipelineTaskInputResource("repo", "repo", tb.From("pipeline-task-1")), tb.PipelineTaskOutputResource("repo", "repo"), + tb.PipelineTaskOutputResource("repo", "repo"), + tb.PipelineTaskParam("filename", "pipeline-task-2-paralell-2"), + tb.PipelineTaskParam("sleep-sec", "5"), + ), + tb.PipelineTask("pipeline-task-2-parallel-1", "time-echo-task", + tb.PipelineTaskInputResource("repo", "repo", tb.From("pipeline-task-1")), + tb.PipelineTaskOutputResource("repo", "repo"), + tb.PipelineTaskParam("filename", "pipeline-task-2-paralell-1"), + tb.PipelineTaskParam("sleep-sec", "5"), + ), + tb.PipelineTask("pipeline-task-1", "time-echo-task", + tb.PipelineTaskInputResource("repo", "repo"), + tb.PipelineTaskOutputResource("repo", "repo"), + tb.PipelineTaskParam("filename", "pipeline-task-1"), + tb.PipelineTaskParam("sleep-sec", "5"), + ), + tb.PipelineTask("pipeline-task-4-validate-results", "folder-read", + tb.PipelineTaskInputResource("repo", "repo", tb.From("pipeline-task-3")), + ), + )) + if _, err := c.PipelineClient.Create(pipeline); err != nil { + t.Fatalf("Failed to create dag-pipeline Pipeline: %s", err) + } + pipelineRun := tb.PipelineRun("dag-pipeline-run", namespace, tb.PipelineRunSpec("dag-pipeline", + tb.PipelineRunResourceBinding("repo", tb.PipelineResourceBindingRef("repo")), + )) + if _, err := c.PipelineRunClient.Create(pipelineRun); err != nil { + t.Fatalf("Failed to create dag-pipeline-run PipelineRun: %s", err) + } + logger.Infof("Waiting for DAG pipeline to complete") + if err := WaitForPipelineRunState(c, "dag-pipeline-run", pipelineRunTimeout, PipelineRunSucceed("dag-pipeline-run"), "PipelineRunSuccess"); err != nil { + t.Fatalf("Error waiting for PipelineRun to finish: %s", err) + } + // FIXME(vdemeester) do the rest :) + /* + logger.Infof("Getting logs from results validation task") + // The volume created with the results will have the same name as the TaskRun + validationTaskRunName := "dag-pipeline-run-pipeline-task-4-validate-results" + output, err := getBuildOutputFromVolume(logger, c, namespace, validationTaskRunName, "dag-validation-pod") + if err != nil { + t.Fatalf("Unable to get build output for taskrun %s: %s", validationTaskRunName, err) + } + fmt.Println(output) + + // Check that the overall order is correct + times, err := getTimes(output) + if err != nil { + t.Fatalf("Unable to parse output %q: %v", output, err) + } + sort.Sort(times) + + if times[0].name != "pipeline-task-1" { + t.Errorf("Expected first task to execute first, but %q was first", times[0].name) + } + if !strings.HasPrefix(times[1].name, "pipeline-task-2") { + t.Errorf("Expected parallel tasks to run second & third, but %q was second", times[1].name) + } + if !strings.HasPrefix(times[2].name, "pipeline-task-2") { + t.Errorf("Expected parallel tasks to run second & third, but %q was third", times[2].name) + } + if times[3].name != "pipeline-task-3" { + t.Errorf("Expected third task to execute third, but %q was third", times[3].name) + } + + // Check that the two tasks that can run in parallel did + parallelDiff := times[2].t.Sub(times[1].t) + if parallelDiff > (time.Second * 5) { + t.Errorf("Expected parallel tasks to execute more or less at the ame time, but they were %v apart", parallelDiff) + } + */ +} + +type fileTime struct { + name string + t time.Time +} + +type fileTimes []fileTime + +func (f fileTimes) Len() int { + return len(f) +} + +func (f fileTimes) Less(i, j int) bool { + return f[i].t.Before(f[j].t) +} + +func (f fileTimes) Swap(i, j int) { + f[i], f[j] = f[j], f[i] +} + +func getTimes(output string) (fileTimes, error) { + times := fileTimes{} + // The output of tail doesn't include the filename when it only outputs one file + if len(output) <= 1 { + return times, fmt.Errorf("output %q is too short to parse, this implies not all tasks wrote their files", output) + } + // ==> pipeline-task-1 <== + //1544055212 + // + //==> pipeline-task-2-parallel-1 <== + //1544055304 + // + //==> pipeline-task-2-parallel-2 <== + //1544055263 + // + //==> pipeline-task-3 <== + //1544055368 + r, err := regexp.Compile("==> (.*) <==") + if err != nil { + return times, fmt.Errorf("couldn't compile filename regex: %v", err) + } + + lines := strings.Split(output, "\n") + for i := 0; i < len(lines); i += 3 { + // First line is the name of the file + m := r.FindStringSubmatch(lines[i]) + if len(m) != 2 { + return times, fmt.Errorf("didn't find expected filename in output line %d: %q", i, lines[i]) + } + + // Next line is the date + i, err := strconv.Atoi(lines[i+1]) + if err != nil { + return times, fmt.Errorf("error converting date %q to int: %v", lines[i+1], err) + } + + times = append(times, fileTime{ + name: m[1], + t: time.Unix(int64(i), 0), + }) + } + return times, nil +}