Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create eyeball-im-util with FilteredVectorSubscriber API #19

Merged
merged 4 commits into from
May 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ jobs:
with:
components: miri
- uses: Swatinem/rust-cache@v2
- run: cargo miri test
# Most eyeball-im-util tests are failing right now, due to Vector::retain
# usage which might be unsound: https://github.com/jneem/imbl/issues/59
- run: cargo miri test --workspace --exclude eyeball-im-util

clippy:
name: Run clippy (Rust nightly)
Expand Down
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
[workspace]
members = ["eyeball", "eyeball-im"]
members = ["eyeball", "eyeball-im", "eyeball-im-util"]

[workspace.dependencies]
assert_matches = "1.5.0"
futures-core = "0.3.26"
futures-util = { version = "0.3.26", default-features = false }
readlock = "0.1.5"
stream_assert = "0.1.0"
tokio = { version = "1.25.0", features = ["sync"] }
tokio-stream = { version = "0.1.11", default-features = false, features = ["sync"] }
tracing = { version = "0.1.37", default-features = false, features = ["std"] }
19 changes: 19 additions & 0 deletions eyeball-im-util/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[package]
name = "eyeball-im-util"
version = "0.1.0"
edition = "2021"
license = "MPL-2.0"
description = "Helpful utilities for `eyeball-im`."
repository = "https://github.com/jplatte/eyeball"

[package.metadata.docs.rs]
all-features = true

[dependencies]
eyeball-im = { version = "0.2.0", path = "../eyeball-im" }
futures-core.workspace = true
imbl = "2.0.0"
pin-project-lite = "0.2.9"

[dev-dependencies]
stream_assert.workspace = true
5 changes: 5 additions & 0 deletions eyeball-im-util/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
//! Helpful utilities for [`eyeball-im`][eyeball_im].

mod vector;

pub use vector::{FilteredVectorSubscriber, VectorExt};
223 changes: 223 additions & 0 deletions eyeball-im-util/src/vector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
use std::{
collections::VecDeque,
ops::Not,
pin::Pin,
task::{self, ready, Poll},
};

use eyeball_im::{ObservableVector, Vector, VectorDiff, VectorSubscriber};
use futures_core::Stream;
use pin_project_lite::pin_project;

pub trait VectorExt<T>
where
T: Clone + Send + Sync + 'static,
{
fn subscribe_filtered<F>(&self, filter: F) -> (Vector<T>, FilteredVectorSubscriber<T, F>)
where
F: Fn(&T) -> bool + Unpin;
}

impl<T> VectorExt<T> for ObservableVector<T>
where
T: Clone + Send + Sync + 'static,
{
fn subscribe_filtered<F>(&self, filter: F) -> (Vector<T>, FilteredVectorSubscriber<T, F>)
where
F: Fn(&T) -> bool + Unpin,
{
let mut filtered_indices = VecDeque::new();
let mut v = (*self).clone();

let mut original_idx = 0;
v.retain(|val| {
let keep = filter(val);
if keep {
filtered_indices.push_back(original_idx);
}
original_idx += 1;
keep
});

let inner = self.subscribe();
let original_len = self.len();
let sub = FilteredVectorSubscriber { inner, filter, filtered_indices, original_len };

(v, sub)
}
}

pin_project! {
pub struct FilteredVectorSubscriber<T, F> {
#[pin]
inner: VectorSubscriber<T>,
filter: F,
filtered_indices: VecDeque<usize>,
original_len: usize,
}
}

