Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: remove references of EachParallel and EachSequential #1956

Merged
merged 2 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions content/docs/03.tutorial/05.flowable.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ Run tasks or subflows in parallel, create loops and conditional branching.

One of the most common orchestration requirements is to execute independent processes **in parallel**. For example, you can process data for each partition in parallel. This can significantly speed up the processing time.

The flow below uses the `EachParallel` flowable task to execute a list of `tasks` in parallel.
1. The `value` property defines the list of items to iterate over.
2. The `tasks` property defines the list of tasks to execute for each item in the list. You can access the iteration value using the `{{ taskrun.value }}` variable.
The flow below uses the `ForEach` flowable task to execute a list of `tasks` in parallel.

1. The `concurrencyLimit` property with value `0` makes the list of `tasks` to execute in parallel.
2. The `values` property defines the list of items to iterate over.
3. The `tasks` property defines the list of tasks to execute for each item in the list. You can access the iteration value using the `{{ taskrun.value }}` variable.

```yaml
id: python_partitions
Expand All @@ -35,8 +37,9 @@ tasks:
Kestra.outputs({'partitions': partitions})

- id: processPartitions
type: io.kestra.plugin.core.flow.EachParallel
value: '{{ outputs.getPartitions.vars.partitions }}'
type: io.kestra.plugin.core.flow.ForEach
concurrencyLimit: 0
values: '{{ outputs.getPartitions.vars.partitions }}'
tasks:
- id: partition
type: io.kestra.plugin.scripts.python.Script
Expand All @@ -62,4 +65,3 @@ To learn more about flowable tasks, check out the full [documentation](../05.con
::next-link
[Next, let's configure failure notifications and retries](06.errors.md)
::

6 changes: 3 additions & 3 deletions content/docs/04.workflow-components/04.variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,9 @@ variables:

tasks:
- id: parallel
type: io.kestra.plugin.core.flow.EachParallel
value: "{{ vars.servers }}"
type: io.kestra.plugin.core.flow.ForEach
concurrencyLimit: 0
values: "{{ vars.servers }}"
tasks:
- id: log
type: io.kestra.plugin.core.log.Log
Expand All @@ -220,4 +221,3 @@ tasks:
- "{{ json(taskrun.value).fqn }}" # prints the value for that key e.g. server01.mydomain.io
- "{{ json(taskrun.value).user }}" # prints the value for that key e.g. root
```

24 changes: 12 additions & 12 deletions content/docs/04.workflow-components/06.outputs.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ namespace: company.team

tasks:
- id: each
type: io.kestra.plugin.core.flow.EachSequential
value: ["value 1", "value 2", "value 3"]
type: io.kestra.plugin.core.flow.ForEach
values: ["value 1", "value 2", "value 3"]
tasks:
- id: inner
type: io.kestra.plugin.core.debug.Return
Expand All @@ -123,8 +123,8 @@ namespace: company.team

tasks:
- id: each
type: io.kestra.plugin.core.flow.EachSequential
value:
type: io.kestra.plugin.core.flow.ForEach
values:
- {"key": "my-key", "value": "my-value"}
- {"key": "my-complex", "value": {"sub": 1, "bool": true}}
tasks:
Expand All @@ -138,7 +138,7 @@ tasks:

Dynamic tasks are tasks that will run other tasks a certain number of times. A dynamic task will run multiple iterations of a set of sub-tasks.

For example, **EachSequential** and **EachParallel** produce other tasks dynamically depending on their `value` property.
For example, **ForEach** produces other tasks dynamically depending on its `values` property.

It is possible to reach each iteration output of dynamic tasks by using the following syntax:

Expand All @@ -148,8 +148,8 @@ namespace: company.team

tasks:
- id: each
type: io.kestra.plugin.core.flow.EachSequential
value: ["s1", "s2", "s3"]
type: io.kestra.plugin.core.flow.ForEach
values: ["s1", "s2", "s3"]
tasks:
- id: sub
type: io.kestra.plugin.core.debug.Return
Expand All @@ -172,8 +172,8 @@ namespace: company.team

tasks:
- id: each
type: io.kestra.plugin.core.flow.EachSequential
value: ["value 1", "value 2", "value 3"]
type: io.kestra.plugin.core.flow.ForEach
values: ["value 1", "value 2", "value 3"]
tasks:
- id: inner
type: io.kestra.plugin.core.debug.Return
Expand All @@ -192,7 +192,7 @@ Sometimes, it can be useful to access previous outputs on the current task tree,

