Skip to content

Commit

Permalink
Use dag execution instead of linear one 👣
Browse files Browse the repository at this point in the history
- Rework the `dag` code a bit, adds GetSchedulable methods to DAG :
  Given a list of "done" task, give me the task that can be scheduled
- fix invalid detected cycle in `dag`

  The current DAG implementation was marking some graph as invalid even
  though they were. For example, the following DAG is valid.

  ```
             42
            /  \
         100    200
           \   / |
            101  |
              \  |
               102
  ```

  But the `dag.Build` would say there is a cycle in there. This is fixed
  by appending the visited node with the node currently "checked", that
  way, we can go through the same task twice, but from different path.

- Use the `dag` in the reconcilier
- Add a e2e test that "exercices" the dag
- Update the update pipeline status code, do not short-circuit
  pipeline status in case of a taskrun not being run yet… As we
  use a DAG execution flow, we want to check all tasks now.
- Make sure `create-dir` containers gets a unique name (the same way
  `copy-` containers do)

Signed-off-by: Vincent Demeester <[email protected]>
  • Loading branch information
vdemeester authored and bobcatfish committed Feb 27, 2019
1 parent 3ee62a6 commit d76325c
Show file tree
Hide file tree
Showing 10 changed files with 457 additions and 139 deletions.
63 changes: 53 additions & 10 deletions pkg/reconciler/v1alpha1/pipeline/resources/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,21 +54,21 @@ func (g *DAG) addPrevPipelineTask(prev *Node, next *Node) error {
// Check if we are adding cycles.
visited := map[string]bool{prev.Task.Name: true, next.Task.Name: true}
path := []string{next.Task.Name, prev.Task.Name}
if err := visit(prev.Prev, path, visited); err != nil {
if err := visit(next.Task.Name, prev.Prev, path, visited); err != nil {
return fmt.Errorf("cycle detected; %s ", err.Error())
}
next.Prev = append(next.Prev, prev)
return nil
}

func visit(nodes []*Node, path []string, visited map[string]bool) error {
func visit(currentName string, nodes []*Node, path []string, visited map[string]bool) error {
for _, n := range nodes {
path = append(path, n.Task.Name)
if _, ok := visited[n.Task.Name]; ok {
return fmt.Errorf(getVisitedPath(path))
}
visited[n.Task.Name] = true
if err := visit(n.Prev, path, visited); err != nil {
visited[currentName+"."+n.Task.Name] = true
if err := visit(n.Task.Name, n.Prev, path, visited); err != nil {
return err
}
}
Expand All @@ -84,13 +84,56 @@ func getVisitedPath(path []string) string {
return strings.Join(path, " -> ")
}

//GetPreviousTasks return all the previous tasks for a PipelineTask in the DAG
func (g *DAG) GetPreviousTasks(pt string) []v1alpha1.PipelineTask {
v, ok := g.Nodes[pt]
if !ok {
return nil
// GetSchedulable returns a list of PipelineTask that can be scheduled,
// given a list of successfully finished task.
// If the list is empty, this returns all tasks in the DAG that nobody
// depends on. Else, it returns task which have all dependecies marked
// as done, and thus can be scheduled.
func (g *DAG) GetSchedulable(tasks ...string) []v1alpha1.PipelineTask {
d := []v1alpha1.PipelineTask{}
if len(tasks) == 0 {
// return node that have no previous tasks
for _, node := range g.Nodes {
if len(node.getPrevTasks()) == 0 {
d = append(d, node.Task)
}
}
} else {
tm := toMap(tasks...)
for name, node := range g.Nodes {
if _, ok := tm[name]; ok {
// skip done element
continue
}
if !isSchedulable(tm, node.getPrevTasks()) {
// skip non-schedulable element
continue
}
d = append(d, node.Task)
}
}
return d
}

func isSchedulable(tm map[string]struct{}, prevs []v1alpha1.PipelineTask) bool {
if len(prevs) == 0 {
return false
}
collected := []string{}
for _, t := range prevs {
if _, ok := tm[t.Name]; ok {
collected = append(collected, t.Name)
}
}
return len(collected) == len(prevs)
}

func toMap(t ...string) map[string]struct{} {
m := make(map[string]struct{}, len(t))
for _, s := range t {
m[s] = struct{}{}
}
return v.getPrevTasks()
return m
}

// Build returns a valid pipeline DAG. Returns error if the pipeline is invalid
Expand Down
167 changes: 129 additions & 38 deletions pkg/reconciler/v1alpha1/pipeline/resources/dag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package pipeline

import (
"sort"
"testing"

"github.com/google/go-cmp/cmp"
Expand Down Expand Up @@ -52,6 +53,30 @@ func TestBuild(t *testing.T) {
Inputs: []v1alpha1.PipelineTaskInputResource{{From: []string{"z"}}},
},
}
dDependsOnA := v1alpha1.PipelineTask{
Name: "d",
Resources: &v1alpha1.PipelineTaskResources{
Inputs: []v1alpha1.PipelineTaskInputResource{{From: []string{"a"}}},
},
}
eDependsOnA := v1alpha1.PipelineTask{
Name: "e",
Resources: &v1alpha1.PipelineTaskResources{
Inputs: []v1alpha1.PipelineTaskInputResource{{From: []string{"a"}}},
},
}
fDependsOnDAndE := v1alpha1.PipelineTask{
Name: "f",
Resources: &v1alpha1.PipelineTaskResources{
Inputs: []v1alpha1.PipelineTaskInputResource{{From: []string{"d", "e"}}},
},
}
gDependOnF := v1alpha1.PipelineTask{
Name: "g",
Resources: &v1alpha1.PipelineTaskResources{
Inputs: []v1alpha1.PipelineTaskInputResource{{From: []string{"f"}}},
},
}
selfLink := v1alpha1.PipelineTask{
Name: "a",
Resources: &v1alpha1.PipelineTaskResources{
Expand All @@ -73,20 +98,22 @@ func TestBuild(t *testing.T) {
shdErr bool
expectedErr string
}{
{"linear-pipeline",
v1alpha1.PipelineSpec{Tasks: []v1alpha1.PipelineTask{a, b, c}},
&DAG{
{
name: "linear-pipeline",
spec: v1alpha1.PipelineSpec{Tasks: []v1alpha1.PipelineTask{a, b, c}},
expectedDAG: &DAG{
Nodes: map[string]*Node{
"a": {Task: a},
"b": {Task: b},
"c": {Task: c},
},
},
false,
""},
{"complex-pipeline",
v1alpha1.PipelineSpec{Tasks: []v1alpha1.PipelineTask{a, xDependsOnA, yDependsOnAB, zDependsOnX, b, c}},
&DAG{
shdErr: false,
expectedErr: "",
}, {
name: "complex-pipeline",
spec: v1alpha1.PipelineSpec{Tasks: []v1alpha1.PipelineTask{a, xDependsOnA, yDependsOnAB, zDependsOnX, b, c}},
expectedDAG: &DAG{
Nodes: map[string]*Node{
"a": {Task: a},
"b": {Task: b},
Expand All @@ -96,28 +123,57 @@ func TestBuild(t *testing.T) {
"z": {Task: zDependsOnX, Prev: []*Node{nodeX}},
},
},
false,
""},
{"self-link",
v1alpha1.PipelineSpec{Tasks: []v1alpha1.PipelineTask{selfLink}},
nil,
true,
` "self-link" is invalid: : Internal error: cycle detected; task "a" depends on itself`},
{"cycle-2",
v1alpha1.PipelineSpec{Tasks: []v1alpha1.PipelineTask{xDependsOnA, zDependsOnX, aDependsOnZ}},
nil,
true,
` "cycle-2" is invalid: : Internal error: cycle detected; a -> x -> z -> a `},
{"duplicate-tasks",
v1alpha1.PipelineSpec{Tasks: []v1alpha1.PipelineTask{a, a}},
nil,
true,
` "duplicate-tasks" is invalid: spec.tasks.name: Duplicate value: "a"`},
{"invalid-task-name",
v1alpha1.PipelineSpec{Tasks: []v1alpha1.PipelineTask{invalidTask}},
nil,
true,
` "invalid-task-name" is invalid: spec.tasks.name: Not found: "none"`},
shdErr: false,
expectedErr: "",
}, {
name: "self-link",
spec: v1alpha1.PipelineSpec{Tasks: []v1alpha1.PipelineTask{selfLink}},
expectedDAG: nil,
shdErr: true,
expectedErr: ` "self-link" is invalid: : Internal error: cycle detected; task "a" depends on itself`,
}, {
name: "cycle-2",
spec: v1alpha1.PipelineSpec{Tasks: []v1alpha1.PipelineTask{xDependsOnA, zDependsOnX, aDependsOnZ}},
expectedDAG: nil,
shdErr: true,
expectedErr: ` "cycle-2" is invalid: : Internal error: cycle detected; a -> x -> z -> a `,
}, {
name: "duplicate-tasks",
spec: v1alpha1.PipelineSpec{Tasks: []v1alpha1.PipelineTask{a, a}},
expectedDAG: nil,
shdErr: true,
expectedErr: ` "duplicate-tasks" is invalid: spec.tasks.name: Duplicate value: "a"`,
}, {
name: "invalid-task-name",
spec: v1alpha1.PipelineSpec{Tasks: []v1alpha1.PipelineTask{invalidTask}},
expectedDAG: nil,
shdErr: true,
expectedErr: ` "invalid-task-name" is invalid: spec.tasks.name: Not found: "none"`,
}, {
// This test make sure we don't detect cycle (A -> B -> B -> …) when there is not
// The graph looks like the following.
// a
// / \
// d e
// \ /
// f
// |
// g
// This means we "visit" a twice, from two different path ; but there is no cycle.
name: "no-cycle",
spec: v1alpha1.PipelineSpec{Tasks: []v1alpha1.PipelineTask{a, dDependsOnA, eDependsOnA, fDependsOnDAndE, gDependOnF}},
expectedDAG: &DAG{
Nodes: map[string]*Node{
"a": {Task: a},
"d": {Task: dDependsOnA, Prev: []*Node{{Task: a}}},
"e": {Task: eDependsOnA, Prev: []*Node{{Task: a}}},
"f": {Task: fDependsOnDAndE, Prev: []*Node{{Task: dDependsOnA, Prev: []*Node{{Task: a}}}, {Task: eDependsOnA, Prev: []*Node{{Task: a}}}}},
"g": {Task: gDependOnF, Prev: []*Node{{Task: fDependsOnDAndE, Prev: []*Node{{Task: dDependsOnA, Prev: []*Node{{Task: a}}}, {Task: eDependsOnA, Prev: []*Node{{Task: a}}}}}}},
},
},
shdErr: false,
expectedErr: "",
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
Expand All @@ -144,8 +200,15 @@ func TestBuild(t *testing.T) {
}
}

func TestGetPrevTasks(t *testing.T) {
func TestGetSchedulable(t *testing.T) {
a := v1alpha1.PipelineTask{Name: "a"}
b := v1alpha1.PipelineTask{Name: "b"}
w := v1alpha1.PipelineTask{
Name: "w",
Resources: &v1alpha1.PipelineTaskResources{
Inputs: []v1alpha1.PipelineTaskInputResource{{From: []string{"b", "y"}}},
},
}
x := v1alpha1.PipelineTask{
Name: "x",
Resources: &v1alpha1.PipelineTaskResources{
Expand All @@ -158,28 +221,56 @@ func TestGetPrevTasks(t *testing.T) {
Inputs: []v1alpha1.PipelineTaskInputResource{{From: []string{"x", "a"}}},
},
}
z := v1alpha1.PipelineTask{
Name: "z",
Resources: &v1alpha1.PipelineTaskResources{
Inputs: []v1alpha1.PipelineTaskInputResource{{From: []string{"x"}}},
},
}
p := v1alpha1.Pipeline{
ObjectMeta: metav1.ObjectMeta{
Namespace: "namespace",
Name: "test",
},
Spec: v1alpha1.PipelineSpec{
Tasks: []v1alpha1.PipelineTask{a, x, y},
Tasks: []v1alpha1.PipelineTask{a, b, w, x, y, z},
},
}
g, err := Build(&p)
if err != nil {
t.Fatalf("unexpected error %s", err)
}
if d := cmp.Diff(g.GetPreviousTasks("a"), []v1alpha1.PipelineTask{}); d != "" {
t.Errorf("incorrect prev tasks for PipelineTask a. diff %s", d)
if d := cmp.Diff(sortPipelineTask(g.GetSchedulable()), []v1alpha1.PipelineTask{a, b}); d != "" {
t.Errorf("incorrect dependencees for no task. diff %s", d)
}
if d := cmp.Diff(sortPipelineTask(g.GetSchedulable("a")), []v1alpha1.PipelineTask{x}); d != "" {
t.Errorf("incorrect dependencees for no task. diff %s", d)
}
if d := cmp.Diff(sortPipelineTask(g.GetSchedulable("b")), []v1alpha1.PipelineTask{}); d != "" {
t.Errorf("incorrect dependencees for no task. diff %s", d)
}
if d := cmp.Diff(sortPipelineTask(g.GetSchedulable("a", "b")), []v1alpha1.PipelineTask{x}); d != "" {
t.Errorf("incorrect dependencees for no task. diff %s", d)
}
if d := cmp.Diff(g.GetPreviousTasks("x"), []v1alpha1.PipelineTask{a}); d != "" {
t.Errorf("incorrect prev tasks for PipelineTask x. diff %s", d)
if d := cmp.Diff(sortPipelineTask(g.GetSchedulable("x")), []v1alpha1.PipelineTask{z}); d != "" {
t.Errorf("incorrect dependencees for no task. diff %s", d)
}
if d := cmp.Diff(g.GetPreviousTasks("y"), []v1alpha1.PipelineTask{x, a}); d != "" {
t.Errorf("incorrect prev tasks for PipelineTask y. diff %s", d)
if d := cmp.Diff(sortPipelineTask(g.GetSchedulable("a", "x")), []v1alpha1.PipelineTask{y, z}); d != "" {
t.Errorf("incorrect dependencees for no task. diff %s", d)
}
if d := cmp.Diff(sortPipelineTask(g.GetSchedulable("a", "x", "b")), []v1alpha1.PipelineTask{y, z}); d != "" {
t.Errorf("incorrect dependencees for no task. diff %s", d)
}
if d := cmp.Diff(sortPipelineTask(g.GetSchedulable("a", "x", "y")), []v1alpha1.PipelineTask{z}); d != "" {
t.Errorf("incorrect dependencees for no task. diff %s", d)
}
}

func sortPipelineTask(tasks []v1alpha1.PipelineTask) []v1alpha1.PipelineTask {
sort.Slice(tasks, func(i, j int) bool {
return tasks[i].Name < tasks[j].Name
})
return tasks
}

// hasErr returns true if err is not nil
Expand Down
31 changes: 22 additions & 9 deletions pkg/reconciler/v1alpha1/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
informers "github.com/knative/build-pipeline/pkg/client/informers/externalversions/pipeline/v1alpha1"
listers "github.com/knative/build-pipeline/pkg/client/listers/pipeline/v1alpha1"
"github.com/knative/build-pipeline/pkg/reconciler"
dag "github.com/knative/build-pipeline/pkg/reconciler/v1alpha1/pipeline/resources"
"github.com/knative/build-pipeline/pkg/reconciler/v1alpha1/pipelinerun/config"
"github.com/knative/build-pipeline/pkg/reconciler/v1alpha1/pipelinerun/resources"
"github.com/knative/build-pipeline/pkg/reconciler/v1alpha1/taskrun"
Expand Down Expand Up @@ -218,6 +219,18 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er

p = p.DeepCopy()

d, err := dag.Build(p)
if err != nil {
// This Run has failed, so we need to mark it as failed and stop reconciling it
pr.Status.SetCondition(&duckv1alpha1.Condition{
Type: duckv1alpha1.ConditionSucceeded,
Status: corev1.ConditionFalse,
Reason: ReasonInvalidBindings,
Message: fmt.Sprintf("PipelineRun %s's Pipeline DAG is invalid: %s",
fmt.Sprintf("%s/%s", pr.Namespace, pr.Name), err),
})
return nil
}
providedResources, err := resources.GetResourcesFromBindings(p, pr)
if err != nil {
// This Run has failed, so we need to mark it as failed and stop reconciling it
Expand Down Expand Up @@ -254,7 +267,6 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er
c.resourceLister.PipelineResources(pr.Namespace).Get,
p.Spec.Tasks, providedResources,
)

if err != nil {
// This Run has failed, so we need to mark it as failed and stop reconciling it
switch err := err.(type) {
Expand Down Expand Up @@ -285,7 +297,6 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er
}
return nil
}

if err := resources.ValidateFrom(pipelineState); err != nil {
// This Run has failed, so we need to mark it as failed and stop reconciling it
pr.Status.SetCondition(&duckv1alpha1.Condition{
Expand Down Expand Up @@ -322,20 +333,22 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er
return cancelPipelineRun(pr, pipelineState, c.PipelineClientSet)
}

rprt := resources.GetNextTask(pr.Name, pipelineState, c.Logger)
rprts := resources.GetNextTasks(pr.Name, d, pipelineState, c.Logger)

var as artifacts.ArtifactStorageInterface
if as, err = artifacts.InitializeArtifactStorage(pr, c.KubeClientSet, c.Logger); err != nil {
c.Logger.Infof("PipelineRun failed to initialize artifact storage %s", pr.Name)
return err
}

if rprt != nil {
c.Logger.Infof("Creating a new TaskRun object %s", rprt.TaskRunName)
rprt.TaskRun, err = c.createTaskRun(c.Logger, rprt, pr, as.StorageBasePath(pr))
if err != nil {
c.Recorder.Eventf(pr, corev1.EventTypeWarning, "TaskRunCreationFailed", "Failed to create TaskRun %q: %v", rprt.TaskRunName, err)
return fmt.Errorf("error creating TaskRun called %s for PipelineTask %s from PipelineRun %s: %s", rprt.TaskRunName, rprt.PipelineTask.Name, pr.Name, err)
for _, rprt := range rprts {
if rprt != nil {
c.Logger.Infof("Creating a new TaskRun object %s", rprt.TaskRunName)
rprt.TaskRun, err = c.createTaskRun(c.Logger, rprt, pr, as.StorageBasePath(pr))
if err != nil {
c.Recorder.Eventf(pr, corev1.EventTypeWarning, "TaskRunCreationFailed", "Failed to create TaskRun %q: %v", rprt.TaskRunName, err)
return fmt.Errorf("error creating TaskRun called %s for PipelineTask %s from PipelineRun %s: %s", rprt.TaskRunName, rprt.PipelineTask.Name, pr.Name, err)
}
}
}

Expand Down
Loading

0 comments on commit d76325c

Please sign in to comment.