impl<T, F> FilteredVectorSubscriber<T, F>
where
T: Clone + Send + Sync + 'static,
F: Fn(&T) -> bool,
{
fn append(mut self: Pin<&mut Self>, mut values: Vector<T>) -> Option<Vector<T>> {
let mut original_idx = self.original_len;
self.original_len += values.len();
values.retain(|value| {
let keep = (self.filter)(value);
if keep {
self.filtered_indices.push_back(original_idx);
}
original_idx += 1;
keep
});

values.is_empty().not().then_some(values)
}

fn handle_append(self: Pin<&mut Self>, values: Vector<T>) -> Option<VectorDiff<T>> {
self.append(values).map(|values| VectorDiff::Append { values })
}

fn handle_clear(mut self: Pin<&mut Self>) -> Option<VectorDiff<T>> {
self.filtered_indices.clear();
self.original_len = 0;
Some(VectorDiff::Clear)
}

fn handle_push_front(mut self: Pin<&mut Self>, value: T) -> Option<VectorDiff<T>> {
self.original_len += 1;
for idx in &mut self.filtered_indices {
*idx += 1;
}

(self.filter)(&value).then(|| {
self.filtered_indices.push_front(0);
VectorDiff::PushFront { value }
})
}

fn handle_push_back(mut self: Pin<&mut Self>, value: T) -> Option<VectorDiff<T>> {
let original_idx = self.original_len;
self.original_len += 1;
(self.filter)(&value).then(|| {
self.filtered_indices.push_back(original_idx);
VectorDiff::PushBack { value }
})
}

fn handle_pop_front(mut self: Pin<&mut Self>) -> Option<VectorDiff<T>> {
self.original_len -= 1;
jplatte marked this conversation as resolved.
Show resolved Hide resolved
let result = self.filtered_indices.front().map_or(false, |&idx| idx == 0).then(|| {
assert!(self.filtered_indices.pop_front().is_some());
VectorDiff::PopFront
});
for idx in &mut self.filtered_indices {
*idx -= 1;
}
Hywan marked this conversation as resolved.
Show resolved Hide resolved

result
}

fn handle_pop_back(mut self: Pin<&mut Self>) -> Option<VectorDiff<T>> {
self.original_len -= 1;
jplatte marked this conversation as resolved.
Show resolved Hide resolved
self.filtered_indices.back().map_or(false, |&idx| idx == self.original_len).then(|| {
assert!(self.filtered_indices.pop_back().is_some());
VectorDiff::PopBack
})
}

fn handle_insert(mut self: Pin<&mut Self>, index: usize, value: T) -> Option<VectorDiff<T>> {
let original_idx = index;
let index = self.filtered_indices.partition_point(|&i| i < original_idx);
for idx in self.filtered_indices.iter_mut().skip(index) {
*idx += 1;
}

(self.filter)(&value).then(|| {
self.filtered_indices.insert(index, original_idx);
VectorDiff::Insert { index, value }
})
}

fn handle_set(mut self: Pin<&mut Self>, index: usize, value: T) -> Option<VectorDiff<T>> {
let original_idx = index;
let new_value_matches = (self.filter)(&value);

let index = self.filtered_indices.partition_point(|&i| i < original_idx);
if self.filtered_indices.get(index).map_or(false, |&i| i == original_idx) {
// The previous value matched the filter
Some(if new_value_matches {
VectorDiff::Set { index, value }
} else {
self.filtered_indices.remove(index);
VectorDiff::Remove { index }
})
} else {
// The previous value didn't match the filter
new_value_matches.then(|| {
self.filtered_indices.insert(index, original_idx);
VectorDiff::Insert { index, value }
})
}
}

fn handle_remove(mut self: Pin<&mut Self>, index: usize) -> Option<VectorDiff<T>> {
let original_idx = index;
self.original_len -= 1;
jplatte marked this conversation as resolved.
Show resolved Hide resolved

let index = self.filtered_indices.partition_point(|&i| i < original_idx);
let result =
self.filtered_indices.get(index).map_or(false, |&i| i == original_idx).then(|| {
// The value that was removed matched the filter
self.filtered_indices.remove(index);
VectorDiff::Remove { index }
});

for idx in self.filtered_indices.iter_mut().skip(index) {
*idx -= 1;
}

result
}

fn handle_reset(mut self: Pin<&mut Self>, values: Vector<T>) -> Option<VectorDiff<T>> {
self.filtered_indices.clear();
self.original_len = 0;
self.append(values).map(|values| VectorDiff::Reset { values })
}
}

impl<T: Clone + Send + Sync + 'static, F> Stream for FilteredVectorSubscriber<T, F>
where
F: Fn(&T) -> bool + Unpin,
{
type Item = VectorDiff<T>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
loop {
let Some(diff) = ready!(self.as_mut().project().inner.poll_next(cx)) else {
return Poll::Ready(None);
};

let result = match diff {
VectorDiff::Append { values } => self.as_mut().handle_append(values),
VectorDiff::Clear => self.as_mut().handle_clear(),
VectorDiff::PushFront { value } => self.as_mut().handle_push_front(value),
VectorDiff::PushBack { value } => self.as_mut().handle_push_back(value),
VectorDiff::PopFront => self.as_mut().handle_pop_front(),
VectorDiff::PopBack => self.as_mut().handle_pop_back(),
VectorDiff::Insert { index, value } => self.as_mut().handle_insert(index, value),
VectorDiff::Set { index, value } => self.as_mut().handle_set(index, value),
VectorDiff::Remove { index } => self.as_mut().handle_remove(index),
VectorDiff::Reset { values } => self.as_mut().handle_reset(values),
};

if let Some(diff) = result {
return Poll::Ready(Some(diff));
}
}
}
}
Loading