From ed8f6354a287f1b4b5be6d297f142c9ab1445c32 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Fri, 10 Sep 2021 14:30:46 +0100 Subject: [PATCH 1/6] Add `emptyDir` volume mounted at `/gcs` --- .../v2_compatible_two_step_pipeline.yaml | 4 ++ ...wo_step_pipeline_with_custom_launcher.yaml | 4 ++ sdk/python/kfp/compiler/v2_compat.py | 4 ++ .../compiler/testdata/uri_artifacts.yaml | 10 ++++ v2/compiler/argo_test.go | 6 +++ v2/compiler/container.go | 47 +++++++++++++------ v2/compiler/executor.go | 3 +- 7 files changed, 63 insertions(+), 15 deletions(-) diff --git a/sdk/python/kfp/compiler/testdata/v2_compatible_two_step_pipeline.yaml b/sdk/python/kfp/compiler/testdata/v2_compatible_two_step_pipeline.yaml index 6f23ea689c5..bcef7d2df01 100644 --- a/sdk/python/kfp/compiler/testdata/v2_compatible_two_step_pipeline.yaml +++ b/sdk/python/kfp/compiler/testdata/v2_compatible_two_step_pipeline.yaml @@ -107,6 +107,7 @@ spec: image: python:3.7 volumeMounts: - {mountPath: /kfp-launcher, name: kfp-launcher} + - {mountPath: /gcs, name: gcs} inputs: parameters: - {name: pipeline-name} @@ -134,6 +135,7 @@ spec: name: kfp-launcher mirrorVolumeMounts: true volumes: + - {name: gcs} - {name: kfp-launcher} - name: train container: @@ -210,6 +212,7 @@ spec: image: python:3.7 volumeMounts: - {mountPath: /kfp-launcher, name: kfp-launcher} + - {mountPath: /gcs, name: gcs} inputs: parameters: - {name: pipeline-name} @@ -236,6 +239,7 @@ spec: name: kfp-launcher mirrorVolumeMounts: true volumes: + - {name: gcs} - {name: kfp-launcher} arguments: parameters: diff --git a/sdk/python/kfp/compiler/testdata/v2_compatible_two_step_pipeline_with_custom_launcher.yaml b/sdk/python/kfp/compiler/testdata/v2_compatible_two_step_pipeline_with_custom_launcher.yaml index 38296e77284..f220746008f 100644 --- a/sdk/python/kfp/compiler/testdata/v2_compatible_two_step_pipeline_with_custom_launcher.yaml +++ b/sdk/python/kfp/compiler/testdata/v2_compatible_two_step_pipeline_with_custom_launcher.yaml @@ -107,6 +107,7 @@ spec: image: python:3.7 volumeMounts: - {mountPath: /kfp-launcher, name: kfp-launcher} + - {mountPath: /gcs, name: gcs} inputs: parameters: - {name: pipeline-name} @@ -134,6 +135,7 @@ spec: name: kfp-launcher mirrorVolumeMounts: true volumes: + - {name: gcs} - {name: kfp-launcher} - name: train container: @@ -210,6 +212,7 @@ spec: image: python:3.7 volumeMounts: - {mountPath: /kfp-launcher, name: kfp-launcher} + - {mountPath: /gcs, name: gcs} inputs: parameters: - {name: pipeline-name} @@ -236,6 +239,7 @@ spec: name: kfp-launcher mirrorVolumeMounts: true volumes: + - {name: gcs} - {name: kfp-launcher} arguments: parameters: diff --git a/sdk/python/kfp/compiler/v2_compat.py b/sdk/python/kfp/compiler/v2_compat.py index 3cab23f1d24..9e58bb66640 100644 --- a/sdk/python/kfp/compiler/v2_compat.py +++ b/sdk/python/kfp/compiler/v2_compat.py @@ -52,6 +52,10 @@ def update_op(op: dsl.ContainerOp, op.add_volume_mount( k8s_client.V1VolumeMount( name='kfp-launcher', mount_path='/kfp-launcher')) + op.add_volume(k8s_client.V1Volume(name='gcs')) + op.add_volume_mount( + k8s_client.V1VolumeMount( + name='gcs', mount_path='/gcs')) # op.command + op.args will have the following sections: # 1. args passed to kfp-launcher diff --git a/sdk/python/tests/compiler/testdata/uri_artifacts.yaml b/sdk/python/tests/compiler/testdata/uri_artifacts.yaml index 229978c8824..e379cf6e6e5 100644 --- a/sdk/python/tests/compiler/testdata/uri_artifacts.yaml +++ b/sdk/python/tests/compiler/testdata/uri_artifacts.yaml @@ -73,6 +73,7 @@ spec: image: python:alpine3.6 volumeMounts: - {mountPath: /kfp-launcher, name: kfp-launcher} + - {mountPath: /gcs, name: gcs} inputs: parameters: - {name: pipeline-name} @@ -98,6 +99,7 @@ spec: name: kfp-launcher mirrorVolumeMounts: true volumes: + - {name: gcs} - {name: kfp-launcher} - name: for-loop-2 inputs: @@ -161,6 +163,7 @@ spec: image: google/cloud-sdk:slim volumeMounts: - {mountPath: /kfp-launcher, name: kfp-launcher} + - {mountPath: /gcs, name: gcs} inputs: parameters: - {name: pipeline-name} @@ -182,6 +185,7 @@ spec: name: kfp-launcher mirrorVolumeMounts: true volumes: + - {name: gcs} - {name: kfp-launcher} - name: read-from-gcs-2 container: @@ -228,6 +232,7 @@ spec: image: google/cloud-sdk:slim volumeMounts: - {mountPath: /kfp-launcher, name: kfp-launcher} + - {mountPath: /gcs, name: gcs} inputs: parameters: - {name: pipeline-name} @@ -249,6 +254,7 @@ spec: name: kfp-launcher mirrorVolumeMounts: true volumes: + - {name: gcs} - {name: kfp-launcher} - name: read-from-gcs-3 container: @@ -295,6 +301,7 @@ spec: image: google/cloud-sdk:slim volumeMounts: - {mountPath: /kfp-launcher, name: kfp-launcher} + - {mountPath: /gcs, name: gcs} inputs: parameters: - {name: pipeline-name} @@ -316,6 +323,7 @@ spec: name: kfp-launcher mirrorVolumeMounts: true volumes: + - {name: gcs} - {name: kfp-launcher} - name: uri-artifact-pipeline inputs: @@ -414,6 +422,7 @@ spec: image: google/cloud-sdk:slim volumeMounts: - {mountPath: /kfp-launcher, name: kfp-launcher} + - {mountPath: /gcs, name: gcs} inputs: parameters: - {name: pipeline-name} @@ -438,6 +447,7 @@ spec: name: kfp-launcher mirrorVolumeMounts: true volumes: + - {name: gcs} - {name: kfp-launcher} arguments: parameters: diff --git a/v2/compiler/argo_test.go b/v2/compiler/argo_test.go index f58de8f8480..ff7af598496 100644 --- a/v2/compiler/argo_test.go +++ b/v2/compiler/argo_test.go @@ -132,6 +132,8 @@ func Test_argo_compiler(t *testing.T) { volumeMounts: - mountPath: /kfp-launcher name: kfp-launcher + - mountPath: /gcs + name: gcs initContainers: - command: - launcher-v2 @@ -144,6 +146,8 @@ func Test_argo_compiler(t *testing.T) { volumeMounts: - mountPath: /kfp-launcher name: kfp-launcher + - mountPath: /gcs + name: gcs inputs: parameters: - name: executor-input @@ -155,6 +159,8 @@ func Test_argo_compiler(t *testing.T) { volumes: - emptyDir: {} name: kfp-launcher + - emptyDir: {} + name: gcs - dag: tasks: - arguments: diff --git a/v2/compiler/container.go b/v2/compiler/container.go index b82de04e383..4c5fdcef1ce 100644 --- a/v2/compiler/container.go +++ b/v2/compiler/container.go @@ -159,21 +159,34 @@ func containerExecutorTemplate(container *pipelinespec.PipelineDeploymentConfig_ {Name: paramComponent}, }, }, - Volumes: []k8score.Volume{{ - Name: volumeNameKFPLauncher, - VolumeSource: k8score.VolumeSource{ - EmptyDir: &k8score.EmptyDirVolumeSource{}, + Volumes: []k8score.Volume{ + { + Name: volumeNameKFPLauncher, + VolumeSource: k8score.VolumeSource{ + EmptyDir: &k8score.EmptyDirVolumeSource{}, + }, }, - }}, + { + Name: volumeNameGCS, + VolumeSource: k8score.VolumeSource{ + EmptyDir: &k8score.EmptyDirVolumeSource{}, + }, + }, + }, InitContainers: []wfapi.UserContainer{{ Container: k8score.Container{ Name: "kfp-launcher", Image: launcherImage, - Command: []string{"launcher-v2", "--copy", "/kfp-launcher/launch"}, - VolumeMounts: []k8score.VolumeMount{{ - Name: volumeNameKFPLauncher, - MountPath: volumePathKFPLauncher, - }}, + VolumeMounts: []k8score.VolumeMount{ + { + Name: volumeNameKFPLauncher, + MountPath: volumePathKFPLauncher, + }, + { + Name: volumeNameGCS, + MountPath: volumePathGCS, + }, + }, ImagePullPolicy: "Always", }, }}, @@ -181,10 +194,16 @@ func containerExecutorTemplate(container *pipelinespec.PipelineDeploymentConfig_ Command: launcherCmd, Args: userCmdArgs, Image: container.Image, - VolumeMounts: []k8score.VolumeMount{{ - Name: volumeNameKFPLauncher, - MountPath: volumePathKFPLauncher, - }}, + VolumeMounts: []k8score.VolumeMount{ + { + Name: volumeNameKFPLauncher, + MountPath: volumePathKFPLauncher, + }, + { + Name: volumeNameGCS, + MountPath: volumePathGCS, + }, + }, EnvFrom: []k8score.EnvFromSource{{ ConfigMapRef: &k8score.ConfigMapEnvSource{ LocalObjectReference: k8score.LocalObjectReference{ diff --git a/v2/compiler/executor.go b/v2/compiler/executor.go index 525757506f3..325cf38baee 100644 --- a/v2/compiler/executor.go +++ b/v2/compiler/executor.go @@ -3,5 +3,6 @@ package compiler const ( volumeNameKFPLauncher = "kfp-launcher" volumePathKFPLauncher = "/kfp-launcher" + volumeNameGCS = "gcs" + volumePathGCS = "/gcs" ) - From 1378decad012c8cb15c1b0642961a2518a713621 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Fri, 10 Sep 2021 14:32:34 +0100 Subject: [PATCH 2/6] Move `launch` binary to `/var/run/kfp/bin` --- .../v2_compatible_two_step_pipeline.yaml | 12 ++++---- ...wo_step_pipeline_with_custom_launcher.yaml | 12 ++++---- sdk/python/kfp/compiler/v2_compat.py | 8 ++--- .../compiler/testdata/uri_artifacts.yaml | 30 +++++++++---------- v2/compiler/argo_test.go | 8 ++--- v2/compiler/container.go | 3 +- v2/compiler/executor.go | 2 +- 7 files changed, 38 insertions(+), 37 deletions(-) diff --git a/sdk/python/kfp/compiler/testdata/v2_compatible_two_step_pipeline.yaml b/sdk/python/kfp/compiler/testdata/v2_compatible_two_step_pipeline.yaml index bcef7d2df01..f013f8434de 100644 --- a/sdk/python/kfp/compiler/testdata/v2_compatible_two_step_pipeline.yaml +++ b/sdk/python/kfp/compiler/testdata/v2_compatible_two_step_pipeline.yaml @@ -70,7 +70,7 @@ spec: - '{{$}}' - --function_to_execute - preprocess - command: [/kfp-launcher/launch, --mlmd_server_address, $(METADATA_GRPC_SERVICE_HOST), + command: [/var/run/kfp/bin/launch, --mlmd_server_address, $(METADATA_GRPC_SERVICE_HOST), --mlmd_server_port, $(METADATA_GRPC_SERVICE_PORT), --runtime_info_json, $(KFP_V2_RUNTIME_INFO), --container_image, $(KFP_V2_IMAGE), --task_name, preprocess, --pipeline_name, '{{inputs.parameters.pipeline-name}}', --run_id, $(KFP_RUN_ID), --run_resource, @@ -106,7 +106,7 @@ spec: - configMapRef: {name: metadata-grpc-configmap, optional: true} image: python:3.7 volumeMounts: - - {mountPath: /kfp-launcher, name: kfp-launcher} + - {mountPath: /var/run/kfp, name: kfp-launcher} - {mountPath: /gcs, name: gcs} inputs: parameters: @@ -130,7 +130,7 @@ spec: pipelines.kubeflow.org/v2_component: "true" pipelines.kubeflow.org/enable_caching: "true" initContainers: - - command: [launcher, --copy, /kfp-launcher/launch] + - command: [launcher, --copy, /var/run/kfp/bin/launch] image: gcr.io/ml-pipeline/kfp-launcher:1.7.2 name: kfp-launcher mirrorVolumeMounts: true @@ -173,7 +173,7 @@ spec: - '{{$}}' - --function_to_execute - train - command: [/kfp-launcher/launch, --mlmd_server_address, $(METADATA_GRPC_SERVICE_HOST), + command: [/var/run/kfp/bin/launch, --mlmd_server_address, $(METADATA_GRPC_SERVICE_HOST), --mlmd_server_port, $(METADATA_GRPC_SERVICE_PORT), --runtime_info_json, $(KFP_V2_RUNTIME_INFO), --container_image, $(KFP_V2_IMAGE), --task_name, train, --pipeline_name, '{{inputs.parameters.pipeline-name}}', --run_id, $(KFP_RUN_ID), --run_resource, workflows.argoproj.io/$(WORKFLOW_ID), @@ -211,7 +211,7 @@ spec: - configMapRef: {name: metadata-grpc-configmap, optional: true} image: python:3.7 volumeMounts: - - {mountPath: /kfp-launcher, name: kfp-launcher} + - {mountPath: /var/run/kfp, name: kfp-launcher} - {mountPath: /gcs, name: gcs} inputs: parameters: @@ -234,7 +234,7 @@ spec: pipelines.kubeflow.org/v2_component: "true" pipelines.kubeflow.org/enable_caching: "true" initContainers: - - command: [launcher, --copy, /kfp-launcher/launch] + - command: [launcher, --copy, /var/run/kfp/bin/launch] image: gcr.io/ml-pipeline/kfp-launcher:1.7.2 name: kfp-launcher mirrorVolumeMounts: true diff --git a/sdk/python/kfp/compiler/testdata/v2_compatible_two_step_pipeline_with_custom_launcher.yaml b/sdk/python/kfp/compiler/testdata/v2_compatible_two_step_pipeline_with_custom_launcher.yaml index f220746008f..b61181a9c8e 100644 --- a/sdk/python/kfp/compiler/testdata/v2_compatible_two_step_pipeline_with_custom_launcher.yaml +++ b/sdk/python/kfp/compiler/testdata/v2_compatible_two_step_pipeline_with_custom_launcher.yaml @@ -70,7 +70,7 @@ spec: - '{{$}}' - --function_to_execute - preprocess - command: [/kfp-launcher/launch, --mlmd_server_address, $(METADATA_GRPC_SERVICE_HOST), + command: [/var/run/kfp/bin/launch, --mlmd_server_address, $(METADATA_GRPC_SERVICE_HOST), --mlmd_server_port, $(METADATA_GRPC_SERVICE_PORT), --runtime_info_json, $(KFP_V2_RUNTIME_INFO), --container_image, $(KFP_V2_IMAGE), --task_name, preprocess, --pipeline_name, '{{inputs.parameters.pipeline-name}}', --run_id, $(KFP_RUN_ID), --run_resource, @@ -106,7 +106,7 @@ spec: - configMapRef: {name: metadata-grpc-configmap, optional: true} image: python:3.7 volumeMounts: - - {mountPath: /kfp-launcher, name: kfp-launcher} + - {mountPath: /var/run/kfp, name: kfp-launcher} - {mountPath: /gcs, name: gcs} inputs: parameters: @@ -130,7 +130,7 @@ spec: pipelines.kubeflow.org/v2_component: "true" pipelines.kubeflow.org/enable_caching: "true" initContainers: - - command: [launcher, --copy, /kfp-launcher/launch] + - command: [launcher, --copy, /var/run/kfp/bin/launch] image: my-custom-image name: kfp-launcher mirrorVolumeMounts: true @@ -173,7 +173,7 @@ spec: - '{{$}}' - --function_to_execute - train - command: [/kfp-launcher/launch, --mlmd_server_address, $(METADATA_GRPC_SERVICE_HOST), + command: [/var/run/kfp/bin/launch, --mlmd_server_address, $(METADATA_GRPC_SERVICE_HOST), --mlmd_server_port, $(METADATA_GRPC_SERVICE_PORT), --runtime_info_json, $(KFP_V2_RUNTIME_INFO), --container_image, $(KFP_V2_IMAGE), --task_name, train, --pipeline_name, '{{inputs.parameters.pipeline-name}}', --run_id, $(KFP_RUN_ID), --run_resource, workflows.argoproj.io/$(WORKFLOW_ID), @@ -211,7 +211,7 @@ spec: - configMapRef: {name: metadata-grpc-configmap, optional: true} image: python:3.7 volumeMounts: - - {mountPath: /kfp-launcher, name: kfp-launcher} + - {mountPath: /var/run/kfp, name: kfp-launcher} - {mountPath: /gcs, name: gcs} inputs: parameters: @@ -234,7 +234,7 @@ spec: pipelines.kubeflow.org/v2_component: "true" pipelines.kubeflow.org/enable_caching: "true" initContainers: - - command: [launcher, --copy, /kfp-launcher/launch] + - command: [launcher, --copy, /var/run/kfp/bin/launch] image: my-custom-image name: kfp-launcher mirrorVolumeMounts: true diff --git a/sdk/python/kfp/compiler/v2_compat.py b/sdk/python/kfp/compiler/v2_compat.py index 9e58bb66640..773b405be44 100644 --- a/sdk/python/kfp/compiler/v2_compat.py +++ b/sdk/python/kfp/compiler/v2_compat.py @@ -44,14 +44,14 @@ def update_op(op: dsl.ContainerOp, launcher_container = dsl.UserContainer( name="kfp-launcher", image=image_name, - command=["launcher", "--copy", "/kfp-launcher/launch"], + command=["launcher", "--copy", "/var/run/kfp/bin/launch"], mirror_volume_mounts=True) op.add_init_container(launcher_container) op.add_volume(k8s_client.V1Volume(name='kfp-launcher')) op.add_volume_mount( k8s_client.V1VolumeMount( - name='kfp-launcher', mount_path='/kfp-launcher')) + name='kfp-launcher', mount_path='/var/run/kfp')) op.add_volume(k8s_client.V1Volume(name='gcs')) op.add_volume_mount( k8s_client.V1VolumeMount( @@ -66,7 +66,7 @@ def update_op(op: dsl.ContainerOp, # # example: # - command: - # - /kfp-launcher/launch + # - /var/run/kfp/bin/launch # - '--mlmd_server_address' # - $(METADATA_GRPC_SERVICE_HOST) # - '--mlmd_server_port' @@ -92,7 +92,7 @@ def update_op(op: dsl.ContainerOp, # import xxx # ... op.command = [ - "/kfp-launcher/launch", + "/var/run/kfp/bin/launch", "--mlmd_server_address", "$(METADATA_GRPC_SERVICE_HOST)", "--mlmd_server_port", diff --git a/sdk/python/tests/compiler/testdata/uri_artifacts.yaml b/sdk/python/tests/compiler/testdata/uri_artifacts.yaml index e379cf6e6e5..46ca821a35d 100644 --- a/sdk/python/tests/compiler/testdata/uri_artifacts.yaml +++ b/sdk/python/tests/compiler/testdata/uri_artifacts.yaml @@ -38,7 +38,7 @@ spec: args: [sh, -c, 'mkdir -p "$(dirname $0)" && python -c "import random; result = ''heads'' if random.randint(0,1) == 0 else ''tails''; print(result, end='''')" | tee $0', '{{$.outputs.parameters[''output''].output_file}}'] - command: [/kfp-launcher/launch, --mlmd_server_address, $(METADATA_GRPC_SERVICE_HOST), + command: [/var/run/kfp/bin/launch, --mlmd_server_address, $(METADATA_GRPC_SERVICE_HOST), --mlmd_server_port, $(METADATA_GRPC_SERVICE_PORT), --runtime_info_json, $(KFP_V2_RUNTIME_INFO), --container_image, $(KFP_V2_IMAGE), --task_name, flip-coin, --pipeline_name, '{{inputs.parameters.pipeline-name}}', --run_id, $(KFP_RUN_ID), --run_resource, @@ -72,7 +72,7 @@ spec: - configMapRef: {name: metadata-grpc-configmap, optional: true} image: python:alpine3.6 volumeMounts: - - {mountPath: /kfp-launcher, name: kfp-launcher} + - {mountPath: /var/run/kfp, name: kfp-launcher} - {mountPath: /gcs, name: gcs} inputs: parameters: @@ -94,7 +94,7 @@ spec: pipelines.kubeflow.org/v2_component: "true" pipelines.kubeflow.org/enable_caching: "true" initContainers: - - command: [launcher, --copy, /kfp-launcher/launch] + - command: [launcher, --copy, /var/run/kfp/bin/launch] image: gcr.io/ml-pipeline/kfp-launcher:1.6.6 name: kfp-launcher mirrorVolumeMounts: true @@ -127,7 +127,7 @@ spec: set -e -x gsutil cat "$0" - '{{$.inputs.artifacts[''input_gcs_path''].uri}}' - command: [/kfp-launcher/launch, --mlmd_server_address, $(METADATA_GRPC_SERVICE_HOST), + command: [/var/run/kfp/bin/launch, --mlmd_server_address, $(METADATA_GRPC_SERVICE_HOST), --mlmd_server_port, $(METADATA_GRPC_SERVICE_PORT), --runtime_info_json, $(KFP_V2_RUNTIME_INFO), --container_image, $(KFP_V2_IMAGE), --task_name, read-from-gcs, --pipeline_name, '{{inputs.parameters.pipeline-name}}', --run_id, $(KFP_RUN_ID), --run_resource, @@ -162,7 +162,7 @@ spec: - configMapRef: {name: metadata-grpc-configmap, optional: true} image: google/cloud-sdk:slim volumeMounts: - - {mountPath: /kfp-launcher, name: kfp-launcher} + - {mountPath: /var/run/kfp, name: kfp-launcher} - {mountPath: /gcs, name: gcs} inputs: parameters: @@ -180,7 +180,7 @@ spec: pipelines.kubeflow.org/v2_component: "true" pipelines.kubeflow.org/enable_caching: "true" initContainers: - - command: [launcher, --copy, /kfp-launcher/launch] + - command: [launcher, --copy, /var/run/kfp/bin/launch] image: gcr.io/ml-pipeline/kfp-launcher:1.6.6 name: kfp-launcher mirrorVolumeMounts: true @@ -196,7 +196,7 @@ spec: set -e -x gsutil cat "$0" - '{{$.inputs.artifacts[''input_gcs_path''].uri}}' - command: [/kfp-launcher/launch, --mlmd_server_address, $(METADATA_GRPC_SERVICE_HOST), + command: [/var/run/kfp/bin/launch, --mlmd_server_address, $(METADATA_GRPC_SERVICE_HOST), --mlmd_server_port, $(METADATA_GRPC_SERVICE_PORT), --runtime_info_json, $(KFP_V2_RUNTIME_INFO), --container_image, $(KFP_V2_IMAGE), --task_name, read-from-gcs-2, --pipeline_name, '{{inputs.parameters.pipeline-name}}', --run_id, $(KFP_RUN_ID), --run_resource, @@ -231,7 +231,7 @@ spec: - configMapRef: {name: metadata-grpc-configmap, optional: true} image: google/cloud-sdk:slim volumeMounts: - - {mountPath: /kfp-launcher, name: kfp-launcher} + - {mountPath: /var/run/kfp, name: kfp-launcher} - {mountPath: /gcs, name: gcs} inputs: parameters: @@ -249,7 +249,7 @@ spec: pipelines.kubeflow.org/v2_component: "true" pipelines.kubeflow.org/enable_caching: "true" initContainers: - - command: [launcher, --copy, /kfp-launcher/launch] + - command: [launcher, --copy, /var/run/kfp/bin/launch] image: gcr.io/ml-pipeline/kfp-launcher:1.6.6 name: kfp-launcher mirrorVolumeMounts: true @@ -265,7 +265,7 @@ spec: set -e -x gsutil cat "$0" - '{{$.inputs.artifacts[''input_gcs_path''].uri}}' - command: [/kfp-launcher/launch, --mlmd_server_address, $(METADATA_GRPC_SERVICE_HOST), + command: [/var/run/kfp/bin/launch, --mlmd_server_address, $(METADATA_GRPC_SERVICE_HOST), --mlmd_server_port, $(METADATA_GRPC_SERVICE_PORT), --runtime_info_json, $(KFP_V2_RUNTIME_INFO), --container_image, $(KFP_V2_IMAGE), --task_name, read-from-gcs-3, --pipeline_name, '{{inputs.parameters.pipeline-name}}', --run_id, $(KFP_RUN_ID), --run_resource, @@ -300,7 +300,7 @@ spec: - configMapRef: {name: metadata-grpc-configmap, optional: true} image: google/cloud-sdk:slim volumeMounts: - - {mountPath: /kfp-launcher, name: kfp-launcher} + - {mountPath: /var/run/kfp, name: kfp-launcher} - {mountPath: /gcs, name: gcs} inputs: parameters: @@ -318,7 +318,7 @@ spec: pipelines.kubeflow.org/v2_component: "true" pipelines.kubeflow.org/enable_caching: "true" initContainers: - - command: [launcher, --copy, /kfp-launcher/launch] + - command: [launcher, --copy, /var/run/kfp/bin/launch] image: gcr.io/ml-pipeline/kfp-launcher:1.6.6 name: kfp-launcher mirrorVolumeMounts: true @@ -385,7 +385,7 @@ spec: echo "$0" | gsutil cp - "$1" - '{{$.inputs.parameters[''text'']}}' - '{{$.outputs.artifacts[''output_gcs_path''].uri}}' - command: [/kfp-launcher/launch, --mlmd_server_address, $(METADATA_GRPC_SERVICE_HOST), + command: [/var/run/kfp/bin/launch, --mlmd_server_address, $(METADATA_GRPC_SERVICE_HOST), --mlmd_server_port, $(METADATA_GRPC_SERVICE_PORT), --runtime_info_json, $(KFP_V2_RUNTIME_INFO), --container_image, $(KFP_V2_IMAGE), --task_name, write-to-gcs, --pipeline_name, '{{inputs.parameters.pipeline-name}}', --run_id, $(KFP_RUN_ID), --run_resource, @@ -421,7 +421,7 @@ spec: - configMapRef: {name: metadata-grpc-configmap, optional: true} image: google/cloud-sdk:slim volumeMounts: - - {mountPath: /kfp-launcher, name: kfp-launcher} + - {mountPath: /var/run/kfp, name: kfp-launcher} - {mountPath: /gcs, name: gcs} inputs: parameters: @@ -442,7 +442,7 @@ spec: pipelines.kubeflow.org/v2_component: "true" pipelines.kubeflow.org/enable_caching: "true" initContainers: - - command: [launcher, --copy, /kfp-launcher/launch] + - command: [launcher, --copy, /var/run/kfp/bin/launch] image: gcr.io/ml-pipeline/kfp-launcher:1.6.6 name: kfp-launcher mirrorVolumeMounts: true diff --git a/v2/compiler/argo_test.go b/v2/compiler/argo_test.go index ff7af598496..7ca4d7665f6 100644 --- a/v2/compiler/argo_test.go +++ b/v2/compiler/argo_test.go @@ -97,7 +97,7 @@ func Test_argo_compiler(t *testing.T) { - --text - '{{$.inputs.parameters[''text'']}}' command: - - /kfp-launcher/launch + - /var/run/kfp/bin/launch - --execution_id - '{{inputs.parameters.execution-id}}' - --executor_input @@ -130,7 +130,7 @@ func Test_argo_compiler(t *testing.T) { name: "" resources: {} volumeMounts: - - mountPath: /kfp-launcher + - mountPath: /var/run/kfp name: kfp-launcher - mountPath: /gcs name: gcs @@ -138,13 +138,13 @@ func Test_argo_compiler(t *testing.T) { - command: - launcher-v2 - --copy - - /kfp-launcher/launch + - /var/run/kfp/bin/launch image: gcr.io/ml-pipeline/kfp-launcher-v2:latest imagePullPolicy: Always name: kfp-launcher resources: {} volumeMounts: - - mountPath: /kfp-launcher + - mountPath: /var/run/kfp name: kfp-launcher - mountPath: /gcs name: gcs diff --git a/v2/compiler/container.go b/v2/compiler/container.go index 4c5fdcef1ce..3a8d5bed380 100644 --- a/v2/compiler/container.go +++ b/v2/compiler/container.go @@ -136,7 +136,7 @@ func containerExecutorTemplate(container *pipelinespec.PipelineDeploymentConfig_ userCmdArgs = append(userCmdArgs, container.Command...) userCmdArgs = append(userCmdArgs, container.Args...) launcherCmd := []string{ - volumePathKFPLauncher + "/launch", + volumePathKFPLauncher + "/bin/launch", "--execution_id", inputValue(paramExecutionID), "--executor_input", inputValue(paramExecutorInput), "--component_spec", inputValue(paramComponent), @@ -177,6 +177,7 @@ func containerExecutorTemplate(container *pipelinespec.PipelineDeploymentConfig_ Container: k8score.Container{ Name: "kfp-launcher", Image: launcherImage, + Command: []string{"launcher-v2", "--copy", "/var/run/kfp/bin/launch"}, VolumeMounts: []k8score.VolumeMount{ { Name: volumeNameKFPLauncher, diff --git a/v2/compiler/executor.go b/v2/compiler/executor.go index 325cf38baee..a87df9650aa 100644 --- a/v2/compiler/executor.go +++ b/v2/compiler/executor.go @@ -2,7 +2,7 @@ package compiler const ( volumeNameKFPLauncher = "kfp-launcher" - volumePathKFPLauncher = "/kfp-launcher" + volumePathKFPLauncher = "/var/run/kfp" volumeNameGCS = "gcs" volumePathGCS = "/gcs" ) From caa31ab4206f96c223655d3a02d07794f90e7a0a Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Fri, 10 Sep 2021 14:55:04 +0100 Subject: [PATCH 3/6] Move S3 and Minio artifacts to `/var/run/kfp/artifact` --- sdk/python/kfp/v2/components/types/artifact_types.py | 4 ++-- v2/component/launcher.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sdk/python/kfp/v2/components/types/artifact_types.py b/sdk/python/kfp/v2/components/types/artifact_types.py index d3f027f275a..0a1b2d08346 100644 --- a/sdk/python/kfp/v2/components/types/artifact_types.py +++ b/sdk/python/kfp/v2/components/types/artifact_types.py @@ -20,8 +20,8 @@ from typing import Dict, Generic, List, Optional, Type, TypeVar, Union _GCS_LOCAL_MOUNT_PREFIX = '/gcs/' -_MINIO_LOCAL_MOUNT_PREFIX = '/minio/' -_S3_LOCAL_MOUNT_PREFIX = '/s3/' +_MINIO_LOCAL_MOUNT_PREFIX = '/var/run/kfp/artifact/minio/' +_S3_LOCAL_MOUNT_PREFIX = '/var/run/kfp/artifact/s3/' class Artifact(object): diff --git a/v2/component/launcher.go b/v2/component/launcher.go index 23fa18e9b65..79bc2b9afe8 100644 --- a/v2/component/launcher.go +++ b/v2/component/launcher.go @@ -655,10 +655,10 @@ func localPathForURI(uri string) (string, error) { return "/gcs/" + strings.TrimPrefix(uri, "gs://"), nil } if strings.HasPrefix(uri, "minio://") { - return "/minio/" + strings.TrimPrefix(uri, "minio://"), nil + return "/var/run/kfp/artifact/minio/" + strings.TrimPrefix(uri, "minio://"), nil } if strings.HasPrefix(uri, "s3://") { - return "/s3/" + strings.TrimPrefix(uri, "s3://"), nil + return "/var/run/kfp/artifact/s3/" + strings.TrimPrefix(uri, "s3://"), nil } return "", fmt.Errorf("failed to generate local path for URI %s: unsupported storage scheme", uri) } From b03cbee1a21e0bdc5adb31244529761efdeb5011 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Sun, 12 Sep 2021 15:46:10 +0100 Subject: [PATCH 4/6] Move output parameters to `/var/run/kfp/parameter` --- v2/compiler/argo_test.go | 14 +++++++------- v2/compiler/container.go | 4 ++-- v2/compiler/dag.go | 4 ++-- v2/component/launcher_v2.go | 4 ++-- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/v2/compiler/argo_test.go b/v2/compiler/argo_test.go index 7ca4d7665f6..c200c4fc9af 100644 --- a/v2/compiler/argo_test.go +++ b/v2/compiler/argo_test.go @@ -71,10 +71,10 @@ func Test_argo_compiler(t *testing.T) { parameters: - name: execution-id valueFrom: - path: /tmp/outputs/execution-id + path: /var/run/kfp/parameter/execution-id - name: executor-input valueFrom: - path: /tmp/outputs/executor-input + path: /var/run/kfp/parameter/executor-input - container: args: - sh @@ -247,10 +247,10 @@ func Test_argo_compiler(t *testing.T) { parameters: - name: execution-id valueFrom: - path: /tmp/outputs/execution-id + path: /var/run/kfp/parameter/execution-id - name: context-id valueFrom: - path: /tmp/outputs/context-id + path: /var/run/kfp/parameter/context-id - dag: tasks: - arguments: @@ -284,7 +284,7 @@ func Test_argo_compiler(t *testing.T) { }, { jobPath: "testdata/importer.json", - expectedText: ` + expectedText: ` apiVersion: argoproj.io/v1alpha1 kind: Workflow metadata: @@ -402,10 +402,10 @@ func Test_argo_compiler(t *testing.T) { parameters: - name: execution-id valueFrom: - path: /tmp/outputs/execution-id + path: /var/run/kfp/parameter/execution-id - name: context-id valueFrom: - path: /tmp/outputs/context-id + path: /var/run/kfp/parameter/context-id - dag: tasks: - arguments: diff --git a/v2/compiler/container.go b/v2/compiler/container.go index 3a8d5bed380..2feba77e4f5 100644 --- a/v2/compiler/container.go +++ b/v2/compiler/container.go @@ -106,8 +106,8 @@ func (c *workflowCompiler) addContainerDriverTemplate() string { }, Outputs: wfapi.Outputs{ Parameters: []wfapi.Parameter{ - {Name: paramExecutionID, ValueFrom: &wfapi.ValueFrom{Path: "/tmp/outputs/execution-id"}}, - {Name: paramExecutorInput, ValueFrom: &wfapi.ValueFrom{Path: "/tmp/outputs/executor-input"}}, + {Name: paramExecutionID, ValueFrom: &wfapi.ValueFrom{Path: "/var/run/kfp/parameter/execution-id"}}, + {Name: paramExecutorInput, ValueFrom: &wfapi.ValueFrom{Path: "/var/run/kfp/parameter/executor-input"}}, }, }, Container: &k8score.Container{ diff --git a/v2/compiler/dag.go b/v2/compiler/dag.go index 1ffe2bfd02a..a679f322f13 100644 --- a/v2/compiler/dag.go +++ b/v2/compiler/dag.go @@ -155,8 +155,8 @@ func (c *workflowCompiler) addDAGDriverTemplate() string { }, Outputs: wfapi.Outputs{ Parameters: []wfapi.Parameter{ - {Name: paramExecutionID, ValueFrom: &wfapi.ValueFrom{Path: "/tmp/outputs/execution-id"}}, - {Name: paramContextID, ValueFrom: &wfapi.ValueFrom{Path: "/tmp/outputs/context-id"}}, + {Name: paramExecutionID, ValueFrom: &wfapi.ValueFrom{Path: "/var/run/kfp/parameter/execution-id"}}, + {Name: paramContextID, ValueFrom: &wfapi.ValueFrom{Path: "/var/run/kfp/parameter/context-id"}}, }, }, Container: &k8score.Container{ diff --git a/v2/component/launcher_v2.go b/v2/component/launcher_v2.go index 31cb09753ad..df5fc81414f 100644 --- a/v2/component/launcher_v2.go +++ b/v2/component/launcher_v2.go @@ -206,14 +206,14 @@ func addOutputs(executorInput *pipelinespec.ExecutorInput, outputs *pipelinespec } for name := range outputs.GetParameters() { executorInput.Outputs.Parameters[name] = &pipelinespec.ExecutorInput_OutputParameter{ - OutputFile: fmt.Sprintf("/tmp/kfp/outputs/%s", name), + OutputFile: fmt.Sprintf("/var/run/kfp/parameter/%s", name), } } // artifact outputs are added in driver return nil } -func executeV2(ctx context.Context, executorInput *pipelinespec.ExecutorInput, component *pipelinespec.ComponentSpec, cmd string, args []string, bucket *blob.Bucket, bucketConfig *objectstore.Config, metadataClient *metadata.Client, namespace string, k8sClient *kubernetes.Clientset ) (*pipelinespec.ExecutorOutput, []*metadata.OutputArtifact, error) { +func executeV2(ctx context.Context, executorInput *pipelinespec.ExecutorInput, component *pipelinespec.ComponentSpec, cmd string, args []string, bucket *blob.Bucket, bucketConfig *objectstore.Config, metadataClient *metadata.Client, namespace string, k8sClient *kubernetes.Clientset) (*pipelinespec.ExecutorOutput, []*metadata.OutputArtifact, error) { executorOutput, err := execute(ctx, executorInput, cmd, args, bucket, bucketConfig, namespace, k8sClient) if err != nil { return nil, nil, err From 203f8bf8a6687acdf65414bbaba4f21352dc0d4c Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Sun, 12 Sep 2021 15:46:41 +0100 Subject: [PATCH 5/6] Move `outputMetadata` to `/var/run/kfp` --- v2/component/launcher.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/v2/component/launcher.go b/v2/component/launcher.go index 79bc2b9afe8..ccb529ff767 100644 --- a/v2/component/launcher.go +++ b/v2/component/launcher.go @@ -119,7 +119,7 @@ func (o *LauncherOptions) validate() error { return nil } -const outputMetadataFilepath = "/tmp/kfp_outputs/output_metadata.json" +const outputMetadataFilepath = "/var/run/kfp/output_metadata.json" // NewLauncher creates a new launcher object using the JSON-encoded runtimeInfo // and specified options. From 165f83960729168886ebaf5437436cf09de7f646 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Thu, 28 Oct 2021 10:02:50 +0100 Subject: [PATCH 6/6] Ensure directory for binary exists --- v2/component/util.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/v2/component/util.go b/v2/component/util.go index 0af3177a8dc..36e79518eb4 100644 --- a/v2/component/util.go +++ b/v2/component/util.go @@ -4,6 +4,7 @@ import ( "fmt" "io" "os" + "path/filepath" ) // CopyThisBinary copies the running binary into destination path. @@ -23,6 +24,7 @@ func CopyThisBinary(destination string) (err error) { return err } defer src.Close() + os.MkdirAll(filepath.Dir(destination), 0o555) // 0o555 -> readable and executable by all dst, err := os.OpenFile(destination, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0o555) // 0o555 -> readable and executable by all if err != nil { return err