
[Feature] Generic support for overrides during an execution #475

Open · 3 of 13 tasks
kumare3 opened this issue Aug 17, 2020 · 20 comments
Assignees
Labels
enhancement New feature or request flyteadmin Issue for FlyteAdmin Service spec-first This issue needs a spec to be created and then work. It should be an independent stream stale

Comments

@kumare3
Contributor

kumare3 commented Aug 17, 2020

Motivation: Why do you think this is important?
As a user of Flyte, configuration like "resources (cpu/mem/gpu)", "catalog (enable/disable/version)", "retries", "spark config", and "hive cluster" is set only at registration time. The only way to update it is to re-register. This is not desirable, as there are times when the user may want to update these parameters for specific launch plans or executions.

Goal: What should the final outcome look like, ideally?
A user should be able to optionally override any configuration for an execution, either at launch time or at launch plan creation time.

Describe alternatives you've considered
Re-register with the updated configuration

Flyte component

  • Overall
  • Flyte Setup and Installation scripts
  • Flyte Documentation
  • Flyte communication (slack/email etc)
  • FlytePropeller
  • FlyteIDL (Flyte specification language)
  • Flytekit (Python SDK)
  • FlyteAdmin (Control Plane service)
  • FlytePlugins
  • DataCatalog
  • FlyteStdlib (common libraries)
  • FlyteConsole (UI)
  • Other

High level original proposal

Goal

The purpose of this proposal is to allow users to override task metadata at launch time and decide caching configuration and resource allocation without needing to push a new commit to their PR.

Motivation

Users have requested cache toggling and cache-version overrides at launch time. Beyond that, users have also wanted to change task resources quickly. This is a highly requested feature because it lets users change runtime configuration without any code change, meaning they don't need to go through the entire CI pipeline.

Brief description of proposal

We can achieve this by associating each execution with StaticMetadata, MetadataOverride, and MetadataFinal objects, using some simple, serializable format like JSON.

UX/UI

  • At registration time we extract all the metadata of every task and create a StaticMetadata object.
  • At launch time we let users specify a MetadataOverride in the following ways:
    • If using the UI, the "Launch Workflow" panel has an "Input" tab and an "Execution Parameter Override" tab. The latter shows the StaticMetadata by default, but the fields are editable, which allows users to override the metadata.
    • If using flyte-cli, we add an option, e.g., --execution-parameter-override={"task1": {cache=false}}, when launching an execution.
  • Every MetadataOverride object is also stored in Flyteadmin and is versioned with the execution id.
  • The StaticMetadata and MetadataOverride objects are merged by Flyteadmin to produce MetadataFinal, which is passed to the propeller (see the sketch below).
  • Propeller or a plugin parses the MetadataFinal object and creates pods, talks to the data catalog, etc., accordingly.
  • In the console, we add an extra field/link in the header of the execution page, showing the MetadataOverride and/or MetadataFinal.
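
To make the merge step concrete, the sketch below shows how Flyteadmin could combine the two objects into MetadataFinal. The object names mirror the proposal, but the field names and the merge_metadata helper are hypothetical, not an existing Flyteadmin API.

# Hypothetical sketch of the StaticMetadata + MetadataOverride merge; field
# names and the merge_metadata helper are illustrative, not Flyteadmin APIs.
import json

static_metadata = {
    "task1": {"cache": True, "cache_version": "1.0", "cpu": "1", "memory": "4Gi"},
    "task2": {"cache": False, "cpu": "2", "memory": "8Gi"},
}

# Supplied at launch time (UI tab or --execution-parameter-override).
metadata_override = {
    "task1": {"cache": False, "cpu": "2"},
}

def merge_metadata(static: dict, override: dict) -> dict:
    """Produce MetadataFinal: per-task override values win over static ones."""
    final = {task: dict(meta) for task, meta in static.items()}
    for task, fields in override.items():
        final.setdefault(task, {}).update(fields)
    return final

metadata_final = merge_metadata(static_metadata, metadata_override)
print(json.dumps(metadata_final, indent=2))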

Reproducibility

As far as reproducibility is concerned, we note from the above description that both the StaticMetadata and the MetadataOverride are versioned. StaticMetadata becomes available to Flyteadmin at workflow registration time because it comes with the workflow definition, so no extra storage of this object is required. It is also inherently versioned and can be extracted from the registered workflow when needed.

MetadataOverride is versioned differently from StaticMetadata. To enable the targeted flexibility, we want to allow users to associate each execution with one MetadataOverride object. The natural way to version MetadataOverride objects, therefore, is by the execution's id, which is why we store the MetadataOverride object in Flyteadmin (see the small sketch below).
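
A minimal sketch of that versioning contract, assuming a simple in-memory store inside Flyteadmin keyed by execution id (the store and helper names are hypothetical):

# Hypothetical sketch of the reproducibility contract: every MetadataOverride
# is stored keyed by the execution id, so a past run can be re-created with
# exactly the overrides it used. The store and helpers are illustrative only.
overrides_by_execution = {}

def record_override(execution_id, metadata_override):
    # Called by Flyteadmin when an execution is created with overrides.
    overrides_by_execution[execution_id] = metadata_override

def override_for(execution_id):
    # Called when reproducing or inspecting a past execution.
    return overrides_by_execution.get(execution_id, {})

record_override("exec-abc123", {"task1": {"cache": False}})
assert override_for("exec-abc123") == {"task1": {"cache": False}}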

Flyteadmin APIs

https://gist.github.com/bnsblue/8499584ec7e1aacf4b43e81d9c50e0fb

FlyteIdl Protos

Additional context
This is a highly requested feature from multiple teams at Lyft.

Is this a blocker for you to adopt Flyte
NA

@kumare3 kumare3 added enhancement New feature or request untriaged This issues has not yet been looked at by the Maintainers labels Aug 17, 2020
@kumare3 kumare3 added flyteadmin Issue for FlyteAdmin Service spec-first This issue needs a spec to be created and then work. It should be an independent stream and removed untriaged This issues has not yet been looked at by the Maintainers labels Feb 10, 2021
@kumare3
Contributor Author

kumare3 commented Feb 10, 2021

Pulling in the example here:

@task(cpu=1, memory="4GB")
def fn1(f: flyte.File) -> flyte.File:
    ...

@task(spark=Spark())
def fn2(f: flyte.File) -> flyte.File:
    ...

# Executing the tasks
fn1.with_override(Override(cpu=2, memory="8GB")).execute(f="test.txt")
fn2.with_override(Override(custom={"spark_conf": {}})).execute(f="test.txt")

@workflow
def WF1():
   fn1(...)
   fn2(...)

# Executing the workflow
WF1.with_overrides(overrides={
  "fn1": Override(cpu=2, caching=False),
  "fn2": Override(spark_conf={}),
}, caching=False).execute(....)

@kumare3
Contributor Author

kumare3 commented Feb 11, 2021

@anandswaminathan / @EngHabu / @kanterov / @jeevb,
I think we are all in agreement that this feature in some form is extremely useful - it can be used for multiple things.

Ideally, I would like that at execution time the Launch form has a JSON editor with all the default config filled in, so the person who is launching can modify any of the values.

@rahul-theorem
Contributor

rahul-theorem commented May 25, 2022

+1 to this, we'd find it very useful as we have a relatively small number of distinct workflows (for training several different types of models), but need to run them on a wide range of datasets at various scales (so we can't fully configure everything we need with the LaunchPlan right now).

A similar concept in Kubeflow Pipelines which might be relevant here is the PipelineConf/op_transformer pattern. When compiling a pipeline, one can specify arbitrary transforms on the Python class representing a stage in a Kubeflow DAG (a ContainerOp) -- it also seems quite similar to the extensions to @flytekit.task from flytekitplugins-pod

import kfp
import kubernetes

def op_transformer(op: kfp.dsl.ContainerOp) -> kfp.dsl.ContainerOp:
    # Set resource requests
    op.add_resource_request("cpu", "100m")
    op.add_resource_request("memory", "500Mi")

    # Schedule on a particular node group
    op.add_toleration(
        kubernetes.client.V1Toleration(
            key="capability",
            value="model-training",
            effect="NoSchedule",
        ),
    )
    return op
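
For reference, in the KFP v1 SDK such a transformer is attached through PipelineConf at compile time. The snippet below is a sketch under that assumption; train_pipeline is a placeholder pipeline function, not something defined in this thread.

# Sketch of how the transformer above would be applied in KFP v1; assumes a
# pipeline function named `train_pipeline` is defined elsewhere.
import kfp

conf = kfp.dsl.PipelineConf()
conf.add_op_transformer(op_transformer)  # applied to every ContainerOp at compile time

kfp.compiler.Compiler().compile(
    train_pipeline,
    "train_pipeline.yaml",
    pipeline_conf=conf,
)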

@sbrunk
Member

sbrunk commented Dec 9, 2022

Here's another real-world use-case where changing the pod spec during execution would be really useful: changing the GPU type or the instance type. Right now, if we want to change the number of A100s or switch to a different GPU, we have to register a new task version.

from flytekit import task
from flytekitplugins.pod import Pod
from kubernetes.client import (
    V1Container, V1EmptyDirVolumeSource, V1PodSpec, V1ResourceRequirements,
    V1Toleration, V1Volume, V1VolumeMount,
)

def generate_a100_pod_spec(ngpu: int):
    cpu = ngpu * 12
    mem = f"{ngpu * 85 - 5}G"
    shared_volume_mount = V1VolumeMount(
        name="dshm",
        mount_path="/dev/shm",
    )
    pod_spec = V1PodSpec(
        containers=[
            V1Container(
                name="primary",
                resources=V1ResourceRequirements(
                    requests={"cpu": str(cpu - 1), "memory": mem, "gpu": str(ngpu)},
                    limits={"cpu": str(cpu), "memory": mem, "gpu": str(ngpu)}
                ),
                volume_mounts=[shared_volume_mount]
            )
        ],
        volumes=[V1Volume(name="dshm", empty_dir=V1EmptyDirVolumeSource(medium="Memory"))],
        node_selector={"cloud.google.com/gke-accelerator": "nvidia-tesla-a100",
                       "node.kubernetes.io/instance-type": f"a2-highgpu-{ngpu}g"},
        tolerations=[
            V1Toleration(effect="NoSchedule", key="cloud.google.com/gke-accelerator", value="nvidia-tesla-a100",
                         operator="Equal"),
            V1Toleration(effect="NoSchedule", key="beta.kubernetes.io/instance-type", value=f"a2-highgpu-{ngpu}g",
                         operator="Equal"),
        ]
    )
    return pod_spec

@task(
    task_config=Pod(pod_spec=generate_a100_pod_spec(4), primary_container_name="primary")
)
def my_task():
    ...

@kumare3
Contributor Author

kumare3 commented Feb 4, 2023

Folks, this is something very useful, and we are thinking of starting to design this first and then implement it.

@sbrunk
Member

sbrunk commented Feb 6, 2023

This would be very useful for us too. Let me know if we can help you here.

@ByronHsu
Contributor

ByronHsu commented Feb 6, 2023

I am down to help with the implementation 🙌

@kumare3
Contributor Author

kumare3 commented Feb 6, 2023

cc @rahul-theorem too

@ByronHsu
Contributor

I borrowed @rahul-theorem's idea and formulated an RFC (LINK).

The UX will look like the following in my case:

@task(
  requests=Resource(cpu="1", mem="1Mi")
)
def t1(x: int, y: int) -> int:
  return x+y

@task(
  requests=Resource(cpu="2", mem="2Mi")
)
def t2(x: int, y: int) -> int:
  return x*y

@workflow
def wf(a: int, b: int, c1: str, m1: str, c2: str, m2: str) -> int:
  t1_ = t1.override_config(requests=Resource(cpu=c1, mem=m1))
  o1 = t1_(a, b)
  t2_ = t2.override_config(requests=Resource(cpu=c2, mem=m2))
  o2 = t2_(a, b)
  return o1+o2

Feel free to leave any comments!! Thanks!!

@wild-endeavor
Contributor

Had a chat with @kumare3 yesterday about this. Below is a gist that he shared with us. I think it more accurately reflects some of the ideas here.

This is the simpler case, and I think we should start here first.

@task(task_config=Spark(...))
def t1():
  ...
  
@task(resources=Resources(...), cache=True, cache_version="1.0")
def t2():
  ...
  
@workflow
def sub(x: int, y: int):
  return t1()

@workflow
def wf():
  x, y = t1().with_overrides(name="n1")
  t2(x, y).with_overrides(...)
  m, n = t1().with_overrides(...)
  return sub()

This is the first step I think. It will allow users to call the same task with different resources, and it'll allow a workflow to call a task with different resources than the task defined for itself.

The next step is a bit harder... the execution-time overrides. The UX might look something like:

override_lp = Launchplan.create(wf,
                                overrides=wf.with_override(...))

flyteremote.execute(wf, WFOverrides(nodes={
    "n1": t1.with_overrides(config=Spark(...)),
    "n2": t2.with_overrides(...),
    "n3": WFOverrides(nodes={
        "n1": t1.with_overrides(config=Spark(...)),
    }),
}))

wf = flyteremote.fetch_workflow("wf")
flyteremote.execute(wf.with_overrides(...))

Implementation:
Steps

Create new messages as follows

class TaskOverrides:
    metadata: Optional[TaskMetadata] = None
    custom: Optional[...] = None
    ...

class NodeOverrides:
    task: TaskOverrides
    metadata: NodeMetadata

NodeName = NewType("NodeName", str)

class WorkflowOverrides:
    nodes: Optional[dict[NodeName, NodeOverrides]] = None

Update ExecutionSpec to have

oneof {
  WorkflowOverrides,  # Workflow
  TaskOverrides       # Single task
}

In flytekit

 def with_override(...):
     return NodeOverride(...)
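
One possible reading of the sketch above, written out as plain Python dataclasses so the shape is unambiguous. Every name and field here is hypothetical and only illustrates the proposal, not an existing flytekit or FlyteIDL API.

# Hypothetical illustration of the proposed override messages; not real
# flytekit/FlyteIDL types. Plain dicts stand in for the IDL metadata messages.
from dataclasses import dataclass, field
from typing import Any, Dict, Optional

@dataclass
class TaskOverrides:
    metadata: Optional[Dict[str, Any]] = None   # e.g. {"cache": False, "cache_version": "2.0"}
    custom: Optional[Dict[str, Any]] = None     # e.g. a plugin config such as a Spark conf

@dataclass
class NodeOverrides:
    task: Optional[TaskOverrides] = None
    metadata: Optional[Dict[str, Any]] = None   # node-level metadata (name, timeout, ...)

@dataclass
class WorkflowOverrides:
    # Keyed by node name; the execution spec would carry either this or a
    # single TaskOverrides (the "oneof" above).
    nodes: Dict[str, NodeOverrides] = field(default_factory=dict)

# A launch-time override for two nodes of a workflow:
overrides = WorkflowOverrides(nodes={
    "n1": NodeOverrides(task=TaskOverrides(custom={"spark_conf": {"spark.executor.memory": "8g"}})),
    "n2": NodeOverrides(task=TaskOverrides(metadata={"cache": False})),
})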

@rahul-theorem
Contributor

rahul-theorem commented Feb 24, 2023

I think it's desirable to keep the details of overrides abstracted away from the business logic of the workflow (since often these might be set by different tenants, ie a research scientist writing business logic vs an ops team setting resources to right-size workloads). Applying overrides at the node-level seems like it'd be a bit verbose (& also it's unclear how this would work for workflows w/ fan-out, etc).

One option here is to support applying a NodeOverride to any task matching a particular annotation (this would also involve annotation support for tasks, but that also seems doable) -- that's a middle ground of sorts between the KFP op transformer pattern and the proposal above; a sketch of the idea follows below.
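
A rough sketch of what annotation-based matching could look like, assuming tasks could carry an annotations dict (which flytekit does not support today); the types and matching rule below are purely illustrative.

# Hypothetical sketch of annotation-based override matching. Task annotations
# are not a flytekit feature today; this only illustrates the matching rule.
from dataclasses import dataclass
from typing import Any, Dict, List

@dataclass
class TaskInfo:
    name: str
    annotations: Dict[str, str]

@dataclass
class AnnotationOverride:
    match_annotations: Dict[str, str]   # every key/value here must match the task's annotations
    overrides: Dict[str, Any]           # e.g. {"requests": {"cpu": "4", "memory": "16Gi"}}

def overrides_for_task(task: TaskInfo, rules: List[AnnotationOverride]) -> Dict[str, Any]:
    merged: Dict[str, Any] = {}
    for rule in rules:
        if all(task.annotations.get(k) == v for k, v in rule.match_annotations.items()):
            merged.update(rule.overrides)
    return merged

rules = [AnnotationOverride(match_annotations={"workload": "model-training"},
                            overrides={"requests": {"cpu": "8", "memory": "32Gi"}})]
print(overrides_for_task(TaskInfo("train", {"workload": "model-training"}), rules))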

@ByronHsu
Contributor

Thanks @rahul-theorem! I will continue to polish the design and propose it later.

@bnsblue
Contributor

bnsblue commented Mar 8, 2023

An idea built on top of the annotation idea @rahul-theorem mentioned is that maybe we can apply overrides to tasks whose execution/run-time signature matches (or partially matches) a specified pattern.

For example,

flyteremote.execute(
    wf.with_overrides(
        [
            Override(requests=Resource(cpu=32, memory="64GB"), target=OverrideTarget(task="task_a", inputs={"region": "big_region"}, partial_match=True)),
            Override(requests=Resource(cpu=8, memory="2GB"), target=OverrideTarget(task="task_a", inputs={"region": "small_region"}, partial_match=True)),
        ]
    )
)

@bstadlbauer
Member

bstadlbauer commented Mar 30, 2023

I've just read through the thread and it seems like everyone is aligned on how this will be implemented on the backend, but the UX of how an override is assigned to a node is still TBD.

The three competing options at the moment are:

  1. Directly using node-names.
    a. Pros:

    • It's conceptually fairly easy and should also be the most straightforward on the implementation side

    b. Cons:

    • As far as I know, the node name is not explicitly known by users before running a workflow for the first time. Especially for more complex/nested workflows it is non-trivial to figure out the node name.
    • Not sure how deterministic node names are?
    • In case of a large fan-out (e.g. when using a dynamic task to kick off one task for each element in a list), using node names can become impractical
  2. Add overrides based on task annotations
    a. Pros

    • The workflow DAG is nicely decoupled from the overrides

    b. Cons

    • Tasks would need to be explicitly annotated before they can be overridden
    • Task annotation support (on a workflow not task level) would need to be implemented first
  3. Add overrides based on task pattern matches
    a. Pros

    • Very flexible in how overrides can be assigned to tasks

    b. Cons

    • Somewhat complex (when compared to the other solutions) and verbose, especially when you "just want to try this one thing quickly"

The above list is just what I could think of, feel free to add to the pros and cons!

What if we go with @bnsblue's suggestion (option 3 above), but expand it such that the OverrideTarget could also include an optional node_name argument? This would combine approaches 1 and 3, letting the user choose which property of the node to match on. In the "simple" workflow case, users could just use the node name, while more complex workflows with a lot of nesting or a higher fan-out can use the more elaborate matching (a rough sketch follows below). It would also set us up to add annotation-matching support (option 2) in case that's added to tasks. Curious to hear your thoughts!
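
For illustration, a minimal sketch of what such a combined target could look like; the OverrideTarget fields and the matches helper are hypothetical and just make the matching rule concrete.

# Hypothetical OverrideTarget combining option 1 (node name) and option 3
# (task + input pattern matching); none of this is an existing Flyte API.
from dataclasses import dataclass, field
from typing import Any, Dict, Optional

@dataclass
class OverrideTarget:
    node_name: Optional[str] = None          # option 1: match a specific node
    task: Optional[str] = None               # option 3: match by task name ...
    inputs: Dict[str, Any] = field(default_factory=dict)  # ... and/or input values
    partial_match: bool = False

    def matches(self, node_name: str, task: str, inputs: Dict[str, Any]) -> bool:
        if self.node_name is not None and self.node_name != node_name:
            return False
        if self.task is not None and self.task != task:
            return False
        if self.partial_match:
            return all(inputs.get(k) == v for k, v in self.inputs.items())
        return inputs == self.inputs if self.inputs else True

# Simple case: match purely by node name.
by_name = OverrideTarget(node_name="n1")
# Elaborate case: match any "task_a" node whose inputs include region="big_region".
by_pattern = OverrideTarget(task="task_a", inputs={"region": "big_region"}, partial_match=True)
print(by_name.matches("n1", "task_a", {}), by_pattern.matches("n7", "task_a", {"region": "big_region"}))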

@ByronHsu
Contributor

@bstadlbauer Thanks for listing out all the options so clearly! Just wondering, if we go with option 1 + 3, what will the UI look like?

@bstadlbauer
Member

Discussion has moved to #3553

@ai-rnatour

Is there an update on this? I see that the RFC was merged.

@ByronHsu
Contributor

This feature is not under development right now due to the complexity of the problem and the limited bandwidth of the contributors.

@vinnamkim

vinnamkim commented Mar 11, 2024

Hi all,
Is there any update on this, or may I ask about the future plan if one exists?


github-actions bot commented Dec 7, 2024

Hello 👋, this issue has been inactive for over 9 months. To help maintain a clean and focused backlog, we'll be marking this issue as stale and will engage on it to decide if it is still applicable.
Thank you for your contribution and understanding! 🙏

@github-actions github-actions bot added the stale label Dec 7, 2024