Skip to content

Commit

Permalink
Make pipelinerun reconciler call task resolution logic
Browse files Browse the repository at this point in the history
Now when resolving a PipelineRun, we will resolve all of the resource
references as well. To do this, we make it so that PipelineRuns and
TaskRuns use the same types to bind resources. This has the bonus of
allowing users to provide resource paths in pipelineruns if they wanted
to.

This will allow us to use the same logic to validate resource bindings
for both pipelineruns and taskruns (for tektoncd#213).
  • Loading branch information
bobcatfish committed Dec 4, 2018
1 parent acce788 commit 9c18103
Show file tree
Hide file tree
Showing 23 changed files with 389 additions and 220 deletions.
8 changes: 0 additions & 8 deletions pkg/apis/pipeline/v1alpha1/pipelinerun_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,6 @@ type PipelineTaskResource struct {
Outputs []TaskResourceBinding `json:"outputs"`
}

// TaskResourceBinding is used to bind a PipelineResource to a PipelineResource required for a Task as an input or an output.
type TaskResourceBinding struct {
// Name is the name of the Task's input that this Resource should be used for.
Name string `json:"name"`
// The Resource that should be provided to the Task for the Resource it requires.
ResourceRef PipelineResourceRef `json:"resourceRef"`
}

// PipelineResourceRef can be used to refer to a specific instance of a Resource
type PipelineResourceRef struct {
// Name of the referent; More info: http://kubernetes.io/docs/user-guide/identifiers#names
Expand Down
8 changes: 5 additions & 3 deletions pkg/apis/pipeline/v1alpha1/resource_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,11 @@ type PipelineResource struct {
Status PipelineResourceStatus `json:"status,omitempty"`
}

// TaskRunResource points to the PipelineResource that
// will be used for the Task input or output called Name.
type TaskRunResource struct {
// TaskResourceBinding points to the PipelineResource that
// will be used for the Task input or output called Name. The optional Path field
// corresponds to a path on disk at which the Resource can be found (used when providing
// the resource via mounted volume, overriding the default logic to fetch the Resource).
type TaskResourceBinding struct {
Name string `json:"name"`
ResourceRef PipelineResourceRef `json:"resourceRef"`
// +optional
Expand Down
4 changes: 2 additions & 2 deletions pkg/apis/pipeline/v1alpha1/taskrun_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,15 @@ type TaskRunSpec struct {
// TaskRunInputs holds the input values that this task was invoked with.
type TaskRunInputs struct {
// +optional
Resources []TaskRunResource `json:"resources,omitempty"`
Resources []TaskResourceBinding `json:"resources,omitempty"`
// +optional
Params []Param `json:"params,omitempty"`
}

// TaskRunOutputs holds the output values that this task was invoked with.
type TaskRunOutputs struct {
// +optional
Resources []TaskRunResource `json:"resources,omitempty"`
Resources []TaskResourceBinding `json:"resources,omitempty"`
// +optional
Params []Param `json:"params,omitempty"`
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/pipeline/v1alpha1/taskrun_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (o TaskRunOutputs) Validate(path string) *apis.FieldError {
return checkForPipelineResourceDuplicates(o.Resources, fmt.Sprintf("%s.Resources.Name", path))
}

func checkForPipelineResourceDuplicates(resources []TaskRunResource, path string) *apis.FieldError {
func checkForPipelineResourceDuplicates(resources []TaskResourceBinding, path string) *apis.FieldError {
encountered := map[string]struct{}{}
for _, r := range resources {
// We should provide only one binding for each resource required by the Task.
Expand Down
10 changes: 5 additions & 5 deletions pkg/apis/pipeline/v1alpha1/taskrun_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func TestInput_Validate(t *testing.T) {
Name: "name",
Value: "value",
}},
Resources: []TaskRunResource{{
Resources: []TaskResourceBinding{{
ResourceRef: PipelineResourceRef{
Name: "testresource",
},
Expand All @@ -261,7 +261,7 @@ func TestInput_Invalidate(t *testing.T) {
{
name: "duplicate task inputs",
inputs: TaskRunInputs{
Resources: []TaskRunResource{{
Resources: []TaskResourceBinding{{
ResourceRef: PipelineResourceRef{
Name: "testresource1",
},
Expand All @@ -278,7 +278,7 @@ func TestInput_Invalidate(t *testing.T) {
{
name: "invalid task input params",
inputs: TaskRunInputs{
Resources: []TaskRunResource{{
Resources: []TaskResourceBinding{{
ResourceRef: PipelineResourceRef{
Name: "testresource",
},
Expand Down Expand Up @@ -307,7 +307,7 @@ func TestInput_Invalidate(t *testing.T) {

func TestOutput_Validate(t *testing.T) {
i := TaskRunOutputs{
Resources: []TaskRunResource{{
Resources: []TaskResourceBinding{{
ResourceRef: PipelineResourceRef{
Name: "testresource",
},
Expand All @@ -327,7 +327,7 @@ func TestOutput_Invalidate(t *testing.T) {
{
name: "duplicated task outputs",
outputs: TaskRunOutputs{
Resources: []TaskRunResource{{
Resources: []TaskResourceBinding{{
ResourceRef: PipelineResourceRef{
Name: "testresource1",
},
Expand Down
39 changes: 13 additions & 26 deletions pkg/apis/pipeline/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

47 changes: 24 additions & 23 deletions pkg/reconciler/v1alpha1/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,14 +165,10 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er
return nil
}

pipelineState, err := resources.GetPipelineState(
func(namespace, name string) (*v1alpha1.Task, error) {
return c.taskLister.Tasks(namespace).Get(name)
},
func(namespace, name string) (*v1alpha1.TaskRun, error) {
return c.taskRunLister.TaskRuns(namespace).Get(name)
},
p, pr.Name,
pipelineState, err := resources.ResolvePipelineRun(
c.taskLister.Tasks(pr.Namespace).Get,
c.resourceLister.PipelineResources(pr.Namespace).Get,
p, pr,
)
if err != nil {
if errors.IsNotFound(err) {
Expand All @@ -186,21 +182,26 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er
})
return nil
}
return fmt.Errorf("error getting Tasks and/or TaskRuns for Pipeline %s: %s", p.Name, err)
return fmt.Errorf("error getting Tasks for Pipeline %s: %s", p.Name, err)
}
prtr := resources.GetNextTask(pr.Name, pipelineState, c.Logger)
err = resources.ResolveTaskRuns(c.taskRunLister.TaskRuns(pr.Namespace).Get, pipelineState)
if err != nil {
return fmt.Errorf("error getting TaskRunss for Pipeline %s: %s", p.Name, err)
}

rprt := resources.GetNextTask(pr.Name, pipelineState, c.Logger)

if err := getOrCreatePVC(pr, c.KubeClientSet); err != nil {
c.Logger.Infof("PipelineRun failed to create/get volume %s", pr.Name)
return fmt.Errorf("Failed to create/get persistent volume claim %s for task %q: %v", pr.Name, err, pr.Name)
}

if prtr != nil {
c.Logger.Infof("Creating a new TaskRun object %s", prtr.TaskRunName)
prtr.TaskRun, err = c.createTaskRun(c.Logger, prtr.Task, prtr.TaskRunName, pr, prtr.PipelineTask, serviceAccount)
if rprt != nil {
c.Logger.Infof("Creating a new TaskRun object %s", rprt.TaskRunName)
rprt.TaskRun, err = c.createTaskRun(c.Logger, pr.Namespace, rprt.ResolvedTaskRun.TaskName, rprt.TaskRunName, pr, rprt.PipelineTask, serviceAccount)
if err != nil {
c.Recorder.Eventf(pr, corev1.EventTypeWarning, "TaskRunCreationFailed", "Failed to create TaskRun %q: %v", prtr.TaskRunName, err)
return fmt.Errorf("error creating TaskRun called %s for PipelineTask %s from PipelineRun %s: %s", prtr.TaskRunName, prtr.PipelineTask.Name, pr.Name, err)
c.Recorder.Eventf(pr, corev1.EventTypeWarning, "TaskRunCreationFailed", "Failed to create TaskRun %q: %v", rprt.TaskRunName, err)
return fmt.Errorf("error creating TaskRun called %s for PipelineTask %s from PipelineRun %s: %s", rprt.TaskRunName, rprt.PipelineTask.Name, pr.Name, err)
}
}

Expand All @@ -216,19 +217,19 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er
return nil
}

func UpdateTaskRunsStatus(pr *v1alpha1.PipelineRun, pipelineState []*resources.PipelineRunTaskRun) {
for _, prtr := range pipelineState {
if prtr.TaskRun != nil {
pr.Status.TaskRuns[prtr.TaskRun.Name] = prtr.TaskRun.Status
func UpdateTaskRunsStatus(pr *v1alpha1.PipelineRun, pipelineState []*resources.ResolvedPipelineRunTask) {
for _, rprt := range pipelineState {
if rprt.TaskRun != nil {
pr.Status.TaskRuns[rprt.TaskRun.Name] = rprt.TaskRun.Status
}
}
}

func (c *Reconciler) createTaskRun(logger *zap.SugaredLogger, t *v1alpha1.Task, trName string, pr *v1alpha1.PipelineRun, pt *v1alpha1.PipelineTask, sa string) (*v1alpha1.TaskRun, error) {
func (c *Reconciler) createTaskRun(logger *zap.SugaredLogger, namespace, taskName, trName string, pr *v1alpha1.PipelineRun, pt *v1alpha1.PipelineTask, sa string) (*v1alpha1.TaskRun, error) {
tr := &v1alpha1.TaskRun{
ObjectMeta: metav1.ObjectMeta{
Name: trName,
Namespace: t.Namespace,
Namespace: namespace,
OwnerReferences: pr.GetOwnerReference(),
Labels: map[string]string{
pipeline.GroupName + pipeline.PipelineLabelKey: pr.Spec.PipelineRef.Name,
Expand All @@ -237,7 +238,7 @@ func (c *Reconciler) createTaskRun(logger *zap.SugaredLogger, t *v1alpha1.Task,
},
Spec: v1alpha1.TaskRunSpec{
TaskRef: &v1alpha1.TaskRef{
Name: t.Name,
Name: taskName,
},
Inputs: v1alpha1.TaskRunInputs{
Params: pt.Params,
Expand All @@ -247,7 +248,7 @@ func (c *Reconciler) createTaskRun(logger *zap.SugaredLogger, t *v1alpha1.Task,
}
resources.WrapSteps(&tr.Spec, pr.Spec.PipelineTaskResources, pt)

return c.PipelineClientSet.PipelineV1alpha1().TaskRuns(t.Namespace).Create(tr)
return c.PipelineClientSet.PipelineV1alpha1().TaskRuns(namespace).Create(tr)
}

func (c *Reconciler) updateStatus(pr *v1alpha1.PipelineRun) (*v1alpha1.PipelineRun, error) {
Expand Down
11 changes: 7 additions & 4 deletions pkg/reconciler/v1alpha1/pipelinerun/pipelinerun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/knative/build-pipeline/pkg/apis/pipeline/v1alpha1"
"github.com/knative/build-pipeline/pkg/reconciler/v1alpha1/pipelinerun"
"github.com/knative/build-pipeline/pkg/reconciler/v1alpha1/pipelinerun/resources"
taskrunresources "github.com/knative/build-pipeline/pkg/reconciler/v1alpha1/taskrun/resources"
"github.com/knative/build-pipeline/test"
duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -232,15 +233,15 @@ func TestReconcile(t *testing.T) {
Value: "${inputs.workspace.revision}",
},
},
Resources: []v1alpha1.TaskRunResource{{
Resources: []v1alpha1.TaskResourceBinding{{
ResourceRef: v1alpha1.PipelineResourceRef{
Name: "some-repo",
},
Name: "workspace",
}},
},
Outputs: v1alpha1.TaskRunOutputs{
Resources: []v1alpha1.TaskRunResource{{
Resources: []v1alpha1.TaskResourceBinding{{
Name: "image-to-use",
ResourceRef: v1alpha1.PipelineResourceRef{Name: "some-image"},
Paths: []string{"/pvc/unit-test-1/image-to-use"},
Expand Down Expand Up @@ -483,11 +484,13 @@ func TestUpdateTaskRunsState(t *testing.T) {
TaskRuns: expectedTaskRunsStatus,
}

state := []*resources.PipelineRunTaskRun{{
Task: task,
state := []*resources.ResolvedPipelineRunTask{{
PipelineTask: &pipelineTask,
TaskRunName: "test-pipeline-run-success-unit-test-1",
TaskRun: taskrun,
ResolvedTaskRun: &taskrunresources.ResolvedTaskRun{
TaskSpec: &task.Spec,
},
}}
pr.Status.InitializeConditions()
pipelinerun.UpdateTaskRunsStatus(pr, state)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,11 @@ var (
pvcDir = "/pvc"
)

// GetOutputSteps function reads output task resources and constructs post task step
// postSteps contains array of resource names and named path(pvcdir + name of task + name of resource) under which the output resource will be dumped in PVC
func GetOutputSteps(taskResources []v1alpha1.TaskResourceBinding, taskName string) []v1alpha1.TaskRunResource {
var taskOutputResources []v1alpha1.TaskRunResource
func GetOutputSteps(taskResources []v1alpha1.TaskResourceBinding, taskName string) []v1alpha1.TaskResourceBinding {
var taskOutputResources []v1alpha1.TaskResourceBinding

for _, outputRes := range taskResources {
taskOutputResources = append(taskOutputResources, v1alpha1.TaskRunResource{
taskOutputResources = append(taskOutputResources, v1alpha1.TaskResourceBinding{
ResourceRef: outputRes.ResourceRef,
Name: outputRes.Name,
Paths: []string{filepath.Join(pvcDir, taskName, outputRes.Name)},
Expand All @@ -40,13 +38,11 @@ func GetOutputSteps(taskResources []v1alpha1.TaskResourceBinding, taskName strin
return taskOutputResources
}

// GetInputSteps function reads input bindings and constructs pre build step
// with information to create build step to setup altered inputs.
func GetInputSteps(taskResources []v1alpha1.TaskResourceBinding, pt *v1alpha1.PipelineTask) []v1alpha1.TaskRunResource {
var taskInputResources []v1alpha1.TaskRunResource
func GetInputSteps(taskResources []v1alpha1.TaskResourceBinding, pt *v1alpha1.PipelineTask) []v1alpha1.TaskResourceBinding {
var taskInputResources []v1alpha1.TaskResourceBinding

for _, inputResource := range taskResources {
taskInputResource := v1alpha1.TaskRunResource{
taskInputResource := v1alpha1.TaskResourceBinding{
ResourceRef: inputResource.ResourceRef,
Name: inputResource.Name,
}
Expand All @@ -67,7 +63,6 @@ func GetInputSteps(taskResources []v1alpha1.TaskResourceBinding, pt *v1alpha1.Pi
return taskInputResources
}

// WrapSteps input and resources for taskrun along with presteps , poststeps
func WrapSteps(tr *v1alpha1.TaskRunSpec, pipelineResources []v1alpha1.PipelineTaskResource, pt *v1alpha1.PipelineTask) {
if pt == nil {
return
Expand Down
Loading

0 comments on commit 9c18103

Please sign in to comment.