-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
enhancement(fluent source): Add support for forwarding over unix socket #22212
base: master
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ultimately this change is largely mechanical and is just gluing together the existing decoder with the existing unix stream machinery, but there is definitely the possibility I have missed something subtle.
/// Listening mode for the `fluent` source. | ||
#[configurable_component] | ||
#[derive(Clone, Debug)] | ||
#[serde(untagged, rename_all = "snake_case")] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unlike the socket data source I used untagged, to avoid this being a breaking change requiring introducing a "mode" tag. As the fields path
vs address
are unlikely to ever overlap, I think this is probably fine
} | ||
|
||
fn can_acknowledge(&self) -> bool { | ||
true | ||
matches!(self.mode, FluentMode::Tcp(_)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't really understand what this does, I just matched the socket source
listen_path: PathBuf, | ||
socket_file_mode: Option<u32>, | ||
decoder: Decoder, | ||
decoder: D, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change is necessary to allow using the FluentDecoder. As the fluent protocol doesn't have an explicit framing mechanism that I could ascertain, there wasn't a good way to make this work with the codecs::Decoder setup which separates framing and decoding.
This, and better discoverability, is also the reason I opted not to implement this as a codec on the socket source, as suggested on the ticket
Thank you for this contribution @tustvold, I will review this PR in the following days (currently have a big backlog). |
Summary
Change Type
Is this a breaking change?
How did you test this PR?
I couldn't work out how to get the integration tests working, but I tested by running fluent-bit with the following config
And then running vector with
And seeing the metrics being transferred.
I think it should be a relatively straightforward lift for someone with familiarity with the existing integration testing harness to hook this up, but I got all sorts of errors.
Does this PR include user facing changes?
Checklist
make check-all
is a good command to run locally. This check isdefined here. Some of these
checks might not be relevant to your PR. For Rust changes, at the very least you should run:
cargo fmt --all
cargo clippy --workspace --all-targets -- -D warnings
cargo nextest run --workspace
(alternatively, you can runcargo test --all
)Cargo.lock
), pleaserun
dd-rust-license-tool write
to regenerate the license inventory and commit the changes (if any). More details here.References