Skip to content

Commit

Permalink
LOG-3949: Vector not releasing deleted file handles
Browse files Browse the repository at this point in the history
  • Loading branch information
syedriko committed Sep 28, 2023
1 parent 6ab0df1 commit 63662e2
Show file tree
Hide file tree
Showing 8 changed files with 73 additions and 1 deletion.
2 changes: 1 addition & 1 deletion Dockerfile.unit
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ COPY . /src

ENV PROTOC=/src/thirdparty/protoc/protoc-linux-x86_64

RUN chmod -R 777 /src $CARGO_HOME
RUN chmod -R 777 /src $CARGO_HOME $HOME
RUN mkdir -p ~/.cargo/bin && \
for plugin in nextest deny; do \
ln -s /src/thirdparty/cargo-${plugin}/cargo-${plugin}-linux-$(arch) ~/.cargo/bin/cargo-${plugin}; \
Expand Down
8 changes: 8 additions & 0 deletions lib/file-source/src/file_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ where
pub remove_after: Option<Duration>,
pub emitter: E,
pub handle: tokio::runtime::Handle,
pub rotate_wait: Duration,
}

/// `FileServer` as Source
Expand Down Expand Up @@ -292,6 +293,13 @@ where
}
}

for (_, watcher) in &mut fp_map {
if !watcher.file_findable() && watcher.last_seen().elapsed() > self.rotate_wait {
self.emitter.emit_gave_up_on_deleted_file(&watcher.path);
watcher.set_dead();
}
}

// A FileWatcher is dead when the underlying file has disappeared.
// If the FileWatcher is dead we don't retain it; it will be deallocated.
fp_map.retain(|file_id, watcher| {
Expand Down
10 changes: 10 additions & 0 deletions lib/file-source/src/file_watcher/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub struct FileWatcher {
is_dead: bool,
last_read_attempt: Instant,
last_read_success: Instant,
last_seen: Instant,
max_line_bytes: usize,
line_delimiter: Bytes,
buf: BytesMut,
Expand Down Expand Up @@ -145,6 +146,7 @@ impl FileWatcher {
is_dead: false,
last_read_attempt: ts,
last_read_success: ts,
last_seen: ts,
max_line_bytes,
line_delimiter,
buf: BytesMut::new(),
Expand Down Expand Up @@ -176,6 +178,9 @@ impl FileWatcher {

pub fn set_file_findable(&mut self, f: bool) {
self.findable = f;
if f {
self.last_seen = Instant::now();
}
}

pub fn file_findable(&self) -> bool {
Expand Down Expand Up @@ -268,6 +273,11 @@ impl FileWatcher {
self.last_read_success.elapsed() < Duration::from_secs(10)
|| self.last_read_attempt.elapsed() > Duration::from_secs(10)
}

#[inline]
pub fn last_seen(&self) -> Instant {
self.last_seen
}
}

fn is_gzipped(r: &mut io::BufReader<fs::File>) -> io::Result<bool> {
Expand Down
2 changes: 2 additions & 0 deletions lib/file-source/src/fingerprinter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -554,5 +554,7 @@ mod test {
fn emit_files_open(&self, _: usize) {}

fn emit_path_globbing_failed(&self, _: &Path, _: &Error) {}

fn emit_gave_up_on_deleted_file(&self, _: &Path) {}
}
}
2 changes: 2 additions & 0 deletions lib/file-source/src/internal_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,6 @@ pub trait FileSourceInternalEvents: Send + Sync + Clone + 'static {
fn emit_files_open(&self, count: usize);

fn emit_path_globbing_failed(&self, path: &Path, error: &Error);

fn emit_gave_up_on_deleted_file(&self, path: &Path);
}
23 changes: 23 additions & 0 deletions src/internal_events/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,25 @@ mod source {
}
}

#[derive(Debug)]
pub struct GaveUpOnDeletedFile<'a> {
pub file: &'a Path,
}

impl<'a> InternalEvent for GaveUpOnDeletedFile<'a> {
fn emit(self) {
info!(
message = "Gave up on deleted file.",
file = %self.file.display(),
);
counter!(
"files_deleted_given_up_total", 1,
"file" => self.file.to_string_lossy().into_owned(),
);
}
}


#[derive(Clone)]
pub struct FileSourceInternalEventsEmitter;

Expand Down Expand Up @@ -458,5 +477,9 @@ mod source {
fn emit_path_globbing_failed(&self, path: &Path, error: &Error) {
emit!(PathGlobbingError { path, error });
}

fn emit_gave_up_on_deleted_file(&self, file: &Path) {
emit!(GaveUpOnDeletedFile { file });
}
}
}
12 changes: 12 additions & 0 deletions src/sources/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,12 @@ pub struct FileConfig {
#[configurable(metadata(docs::hidden))]
#[serde(default)]
log_namespace: Option<bool>,

/// How long to keep an open handle to a rotated log file around.
#[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
#[configurable(metadata(docs::type_unit = "milliseconds"))]
#[serde(default = "default_rotate_wait_ms")]
pub rotate_wait_ms: Duration,
}

fn default_max_line_bytes() -> usize {
Expand Down Expand Up @@ -268,6 +274,10 @@ fn default_line_delimiter() -> String {
"\n".to_string()
}

const fn default_rotate_wait_ms() -> Duration {
Duration::from_millis(5_000)
}

/// Configuration for how files should be identified.
///
/// This is important for `checkpointing` when file rotation is used.
Expand Down Expand Up @@ -385,6 +395,7 @@ impl Default for FileConfig {
encoding: None,
acknowledgements: Default::default(),
log_namespace: None,
rotate_wait_ms: default_rotate_wait_ms()
}
}
}
Expand Down Expand Up @@ -532,6 +543,7 @@ pub fn file_source(
remove_after: config.remove_after_secs.map(Duration::from_secs),
emitter: FileSourceInternalEventsEmitter,
handle: tokio::runtime::Handle::current(),
rotate_wait: config.rotate_wait_ms
};

