Skip to content

Commit

Permalink
more work on work_tree
Browse files Browse the repository at this point in the history
  • Loading branch information
jessegrosjean committed Feb 1, 2019
1 parent ebf9789 commit 4ced894
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 28 deletions.
32 changes: 21 additions & 11 deletions src/work_tree/index_path.rs
Original file line number Diff line number Diff line change
@@ -1,50 +1,60 @@
use std::cmp::Ordering;

#[derive(Clone, Debug)]
pub struct IndexPath(pub Vec<usize>);
pub struct IndexPath {
pub indices: Vec<usize>,
// Should use Arc<Vec<usize>>
//pub child_index: 0, // All children should share parent index
}

impl IndexPath {
pub fn new(vec: Vec<usize>) -> IndexPath {
IndexPath(vec)
pub fn new(indices: Vec<usize>) -> IndexPath {
IndexPath { indices }
}

pub fn adding(&self, index: usize) -> IndexPath {
let mut indices = self.indices.clone();
indices.push(index);
IndexPath::new(indices)
}

pub fn push(&mut self, index: usize) {
self.0.push(index);
self.indices.push(index);
}

pub fn increment_last(&mut self) {
*self.0.last_mut().unwrap() += 1;
*self.indices.last_mut().unwrap() += 1;
}

pub fn pop(&mut self) -> Option<usize> {
self.0.pop()
self.indices.pop()
}

pub fn len(&self) -> usize {
self.0.len()
self.indices.len()
}

pub fn is_empty(&self) -> bool {
self.0.is_empty()
self.indices.is_empty()
}
}

impl PartialEq for IndexPath {
fn eq(&self, o: &Self) -> bool {
self.0.eq(&o.0)
self.indices.eq(&o.indices)
}
}

impl Eq for IndexPath {}

impl PartialOrd for IndexPath {
fn partial_cmp(&self, o: &Self) -> Option<Ordering> {
self.0.partial_cmp(&o.0)
self.indices.partial_cmp(&o.indices)
}
}

impl Ord for IndexPath {
fn cmp(&self, o: &Self) -> Ordering {
self.0.cmp(&o.0)
self.indices.cmp(&o.indices)
}
}
57 changes: 41 additions & 16 deletions src/work_tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ use work_results_queue::*;

pub trait Delegate: Clone + Send
where
Self::Item: Send,
Self::Work: Send,
Self::Item: Send,
{
type Item;
type Work;
type Item;
fn perform_work(&self, work: Self::Work, work_context: &WorkContext<Self>);
}

Expand All @@ -41,12 +41,13 @@ where
Work(D::Work),
}

pub fn process<D>(work: Vec<D::Work>, delegate: D) -> WorkResultsQueueIter<D>
pub fn process<D>(work: Vec<D::Work>, delegate: D) -> OrderedQueueIter<D::Item>
where
D: Delegate + 'static,
D::Work: Clone, // WHY iS THIS NEEDED? Work shouldn't be cloned
D::Item: Clone, // WHY iS THIS NEEDED? Work shouldn't be cloned
{
let (work_results_queue, work_results_iterator) = new_work_results_queue();
let (item_queue, item_queue_iter) = new_ordered_queue();

rayon::spawn(move || {
let (work_queue, work_iterator) = new_ordered_queue();
Expand All @@ -57,9 +58,13 @@ where
.unwrap();
});

let index_path = IndexPath::new(Vec::new());
let index = 0;
let work_context = WorkContext {
work_results_queue,
item_queue,
work_queue,
index_path,
index,
};

work_iterator.par_bridge().for_each_with(
Expand All @@ -70,7 +75,7 @@ where
);
});

work_results_iterator
item_queue_iter
}

fn perform_work<D>(delegate: &D, orderd_work: Ordered<D::Work>, work_context: &mut WorkContext<D>)
Expand All @@ -79,12 +84,12 @@ where
{
let Ordered { value, index_path } = orderd_work;

//Goal is that when delegate peroforms work it schedules it into the work context in order.
//this means index_path nees to be associated with work_context so that push_work nad push_result
//can get automatically assigned index paths

work_context.index_path = index_path;
work_context.index = 0;
delegate.perform_work(value, work_context);

// Don't push

/*
let mut dir_list = work.read_dir_list(work_context);
let new_work = dir_list.new_work();
Expand All @@ -110,19 +115,39 @@ pub struct WorkContext<D>
where
D: Delegate,
{
work_results_queue: WorkResultsQueue<D>,
item_queue: OrderedQueue<D::Item>,
work_queue: OrderedQueue<D::Work>,
index_path: IndexPath,
index: usize,
}

pub struct WorkPlaceholder {
index_path: IndexPath,
remaining_items: usize,
}

impl<D> WorkContext<D>
where
D: Delegate,
{
//fn push_work(&self, work: D::Work, index_path: IndexPath) -> Result<(), SendError<D::Work>> {
//self.work_queue.push(work, index_path)
//}
fn next_index_path(&mut self) -> IndexPath {
let index_path = self.index_path.adding(self.index);
self.index += 1;
index_path
}

fn push_work(&mut self, work: D::Work) -> Result<(), SendError<Ordered<D::Work>>> {
// When work is pushed also need to push placeholder item to items_queue.
// This placeholder is used to track how many items the new work generates.
let index_path = self.next_index_path();
let ordered_work = Ordered::new(work, index_path);
self.work_queue.push(ordered_work)
self.item_queue.push()
}

fn push_result(&self, result: WorkResults<D>) -> Result<(), SendError<WorkResults<D>>> {
self.work_results_queue.push(result)
fn push_item(&mut self, item: D::Item) -> Result<(), SendError<Ordered<D::Item>>> {
let index_path = self.next_index_path();
let ordered_item = Ordered::new(item, index_path);
self.item_queue.push(ordered_item)
}
}
2 changes: 1 addition & 1 deletion src/work_tree/ordered_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ where
stop: Arc<AtomicBool>,
}

pub(crate) struct OrderedQueueIter<T>
pub struct OrderedQueueIter<T>
where
T: Send,
{
Expand Down

0 comments on commit 4ced894

Please sign in to comment.