diff --git a/lib/file-source/src/file_server.rs b/lib/file-source/src/file_server.rs index 6ceaf1cfcc7d9b..b984d653c8ae5f 100644 --- a/lib/file-source/src/file_server.rs +++ b/lib/file-source/src/file_server.rs @@ -7,7 +7,7 @@ use futures::{ stream::StreamExt, Future, Sink, }; -use glob::{glob, Pattern}; +use glob::glob; use indexmap::IndexMap; use std::collections::{HashMap, HashSet}; use std::fs::{self, File}; @@ -18,6 +18,7 @@ use tokio::time::delay_for; use tracing::field; use crate::metadata_ext::PortableFileExt; +use crate::paths_provider::PathsProvider; /// `FileServer` is a Source which cooperatively schedules reads over files, /// converting the lines of said files into `LogLine` structures. As @@ -28,9 +29,11 @@ use crate::metadata_ext::PortableFileExt; /// `FileServer` is configured on a path to watch. The files do _not_ need to /// exist at startup. `FileServer` will discover new files which match /// its path in at most 60 seconds. -pub struct FileServer { - pub include: Vec, - pub exclude: Vec, +pub struct FileServer<PP> +where + PP: PathsProvider, +{ + pub paths_provider: PP, pub max_read_bytes: usize, pub start_at_beginning: bool, pub ignore_before: Option, @@ -54,7 +57,10 @@ pub struct FileServer { /// /// Specific operating systems support evented interfaces that correct this /// problem but your intrepid authors know of no generic solution. 
-impl FileServer { +impl<PP> FileServer<PP> +where + PP: PathsProvider, +{ pub fn run( self, mut chans: impl Sink<(Bytes, String), Error = ()> + Unpin, @@ -71,34 +77,16 @@ impl FileServer { let mut checkpointer = Checkpointer::new(&self.data_dir); checkpointer.read_checkpoints(self.ignore_before); - let exclude_patterns = self - .exclude - .iter() - .map(|e| Pattern::new(e.to_str().expect("no ability to glob")).unwrap()) - .collect::<Vec<_>>(); - let mut known_small_files = HashSet::new(); let mut existing_files = Vec::new(); - for include_pattern in &self.include { - for path in glob(include_pattern.to_str().expect("no ability to glob")) - .expect("Failed to read glob pattern") - .filter_map(Result::ok) - { - if exclude_patterns - .iter() - .any(|e| e.matches(path.to_str().unwrap())) - { - continue; - } - - if let Some(file_id) = self.fingerprinter.get_fingerprint_or_log_error( - &path, - &mut fingerprint_buffer, - &mut known_small_files, - ) { - existing_files.push((path, file_id)); - } + for path in self.paths_provider.paths().into_iter() { + if let Some(file_id) = self.fingerprinter.get_fingerprint_or_log_error( + &path, + &mut fingerprint_buffer, + &mut known_small_files, + ) { + existing_files.push((path, file_id)); } } @@ -145,73 +133,55 @@ impl FileServer { for (_file_id, watcher) in &mut fp_map { watcher.set_file_findable(false); // assume not findable until found } - for include_pattern in &self.include { - for path in glob(include_pattern.to_str().expect("no ability to glob")) - .expect("Failed to read glob pattern") - .filter_map(Result::ok) - { - if exclude_patterns - .iter() - .any(|e| e.matches(path.to_str().unwrap())) - { - continue; - } - - if let Some(file_id) = self.fingerprinter.get_fingerprint_or_log_error( - &path, - &mut fingerprint_buffer, - &mut known_small_files, - ) { - if let Some(watcher) = fp_map.get_mut(&file_id) { - // file fingerprint matches a watched file - let was_found_this_cycle = watcher.file_findable(); - watcher.set_file_findable(true); - 
if watcher.path == path { - trace!( - message = "Continue watching file.", + for path in self.paths_provider.paths().into_iter() { + if let Some(file_id) = self.fingerprinter.get_fingerprint_or_log_error( + &path, + &mut fingerprint_buffer, + &mut known_small_files, + ) { + if let Some(watcher) = fp_map.get_mut(&file_id) { + // file fingerprint matches a watched file + let was_found_this_cycle = watcher.file_findable(); + watcher.set_file_findable(true); + if watcher.path == path { + trace!( + message = "Continue watching file.", + path = field::debug(&path), + ); + } else { + // matches a file with a different path + if !was_found_this_cycle { + info!( + message = "Watched file has been renamed.", path = field::debug(&path), + old_path = field::debug(&watcher.path) ); + watcher.update_path(path).ok(); // ok if this fails: might fix next cycle } else { - // matches a file with a different path - if !was_found_this_cycle { - info!( - message = "Watched file has been renamed.", - path = field::debug(&path), - old_path = field::debug(&watcher.path) - ); - watcher.update_path(path).ok(); // ok if this fails: might fix next cycle - } else { - info!( - message = "More than one file has same fingerprint.", - path = field::debug(&path), - old_path = field::debug(&watcher.path) - ); - let (old_path, new_path) = (&watcher.path, &path); - if let (Ok(old_modified_time), Ok(new_modified_time)) = ( - fs::metadata(&old_path).and_then(|m| m.modified()), - fs::metadata(&new_path).and_then(|m| m.modified()), - ) { - if old_modified_time < new_modified_time { - info!( + info!( + message = "More than one file has same fingerprint.", + path = field::debug(&path), + old_path = field::debug(&watcher.path) + ); + let (old_path, new_path) = (&watcher.path, &path); + if let (Ok(old_modified_time), Ok(new_modified_time)) = ( + fs::metadata(&old_path).and_then(|m| m.modified()), + fs::metadata(&new_path).and_then(|m| m.modified()), + ) { + if old_modified_time < new_modified_time { + info!( 
message = "Switching to watch most recently modified file.", new_modified_time = field::debug(&new_modified_time), old_modified_time = field::debug(&old_modified_time), ); - watcher.update_path(path).ok(); // ok if this fails: might fix next cycle - } + watcher.update_path(path).ok(); // ok if this fails: might fix next cycle } } } - } else { - // untracked file fingerprint - self.watch_new_file( - path, - file_id, - &mut fp_map, - &checkpointer, - false, - ); } + } else { + // untracked file fingerprint + self.watch_new_file(path, file_id, &mut fp_map, &checkpointer, false); } } }