If the task tree is static, for example when using the [Sequential](/plugins/core/tasks/flows/io.kestra.plugin.core.flow.Sequential) task, you can use the `{{ outputs.sibling.value }}` notation where `sibling`is the identifier of the sibling task.

If the task tree is dynamic, for example when using the [EachSequential](/plugins/core/tasks/flows/io.kestra.plugin.core.flow.EachSequential) task, you need to use `{{ sibling[taskrun.value] }}` to access the current tree task. `taskrun.value` is a special variable that holds the current value of the EachSequential task.
If the task tree is dynamic, for example when using the [ForEach](/plugins/core/tasks/flows/io.kestra.plugin.core.flow.ForEach) task, you need to use `{{ sibling[taskrun.value] }}` to access the current tree task. `taskrun.value` is a special variable that holds the current value of the ForEach task.

For example:

Expand All @@ -202,8 +202,8 @@ namespace: company.team

tasks:
- id: each
type: io.kestra.plugin.core.flow.EachSequential
value: ["value 1", "value 2", "value 3"]
type: io.kestra.plugin.core.flow.ForEach
values: ["value 1", "value 2", "value 3"]
tasks:
- id: first
type: io.kestra.plugin.core.debug.Return
Expand Down
8 changes: 4 additions & 4 deletions content/docs/05.concepts/expression/02.expression-usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,8 @@ tasks:
format: "{{ task.id }} > {{ taskrun.startDate }}"

- id: hierarchy_1
type: io.kestra.plugin.core.flow.EachSequential
value: ["a", "b"]
type: io.kestra.plugin.core.flow.ForEach
values: ["a", "b"]
tasks:
- id: hierarchy_2
type: io.kestra.plugin.core.flow.Switch
Expand All @@ -275,8 +275,8 @@ tasks:
format: "{{ task.id }}"

- id: hierarchy_2_b_second
type: io.kestra.plugin.core.flow.EachSequential
value: ["1", "2"]
type: io.kestra.plugin.core.flow.ForEach
values: ["1", "2"]
tasks:
- id: switch
type: io.kestra.plugin.core.flow.Switch
Expand Down
5 changes: 3 additions & 2 deletions content/docs/11.migration-guide/0.11.0/templates.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ tasks:
outputs:
- out/**
- id: each
type: io.kestra.plugin.core.flow.EachParallel
value: "{{outputs.out.uris | jq('.[]')}}"
type: io.kestra.plugin.core.flow.ForEach
concurrencyLimit: 0
values: "{{outputs.out.uris | jq('.[]')}}"
wrussell1999 marked this conversation as resolved.
Show resolved Hide resolved
tasks:
- id: path
type: io.kestra.plugin.core.debug.Return
Expand Down
2 changes: 1 addition & 1 deletion content/docs/14.best-practices/flows.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ Based on previous observations, here are some recommendations.

While it is possible to code a flow with any number of tasks, it is not recommended to have a lot of tasks on the same flow.

A flow can be comprised of manually generated tasks or dynamic ones. While [EachSequential](/plugins/core/tasks/flows/io.kestra.plugin.core.flow.EachSequential) and [EachParallel](/plugins/core/tasks/flows/io.kestra.plugin.core.flow.EachParallel) are really powerful tasks to loop over the result of a previous task, there are some drawbacks. If the task you are looping over is too large, you can easily end up with hundreds of tasks created. If, for example, you were using a pattern with Each inside Each (nested looping), it would take only a flow with 20 TaskRuns X 20 TaskRuns to reach 400 TaskRuns.
A flow can be comprised of manually generated tasks or dynamic ones. While [ForEach](/plugins/core/tasks/flows/io.kestra.plugin.core.flow.ForEach) can be really powerful to loop over the result of a previous task, there are some drawbacks. If the task you are looping over is too large, you can easily end up with hundreds of tasks created. If, for example, you were using a pattern with Each inside Each (nested looping), it would take only a flow with 20 TaskRuns X 20 TaskRuns to reach 400 TaskRuns.

::alert{type="warning"}
Based on our observations, we have seen that in cases where there are **more than 100** tasks on a flow, we see a decrease in performance and longer executions.
Expand Down