-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
enhancement(kubernetes source): Kubernetes partial message merge #2134
Changes from all commits
3b562ed
3c97d45
5b04af2
960be0a
1ae3037
f7a6ec3
bed4c6e
dd009a8
24fcf58
f22d088
0b79d1c
89e2799
bd206e1
3a0793b
f3e4b2e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,6 +12,7 @@ use crate::{ | |
sources::Source, | ||
topology::config::{DataType, GlobalOptions, SourceConfig}, | ||
transforms::{ | ||
merge::{Merge, TrailingNewlineNormalizer}, | ||
regex_parser::{RegexParser, RegexParserConfig}, | ||
Transform, | ||
}, | ||
|
@@ -39,12 +40,15 @@ enum BuildError { | |
IllegalCharacterInUid { uid: String }, | ||
} | ||
|
||
#[derive(Deserialize, Serialize, Debug, Clone, Default)] | ||
#[derive(Deserialize, Serialize, Debug, Clone, Derivative)] | ||
#[derivative(Default)] | ||
#[serde(deny_unknown_fields, default)] | ||
pub struct KubernetesConfig { | ||
include_container_names: Vec<String>, | ||
include_pod_uids: Vec<String>, | ||
include_namespaces: Vec<String>, | ||
#[derivative(Default(value = "true"))] | ||
auto_partial_merge: bool, | ||
} | ||
|
||
#[typetag::serde(name = "kubernetes")] | ||
|
@@ -73,12 +77,23 @@ impl SourceConfig for KubernetesConfig { | |
let mut parse_message = message_parser::build_message_parser()?; | ||
|
||
// Kubernetes source | ||
let source = file_recv | ||
let stream = file_recv | ||
.filter_map(move |event| transform_file.transform(event)) | ||
.filter_map(move |event| parse_message.transform(event)) | ||
.filter_map(move |event| now.filter(event)) | ||
.map(remove_ending_newline) | ||
.filter_map(move |event| transform_pod_uid.transform(event)) | ||
.filter_map(move |event| now.filter(event)); | ||
|
||
let stream: Box<dyn Stream<Item = Event, Error = _> + Send> = if self.auto_partial_merge { | ||
let mut transform_merge_partial_events = transform_merge_partial_events(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This approach to merging won't work with CRI runtimes. Implementations of it usually leave newline from the This is evident if you test it against Kubernetes cluster with CRI runtime, for example microk8s. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch! Thanks for pointing this out. Indeed, we can't merge before this is corrected. |
||
Box::new( | ||
stream.filter_map(move |event| transform_merge_partial_events.transform(event)), | ||
) | ||
} else { | ||
Box::new(stream) | ||
}; | ||
|
||
let stream = stream.filter_map(move |event| transform_pod_uid.transform(event)); | ||
|
||
let source = stream | ||
.forward(out.sink_map_err(drop)) | ||
.map(drop) | ||
.join(file_source) | ||
|
@@ -120,18 +135,6 @@ impl TimeFilter { | |
} | ||
} | ||
|
||
fn remove_ending_newline(mut event: Event) -> Event { | ||
if let Some(Value::Bytes(msg)) = event | ||
.as_mut_log() | ||
.get_mut(&event::log_schema().message_key()) | ||
{ | ||
if msg.ends_with(&['\n' as u8]) { | ||
msg.truncate(msg.len() - 1); | ||
} | ||
} | ||
event | ||
} | ||
|
||
fn transform_file() -> crate::Result<Box<dyn Transform>> { | ||
let mut config = RegexParserConfig::default(); | ||
|
||
|
@@ -210,6 +213,17 @@ fn transform_pod_uid() -> crate::Result<ApplicableTransform> { | |
Ok(ApplicableTransform::Candidates(transforms)) | ||
} | ||
|
||
fn transform_merge_partial_events() -> Merge<TrailingNewlineNormalizer> { | ||
let message_key = event::log_schema().message_key(); | ||
Merge::new( | ||
TrailingNewlineNormalizer { | ||
probe_field: message_key.clone(), | ||
}, | ||
vec![message_key.clone()], | ||
vec!["pod_uid".into()], | ||
) | ||
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
use super::*; | ||
|
@@ -271,4 +285,75 @@ mod tests { | |
|
||
has(&event, "object_uid", "306cd636-0c6d-11ea-9079-1c1b0de4d755"); | ||
} | ||
|
||
#[test] | ||
fn partial_events_merge() { | ||
let message_key = event::log_schema().message_key(); | ||
let sample_pod_uid = "qwerty"; | ||
|
||
let part1 = { | ||
let mut event = Event::new_empty_log(); | ||
event | ||
.as_mut_log() | ||
.insert("pod_uid", sample_pod_uid.to_owned()); | ||
event.as_mut_log().insert(message_key, "hello".to_owned()); | ||
event | ||
}; | ||
let part2 = { | ||
let mut event = Event::new_empty_log(); | ||
event | ||
.as_mut_log() | ||
.insert("pod_uid", sample_pod_uid.to_owned()); | ||
event | ||
.as_mut_log() | ||
.insert(message_key, " world!\n".to_owned()); | ||
event | ||
}; | ||
|
||
let mut transform = transform_merge_partial_events(); | ||
|
||
assert!(transform.transform(part1).is_none()); | ||
let event = transform.transform(part2).unwrap(); | ||
|
||
has(&event, "pod_uid", sample_pod_uid); | ||
has(&event, message_key, "hello world!"); | ||
} | ||
|
||
#[test] | ||
fn partial_events_merge_separate_streams() { | ||
let message_key = event::log_schema().message_key(); | ||
|
||
let part1 = { | ||
let mut event = Event::new_empty_log(); | ||
event.as_mut_log().insert("pod_uid", "qwerty1".to_owned()); | ||
event.as_mut_log().insert(message_key, "par".to_owned()); | ||
event | ||
}; | ||
let part2 = { | ||
let mut event = Event::new_empty_log(); | ||
event.as_mut_log().insert("pod_uid", "qwerty2".to_owned()); | ||
event | ||
.as_mut_log() | ||
.insert(message_key, "non-partial!\n".to_owned()); | ||
event | ||
}; | ||
let part3 = { | ||
let mut event = Event::new_empty_log(); | ||
event.as_mut_log().insert("pod_uid", "qwerty1".to_owned()); | ||
event.as_mut_log().insert(message_key, "tial!\n".to_owned()); | ||
event | ||
}; | ||
|
||
let mut transform = transform_merge_partial_events(); | ||
|
||
assert!(transform.transform(part1).is_none()); | ||
|
||
let event = transform.transform(part2).unwrap(); | ||
has(&event, "pod_uid", "qwerty2"); | ||
has(&event, message_key, "non-partial!"); | ||
|
||
let event = transform.transform(part3).unwrap(); | ||
has(&event, "pod_uid", "qwerty1"); | ||
has(&event, message_key, "partial!"); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Majority of the logs won't be separated, so I wouldn't consider it common.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is how it is in
docker
source, but I don't mind changing this.@binarylogic what do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree that this should be
false
. I don't think this is a common option that users should be changing.