Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proposal for dynamic pipelines #1480

Draft
wants to merge 17 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
252 changes: 252 additions & 0 deletions docs/design/1443-dynamic-pipelines.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
# Proposal: Alloy proposal process

* Author: Paulin Todev (@ptodev), Piotr Gwizdala (@thampiotr)
* Last updated: 2024-08-15
* Original issue: https://github.com/grafana/alloy/issues/1443

## Abstract

We are proposing a new feature to the [Alloy standard library][stdlib].
It will be similar to a `map` operation over a collection such as a `list()`.
Each `map` transformation will be done by a chain of components (a "sub-pipeline") created for this transformation.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I am not a fan of double-quoted quasi-concept of "sub-pipeline"... it's not essential to this proposal, but coming up with a better way to explain it using well-defined concepts is something we'll need to do for the documentation of this feature.

Each item in the collection will be processed by a different "sub-pipeline".
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder how feasible this is when the number of items in the collection is really big. Should there be a limit to how many sub-pipelines there are?


The final solution may differ from a standard `map` operation, since there may be multiple outputs for the same input.
For example, the sub-pipeline may branch into different `prometheus.relabel` components,
each of which sends outputs to different components outside of the sub-pipeline.

[stdlib]: https://grafana.com/docs/alloy/latest/reference/stdlib/

## Use cases

<!-- TODO: Add more use cases. It'd be helpful to gather feedback from the community and from solutions engineers. -->

### Using discovery components together with prometheus.exporter ones

Discovery components output a list of targets. It's not possible to input those lists directly to most exporter components.

Suppose we have a list of targets produced by a `discovery` component:

```
[
{"__address__" = "redis-one:9115", "instance" = "one"},
{"__address__" = "redis-two:9116", "instance" = "two"},
]
```

The [Alloy type][alloy-types] of the list above is `list(map(string))`.
However, you may want to pipe information from this list of targets to a component which doesn't work with a `list()` or a `map()`.
For example, you may want to input the `"__address__"` string to a `prometheus.exporter.redis`,
and you may want to use the `"instance"` string in a `discovery.relabel`.

[alloy-types]: https://grafana.com/docs/alloy/latest/get-started/configuration-syntax/expressions/types_and_values/

## Proposal 1: A foreach block
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to this one IMO.


A `foreach` block may start several sub-pipelines for a `collection` specified in its arguments.

```alloy
// All components in the sub-pipeline will be scoped under "foreach.default/1/...".
// Here, "1" is sub-pipeline number 1.
// This way component names won't clash with other sub-pipelines from the same foreach,
// and with the names of components outside of the foreach.
foreach "default" {

// "collection" is what the for loop will iterate over.
collection = discovery.file.default.targets

// Each item in the collection will be accessible via the "target" variable.
// E.g. `target["__address__"]`.
var = "target"

// A sub-pipeline consisting of components which process each target.
...
}
```

<details>
<summary>Example</summary>

```alloy
discovery.file "default" {
files = ["/Users/batman/Desktop/redis_addresses.yaml"]
}

// Every component defined in the "foreach" block will be instantiated for each item in the collection.
// The instantiated components will be scoped using the name of the foreach block and the index of the
// item in the collection. For example: /foreach.redis/0/prometheus.exporter.redis.default
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/foreach.redis/0/prometheus.exporter.redis.default

Targets can come and go between discovery intervals. If you use the index to scope them you will need to shift it on every update. Maybe you can just use the target itself which should be unique: /foreach.redis/target/prometheus.exporter.redis.default

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe you can just use the target itself which should be unique

What do you mean by "the target itself"? Do you mean the "address" label? We're not guaranteed to have such a label on the collection which is being iterated on :) The collection might not be targets. Also, some targets might not follow this convention of having an __address__ label.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the target (or more generally the item in the collection) can be hashed and we can use the hash for scoping and for avoid recomputations on update when a new collection comes

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think targets going away in the middle of the list is a good point to consider. Hashing the way that William is proposing might work and we'd need to recommend people to try change the input targets as little as possible (e.g. use a reasonable refresh interval), otherwise there will be a lot of churn.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Theoretically, couldn't there be hash collisions? I don't see how hashing is a very resilient way of handling this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With sufficiently long hash you don't need to worry about collisions. Unless there are implementation bugs.

If collisions worry you or we want to use shorter IDs than a 512bit hash, we could think of a mapper that will assign stable IDs to targets, using a simple int sequence.

foreach "redis" {
collection = discovery.file.default.targets
// Here, "target" is a variable whose value is the current item in the collection.
var = "target"

prometheus.exporter.redis "default" {
redis_addr = target["__address__"] // we can also do the necessary rewrites before this.
}

discovery.relabel "default" {
targets = prometheus.exporter.redis.default.targets
// Add a label which comes from the discovery component.
rule {
target_label = "filepath"
// __meta_filepath comes from discovery.file
replacement = target["__meta_filepath"]
}
}

prometheus.scrape "default" {
targets = discovery.relabel.default.targets
forward_to = prometheus.remote_write.mimir.receiver
}
}

prometheus.remote_write "mimir" {
endpoint {
url = "https://prometheus-prod-05-gb-south-0.grafana.net/api/prom/push"
basic_auth {
username = ""
password = ""
}
}
}
```

