diff --git a/Dockerfile.unit b/Dockerfile.unit index 59d5871834bc31..1b340cb5eddef4 100644 --- a/Dockerfile.unit +++ b/Dockerfile.unit @@ -29,7 +29,7 @@ COPY . /src ENV PROTOC=/src/thirdparty/protoc/protoc-linux-x86_64 -RUN chmod -R 777 /src $CARGO_HOME +RUN chmod -R 777 /src $CARGO_HOME $HOME RUN mkdir -p ~/.cargo/bin && \ for plugin in nextest deny; do \ ln -s /src/thirdparty/cargo-${plugin}/cargo-${plugin}-linux-$(arch) ~/.cargo/bin/cargo-${plugin}; \ diff --git a/lib/file-source/src/file_server.rs b/lib/file-source/src/file_server.rs index a3de6b26fbfc31..93461333082beb 100644 --- a/lib/file-source/src/file_server.rs +++ b/lib/file-source/src/file_server.rs @@ -52,6 +52,7 @@ where pub remove_after: Option, pub emitter: E, pub handle: tokio::runtime::Handle, + pub rotate_wait: Duration, } /// `FileServer` as Source @@ -292,6 +293,13 @@ where } } + for (_, watcher) in &mut fp_map { + if !watcher.file_findable() && watcher.last_seen().elapsed() > self.rotate_wait { + self.emitter.emit_gave_up_on_deleted_file(&watcher.path); + watcher.set_dead(); + } + } + // A FileWatcher is dead when the underlying file has disappeared. // If the FileWatcher is dead we don't retain it; it will be deallocated. fp_map.retain(|file_id, watcher| { diff --git a/lib/file-source/src/file_watcher/mod.rs b/lib/file-source/src/file_watcher/mod.rs index f39df392748cf2..55bf90e69f20fb 100644 --- a/lib/file-source/src/file_watcher/mod.rs +++ b/lib/file-source/src/file_watcher/mod.rs @@ -44,6 +44,7 @@ pub struct FileWatcher { is_dead: bool, last_read_attempt: Instant, last_read_success: Instant, + last_seen: Instant, max_line_bytes: usize, line_delimiter: Bytes, buf: BytesMut, @@ -145,6 +146,7 @@ impl FileWatcher { is_dead: false, last_read_attempt: ts, last_read_success: ts, + last_seen: ts, max_line_bytes, line_delimiter, buf: BytesMut::new(), @@ -176,6 +178,9 @@ impl FileWatcher { pub fn set_file_findable(&mut self, f: bool) { self.findable = f; + if f { + self.last_seen = Instant::now(); + } } pub fn file_findable(&self) -> bool { @@ -268,6 +273,11 @@ impl FileWatcher { self.last_read_success.elapsed() < Duration::from_secs(10) || self.last_read_attempt.elapsed() > Duration::from_secs(10) } + + #[inline] + pub fn last_seen(&self) -> Instant { + self.last_seen + } } fn is_gzipped(r: &mut io::BufReader) -> io::Result { diff --git a/lib/file-source/src/fingerprinter.rs b/lib/file-source/src/fingerprinter.rs index de83749eb9d227..faf99fa960ef03 100644 --- a/lib/file-source/src/fingerprinter.rs +++ b/lib/file-source/src/fingerprinter.rs @@ -554,5 +554,7 @@ mod test { fn emit_files_open(&self, _: usize) {} fn emit_path_globbing_failed(&self, _: &Path, _: &Error) {} + + fn emit_gave_up_on_deleted_file(&self, _: &Path) {} } } diff --git a/lib/file-source/src/internal_events.rs b/lib/file-source/src/internal_events.rs index 20195bb5deb228..dadb9969a8db8d 100644 --- a/lib/file-source/src/internal_events.rs +++ b/lib/file-source/src/internal_events.rs @@ -26,4 +26,6 @@ pub trait FileSourceInternalEvents: Send + Sync + Clone + 'static { fn emit_files_open(&self, count: usize); fn emit_path_globbing_failed(&self, path: &Path, error: &Error); + + fn emit_gave_up_on_deleted_file(&self, path: &Path); } diff --git a/src/internal_events/file.rs b/src/internal_events/file.rs index 2e3cb17344fb92..2b907556c77fdd 100644 --- a/src/internal_events/file.rs +++ b/src/internal_events/file.rs @@ -404,6 +404,25 @@ mod source { } } + #[derive(Debug)] + pub struct GaveUpOnDeletedFile<'a> { + pub file: &'a Path, + } + + impl<'a> InternalEvent for GaveUpOnDeletedFile<'a> { + fn emit(self) { + info!( + message = "Gave up on deleted file.", + file = %self.file.display(), + ); + counter!( + "files_deleted_given_up_total", 1, + "file" => self.file.to_string_lossy().into_owned(), + ); + } + } + + #[derive(Clone)] pub struct FileSourceInternalEventsEmitter; @@ -458,5 +477,9 @@ mod source { fn emit_path_globbing_failed(&self, path: &Path, error: &Error) { emit!(PathGlobbingError { path, error }); } + + fn emit_gave_up_on_deleted_file(&self, file: &Path) { + emit!(GaveUpOnDeletedFile { file }); + } } } diff --git a/src/sources/file.rs b/src/sources/file.rs index 9ad00590de88fc..81e0c583a7201e 100644 --- a/src/sources/file.rs +++ b/src/sources/file.rs @@ -234,6 +234,12 @@ pub struct FileConfig { #[configurable(metadata(docs::hidden))] #[serde(default)] log_namespace: Option, + + /// How long to keep an open handle to a rotated log file around. + #[serde_as(as = "serde_with::DurationMilliSeconds")] + #[configurable(metadata(docs::type_unit = "milliseconds"))] + #[serde(default = "default_rotate_wait_ms")] + pub rotate_wait_ms: Duration, } fn default_max_line_bytes() -> usize { @@ -268,6 +274,10 @@ fn default_line_delimiter() -> String { "\n".to_string() } +const fn default_rotate_wait_ms() -> Duration { + Duration::from_millis(5_000) +} + /// Configuration for how files should be identified. /// /// This is important for `checkpointing` when file rotation is used. @@ -385,6 +395,7 @@ impl Default for FileConfig { encoding: None, acknowledgements: Default::default(), log_namespace: None, + rotate_wait_ms: default_rotate_wait_ms() } } } @@ -532,6 +543,7 @@ pub fn file_source( remove_after: config.remove_after_secs.map(Duration::from_secs), emitter: FileSourceInternalEventsEmitter, handle: tokio::runtime::Handle::current(), + rotate_wait: config.rotate_wait_ms }; let event_metadata = EventMetadata { diff --git a/src/sources/kubernetes_logs/mod.rs b/src/sources/kubernetes_logs/mod.rs index ef052c41f5bea2..7610f9c4e22f8d 100644 --- a/src/sources/kubernetes_logs/mod.rs +++ b/src/sources/kubernetes_logs/mod.rs @@ -232,6 +232,12 @@ pub struct Config { #[configurable(metadata(docs::hidden))] #[serde(default)] log_namespace: Option, + + /// How long to keep an open handle to a rotated log file around. + /// + #[serde_as(as = "serde_with::DurationMilliSeconds")] + #[serde(default)] + rotate_wait_ms: Duration } const fn default_read_from() -> ReadFromConfig { @@ -273,6 +279,7 @@ impl Default for Config { kube_config_file: None, delay_deletion_ms: default_delay_deletion_ms(), log_namespace: None, + rotate_wait_ms: default_rotate_wait_ms() } } } @@ -519,6 +526,7 @@ struct Source { glob_minimum_cooldown: Duration, ingestion_timestamp_field: Option, delay_deletion: Duration, + rotate_wait: Duration } impl Source { @@ -595,6 +603,7 @@ impl Source { glob_minimum_cooldown, ingestion_timestamp_field, delay_deletion, + rotate_wait: config.rotate_wait_ms }) } @@ -625,6 +634,7 @@ impl Source { glob_minimum_cooldown, ingestion_timestamp_field, delay_deletion, + rotate_wait } = self; let mut reflectors = Vec::new(); @@ -755,6 +765,7 @@ impl Source { emitter: FileSourceInternalEventsEmitter, // A handle to the current tokio runtime handle: tokio::runtime::Handle::current(), + rotate_wait }; let (file_source_tx, file_source_rx) = futures::channel::mpsc::channel::>(2); @@ -951,6 +962,10 @@ const fn default_delay_deletion_ms() -> Duration { Duration::from_millis(60_000) } +const fn default_rotate_wait_ms() -> Duration { + Duration::from_millis(5_000) +} + // This function constructs the patterns we exclude from file watching, created // from the defaults or user provided configuration. fn prepare_exclude_paths(config: &Config) -> crate::Result> {