Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better periodic API #20

Merged
merged 5 commits into from
Sep 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 22 additions & 22 deletions examples/tsl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ fn tsl(

let high = {
let mut high = Decimal::ZERO;
map(queue_ref(move |w: QueueRef<TickValue<_>>| {
map(queue_ref(move |w: TickQueueRef<TickValue<_>>| {
if w.change().is_new_period() {
high = w[0].value;
} else {
Expand All @@ -24,7 +24,7 @@ fn tsl(

let low = {
let mut low = Decimal::ZERO;
map(queue_ref(move |w: QueueRef<TickValue<Decimal>>| {
map(queue_ref(move |w: TickQueueRef<TickValue<Decimal>>| {
if w.change().is_new_period() {
low = w[0].value;
} else {
Expand All @@ -34,9 +34,9 @@ fn tsl(
}))
};

let cache0 = map(queue_ref(|w: QueueRef<TickValue<Decimal>>| w[0]));
let cache0 = map(queue_ref(|w: TickQueueRef<TickValue<Decimal>>| w[0]));

let cache1 = map(queue_ref(|w: QueueRef<TickValue<Decimal>>| {
let cache1 = map(queue_ref(|w: TickQueueRef<TickValue<Decimal>>| {
let close1 = w.get(1).map(|t| t.value);
w[0].tick.with_value(close1)
}));
Expand All @@ -51,24 +51,24 @@ fn tsl(
.with_value((Decimal::ONE - alpha) * x.value + alpha * rma1)
});

let atr = map(
|(((_close0, close1), high), low): (
((TickValue<Decimal>, TickValue<Option<Decimal>>), Decimal),
Decimal,
)| {
close1.map(|close1| {
close1
.map(|close1| {
(high - low)
.max((close1 - high).abs())
.max((close1 - low).abs())
})
.unwrap_or_default()
})
},
)
type AtrCtx = (
((TickValue<Decimal>, TickValue<Option<Decimal>>), Decimal),
Decimal,
);

let atr = map(|(((_close0, close1), high), low): AtrCtx| {
close1.map(|close1| {
close1
.map(|close1| {
(high - low)
.max((close1 - high).abs())
.max((close1 - low).abs())
})
.unwrap_or_default()
})
})
.then(rma)
.map(queue_ref(|w: QueueRef<TickValue<Decimal>>| w[0]));
.map(queue_ref(|w: TickQueueRef<TickValue<Decimal>>| w[0]));

// long = true if last >= tsl[1] && !long[1]
// false if last <= tsl[1] && long[1]
Expand Down Expand Up @@ -101,7 +101,7 @@ fn tsl(
x.map(|(_, _, down)| (down, true))
}
})
.map(queue_ref(|w: QueueRef<TickValue<(Decimal, bool)>>| {
.map(queue_ref(|w: TickQueueRef<TickValue<(Decimal, bool)>>| {
w[0].map(|v| v.0)
}));

Expand Down
2 changes: 1 addition & 1 deletion src/gat/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@ pub use operator::{identity::id, map::map, mux::mux, GatOperator, GatOperatorExt
pub use tick_operator::{map_tick::map_t, TickGatOperatorExt};
pub use tumbling_operator::{
operator::{tumbling, TumblingOperator},
periodic::{Periodic, PeriodicOp},
periodic::{Periodic, PeriodicOp, TickQueueRef},
queue::{circular::Circular, Change, Queue, QueueRef, Tumbling},
};
24 changes: 16 additions & 8 deletions src/gat/tumbling_operator/operator.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,26 @@
use core::num::NonZeroUsize;

use crate::gat::GatOperator;

use super::queue::{circular::Circular, Collection, Queue, QueueMut, QueueRef, Tumbling};
use super::queue::{circular::Circular, Collection, Queue, QueueMut, Tumbling};

/// Operation.
pub trait Operation<I, T> {
/// Output.
type Output<'out>
where
T: 'out;

/// Step.
fn step(&mut self, w: QueueMut<T>, x: I);
fn step<'a>(&mut self, w: QueueMut<'a, T>, x: I) -> Self::Output<'a>;
}

impl<I, T, F> Operation<I, T> for F
where
F: for<'a> FnMut(QueueMut<'a, T>, I),
{
type Output<'out> = () where T: 'out;

#[inline]
fn step(&mut self, w: QueueMut<T>, x: I) {
(self)(w, x)
Expand Down Expand Up @@ -43,7 +52,7 @@ where
Q: Queue,
P: Operation<I, Q::Item>,
{
type Output<'out> = QueueRef<'out, Q::Item>
type Output<'out> = P::Output<'out>
where
Self: 'out,
I: 'out;
Expand All @@ -53,18 +62,17 @@ where
I: 'out,
{
let Self { queue, op } = self;
op.step(queue.as_queue_mut(), input);
queue.as_queue_ref()
op.step(queue.as_queue_mut(), input)
}
}

/// Create a new tumbling operator with circular queue.
pub fn tumbling<const N: usize, I, T, P>(
length: usize,
length: NonZeroUsize,
op: P,
) -> TumblingOperator<Circular<T, N>, P>
) -> TumblingOperator<Circular<N, T>, P>
where
P: for<'a> FnMut(QueueMut<'a, T>, I),
{
TumblingOperator::with_queue(Circular::with_capacity(length), op)
TumblingOperator::with_queue(Circular::with_capacity(length.get()), op)
}
52 changes: 34 additions & 18 deletions src/gat/tumbling_operator/periodic.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
use crate::{prelude::GatOperator, Period, Tick, Tickable, TumblingWindow};
use core::num::NonZeroUsize;

use crate::{prelude::GatOperator, Period, Tick, TickValue, Tickable, TumblingWindow};

use super::{
operator::{Operation, TumblingOperator},
queue::{circular::Circular, Collection, Queue, QueueMut, QueueRef},
};

/// Ticked [`QueueRef`]
pub type TickQueueRef<'a, T> = TickValue<QueueRef<'a, T>>;

/// Periodic Operation.
pub trait PeriodicOp<I, T> {
/// Received an event from the same period.
Expand Down Expand Up @@ -68,15 +73,19 @@ where
I: Tickable,
P: PeriodicOp<I, T>,
{
fn step(&mut self, mut queue: QueueMut<T>, event: I) {
type Output<'out> = TickQueueRef<'out, T> where T: 'out;

fn step<'a>(&mut self, mut queue: QueueMut<'a, T>, event: I) -> Self::Output<'a> {
let tick = event.tick();
if self.period.same_window(&self.last, &event.tick()) {
let output = self.op.swap(queue.as_queue_ref(), event);
queue.swap(output);
} else {
self.last = event.tick();
let output = self.op.push(queue.as_queue_ref(), event);
queue.push(output);
}
self.last = tick;
tick.with_value(queue.into_queue_ref())
}
}

Expand All @@ -86,22 +95,24 @@ where
T: Clone,
P: PeriodicOp<I, T>,
{
fn step(&mut self, mut queue: QueueMut<T>, event: I) {
type Output<'out> = TickQueueRef<'out, T> where T: 'out;

fn step<'a>(&mut self, mut queue: QueueMut<'a, T>, event: I) -> Self::Output<'a> {
let tick = event.tick();
if self.period.same_window(&self.last, &event.tick()) {
let output = self.op.swap(queue.as_queue_ref(), event);
queue.swap(output);
} else if let Some(last) = queue.get(0).cloned() {
queue.push(last);
let mut output = self.op.push(queue.as_queue_ref(), event);
let last = queue.get_mut(0).unwrap();
core::mem::swap(last, &mut output);
} else {
self.last = event.tick();
if let Some(last) = queue.get(0).cloned() {
queue.push(last);
let mut output = self.op.push(queue.as_queue_ref(), event);
let last = queue.get_mut(0).unwrap();
core::mem::swap(last, &mut output);
} else {
let output = self.op.push(queue.as_queue_ref(), event);
queue.push(output);
}
let output = self.op.push(queue.as_queue_ref(), event);
queue.push(output);
}
self.last = tick;
tick.with_value(queue.into_queue_ref())
}
}

Expand Down Expand Up @@ -162,7 +173,7 @@ where
/// Build a cache operator.
pub fn build_cache(
self,
) -> impl for<'out> GatOperator<Q::Item, Output<'out> = QueueRef<'out, Q::Item>>
) -> impl for<'out> GatOperator<Q::Item, Output<'out> = TickQueueRef<'out, Q::Item>>
where
Q: Queue + 'static,
Q::Item: Tickable + 'static,
Expand Down Expand Up @@ -190,12 +201,17 @@ where

impl Periodic<(), false> {
/// Create a new periodic operator builder using circular queue.
pub fn with_circular<T>(length: usize, period: Period) -> Periodic<Circular<T, 0>, false> {
Periodic::new(Circular::with_capacity(length), period)
pub fn with_circular<T>(
length: NonZeroUsize,
period: Period,
) -> Periodic<Circular<0, T>, false> {
Periodic::new(Circular::with_capacity(length.get()), period)
}

/// Create a new periodic operator builder using circular queue.
pub fn with_circular_n<const N: usize, T>(period: Period) -> Periodic<Circular<T, N>, false> {
/// # Panic
/// Panic if `N` is zero.
pub fn with_circular_n<const N: usize, T>(period: Period) -> Periodic<Circular<N, T>, false> {
Periodic::new(Circular::with_capacity(N), period)
}
}
Expand Down
19 changes: 11 additions & 8 deletions src/gat/tumbling_operator/queue/circular.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ use super::{Collection, Queue};

/// Circular Queue backed by [`TinyVec`].
#[derive(Debug, Clone)]
pub struct Circular<T, const N: usize = 1> {
pub struct Circular<const N: usize, T> {
inner: TinyVec<[Option<T>; N]>,
cap: usize,
next_tail: usize,
head: Option<usize>,
}

impl<T, const N: usize> Circular<T, N> {
impl<T, const N: usize> Circular<N, T> {
fn entry_next_tail(&mut self) -> &mut Option<T> {
// Assumption: `next_tail` must be a valid position, that is
// 1) `0 <= next_tail < cap`,
Expand Down Expand Up @@ -64,7 +64,10 @@ impl<T, const N: usize> Circular<T, N> {
}
}

impl<T, const N: usize> Collection for Circular<T, N> {
impl<T, const N: usize> Collection for Circular<N, T> {
/// Create a circular queue with the given capacity.
/// # Panic
/// Panic if `cap` is zero.
fn with_capacity(cap: usize) -> Self {
assert!(cap != 0, "capacity cannot be zero");
Self {
Expand All @@ -76,7 +79,7 @@ impl<T, const N: usize> Collection for Circular<T, N> {
}
}

impl<T, const N: usize> Queue for Circular<T, N> {
impl<T, const N: usize> Queue for Circular<N, T> {
type Item = T;

fn enque(&mut self, item: Self::Item) {
Expand Down Expand Up @@ -131,7 +134,7 @@ mod tests {

#[test]
fn basic() {
let mut queue = Circular::<_, 3>::with_capacity(3);
let mut queue = Circular::<3, _>::with_capacity(3);
assert!(queue.inner.is_inline());
assert!(queue.is_empty());
queue.enque(1);
Expand All @@ -145,7 +148,7 @@ mod tests {

#[test]
fn full() {
let mut queue = Circular::<_, 1>::with_capacity(3);
let mut queue = Circular::<1, _>::with_capacity(3);
assert!(queue.inner.is_heap());
queue.enque(1);
queue.enque(2);
Expand All @@ -160,7 +163,7 @@ mod tests {

#[test]
fn circular_1() {
let mut queue = Circular::<_, 1>::with_capacity(3);
let mut queue = Circular::<1, _>::with_capacity(3);
queue.enque(1);
assert_eq!(queue.deque(), Some(1));
queue.enque(2);
Expand All @@ -179,7 +182,7 @@ mod tests {

#[test]
fn circular_2() {
let mut queue = Circular::<_, 1>::with_capacity(3);
let mut queue = Circular::<1, _>::with_capacity(3);
queue.enque(1);
assert_eq!(queue.deque(), Some(1));
queue.enque(2);
Expand Down
18 changes: 13 additions & 5 deletions src/gat/tumbling_operator/queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ where
pub fn as_view<'a>(&'a self) -> View<'a, dyn Queue<Item = Q::Item> + 'a> {
View {
queue: &self.0,
change: self.1.as_ref(),
change: &self.1,
}
}

Expand Down Expand Up @@ -162,7 +162,7 @@ where
/// A view of the tumbling queue.
pub struct View<'a, Q: Queue + ?Sized> {
queue: &'a Q,
change: Change<&'a Q::Item>,
change: &'a Change<Q::Item>,
}

impl<'a, Q: Queue + ?Sized> Clone for View<'a, Q> {
Expand All @@ -180,7 +180,7 @@ impl<'a, Q: Queue + ?Sized> View<'a, Q> {
/// Change.
#[inline]
pub fn change(&self) -> Change<&Q::Item> {
self.change
self.change.as_ref()
}
}

Expand Down Expand Up @@ -303,11 +303,19 @@ where
pub struct QueueMut<'a, T>(ViewMut<'a, dyn Queue<Item = T> + 'a>);

impl<'a, T> QueueMut<'a, T> {
/// As [`QueueRef`]
/// As a [`QueueRef`]
pub fn as_queue_ref(&self) -> QueueRef<T> {
QueueRef(View {
queue: self.queue,
change: self.change.as_ref(),
change: self.change,
})
}

/// Convert into a [`QueueRef`]
pub fn into_queue_ref(self) -> QueueRef<'a, T> {
QueueRef(View {
queue: self.0.queue,
change: self.0.change,
})
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/gat/utils.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use super::QueueRef;
use super::TickQueueRef;

/// Help infer the right trait bound for closure.
pub fn queue_ref<I, O, F>(f: F) -> F
where
F: for<'a> FnMut(QueueRef<'a, I>) -> O,
F: for<'a> FnMut(TickQueueRef<'a, I>) -> O,
{
f
}
Loading