Swap the glob implementation at the FileServer with generic PathsProvider

Signed-off-by: MOZGIII <[email protected]>
MOZGIII committed May 5, 2020
1 parent e054de3 commit 9db5557
Showing 1 changed file with 56 additions and 86 deletions.
142 changes: 56 additions & 86 deletions lib/file-source/src/file_server.rs
@@ -7,7 +7,7 @@ use futures::{
     stream::StreamExt,
     Future, Sink,
 };
-use glob::{glob, Pattern};
+use glob::glob;
 use indexmap::IndexMap;
 use std::collections::{HashMap, HashSet};
 use std::fs::{self, File};
@@ -18,6 +18,7 @@ use tokio::time::delay_for;
 use tracing::field;

 use crate::metadata_ext::PortableFileExt;
+use crate::paths_provider::PathsProvider;

 /// `FileServer` is a Source which cooperatively schedules reads over files,
 /// converting the lines of said files into `LogLine` structures. As
@@ -28,9 +29,11 @@ use crate::metadata_ext::PortableFileExt;
 /// `FileServer` is configured on a path to watch. The files do _not_ need to
 /// exist at startup. `FileServer` will discover new files which match
 /// its path in at most 60 seconds.
-pub struct FileServer {
-    pub include: Vec<PathBuf>,
-    pub exclude: Vec<PathBuf>,
+pub struct FileServer<PP>
+where
+    PP: PathsProvider,
+{
+    pub paths_provider: PP,
     pub max_read_bytes: usize,
     pub start_at_beginning: bool,
     pub ignore_before: Option<time::SystemTime>,
@@ -54,7 +57,10 @@ pub struct FileServer {
 ///
 /// Specific operating systems support evented interfaces that correct this
 /// problem but your intrepid authors know of no generic solution.
-impl FileServer {
+impl<PP> FileServer<PP>
+where
+    PP: PathsProvider,
+{
     pub fn run(
         self,
         mut chans: impl Sink<(Bytes, String), Error = ()> + Unpin,
@@ -71,34 +77,16 @@ impl FileServer {
         let mut checkpointer = Checkpointer::new(&self.data_dir);
         checkpointer.read_checkpoints(self.ignore_before);

-        let exclude_patterns = self
-            .exclude
-            .iter()
-            .map(|e| Pattern::new(e.to_str().expect("no ability to glob")).unwrap())
-            .collect::<Vec<_>>();
-
         let mut known_small_files = HashSet::new();

         let mut existing_files = Vec::new();
-        for include_pattern in &self.include {
-            for path in glob(include_pattern.to_str().expect("no ability to glob"))
-                .expect("Failed to read glob pattern")
-                .filter_map(Result::ok)
-            {
-                if exclude_patterns
-                    .iter()
-                    .any(|e| e.matches(path.to_str().unwrap()))
-                {
-                    continue;
-                }
-
-                if let Some(file_id) = self.fingerprinter.get_fingerprint_or_log_error(
-                    &path,
-                    &mut fingerprint_buffer,
-                    &mut known_small_files,
-                ) {
-                    existing_files.push((path, file_id));
-                }
+        for path in self.paths_provider.paths().into_iter() {
+            if let Some(file_id) = self.fingerprinter.get_fingerprint_or_log_error(
+                &path,
+                &mut fingerprint_buffer,
+                &mut known_small_files,
+            ) {
+                existing_files.push((path, file_id));
             }
         }

@@ -145,73 +133,55 @@ impl FileServer {
             for (_file_id, watcher) in &mut fp_map {
                 watcher.set_file_findable(false); // assume not findable until found
             }
-            for include_pattern in &self.include {
-                for path in glob(include_pattern.to_str().expect("no ability to glob"))
-                    .expect("Failed to read glob pattern")
-                    .filter_map(Result::ok)
-                {
-                    if exclude_patterns
-                        .iter()
-                        .any(|e| e.matches(path.to_str().unwrap()))
-                    {
-                        continue;
-                    }
-
-                    if let Some(file_id) = self.fingerprinter.get_fingerprint_or_log_error(
-                        &path,
-                        &mut fingerprint_buffer,
-                        &mut known_small_files,
-                    ) {
-                        if let Some(watcher) = fp_map.get_mut(&file_id) {
-                            // file fingerprint matches a watched file
-                            let was_found_this_cycle = watcher.file_findable();
-                            watcher.set_file_findable(true);
-                            if watcher.path == path {
-                                trace!(
-                                    message = "Continue watching file.",
-                                    path = field::debug(&path),
-                                );
-                            } else {
-                                // matches a file with a different path
-                                if !was_found_this_cycle {
-                                    info!(
-                                        message = "Watched file has been renamed.",
-                                        path = field::debug(&path),
-                                        old_path = field::debug(&watcher.path)
-                                    );
-                                    watcher.update_path(path).ok(); // ok if this fails: might fix next cycle
-                                } else {
-                                    info!(
-                                        message = "More than one file has same fingerprint.",
-                                        path = field::debug(&path),
-                                        old_path = field::debug(&watcher.path)
-                                    );
-                                    let (old_path, new_path) = (&watcher.path, &path);
-                                    if let (Ok(old_modified_time), Ok(new_modified_time)) = (
-                                        fs::metadata(&old_path).and_then(|m| m.modified()),
-                                        fs::metadata(&new_path).and_then(|m| m.modified()),
-                                    ) {
-                                        if old_modified_time < new_modified_time {
-                                            info!(
-                                                message = "Switching to watch most recently modified file.",
-                                                new_modified_time = field::debug(&new_modified_time),
-                                                old_modified_time = field::debug(&old_modified_time),
-                                            );
-                                            watcher.update_path(path).ok(); // ok if this fails: might fix next cycle
-                                        }
-                                    }
-                                }
-                            }
-                        } else {
-                            // untracked file fingerprint
-                            self.watch_new_file(
-                                path,
-                                file_id,
-                                &mut fp_map,
-                                &checkpointer,
-                                false,
-                            );
-                        }
-                    }
+            for path in self.paths_provider.paths().into_iter() {
+                if let Some(file_id) = self.fingerprinter.get_fingerprint_or_log_error(
+                    &path,
+                    &mut fingerprint_buffer,
+                    &mut known_small_files,
+                ) {
+                    if let Some(watcher) = fp_map.get_mut(&file_id) {
+                        // file fingerprint matches a watched file
+                        let was_found_this_cycle = watcher.file_findable();
+                        watcher.set_file_findable(true);
+                        if watcher.path == path {
+                            trace!(
+                                message = "Continue watching file.",
+                                path = field::debug(&path),
+                            );
+                        } else {
+                            // matches a file with a different path
+                            if !was_found_this_cycle {
+                                info!(
+                                    message = "Watched file has been renamed.",
+                                    path = field::debug(&path),
+                                    old_path = field::debug(&watcher.path)
+                                );
+                                watcher.update_path(path).ok(); // ok if this fails: might fix next cycle
+                            } else {
+                                info!(
+                                    message = "More than one file has same fingerprint.",
+                                    path = field::debug(&path),
+                                    old_path = field::debug(&watcher.path)
+                                );
+                                let (old_path, new_path) = (&watcher.path, &path);
+                                if let (Ok(old_modified_time), Ok(new_modified_time)) = (
+                                    fs::metadata(&old_path).and_then(|m| m.modified()),
+                                    fs::metadata(&new_path).and_then(|m| m.modified()),
+                                ) {
+                                    if old_modified_time < new_modified_time {
+                                        info!(
+                                            message = "Switching to watch most recently modified file.",
+                                            new_modified_time = field::debug(&new_modified_time),
+                                            old_modified_time = field::debug(&old_modified_time),
+                                        );
+                                        watcher.update_path(path).ok(); // ok if this fails: might fix next cycle
+                                    }
+                                }
+                            }
+                        }
+                    } else {
+                        // untracked file fingerprint
+                        self.watch_new_file(path, file_id, &mut fp_map, &checkpointer, false);
+                    }
+                }
             }
         }
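
The `PathsProvider` trait itself is not part of this diff; only the import and the `self.paths_provider.paths().into_iter()` call sites appear above. Below is a minimal sketch of what the trait and a glob-backed provider could look like, inferred from those call sites; the `IntoIter` associated type, the `GlobPaths` type, and its fields are assumptions for illustration, not the contents of the `paths_provider` module.

```rust
use std::path::PathBuf;

use glob::{glob, Pattern};

/// Sketch of the trait shape implied by the call site: one method that
/// yields the current set of candidate paths on every discovery cycle.
pub trait PathsProvider {
    type IntoIter: IntoIterator<Item = PathBuf>;

    fn paths(&self) -> Self::IntoIter;
}

/// Hypothetical glob-backed provider reproducing the include/exclude
/// behaviour that this commit removes from `FileServer` itself.
pub struct GlobPaths {
    pub include: Vec<PathBuf>,
    pub exclude: Vec<Pattern>,
}

impl PathsProvider for GlobPaths {
    type IntoIter = Vec<PathBuf>;

    fn paths(&self) -> Vec<PathBuf> {
        self.include
            .iter()
            // Expand every include pattern that is valid UTF-8.
            .filter_map(|pattern| pattern.to_str())
            .flat_map(|pattern| {
                glob(pattern)
                    .expect("failed to read glob pattern")
                    .filter_map(Result::ok)
            })
            // Drop any candidate matched by an exclude pattern.
            .filter(|candidate| {
                !self
                    .exclude
                    .iter()
                    .any(|exclude| exclude.matches_path(candidate))
            })
            .collect()
    }
}
```

With a shape like this, `FileServer<GlobPaths>` would keep the previous include/exclude globbing behaviour, while tests or other sources can plug in a provider that enumerates paths differently, which is the point of making `FileServer` generic over `PP: PathsProvider`.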
