diff --git a/Cargo.lock b/Cargo.lock index c38942ba0e..e1430f278e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1084,9 +1084,9 @@ dependencies = [ [[package]] name = "ceresdbproto" -version = "1.0.5" +version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cbfdcd9746d2b027e2880ef80bb6c5735ea45ad590f21b2cd2168eb11ba66f7a" +checksum = "81229e82e9afa8318e7f765cc01cd15f7380786699f4c7beceec7540e0488d7e" dependencies = [ "prost", "protoc-bin-vendored", diff --git a/Cargo.toml b/Cargo.toml index e634ff10f1..d94dd002f4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -68,7 +68,7 @@ bytes = "1.1.0" bytes_ext = { path = "components/bytes_ext" } catalog = { path = "catalog" } catalog_impls = { path = "catalog_impls" } -ceresdbproto = "1.0.5" +ceresdbproto = "1.0" chrono = "0.4" clap = "3.0" clru = "0.6.1" diff --git a/analytic_engine/src/sst/parquet/encoding.rs b/analytic_engine/src/sst/parquet/encoding.rs index d65f75dcd1..47d3b8c2dd 100644 --- a/analytic_engine/src/sst/parquet/encoding.rs +++ b/analytic_engine/src/sst/parquet/encoding.rs @@ -526,7 +526,25 @@ impl HybridRecordDecoder { .iter() .map(|f| { if let DataType::List(nested_field) = f.data_type() { - Arc::new(Field::new(f.name(), nested_field.data_type().clone(), true)) + match f.data_type() { + DataType::Dictionary(_, _) => { + assert!(f.dict_id().is_some(), "Dictionary must have dict_id"); + assert!( + f.dict_is_ordered().is_some(), + "Dictionary must have dict_is_ordered" + ); + let dict_id = f.dict_id().unwrap(); + let dict_is_ordered = f.dict_is_ordered().unwrap(); + Arc::new(Field::new_dict( + f.name(), + nested_field.data_type().clone(), + true, + dict_id, + dict_is_ordered, + )) + } + _ => Arc::new(Field::new(f.name(), nested_field.data_type().clone(), true)), + } } else { f.clone() } diff --git a/analytic_engine/src/sst/parquet/hybrid.rs b/analytic_engine/src/sst/parquet/hybrid.rs index df0b3808af..3d155bc01b 100644 --- a/analytic_engine/src/sst/parquet/hybrid.rs +++ b/analytic_engine/src/sst/parquet/hybrid.rs @@ -127,6 +127,7 @@ pub fn build_hybrid_arrow_schema(schema: &Schema) -> ArrowSchemaRef { field.data_type().clone(), true, ))); + // TODO(tanruixiang): is there need to use new_dict? Arc::new(Field::new(field.name(), field_type, true)) } else { field.clone() @@ -418,6 +419,7 @@ impl ListArrayBuilder { let array_len = self.multi_row_arrays.len(); let mut offsets = MutableBuffer::new(array_len * std::mem::size_of::()); let child_data = self.build_child_data(&mut offsets)?; + // TODO(tanruixiang): is there need to use new_dict? let field = Arc::new(Field::new( LIST_ITEM_NAME, self.datum_kind.to_arrow_data_type(), diff --git a/analytic_engine/src/sst/parquet/writer.rs b/analytic_engine/src/sst/parquet/writer.rs index 8bba1b41a2..0e1b5f3658 100644 --- a/analytic_engine/src/sst/parquet/writer.rs +++ b/analytic_engine/src/sst/parquet/writer.rs @@ -333,7 +333,7 @@ mod tests { use common_types::{ bytes::Bytes, projected_schema::ProjectedSchema, - tests::{build_row, build_schema}, + tests::{build_row, build_row_for_dictionary, build_schema, build_schema_with_dictionary}, time::{TimeRange, Timestamp}, }; use common_util::{ @@ -365,9 +365,10 @@ mod tests { init_log_for_test(); let runtime = Arc::new(runtime::Builder::default().build().unwrap()); - parquet_write_and_then_read_back(runtime.clone(), 3, vec![3, 3, 3, 3, 3]); - parquet_write_and_then_read_back(runtime.clone(), 4, vec![4, 4, 4, 3]); - parquet_write_and_then_read_back(runtime, 5, vec![5, 5, 5]); + parquet_write_and_then_read_back(runtime.clone(), 2, vec![2, 2, 2, 2, 2, 2, 2, 2, 2, 2]); + parquet_write_and_then_read_back(runtime.clone(), 3, vec![3, 3, 3, 3, 3, 3, 2]); + parquet_write_and_then_read_back(runtime.clone(), 4, vec![4, 4, 4, 4, 4]); + parquet_write_and_then_read_back(runtime, 5, vec![5, 5, 5, 5]); } fn parquet_write_and_then_read_back( @@ -390,8 +391,8 @@ mod tests { let store_picker: ObjectStorePickerRef = Arc::new(store); let sst_file_path = Path::from("data.par"); - let schema = build_schema(); - let projected_schema = ProjectedSchema::no_projection(schema.clone()); + let schema = build_schema_with_dictionary(); + let reader_projected_schema = ProjectedSchema::no_projection(schema.clone()); let sst_meta = MetaData { min_key: Bytes::from_static(b"100"), max_key: Bytes::from_static(b"200"), @@ -410,9 +411,37 @@ mod tests { // reach here when counter is 9 7 5 3 1 let ts = 100 + counter; let rows = vec![ - build_row(b"a", ts, 10.0, "v4", 1000, 1_000_000), - build_row(b"b", ts, 10.0, "v4", 1000, 1_000_000), - build_row(b"c", ts, 10.0, "v4", 1000, 1_000_000), + build_row_for_dictionary( + b"a", + ts, + 10.0, + "v4", + 1000, + 1_000_000, + Some("tagv1"), + "tagv2", + ), + build_row_for_dictionary( + b"b", + ts, + 10.0, + "v4", + 1000, + 1_000_000, + Some("tagv2"), + "tagv4", + ), + build_row_for_dictionary(b"c", ts, 10.0, "v4", 1000, 1_000_000, None, "tagv2"), + build_row_for_dictionary( + b"d", + ts, + 10.0, + "v4", + 1000, + 1_000_000, + Some("tagv3"), + "tagv2", + ), ]; let batch = build_record_batch_with_key(schema.clone(), rows); Poll::Ready(Some(Ok(batch))) @@ -432,7 +461,7 @@ mod tests { .await .unwrap(); - assert_eq!(15, sst_info.row_num); + assert_eq!(20, sst_info.row_num); let scan_options = ScanOptions::default(); // read sst back to test @@ -440,7 +469,7 @@ mod tests { reverse: false, frequency: ReadFrequency::Frequent, num_rows_per_row_group: 5, - projected_schema, + projected_schema: reader_projected_schema, predicate: Arc::new(Predicate::empty()), meta_cache: None, scan_options, @@ -483,9 +512,46 @@ mod tests { let mut stream = reader.read().await.unwrap(); let mut expect_rows = vec![]; for counter in &[4, 3, 2, 1, 0] { - expect_rows.push(build_row(b"a", 100 + counter, 10.0, "v4", 1000, 1_000_000)); - expect_rows.push(build_row(b"b", 100 + counter, 10.0, "v4", 1000, 1_000_000)); - expect_rows.push(build_row(b"c", 100 + counter, 10.0, "v4", 1000, 1_000_000)); + expect_rows.push(build_row_for_dictionary( + b"a", + 100 + counter, + 10.0, + "v4", + 1000, + 1_000_000, + Some("tagv1"), + "tagv2", + )); + expect_rows.push(build_row_for_dictionary( + b"b", + 100 + counter, + 10.0, + "v4", + 1000, + 1_000_000, + Some("tagv2"), + "tagv4", + )); + expect_rows.push(build_row_for_dictionary( + b"c", + 100 + counter, + 10.0, + "v4", + 1000, + 1_000_000, + None, + "tagv2", + )); + expect_rows.push(build_row_for_dictionary( + b"d", + 100 + counter, + 10.0, + "v4", + 1000, + 1_000_000, + Some("tagv3"), + "tagv2", + )); } check_stream(&mut stream, expect_rows).await; }); diff --git a/common_types/src/column.rs b/common_types/src/column.rs index 4c09a84644..d1cf85fefc 100644 --- a/common_types/src/column.rs +++ b/common_types/src/column.rs @@ -5,16 +5,17 @@ use std::sync::Arc; use arrow::{ array::{ - Array, ArrayBuilder, ArrayRef, BinaryArray, BinaryBuilder, BooleanArray, BooleanBuilder, - Date32Array as DateArray, Date32Builder as DateBuilder, Float32Array as FloatArray, - Float32Builder as FloatBuilder, Float64Array as DoubleArray, + Array, ArrayAccessor, ArrayBuilder, ArrayRef, BinaryArray, BinaryBuilder, BooleanArray, + BooleanBuilder, Date32Array as DateArray, Date32Builder as DateBuilder, DictionaryArray, + Float32Array as FloatArray, Float32Builder as FloatBuilder, Float64Array as DoubleArray, Float64Builder as DoubleBuilder, Int16Array, Int16Builder, Int32Array, Int32Builder, Int64Array, Int64Builder, Int8Array, Int8Builder, NullArray, StringArray, StringBuilder, - Time64NanosecondArray as TimeArray, Time64NanosecondBuilder as TimeBuilder, - TimestampMillisecondArray, TimestampMillisecondBuilder, UInt16Array, UInt16Builder, - UInt32Array, UInt32Builder, UInt64Array, UInt64Builder, UInt8Array, UInt8Builder, + StringDictionaryBuilder, Time64NanosecondArray as TimeArray, + Time64NanosecondBuilder as TimeBuilder, TimestampMillisecondArray, + TimestampMillisecondBuilder, UInt16Array, UInt16Builder, UInt32Array, UInt32Builder, + UInt64Array, UInt64Builder, UInt8Array, UInt8Builder, }, - datatypes::{DataType, TimeUnit}, + datatypes::{DataType, Int32Type, TimeUnit}, error::ArrowError, }; use datafusion::physical_plan::{ @@ -142,6 +143,11 @@ pub struct VarbinaryColumn(BinaryArray); #[derive(Debug)] pub struct StringColumn(StringArray); +/// dictionary encode type is difference from other types, need implement +/// without macro +#[derive(Debug)] +pub struct StringDictionaryColumn(DictionaryArray); + #[derive(Debug)] pub struct DateColumn(DateArray); @@ -287,6 +293,55 @@ impl_column!( ); impl_column!(StringColumn, get_string_datum, get_string_datum_view); +impl StringDictionaryColumn { + /// Get datum by index + pub fn datum_opt(&self, index: usize) -> Option { + if index >= self.0.len() { + return None; + } + Some(self.datum(index)) + } + + pub fn datum_view_opt(&self, index: usize) -> Option { + if index >= self.0.len() { + return None; + } + Some(self.datum_view(index)) + } + + pub fn datum_view(&self, index: usize) -> DatumView { + if self.0.is_null(index) { + return DatumView::Null; + } + // TODO(tanruixiang): Is this the efficient way? + DatumView::String(self.0.downcast_dict::().unwrap().value(index)) + } + + pub fn datum(&self, index: usize) -> Datum { + if self.0.is_null(index) { + return Datum::Null; + } + // TODO(tanruixiang): Is this the efficient way? + Datum::String( + self.0 + .downcast_dict::() + .unwrap() + .value(index) + .into(), + ) + } + + #[inline] + pub fn num_rows(&self) -> usize { + self.0.len() + } + + #[inline] + pub fn is_empty(&self) -> bool { + self.num_rows() == 0 + } +} + macro_rules! impl_dedup { ($Column: ident) => { impl $Column { @@ -321,6 +376,22 @@ impl_dedup!(TimestampColumn); impl_dedup!(VarbinaryColumn); impl_dedup!(StringColumn); +impl StringDictionaryColumn { + pub fn dedup(&self, selected: &mut [bool]) { + if self.0.is_empty() { + return; + } + selected[0] = true; + for (i, v) in selected.iter_mut().enumerate().take(self.0.len()).skip(1) { + let current = self.0.key(i); + let prev = self.0.key(i - 1); + if current != prev { + *v = true; + } + } + } +} + macro_rules! impl_new_null { ($Column: ident, $Builder: ident) => { impl $Column { @@ -389,6 +460,34 @@ impl_from_array_and_slice!(TimestampColumn, TimestampMillisecondArray); impl_from_array_and_slice!(VarbinaryColumn, BinaryArray); impl_from_array_and_slice!(StringColumn, StringArray); +impl From> for StringDictionaryColumn { + fn from(array: DictionaryArray) -> Self { + Self(array) + } +} + +impl From<&DictionaryArray> for StringDictionaryColumn { + fn from(array_ref: &DictionaryArray) -> Self { + let array_data = array_ref.into_data(); + let array = DictionaryArray::::from(array_data); + Self(array) + } +} + +impl StringDictionaryColumn { + fn to_arrow_array(&self) -> DictionaryArray { + let array_data = self.0.clone().into_data(); + DictionaryArray::::from(array_data) + } + + fn slice(&self, offset: usize, length: usize) -> Self { + let array_slice = self.0.slice(offset, length); + let array_data = array_slice.into_data(); + let array = DictionaryArray::::from(array_data); + Self(array) + } +} + macro_rules! impl_iter { ($Column: ident, $Value: ident) => { impl $Column { @@ -438,6 +537,19 @@ impl StringColumn { } } +impl StringDictionaryColumn { + /// Create a column that all values are null. + fn new_null(num_rows: usize) -> Self { + let mut builder = StringDictionaryBuilder::::new(); + for _ in 0..num_rows { + builder.append_null(); + } + let array = builder.finish(); + + Self(array) + } +} + macro_rules! impl_numeric_column { ($(($Kind: ident, $type: ty)), *) => { $( @@ -543,18 +655,21 @@ macro_rules! impl_column_block { impl ColumnBlock { pub fn datum_kind(&self) -> DatumKind { match self { + ColumnBlock::StringDictionary(_) => DatumKind::String, $(ColumnBlock::$Kind(_) => DatumKind::$Kind,)* } } pub fn datum_opt(&self, index: usize) -> Option { match self { + ColumnBlock::StringDictionary(col) => col.datum_opt(index), $(ColumnBlock::$Kind(col) => col.datum_opt(index),)* } } pub fn datum_view_opt(&self, index: usize) -> Option { match self { + ColumnBlock::StringDictionary(col) => col.datum_view_opt(index), $(ColumnBlock::$Kind(col) => col.datum_view_opt(index),)* } } @@ -562,6 +677,7 @@ macro_rules! impl_column_block { /// Panic if index is out fo bound. pub fn datum_view(&self, index: usize) -> DatumView { match self { + ColumnBlock::StringDictionary(col) => col.datum_view(index), $(ColumnBlock::$Kind(col) => col.datum_view(index),)* } } @@ -569,18 +685,21 @@ macro_rules! impl_column_block { /// Panic if index is out fo bound. pub fn datum(&self, index: usize) -> Datum { match self { + ColumnBlock::StringDictionary(col) => col.datum(index), $(ColumnBlock::$Kind(col) => col.datum(index),)* } } pub fn num_rows(&self) -> usize { match self { + ColumnBlock::StringDictionary(col) => col.num_rows(), $(ColumnBlock::$Kind(col) => col.num_rows(),)* } } pub fn to_arrow_array_ref(&self) -> ArrayRef { match self { + ColumnBlock::StringDictionary(col) => Arc::new(col.to_arrow_array()), $(ColumnBlock::$Kind(col) => Arc::new(col.to_arrow_array()),)* } } @@ -590,6 +709,7 @@ macro_rules! impl_column_block { /// The first datum is not marked to true. pub fn dedup(&self, selected: &mut [bool]) { match self { + ColumnBlock::StringDictionary(col) => col.dedup(selected), $(ColumnBlock::$Kind(col) => col.dedup(selected),)* } } @@ -600,6 +720,7 @@ macro_rules! impl_column_block { #[must_use] pub fn slice(&self, offset: usize, length: usize) -> Self { match self { + ColumnBlock::StringDictionary(col) => ColumnBlock::StringDictionary(col.slice(offset, length)), $(ColumnBlock::$Kind(col) => ColumnBlock::$Kind(col.slice(offset, length)),)* } } @@ -612,6 +733,12 @@ macro_rules! impl_column_block { } } })* + + impl From for ColumnBlock { + fn from(column: StringDictionaryColumn) -> Self { + Self::StringDictionary(column) + } + } }; } @@ -628,6 +755,8 @@ macro_rules! define_column_block { #[derive(Debug)] pub enum ColumnBlock { Null(NullColumn), + StringDictionary(StringDictionaryColumn), + String(StringColumn), $( $Kind([<$Kind Column>]), )* @@ -635,8 +764,23 @@ macro_rules! define_column_block { impl ColumnBlock { pub fn try_from_arrow_array_ref(datum_kind: &DatumKind, array: &ArrayRef) -> Result { + let is_dictionary : bool = if let DataType::Dictionary(..) = array.data_type() { + true + } else { + false + }; let column = match datum_kind { DatumKind::Null => ColumnBlock::Null(NullColumn::new_null(array.len())), + DatumKind::String => { + if is_dictionary { + let cast_column = cast_array(datum_kind, array)?; + ColumnBlock::StringDictionary(StringDictionaryColumn::from(cast_column)) + + } else { + let cast_column = cast_array(datum_kind, array)?; + ColumnBlock::String(StringColumn::from(cast_column)) + } + }, $( DatumKind::$Kind => { let mills_array; @@ -657,9 +801,16 @@ macro_rules! define_column_block { Ok(column) } - pub fn new_null_with_type(kind: &DatumKind, rows: usize) -> Result { + pub fn new_null_with_type(kind: &DatumKind, rows: usize, is_dictionary: bool) -> Result { let block = match kind { DatumKind::Null => ColumnBlock::Null(NullColumn::new_null(rows)), + DatumKind::String => { + if is_dictionary { + ColumnBlock::StringDictionary(StringDictionaryColumn::new_null(rows)) + }else { + ColumnBlock::String(StringColumn::new_null(rows)) + } + }, $( DatumKind::$Kind => ColumnBlock::$Kind([<$Kind Column>]::new_null(rows)), )* @@ -674,8 +825,8 @@ macro_rules! define_column_block { // Define column blocks, Null is defined explicitly in macro. define_column_block!( - Timestamp, Double, Float, Varbinary, String, UInt64, UInt32, UInt16, UInt8, Int64, Int32, - Int16, Int8, Boolean, Date, Time + Timestamp, Double, Float, Varbinary, UInt64, UInt32, UInt16, UInt8, Int64, Int32, Int16, Int8, + Boolean, Date, Time ); impl ColumnBlock { @@ -796,7 +947,6 @@ macro_rules! append_block { macro_rules! define_column_block_builder { ($(($Kind: ident, $Builder: ident)), *) => { paste! { - #[derive(Debug)] pub enum ColumnBlockBuilder { Null { rows: usize }, Timestamp(TimestampMillisecondBuilder), @@ -804,6 +954,7 @@ macro_rules! define_column_block_builder { String(StringBuilder), Date(DateBuilder), Time(TimeBuilder), + Dictionary(StringDictionaryBuilder::), $( $Kind($Builder), )* @@ -811,13 +962,19 @@ macro_rules! define_column_block_builder { impl ColumnBlockBuilder { /// Create by data type with initial capacity - pub fn with_capacity(data_type: &DatumKind, item_capacity: usize) -> Self { + pub fn with_capacity(data_type: &DatumKind, item_capacity: usize, is_dictionary : bool) -> Self { match data_type { DatumKind::Null => Self::Null { rows: 0 }, DatumKind::Timestamp => Self::Timestamp(TimestampMillisecondBuilder::with_capacity(item_capacity)), // The data_capacity is set as 1024, because the item is variable-size type. DatumKind::Varbinary => Self::Varbinary(BinaryBuilder::with_capacity(item_capacity, 1024)), - DatumKind::String => Self::String(StringBuilder::with_capacity(item_capacity, 1024)), + DatumKind::String =>{ + if is_dictionary { + Self::Dictionary(StringDictionaryBuilder::::new()) + }else { + Self::String(StringBuilder::with_capacity(item_capacity, 1024)) + } + } DatumKind::Date => Self::Date(DateBuilder::with_capacity(item_capacity)), DatumKind::Time => Self::Time(TimeBuilder::with_capacity(item_capacity)), $( @@ -847,6 +1004,17 @@ macro_rules! define_column_block_builder { Self::String(builder) => append_datum!(String, builder, Datum, datum), Self::Date(builder) => append_datum!(Date, builder, Datum, datum), Self::Time(builder) => append_datum!(Time, builder, Datum, datum), + Self::Dictionary(builder) => { + match datum { + Datum::Null => Ok(builder.append_null()), + Datum::String(v) => Ok(builder.append_value(v)), + _ => ConflictType { + expect: DatumKind::String, + given: datum.kind(), + } + .fail() + } + }, $( Self::$Kind(builder) => append_datum!($Kind, builder, Datum, datum), )* @@ -874,6 +1042,17 @@ macro_rules! define_column_block_builder { Self::String(builder) => append_datum!(String, builder, DatumView, datum), Self::Date(builder) => append_datum!(Date, builder, DatumView, datum), Self::Time(builder) => append_datum!(Time, builder, DatumView, datum), + Self::Dictionary(builder) => { + match datum { + DatumView::Null => Ok(builder.append_null()), + DatumView::String(v) => Ok(builder.append_value(v)), + _ => ConflictType { + expect: DatumKind::String, + given: datum.kind(), + } + .fail() + } + }, $( Self::$Kind(builder) => append_datum!($Kind, builder, DatumView, datum), )* @@ -898,6 +1077,34 @@ macro_rules! define_column_block_builder { Self::String(builder) => append_block!(String, builder, ColumnBlock, block, start, len), Self::Date(builder) => append_block!(Date, builder, ColumnBlock, block, start, len), Self::Time(builder) => append_block!(Time, builder, ColumnBlock, block, start, len), + Self::Dictionary(builder) => { + match block { + ColumnBlock::Null(v) => { + let end = std::cmp::min(start + len, v.num_rows()); + for _ in start..end { + builder.append_null(); + } + Ok(()) + } + ColumnBlock::StringDictionary(v) => { + let end = std::cmp::min(start + len, v.num_rows()); + for i in start..end { + if v.0.is_null(i) { + builder.append_null(); + } else { + let value = v.datum(i); + builder.append_value(value.as_str().unwrap()); + } + } + Ok(()) + } + _ => ConflictType { + expect: DatumKind::String, + given: block.datum_kind(), + } + .fail(), + } + }, $( Self::$Kind(builder) => append_block!($Kind, builder, ColumnBlock, block, start, len), )* @@ -912,6 +1119,7 @@ macro_rules! define_column_block_builder { Self::String(builder) => builder.len(), Self::Date(builder) => builder.len(), Self::Time(builder) => builder.len(), + Self::Dictionary(builder) => builder.len(), $( Self::$Kind(builder) => builder.len(), )* @@ -931,6 +1139,9 @@ macro_rules! define_column_block_builder { Self::String(builder) => StringColumn::from(builder.finish()).into(), Self::Date(builder) => DateColumn::from(builder.finish()).into(), Self::Time(builder) => TimeColumn::from(builder.finish()).into(), + Self::Dictionary(builder) => { + StringDictionaryColumn::from(builder.finish()).into() + }, $( Self::$Kind(builder) => [<$Kind Column>]::from(builder.finish()).into(), )* @@ -959,8 +1170,8 @@ define_column_block_builder!( impl ColumnBlockBuilder { /// Create by data type - pub fn new(data_type: &DatumKind) -> Self { - Self::with_capacity(data_type, 0) + pub fn new(data_type: &DatumKind, is_dictionry: bool) -> Self { + Self::with_capacity(data_type, 0, is_dictionry) } pub fn is_empty(&self) -> bool { @@ -976,7 +1187,9 @@ impl ColumnBlockBuilder { #[cfg(test)] mod tests { use super::*; - use crate::tests::{build_rows, build_schema}; + use crate::tests::{ + build_row_for_dictionary, build_rows, build_schema, build_schema_with_dictionary, + }; #[test] fn test_column_block_builder() { @@ -984,7 +1197,7 @@ mod tests { let rows = build_rows(); // DatumKind::Varbinary let column = schema.column(0); - let mut builder = ColumnBlockBuilder::with_capacity(&column.data_type, 2); + let mut builder = ColumnBlockBuilder::with_capacity(&column.data_type, 2, false); // append builder.append(rows[0][0].clone()).unwrap(); @@ -998,7 +1211,7 @@ mod tests { let column_block = builder.build(); assert_eq!(column_block.num_rows(), 2); - let mut builder = ColumnBlockBuilder::with_capacity(&column.data_type, 2); + let mut builder = ColumnBlockBuilder::with_capacity(&column.data_type, 2, false); // append_block_range builder.append_block_range(&column_block, 0, 1).unwrap(); @@ -1015,4 +1228,109 @@ mod tests { Datum::Varbinary(Bytes::copy_from_slice(b"binary key1")) ); } + + #[test] + fn test_column_block_string_dictionary_builder() { + let schema = build_schema_with_dictionary(); + let rows = vec![ + build_row_for_dictionary( + b"a", + 1, + 10.0, + "v4", + 1000, + 1_000_000, + Some("tag1_1"), + "tag2_1", + ), + build_row_for_dictionary( + b"b", + 2, + 10.0, + "v4", + 1000, + 1_000_000, + Some("tag1_2"), + "tag2_2", + ), + build_row_for_dictionary( + b"c", + 3, + 10.0, + "v4", + 1000, + 1_000_000, + Some("tag1_3"), + "tag2_3", + ), + build_row_for_dictionary( + b"d", + 4, + 10.0, + "v4", + 1000, + 1_000_000, + Some("tag1_1"), + "tag2_4", + ), + build_row_for_dictionary( + b"e", + 5, + 10.0, + "v4", + 1000, + 1_000_000, + Some("tag1_3"), + "tag2_4", + ), + build_row_for_dictionary(b"f", 6, 10.0, "v4", 1000, 1_000_000, None, "tag2_4"), + ]; + // DatumKind::String , is_dictionary = true + let column = schema.column(6); + let mut builder = + ColumnBlockBuilder::with_capacity(&column.data_type, 0, column.is_dictionary); + // append + (0..rows.len()).for_each(|i| builder.append(rows[i][6].clone()).unwrap()); + + let ret = builder.append(rows[0][0].clone()); + assert!(ret.is_err()); + + // append_view + builder.append_view(rows[5][6].as_view()).unwrap(); + let ret = builder.append_view(rows[1][0].as_view()); + + assert!(ret.is_err()); + + let column_block = builder.build(); + assert_eq!(column_block.num_rows(), 7); + let mut builder = + ColumnBlockBuilder::with_capacity(&column.data_type, 2, column.is_dictionary); + + // append_block_range + (0..rows.len()).for_each(|i| builder.append_block_range(&column_block, i, 1).unwrap()); + + let column_block = builder.build(); + assert_eq!(column_block.num_rows(), 6); + assert_eq!( + column_block.datum(0), + Datum::String(StringBytes::from("tag1_1")) + ); + assert_eq!( + column_block.datum(1), + Datum::String(StringBytes::from("tag1_2")) + ); + assert_eq!( + column_block.datum(2), + Datum::String(StringBytes::from("tag1_3")) + ); + assert_eq!( + column_block.datum(3), + Datum::String(StringBytes::from("tag1_1")) + ); + assert_eq!( + column_block.datum(4), + Datum::String(StringBytes::from("tag1_3")) + ); + assert_eq!(column_block.datum(5), Datum::Null); + } } diff --git a/common_types/src/column_schema.rs b/common_types/src/column_schema.rs index 2f1a48cbbd..0678336f01 100644 --- a/common_types/src/column_schema.rs +++ b/common_types/src/column_schema.rs @@ -29,6 +29,12 @@ pub enum Error { backtrace: Backtrace, }, + #[snafu(display("Invalid dictionary type:{}.\nBacktrace:\n{}", data_type, backtrace))] + InvalidDictionaryType { + data_type: DataType, + backtrace: Backtrace, + }, + #[snafu(display( "Arrow field meta data is missing, field name:{}.\nBacktrace:\n{}", field_name, @@ -119,6 +125,7 @@ pub enum ReadOp { struct ArrowFieldMeta { id: u32, is_tag: bool, + is_dictionary: bool, comment: String, } @@ -126,6 +133,7 @@ struct ArrowFieldMeta { pub enum ArrowFieldMetaKey { Id, IsTag, + IsDictionary, Comment, } @@ -134,6 +142,7 @@ impl ArrowFieldMetaKey { match self { ArrowFieldMetaKey::Id => "field::id", ArrowFieldMetaKey::IsTag => "field::is_tag", + ArrowFieldMetaKey::IsDictionary => "field::is_dictionary", ArrowFieldMetaKey::Comment => "field::comment", } } @@ -159,6 +168,8 @@ pub struct ColumnSchema { /// Is tag, tag is just a hint for a column, there is no restriction that a /// tag column must be a part of primary key pub is_tag: bool, + // Whether to use dictionary types for encoding column + pub is_dictionary: bool, /// Comment of the column pub comment: String, /// Column name in response @@ -191,6 +202,11 @@ impl ColumnSchema { } } + /// Check whether a type is valid dictionary type. + pub fn is_valid_dictionary_type(typ: DatumKind) -> bool { + matches!(typ, DatumKind::String) + } + /// Convert `self` to [`arrow::datatypes::Field`] pub fn to_arrow_field(&self) -> Field { From::from(self) @@ -273,6 +289,7 @@ impl TryFrom for ColumnSchema { data_type: DatumKind::from(data_type), is_nullable: column_schema.is_nullable, is_tag: column_schema.is_tag, + is_dictionary: column_schema.is_dictionary, comment: column_schema.comment, escaped_name, default_value, @@ -287,6 +304,7 @@ impl TryFrom<&Arc> for ColumnSchema { let ArrowFieldMeta { id, is_tag, + is_dictionary, comment, } = decode_arrow_field_meta_data(field.metadata())?; Ok(Self { @@ -299,6 +317,7 @@ impl TryFrom<&Arc> for ColumnSchema { )?, is_nullable: field.is_nullable(), is_tag, + is_dictionary, comment, escaped_name: field.name().escape_debug().to_string(), default_value: None, @@ -309,11 +328,24 @@ impl TryFrom<&Arc> for ColumnSchema { impl From<&ColumnSchema> for Field { fn from(col_schema: &ColumnSchema) -> Self { let metadata = encode_arrow_field_meta_data(col_schema); - let mut field = Field::new( - &col_schema.name, - col_schema.data_type.into(), - col_schema.is_nullable, - ); + // If the column sholud use dictionary, create correspond dictionary type. + let mut field = if col_schema.is_dictionary { + Field::new_dict( + &col_schema.name, + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), + col_schema.is_nullable, + col_schema.id.into(), + // TODO(tanruixiang): how to use dict_is_ordered + false, + ) + } else { + Field::new( + &col_schema.name, + col_schema.data_type.into(), + col_schema.is_nullable, + ) + }; + field.set_metadata(metadata); field @@ -343,6 +375,7 @@ fn decode_arrow_field_meta_data(meta: &HashMap) -> Result HashMap, } @@ -385,6 +423,7 @@ impl Builder { data_type, is_nullable: false, is_tag: false, + is_dictionary: false, comment: String::new(), default_value: None, } @@ -407,6 +446,12 @@ impl Builder { self } + /// Set this column is dictionary, default is false (not a dictionary). + pub fn is_dictionary(mut self, is_dictionary: bool) -> Self { + self.is_dictionary = is_dictionary; + self + } + pub fn comment(mut self, comment: String) -> Self { self.comment = comment; self @@ -427,6 +472,15 @@ impl Builder { ); } + if self.is_dictionary { + ensure!( + ColumnSchema::is_valid_dictionary_type(self.data_type), + InvalidDictionaryType { + data_type: self.data_type + } + ); + } + Ok(()) } @@ -439,6 +493,7 @@ impl Builder { data_type: self.data_type, is_nullable: self.is_nullable, is_tag: self.is_tag, + is_dictionary: self.is_dictionary, comment: self.comment, escaped_name, default_value: self.default_value, @@ -460,6 +515,7 @@ impl From for schema_pb::ColumnSchema { is_nullable: src.is_nullable, id: src.id, is_tag: src.is_tag, + is_dictionary: src.is_dictionary, comment: src.comment, default_value, } @@ -475,10 +531,11 @@ mod tests { /// Create a column schema for test, each field is filled with non-default /// value fn new_test_column_schema() -> ColumnSchema { - Builder::new("test_column_schema".to_string(), DatumKind::Boolean) + Builder::new("test_column_schema".to_string(), DatumKind::String) .id(18) .is_nullable(true) .is_tag(true) + .is_dictionary(true) .comment("Comment of this column".to_string()) .default_value(Some(Expr::Value(Value::Boolean(true)))) .build() @@ -491,9 +548,10 @@ mod tests { let rhs = ColumnSchema { id: 18, name: "test_column_schema".to_string(), - data_type: DatumKind::Boolean, + data_type: DatumKind::String, is_nullable: true, is_tag: true, + is_dictionary: true, comment: "Comment of this column".to_string(), escaped_name: "test_column_schema".escape_debug().to_string(), default_value: Some(Expr::Value(Value::Boolean(true))), @@ -508,6 +566,8 @@ mod tests { let pb_schema = schema_pb::ColumnSchema::from(column_schema.clone()); // Check pb specific fields assert!(pb_schema.is_tag); + assert!(pb_schema.is_dictionary); + assert!(pb_schema.is_nullable); let schema_from_pb = ColumnSchema::try_from(pb_schema).unwrap(); assert_eq!(&schema_from_pb, &column_schema); @@ -524,4 +584,16 @@ mod tests { ); } } + + #[test] + fn test_valid_dictionary_type() { + let valid_dictionary_types = vec![DatumKind::String]; + + for v in &DatumKind::VALUES { + assert_eq!( + ColumnSchema::is_valid_dictionary_type(*v), + valid_dictionary_types.contains(v) + ); + } + } } diff --git a/common_types/src/datum.rs b/common_types/src/datum.rs index 12e1c6e371..d0618a1880 100644 --- a/common_types/src/datum.rs +++ b/common_types/src/datum.rs @@ -171,6 +171,11 @@ impl DatumKind { ) } + /// Can column of this datum kind used as dictionary encode column + pub fn is_dictionary_kind(&self) -> bool { + matches!(self, DatumKind::String) + } + pub fn unsign_kind(&self) -> Option { match self { Self::Int64 | Self::UInt64 => Some(Self::UInt64), @@ -1138,6 +1143,7 @@ pub mod arrow_convert { DataType::Boolean => Some(Self::Boolean), DataType::Date32 => Some(Self::Date), DataType::Time64(TimeUnit::Nanosecond) => Some(Self::Time), + DataType::Dictionary(_, _) => Some(Self::String), DataType::Float16 | DataType::LargeUtf8 | DataType::LargeBinary @@ -1153,7 +1159,6 @@ pub mod arrow_convert { | DataType::Date64 | DataType::Interval(_) | DataType::Duration(_) - | DataType::Dictionary(_, _) | DataType::Decimal128(_, _) | DataType::Decimal256(_, _) | DataType::RunEndEncoded(_, _) @@ -1235,6 +1240,7 @@ pub mod arrow_convert { } ScalarValue::Date32(v) => v.map(Datum::Date), ScalarValue::Time64Nanosecond(v) => v.map(Datum::Time), + ScalarValue::Dictionary(_, literal) => Datum::from_scalar_value(literal), ScalarValue::List(_, _) | ScalarValue::Date64(_) | ScalarValue::Time32Second(_) @@ -1248,8 +1254,7 @@ pub mod arrow_convert { | ScalarValue::Struct(_, _) | ScalarValue::Decimal128(_, _, _) | ScalarValue::Null - | ScalarValue::IntervalMonthDayNano(_) - | ScalarValue::Dictionary(_, _) => None, + | ScalarValue::IntervalMonthDayNano(_) => None, } } } @@ -1281,6 +1286,7 @@ pub mod arrow_convert { ScalarValue::TimestampMillisecond(v, _) => { v.map(|v| DatumView::Timestamp(Timestamp::new(v))) } + ScalarValue::Dictionary(_, literal) => DatumView::from_scalar_value(literal), ScalarValue::List(_, _) | ScalarValue::Date64(_) | ScalarValue::Time32Second(_) @@ -1294,8 +1300,7 @@ pub mod arrow_convert { | ScalarValue::Struct(_, _) | ScalarValue::Decimal128(_, _, _) | ScalarValue::Null - | ScalarValue::IntervalMonthDayNano(_) - | ScalarValue::Dictionary(_, _) => None, + | ScalarValue::IntervalMonthDayNano(_) => None, } } } diff --git a/common_types/src/record_batch.rs b/common_types/src/record_batch.rs index a7a73c9381..fbfacd902b 100644 --- a/common_types/src/record_batch.rs +++ b/common_types/src/record_batch.rs @@ -318,7 +318,23 @@ fn cast_arrow_record_batch(source: ArrowRecordBatch) -> Result DataType::Timestamp(TimeUnit::Millisecond, None), field.is_nullable(), ), - _ => Field::new(field.name(), field.data_type().clone(), field.is_nullable()), + _ => { + let (dict_id, dict_is_ordered) = { + match field.data_type() { + DataType::Dictionary(_, _) => { + (field.dict_id().unwrap(), field.dict_is_ordered().unwrap()) + } + _ => (0, false), + } + }; + Field::new_dict( + field.name(), + field.data_type().clone(), + field.is_nullable(), + dict_id, + dict_is_ordered, + ) + } }; f.set_metadata(field.metadata().clone()); f @@ -477,7 +493,13 @@ impl RecordBatchWithKeyBuilder { let builders = schema_with_key .columns() .iter() - .map(|column_schema| ColumnBlockBuilder::with_capacity(&column_schema.data_type, 0)) + .map(|column_schema| { + ColumnBlockBuilder::with_capacity( + &column_schema.data_type, + 0, + column_schema.is_dictionary, + ) + }) .collect(); Self { schema_with_key, @@ -490,7 +512,11 @@ impl RecordBatchWithKeyBuilder { .columns() .iter() .map(|column_schema| { - ColumnBlockBuilder::with_capacity(&column_schema.data_type, capacity) + ColumnBlockBuilder::with_capacity( + &column_schema.data_type, + capacity, + column_schema.is_dictionary, + ) }) .collect(); Self { @@ -660,9 +686,12 @@ impl ArrowRecordBatchProjector { } None => { // Need to push row with specific type. - let null_block = - ColumnBlock::new_null_with_type(&column_schema.data_type, num_rows) - .context(CreateColumnBlock)?; + let null_block = ColumnBlock::new_null_with_type( + &column_schema.data_type, + num_rows, + column_schema.is_dictionary, + ) + .context(CreateColumnBlock)?; column_blocks.push(null_block); } } diff --git a/common_types/src/tests.rs b/common_types/src/tests.rs index 0703d39d1e..9cfb86a0fb 100644 --- a/common_types/src/tests.rs +++ b/common_types/src/tests.rs @@ -129,7 +129,8 @@ fn default_value_schema_builder() -> schema::Builder { } /// Build a schema for testing: -/// (key1(varbinary), key2(timestamp), field1(double), field2(string)) +/// (key1(varbinary), key2(timestamp), field1(double), field2(string), +/// field3(date), field4(time)) pub fn build_schema() -> Schema { base_schema_builder().build().unwrap() } @@ -145,6 +146,32 @@ pub fn build_default_value_schema() -> Schema { default_value_schema_builder().build().unwrap() } +/// Build a schema for testing: +/// (key1(varbinary), key2(timestamp), field1(double), field2(string), +/// field3(date), field4(time)) tag1(string dictionary), tag2(string dictionary) +pub fn build_schema_with_dictionary() -> Schema { + let builder = base_schema_builder() + .add_normal_column( + column_schema::Builder::new("tag1".to_string(), DatumKind::String) + .is_tag(true) + .is_dictionary(true) + .is_nullable(true) + .build() + .unwrap(), + ) + .unwrap() + .add_normal_column( + column_schema::Builder::new("tag2".to_string(), DatumKind::String) + .is_tag(true) + .is_dictionary(true) + .build() + .unwrap(), + ) + .unwrap(); + + builder.build().unwrap() +} + /// Build a schema for testing: /// (tsid(uint64), key2(timestamp), tag1(string), tag2(string), value(int8), /// field2(float)) @@ -193,6 +220,31 @@ pub fn build_schema_for_cpu() -> Schema { builder.build().unwrap() } +#[allow(clippy::too_many_arguments)] +pub fn build_row_for_dictionary( + key1: &[u8], + key2: i64, + field1: f64, + field2: &str, + field3: i32, + field4: i64, + tag1: Option<&str>, + tag2: &str, +) -> Row { + let datums = vec![ + Datum::Varbinary(Bytes::copy_from_slice(key1)), + Datum::Timestamp(Timestamp::new(key2)), + Datum::Double(field1), + Datum::String(StringBytes::from(field2)), + Datum::Date(field3), + Datum::Time(field4), + tag1.map(|v| Datum::String(StringBytes::from(v))) + .unwrap_or(Datum::Null), + Datum::String(StringBytes::from(tag2)), + ]; + + Row::from_datums(datums) +} pub fn build_projected_schema() -> ProjectedSchema { let schema = build_schema(); assert!(schema.num_columns() > 1); diff --git a/df_operator/src/udfs/time_bucket.rs b/df_operator/src/udfs/time_bucket.rs index 1ea693d954..29d2932aff 100644 --- a/df_operator/src/udfs/time_bucket.rs +++ b/df_operator/src/udfs/time_bucket.rs @@ -141,8 +141,9 @@ impl<'a> TimeBucket<'a> { } fn call(&self) -> Result { + // TODO(tanruixiang) : mising is_dictionary params let mut out_column_builder = - ColumnBlockBuilder::with_capacity(&DatumKind::Timestamp, self.column.num_rows()); + ColumnBlockBuilder::with_capacity(&DatumKind::Timestamp, self.column.num_rows(), false); for ts_opt in self.column.iter() { match ts_opt { Some(ts) => { diff --git a/integration_tests/cases/env/local/ddl/create_tables.result b/integration_tests/cases/env/local/ddl/create_tables.result index ede67e9c1a..87779ac52b 100644 --- a/integration_tests/cases/env/local/ddl/create_tables.result +++ b/integration_tests/cases/env/local/ddl/create_tables.result @@ -48,7 +48,7 @@ affected_rows: 0 CREATE TABLE `05_create_tables_t`(c1 int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic; -Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute plan, sql: CREATE TABLE `05_create_tables_t`(c1 int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;. Caused by: Internal error, msg:Failed to execute interpreter, err:Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to operate table, err:Failed to operate table, msg:Some(\"failed to create table on shard, request:CreateTableRequest { catalog_name: \\\"ceresdb\\\", schema_name: \\\"public\\\", table_name: \\\"05_create_tables_t\\\", table_id: None, table_schema: Schema { timestamp_index: 1, tsid_index: Some(0), column_schemas: ColumnSchemas { columns: [ColumnSchema { id: 1, name: \\\"tsid\\\", data_type: UInt64, is_nullable: false, is_tag: false, comment: \\\"\\\", escaped_name: \\\"tsid\\\", default_value: None }, ColumnSchema { id: 2, name: \\\"t\\\", data_type: Timestamp, is_nullable: false, is_tag: false, comment: \\\"\\\", escaped_name: \\\"t\\\", default_value: None }, ColumnSchema { id: 3, name: \\\"c1\\\", data_type: Int32, is_nullable: true, is_tag: false, comment: \\\"\\\", escaped_name: \\\"c1\\\", default_value: None }] }, version: 1, primary_key_indexes: [0, 1] }, engine: \\\"Analytic\\\", options: {}, state: Stable, shard_id: 0, partition_info: None }\"), err:Failed to create table, table already exists, table:05_create_tables_t." }) +Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute plan, sql: CREATE TABLE `05_create_tables_t`(c1 int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;. Caused by: Internal error, msg:Failed to execute interpreter, err:Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to operate table, err:Failed to operate table, msg:Some(\"failed to create table on shard, request:CreateTableRequest { catalog_name: \\\"ceresdb\\\", schema_name: \\\"public\\\", table_name: \\\"05_create_tables_t\\\", table_id: None, table_schema: Schema { timestamp_index: 1, tsid_index: Some(0), column_schemas: ColumnSchemas { columns: [ColumnSchema { id: 1, name: \\\"tsid\\\", data_type: UInt64, is_nullable: false, is_tag: false, is_dictionary: false, comment: \\\"\\\", escaped_name: \\\"tsid\\\", default_value: None }, ColumnSchema { id: 2, name: \\\"t\\\", data_type: Timestamp, is_nullable: false, is_tag: false, is_dictionary: false, comment: \\\"\\\", escaped_name: \\\"t\\\", default_value: None }, ColumnSchema { id: 3, name: \\\"c1\\\", data_type: Int32, is_nullable: true, is_tag: false, is_dictionary: false, comment: \\\"\\\", escaped_name: \\\"c1\\\", default_value: None }] }, version: 1, primary_key_indexes: [0, 1] }, engine: \\\"Analytic\\\", options: {}, state: Stable, shard_id: 0, partition_info: None }\"), err:Failed to create table, table already exists, table:05_create_tables_t." }) create table `05_create_tables_t2`(a int, b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic with (enable_ttl='false'); @@ -67,11 +67,11 @@ Int32(4), create table `05_create_tables_t2`(a int,b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic; -Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute plan, sql: create table `05_create_tables_t2`(a int,b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;. Caused by: Internal error, msg:Failed to execute interpreter, err:Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to operate table, err:Failed to operate table, msg:Some(\"failed to create table on shard, request:CreateTableRequest { catalog_name: \\\"ceresdb\\\", schema_name: \\\"public\\\", table_name: \\\"05_create_tables_t2\\\", table_id: None, table_schema: Schema { timestamp_index: 1, tsid_index: Some(0), column_schemas: ColumnSchemas { columns: [ColumnSchema { id: 1, name: \\\"tsid\\\", data_type: UInt64, is_nullable: false, is_tag: false, comment: \\\"\\\", escaped_name: \\\"tsid\\\", default_value: None }, ColumnSchema { id: 2, name: \\\"t\\\", data_type: Timestamp, is_nullable: false, is_tag: false, comment: \\\"\\\", escaped_name: \\\"t\\\", default_value: None }, ColumnSchema { id: 3, name: \\\"a\\\", data_type: Int32, is_nullable: true, is_tag: false, comment: \\\"\\\", escaped_name: \\\"a\\\", default_value: None }, ColumnSchema { id: 4, name: \\\"b\\\", data_type: Int32, is_nullable: true, is_tag: false, comment: \\\"\\\", escaped_name: \\\"b\\\", default_value: None }] }, version: 1, primary_key_indexes: [0, 1] }, engine: \\\"Analytic\\\", options: {}, state: Stable, shard_id: 0, partition_info: None }\"), err:Failed to create table, table already exists, table:05_create_tables_t2." }) +Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute plan, sql: create table `05_create_tables_t2`(a int,b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;. Caused by: Internal error, msg:Failed to execute interpreter, err:Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to operate table, err:Failed to operate table, msg:Some(\"failed to create table on shard, request:CreateTableRequest { catalog_name: \\\"ceresdb\\\", schema_name: \\\"public\\\", table_name: \\\"05_create_tables_t2\\\", table_id: None, table_schema: Schema { timestamp_index: 1, tsid_index: Some(0), column_schemas: ColumnSchemas { columns: [ColumnSchema { id: 1, name: \\\"tsid\\\", data_type: UInt64, is_nullable: false, is_tag: false, is_dictionary: false, comment: \\\"\\\", escaped_name: \\\"tsid\\\", default_value: None }, ColumnSchema { id: 2, name: \\\"t\\\", data_type: Timestamp, is_nullable: false, is_tag: false, is_dictionary: false, comment: \\\"\\\", escaped_name: \\\"t\\\", default_value: None }, ColumnSchema { id: 3, name: \\\"a\\\", data_type: Int32, is_nullable: true, is_tag: false, is_dictionary: false, comment: \\\"\\\", escaped_name: \\\"a\\\", default_value: None }, ColumnSchema { id: 4, name: \\\"b\\\", data_type: Int32, is_nullable: true, is_tag: false, is_dictionary: false, comment: \\\"\\\", escaped_name: \\\"b\\\", default_value: None }] }, version: 1, primary_key_indexes: [0, 1] }, engine: \\\"Analytic\\\", options: {}, state: Stable, shard_id: 0, partition_info: None }\"), err:Failed to create table, table already exists, table:05_create_tables_t2." }) create table `05_create_tables_t2`(a int,b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic; -Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute plan, sql: create table `05_create_tables_t2`(a int,b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;. Caused by: Internal error, msg:Failed to execute interpreter, err:Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to operate table, err:Failed to operate table, msg:Some(\"failed to create table on shard, request:CreateTableRequest { catalog_name: \\\"ceresdb\\\", schema_name: \\\"public\\\", table_name: \\\"05_create_tables_t2\\\", table_id: None, table_schema: Schema { timestamp_index: 1, tsid_index: Some(0), column_schemas: ColumnSchemas { columns: [ColumnSchema { id: 1, name: \\\"tsid\\\", data_type: UInt64, is_nullable: false, is_tag: false, comment: \\\"\\\", escaped_name: \\\"tsid\\\", default_value: None }, ColumnSchema { id: 2, name: \\\"t\\\", data_type: Timestamp, is_nullable: false, is_tag: false, comment: \\\"\\\", escaped_name: \\\"t\\\", default_value: None }, ColumnSchema { id: 3, name: \\\"a\\\", data_type: Int32, is_nullable: true, is_tag: false, comment: \\\"\\\", escaped_name: \\\"a\\\", default_value: None }, ColumnSchema { id: 4, name: \\\"b\\\", data_type: Int32, is_nullable: true, is_tag: false, comment: \\\"\\\", escaped_name: \\\"b\\\", default_value: None }] }, version: 1, primary_key_indexes: [0, 1] }, engine: \\\"Analytic\\\", options: {}, state: Stable, shard_id: 0, partition_info: None }\"), err:Failed to create table, table already exists, table:05_create_tables_t2." }) +Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute plan, sql: create table `05_create_tables_t2`(a int,b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;. Caused by: Internal error, msg:Failed to execute interpreter, err:Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to operate table, err:Failed to operate table, msg:Some(\"failed to create table on shard, request:CreateTableRequest { catalog_name: \\\"ceresdb\\\", schema_name: \\\"public\\\", table_name: \\\"05_create_tables_t2\\\", table_id: None, table_schema: Schema { timestamp_index: 1, tsid_index: Some(0), column_schemas: ColumnSchemas { columns: [ColumnSchema { id: 1, name: \\\"tsid\\\", data_type: UInt64, is_nullable: false, is_tag: false, is_dictionary: false, comment: \\\"\\\", escaped_name: \\\"tsid\\\", default_value: None }, ColumnSchema { id: 2, name: \\\"t\\\", data_type: Timestamp, is_nullable: false, is_tag: false, is_dictionary: false, comment: \\\"\\\", escaped_name: \\\"t\\\", default_value: None }, ColumnSchema { id: 3, name: \\\"a\\\", data_type: Int32, is_nullable: true, is_tag: false, is_dictionary: false, comment: \\\"\\\", escaped_name: \\\"a\\\", default_value: None }, ColumnSchema { id: 4, name: \\\"b\\\", data_type: Int32, is_nullable: true, is_tag: false, is_dictionary: false, comment: \\\"\\\", escaped_name: \\\"b\\\", default_value: None }] }, version: 1, primary_key_indexes: [0, 1] }, engine: \\\"Analytic\\\", options: {}, state: Stable, shard_id: 0, partition_info: None }\"), err:Failed to create table, table already exists, table:05_create_tables_t2." }) create table `05_create_tables_t3`(a int,b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic; diff --git a/interpreters/src/insert.rs b/interpreters/src/insert.rs index 95bc47a0cb..782b07b6f3 100644 --- a/interpreters/src/insert.rs +++ b/interpreters/src/insert.rs @@ -341,7 +341,11 @@ fn get_or_extract_column_from_row_groups( .unwrap_or_else(|| { let data_type = row_groups.schema().column(column_idx).data_type; let iter = row_groups.iter_column(column_idx); - let mut builder = ColumnBlockBuilder::with_capacity(&data_type, iter.size_hint().0); + let mut builder = ColumnBlockBuilder::with_capacity( + &data_type, + iter.size_hint().0, + row_groups.schema().column(column_idx).is_dictionary, + ); for datum in iter { builder.append(datum.clone()).context(BuildColumnBlock)?; diff --git a/proxy/src/grpc/prom_query.rs b/proxy/src/grpc/prom_query.rs index 4ef9ebeecf..8368921fc0 100644 --- a/proxy/src/grpc/prom_query.rs +++ b/proxy/src/grpc/prom_query.rs @@ -373,54 +373,77 @@ mod tests { .unwrap(), ) .unwrap() + .add_normal_column( + column_schema::Builder::new("tag_dictionary".to_string(), DatumKind::String) + .is_tag(true) + .is_dictionary(true) + .is_nullable(true) + .build() + .unwrap(), + ) + .unwrap() .build() .unwrap() } fn build_column_block() -> Vec { - let build_row = |ts: i64, tsid: u64, field1: f64, field2: &str| -> Row { + let build_row = |ts: i64, tsid: u64, field1: f64, field2: &str, dic: Option<&str>| -> Row { let datums = vec![ Datum::Timestamp(Timestamp::new(ts)), Datum::UInt64(tsid), Datum::Double(field1), Datum::String(StringBytes::from(field2)), + dic.map(|v| Datum::String(StringBytes::from(v))) + .unwrap_or(Datum::Null), ]; Row::from_datums(datums) }; let rows = vec![ - build_row(1000001, 1, 10.0, "v5"), - build_row(1000002, 1, 11.0, "v5"), - build_row(1000000, 2, 10.0, "v4"), - build_row(1000000, 3, 10.0, "v3"), + build_row(1000001, 1, 10.0, "v5", Some("d1")), + build_row(1000002, 1, 11.0, "v5", None), + build_row(1000000, 2, 10.0, "v4", Some("d2")), + build_row(1000000, 3, 10.0, "v3", None), ]; - let mut builder = ColumnBlockBuilder::with_capacity(&DatumKind::Timestamp, 2); + let mut builder = ColumnBlockBuilder::with_capacity(&DatumKind::Timestamp, 2, false); for row in &rows { builder.append(row[0].clone()).unwrap(); } let timestamp_block = builder.build(); - let mut builder = ColumnBlockBuilder::with_capacity(&DatumKind::UInt64, 2); + let mut builder = ColumnBlockBuilder::with_capacity(&DatumKind::UInt64, 2, false); for row in &rows { builder.append(row[1].clone()).unwrap(); } let tsid_block = builder.build(); - let mut builder = ColumnBlockBuilder::with_capacity(&DatumKind::Double, 2); + let mut builder = ColumnBlockBuilder::with_capacity(&DatumKind::Double, 2, false); for row in &rows { builder.append(row[2].clone()).unwrap(); } let field_block = builder.build(); - let mut builder = ColumnBlockBuilder::with_capacity(&DatumKind::String, 2); + let mut builder = ColumnBlockBuilder::with_capacity(&DatumKind::String, 2, false); for row in &rows { builder.append(row[3].clone()).unwrap(); } let tag_block = builder.build(); - vec![timestamp_block, tsid_block, field_block, tag_block] + let mut builder = ColumnBlockBuilder::with_capacity(&DatumKind::String, 2, true); + for row in &rows { + builder.append(row[4].clone()).unwrap(); + } + let dictionary_block = builder.build(); + + vec![ + timestamp_block, + tsid_block, + field_block, + tag_block, + dictionary_block, + ] } fn make_sample(timestamp: i64, value: f64) -> Sample { @@ -440,7 +463,7 @@ mod tests { let column_name = ColumnNames { timestamp: "timestamp".to_string(), - tag_keys: vec!["tag1".to_string()], + tag_keys: vec!["tag1".to_string(), "tag_dictionary".to_string()], field: "field1".to_string(), }; let converter = RecordConverter::try_new(&column_name, &schema.to_record_schema()).unwrap(); @@ -461,11 +484,17 @@ mod tests { ); assert_eq!( tsid_to_tags.get(&1).unwrap().clone(), - make_tags(vec![("tag1".to_string(), "v5".to_string())]) + make_tags(vec![ + ("tag1".to_string(), "v5".to_string()), + ("tag_dictionary".to_string(), "d1".to_string()) + ]) ); assert_eq!( tsid_to_tags.get(&2).unwrap().clone(), - make_tags(vec![("tag1".to_string(), "v4".to_string())]) + make_tags(vec![ + ("tag1".to_string(), "v4".to_string()), + ("tag_dictionary".to_string(), "d2".to_string()) + ]) ); assert_eq!( tsid_to_tags.get(&3).unwrap().clone(), diff --git a/proxy/src/influxdb/types.rs b/proxy/src/influxdb/types.rs index 58cba675ab..d090842b07 100644 --- a/proxy/src/influxdb/types.rs +++ b/proxy/src/influxdb/types.rs @@ -811,11 +811,12 @@ mod tests { } fn build_test_column_blocks() -> Vec { - let mut measurement_builder = ColumnBlockBuilder::with_capacity(&DatumKind::String, 3); - let mut tag_builder = ColumnBlockBuilder::with_capacity(&DatumKind::String, 3); - let mut time_builder = ColumnBlockBuilder::with_capacity(&DatumKind::Timestamp, 3); - let mut field_builder1 = ColumnBlockBuilder::with_capacity(&DatumKind::String, 3); - let mut field_builder2 = ColumnBlockBuilder::with_capacity(&DatumKind::UInt64, 3); + let mut measurement_builder = + ColumnBlockBuilder::with_capacity(&DatumKind::String, 3, false); + let mut tag_builder = ColumnBlockBuilder::with_capacity(&DatumKind::String, 3, false); + let mut time_builder = ColumnBlockBuilder::with_capacity(&DatumKind::Timestamp, 3, false); + let mut field_builder1 = ColumnBlockBuilder::with_capacity(&DatumKind::String, 3, false); + let mut field_builder2 = ColumnBlockBuilder::with_capacity(&DatumKind::UInt64, 3, false); // Data in measurement1 let measurement1 = Datum::String(StringBytes::copy_from_str("m1")); diff --git a/proxy/src/write.rs b/proxy/src/write.rs index 44b2ab4491..fc2f88d4d9 100644 --- a/proxy/src/write.rs +++ b/proxy/src/write.rs @@ -684,7 +684,6 @@ fn find_new_columns( ); let tag_name = &tag_names[name_index]; - build_column(&mut columns, schema, tag_name, &tag.value, true)?; } diff --git a/query_frontend/src/parser.rs b/query_frontend/src/parser.rs index 883e8e22ab..77f4c35056 100644 --- a/query_frontend/src/parser.rs +++ b/query_frontend/src/parser.rs @@ -37,6 +37,7 @@ macro_rules! parser_err { const TS_KEY: &str = "__ts_key"; const TAG: &str = "TAG"; +const DICTIONARY: &str = "DICTIONARY"; const UNSIGN: &str = "UNSIGN"; const MODIFY: &str = "MODIFY"; const SETTING: &str = "SETTING"; @@ -62,6 +63,7 @@ macro_rules! is_custom_column { } is_custom_column!(TAG); +is_custom_column!(DICTIONARY); is_custom_column!(UNSIGN); /// Get the comment from the [`ColumnOption`] if it is a comment option. @@ -326,6 +328,22 @@ impl<'a> Parser<'a> { // WITH ... let options = self.parser.parse_options(Keyword::WITH)?; + // Only String Column Can Be Dictionary Encoded + for c in &columns { + let mut is_dictionary = false; + for op in &c.options { + if is_dictionary_column(&op.option) { + is_dictionary = true; + } + } + if c.data_type != DataType::String && is_dictionary { + return parser_err!(format!( + "Only string column can be dictionary encoded: {:?}", + c.to_string() + )); + } + } + Ok(Statement::Create(Box::new(CreateTable { if_not_exists, table_name, @@ -513,6 +531,10 @@ impl<'a> Parser<'a> { Ok(Some(ColumnOption::DialectSpecific(vec![ Token::make_keyword(TAG), ]))) + } else if self.consume_token(DICTIONARY) { + Ok(Some(ColumnOption::DialectSpecific(vec![ + Token::make_keyword(DICTIONARY), + ]))) } else if self.consume_token(UNSIGN) { // Support unsign for ceresdb Ok(Some(ColumnOption::DialectSpecific(vec![ @@ -973,6 +995,52 @@ mod tests { } } + #[test] + fn test_dictionary_column() { + let sql = "CREATE TABLE IF NOT EXISTS t(c1 string tag dictionary, c2 float dictionary, c3 bigint unsign)"; + assert!(Parser::parse_sql(sql).is_err()); + let sql = "CREATE TABLE IF NOT EXISTS t(c1 string tag dictionary, c2 string dictionary, c3 bigint unsign)"; + let statements = Parser::parse_sql(sql).unwrap(); + assert_eq!(statements.len(), 1); + match &statements[0] { + Statement::Create(v) => { + let columns = &v.columns; + assert_eq!(3, columns.len()); + for c in columns { + if c.name.value == "c1" { + assert_eq!(2, c.options.len()); + let opt = &c.options[0]; + assert!(is_tag_column(&opt.option)); + let opt = &c.options[1]; + assert!(is_dictionary_column(&opt.option)); + } else if c.name.value == "c2" { + assert_eq!(1, c.options.len()); + let opt = &c.options[0]; + assert!(is_dictionary_column(&opt.option)); + } else if c.name.value == "c3" { + assert_eq!(1, c.options.len()); + let opt = &c.options[0]; + assert!(is_unsign_column(&opt.option)); + } else { + panic!("failed"); + } + } + } + _ => panic!("failed"), + } + } + + #[test] + fn test_dictionary_use_unstring_column() { + let sql = + "CREATE TABLE IF NOT EXISTS t(c1 string tag, c2 float dictionary, c3 bigint unsign)"; + assert!(Parser::parse_sql(sql).is_err()); + let sql = "CREATE TABLE IF NOT EXISTS t(c1 string tag dictionary, c2 float dictionary, c3 bigint unsign)"; + assert!(Parser::parse_sql(sql).is_err()); + let sql = "CREATE TABLE IF NOT EXISTS t(c1 string tag, c2 float dictionary, c3 bigint unsign dictionary)"; + assert!(Parser::parse_sql(sql).is_err()); + } + #[test] fn test_comment_column() { let sql = "CREATE TABLE IF NOT EXISTS t(c1 string, c2 float, c3 bigint comment 'id')"; diff --git a/query_frontend/src/planner.rs b/query_frontend/src/planner.rs index 694bc0ee0e..de8327da70 100644 --- a/query_frontend/src/planner.rs +++ b/query_frontend/src/planner.rs @@ -431,6 +431,7 @@ pub fn build_schema_from_write_table_request( if let Some(column_schema) = name_column_map.get(tag_name) { ensure_data_type_compatible(table, tag_name, true, data_type, column_schema)?; } + let column_schema = build_column_schema(tag_name, data_type, true)?; name_column_map.insert(tag_name, column_schema); } @@ -465,7 +466,6 @@ pub fn build_schema_from_write_table_request( column_schema, )?; } - let column_schema = build_column_schema(field_name, data_type, false)?; name_column_map.insert(field_name, column_schema); } @@ -1234,6 +1234,7 @@ fn parse_column(col: &ColumnDef) -> Result { // Process column options let mut is_nullable = true; // A column is nullable by default. let mut is_tag = false; + let mut is_dictionary = false; let mut is_unsign = false; let mut comment = String::new(); let mut default_value = None; @@ -1242,6 +1243,8 @@ fn parse_column(col: &ColumnDef) -> Result { is_nullable = false; } else if parser::is_tag_column(&option_def.option) { is_tag = true; + } else if parser::is_dictionary_column(&option_def.option) { + is_dictionary = true; } else if parser::is_unsign_column(&option_def.option) { is_unsign = true; } else if let Some(default_value_expr) = parser::get_default_value(&option_def.option) { @@ -1260,6 +1263,7 @@ fn parse_column(col: &ColumnDef) -> Result { let builder = column_schema::Builder::new(col.name.value.clone(), data_type) .is_nullable(is_nullable) .is_tag(is_tag) + .is_dictionary(is_dictionary) .comment(comment) .default_value(default_value); @@ -1441,6 +1445,7 @@ mod tests { data_type: String, is_nullable: false, is_tag: true, + is_dictionary: false, comment: "", escaped_name: "c1", default_value: None, @@ -1451,6 +1456,7 @@ mod tests { data_type: Timestamp, is_nullable: false, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "ts", default_value: None, @@ -1461,6 +1467,7 @@ mod tests { data_type: String, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "c3", default_value: None, @@ -1471,6 +1478,7 @@ mod tests { data_type: UInt32, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "c4", default_value: Some( @@ -1488,6 +1496,7 @@ mod tests { data_type: UInt32, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "c5", default_value: Some( @@ -1514,6 +1523,7 @@ mod tests { data_type: String, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "c6", default_value: Some( @@ -1612,6 +1622,7 @@ mod tests { data_type: Varbinary, is_nullable: false, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "key1", default_value: None, @@ -1622,6 +1633,7 @@ mod tests { data_type: Timestamp, is_nullable: false, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "key2", default_value: None, @@ -1632,6 +1644,7 @@ mod tests { data_type: Double, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "field1", default_value: None, @@ -1642,6 +1655,7 @@ mod tests { data_type: String, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "field2", default_value: None, @@ -1652,6 +1666,7 @@ mod tests { data_type: Date, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "field3", default_value: None, @@ -1662,6 +1677,7 @@ mod tests { data_type: Time, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "field4", default_value: None, @@ -1687,6 +1703,7 @@ mod tests { data_type: Varbinary, is_nullable: false, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "key1", default_value: None, @@ -1697,6 +1714,7 @@ mod tests { data_type: Timestamp, is_nullable: false, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "key2", default_value: None, @@ -1707,6 +1725,7 @@ mod tests { data_type: Double, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "field1", default_value: None, @@ -1717,6 +1736,7 @@ mod tests { data_type: String, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "field2", default_value: None, @@ -1727,6 +1747,7 @@ mod tests { data_type: Date, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "field3", default_value: None, @@ -1737,6 +1758,7 @@ mod tests { data_type: Time, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "field4", default_value: None, @@ -1851,6 +1873,105 @@ mod tests { data_type: Varbinary, is_nullable: false, is_tag: false, + is_dictionary: false, + comment: "", + escaped_name: "key1", + default_value: None, + }, + ColumnSchema { + id: 2, + name: "key2", + data_type: Timestamp, + is_nullable: false, + is_tag: false, + is_dictionary: false, + comment: "", + escaped_name: "key2", + default_value: None, + }, + ColumnSchema { + id: 3, + name: "field1", + data_type: Double, + is_nullable: true, + is_tag: false, + is_dictionary: false, + comment: "", + escaped_name: "field1", + default_value: None, + }, + ColumnSchema { + id: 4, + name: "field2", + data_type: String, + is_nullable: true, + is_tag: false, + is_dictionary: false, + comment: "", + escaped_name: "field2", + default_value: None, + }, + ColumnSchema { + id: 5, + name: "field3", + data_type: Date, + is_nullable: true, + is_tag: false, + is_dictionary: false, + comment: "", + escaped_name: "field3", + default_value: None, + }, + ColumnSchema { + id: 6, + name: "field4", + data_type: Time, + is_nullable: true, + is_tag: false, + is_dictionary: false, + comment: "", + escaped_name: "field4", + default_value: None, + }, + ], + }, + version: 1, + primary_key_indexes: [ + 0, + 1, + ], + }, + }, + }, +)"#, + ) + .unwrap(); + } + + #[test] + fn test_alter_column_with_dictionary_encode() { + let sql = "ALTER TABLE test_table ADD column dic string dictionary;"; + quick_test( + sql, + r#"AlterTable( + AlterTablePlan { + table: MemoryTable { + name: "test_table", + id: TableId( + 100, + ), + schema: Schema { + timestamp_index: 1, + tsid_index: None, + column_schemas: ColumnSchemas { + columns: [ + ColumnSchema { + id: 1, + name: "key1", + data_type: Varbinary, + is_nullable: false, + is_tag: false, + is_dictionary: false, comment: "", escaped_name: "key1", default_value: None, @@ -1861,6 +1982,7 @@ mod tests { data_type: Timestamp, is_nullable: false, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "key2", default_value: None, @@ -1871,6 +1993,7 @@ mod tests { data_type: Double, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "field1", default_value: None, @@ -1881,6 +2004,7 @@ mod tests { data_type: String, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "field2", default_value: None, @@ -1891,6 +2015,7 @@ mod tests { data_type: Date, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "field3", default_value: None, @@ -1901,6 +2026,7 @@ mod tests { data_type: Time, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "field4", default_value: None, @@ -1914,6 +2040,21 @@ mod tests { ], }, }, + operations: AddColumn( + [ + ColumnSchema { + id: 0, + name: "dic", + data_type: String, + is_nullable: true, + is_tag: false, + is_dictionary: true, + comment: "", + escaped_name: "dic", + default_value: None, + }, + ], + ), }, )"#, ) @@ -1946,6 +2087,7 @@ mod tests { data_type: Varbinary, is_nullable: false, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "key1", default_value: None, @@ -1956,6 +2098,7 @@ mod tests { data_type: Timestamp, is_nullable: false, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "key2", default_value: None, @@ -1966,6 +2109,7 @@ mod tests { data_type: Double, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "field1", default_value: None, @@ -1976,6 +2120,7 @@ mod tests { data_type: String, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "field2", default_value: None, @@ -1986,6 +2131,7 @@ mod tests { data_type: Date, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "field3", default_value: None, @@ -1996,6 +2142,7 @@ mod tests { data_type: Time, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "field4", default_value: None, @@ -2017,6 +2164,7 @@ mod tests { data_type: String, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "add_col", default_value: None, @@ -2055,6 +2203,7 @@ mod tests { data_type: Varbinary, is_nullable: false, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "key1", default_value: None, @@ -2065,6 +2214,7 @@ mod tests { data_type: Timestamp, is_nullable: false, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "key2", default_value: None, @@ -2075,6 +2225,7 @@ mod tests { data_type: Double, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "field1", default_value: None, @@ -2085,6 +2236,7 @@ mod tests { data_type: String, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "field2", default_value: None, @@ -2095,6 +2247,7 @@ mod tests { data_type: Date, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "field3", default_value: None, @@ -2105,6 +2258,7 @@ mod tests { data_type: Time, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "field4", default_value: None, @@ -2156,6 +2310,7 @@ mod tests { data_type: Varbinary, is_nullable: false, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "key1", default_value: None, @@ -2166,6 +2321,7 @@ mod tests { data_type: Timestamp, is_nullable: false, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "key2", default_value: None, @@ -2176,6 +2332,7 @@ mod tests { data_type: Double, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "field1", default_value: None, @@ -2186,6 +2343,7 @@ mod tests { data_type: String, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "field2", default_value: None, @@ -2196,6 +2354,7 @@ mod tests { data_type: Date, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "field3", default_value: None, @@ -2206,6 +2365,7 @@ mod tests { data_type: Time, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "field4", default_value: None, diff --git a/server/src/mysql/writer.rs b/server/src/mysql/writer.rs index e2af7880e7..af01fa09ec 100644 --- a/server/src/mysql/writer.rs +++ b/server/src/mysql/writer.rs @@ -150,6 +150,7 @@ mod tests { name: "id".to_string(), data_type: DatumKind::Int32, is_nullable: false, + is_dictionary: false, is_tag: false, comment: "".to_string(), escaped_name: "id".to_string(), @@ -163,6 +164,7 @@ mod tests { name: "name".to_string(), data_type: DatumKind::String, is_nullable: true, + is_dictionary: false, is_tag: true, comment: "".to_string(), escaped_name: "name".to_string(), @@ -177,6 +179,7 @@ mod tests { data_type: DatumKind::Timestamp, is_nullable: true, is_tag: true, + is_dictionary: false, comment: "".to_string(), escaped_name: "birthday".to_string(), default_value: None, @@ -190,6 +193,7 @@ mod tests { data_type: DatumKind::Boolean, is_nullable: true, is_tag: true, + is_dictionary: false, comment: "".to_string(), escaped_name: "is_show".to_string(), default_value: None, @@ -203,6 +207,7 @@ mod tests { data_type: DatumKind::Double, is_nullable: true, is_tag: true, + is_dictionary: false, comment: "".to_string(), escaped_name: "money".to_string(), default_value: None, diff --git a/table_engine/src/memory.rs b/table_engine/src/memory.rs index 4e79a291fb..80d092609c 100644 --- a/table_engine/src/memory.rs +++ b/table_engine/src/memory.rs @@ -229,7 +229,7 @@ fn row_group_to_record_batch( ), })?; let cols = rows.iter_column(col_index); - let column_block = build_column_block(&column.data_type, cols)?; + let column_block = build_column_block(&column.data_type, cols, column.is_dictionary)?; column_blocks.push(column_block); } @@ -243,8 +243,10 @@ fn row_group_to_record_batch( fn build_column_block<'a, I: Iterator>( data_type: &DatumKind, iter: I, + is_dictionary: bool, ) -> stream::Result { - let mut builder = ColumnBlockBuilder::with_capacity(data_type, iter.size_hint().0); + let mut builder = + ColumnBlockBuilder::with_capacity(data_type, iter.size_hint().0, is_dictionary); for datum in iter { builder .append(datum.clone())