Skip to content

Commit

Permalink
feat(sdk): Sdk ParallelFor task spec inline (kubeflow#633)
Browse files Browse the repository at this point in the history
* TODO1: PipelineLoop inlined including nested loops and added tests.

* Supported inlining of recursive and parallel tasks.

    For recursive tasks, we simply skip them.
    For Parallel tasks, relevant tests were added.

* Run all tests against inlined taskSpec and normal taskRefs

* Added CLI Flag disable/enable spec inlining.
  • Loading branch information
ScrapCodes authored Jun 30, 2021
1 parent 2fc23fa commit 64d4324
Show file tree
Hide file tree
Showing 72 changed files with 8,637 additions and 58 deletions.
2 changes: 2 additions & 0 deletions sdk/python/kfp_tekton/compiler/_tekton_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ def _handle_tekton_pipeline_variables(pipeline_run):
for task in task_list:
if task.get('taskRef', {}):
continue
if 'taskSpec' in task and 'apiVersion' in task['taskSpec']:
continue
for key, val in pipeline_variables.items():
task_str = json.dumps(task['taskSpec']['steps'])
if val in task_str:
Expand Down
88 changes: 72 additions & 16 deletions sdk/python/kfp_tekton/compiler/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,38 +12,36 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import ast
import inspect
import json
import tarfile
import zipfile
import os
import re
import tarfile
import textwrap
import yaml
import os
import uuid
import ast

from typing import Callable, List, Text, Dict, Any
from os import environ as env
import zipfile
from collections import defaultdict
from distutils.util import strtobool
from os import environ as env
from typing import Callable, List, Text, Dict, Any

import yaml
# Kubeflow Pipeline imports
from kfp import dsl
from kfp.compiler._default_transformers import add_pod_env
from kfp.compiler.compiler import Compiler
from kfp.components.structures import InputSpec
from kfp.dsl._for_loop import LoopArguments
from kfp.dsl._metadata import _extract_pipeline_metadata
from collections import defaultdict

# KFP-Tekton imports
from kfp_tekton.compiler import __tekton_api_version__ as tekton_api_version
from kfp_tekton.compiler._data_passing_rewriter import fix_big_data_passing
from kfp_tekton.compiler._k8s_helper import convert_k8s_obj_to_json, sanitize_k8s_name, sanitize_k8s_object
from kfp_tekton.compiler._op_to_template import _op_to_template
from kfp_tekton.compiler.yaml_utils import dump_yaml
from kfp_tekton.compiler.pipeline_utils import TektonPipelineConf
from kfp_tekton.compiler._tekton_handler import _handle_tekton_pipeline_variables, _handle_tekton_custom_task
from kfp_tekton.compiler.pipeline_utils import TektonPipelineConf
from kfp_tekton.compiler.yaml_utils import dump_yaml
from kfp_tekton.tekton import TEKTON_CUSTOM_TASK_IMAGES, DEFAULT_CONDITION_OUTPUT_KEYWORD

DEFAULT_ARTIFACT_BUCKET = env.get('DEFAULT_ARTIFACT_BUCKET', 'mlpipeline')
Expand Down Expand Up @@ -131,11 +129,13 @@ def __init__(self, **kwargs):
self._group_names = []
self.pipeline_labels = {}
self.pipeline_annotations = {}
self.tekton_inline_spec = True
super().__init__(**kwargs)

def _set_pipeline_conf(self, tekton_pipeline_conf: TektonPipelineConf):
    """Copy pipeline-level Tekton settings from *tekton_pipeline_conf* onto
    this compiler: the PipelineRun labels, the PipelineRun annotations, and
    the flag controlling whether loop CRs are inlined as taskSpecs."""
    for attr_name in ('pipeline_labels', 'pipeline_annotations', 'tekton_inline_spec'):
        setattr(self, attr_name, getattr(tekton_pipeline_conf, attr_name))

def _resolve_value_or_reference(self, value_or_reference, potential_references):
"""_resolve_value_or_reference resolves values and PipelineParams, which could be task parameters or input parameters.
Expand Down Expand Up @@ -1205,7 +1205,7 @@ def compile(self,
Args:
pipeline_func: pipeline functions with @dsl.pipeline decorator.
package_path: the output workflow tar.gz file path. for example, "~/a.tar.gz"
type_check: whether to enable the type check or not, default: False.
type_check: whether to enable the type check or not, default: True.
pipeline_conf: PipelineConf instance. Can specify op transforms,
image pull secrets and other pipeline-level configuration options.
Overrides any configuration that may be set by the pipeline.
Expand Down Expand Up @@ -1316,18 +1316,74 @@ def _create_and_write_workflow(self,
# Separate loop workflow from the main workflow
if self.loops_pipeline:
pipeline_loop_crs, workflow = _handle_tekton_custom_task(self.loops_pipeline, workflow, self.recursive_tasks, self._group_names)
inlined_as_taskSpec: List[Text] = []
recursive_tasks_names: List[Text] = [x['taskRef'].get('name', "") for x in self.recursive_tasks]
if self.tekton_inline_spec:
# Step 1. inline all the pipeline_loop_crs as they may refer to each other.
for i in range(len(pipeline_loop_crs)):
if 'pipelineSpec' in pipeline_loop_crs[i]['spec']:
if 'params' in pipeline_loop_crs[i]['spec']['pipelineSpec']:
# Preserve order of params, required by tests.
pipeline_loop_crs[i]['spec']['pipelineSpec']['params'] =\
sorted(pipeline_loop_crs[i]['spec']['pipelineSpec']['params'], key=lambda kv: (kv['name']))
t, e = TektonCompiler._inline_tasks(pipeline_loop_crs[i]['spec']['pipelineSpec']['tasks'],
pipeline_loop_crs, recursive_tasks_names)
if e:
pipeline_loop_crs[i]['spec']['pipelineSpec']['tasks'] = t
inlined_as_taskSpec.extend(e)
# Step 2. inline pipeline_loop_crs in the workflow
workflow_tasks, e = TektonCompiler._inline_tasks(workflow['spec']['pipelineSpec']['tasks'],
pipeline_loop_crs, recursive_tasks_names)
inlined_as_taskSpec.extend(e)
workflow['spec']['pipelineSpec']['tasks'] = workflow_tasks

TektonCompiler._write_workflow(workflow=workflow, package_path=package_path)

# create cr yaml for only those pipelineLoop cr which could not be converted to inlined spec.
for i in range(len(pipeline_loop_crs)):
TektonCompiler._write_workflow(workflow=pipeline_loop_crs[i],
package_path=os.path.splitext(package_path)[0] + "_pipelineloop_cr" + str(i + 1) + '.yaml')
if pipeline_loop_crs[i]['metadata'].get('name', "") not in inlined_as_taskSpec:
TektonCompiler._write_workflow(workflow=pipeline_loop_crs[i],
package_path=os.path.splitext(package_path)[0] +
"_pipelineloop_cr" + str(i + 1) + '.yaml')
else:
TektonCompiler._write_workflow(workflow=workflow, package_path=package_path) # Tekton change
# Separate custom task CR from the main workflow
for i in range(len(self.custom_task_crs)):
TektonCompiler._write_workflow(workflow=self.custom_task_crs[i],
package_path=os.path.splitext(package_path)[0] + "_customtask_cr" + str(i + 1) + '.yaml')
package_path=os.path.splitext(package_path)[0] +
"_customtask_cr" + str(i + 1) + '.yaml')
_validate_workflow(workflow)

@staticmethod
def _inline_tasks(tasks: List[Dict[Text, Any]], crs: List[Dict[Text, Any]], recursive_tasks: List[Text]):
"""
Scan all the `tasks` and for each taskRef in `tasks` resolve it in `crs`
and inline them as taskSpec.
return tasks with all the taskRef -> taskSpec resolved.
list of names of the taskRef that were successfully converted.
"""
workflow_tasks = tasks.copy()
inlined_as_taskSpec = []
for j in range(len(workflow_tasks)):
if 'params' in workflow_tasks[j]:
# Preserve order of params, required by tests.
workflow_tasks[j]['params'] = sorted(workflow_tasks[j]['params'], key=lambda kv: (kv['name']))
if 'taskRef' in workflow_tasks[j]:
wf_taskRef = workflow_tasks[j]['taskRef']
if 'name' in wf_taskRef and \
wf_taskRef['name'] not in recursive_tasks: # we do not inline recursive tasks.
cr_apiVersion = wf_taskRef['apiVersion']
cr_kind = wf_taskRef['kind']
cr_ref_name = wf_taskRef['name']
for i in range(len(crs)):
if crs[i]['metadata'].get('name', "") == cr_ref_name:
workflow_tasks[j]['taskSpec'] = \
{'apiVersion': cr_apiVersion, 'kind': cr_kind,
'spec': crs[i]['spec']}
inlined_as_taskSpec.append(cr_ref_name)
workflow_tasks[j].pop('taskRef')
return workflow_tasks, inlined_as_taskSpec


def _validate_workflow(workflow: Dict[Text, Any]):

Expand Down
25 changes: 17 additions & 8 deletions sdk/python/kfp_tekton/compiler/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import kfp.compiler.main as kfp_compiler_main
import argparse
import sys
import os
from . import TektonCompiler
import sys

import kfp.compiler.main as kfp_compiler_main
from kfp_tekton.compiler.pipeline_utils import TektonPipelineConf

from . import TektonCompiler
from .. import __version__


Expand All @@ -44,12 +46,15 @@ def parse_arguments():
parser.add_argument('--disable-type-check',
action='store_true',
help='disable the type check, default is enabled.')
parser.add_argument('--disable-task-inline',
action='store_true',
help='disable taskSpec inlining, default is enabled.')

args = parser.parse_args()
return args


def _compile_pipeline_function(pipeline_funcs, function_name, output_path, type_check):
def _compile_pipeline_function(pipeline_funcs, function_name, output_path, type_check, tekton_pipeline_conf=None):
if len(pipeline_funcs) == 0:
raise ValueError('A function with @dsl.pipeline decorator is required in the py file.')

Expand All @@ -65,16 +70,16 @@ def _compile_pipeline_function(pipeline_funcs, function_name, output_path, type_
else:
pipeline_func = pipeline_funcs[0]

TektonCompiler().compile(pipeline_func, output_path, type_check)
TektonCompiler().compile(pipeline_func, output_path, type_check, tekton_pipeline_conf=tekton_pipeline_conf)


def compile_pyfile(pyfile, function_name, output_path, type_check):
def compile_pyfile(pyfile, function_name, output_path, type_check, tekton_pipeline_conf=None):
    """Import *pyfile*, collect the @dsl.pipeline functions it defines and
    compile the selected one into a Tekton workflow at *output_path*.

    The file's directory is temporarily prepended to sys.path so the module
    import succeeds; the entry is removed again even when compilation fails.
    """
    sys.path.insert(0, os.path.dirname(pyfile))
    try:
        module_name, _ = os.path.splitext(os.path.basename(pyfile))
        with kfp_compiler_main.PipelineCollectorContext() as pipeline_funcs:
            __import__(module_name)
            _compile_pipeline_function(pipeline_funcs, function_name,
                                       output_path, type_check,
                                       tekton_pipeline_conf)
    finally:
        del sys.path[0]

Expand All @@ -86,7 +91,11 @@ def main():
(args.py is not None and args.package is not None)):
raise ValueError('Either --py or --package is needed but not both.')
if args.py:
compile_pyfile(args.py, args.function, args.output, not args.disable_type_check)
tekton_pipeline_conf = TektonPipelineConf()
tekton_pipeline_conf.set_tekton_inline_spec(True)
if args.disable_task_inline:
tekton_pipeline_conf.set_tekton_inline_spec(False)
compile_pyfile(args.py, args.function, args.output, not args.disable_type_check, tekton_pipeline_conf)
else:
if args.namespace is None:
raise ValueError('--namespace is required for compiling packages.')
Expand Down
11 changes: 11 additions & 0 deletions sdk/python/kfp_tekton/compiler/pipeline_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,15 @@ class TektonPipelineConf(dsl.PipelineConf):
def __init__(self, **kwargs):
    """Initialize the Tekton-specific pipeline configuration on top of the
    base KFP PipelineConf; extra keyword args are forwarded to the parent."""
    # Tekton-only settings; everything else is handled by the parent class.
    self.pipeline_labels = dict()
    self.pipeline_annotations = dict()
    self.tekton_inline_spec = True  # inline loop taskSpecs by default
    super().__init__(**kwargs)

def copy(self):
    """Return a new TektonPipelineConf carrying over this conf's labels,
    annotations and inline-spec flag.

    Bug fix: the previous implementation passed the whole labels/annotations
    dict as the single positional argument of add_pipeline_label /
    add_pipeline_annotation (which expect a (name, value) pair), so copy()
    raised TypeError whenever any label or annotation was set. Items are now
    copied entry by entry, which also avoids sharing the underlying dicts
    with the original conf.
    """
    conf = TektonPipelineConf()
    for label_name, value in self.pipeline_labels.items():
        conf.add_pipeline_label(label_name, value)
    for annotation_name, value in self.pipeline_annotations.items():
        conf.add_pipeline_annotation(annotation_name, value)
    return conf.set_tekton_inline_spec(self.tekton_inline_spec)

def add_pipeline_label(self, label_name: str, value: str):
    """Record a metadata label to be attached to the compiled PipelineRun.

    Returns self so calls can be chained fluently.
    """
    self.pipeline_labels.update({label_name: value})
    return self
Expand All @@ -37,3 +44,7 @@ def add_pipeline_annotation(self, annotation_name: str, value: str):
% annotation_name, value)
self.pipeline_annotations[annotation_name] = value
return self

def set_tekton_inline_spec(self, value: bool):
    """Choose whether loop custom tasks are embedded as inline taskSpecs
    (True) or emitted as separate CR files referenced by taskRef (False).

    Returns self for fluent chaining.
    """
    # Plain attribute write; setattr keeps the fluent one-liner shape.
    setattr(self, 'tekton_inline_spec', value)
    return self
38 changes: 32 additions & 6 deletions sdk/python/tests/compiler/compiler_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,12 @@
import tempfile
import textwrap
import unittest
import yaml
import pytest

from os import environ as env

import pytest
import yaml
from kfp_tekton import compiler

from kfp_tekton.compiler.pipeline_utils import TektonPipelineConf

# temporarily set this flag to True in order to (re)generate new "golden" YAML
# files after making code changes that modify the expected YAML output.
Expand Down Expand Up @@ -506,15 +505,38 @@ def test_any_sequencer(self):

self._test_pipeline_workflow(any_sequence_pipeline, 'any_sequencer.yaml')

def _test_pipeline_workflow(self,
def _test_pipeline_workflow_inlined_spec(self,
                                         pipeline_function,
                                         pipeline_yaml,
                                         normalize_compiler_output_function=None,
                                         tekton_pipeline_conf=None):
    """Compile *pipeline_function* with taskSpec inlining enabled and compare
    the result against the golden YAML file *pipeline_yaml* in testdata/.

    Bug fix: the default was ``tekton_pipeline_conf=TektonPipelineConf()``, a
    mutable default evaluated once and shared (and mutated) across every
    call; a fresh conf is now created per call when none is supplied.

    Args:
        pipeline_function: @dsl.pipeline-decorated function to compile.
        pipeline_yaml: golden YAML filename relative to the testdata dir.
        normalize_compiler_output_function: optional callable applied to the
            raw compiled YAML text before parsing (e.g. to strip run ids).
        tekton_pipeline_conf: optional conf; inlining is forced on either way.
    """
    if tekton_pipeline_conf is None:
        tekton_pipeline_conf = TektonPipelineConf()
    test_data_dir = os.path.join(os.path.dirname(__file__), 'testdata')
    golden_yaml_file = os.path.join(test_data_dir, pipeline_yaml)
    temp_dir = tempfile.mkdtemp()
    compiled_yaml_file = os.path.join(temp_dir, 'workflow.yaml')
    tekton_pipeline_conf.set_tekton_inline_spec(True)
    try:
        compiler.TektonCompiler().compile(pipeline_function,
                                          compiled_yaml_file,
                                          tekton_pipeline_conf=tekton_pipeline_conf)
        with open(compiled_yaml_file, 'r') as f:
            raw = f.read()
            if normalize_compiler_output_function:
                raw = normalize_compiler_output_function(raw)
            compiled = yaml.safe_load(raw)
        self._verify_compiled_workflow(golden_yaml_file, compiled)
    finally:
        # Always clean up the temp dir, even when compilation/verification fails.
        shutil.rmtree(temp_dir)

def _test_pipeline_workflow(self,
pipeline_function,
pipeline_yaml,
normalize_compiler_output_function=None,
tekton_pipeline_conf=TektonPipelineConf()):
test_data_dir = os.path.join(os.path.dirname(__file__), 'testdata')
golden_yaml_file = os.path.join(test_data_dir, pipeline_yaml.replace(".yaml", "") + "_noninlined.yaml")
temp_dir = tempfile.mkdtemp()
compiled_yaml_file = os.path.join(temp_dir, 'workflow.yaml')
tekton_pipeline_conf.set_tekton_inline_spec(False)
try:
compiler.TektonCompiler().compile(pipeline_function,
compiled_yaml_file,
Expand All @@ -526,6 +548,10 @@ def _test_pipeline_workflow(self,
self._verify_compiled_workflow(golden_yaml_file, compiled)
finally:
shutil.rmtree(temp_dir)
self._test_pipeline_workflow_inlined_spec(pipeline_function=pipeline_function,
pipeline_yaml=pipeline_yaml,
normalize_compiler_output_function=normalize_compiler_output_function,
tekton_pipeline_conf=tekton_pipeline_conf)

def _test_workflow_without_decorator(self, pipeline_yaml, params_dict):
"""
Expand Down
62 changes: 62 additions & 0 deletions sdk/python/tests/compiler/testdata/affinity_noninlined.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# Copyright 2021 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: tekton.dev/v1beta1
kind: PipelineRun
metadata:
annotations:
pipelines.kubeflow.org/pipeline_spec: '{"description": "A pipeline with affinity",
"name": "affinity"}'
sidecar.istio.io/inject: 'false'
tekton.dev/artifact_bucket: mlpipeline
tekton.dev/artifact_endpoint: minio-service.kubeflow:9000
tekton.dev/artifact_endpoint_scheme: http://
tekton.dev/artifact_items: '{"echo": []}'
tekton.dev/input_artifacts: '{}'
tekton.dev/output_artifacts: '{}'
name: affinity
spec:
pipelineSpec:
tasks:
- name: echo
taskSpec:
metadata:
annotations:
tekton.dev/template: ''
labels:
pipelines.kubeflow.org/cache_enabled: 'true'
pipelines.kubeflow.org/generation: ''
pipelines.kubeflow.org/pipelinename: ''
steps:
- args:
- echo "Got scheduled"
command:
- sh
- -c
image: busybox
name: main
timeout: 0s
taskRunSpecs:
- pipelineTaskName: echo
taskPodTemplate:
affinity:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
- matchExpressions:
- key: kubernetes.io/os
operator: In
values:
- linux
timeout: 0s
Loading

0 comments on commit 64d4324

Please sign in to comment.