Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use dictionary type to store column #993

Merged
merged 19 commits into from
Jul 5, 2023
Merged
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ bytes = "1.1.0"
bytes_ext = { path = "components/bytes_ext" }
catalog = { path = "catalog" }
catalog_impls = { path = "catalog_impls" }
ceresdbproto = "1.0.5"
ceresdbproto = "1.0"
chrono = "0.4"
clap = "3.0"
clru = "0.6.1"
Expand Down
20 changes: 19 additions & 1 deletion analytic_engine/src/sst/parquet/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,25 @@ impl HybridRecordDecoder {
.iter()
.map(|f| {
if let DataType::List(nested_field) = f.data_type() {
Arc::new(Field::new(f.name(), nested_field.data_type().clone(), true))
match f.data_type() {
DataType::Dictionary(_, _) => {
assert!(f.dict_id().is_some(), "Dictionary must have dict_id");
assert!(
f.dict_is_ordered().is_some(),
"Dictionary must have dict_is_ordered"
);
let dict_id = f.dict_id().unwrap();
let dict_is_ordered = f.dict_is_ordered().unwrap();
Arc::new(Field::new_dict(
f.name(),
nested_field.data_type().clone(),
true,
dict_id,
dict_is_ordered,
))
}
_ => Arc::new(Field::new(f.name(), nested_field.data_type().clone(), true)),
}
} else {
f.clone()
}
Expand Down
2 changes: 2 additions & 0 deletions analytic_engine/src/sst/parquet/hybrid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ pub fn build_hybrid_arrow_schema(schema: &Schema) -> ArrowSchemaRef {
field.data_type().clone(),
true,
)));
// TODO(tanruixiang): is there a need to use `Field::new_dict` here?
Arc::new(Field::new(field.name(), field_type, true))
} else {
field.clone()
Expand Down Expand Up @@ -418,6 +419,7 @@ impl ListArrayBuilder {
let array_len = self.multi_row_arrays.len();
let mut offsets = MutableBuffer::new(array_len * std::mem::size_of::<i32>());
let child_data = self.build_child_data(&mut offsets)?;
// TODO(tanruixiang): is there a need to use `Field::new_dict` here?
let field = Arc::new(Field::new(
LIST_ITEM_NAME,
self.datum_kind.to_arrow_data_type(),
Expand Down
94 changes: 80 additions & 14 deletions analytic_engine/src/sst/parquet/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ mod tests {
use common_types::{
bytes::Bytes,
projected_schema::ProjectedSchema,
tests::{build_row, build_schema},
tests::{build_row, build_row_for_dictionary, build_schema, build_schema_with_dictionary},
time::{TimeRange, Timestamp},
};
use common_util::{
Expand Down Expand Up @@ -365,9 +365,10 @@ mod tests {
init_log_for_test();

let runtime = Arc::new(runtime::Builder::default().build().unwrap());
parquet_write_and_then_read_back(runtime.clone(), 3, vec![3, 3, 3, 3, 3]);
parquet_write_and_then_read_back(runtime.clone(), 4, vec![4, 4, 4, 3]);
parquet_write_and_then_read_back(runtime, 5, vec![5, 5, 5]);
parquet_write_and_then_read_back(runtime.clone(), 2, vec![2, 2, 2, 2, 2, 2, 2, 2, 2, 2]);
parquet_write_and_then_read_back(runtime.clone(), 3, vec![3, 3, 3, 3, 3, 3, 2]);
parquet_write_and_then_read_back(runtime.clone(), 4, vec![4, 4, 4, 4, 4]);
parquet_write_and_then_read_back(runtime, 5, vec![5, 5, 5, 5]);
}

fn parquet_write_and_then_read_back(
Expand All @@ -390,8 +391,8 @@ mod tests {
let store_picker: ObjectStorePickerRef = Arc::new(store);
let sst_file_path = Path::from("data.par");

let schema = build_schema();
let projected_schema = ProjectedSchema::no_projection(schema.clone());
let schema = build_schema_with_dictionary();
let reader_projected_schema = ProjectedSchema::no_projection(schema.clone());
let sst_meta = MetaData {
min_key: Bytes::from_static(b"100"),
max_key: Bytes::from_static(b"200"),
Expand All @@ -410,9 +411,37 @@ mod tests {
// Reached when counter is 9, 7, 5, 3 or 1.
let ts = 100 + counter;
let rows = vec![
build_row(b"a", ts, 10.0, "v4", 1000, 1_000_000),
build_row(b"b", ts, 10.0, "v4", 1000, 1_000_000),
build_row(b"c", ts, 10.0, "v4", 1000, 1_000_000),
build_row_for_dictionary(
b"a",
ts,
10.0,
"v4",
1000,
1_000_000,
Some("tagv1"),
"tagv2",
),
build_row_for_dictionary(
b"b",
ts,
10.0,
"v4",
1000,
1_000_000,
Some("tagv2"),
"tagv4",
),
build_row_for_dictionary(b"c", ts, 10.0, "v4", 1000, 1_000_000, None, "tagv2"),
build_row_for_dictionary(
b"d",
ts,
10.0,
"v4",
1000,
1_000_000,
Some("tagv3"),
"tagv2",
),
];
let batch = build_record_batch_with_key(schema.clone(), rows);
Poll::Ready(Some(Ok(batch)))
Expand All @@ -432,15 +461,15 @@ mod tests {
.await
.unwrap();

assert_eq!(15, sst_info.row_num);
assert_eq!(20, sst_info.row_num);

let scan_options = ScanOptions::default();
// read sst back to test
let sst_read_options = SstReadOptions {
reverse: false,
frequency: ReadFrequency::Frequent,
num_rows_per_row_group: 5,
projected_schema,
projected_schema: reader_projected_schema,
predicate: Arc::new(Predicate::empty()),
meta_cache: None,
scan_options,
Expand Down Expand Up @@ -483,9 +512,46 @@ mod tests {
let mut stream = reader.read().await.unwrap();
let mut expect_rows = vec![];
for counter in &[4, 3, 2, 1, 0] {
expect_rows.push(build_row(b"a", 100 + counter, 10.0, "v4", 1000, 1_000_000));
expect_rows.push(build_row(b"b", 100 + counter, 10.0, "v4", 1000, 1_000_000));
expect_rows.push(build_row(b"c", 100 + counter, 10.0, "v4", 1000, 1_000_000));
expect_rows.push(build_row_for_dictionary(
b"a",
100 + counter,
10.0,
"v4",
1000,
1_000_000,
Some("tagv1"),
"tagv2",
));
expect_rows.push(build_row_for_dictionary(
b"b",
100 + counter,
10.0,
"v4",
1000,
1_000_000,
Some("tagv2"),
"tagv4",
));
expect_rows.push(build_row_for_dictionary(
b"c",
100 + counter,
10.0,
"v4",
1000,
1_000_000,
None,
"tagv2",
));
expect_rows.push(build_row_for_dictionary(
b"d",
100 + counter,
10.0,
"v4",
1000,
1_000_000,
Some("tagv3"),
"tagv2",
));
}
check_stream(&mut stream, expect_rows).await;
});
Expand Down
Loading