Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhancement(fluent source): Add support for forwarding over unix socket #22212

Open
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

tustvold
Copy link
Contributor

@tustvold tustvold commented Jan 15, 2025

Summary

Change Type

  • Bug fix
  • New feature
  • Non-functional (chore, refactoring, docs)
  • Performance

Is this a breaking change?

  • Yes
  • No

How did you test this PR?

I couldn't work out how to get the integration tests working, but I tested by running fluent-bit with the following config

[SERVICE]
    Flush      5
    Daemon     off
    Log_Level  info

[INPUT]
    Name       cpu
    Tag        cpu_usage

[OUTPUT]
    Name          forward
    Match         *
    unix_path     /tmp/map/fluent.sock

And then running vector with

sources:
  fluent:
    type: fluent
    path: /tmp/map/fluent.sock

sinks:
  out:
    inputs:
      - fluent
    type: console
    encoding:
      codec: json

And seeing the metrics being transferred.

I think it should be a relatively straightforward lift for someone with familiarity with the existing integration testing harness to hook this up, but I got all sorts of errors.

Does this PR include user facing changes?

  • Yes. Please add a changelog fragment based on our guidelines.
  • No. A maintainer will apply the "no-changelog" label to this PR.

Checklist

  • Please read our Vector contributor resources.
    • make check-all is a good command to run locally. This check is
      defined here. Some of these
      checks might not be relevant to your PR. For Rust changes, at the very least you should run:
      • cargo fmt --all
      • cargo clippy --workspace --all-targets -- -D warnings
      • cargo nextest run --workspace (alternatively, you can run cargo test --all)
  • If this PR introduces changes Vector dependencies (modifies Cargo.lock), please
    run dd-rust-license-tool write to regenerate the license inventory and commit the changes (if any). More details here.

References

@tustvold tustvold requested a review from a team as a code owner January 15, 2025 13:28
@bits-bot
Copy link

bits-bot commented Jan 15, 2025

CLA assistant check
All committers have signed the CLA.

@github-actions github-actions bot added the domain: sources Anything related to the Vector's sources label Jan 15, 2025
Copy link
Contributor Author

@tustvold tustvold left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ultimately this change is largely mechanical and is just gluing together the existing decoder with the existing unix stream machinery, but there is definitely the possibility I have missed something subtle.

src/sources/fluent/mod.rs Show resolved Hide resolved
/// Listening mode for the `fluent` source.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(untagged, rename_all = "snake_case")]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unlike the socket data source I used untagged, to avoid this being a breaking change requiring introducing a "mode" tag. As the fields path vs address are unlikely to ever overlap, I think this is probably fine

}

fn can_acknowledge(&self) -> bool {
true
matches!(self.mode, FluentMode::Tcp(_))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really understand what this does, I just matched the socket source

listen_path: PathBuf,
socket_file_mode: Option<u32>,
decoder: Decoder,
decoder: D,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is necessary to allow using the FluentDecoder. As the fluent protocol doesn't have an explicit framing mechanism that I could ascertain, there wasn't a good way to make this work with the codecs::Decoder setup which separates framing and decoding.

This, and better discoverability, is also the reason I opted not to implement this as a codec on the socket source, as suggested on the ticket

Cargo.lock Outdated Show resolved Hide resolved
@pront pront self-assigned this Jan 23, 2025
@pront
Copy link
Member

pront commented Jan 23, 2025

Thank you for this contribution @tustvold, I will review this PR in the following days (currently have a big backlog).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
domain: sources Anything related to the Vector's sources
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Source: fluent, allow sending events to unix socket
3 participants