let event_metadata = EventMetadata {
Expand Down
15 changes: 15 additions & 0 deletions src/sources/kubernetes_logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,12 @@ pub struct Config {
#[configurable(metadata(docs::hidden))]
#[serde(default)]
log_namespace: Option<bool>,

/// How long to keep an open handle to a rotated log file around.
///
#[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
#[serde(default)]
rotate_wait_ms: Duration
}

const fn default_read_from() -> ReadFromConfig {
Expand Down Expand Up @@ -273,6 +279,7 @@ impl Default for Config {
kube_config_file: None,
delay_deletion_ms: default_delay_deletion_ms(),
log_namespace: None,
rotate_wait_ms: default_rotate_wait_ms()
}
}
}
Expand Down Expand Up @@ -519,6 +526,7 @@ struct Source {
glob_minimum_cooldown: Duration,
ingestion_timestamp_field: Option<OwnedTargetPath>,
delay_deletion: Duration,
rotate_wait: Duration
}

impl Source {
Expand Down Expand Up @@ -595,6 +603,7 @@ impl Source {
glob_minimum_cooldown,
ingestion_timestamp_field,
delay_deletion,
rotate_wait: config.rotate_wait_ms
})
}

Expand Down Expand Up @@ -625,6 +634,7 @@ impl Source {
glob_minimum_cooldown,
ingestion_timestamp_field,
delay_deletion,
rotate_wait
} = self;

let mut reflectors = Vec::new();
Expand Down Expand Up @@ -755,6 +765,7 @@ impl Source {
emitter: FileSourceInternalEventsEmitter,
// A handle to the current tokio runtime
handle: tokio::runtime::Handle::current(),
rotate_wait
};

let (file_source_tx, file_source_rx) = futures::channel::mpsc::channel::<Vec<Line>>(2);
Expand Down Expand Up @@ -951,6 +962,10 @@ const fn default_delay_deletion_ms() -> Duration {
Duration::from_millis(60_000)
}

const fn default_rotate_wait_ms() -> Duration {
Duration::from_millis(5_000)
}

// This function constructs the patterns we exclude from file watching, created
// from the defaults or user provided configuration.
fn prepare_exclude_paths(config: &Config) -> crate::Result<Vec<glob::Pattern>> {
Expand Down

0 comments on commit 63662e2

Please sign in to comment.