diff --git a/benches/walk_benchmark.rs b/benches/walk_benchmark.rs
index 818c82b..80e8092 100644
--- a/benches/walk_benchmark.rs
+++ b/benches/walk_benchmark.rs
@@ -36,7 +36,7 @@ fn checkout_linux_if_needed() {
 fn walk_benches(c: &mut Criterion) {
     checkout_linux_if_needed();
 
-    c.bench_function("walkdir::WalkDir", move |b| {
+    /*c.bench_function("walkdir::WalkDir", move |b| {
         b.iter(|| for _ in walkdir::WalkDir::new(linux_dir()) {})
     });
 
@@ -56,12 +56,13 @@ fn walk_benches(c: &mut Criterion) {
                 .build_parallel()
                 .run(move || Box::new(move |_| ignore::WalkState::Continue));
         })
-    });
+    });*/
 
     c.bench_function("jwalk::WalkDir", |b| {
         b.iter(|| for _ in WalkDir::new(linux_dir()) {})
     });
 
+    /*
     c.bench_function("jwalk::WalkDir_1", |b| {
         b.iter(|| for _ in WalkDir::new(linux_dir()).into_iter().take(1) {})
     });
@@ -86,7 +87,7 @@ fn walk_benches(c: &mut Criterion) {
             for _each_entry in each_dir_contents.contents.iter() {}
         }
     })
-    });
+    });*/
 }
 
 criterion_group! {
diff --git a/src/core/delegate.rs b/src/core/delegate.rs
new file mode 100644
index 0000000..5d8f148
--- /dev/null
+++ b/src/core/delegate.rs
@@ -0,0 +1,99 @@
+use rayon::prelude::*;
+use std::fs::{self, FileType};
+use std::io::{Error, Result};
+use std::path::{Path, PathBuf};
+
+//use super::WorkContext;
+use crate::core::DirEntry;
+
+pub struct WorkContext<D>
+where
+    D: Delegate,
+{
+    items: Vec<WorkContextItem<D>>,
+}
+
+enum WorkContextItem<D>
+where
+    D: Delegate,
+{
+    Item(D::Item),
+    Work(D::Work),
+}
+
+impl<D> WorkContext<D>
+where
+    D: Delegate,
+{
+    fn send_item(&mut self, item: D::Item) {
+        self.items.push(WorkContextItem::Item(item));
+    }
+    fn schedule_work(&mut self, work: D::Work) {
+        self.items.push(WorkContextItem::Work(work));
+    }
+}
+
+pub trait Delegate: Clone + Send {
+    type State; // don't need... include state in "work"
+    type Item;
+    type Work;
+
+    fn process_work(&self, work: Self::Work, context: &mut WorkContext<Self>) -> Result<()>;
+
+    fn handle_error(&self, path: &Path, error: &Error) -> bool;
+    fn process_entries(
+        &self,
+        path: &Path,
+        state: Self::State,
+        entries: Vec<DirEntry>,
+    ) -> (Self::State, Vec<DirEntry>);
+}
+
+#[derive(Clone)]
+pub struct DefaultDelegate {}
+
+impl Delegate for DefaultDelegate {
+    type State = usize;
+    type Item = PathBuf;
+    type Work = PathBuf;
+
+    fn process_work(&self, work: Self::Work, context: &mut WorkContext<Self>) -> Result<()> {
+        fs::read_dir(&work)?.for_each(|entry_result| {
+            let entry = match entry_result {
+                Ok(entry) => entry,
+                Err(_) => {
+                    return;
+                }
+            };
+
+            let file_type = match entry.file_type() {
+                Ok(file_type) => file_type,
+                Err(_) => {
+                    return;
+                }
+            };
+
+            context.send_item(entry.path());
+
+            if file_type.is_dir() {
+                context.schedule_work(entry.path());
+            }
+        });
+
+        Ok(())
+    }
+
+    fn handle_error(&self, path: &Path, error: &Error) -> bool {
+        eprintln!("{} {}", path.display(), error);
+        true
+    }
+    fn process_entries(
+        &self,
+        _path: &Path,
+        state: Self::State,
+        mut entries: Vec<DirEntry>,
+    ) -> (Self::State, Vec<DirEntry>) {
+        entries.par_sort_by(|a, b| a.file_name().cmp(b.file_name()));
+        (state, entries)
+    }
+}
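The new `Delegate` trait above folds the old `process_entries`/`handle_error` closure pair into one object. A minimal sketch of what a second implementation could look like, written as if it sat next to `DefaultDelegate` in `src/core/delegate.rs` (the `send_item`/`schedule_work` methods are not `pub` in this revision, so an out-of-crate impl could not call them yet); the `SkipHiddenDelegate` name and the dot-file filter are illustrative assumptions, not part of this diff:

```rust
use std::fs;
use std::io::{Error, Result};
use std::path::{Path, PathBuf};

use crate::core::DirEntry;

// Hypothetical delegate: like DefaultDelegate, but never reports or
// descends into entries whose name starts with a dot.
#[derive(Clone)]
pub struct SkipHiddenDelegate {}

impl Delegate for SkipHiddenDelegate {
    type State = usize;
    type Item = PathBuf;
    type Work = PathBuf;

    fn process_work(&self, work: Self::Work, context: &mut WorkContext<Self>) -> Result<()> {
        for entry in fs::read_dir(&work)?.filter_map(|e| e.ok()) {
            if entry.file_name().to_string_lossy().starts_with('.') {
                continue; // skip hidden entries entirely
            }
            context.send_item(entry.path());
            if entry.file_type().map(|t| t.is_dir()).unwrap_or(false) {
                context.schedule_work(entry.path());
            }
        }
        Ok(())
    }

    fn handle_error(&self, path: &Path, error: &Error) -> bool {
        eprintln!("{} {}", path.display(), error);
        true // returning true keeps the walk going
    }

    fn process_entries(
        &self,
        _path: &Path,
        state: Self::State,
        mut entries: Vec<DirEntry>,
    ) -> (Self::State, Vec<DirEntry>) {
        entries.sort_by(|a, b| a.file_name().cmp(b.file_name()));
        (state, entries)
    }
}
```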
diff --git a/src/core/mod.rs b/src/core/mod.rs
index d29127c..358e308 100644
--- a/src/core/mod.rs
+++ b/src/core/mod.rs
@@ -1,41 +1,45 @@
-/*! Provides a more flexible walk function suitable for arbitrary sorting and filtering.
+/*! A flexible walk function suitable for arbitrary sorting/filtering.
 
 # Example
 Recursively iterate over the "foo" directory and print each entry's path:
 
 ```no_run
-use jwalk::core::walk;
+use jwalk::core::{walk, Delegate, DirEntry};
+use std::io::Error;
+use std::path::Path;
 
 # fn main() {
-let dir_list_iter = walk(
-    // Directory to walk
-    "foo",
-    // Initial state value (unused in this example).
-    0,
-    // Sort, filter, maintain per directory state.
-    |path, state, mut entries| {
+#[derive(Clone)]
+pub struct MyDelegate {}
+
+impl Delegate for MyDelegate {
+    type State = usize;
+    fn handle_error(&self, path: &Path, error: &Error) -> bool {
+        eprintln!("{} {}", path.display(), error);
+        true
+    }
+    fn process_entries(&self, path: &Path, state: Self::State, mut entries: Vec<DirEntry>) -> (Self::State, Vec<DirEntry>) {
         entries.sort_by(|a, b| a.file_name().cmp(b.file_name()));
         (state, entries)
-    },
-    // Continue walk on any error
-    |path, error| true,
-);
-
-for mut each_dir_list in dir_list_iter {
-    for each_entry in each_dir_list.contents.iter() {
-        each_dir_list.path.push(each_entry.file_name());
-        println!("{}", each_dir_list.path.display());
-        each_dir_list.path.pop();
+    }
+}
+
+for mut dir_list in walk("foo", None, MyDelegate {}) {
+    for entry in dir_list.contents.iter() {
+        dir_list.path.push(entry.file_name());
+        println!("{}", dir_list.path.display());
+        dir_list.path.pop();
     }
 }
 # }
 ```
 
 */
 
+mod delegate;
 mod index_path;
 mod results_queue;
 mod work_queue;
-
+
 use crossbeam::channel::SendError;
 use rayon::iter::ParallelBridge;
 use rayon::prelude::*;
@@ -51,6 +55,7 @@
 use index_path::*;
 use results_queue::*;
 use work_queue::*;
 
+pub use delegate::{DefaultDelegate, Delegate};
 pub use results_queue::ResultsQueueIter;
 
 /// Recursively walk the given path.
@@ -70,18 +75,12 @@
 /// filter entries based on that state. And then that cloned state is later
 /// passed in when processing child `DirList`s.
 ///
-/// The returned iterator yields on `DirList` at a time.
-pub fn walk<P, S, F, H>(
-    path: P,
-    state: S,
-    process_entries: F,
-    handle_error: H,
-) -> ResultsQueueIter<S>
+/// Returns an iterator of `DirList`s.
+pub fn walk<P, D>(path: P, state: Option<D::State>, delegate: D) -> ResultsQueueIter<D>
 where
     P: AsRef<Path>,
-    S: Send + Clone + 'static,
-    F: Fn(&Path, S, Vec<DirEntry>) -> (S, Vec<DirEntry>) + Send + Sync + 'static,
-    H: Fn(&Path, &Error) -> bool + Send + Sync + 'static,
+    D: Delegate + 'static,
+    D::State: Clone + Send + Default,
 {
     let (results_queue, results_iterator) = new_results_queue();
     let path = path.as_ref().to_owned();
 
@@ -89,18 +88,17 @@ where
     rayon::spawn(move || {
         let (work_queue, work_iterator) = new_work_queue();
 
-        let work_context = ReadDirWorkContext {
+        let work_context = WorkContext {
+            delegate,
             work_queue,
             results_queue,
-            handle_error: Arc::new(handle_error),
-            process_entries: Arc::new(process_entries),
         };
 
         work_context
-            .push_work(ReadDirWork::new(
+            .push_work(Work::new(
                 path.to_path_buf(),
                 IndexPath::with_vec(vec![0]),
-                state,
+                state.unwrap_or(D::State::default()),
             ))
             .unwrap();
 
@@ -120,8 +118,11 @@ pub struct DirEntry {
     pub(crate) has_read_dir: bool,
 }
 
-pub struct DirList<S> {
-    pub state: S,
+pub struct DirList<D>
+where
+    D: Delegate,
+{
+    pub state: D::State,
     pub path: PathBuf,
     pub index_path: IndexPath,
     pub contents: Vec<DirEntry>,
@@ -129,43 +130,50 @@
     pub(crate) scheduled_read_dirs: usize,
 }
 
-pub struct DirListIter<S> {
-    pub state: S,
+pub struct DirListIter<D>
+where
+    D: Delegate,
+{
+    pub state: D::State,
     pub path: Arc<PathBuf>,
     pub contents: vec::IntoIter<DirEntry>,
 }
 
-pub(crate) struct ReadDirWork<S> {
-    dir_state: S,
-    dir_path: PathBuf,
-    dir_index_path: IndexPath,
+pub(crate) struct Work<D>
+where
+    D: Delegate,
+{
+    state: D::State,
+    path: PathBuf,
+    index_path: IndexPath,
 }
 
 #[derive(Clone)]
-struct ReadDirWorkContext<S>
+struct WorkContext<D>
 where
-    S: Clone,
+    D: Delegate,
+    D::State: Clone + Send,
 {
-    work_queue: WorkQueue<S>,
-    results_queue: ResultsQueue<S>,
-    handle_error: Arc<Fn(&Path, &Error) -> bool + Send + Sync + 'static>,
-    process_entries: Arc<Fn(&Path, S, Vec<DirEntry>) -> (S, Vec<DirEntry>) + Send + Sync + 'static>,
+    delegate: D,
+    work_queue: WorkQueue<D>,
+    results_queue: ResultsQueue<D>,
 }
 
-fn process_work<S>(work: ReadDirWork<S>, work_context: &ReadDirWorkContext<S>)
+fn process_work<D>(work: Work<D>, work_context: &WorkContext<D>)
 where
-    S: Clone,
+    D: Delegate + Clone,
+    D::State: Clone + Send,
 {
-    let mut read_dir_value = work.read_value(work_context);
-    let generated_read_dir_works = read_dir_value.generate_read_dir_works();
+    let mut dir_list = work.read_dir_list(work_context);
+    let new_work = dir_list.new_work();
 
-    if work_context.push_result(read_dir_value).is_err() {
+    if work_context.push_result(dir_list).is_err() {
         work_context.stop_now();
         work_context.completed_work();
         return;
     }
 
-    for each in generated_read_dir_works {
+    for each in new_work {
         if work_context.push_work(each).is_err() {
             work_context.stop_now();
             return;
@@ -193,17 +201,18 @@ impl DirEntry {
     }
 }
 
-impl<S> DirList<S>
+impl<D> DirList<D>
 where
-    S: Clone,
+    D: Delegate,
+    D::State: Clone + Send,
 {
     pub fn depth(&self) -> usize {
         self.index_path.len()
     }
 
-    fn generate_read_dir_works(&mut self) -> Vec<ReadDirWork<S>> {
+    fn new_work(&mut self) -> Vec<Work<D>> {
         let mut dir_index = 0;
-        let read_dir_works: Vec<_> = self
+        let new_work: Vec<_> = self
             .contents
             .iter()
             .filter_map(|each| {
@@ -213,31 +222,27 @@ where
                     work_path.push(each.file_name());
                     work_index_path.push(dir_index);
                     dir_index += 1;
-                    Some(ReadDirWork::new(
-                        work_path,
-                        work_index_path,
-                        self.state.clone(),
-                    ))
+                    Some(Work::new(work_path, work_index_path, self.state.clone()))
                 } else {
                     None
                 }
             })
             .collect();
 
-        self.scheduled_read_dirs = read_dir_works.len();
+        self.scheduled_read_dirs = new_work.len();
 
-        read_dir_works
+        new_work
     }
 }
-impl<S> IntoIterator for DirList<S>
+impl<D> IntoIterator for DirList<D>
 where
-    S: Default,
+    D: Delegate,
 {
     type Item = DirEntry;
-    type IntoIter = DirListIter<S>;
+    type IntoIter = DirListIter<D>;
 
-    fn into_iter(self) -> DirListIter<S> {
+    fn into_iter(self) -> DirListIter<D> {
         DirListIter {
             state: self.state,
             path: Arc::new(self.path),
@@ -246,62 +251,75 @@
         }
     }
 }
 
-impl<S> PartialEq for DirList<S> {
+impl<D> PartialEq for DirList<D>
+where
+    D: Delegate,
+{
     fn eq(&self, o: &Self) -> bool {
         self.index_path.eq(&o.index_path)
     }
 }
 
-impl<S> Eq for DirList<S> {}
+impl<D> Eq for DirList<D> where D: Delegate {}
 
-impl<S> PartialOrd for DirList<S> {
+impl<D> PartialOrd for DirList<D>
+where
+    D: Delegate,
+{
     fn partial_cmp(&self, o: &Self) -> Option<Ordering> {
         o.index_path.partial_cmp(&self.index_path)
     }
 }
 
-impl<S> Ord for DirList<S> {
+impl<D> Ord for DirList<D>
+where
+    D: Delegate,
+{
     fn cmp(&self, o: &Self) -> Ordering {
         o.index_path.cmp(&self.index_path)
     }
 }
 
-impl<S> Iterator for DirListIter<S> {
+impl<D> Iterator for DirListIter<D>
+where
+    D: Delegate,
+{
     type Item = DirEntry;
     fn next(&mut self) -> Option<DirEntry> {
         self.contents.next()
     }
 }
 
-impl<S> ReadDirWork<S>
+impl<D> Work<D>
 where
-    S: Clone,
+    D: Delegate,
+    D::State: Clone + Send,
 {
-    fn new(dir_path: PathBuf, dir_index_path: IndexPath, dir_state: S) -> ReadDirWork<S> {
-        ReadDirWork {
-            dir_path,
-            dir_index_path,
-            dir_state,
+    fn new(path: PathBuf, index_path: IndexPath, state: D::State) -> Work<D> {
+        Work {
+            state,
+            path,
+            index_path,
         }
     }
 
-    fn read_value(self, work_context: &ReadDirWorkContext<S>) -> DirList<S> {
-        let ReadDirWork {
-            dir_path,
-            dir_index_path,
-            dir_state,
+    fn read_dir_list(self, work_context: &WorkContext<D>) -> DirList<D> {
+        let Work {
+            path,
+            index_path,
+            state,
         } = self;
 
-        let read_dir = match fs::read_dir(&dir_path) {
+        let read_dir = match fs::read_dir(&path) {
             Ok(read_dir) => read_dir,
             Err(err) => {
-                if !work_context.handle_error(&dir_path, &err) {
+                if !work_context.delegate.handle_error(&path, &err) {
                     work_context.stop_now();
                 }
                 return DirList {
-                    state: dir_state.clone(),
-                    path: dir_path,
-                    index_path: dir_index_path,
+                    state: state.clone(),
+                    path: path,
+                    index_path: index_path,
                     contents: Vec::new(),
                     contents_error: Some(err),
                     scheduled_read_dirs: 0,
@@ -309,37 +327,38 @@
                 };
             }
         };
 
-        let (dir_state, dir_entries) = (work_context.process_entries)(
-            &dir_path,
-            dir_state,
-            map_entries(&dir_path, read_dir, work_context),
+        let (state, entries) = work_context.delegate.process_entries(
+            &path,
+            state,
+            map_entries(&path, read_dir, work_context),
         );
 
         DirList {
-            state: dir_state.clone(),
-            path: dir_path,
-            index_path: dir_index_path,
-            contents: dir_entries,
+            state: state,
+            path: path,
+            index_path: index_path,
+            contents: entries,
             contents_error: None,
             scheduled_read_dirs: 0,
         }
     }
 }
 
-fn map_entries<S>(
+fn map_entries<D>(
     dir_path: &Path,
     read_dir: fs::ReadDir,
-    work_context: &ReadDirWorkContext<S>,
+    work_context: &WorkContext<D>,
 ) -> Vec<DirEntry>
 where
-    S: Clone,
+    D: Delegate,
+    D::State: Clone + Send,
 {
     read_dir
         .filter_map(|entry_result| {
             let entry = match entry_result {
                 Ok(entry) => entry,
                 Err(err) => {
-                    if !work_context.handle_error(&dir_path, &err) {
+                    if !work_context.delegate.handle_error(&dir_path, &err) {
                         work_context.stop_now();
                     }
                     return None;
@@ -349,7 +368,7 @@
                 }
             };
 
             let file_type = match entry.file_type() {
                 Ok(file_type) => file_type,
                 Err(err) => {
-                    if !work_context.handle_error(&entry.path(), &err) {
+                    if !work_context.delegate.handle_error(&entry.path(), &err) {
                         work_context.stop_now();
                     }
                     return None;
@@ -361,19 +380,20 @@
                 }
             };
 
         })
         .collect()
 }
 
-impl<S> ReadDirWorkContext<S>
+impl<D> WorkContext<D>
 where
-    S: Clone,
+    D: Delegate,
+    D::State: Clone + Send,
 {
-    fn handle_error(&self, path: &Path, error: &Error) -> bool {
-        (self.handle_error)(path, error)
-    }
+    //fn handle_error(&self, path: &Path, error: &Error) -> bool {
+    //    (self.handle_error)(path, error)
+    //}
 
     fn stop_now(&self) {
         self.work_queue.stop_now()
     }
 
-    fn push_work(&self, work: ReadDirWork<S>) -> Result<(), SendError<ReadDirWork<S>>> {
+    fn push_work(&self, work: Work<D>) -> Result<(), SendError<Work<D>>> {
         self.work_queue.push(work)
     }
 
@@ -381,27 +401,36 @@
         self.work_queue.completed_work()
     }
 
-    fn push_result(&self, result: DirList<S>) -> Result<(), SendError<DirList<S>>> {
+    fn push_result(&self, result: DirList<D>) -> Result<(), SendError<DirList<D>>> {
         self.results_queue.push(result)
     }
 }
 
-impl<S> PartialEq for ReadDirWork<S> {
+impl<D> PartialEq for Work<D>
+where
+    D: Delegate,
+{
     fn eq(&self, o: &Self) -> bool {
-        self.dir_index_path.eq(&o.dir_index_path)
+        self.index_path.eq(&o.index_path)
     }
 }
 
-impl<S> Eq for ReadDirWork<S> {}
+impl<D> Eq for Work<D> where D: Delegate {}
 
-impl<S> PartialOrd for ReadDirWork<S> {
+impl<D> PartialOrd for Work<D>
+where
+    D: Delegate,
+{
     fn partial_cmp(&self, o: &Self) -> Option<Ordering> {
-        o.dir_index_path.partial_cmp(&self.dir_index_path)
+        o.index_path.partial_cmp(&self.index_path)
     }
 }
 
-impl<S> Ord for ReadDirWork<S> {
+impl<D> Ord for Work<D>
+where
+    D: Delegate,
+{
     fn cmp(&self, o: &Self) -> Ordering {
-        o.dir_index_path.cmp(&self.dir_index_path)
+        o.index_path.cmp(&self.index_path)
     }
 }
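With the closures folded into the delegate, the `core::walk` call site shrinks from four positional arguments to three. A usage sketch against the new signature (assuming the `jwalk::core` re-exports shown above land as written); per the new `Default` bound, passing `None` for the state falls back to `D::State::default()`:

```rust
use jwalk::core::{walk, DefaultDelegate};

fn main() {
    // Third argument is the delegate; state is now Option<D::State>.
    for mut dir_list in walk("foo", None, DefaultDelegate {}) {
        for entry in dir_list.contents.iter() {
            dir_list.path.push(entry.file_name());
            println!("{}", dir_list.path.display());
            dir_list.path.pop();
        }
    }
}
```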
diff --git a/src/core/results_queue.rs b/src/core/results_queue.rs
index 4364a7f..ab9743c 100644
--- a/src/core/results_queue.rs
+++ b/src/core/results_queue.rs
@@ -3,33 +3,41 @@
 use std::collections::BinaryHeap;
 use std::marker::PhantomData;
 use std::thread;
 
+use super::Delegate;
 use super::DirList;
 use super::IndexPath;
 
 #[derive(Clone)]
-pub struct ResultsQueue<S>
+pub struct ResultsQueue<D>
 where
-    S: Clone,
+    D: Delegate,
 {
-    sender: Sender<DirList<S>>,
+    sender: Sender<DirList<D>>,
 }
 
-pub struct ResultsQueueIter<S> {
-    next_matcher: NextResultMatcher<S>,
-    receiver: Receiver<DirList<S>>,
-    receive_buffer: BinaryHeap<DirList<S>>,
+pub struct ResultsQueueIter<D>
+where
+    D: Delegate,
+{
+    next_matcher: NextResultMatcher<D>,
+    receiver: Receiver<DirList<D>>,
+    receive_buffer: BinaryHeap<DirList<D>>,
 }
 
-struct NextResultMatcher<S> {
+struct NextResultMatcher<D>
+where
+    D: Delegate,
+{
     looking_for_index_path: IndexPath,
     remaining_read_dirs: Vec<usize>,
-    phantom: PhantomData<S>,
+    phantom: PhantomData<D>,
 }
 
-pub fn new_results_queue<S>() -> (ResultsQueue<S>, ResultsQueueIter<S>)
+pub fn new_results_queue<D>() -> (ResultsQueue<D>, ResultsQueueIter<D>)
 where
-    S: Clone,
+    D: Delegate,
 {
+    //let (sender, receiver) = channel::bounded(100);
     let (sender, receiver) = channel::unbounded();
     (
         ResultsQueue { sender },
@@ -41,18 +49,21 @@
     )
 }
 
-impl<S> ResultsQueue<S>
+impl<D> ResultsQueue<D>
 where
-    S: Clone,
+    D: Delegate,
 {
-    pub fn push(&self, dent: DirList<S>) -> std::result::Result<(), SendError<DirList<S>>> {
+    pub fn push(&self, dent: DirList<D>) -> std::result::Result<(), SendError<DirList<D>>> {
         self.sender.send(dent)
     }
 }
 
-impl<S> Iterator for ResultsQueueIter<S> {
-    type Item = DirList<S>;
-    fn next(&mut self) -> Option<DirList<S>> {
+impl<D> Iterator for ResultsQueueIter<D>
+where
+    D: Delegate,
+{
+    type Item = DirList<D>;
+    fn next(&mut self) -> Option<DirList<D>> {
         let looking_for = &self.next_matcher.looking_for_index_path;
         loop {
             let top_dir_list = self.receive_buffer.peek();
@@ -86,7 +97,10 @@
         }
     }
 }
 
-impl<S> NextResultMatcher<S> {
+impl<D> NextResultMatcher<D>
+where
+    D: Delegate,
+{
     fn is_none(&self) -> bool {
         self.looking_for_index_path.is_empty()
     }
@@ -95,7 +109,7 @@
         *self.remaining_read_dirs.last_mut().unwrap() -= 1;
     }
 
-    fn increment_past(&mut self, dir_list: &DirList<S>) {
+    fn increment_past(&mut self, dir_list: &DirList<D>) {
         self.decrement_remaining_read_dirs_at_this_level();
 
         if dir_list.scheduled_read_dirs > 0 {
@@ -120,8 +134,11 @@
     }
 }
 
-impl<S> Default for NextResultMatcher<S> {
-    fn default() -> NextResultMatcher<S> {
+impl<D> Default for NextResultMatcher<D>
+where
+    D: Delegate,
+{
+    fn default() -> NextResultMatcher<D> {
         NextResultMatcher {
             looking_for_index_path: IndexPath::with_vec(vec![0]),
             remaining_read_dirs: vec![1],
diff --git a/src/core/work_queue.rs b/src/core/work_queue.rs
index 6cee85c..ca71da3 100644
--- a/src/core/work_queue.rs
+++ b/src/core/work_queue.rs
@@ -4,23 +4,33 @@
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering as AtomicOrdering};
 use std::sync::Arc;
 use std::thread;
 
-use super::ReadDirWork;
+use super::Delegate;
+use super::Work;
 
 #[derive(Clone)]
-pub(crate) struct WorkQueue<S> {
-    sender: Sender<ReadDirWork<S>>,
+pub(crate) struct WorkQueue<D>
+where
+    D: Delegate,
+{
+    sender: Sender<Work<D>>,
     work_count: Arc<AtomicUsize>,
     stop_now: Arc<AtomicBool>,
 }
 
-pub(crate) struct WorkQueueIter<S> {
-    receiver: Receiver<ReadDirWork<S>>,
-    receive_buffer: BinaryHeap<ReadDirWork<S>>,
+pub(crate) struct WorkQueueIter<D>
+where
+    D: Delegate,
+{
+    receiver: Receiver<Work<D>>,
+    receive_buffer: BinaryHeap<Work<D>>,
     work_count: Arc<AtomicUsize>,
     stop_now: Arc<AtomicBool>,
 }
 
-pub(crate) fn new_work_queue<S>() -> (WorkQueue<S>, WorkQueueIter<S>) {
+pub(crate) fn new_work_queue<D>() -> (WorkQueue<D>, WorkQueueIter<D>)
+where
+    D: Delegate,
+{
     let work_count = Arc::new(AtomicUsize::new(0));
     let stop_now = Arc::new(AtomicBool::new(false));
     let (sender, receiver) = channel::unbounded();
@@ -39,8 +49,11 @@
     )
 }
 
-impl<S> WorkQueue<S> {
-    pub fn push(&self, work: ReadDirWork<S>) -> std::result::Result<(), SendError<ReadDirWork<S>>> {
+impl<D> WorkQueue<D>
+where
+    D: Delegate,
+{
+    pub fn push(&self, work: Work<D>) -> std::result::Result<(), SendError<Work<D>>> {
         self.work_count.fetch_add(1, AtomicOrdering::SeqCst);
         self.sender.send(work)
     }
@@ -54,7 +67,10 @@
     }
 }
 
-impl<S> WorkQueueIter<S> {
+impl<D> WorkQueueIter<D>
+where
+    D: Delegate,
+{
     fn work_count(&self) -> usize {
         self.work_count.load(AtomicOrdering::SeqCst)
     }
@@ -64,9 +80,12 @@
     }
 }
 
-impl<S> Iterator for WorkQueueIter<S> {
-    type Item = ReadDirWork<S>;
-    fn next(&mut self) -> Option<ReadDirWork<S>> {
+impl<D> Iterator for WorkQueueIter<D>
+where
+    D: Delegate,
+{
+    type Item = Work<D>;
+    fn next(&mut self) -> Option<Work<D>> {
         loop {
             if self.is_stop_now() {
                 return None;
diff --git a/src/lib.rs b/src/lib.rs
index 0f16197..08adc37 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -17,6 +17,7 @@ for entry in WalkDir::new("foo") {
 */
 
 mod walk;
+mod work_tree;
 
 pub mod core;
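The walk.rs change below rewires the public `WalkDir` onto `core::walk` with `DefaultDelegate`, so crate-level usage should be unaffected. A sketch mirroring the lib.rs doc comment and the updated `test_walk`:

```rust
use jwalk::WalkDir;

fn main() {
    // Entries still arrive sorted, in depth-first order; only the
    // internals changed to go through core::walk + DefaultDelegate.
    for entry in WalkDir::new("foo") {
        println!("{}", entry.path().display());
    }
}
```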
diff --git a/src/walk.rs b/src/walk.rs
index 728a27a..e821404 100644
--- a/src/walk.rs
+++ b/src/walk.rs
@@ -4,7 +4,7 @@
 use std::fs::FileType;
 use std::path::{Path, PathBuf};
 use std::sync::Arc;
 
-use crate::core::{self, DirListIter, ResultsQueueIter};
+use crate::core::{self, DefaultDelegate, DirListIter, ResultsQueueIter};
 
 pub struct WalkDir {
     root: PathBuf,
@@ -37,15 +37,7 @@ impl IntoIterator for WalkDir {
     type IntoIter = WalkDirIter;
 
     fn into_iter(self) -> WalkDirIter {
-        let mut results_queue_iter = core::walk(
-            &self.root,
-            0,
-            |_path, state, mut entries| {
-                entries.par_sort_by(|a, b| a.file_name().cmp(b.file_name()));
-                (state, entries)
-            },
-            |_path, _error| true,
-        );
+        let mut results_queue_iter = core::walk(&self.root, None, DefaultDelegate {});
 
         let mut dir_list_stack = Vec::new();
         if let Some(root_dir_list) = results_queue_iter.next() {
@@ -60,8 +52,8 @@
 }
 
 pub struct WalkDirIter {
-    results_queue_iter: ResultsQueueIter<usize>,
-    dir_list_stack: Vec<DirListIter<usize>>,
+    results_queue_iter: ResultsQueueIter<DefaultDelegate>,
+    dir_list_stack: Vec<DirListIter<DefaultDelegate>>,
 }
 
 impl Iterator for WalkDirIter {
@@ -115,17 +107,14 @@ mod tests {
     #[test]
     fn test_walk() {
-        for each in WalkDir::new(test_dir()).into_iter() {
-            println!("{:?}", each.file_name);
+        for each in WalkDir::new(linux_dir()).into_iter() {
+            println!("{}", each.path().display());
         }
     }
 
     #[test]
     fn test_walk_1() {
         for _ in WalkDir::new(linux_dir()).into_iter().take(1) {}
-        for _ in 0..10 {
-            thread::yield_now();
-        }
     }
 }
diff --git a/src/work_tree/index_path.rs b/src/work_tree/index_path.rs
new file mode 100644
index 0000000..d6e6cdc
--- /dev/null
+++ b/src/work_tree/index_path.rs
@@ -0,0 +1,50 @@
+use std::cmp::Ordering;
+
+#[derive(Clone, Debug)]
+pub struct IndexPath(pub Vec<usize>);
+
+impl IndexPath {
+    pub fn new(vec: Vec<usize>) -> IndexPath {
+        IndexPath(vec)
+    }
+
+    pub fn push(&mut self, index: usize) {
+        self.0.push(index);
+    }
+
+    pub fn increment_last(&mut self) {
+        *self.0.last_mut().unwrap() += 1;
+    }
+
+    pub fn pop(&mut self) -> Option<usize> {
+        self.0.pop()
+    }
+
+    pub fn len(&self) -> usize {
+        self.0.len()
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.0.is_empty()
+    }
+}
+
+impl PartialEq for IndexPath {
+    fn eq(&self, o: &Self) -> bool {
+        self.0.eq(&o.0)
+    }
+}
+
+impl Eq for IndexPath {}
+
+impl PartialOrd for IndexPath {
+    fn partial_cmp(&self, o: &Self) -> Option<Ordering> {
+        self.0.partial_cmp(&o.0)
+    }
+}
+
+impl Ord for IndexPath {
+    fn cmp(&self, o: &Self) -> Ordering {
+        self.0.cmp(&o.0)
+    }
+}
diff --git a/src/work_tree/mod.rs b/src/work_tree/mod.rs
new file mode 100644
index 0000000..992fa82
--- /dev/null
+++ b/src/work_tree/mod.rs
@@ -0,0 +1,128 @@
+#![allow(dead_code)]
+#![allow(unused_imports)]
+#![allow(unused_variables)]
+
+mod index_path;
+mod ordered;
+mod ordered_queue;
+mod work_queue;
+mod work_results;
+mod work_results_queue;
+
+use crossbeam::channel::{self, Receiver, SendError, Sender};
+use rayon::iter::ParallelBridge;
+use rayon::prelude::*;
+use std::fs::{self, FileType};
+use std::io::Error;
+use std::path::{Path, PathBuf};
+
+use index_path::*;
+use ordered::*;
+use ordered_queue::*;
+use work_queue::*;
+use work_results::*;
+use work_results_queue::*;
+
+pub trait Delegate: Clone + Send
+where
+    Self::Item: Send,
+    Self::Work: Send,
+{
+    type Item;
+    type Work;
+    fn perform_work(&self, work: Self::Work, work_context: &WorkContext<Self>);
+}
+
+pub enum WorkResult<D>
+where
+    D: Delegate,
+{
+    Item(D::Item),
+    Work(D::Work),
+}
+
+pub fn process<D>(work: Vec<D::Work>, delegate: D) -> WorkResultsQueueIter<D>
+where
+    D: Delegate + 'static,
+    D::Work: Clone, // WHY IS THIS NEEDED? Work shouldn't be cloned
+{
+    let (work_results_queue, work_results_iterator) = new_work_results_queue();
+
+    rayon::spawn(move || {
+        let (work_queue, work_iterator) = new_ordered_queue();
+
+        work.into_iter().enumerate().for_each(|(i, work)| {
+            work_queue
+                .push(Ordered::new(work, IndexPath::new(vec![i])))
+                .unwrap();
+        });
+
+        let work_context = WorkContext {
+            work_results_queue,
+            work_queue,
+        };
+
+        work_iterator.par_bridge().for_each_with(
+            (delegate, work_context),
+            |(delegate, work_context), work| {
+                perform_work(delegate, work, work_context);
+            },
+        );
+    });
+
+    work_results_iterator
+}
+
+fn perform_work<D>(delegate: &D, ordered_work: Ordered<D::Work>, work_context: &mut WorkContext<D>)
+where
+    D: Delegate,
+{
+    let Ordered { value, index_path } = ordered_work;
+
+    //Goal is that when the delegate performs work it schedules it into the work context in order.
+    //This means index_path needs to be associated with work_context so that push_work and push_result
+    //can get automatically assigned index paths.
+
+    delegate.perform_work(value, work_context);
+
+    /*
+    let mut dir_list = work.read_dir_list(work_context);
+    let new_work = dir_list.new_work();
+
+    if work_context.push_result(dir_list).is_err() {
+        work_context.stop_now();
+        work_context.completed_work();
+        return;
+    }
+
+    for each in new_work {
+        if work_context.push_work(each).is_err() {
+            work_context.stop_now();
+            return;
+        }
+    }*/
+
+    work_context.work_queue.complete_item()
+}
+
+#[derive(Clone)]
+pub struct WorkContext<D>
+where
+    D: Delegate,
+{
+    work_results_queue: WorkResultsQueue<D>,
+    work_queue: OrderedQueue<D::Work>,
+}
+
+impl<D> WorkContext<D>
+where
+    D: Delegate,
+{
+    //fn push_work(&self, work: D::Work, index_path: IndexPath) -> Result<(), SendError<D::Work>> {
+    //self.work_queue.push(work, index_path)
+    //}
+
+    fn push_result(&self, result: WorkResults<D>) -> Result<(), SendError<WorkResults<D>>> {
+        self.work_results_queue.push(result)
+    }
+}
diff --git a/src/work_tree/ordered.rs b/src/work_tree/ordered.rs
new file mode 100644
index 0000000..9c6c4ee
--- /dev/null
+++ b/src/work_tree/ordered.rs
@@ -0,0 +1,34 @@
+use std::cmp::Ordering;
+
+use super::IndexPath;
+
+pub struct Ordered<T> {
+    pub value: T,
+    pub index_path: IndexPath,
+}
+
+impl<T> Ordered<T> {
+    pub fn new(value: T, index_path: IndexPath) -> Ordered<T> {
+        Ordered { value, index_path }
+    }
+}
+
+impl<T> PartialEq for Ordered<T> {
+    fn eq(&self, o: &Self) -> bool {
+        self.index_path.eq(&o.index_path)
+    }
+}
+
+impl<T> Eq for Ordered<T> {}
+
+impl<T> PartialOrd for Ordered<T> {
+    fn partial_cmp(&self, o: &Self) -> Option<Ordering> {
+        self.index_path.partial_cmp(&o.index_path)
+    }
+}
+
+impl<T> Ord for Ordered<T> {
+    fn cmp(&self, o: &Self) -> Ordering {
+        self.index_path.cmp(&o.index_path)
+    }
+}
diff --git a/src/work_tree/ordered_queue.rs b/src/work_tree/ordered_queue.rs
new file mode 100644
index 0000000..e919689
--- /dev/null
+++ b/src/work_tree/ordered_queue.rs
@@ -0,0 +1,109 @@
+use crossbeam::channel::{self, Receiver, SendError, Sender};
+use std::cmp::Ordering;
+use std::collections::BinaryHeap;
+use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering as AtomicOrdering};
+use std::sync::Arc;
+use std::thread;
+
+use super::*;
+
+#[derive(Clone)]
+pub(crate) struct OrderedQueue<T>
+where
+    T: Send,
+{
+    sender: Sender<Ordered<T>>,
+    pending_count: Arc<AtomicUsize>,
+    stop: Arc<AtomicBool>,
+}
+
+pub(crate) struct OrderedQueueIter<T>
+where
+    T: Send,
+{
+    receiver: Receiver<Ordered<T>>,
+    receive_buffer: BinaryHeap<Ordered<T>>,
+    pending_count: Arc<AtomicUsize>,
+    stop: Arc<AtomicBool>,
+}
+
+pub(crate) fn new_ordered_queue<T>() -> (OrderedQueue<T>, OrderedQueueIter<T>)
+where
+    T: Send,
+{
+    let pending_count = Arc::new(AtomicUsize::new(0));
+    let stop = Arc::new(AtomicBool::new(false));
+    let (sender, receiver) = channel::unbounded();
+    (
+        OrderedQueue {
+            sender,
+            pending_count: pending_count.clone(),
+            stop: stop.clone(),
+        },
+        OrderedQueueIter {
+            receiver,
+            receive_buffer: BinaryHeap::new(),
+            pending_count: pending_count.clone(),
+            stop: stop.clone(),
+        },
+    )
+}
+
+impl<T> OrderedQueue<T>
+where
+    T: Send,
+{
+    pub fn push(&self, ordered: Ordered<T>) -> std::result::Result<(), SendError<Ordered<T>>> {
+        self.pending_count.fetch_add(1, AtomicOrdering::SeqCst);
+        self.sender.send(ordered)
+    }
+
+    pub fn complete_item(&self) {
+        self.pending_count.fetch_sub(1, AtomicOrdering::SeqCst);
+    }
+
+    pub fn stop(&self) {
+        self.stop.store(true, AtomicOrdering::SeqCst);
+    }
+}
+
+impl<T> OrderedQueueIter<T>
+where
+    T: Send,
+{
+    fn pending_count(&self) -> usize {
+        self.pending_count.load(AtomicOrdering::SeqCst)
+    }
+ } + + fn is_stop(&self) -> bool { + self.stop.load(AtomicOrdering::SeqCst) + } +} + +impl Iterator for OrderedQueueIter +where + T: Send, +{ + type Item = Ordered; + fn next(&mut self) -> Option> { + loop { + if self.is_stop() { + return None; + } + + while let Ok(ordered_work) = self.receiver.try_recv() { + self.receive_buffer.push(ordered_work) + } + + if let Some(ordered_work) = self.receive_buffer.pop() { + return Some(ordered_work); + } else { + if self.pending_count() == 0 { + return None; + } else { + thread::yield_now(); + } + } + } + } +} diff --git a/src/work_tree/work_queue.rs b/src/work_tree/work_queue.rs new file mode 100644 index 0000000..59859f2 --- /dev/null +++ b/src/work_tree/work_queue.rs @@ -0,0 +1,154 @@ +use crossbeam::channel::{self, Receiver, SendError, Sender}; +use std::cmp::Ordering; +use std::collections::BinaryHeap; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering as AtomicOrdering}; +use std::sync::Arc; +use std::thread; + +use super::*; + +#[derive(Clone)] +pub(crate) struct WorkQueue +where + D: Delegate, +{ + sender: Sender>, + work_count: Arc, + stop_now: Arc, +} + +struct OrderedWork +where + D: Delegate, +{ + index_path: IndexPath, + work: D::Work, +} + +pub(crate) struct WorkQueueIter +where + D: Delegate, +{ + receiver: Receiver>, + receive_buffer: BinaryHeap>, + work_count: Arc, + stop_now: Arc, +} + +pub(crate) fn new_work_queue() -> (WorkQueue, WorkQueueIter) +where + D: Delegate, +{ + let work_count = Arc::new(AtomicUsize::new(0)); + let stop_now = Arc::new(AtomicBool::new(false)); + let (sender, receiver) = channel::unbounded(); + ( + WorkQueue { + sender, + work_count: work_count.clone(), + stop_now: stop_now.clone(), + }, + WorkQueueIter { + receiver, + receive_buffer: BinaryHeap::new(), + work_count: work_count.clone(), + stop_now: stop_now.clone(), + }, + ) +} + +impl WorkQueue +where + D: Delegate, +{ + pub fn push( + &self, + work: D::Work, + index_path: IndexPath, + ) -> std::result::Result<(), SendError> { + self.work_count.fetch_add(1, AtomicOrdering::SeqCst); + if let Err(err) = self.sender.send(OrderedWork { work, index_path }) { + Err(SendError(err.0.work)) + } else { + Ok(()) + } + } + + pub fn completed_work(&self) { + self.work_count.fetch_sub(1, AtomicOrdering::SeqCst); + } + + pub fn stop_now(&self) { + self.stop_now.store(true, AtomicOrdering::SeqCst); + } +} + +impl WorkQueueIter +where + D: Delegate, +{ + fn work_count(&self) -> usize { + self.work_count.load(AtomicOrdering::SeqCst) + } + + fn is_stop_now(&self) -> bool { + self.stop_now.load(AtomicOrdering::SeqCst) + } +} + +impl Iterator for WorkQueueIter +where + D: Delegate, +{ + type Item = D::Work; + fn next(&mut self) -> Option { + loop { + if self.is_stop_now() { + return None; + } + + while let Ok(ordered_work) = self.receiver.try_recv() { + self.receive_buffer.push(ordered_work) + } + + if let Some(ordered_work) = self.receive_buffer.pop() { + return Some(ordered_work.work); + } else { + if self.work_count() == 0 { + return None; + } else { + thread::yield_now(); + } + } + } + } +} + +impl PartialEq for OrderedWork +where + D: Delegate, +{ + fn eq(&self, o: &Self) -> bool { + self.index_path.eq(&o.index_path) + } +} + +impl Eq for OrderedWork where D: Delegate {} + +impl PartialOrd for OrderedWork +where + D: Delegate, +{ + fn partial_cmp(&self, o: &Self) -> Option { + o.index_path.partial_cmp(&self.index_path) + } +} + +impl Ord for OrderedWork +where + D: Delegate, +{ + fn cmp(&self, o: &Self) -> Ordering { + o.index_path.cmp(&self.index_path) + } +} 
diff --git a/src/work_tree/work_results.rs b/src/work_tree/work_results.rs
new file mode 100644
index 0000000..1a36d70
--- /dev/null
+++ b/src/work_tree/work_results.rs
@@ -0,0 +1,62 @@
+use std::cmp::Ordering;
+
+use super::*;
+
+pub struct WorkResults<D>
+where
+    D: Delegate,
+{
+    results: Vec<WorkResultsItem<D>>,
+    pub(crate) scheduled_work: usize,
+    pub(crate) index_path: IndexPath,
+}
+
+impl<D> WorkResults<D>
+where
+    D: Delegate,
+{
+    fn push_item(&mut self, item: D::Item) {
+        self.results.push(WorkResultsItem::Item(item));
+    }
+
+    fn push_work(&mut self, work: D::Work) {
+        self.results.push(WorkResultsItem::Work(work));
+    }
+}
+
+impl<D> PartialEq for WorkResults<D>
+where
+    D: Delegate,
+{
+    fn eq(&self, o: &Self) -> bool {
+        self.index_path.eq(&o.index_path)
+    }
+}
+
+impl<D> Eq for WorkResults<D> where D: Delegate {}
+
+impl<D> PartialOrd for WorkResults<D>
+where
+    D: Delegate,
+{
+    fn partial_cmp(&self, o: &Self) -> Option<Ordering> {
+        o.index_path.partial_cmp(&self.index_path)
+    }
+}
+
+impl<D> Ord for WorkResults<D>
+where
+    D: Delegate,
+{
+    fn cmp(&self, o: &Self) -> Ordering {
+        o.index_path.cmp(&self.index_path)
+    }
+}
+
+enum WorkResultsItem<D>
+where
+    D: Delegate,
+{
+    Item(D::Item),
+    Work(D::Work),
+}
diff --git a/src/work_tree/work_results_queue.rs b/src/work_tree/work_results_queue.rs
new file mode 100644
index 0000000..792303f
--- /dev/null
+++ b/src/work_tree/work_results_queue.rs
@@ -0,0 +1,148 @@
+use crossbeam::channel::{self, Receiver, SendError, Sender, TryRecvError};
+use std::collections::BinaryHeap;
+use std::marker::PhantomData;
+use std::thread;
+
+use super::Delegate;
+use super::IndexPath;
+use super::WorkResults;
+
+#[derive(Clone)]
+pub struct WorkResultsQueue<D>
+where
+    D: Delegate,
+{
+    sender: Sender<WorkResults<D>>,
+}
+
+pub struct WorkResultsQueueIter<D>
+where
+    D: Delegate,
+{
+    next_matcher: NextResultMatcher<D>,
+    receiver: Receiver<WorkResults<D>>,
+    receive_buffer: BinaryHeap<WorkResults<D>>,
+}
+
+struct NextResultMatcher<D>
+where
+    D: Delegate,
+{
+    looking_for_index_path: IndexPath,
+    remaining_work: Vec<usize>,
+    phantom: PhantomData<D>,
+}
+
+pub fn new_work_results_queue<D>() -> (WorkResultsQueue<D>, WorkResultsQueueIter<D>)
+where
+    D: Delegate,
+{
+    //let (sender, receiver) = channel::bounded(100);
+    let (sender, receiver) = channel::unbounded();
+    (
+        WorkResultsQueue { sender },
+        WorkResultsQueueIter {
+            receiver,
+            next_matcher: NextResultMatcher::default(),
+            receive_buffer: BinaryHeap::new(),
+        },
+    )
+}
+
+impl<D> WorkResultsQueue<D>
+where
+    D: Delegate,
+{
+    pub fn push(&self, dent: WorkResults<D>) -> std::result::Result<(), SendError<WorkResults<D>>> {
+        self.sender.send(dent)
+    }
+}
+
+impl<D> Iterator for WorkResultsQueueIter<D>
+where
+    D: Delegate,
+{
+    type Item = WorkResults<D>;
+    fn next(&mut self) -> Option<WorkResults<D>> {
+        let looking_for = &self.next_matcher.looking_for_index_path;
+        loop {
+            let top_dir_list = self.receive_buffer.peek();
+            if let Some(top_dir_list) = top_dir_list {
+                if top_dir_list.index_path.eq(looking_for) {
+                    break;
+                }
+            }
+
+            if self.next_matcher.is_none() {
+                return None;
+            }
+
+            match self.receiver.try_recv() {
+                Ok(dir_list) => {
+                    self.receive_buffer.push(dir_list);
+                }
+                Err(err) => match err {
+                    TryRecvError::Empty => thread::yield_now(),
+                    TryRecvError::Disconnected => break,
+                },
+            }
+        }
+
+        if let Some(dir_list) = self.receive_buffer.pop() {
+            self.next_matcher.increment_past(&dir_list);
+            Some(dir_list)
+        } else {
+            None
+        }
+    }
+}
+
+impl<D> NextResultMatcher<D>
+where
+    D: Delegate,
+{
+    fn is_none(&self) -> bool {
+        self.looking_for_index_path.is_empty()
+    }
+
+    fn decrement_remaining_work_at_this_level(&mut self) {
+        *self.remaining_work.last_mut().unwrap() -= 1;
+    }
+
+    fn increment_past(&mut self, branch: &WorkResults<D>) {
+        self.decrement_remaining_work_at_this_level();
+
+        if branch.scheduled_work > 0 {
+            // If visited item has children then push 0 index path, since we are now
+            // looking for the first child.
+            self.looking_for_index_path.push(0);
+            self.remaining_work.push(branch.scheduled_work);
+        } else {
+            // Increment sibling index
+            self.looking_for_index_path.increment_last();
+
+            // If no siblings remain at this level unwind stacks
+            while !self.remaining_work.is_empty() && *self.remaining_work.last().unwrap() == 0 {
+                self.looking_for_index_path.pop();
+                self.remaining_work.pop();
+                // Finished processing level, so increment sibling index
+                if !self.looking_for_index_path.is_empty() {
+                    self.looking_for_index_path.increment_last();
+                }
+            }
+        }
+    }
+}
+
+impl<D> Default for NextResultMatcher<D>
+where
+    D: Delegate,
+{
+    fn default() -> NextResultMatcher<D> {
+        NextResultMatcher {
+            looking_for_index_path: IndexPath::new(vec![0]),
+            remaining_work: vec![1],
+            phantom: PhantomData,
+        }
+    }
+}
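Taken together, the `work_tree` pieces tag every unit of work and every result with an `IndexPath`, run the work in parallel, and replay results in depth-first order on the consumer side. A self-contained, std-only sketch of that replay idea, using the same reversed-`Ord` trick as above (the real module uses crossbeam channels and rayon; all names here are illustrative):

```rust
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::sync::mpsc;
use std::thread;

// A result tagged with the index path it must be emitted at.
struct Tagged {
    index_path: Vec<usize>,
    value: &'static str,
}

impl PartialEq for Tagged {
    fn eq(&self, o: &Self) -> bool {
        self.index_path == o.index_path
    }
}
impl Eq for Tagged {}
impl PartialOrd for Tagged {
    fn partial_cmp(&self, o: &Self) -> Option<Ordering> {
        Some(self.cmp(o))
    }
}
impl Ord for Tagged {
    fn cmp(&self, o: &Self) -> Ordering {
        o.index_path.cmp(&self.index_path) // reversed: pop smallest first
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    // Three "workers" that may finish in any order.
    for (i, name) in ["a", "b", "c"].into_iter().enumerate() {
        let tx = tx.clone();
        thread::spawn(move || {
            tx.send(Tagged { index_path: vec![i], value: name }).unwrap();
        });
    }
    drop(tx);

    // Consumer: buffer out-of-order arrivals and emit strictly in index
    // order, the same job NextResultMatcher does for WorkResultsQueueIter.
    let (mut buffer, mut next) = (BinaryHeap::new(), 0);
    for tagged in rx {
        buffer.push(tagged);
        while buffer.peek().map_or(false, |t| t.index_path == [next]) {
            println!("{}", buffer.pop().unwrap().value); // a, b, c
            next += 1;
        }
    }
}
```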