Skip to content

Commit

Permalink
Rework streaming execution
Browse files Browse the repository at this point in the history
  • Loading branch information
cswinter committed Mar 18, 2024
1 parent 8df626b commit 7f72802
Show file tree
Hide file tree
Showing 81 changed files with 1,114 additions and 326 deletions.
4 changes: 4 additions & 0 deletions locustdb-derive/src/reify_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ fn types(t: &Ident) -> Option<Vec<Type>> {
"NullableFloat" => Some(vec![Type::NullableF64]),
"Primitive" => Some(vec![Type::U8, Type::U16, Type::U32, Type::U64, Type::I64, Type::F64, Type::Str, Type::OptStr, Type::OptF64]),
"PrimitiveOrVal" => Some(vec![Type::U8, Type::U16, Type::U32, Type::U64, Type::I64, Type::F64, Type::Str, Type::OptStr, Type::OptF64, Type::Val]),
"VecData" => Some(vec![Type::U8, Type::U16, Type::U32, Type::U64, Type::I64, Type::F64, Type::USize, Type::Str, Type::OptStr, Type::OptF64, Type::Val, Type::Bitvec]),
"NullablePrimitive" => Some(vec![Type::NullableU8, Type::NullableU16, Type::NullableU32, Type::NullableI64, Type::NullableF64, Type::NullableStr]),
"PrimitiveUSize" => Some(vec![Type::U8, Type::U16, Type::U32, Type::U64, Type::I64, Type::F64, Type::Str, Type::USize]),
"PrimitiveNoU64" => Some(vec![Type::U8, Type::U16, Type::U32, Type::I64, Type::F64, Type::Str]),
Expand All @@ -230,6 +231,7 @@ enum Type {
F64,
Str,
Val,
Bitvec,

OptStr, // Option<&str>, used when sorting instead of representation of raw valls + present bit vec
OptF64, // Option<OrderedFloat<f64>>, used when sorting
Expand Down Expand Up @@ -265,6 +267,7 @@ impl Type {
Type::F64 => parse_quote!(EncodingType::F64),
Type::Str => parse_quote!(EncodingType::Str),
Type::Val => parse_quote!(EncodingType::Val),
Type::Bitvec => parse_quote!(EncodingType::Bitvec),
Type::OptStr => parse_quote!(EncodingType::OptStr),
Type::OptF64 => parse_quote!(EncodingType::OptF64),
Type::NullableU8 => parse_quote!(EncodingType::NullableU8),
Expand Down Expand Up @@ -296,6 +299,7 @@ impl Type {
Type::F64 => parse_quote!( let #variable = #variable.buffer.f64(); ),
Type::Str => parse_quote!( let #variable = #variable.buffer.str(); ),
Type::Val => parse_quote!( let #variable = #variable.buffer.val(); ),
Type::Bitvec => parse_quote!( let #variable = #variable.buffer.u8(); ),
Type::OptStr => parse_quote!( let #variable = #variable.buffer.opt_str(); ),
Type::OptF64 => parse_quote!( let #variable = #variable.buffer.opt_f64(); ),
Type::NullableU8 => parse_quote!( let #variable = #variable.buffer.nullable_u8(); ),
Expand Down
18 changes: 18 additions & 0 deletions src/engine/data_types/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,12 @@ impl<'a, T: VecData<T> + 'a> Data<'a> for &'a [T] {
default fn cast_ref_mixed(&self) -> &[Val<'a>] {
panic!("{}", self.type_error("cast_ref_mixed"))
}
default fn cast_ref_opt_str<'b>(&'b self) -> &'b [Option<&'a str>] {
panic!("{}", self.type_error("cast_ref_opt_str"))
}
default fn cast_ref_opt_f64(&self) -> &[Option<OrderedFloat<f64>>] {
panic!("{}", self.type_error("cast_ref_opt_f64"))
}
}

impl<'a> Data<'a> for &'a [&'a str] {
Expand All @@ -558,6 +564,12 @@ impl<'a> Data<'a> for &'a [&'a str] {
}
}

impl<'a> Data<'a> for &'a [Option<&'a str>] {
fn cast_ref_opt_str(&self) -> &[Option<&'a str>] {
self
}
}

impl<'a> Data<'a> for &'a [Val<'a>] {
fn cast_ref_mixed(&self) -> &[Val<'a>] {
self
Expand Down Expand Up @@ -606,6 +618,12 @@ impl<'a> Data<'a> for &'a [OrderedFloat<f64>] {
}
}

impl<'a> Data<'a> for &'a [Option<OrderedFloat<f64>>] {
fn cast_ref_opt_f64(&self) -> &[Option<OrderedFloat<f64>>] {
self
}
}

impl<'a> Data<'a> for &'a [MergeOp] {
fn cast_ref_merge_op(&self) -> &[MergeOp] {
self
Expand Down
55 changes: 50 additions & 5 deletions src/engine/data_types/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,19 @@ use crate::mem_store::*;
// WARNING: Changing this enum will break backwards compatibility with existing data
#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug, Serialize, Deserialize)]
pub enum EncodingType {
// Straightforward vector or slice of basic types
Str,
I64,
U8,
U16,
U32,
U64,
F64,
Val,
USize,
Bitvec, // this has the same representation as U8, but will have 1/8th the length

OptStr,
OptF64,

// Nullable versions of basic types which include both a vector/slize and a Vec<u8>/&[u8] bitvec mask
NullableStr,
NullableI64,
NullableU8,
Expand All @@ -24,17 +26,24 @@ pub enum EncodingType {
NullableU64,
NullableF64,

USize,
Val,
// Vector of optional basic types. Used for grouping or sorting
OptStr,
OptF64,

// Represents null column as single `usize` value that is the length of the column
Null,

// Single scalar value
ScalarI64,
ScalarStr,
ScalarString,
ConstVal,

ByteSlices(usize),

// Used as grouping key during aggregation/sorting operation when we cannot bit or byte pack the columns that make up the grouping key
ValRows,

Premerge,
MergeOp,
}
Expand Down Expand Up @@ -103,6 +112,7 @@ impl EncodingType {
| EncodingType::U64
| EncodingType::F64
| EncodingType::USize
| EncodingType::Bitvec
| EncodingType::Val
| EncodingType::Null
| EncodingType::ScalarI64
Expand Down Expand Up @@ -133,6 +143,7 @@ impl EncodingType {
| EncodingType::U64
| EncodingType::F64
| EncodingType::USize
| EncodingType::Bitvec
| EncodingType::Val
| EncodingType::Null
| EncodingType::ScalarI64
Expand Down Expand Up @@ -166,6 +177,7 @@ impl EncodingType {
| EncodingType::F64
| EncodingType::USize
| EncodingType::Val
| EncodingType::Bitvec
| EncodingType::ByteSlices(_)
| EncodingType::ValRows
| EncodingType::Premerge
Expand All @@ -178,6 +190,39 @@ impl EncodingType {
}
}

pub fn is_scalar(&self) -> bool {
match *self {
EncodingType::NullableStr
| EncodingType::NullableI64
| EncodingType::NullableU8
| EncodingType::NullableU16
| EncodingType::NullableU32
| EncodingType::NullableU64
| EncodingType::NullableF64
| EncodingType::OptStr
| EncodingType::OptF64
| EncodingType::Str
| EncodingType::I64
| EncodingType::U8
| EncodingType::U16
| EncodingType::U32
| EncodingType::U64
| EncodingType::F64
| EncodingType::USize
| EncodingType::Val
| EncodingType::Bitvec
| EncodingType::ByteSlices(_)
| EncodingType::ValRows
| EncodingType::Premerge
| EncodingType::Null
| EncodingType::MergeOp => false,
| EncodingType::ScalarI64
| EncodingType::ScalarStr
| EncodingType::ScalarString
| EncodingType::ConstVal => true,
}
}

pub fn least_upper_bound(&self, other: EncodingType) -> EncodingType {
if *self == other {
*self
Expand Down
18 changes: 9 additions & 9 deletions src/engine/execution/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,6 @@ pub struct Scalar<T> { t: PhantomData<T> }
#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
pub struct Nullable<T> { t: PhantomData<T> }

pub fn error_buffer_ref(name: &'static str) -> BufferRef<Any> {
BufferRef {
i: 0xdead_beef,
name,
t: PhantomData,
}
}

impl BufferRef<Any> {
pub fn merge_op(self) -> BufferRef<MergeOp> { self.transmute() }
pub fn premerge(self) -> BufferRef<Premerge> { self.transmute() }
Expand Down Expand Up @@ -267,7 +259,7 @@ impl TypedBufferRef {
}

pub fn u8(&self) -> Result<BufferRef<u8>, QueryError> {
ensure!(self.tag == EncodingType::U8, "{:?} != U8", self.tag);
ensure!(self.tag == EncodingType::U8 || self.tag == EncodingType::Bitvec, "{:?} != U8", self.tag);
Ok(self.buffer.u8())
}

Expand Down Expand Up @@ -356,3 +348,11 @@ impl TypedBufferRef {
Ok(self.buffer.scalar_string())
}
}

pub fn error_buffer_ref(name: &'static str) -> BufferRef<Any> {
BufferRef {
i: 0xdead_beef,
name,
t: PhantomData,
}
}
Loading

0 comments on commit 7f72802

Please sign in to comment.