Support for Argo artifacts #336

Closed
SinaChavoshi opened this issue Nov 20, 2018 · 15 comments · Fixed by #2173
Comments

@SinaChavoshi
Contributor

Argo has built-in support for artifacts; however, that does not seem to be currently supported in pipelines. This is a critical feature, and its absence adds a lot of friction. Currently the only way to pass large objects (images, etc.) is to copy them and read them back manually. Furthermore, any support for caching artifacts based on the run / data used requires manual development. Version control and caching for artifacts is a separate feature request, but it ties into the overall experience, hence adding it here as well.

This is not a blocker, as there are two workarounds, listed below; however, both add friction.

  1. For non-TF-specific tasks, add libraries to the base Docker image, for example the gcloud SDK for GCS (a sketch using the GCS Python client follows below this list).
  2. Using the TF base Docker image:
from tensorflow.python.lib.io import file_io
data = file_io.read_file_to_string('gs://some_file_path')
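
For the first workaround, a minimal sketch using the google-cloud-storage client (the client library is an assumption on top of the gcloud SDK mentioned above; bucket and object names are illustrative):

from google.cloud import storage  # assumes google-cloud-storage is baked into the base image

# Manually copy the input object to local disk, process it, and copy the result back.
client = storage.Client()
bucket = client.bucket('my-bucket')  # illustrative bucket name
bucket.blob('inputs/image.png').download_to_filename('/tmp/image.png')

# ... process /tmp/image.png into /tmp/result.png ...

bucket.blob('outputs/result.png').upload_from_filename('/tmp/result.png')
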
@Ark-kun Ark-kun self-assigned this Nov 20, 2018
@Ark-kun
Contributor

Ark-kun commented Nov 21, 2018

I'm working on support for this as part of my components effort.

@swiftdiaries
Member

This is great! Would love this feature for local and on-prem pipelines.

@vicaire
Contributor

vicaire commented Mar 26, 2019

The current plan is to support:

  1. volumes
  2. containers handling storage on their own.

There seem to be a couple of difficulties with Argo's artifact storage feature:

  1. The data is only available at the end of the container execution.
  2. It takes a while to transmit data from one step of a workflow to another (requires upload then download)
  3. If I remember correctly, there is a size limit.

A volume implementation backed by an object store (GCS/S3) would provide something equivalent to Argo artifact storage but without the drawbacks.
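
For reference, a minimal sketch of the volume-based approach with the current DSL, assuming a pre-provisioned PVC (the claim name, mount path, and op are illustrative):

import kfp.dsl as dsl
import kfp.onprem as onprem

def train_op():
    op = dsl.ContainerOp(
        name='train',
        image='alpine',
        command=['sh', '-c', 'ls /mnt/data'],
    )
    # Mount the pre-provisioned claim so the step reads and writes data directly on the
    # volume, avoiding the upload/download round trip described above.
    return op.apply(onprem.mount_pvc('my-shared-pvc',
                                     volume_name='data',
                                     volume_mount_path='/mnt/data'))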

@Ark-kun
Contributor

Ark-kun commented May 17, 2019

Support for artifact passing in the DSL is independent of the low-level storage details.

The features needed to support artifact passing in the DSL are being added in the PRs referenced in the example below.

Full artifact-passing DSL example (based on Argo's artifact-passing.yaml example):

from kfp import dsl

# Op producing artifacts (Done in https://github.com/kubeflow/pipelines/pull/998)
def producer_op(text):
    return dsl.ContainerOp(
        name='producer',
        image='alpine',
        command=['sh', '-c', 'echo ' + text + ' > /tmp/output.txt'],
        file_outputs={'text-artifact': '/tmp/output.txt'},
    )

# Op consuming artifacts (https://github.com/kubeflow/pipelines/pull/791)
def consumer_op(text_artifact):
    return dsl.ContainerOp(
        name='consumer',
        image='alpine',
        command=['cat', dsl.InputArgumentPath(text_artifact)],
    )

# Pipeline with artifact passing (future PR)
@dsl.pipeline(name='Artifact-passing pipeline')
def artifact_pipeline():
    producer_task = producer_op('Hello world!')
    consumer_task = consumer_op(producer_task.outputs['text-artifact'])

The pipeline looks exactly the same as when the ops are passing parameters - no new constructs are introduced there.

This pipeline is portable, works on-premise and does not depend on GCS.

@ashokramadass

Any updates on this one?

@Ark-kun
Contributor

Ark-kun commented Oct 4, 2019

Fixed by #791, #998, #2042, #2134, #2173

See the Data Passing tutorial, which shows how to pass bigger data in Python components.
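
For reference, a minimal sketch in the style of that tutorial (an illustration, not the tutorial's code; the function and pipeline names are illustrative):

import kfp.dsl as dsl
from kfp.components import InputPath, OutputPath, func_to_container_op

# The system chooses the output file location and stores the file as an artifact.
def produce_text(output_text_path: OutputPath(str)):
    with open(output_text_path, 'w') as f:
        f.write('Hello world!')

# The upstream artifact arrives as a local file path.
def print_text(text_path: InputPath(str)):
    with open(text_path) as f:
        print(f.read())

produce_text_op = func_to_container_op(produce_text)
print_text_op = func_to_container_op(print_text)

@dsl.pipeline(name='Data passing')
def data_passing_pipeline():
    produce_task = produce_text_op()
    print_text_op(produce_task.output)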

@Ark-kun Ark-kun closed this as completed Oct 4, 2019
@kevinpauli

kevinpauli commented Oct 23, 2019

This one mentions a "future PR"

# Pipeline with artifact passing (future PR)
@dsl.pipeline(name='Artifact-passing pipeline')
def artifact_pipeline():
    producer_task = producer_op('Hello world!')
    consumer_task = consumer_op(producer_task.outputs['text-artifact'])

Does this PR exist yet?

@Ark-kun
Contributor

Ark-kun commented Oct 24, 2019

Does this PR exist yet?

All the PRs related to artifact passing have been merged.

The preferred ways to use artifact passing are:

  • creating a component from a Python function (func_to_container_op) - see the Data Passing tutorial
  • creating a component definition (component.yaml) and using the {inputPath: Big data input} command-line argument placeholder to consume data as a file, as sketched below.
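
For reference, a minimal sketch of the second option, consuming data as a file through the inputPath placeholder (the component text is illustrative):

from kfp import components

# {inputPath: Text} is replaced at runtime with a local path where the artifact has been placed.
consumer_op = components.load_component_from_text('''
name: Print text
inputs:
- {name: Text}
implementation:
  container:
    image: alpine
    command: [cat, {inputPath: Text}]
''')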

Let me help you with your use case.

Using ContainerOp directly is somewhat discouraged, as it does not create a component that can be shared. Still, it is possible to produce and consume artifacts when using ContainerOp directly. To produce artifacts, just use file_outputs as usual.
The syntax for consumption is slightly different, though: see #791.

@Ark-kun
Contributor

Ark-kun commented Oct 24, 2019

@kevinpauli I've updated the code in the comment: #336 (comment)

@kevinpauli

kevinpauli commented Oct 24, 2019

@Ark-kun thank you so much for your response!

My use case is to use ContainerOps directly and do artifact passing in KFP, just like the Argo artifact-passing.yaml example linked above.

I wanted pretty much exactly what you had shown in the "future PR" code snippet I referenced above. But when I try it (using kfp 0.1.32), it fails to compile with KeyError: 'text-artifact' when dereferencing producer_task.outputs['text-artifact'].

You say that it is "possible" to use ContainerOp to directly consume artifacts produced by an earlier step in the same pipeline, but despite searching for a couple of days I haven't been able to find a working code example.

In #791, when someone asks for example code using the DSL, it refers back to this issue (#336). Also, #791 seems to focus on "raw" artifacts... in my case, I want to wire the output artifact of one component into the input of another, all with ContainerOp.

So any help is much appreciated!

@kevinpauli

kevinpauli commented Oct 24, 2019

@Ark-kun nevermind, I just re-read where you said we should use file_outputs... I was mistakenly using output_artifact_paths. This works!

import kfp.dsl

def producer_op(text):
    return kfp.dsl.ContainerOp(
        name='producer',
        image='alpine',
        command=['sh', '-c', 'echo ' + text + ' > /tmp/output.txt'],
        file_outputs={'text-artifact': '/tmp/output.txt'}
    )


def consumer_op(text_artifact):
    return kfp.dsl.ContainerOp(
        name='consumer',
        image='alpine',
        command=['cat', kfp.dsl.InputArgumentPath(text_artifact)],
    )


@kfp.dsl.pipeline(
    name='artifact-passing'
)
def artifact_passing():
    producer_task = producer_op('Hello world!')
    consumer_task = consumer_op(producer_task.outputs['text-artifact'])

Thanks!

@Ark-kun
Contributor

Ark-kun commented Nov 1, 2019

@Ark-kun nevermind, I just re-read where you said we should use file_outputs... I was mistakenly using output_artifact_paths. This works!

Hmm. output_artifact_paths should have worked too (it's deprecated, and all of its entries are added to file_outputs).

Sorry for some confusion in this area. I understand that this part of ContainerOp is overly confusing. Adding artifact passing took a very long time, and some intermediate parameters were added that are not needed in the final result.
output_artifact_paths was added because people wanted control over output artifact paths and wanted to output artifacts. file_outputs could not be used for that, since it resulted in Argo output parameters, and also because adding entries to file_outputs would create output artifact references in task.outputs that people could not use, as there was no way to pass those references anywhere. Now output_artifact_paths is a deprecated semi-alias for file_outputs.

My use case is to be able to use ContainerOps directly, to be able to do artifact passing in KFP just like Argo's artifact-passing.yaml example that is linked above.

Take a look at my Creating components from command-line programs sample. Component specifications are very similar to the ContainerOp "factories" that you write, but they're real components - they can be shared, versioned etc.

@ksonbol

ksonbol commented Jul 16, 2020

Hi @Ark-kun, I want to do something similar to the example given here, but instead of passing small text, local files, or GCS paths, I want to pass S3 paths. My use case is as follows: I need to download some files from a folder in an S3 bucket, do some processing on them, and then upload some results to another S3 folder. Is this possible with this approach?

I have read the Data Passing tutorial and the "Creating components from command-line programs" tutorial, but I am still quite confused about how to achieve this. In the latter tutorial, it is not clear to me, for example, how the system decides which "Repo dir", sub, or GCS path to return.

Apologies if this is not the best place for this question.

@saamalik

saamalik commented Aug 1, 2020

@ksonbol - did you find an answer? I have the exact same use case, except I want to use the built-in "minio://" endpoint. I've created a folder in that repository and uploaded the content there; I'd like the pipeline to automatically download the files.

@fahadh4ilyas

@Ark-kun nevermind, I just re-read where you said we should use file_outputs... I was mistakenly using output_artifact_paths. This works!

def producer_op(text):
    return kfp.dsl.ContainerOp(
        name='producer',
        image='alpine',
        command=['sh', '-c', 'echo ' + text + ' > /tmp/output.txt'],
        file_outputs={'text-artifact': '/tmp/output.txt'}
    )


def consumer_op(text_artifact):
    return kfp.dsl.ContainerOp(
        name='consumer',
        image='alpine',
        command=['cat', (kfp.dsl.InputArgumentPath(text_artifact))],
    )


@kfp.dsl.pipeline(
    name='artifact-passing'
)
def artifact_passing():
    producer_task = producer_op('Hello world!')
    consumer_task = consumer_op(producer_task.outputs['text-artifact'])

Thanks!

Hi @Ark-kun! I've been trying to recreate the pipeline above using a reusable component, but it's not working. Could you please show me how?

This is what I'm doing:

import kfp.dsl
from kfp import components

producer_text = '''
name: producer
inputs:
- {name: text}
outputs:
- {name: text-artifact}
implementation:
  container:
    image: alpine
    command:
    - sh
    - -c
    - echo
    - {inputValue: text}
    - >
    - /tmp/output.txt
    fileOutputs:
      text-artifact: /tmp/output.txt
'''
producer_op = components.load_component_from_text(producer_text)

consumer_text = '''
name: consumer
inputs:
- {name: Text}
implementation:
  container:
    image: alpine
    command:
    - cat
    - {inputPath: Text}
'''
consumer_op = components.load_component_from_text(consumer_text)

@kfp.dsl.pipeline(
    name='artifact-passing'
)
def artifact_passing():
    producer_task = producer_op('Hello world!')
    consumer_task = consumer_op(producer_task.outputs['text-artifact'])

Linchin pushed a commit to Linchin/pipelines that referenced this issue Apr 11, 2023
HumairAK pushed a commit to red-hat-data-services/data-science-pipelines that referenced this issue Mar 11, 2024