Skip to content

Commit

Permalink
Fix query engine bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
cswinter committed Apr 13, 2024
1 parent b8e5e45 commit 195600f
Show file tree
Hide file tree
Showing 15 changed files with 170 additions and 87 deletions.
5 changes: 4 additions & 1 deletion locustdb-derive/src/ast_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ pub fn ast_builder(input: TokenStream) -> TokenStream {
"Can't provide internal type ({}).",
field_ident
);
let hash_type = parse_quote!(hasher.update(&[#t.to_u8()]););
hashes.push(hash_type);
parse_quote!(let #field_ident = self.buffer_provider.named_buffer(#ident_str, #t);)
} else {
create_buffer(&field_ident, &field_type)
Expand All @@ -68,6 +70,7 @@ pub fn ast_builder(input: TokenStream) -> TokenStream {
if let Some(fn_input) = fn_input {
fn_inputs.push(fn_input);
}
hashes.push(parse_quote!(hasher.update(&[#t.to_u8()]);));
output = parse_quote!(Some(#field_ident.buffer.i));
parse_quote!(let #field_ident = self.buffer_provider.named_buffer(#ident_str, #t);)
} else {
Expand Down Expand Up @@ -304,7 +307,7 @@ fn parse_type(field_ident: &Ident, type_def: String) -> Option<(Expr, Option<FnA
.map(|ident| Ident::new(ident, Span::call_site()))
.collect::<Vec<_>>();
parse_quote! {
if #(#parents.is_nullable())||* { #base_type.nullable() } else { #base_type }
if #(#parents.is_nullable())||* && !#base_type.is_naturally_nullable() { #base_type.nullable() } else { #base_type }
}
}
}
Expand Down
1 change: 1 addition & 0 deletions locustdb-derive/src/reify_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ fn types(t: &Ident) -> Option<Vec<Type>> {
"Number" => Some(vec![Type::U8, Type::U16, Type::U32, Type::U64, Type::I64, Type::F64]),
"Float" => Some(vec![Type::F64]),
"NullableInteger" => Some(vec![Type::NullableU8, Type::NullableU16, Type::NullableU32, Type::NullableI64]),
"NullableNumber" => Some(vec![Type::NullableU8, Type::NullableU16, Type::NullableU32, Type::NullableI64, Type::NullableF64]),
"NullableFloat" => Some(vec![Type::NullableF64]),
"Primitive" => Some(vec![Type::U8, Type::U16, Type::U32, Type::U64, Type::I64, Type::F64, Type::Str, Type::OptStr, Type::OptF64]),
"PrimitiveOrVal" => Some(vec![Type::U8, Type::U16, Type::U32, Type::U64, Type::I64, Type::F64, Type::Str, Type::OptStr, Type::OptF64, Type::Val]),
Expand Down
3 changes: 3 additions & 0 deletions src/engine/data_types/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,9 @@ impl<'a> Data<'a> for Vec<Val<'a>> {
}
None
}
/// Returns an owned copy of this vector of values.
fn to_mixed(&self) -> Vec<Val<'a>> {
    Vec::clone(self)
}
}

impl<'a> Data<'a> for Vec<usize> {
Expand Down
78 changes: 76 additions & 2 deletions src/engine/data_types/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,20 +63,25 @@ impl EncodingType {
}
}

/// Returns the nullable version of the encoding type.
/// This is used when propagating nullability of type casts.
/// Types that already represent null values will return themselves.
pub fn nullable(&self) -> EncodingType {
match self {
EncodingType::Str | EncodingType::NullableStr | EncodingType::OptStr => {
EncodingType::Str | EncodingType::NullableStr => {
EncodingType::NullableStr
}
EncodingType::I64 | EncodingType::NullableI64 => EncodingType::NullableI64,
EncodingType::U8 | EncodingType::NullableU8 => EncodingType::NullableU8,
EncodingType::U16 | EncodingType::NullableU16 => EncodingType::NullableU16,
EncodingType::U32 | EncodingType::NullableU32 => EncodingType::NullableU32,
EncodingType::U64 | EncodingType::NullableU64 => EncodingType::NullableU64,
EncodingType::F64 | EncodingType::NullableF64 | EncodingType::OptF64 => {
EncodingType::F64 | EncodingType::NullableF64=> {
EncodingType::NullableF64
}
EncodingType::Val => EncodingType::Val,
EncodingType::OptStr => EncodingType::OptStr,
EncodingType::OptF64 => EncodingType::OptF64,
_ => panic!("{:?} does not have a corresponding nullable type", &self),
}
}
Expand All @@ -93,6 +98,8 @@ impl EncodingType {
}
}

/// Returns whether the encoding type is nullable, i.e., is a basic type with an associated null map.
/// This does not apply to types like OptStr which can naturally represent null values.
pub fn is_nullable(&self) -> bool {
match self {
EncodingType::NullableStr
Expand Down Expand Up @@ -126,6 +133,40 @@ impl EncodingType {
}
}

/// Returns whether the encoding type can represent null values without an associated null map.
pub fn is_naturally_nullable(&self) -> bool {
match self {
EncodingType::Val
| EncodingType::OptStr
| EncodingType::OptF64 => true,
EncodingType::NullableStr
| EncodingType::NullableI64
| EncodingType::NullableU8
| EncodingType::NullableU16
| EncodingType::NullableU32
| EncodingType::NullableU64
| EncodingType::NullableF64
| EncodingType::Str
| EncodingType::I64
| EncodingType::U8
| EncodingType::U16
| EncodingType::U32
| EncodingType::U64
| EncodingType::F64
| EncodingType::USize
| EncodingType::Bitvec
| EncodingType::Null
| EncodingType::ScalarI64
| EncodingType::ScalarStr
| EncodingType::ScalarString
| EncodingType::ConstVal
| EncodingType::ByteSlices(_)
| EncodingType::ValRows
| EncodingType::Premerge
| EncodingType::MergeOp => false,
}
}

pub fn non_nullable(&self) -> EncodingType {
match self {
EncodingType::NullableStr | EncodingType::OptStr => EncodingType::Str,
Expand Down Expand Up @@ -238,6 +279,39 @@ impl EncodingType {
}
}
}

/// Maps the encoding type to a single-byte tag.
///
/// Variants without a payload get a fixed tag below 32 (24 is currently unused).
/// `ByteSlices(n)` is mapped to `32 + n`, so its tags never overlap the fixed ones.
///
/// # Panics
///
/// Panics if `n` in `ByteSlices(n)` is so large that `32 + n` does not fit into a `u8`
/// (i.e. `n > 223`). The addition is checked explicitly: an unchecked `32 + n` would
/// wrap around in release builds and silently collide with the fixed tags.
pub fn to_u8(self) -> u8 {
    match self {
        EncodingType::Str => 0u8,
        EncodingType::I64 => 1,
        EncodingType::U8 => 2,
        EncodingType::U16 => 3,
        EncodingType::U32 => 4,
        EncodingType::U64 => 5,
        EncodingType::F64 => 6,
        EncodingType::Val => 7,
        EncodingType::USize => 8,
        EncodingType::Bitvec => 9,
        EncodingType::NullableStr => 10,
        EncodingType::NullableI64 => 11,
        EncodingType::NullableU8 => 12,
        EncodingType::NullableU16 => 13,
        EncodingType::NullableU32 => 14,
        EncodingType::NullableU64 => 15,
        EncodingType::NullableF64 => 16,
        EncodingType::OptStr => 17,
        EncodingType::OptF64 => 18,
        EncodingType::Null => 19,
        EncodingType::ScalarI64 => 20,
        EncodingType::ScalarStr => 21,
        EncodingType::ScalarString => 22,
        EncodingType::ConstVal => 23,
        // Both the narrowing conversion and the offset addition are checked, so an
        // unencodable width fails loudly in every build profile instead of wrapping.
        EncodingType::ByteSlices(x) => u8::try_from(x)
            .ok()
            .and_then(|width| width.checked_add(32))
            .expect("ByteSlices width is too large to be encoded as a u8 tag"),
        EncodingType::ValRows => 25,
        EncodingType::Premerge => 26,
        EncodingType::MergeOp => 27,
    }
}
}

#[derive(Copy, Clone, Debug, PartialEq, Serialize, Deserialize)]
Expand Down
2 changes: 2 additions & 0 deletions src/engine/data_types/vec_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@ pub trait VecData<T>: PartialEq + Ord + Copy + Debug + Sync + Send {
/// `VecData` implementation for `u8` elements.
impl VecData<u8> for u8 {
    /// Borrows the contents of `vec` as a `u8` slice via `cast_ref_u8`.
    fn unwrap<'a, 'b>(vec: &'b dyn Data<'a>) -> &'b [u8]
    where
        u8: 'a,
    {
        vec.cast_ref_u8()
    }

    /// Mutably borrows the contents of `vec` as a `Vec<u8>` via `cast_ref_mut_u8`.
    fn unwrap_mut<'a, 'b>(vec: &'b mut dyn Data<'a>) -> &'b mut Vec<u8>
    where
        u8: 'a,
    {
        vec.cast_ref_mut_u8()
    }

    /// Wraps a single value as `RawVal::Int`, widening it losslessly to `i64`.
    fn wrap_one(value: u8) -> RawVal {
        let widened = i64::from(value);
        RawVal::Int(widened)
    }

    /// The encoding type corresponding to `u8` elements.
    fn t() -> EncodingType {
        EncodingType::U8
    }
}

/// `VecData` implementation for `u16` elements.
impl VecData<u16> for u16 {
    /// Borrows the contents of `vec` as a `u16` slice via `cast_ref_u16`.
    fn unwrap<'a, 'b>(vec: &'b dyn Data<'a>) -> &'b [u16]
    where
        u16: 'a,
    {
        vec.cast_ref_u16()
    }

    /// Mutably borrows the contents of `vec` as a `Vec<u16>` via `cast_ref_mut_u16`.
    fn unwrap_mut<'a, 'b>(vec: &'b mut dyn Data<'a>) -> &'b mut Vec<u16>
    where
        u16: 'a,
    {
        vec.cast_ref_mut_u16()
    }

    /// Wraps a single value as `RawVal::Int`, widening it losslessly to `i64`.
    fn wrap_one(value: u16) -> RawVal {
        let widened = i64::from(value);
        RawVal::Int(widened)
    }

    /// The encoding type corresponding to `u16` elements.
    fn t() -> EncodingType {
        EncodingType::U16
    }
}

Expand Down
10 changes: 6 additions & 4 deletions src/engine/execution/batch_merging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ pub fn combine<'a>(
aggregates.push((aggregated.any(), aggregator));
}

let mut executor = qp.prepare(data, batch_size)?;
let mut executor = qp.prepare(data, batch_size, false)?;
let mut results = executor.prepare_no_columns();
executor.run(1, &mut results, batch1.show || batch2.show)?;

Expand Down Expand Up @@ -227,9 +227,9 @@ pub fn combine<'a>(
let (l, r) = unify_types(&mut qp, l, r);
let mut partitioning = qp.partition(l, r, limit, desc);

for i in 1..(left.len() - 1) {
for i in 1..(batch1.order_by.len() - 1){
let (index1, desc) = batch1.order_by[i];
let (index2, _) = batch1.order_by[i];
let (index2, _) = batch2.order_by[i];
let (l, r) = unify_types(&mut qp, left[index1], right[index2]);
partitioning = qp.subpartition(partitioning, l, r, desc);
}
Expand All @@ -247,6 +247,7 @@ pub fn combine<'a>(
let l = null_to_val(&mut qp, left[ileft]);
let r = null_to_val(&mut qp, right[iright]);
let (l, r) = unify_types(&mut qp, l, r);
assert!(l.tag == r.tag, "Types do not match: {:?} {:?}", l.tag, r.tag);
let merged = qp.merge_keep(merge_ops, l, r);
projection.push(merged.any());
}
Expand All @@ -259,12 +260,13 @@ pub fn combine<'a>(
let l = null_to_val(&mut qp, left[ileft]);
let r = null_to_val(&mut qp, right[iright]);
let (l, r) = unify_types(&mut qp, l, r);
assert!(l.tag == r.tag, "Types do not match: {:?} {:?}", l.tag, r.tag);
let merged = qp.merge_keep(merge_ops, l, r);
order_by.push((merged.any(), desc));
}
order_by.push((merged_final_sort_col.any(), final_desc));

let mut executor = qp.prepare(data, batch_size)?;
let mut executor = qp.prepare(data, batch_size, false)?;
let mut results = executor.prepare_no_columns();
executor.run(1, &mut results, batch1.show || batch2.show)?;
let (columns, projection, _, order_by) =
Expand Down
41 changes: 22 additions & 19 deletions src/engine/execution/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,7 @@ impl<'a> QueryExecutor<'a> {
// or downstream of this stage.
for input in self.ops[consumer].inputs() {
for &p in &producers[input.i] {
if transitive_output[p] && stage[p] != current_stage
{
if transitive_output[p] && stage[p] != current_stage {
consumers_to_revisit.push(consumer);
continue 'l2;
}
Expand Down Expand Up @@ -605,26 +604,25 @@ impl<'a> QueryExecutor<'a> {
for (i, input_i) in substitutions {
let stage = stage_for_op[i];
let input = self.buffer_provider.all_buffers[input_i];
let buffer = self.buffer_provider.named_buffer("buffer_stream", input.tag);
let buffer = self
.buffer_provider
.named_buffer("buffer_stream", input.tag);
if input.tag.is_scalar() {
continue
continue;
}
let op = if input.tag == EncodingType::Null {
vector_operator::operator::stream_null_vec(
input.any(),
buffer.any(),
)
vector_operator::operator::stream_null_vec(input.any(), buffer.any())
} else if input.tag.is_nullable() {
vector_operator::operator::stream_nullable(
input,
self.buffer_provider.named_buffer("buffer_stream_data", input.tag),
self.buffer_provider
.named_buffer("buffer_stream_data", input.tag),
self.buffer_provider.buffer_u8("buffer_stream_present"),
buffer,
)
.unwrap()
} else {
vector_operator::operator::stream(input, buffer)
.unwrap()
vector_operator::operator::stream(input, buffer).unwrap()
};
self.ops[i].update_input(input_i, buffer.buffer.i);
total_order[stage].ops.insert(0, (self.ops.len(), true));
Expand Down Expand Up @@ -682,27 +680,32 @@ impl<'a> QueryExecutor<'a> {
let (max_length, batch_size) = self.init_stage(column_length, stage, scratchpad);
let stream = self.stages[stage].stream;
if show {
println!("\n-- Stage {} --", stage);
println!(
"batch_size: {}, max_length: {}, column_length: {}, stream: {}",
batch_size, max_length, column_length, stream
);
println!("\n-- Stage {stage} -- batch_size: {batch_size}, max_length: {max_length}, column_length: {column_length}, stream: {stream}");
}
let mut has_more = true;
let mut iters = 0;
while has_more {
has_more = false;
for &(op, streamable) in &self.stages[stage].ops {
if show && iters < 2 {
println!("{} streamable={streamable}", self.ops[op].display(true));
let types = self.ops[op]
.outputs()
.iter()
.map(|b| format!("{:?}", self.buffer_provider.all_buffers[b.i].tag))
.collect::<Vec<_>>()
.join(",");
println!(
"{} streamable={streamable} types={types}",
self.ops[op].display(true)
);
}
self.ops[op].execute(stream && streamable, scratchpad)?;
if show && iters < 2 {
for output in self.ops[op].outputs() {
let data = scratchpad.get_any(output);
println!("{}", data.display());
println!(" {}", data.display());
if let Some(present) = scratchpad.try_get_null_map(output) {
print!("null map: ");
print!(" null map: ");
for i in 0..cmp::min(present.len() * 8, 100) {
if (&*present).is_set(i) {
print!("1")
Expand Down
1 change: 1 addition & 0 deletions src/engine/operators/propagate_nullability.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::engine::*;

/// Applies the null map of a nullable buffer to another (non-nullable) buffer.
#[derive(Debug)]
pub struct PropagateNullability<T> {
pub from: BufferRef<Nullable<Any>>,
Expand Down
43 changes: 4 additions & 39 deletions src/engine/operators/to_val.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,42 +2,7 @@ use crate::bitvec::*;
use crate::engine::*;
use crate::mem_store::Val;

pub struct NullableStrToVal<'a> {
pub input: BufferRef<Nullable<&'a str>>,
pub vals: BufferRef<Val<'a>>,
}

impl<'a> VecOperator<'a> for NullableStrToVal<'a> {
fn execute(&mut self, stream: bool, scratchpad: &mut Scratchpad<'a>) -> Result<(), QueryError> {
let (input, present) = scratchpad.get_nullable(self.input);
let mut vals = scratchpad.get_mut(self.vals);
if stream { vals.clear(); }
for i in 0..input.len() {
if (&*present).is_set(i) {
vals.push(Val::Str(input[i]));
} else {
vals.push(Val::Null);
}
}
Ok(())
}

fn init(&mut self, _: usize, batch_size: usize, scratchpad: &mut Scratchpad<'a>) {
scratchpad.set(self.vals, Vec::with_capacity(batch_size));
}

fn inputs(&self) -> Vec<BufferRef<Any>> { vec![self.input.any()] }
fn inputs_mut(&mut self) -> Vec<&mut usize> { vec![&mut self.input.i] }
fn outputs(&self) -> Vec<BufferRef<Any>> { vec![self.vals.any()] }
fn can_stream_input(&self, _: usize) -> bool { true }
fn can_stream_output(&self, _: usize) -> bool { true }
fn can_block_output(&self) -> bool { true }
fn allocates(&self) -> bool { true }

fn display_op(&self, _: bool) -> String {
format!("NullableStrToVal({})", self.vals)
}
}
use super::type_conversion::Cast;

pub struct ValToNullableStr<'a> {
pub vals: BufferRef<Val<'a>>,
Expand Down Expand Up @@ -84,19 +49,19 @@ impl<'a> VecOperator<'a> for ValToNullableStr<'a> {
}
}

pub struct NullableIntToVal<'a, T> {
pub struct NullableToVal<'a, T> {
pub input: BufferRef<Nullable<T>>,
pub vals: BufferRef<Val<'a>>,
}

impl<'a, T: GenericIntVec<T>> VecOperator<'a> for NullableIntToVal<'a, T> {
impl<'a, T: VecData<T> + Cast<Val<'a>> + 'a> VecOperator<'a> for NullableToVal<'a, T> {
fn execute(&mut self, stream: bool, scratchpad: &mut Scratchpad<'a>) -> Result<(), QueryError> {
let (input, present) = scratchpad.get_nullable(self.input);
let mut vals = scratchpad.get_mut(self.vals);
if stream { vals.clear(); }
for i in 0..input.len() {
if (&*present).is_set(i) {
vals.push(Val::Integer(num::cast(input[i]).unwrap()));
vals.push(input[i].cast());
} else {
vals.push(Val::Null);
}
Expand Down
Loading

0 comments on commit 195600f

Please sign in to comment.