diff --git a/Cargo.toml b/Cargo.toml index 5a1ef6f..4f2130f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "quantiles" -version = "0.6.3" +version = "0.7.0-pre" authors = ["Brian L. Troutwine "] description = "a collection of approximate quantile algorithms" @@ -15,7 +15,7 @@ keywords = ["statistics", "histogram", "quantiles", "percentiles", "approximatio lto = true [dev-dependencies] -quickcheck = "0.4" +quickcheck = "0.5" [dependencies] serde = { version = "1.0", optional = true } diff --git a/benches/ckms.rs b/benches/ckms.rs index d82d2b9..5899077 100644 --- a/benches/ckms.rs +++ b/benches/ckms.rs @@ -1,24 +1,24 @@ #![feature(test)] -extern crate quantiles; extern crate test; +extern crate quantiles; mod ckms { #[derive(Debug, Clone, Copy)] pub struct Xorshift { seed: u64, } - + impl Xorshift { pub fn new(seed: u64) -> Xorshift { Xorshift { seed: seed } } - + pub fn next_val(&mut self) -> u32 { // implementation inspired by // https://github.com/astocko/xorshift/blob/master/src/splitmix64.rs use std::num::Wrapping as w; - + let mut z = w(self.seed) + w(0x9E37_79B9_7F4A_7C15_u64); let nxt_seed = z.0; z = (z ^ (z >> 30)) * w(0xBF58_476D_1CE4_E5B9_u64); @@ -94,4 +94,17 @@ mod ckms { generate_primed_tests!(u32, bench_primed_65535, 65_535); } + mod f32 { + use super::*; + + generate_tests!(f32, bench_insert_100, 100); + generate_tests!(f32, bench_insert_1000, 1000); + generate_tests!(f32, bench_insert_10000, 10_000); + generate_tests!(f32, bench_insert_100000, 100_000); + + generate_primed_tests!(f32, bench_primed_100, 100); + generate_primed_tests!(f32, bench_primed_1000, 1000); + generate_primed_tests!(f32, bench_primed_10000, 10_000); + generate_primed_tests!(f32, bench_primed_65535, 65_535); + } } diff --git a/fuzz/.gitignore b/fuzz/.gitignore new file mode 100644 index 0000000..572e03b --- /dev/null +++ b/fuzz/.gitignore @@ -0,0 +1,4 @@ + +target +corpus +artifacts diff --git a/fuzz/Cargo.toml b/fuzz/Cargo.toml new file mode 100644 index 0000000..116935b --- /dev/null +++ b/fuzz/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "quantiles-fuzz" +version = "0.0.1" +authors = ["Automatically generated"] +publish = false + +[package.metadata] +cargo-fuzz = true + +[dependencies] +byteorder = "1.2" + +[dependencies.quantiles] +path = ".." 
+[dependencies.libfuzzer-sys] +git = "https://github.com/rust-fuzz/libfuzzer-sys.git" + +# Prevent this from interfering with workspaces +[workspace] +members = ["."] + +[[bin]] +name = "ckms_u32" +path = "fuzz_targets/ckms_u32.rs" diff --git a/fuzz/fuzz_targets/ckms_u32.rs b/fuzz/fuzz_targets/ckms_u32.rs new file mode 100644 index 0000000..ee3bc5f --- /dev/null +++ b/fuzz/fuzz_targets/ckms_u32.rs @@ -0,0 +1,61 @@ +#![no_main] +#[macro_use] extern crate libfuzzer_sys; +extern crate quantiles; +extern crate byteorder; + +use std::io::Cursor; +use byteorder::{BigEndian, ReadBytesExt}; + +#[derive(Debug, Clone, Copy)] +pub struct Xorshift { + seed: u64, +} + +impl Xorshift { + pub fn new(seed: u64) -> Xorshift { + Xorshift { seed: seed } + } + + pub fn next_val(&mut self) -> u32 { + // implementation inspired by + // https://github.com/astocko/xorshift/blob/master/src/splitmix64.rs + use std::num::Wrapping as w; + + let mut z = w(self.seed) + w(0x9E37_79B9_7F4A_7C15_u64); + let nxt_seed = z.0; + z = (z ^ (z >> 30)) * w(0xBF58_476D_1CE4_E5B9_u64); + z = (z ^ (z >> 27)) * w(0x94D0_49BB_1331_11EB_u64); + self.seed = nxt_seed; + u32::from((z ^ (z >> 31)).0 as u16) + } +} + +fuzz_target!(|data: &[u8]| { + let mut cursor = Cursor::new(data); + + // unbounded, CKMS will adjust to within bounds + let error: f64 = if let Ok(res) = cursor.read_f64::<BigEndian>() { + res + } else { + return; + }; + // bounded 2**24 + let upper_bound: u32 = if let Ok(res) = cursor.read_u32::<BigEndian>() { + res % 16_777_216 + } else { + return; + }; + // unbounded + let seed: u64 = if let Ok(res) = cursor.read_u64::<BigEndian>() { + res + } else { + return; + }; + + let mut xshft = Xorshift::new(seed); + let mut ckms = quantiles::ckms::CKMS::<u32>::new(error); + for _ in 0..(upper_bound as usize) { + let val = xshft.next_val(); + ckms.insert(val); + } +}); diff --git a/src/ckms/entry.rs b/src/ckms/entry.rs new file mode 100644 index 0000000..94cea00 --- /dev/null +++ b/src/ckms/entry.rs @@ -0,0 +1,32 @@ +use std::cmp; + +#[derive(Debug, Clone)] +#[cfg_attr(feature = "serde_support", derive(Serialize, Deserialize))] +pub struct Entry<T> +where + T: PartialEq, +{ + pub g: u32, + pub delta: u32, + pub v: T, +} + +// The derivation of PartialEq for Entry is not appropriate. The sole ordering +// value in an Entry is the value 'v'. +impl<T> PartialEq for Entry<T> +where + T: PartialEq, +{ + fn eq(&self, other: &Entry<T>) -> bool { + self.v == other.v + } +} + +impl<T> PartialOrd for Entry<T> +where + T: PartialOrd, +{ + fn partial_cmp(&self, other: &Entry<T>) -> Option<cmp::Ordering> { + self.v.partial_cmp(&other.v) + } +} diff --git a/src/ckms.rs b/src/ckms/mod.rs similarity index 78% rename from src/ckms.rs rename to src/ckms/mod.rs index 32b32d2..6a5c915 100644 --- a/src/ckms.rs +++ b/src/ckms/mod.rs @@ -11,40 +11,13 @@ //! authors correct in their "Space- and Time-Efficient Deterministic Algorithms //! for Biased Quantiles over Data Streams" use std; -use std::cmp; use std::fmt::Debug; use std::ops::{Add, AddAssign, Div, Sub}; -#[derive(Debug, Clone)] -#[cfg_attr(feature = "serde_support", derive(Serialize, Deserialize))] -struct Entry<T> -where - T: Copy + PartialEq, -{ - v: T, - g: usize, - delta: usize, -} - -// The derivation of PartialEq for Entry is not appropriate. The sole ordering -// value in an Entry is the value 'v'. 
-impl<T> PartialEq for Entry<T> -where - T: Copy + PartialEq, -{ - fn eq(&self, other: &Entry<T>) -> bool { - self.v == other.v - } -} +mod entry; +mod store; -impl<T> PartialOrd for Entry<T> -where - T: Copy + PartialOrd, -{ - fn partial_cmp(&self, other: &Entry<T>) -> Option<cmp::Ordering> { - self.v.partial_cmp(&other.v) - } -} +use self::store::Store; /// A structure to provide approximate quantiles queries in bounded memory and /// with bounded error. @@ -65,19 +38,12 @@ where insert_threshold: usize, inserts: usize, - // We aim for the full biased quantiles method. The paper this - // implementation is based on includes a 'targeted' method but the authors - // have granted that it is flawed in private communication. As such, all - // queries for all quantiles will have the same error factor. - error: f64, - // This is the S(n) of the above paper. Entries are stored here and // occasionally merged. The outlined implementation uses a linked list but // we prefer a Vec for reasons of cache locality at the cost of worse // computational complexity. - samples: Vec<Entry<T>>, + samples: Store<T>, - sum: Option<T>, cma: Option<f64>, last_in: Option<T>, } @@ -94,12 +60,6 @@ where { fn add_assign(&mut self, rhs: CKMS<T>) { self.last_in = rhs.last_in; - self.sum = match (self.sum, rhs.sum) { - (None, None) => None, - (None, Some(y)) => Some(y), - (Some(x), None) => Some(x), - (Some(x), Some(y)) => Some(x.add(y)), - }; self.cma = match (self.cma, rhs.cma) { (None, None) => None, (None, Some(y)) => Some(y), @@ -111,22 +71,15 @@ where } }; self.n += rhs.n; - for v in rhs.samples.iter().map(|x| x.v) { - self.merge(v); + for inner in rhs.samples.data { + for v in inner.data.iter().map(|x| x.v) { + self.samples.insert(v); + } } self.compress(); } } -fn invariant(r: f64, error: f64) -> usize { - let i = (2.0 * error * r).floor() as usize; - if i == 0 { - 1 - } else { - i - } -} - impl< T: Copy + PartialOrd @@ -181,15 +134,12 @@ impl< CKMS { n: 0, - error: error, - insert_threshold: insert_threshold, inserts: 0, - samples: Vec::new(), + samples: Store::new(2048, error), last_in: None, - sum: None, cma: None, } } @@ -210,22 +160,6 @@ impl< self.last_in } - /// Return the sum of the elements added to the CKMS - /// - /// # Example - /// ``` - /// use quantiles::ckms::CKMS; - /// - /// let mut ckms = CKMS::new(0.1); - /// ckms.insert(1.0); - /// ckms.insert(2.0); - /// ckms.insert(3.0); - /// assert_eq!(Some(6.0), ckms.sum()); - /// ``` - pub fn sum(&self) -> Option<T> { - self.sum - } - /// Return the cumulative moving average of the elements added to the CKMS /// /// # Example /// ``` @@ -252,7 +186,7 @@ impl< /// assert_eq!(0.1, ckms.error_bound()); /// ``` pub fn error_bound(&self) -> f64 { - self.error + self.samples.error } /// Insert a T into the CKMS /// /// Insertion will gradually shift the approximate quantiles. This /// may grow gradually, as defined in the module-level documentation, but /// will remain bounded. 
pub fn insert(&mut self, v: T) { - self.sum = self.sum.map_or(Some(v), |s| Some(s.add(v))); self.last_in = Some(v); self.n += 1; let v_f64: f64 = v.into(); self.cma = self.cma .map_or(Some(v_f64), |s| Some(s + ((v_f64 - s) / (self.n as f64)))); - self.merge(v); + self.samples.insert(v); self.inserts = (self.inserts + 1) % self.insert_threshold; if self.inserts == 0 { self.compress() } } - fn merge(&mut self, v: T) { - let s = self.samples.len(); - if s == 0 { - self.samples.insert( - 0, - Entry { - v: v, - g: 1, - delta: 0, - }, - ); - return; - } - - let mut idx = 0; - let mut r = 0; - for i in 0..s { - let smpl = &self.samples[i]; - match smpl.v.partial_cmp(&v).unwrap() { - cmp::Ordering::Less => { - idx += 1; - r += smpl.g - } - _ => break, - } - } - let delta = if idx == 0 || idx == s { - 0 - } else { - invariant(r as f64, self.error) - 1 - }; - self.samples.insert( - idx, - Entry { - v: v, - g: 1, - delta: delta, - }, - ); - } - /// Query CKMS for a ε-approximate quantile /// /// This function returns an approximation to the true quantile-- +/- εΦn @@ -339,30 +231,7 @@ impl< /// assert_eq!(ckms.query(1.0), Some((1000, 999))); /// ``` pub fn query(&self, q: f64) -> Option<(usize, T)> { - let s = self.samples.len(); - - if s == 0 { - return None; - } - - let mut r = 0; - let nphi = q * (self.n as f64); - for i in 1..s { - let prev = &self.samples[i - 1]; - let cur = &self.samples[i]; - - r += prev.g; - - let lhs = (r + cur.g + cur.delta) as f64; - let rhs = nphi + ((invariant(nphi, self.error) as f64) / 2.0); - - if lhs > rhs { - return Some((r, prev.v)); - } - } - - let v = self.samples[s - 1].v; - Some((s, v)) + self.samples.query(q) } /// Query CKMS for the count of its points /// @@ -403,38 +272,17 @@ impl< /// assert_eq!(ckms.into_vec(), vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); /// ``` pub fn into_vec(self) -> Vec<T> { - self.samples.iter().map(|ent| ent.v).collect() + let mut res = vec![]; + for inner in self.samples.data { + for v in inner.data.iter().map(|x| x.v) { + res.push(v); + } + } + res } fn compress(&mut self) { - if self.samples.len() < 3 { - return; - } - - let mut s_mx = self.samples.len() - 1; - let mut idx = 0; - let mut r: f64 = 1.0; - - while idx < s_mx { - let cur_g = self.samples[idx].g; - let nxt_v = self.samples[idx + 1].v; - let nxt_g = self.samples[idx + 1].g; - let nxt_delta = self.samples[idx + 1].delta; - - if cur_g + nxt_g + nxt_delta <= invariant(r, self.error) { - let ent = Entry { - v: nxt_v, - g: nxt_g + cur_g, - delta: nxt_delta, - }; - self.samples[idx] = ent; - self.samples.remove(idx + 1); - s_mx -= 1; - } else { - idx += 1; - } - r += 1.0; - } + self.samples.compress(); } } @@ -443,6 +291,7 @@ mod test { use super::*; use quickcheck::{QuickCheck, TestResult}; use std::f64::consts::E; + use ckms::store::invariant; fn percentile(data: &Vec<f64>, prcnt: f64) -> f64 { let idx = (prcnt * (data.len() as f64)) as usize; @@ -618,34 +467,6 @@ mod test { QuickCheck::new().quickcheck(inner as fn(Vec, Vec) -> TestResult); } - #[test] - fn add_assign_test() { - fn inner(pair: (i32, i32)) -> bool { - let mut lhs = CKMS::<i32>::new(0.001); - lhs.insert(pair.0); - let mut rhs = CKMS::<i32>::new(0.001); - rhs.insert(pair.1); - - let expected: i32 = pair.0 + pair.1; - lhs += rhs; - - if let Some(x) = lhs.sum() { - if x == expected { - if let Some(y) = lhs.last() { - y == pair.1 - } else { - false - } - } else { - false - } - } else { - false - } - } - QuickCheck::new().quickcheck(inner as fn((i32, i32)) -> bool); - } - // prop: for any phi. 
(phi*n - f(phi*n, n)/2) =< r_i =< (phi*n + f(phi*n, n)/2) #[test] fn query_invariant_test() { @@ -775,15 +596,20 @@ mod test { } ckms.compress(); - let s = ckms.samples.len() as f64; - let bound = (1.0 / ckms.error) * (ckms.error * (ckms.count() as f64)).log10().powi(2); + let s = ckms.samples.len() as i64; + let bound = ((1.0 / ckms.error_bound()) + * (ckms.error_bound() * (ckms.count() as f64)).log10().powi(2)) + .ceil() as i64; - if !(s <= bound) { + // We have to choose an arbitrary, lowish constant for bound + // invalidation buffer. This is because I don't have a precise + // boundary. 1024 samples worth of slop isn't bad, I guess. + if !(s <= bound) && !((s - bound).abs() < 1_024) { println!( "error: {:?} n: {:?} log10: {:?}", - ckms.error, + ckms.error_bound(), ckms.count() as f64, - (ckms.error * (ckms.count() as f64)).log10().powi(2) + (ckms.error_bound() * (ckms.count() as f64)).log10().powi(2) ); println!("{:?} <= {:?}", s, bound); return TestResult::failed(); diff --git a/src/ckms/store.rs b/src/ckms/store.rs new file mode 100644 index 0000000..16d5fce --- /dev/null +++ b/src/ckms/store.rs @@ -0,0 +1,510 @@ +//! store - a 'poor man's skiplist' for CKMS +//! +//! The CKMS requires a storage data structure that has cheap inserts and +//! constant-ish lookups. That's because: +//! +//! * insertion implies search +//! * compression implies search, shifting of data +//! * query is search +//! +//! Prior to 0.7 CKMS used a Vec for storing its samples. This worked well +//! enough for small collections of samples, but fell over once you got past +//! around 50k on account of the expense of shifting data around, performing a +//! search for every insert. +//! +//! What we've done in this module is build a "poor man's" skiplist -- +//! constructed of nested Vecs of bounded size -- which contains at each 'node' +//! all the information we need to perform an insert, as if we had examined +//! every sample in the block. Anyhow, we'll get into it below. +use std::fmt; +use std::ops::{Index, IndexMut}; + +use ckms::entry::Entry; + +/// The all-important CKMS invariant. +pub fn invariant(r: f64, error: f64) -> u32 { + let i = (2.0 * error * r).floor() as u32; + if i == 0 { + 1 + } else { + i + } +} + +/// Inner is the 'node' of our poor-man's skiplist. Each Inner stores a block of +/// samples of bounded size -- controlled by `inner_cap` set on `Store` -- and a +/// `g_sum`. The CKMS algorithm requires samples to be stored in sorted order +/// and the insertion procedure builds up a `g_sum`, a sum of the ranks of each +/// sample seen. To avoid re-computing this `g_sum` as we search for our +/// insertion spot we instead keep a `g_sum` on Inner, meaning if we determine +/// that the block will not be inserted into we can just pull `g_sum` and avoid +/// inspecting every sample in the block. This implies a touch more work on +/// every insertion but we come out well ahead not inspecting every sample, even +/// so. +#[derive(Clone, PartialEq, Debug)] +#[cfg_attr(feature = "serde_support", derive(Serialize, Deserialize))] +pub struct Inner<T> +where + T: PartialEq, +{ + pub data: Vec<Entry<T>>, + g_sum: u32, +} + +impl<T> Inner<T> +where + T: PartialEq + PartialOrd + Copy, +{ + pub fn len(&self) -> usize { + self.data.len() + } + + /// split_off is patterned on the operation of `Vec::split_off`. + /// + /// The notion here is when an Inner goes over `Store::inner_cap` we need to + /// split the samples that fall over `inner_cap` into a new Inner. 
This + /// keeps our `inner_cap` bound going and reduces the O(n) burden of + /// inserting into a Vec. + /// + /// The correct `g_sum` is maintained for both sides in the split. + pub fn split_off(&mut self, index: usize) -> Self { + assert!(index < self.data.len()); + let nxt = self.data.split_off(index); + let nxt_g_sum = nxt.iter().fold(0, |acc, x| acc + x.g); + self.g_sum -= nxt_g_sum; + Inner { + data: nxt, + g_sum: nxt_g_sum, + } + } +} + +#[derive(Clone, PartialEq, Debug)] +#[cfg_attr(feature = "serde_support", derive(Serialize, Deserialize))] +pub struct Store<T> +where + T: PartialEq, +{ + /// The way CKMS works, we are allowed to respond back to a user query + /// inaccurately. Just, with known inaccuracy. That's what this is, the + /// known inaccuracy. What's neat is we can perform compression on the + /// stored samples so long as we keep within this error bound. + pub error: f64, + + /// Our collection of samples. See documentation of Inner for more details. + pub data: Vec<Inner<T>>, + + inner_cap: usize, // maximum size of an Inner + len: usize, // samples currently stored + n: usize, // total samples ever stored +} + +impl<T> Store<T> +where + T: PartialEq + PartialOrd + Copy, +{ + pub fn new(inner_cap: usize, error: f64) -> Store<T> { + assert!(inner_cap != 0); + let data = Inner { + data: Vec::with_capacity(inner_cap), + g_sum: 0, + }; + Store { + error: error, + data: vec![data], + inner_cap: inner_cap, + len: 0, + n: 0, + } + } + + /// Insert a point into the Store + pub fn insert(&mut self, element: T) -> () + where + T: fmt::Debug, + { + // This function is a touch repetitive. There are three possible + // conditions when we insert a point: + // + // * point goes to the front + // * point goes to the back + // * point goes somewhere in the middle + // + // Insertion into the middle is the most expensive. A point inserted at + // the front or back has a 'delta' -- see the referenced paper for full + // details -- of 0. A point that goes into the middle has a delta + // derived from the invariant, the rank of the inserted sample. That + // implies a search. This store is able to skip a linear seek by + // examining the max element of Inner caches, their associated maximum + // rank, g_sum. + + // insert at the front + if self.data[0].data.is_empty() || (self.data[0].data[0].v >= element) { + self.data[0].data.insert( + 0, + Entry { + v: element, + g: 1, + delta: 0, + }, + ); + self.data[0].g_sum += 1; + self.n += 1; + self.len += 1; + + if self.data[0].len() > self.inner_cap { + let nxt = self.data[0].split_off(self.inner_cap); + if self.data.len() > 1 { + self.data.insert(1, nxt); + } else { + self.data.push(nxt); + } + } + return; + } + + let mut outer_idx = self.data.len() - 1; + let mut inner_idx = self.data[outer_idx].len() - 1; + + // insert at the back + if self.data[outer_idx].data[inner_idx].v < element { + self.data[outer_idx].data.push(Entry { + v: element, + g: 1, + delta: 0, + }); + self.data[outer_idx].g_sum += 1; + self.n += 1; + self.len += 1; + if self.data[outer_idx].len() > self.inner_cap { + let nxt = self.data[outer_idx].split_off(self.inner_cap); + self.data.push(nxt); + } + return; + } + + // insert in the middle + outer_idx = 0; + inner_idx = 0; + let mut r = 0; + + // Seek the outer_idx forward to the right cache line + while outer_idx < self.data.len() { + // The element for insertion is larger than the largest in the + // present inner cache. In that case, we kick the outer_idx up and + // capture the g_sum into our r. 
+ let mx = self.data[outer_idx].data.len(); + if element > self.data[outer_idx].data[mx - 1].v { + outer_idx += 1; + r += self.data[outer_idx].g_sum; + } else { + break; + } + } + + // Seek the inner_idx forward to the right location + while inner_idx < self.data[outer_idx].data.len() { + // The inner cache for insertion is here at outer_cache. We now seek + // inner_idx forward while the current inner_idx is < than the + // element for insertion. + if self.data[outer_idx].data[inner_idx].v < element { + inner_idx += 1; + r += 1; + } else { + break; + } + } + + self.data[outer_idx].data.insert( + inner_idx, + Entry { + v: element, + g: 1, + delta: invariant(f64::from(r), self.error) - 1, + }, + ); + self.data[outer_idx].g_sum += 1; + + if self.data[outer_idx].len() > self.inner_cap { + let nxt = self.data[outer_idx].split_off(self.inner_cap); + self.data.insert(outer_idx + 1, nxt); + } + + self.n += 1; + self.len += 1; + } + + pub fn is_empty(&self) -> bool { + self.len == 0 + } + + /// Total stored samples + /// + /// This value will fluctuate as compression happens. + pub fn len(&self) -> usize { + self.len + } + + #[cfg(test)] + /// Total samples, ever + /// + /// This value will never decrease and may or may not be equivalent to + /// `Self::len` + pub fn count(&self) -> usize { + self.n + } + + pub fn compress(&mut self) { + if self.len() < 3 { + return; + } + + let mut cur_outer_idx = 0; + let mut cur_inner_idx = 0; + let mut nxt_outer_idx = 0; + let mut nxt_inner_idx = 1; + + let mut r: u32 = 1; + + while cur_outer_idx < self.data.len() { + let cur_g = self.data[cur_outer_idx][cur_inner_idx].g; + + // If the nxt_inner_idx has gone off the rails then it's time for us + // to move up to the next inner cache for the next point. + if nxt_inner_idx >= self.data[nxt_outer_idx].len() { + nxt_inner_idx = 0; + nxt_outer_idx += 1; + // When nxt_outer_idx goes off the end we've run out of samples + // to compress. + if nxt_outer_idx >= self.data.len() { + break; + } + } + + let nxt_v = self.data[nxt_outer_idx][nxt_inner_idx].v; + let nxt_g = self.data[nxt_outer_idx][nxt_inner_idx].g; + let nxt_delta = self.data[nxt_outer_idx][nxt_inner_idx].delta; + + if cur_g + nxt_g + nxt_delta <= invariant(f64::from(r), self.error) { + self.data[cur_outer_idx][cur_inner_idx].v = nxt_v; + self.data[cur_outer_idx][cur_inner_idx].g += nxt_g; + self.data[cur_outer_idx][cur_inner_idx].delta = nxt_delta; + // If the two outer indexes don't match then we've 'moved' a g + // from one inner cache to another. So, we scoot them. + if cur_outer_idx != nxt_outer_idx { + self.data[nxt_outer_idx].g_sum -= nxt_g; + self.data[cur_outer_idx].g_sum += nxt_g; + } + self.data[nxt_outer_idx].data.remove(nxt_inner_idx); + // Now that we've collapsed a point it's possible that we can + // collapse the next next point into the current one as well. We + // leave the indexes well enough alone as we've just removed an + // item from the present inner cache. + self.len -= 1; + } else { + // If we haven't collapsed any points we move the current + // indexes to the next indexes. We also scoot up the next INNER + // index, taking care to not adjust the outer index. We avoid + // adjusting the outer index because it's possible we don't need + // to move to a new inner cache yet. + cur_outer_idx = nxt_outer_idx; + cur_inner_idx = nxt_inner_idx; + nxt_inner_idx += 1; + } + r += 1; + } + + // It's possible after several compression passes that we'll leave tiny + // inner caches in place. We don't want this. 
We'll move pairwise + // through the inner caches and combine those that are contiguous and + // fit within inner_cap. + cur_outer_idx = 0; + while (self.data.len() >= 1) && (cur_outer_idx < (self.data.len() - 1)) { + if self.data[cur_outer_idx].data.len() + self.data[cur_outer_idx + 1].data.len() + <= self.inner_cap + { + let mut nxt = self.data.remove(cur_outer_idx + 1); + let cur = &mut self.data[cur_outer_idx]; + cur.g_sum += nxt.g_sum; + cur.data.append(&mut nxt.data); + } else { + cur_outer_idx += 1; + } + } + } + + pub fn query(&self, q: f64) -> Option<(usize, T)> { + if self.is_empty() { + return None; + } + + let mut r: u32 = 0; + let s = self.len(); + let nphi = q * (self.n as f64); + for i in 1..s { + // TODO indexing is no longer constant, make sure we don't do two + // seeking indexes + let prev = &self[i - 1]; + let cur = &self[i]; + + r += prev.g; + + let lhs = f64::from(r + cur.g + cur.delta); + + let inv = invariant(nphi, self.error); + let rhs = nphi + (f64::from(inv) / 2.0); + + if lhs > rhs { + return Some((r as usize, prev.v)); + } + } + + let v = self[s - 1].v; + Some((s, v)) + } + + #[cfg(test)] + pub fn iter(&self) -> StoreIter<T> { + StoreIter { + store: &self.data, + outer_idx: 0, + inner_idx: 0, + } + } +} + +impl<T> IndexMut<usize> for Inner<T> +where + T: PartialEq, +{ + fn index_mut(&mut self, index: usize) -> &mut Entry<T> { + &mut self.data[index] + } +} + +impl<T> Index<usize> for Inner<T> +where + T: PartialEq, +{ + type Output = Entry<T>; + + fn index(&self, index: usize) -> &Self::Output { + &self.data[index] + } +} + +impl<T> IndexMut<usize> for Store<T> +where + T: PartialEq + PartialOrd + Copy, +{ + fn index_mut(&mut self, index: usize) -> &mut Entry<T> { + let mut outer_idx = 0; + let mut idx = index; + while idx >= self.data[outer_idx].len() { + idx -= self.data[outer_idx].len(); + outer_idx += 1; + } + &mut self.data[outer_idx][idx] + } +} + +impl<T> Index<usize> for Store<T> +where + T: PartialEq + PartialOrd + Copy, +{ + type Output = Entry<T>; + + fn index(&self, index: usize) -> &Self::Output { + let mut outer_idx = 0; + let mut idx = index; + while idx >= self.data[outer_idx].len() { + idx -= self.data[outer_idx].len(); + outer_idx += 1; + } + &self.data[outer_idx][idx] + } +} + +#[cfg(test)] +pub struct StoreIter<'a, T> +where + T: 'a + PartialEq, +{ + store: &'a Vec<Inner<T>>, + outer_idx: usize, + inner_idx: usize, +} + +#[cfg(test)] +impl<'a, T> Iterator for StoreIter<'a, T> +where + T: PartialEq + Copy + PartialOrd + fmt::Debug, +{ + type Item = &'a Entry<T>; + + fn next(&mut self) -> Option<Self::Item> { + while self.outer_idx < self.store.len() { + if self.inner_idx < self.store[self.outer_idx].len() { + let ret = &self.store[self.outer_idx][self.inner_idx]; + self.inner_idx += 1; + return Some(ret); + } + self.inner_idx = 0; + self.outer_idx += 1; + } + None + } +} + +#[cfg(test)] +mod test { + use super::*; + use quickcheck::{QuickCheck, TestResult}; + + #[test] + fn inner_caches_test() { + let mut store = Store::<u32>::new(10, 0.99); + for i in 0..100 { + store.insert(i); + } + + assert_eq!(10, store.data.len()); + } + + #[test] + fn compression_test() { + let mut store = Store::<u32>::new(100, 0.1); + for i in 0..10_000 { + store.insert(i); + } + store.compress(); + + assert_eq!(10_000, store.count()); + assert_eq!(42, store.len()); + } + + #[test] + fn obey_inner_cap() { + fn inner(data: Vec<u32>, inner_cap: usize, err: f64) -> TestResult { + if data.is_empty() { + return TestResult::discard(); + } else if inner_cap == 0 { + return TestResult::discard(); + } else if !(err >= 0.0) || !(err <= 1.0) { + return TestResult::discard(); + } + 
+ let mut store = Store::<u32>::new(inner_cap, err); + for d in &data { + store.insert(*d); + } + + for inner in store.data { + assert!(inner.len() <= store.inner_cap); + } + + return TestResult::passed(); + } + QuickCheck::new().quickcheck(inner as fn(Vec<u32>, usize, f64) -> TestResult); + } +}
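Note (not part of the patch): a minimal usage sketch of the API surface this change keeps intact -- insert, query, count, and error_bound -- against the new Store-backed CKMS. The 0.001 error bound and u32 element type are assumptions borrowed from the benches and fuzz target above, not values mandated by the patch.

extern crate quantiles;

use quantiles::ckms::CKMS;

fn main() {
    // CKMS::new takes the allowed error; samples now land in the Store-backed
    // "poor man's skiplist" rather than a flat Vec, but the public API is unchanged.
    let mut ckms = CKMS::<u32>::new(0.001);
    for i in 0..1_000u32 {
        ckms.insert(i);
    }
    assert_eq!(ckms.count(), 1_000);
    assert_eq!(ckms.error_bound(), 0.001);
    // query returns (rank, value); answers are approximate within the
    // configured error bound, per the module documentation.
    println!("p50  -> {:?}", ckms.query(0.5));
    println!("p100 -> {:?}", ckms.query(1.0));
}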