</details>

Pros:
* The `foreach` name is consistent with other programming languages.

Cons:
* It looks less like a component than a `declare.dynamic` block.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's a problem, you can still define a declare component (or import one) and use it inside the foreach syntax to make it smaller, add the parameters you need and reuse it for multiple foreach blocks.

In order to instantiate multiple `foreach` blocks with similar config, you'd have to wrap them in a `declare` block.

## Proposal 2: A declare.dynamic block

A new `declare.dynamic` block would create a custom component which starts several sub-pipelines internally.
Users can use `argument` and `export` blocks, just like in a normal `declare` block.

```alloy
declare.dynamic "ex1" {
argument "input_targets" {
optional = false
comment = "We will create a sub-pipeline for each target in input_targets."
}

argument "output_metrics" {
optional = false
comment = "All the metrics gathered from all pipelines."
}

// A sub-pipeline consisting of components which process each target.
...
}

declare.dynamic.ex1 "default" {
input_targets = discovery.file.default.targets
output_metrics = [prometheus.remote_write.mimir.receiver]
}
```

<details>
<summary>Example</summary>

```alloy
// declare.dynamic "maps" each target to a sub-pipeline.
// Each sub-pipeline has 1 exporter, 1 relabel, and 1 scraper.
// Internally, maybe one way this can be done via serializing the pipeline to a string and then importing it as a module?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's an interesting idea. One way to do this could be similar to the import node. The declare.dynamic instance would manage a list of custom component nodes that it can run using the runner package. These custom component nodes would not be part of the initial graph and would take the pipeline template as AST body to run.

declare.dynamic "redis_exporter" {
argument "input_targets" {
optional = false
comment = "We will create a sub-pipeline for each target in input_targets."
}

argument "output_metrics" {
optional = false
comment = "All the metrics gathered from all pipelines."
}

// "id" is a special identifier for every "sub-pipeline".
// The number of "sub-pipelines" is equal to len(input_targets).
prometheus.exporter.redis id {
redis_addr = input_targets["__address__"]
}

discovery.relabel id {
targets = prometheus.exporter.redis[id].targets
// Add a label which comes from the discovery component.
rule {
target_label = "filepath"
// __meta_filepath comes from discovery.file
replacement = input_targets["__meta_filepath"]
}
}

prometheus.scrape id {
targets = prometheus.exporter.redis[id].targets
forward_to = output_metrics
}

}
discovery.file "default" {
files = ["/Users/batman/Desktop/redis_addresses.yaml"]
}

declare.dynamic.redis_exporter "default" {
input_targets = discovery.file.default.targets
output_metrics = [prometheus.remote_write.mimir.receiver]
}

prometheus.remote_write "mimir" {
endpoint {
url = "https://prometheus-prod-05-gb-south-0.grafana.net/api/prom/push"
basic_auth {
username = ""
password = ""
}
}
}
```

</details>

Pros:
* Looks more like a component than a `foreach` block.
* Flexible number of inputs and outputs.

Cons:
* A name such as `declare.dynamic` doesn't sound as familiar to most people than `foreach`.
* It may not be practical to implement this in a way that there's more than one possible input collection.
* How can we limit users to having just one collection?
* Having another variant of the `declare` block can feel complex.
Can we just add this functionality to the normal `declare` block, so that we can avoid having a `declare.dynamic` block?

## Proposal 3: Do nothing

It is customary to always include a "do nothing" proposal, in order to evaluate if the work is really required.

Pros:
* No effort required.
* Alloy's syntax is simpler since we're not adding any new types of blocks.

Cons:
* Not possible to integrate most `prometheus.exporter` components with the `discovery` ones.

## Unknowns

We should find answers to the unknowns below before this proposal is accepted:

* Will the solution only work for `list()`? What about `map()`?
* If we go with a `foreach`, we could have a `key` attribute in addition to the `var` one. That way we can also access the key. The `key` attribute can be a no-op if `collection` is a map?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively, a map of {k1: v1, k2: v2} can be translated into list of entries: [{key: k1, value: v1}, {key: k2, value: v2}].

* What about debug metrics? Should we aggregate the metrics for all "sub-pipelines"?
* If there is 1 series for each sub-pipeline, the amount of metrics could be huge.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Explosion of metrics is an existing issue with Alloy and using modules...
Also, currently, if someone wanted to implement the use-case of, e.g. 10 redis exporters, and they chose to use modules, they would also get a ton of metrics AFAIU.

So I don't think it's a problem that should be part of this proposal. This is not to say that I don't think it's a problem, we should ideally look into how we can handle metrics for modules a bit better. But IMO this should be a new proposal that will deal with this problem for all usages of modules.

Some service discovery mechanisms may generate a huge number of elements in a list of targets.
* If we want to aggregate the metrics, how would we do that? Is it even possible to do in within Alloy?
* Can we have a configuration parameter which dictates whether the metrics should be aggregated or not?
Comment on lines +240 to +244
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could become a huge blocker. I don't think we want a new series for each sub-pipeline, and I am not sure if we can avoid it.

* Do we have to recreate the sub-pipelines every time a new collection is received,
even if the new collection has the same number of elements?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you identify each sub-pipeline by its corresponding target, it should be ok to only add/remove sub-pipelines according to the new set of targets and keep the sub-pipelines that are still relevant running.

* Do we need to have more than one output, of a different type?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what you mean... in Proposal 1 you can output to as many things as you want, AFAICT

* Do we need to have more than one input, of a different type?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO no. You can use multiple foreach or perhaps merge your targets using something like the join in your other proposal?


## Recommended solution

<!-- TODO: Fill this later -->
16 changes: 13 additions & 3 deletions internal/runtime/alloy.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,17 +207,22 @@ func newController(o controllerOptions) *Runtime {
OnExportsChange: o.OnExportsChange,
Registerer: o.Reg,
ControllerID: o.ControllerID,
NewModuleController: func(id string) controller.ModuleController {
NewModuleController: func(opts controller.ModuleControllerOpts) controller.ModuleController {
reg := o.Reg
if opts.Reg != nil {
reg = opts.Reg
}

return newModuleController(&moduleControllerOptions{
ComponentRegistry: o.ComponentRegistry,
ModuleRegistry: o.ModuleRegistry,
Logger: log,
Tracer: tracer,
Reg: o.Reg,
Reg: reg,
DataPath: o.DataPath,
MinStability: o.MinStability,
EnableCommunityComps: o.EnableCommunityComps,
ID: id,
ID: opts.Id,
ServiceMap: serviceMap,
WorkerPool: workerPool,
})
Expand Down Expand Up @@ -265,6 +270,7 @@ func (f *Runtime) Run(ctx context.Context) {
components = f.loader.Components()
services = f.loader.Services()
imports = f.loader.Imports()
forEachs = f.loader.ForEachs()

runnables = make([]controller.RunnableNode, 0, len(components)+len(services)+len(imports))
)
Expand All @@ -276,6 +282,10 @@ func (f *Runtime) Run(ctx context.Context) {
runnables = append(runnables, i)
}

for _, fe := range forEachs {
runnables = append(runnables, fe)
}

// Only the root controller should run services, since modules share the
// same service instance as the root.
if !f.opts.IsModule {
Expand Down
Loading
Loading