From e1fc6f902831d8e1466e222874d22d1a65882f33 Mon Sep 17 00:00:00 2001
From: kamille
Date: Thu, 23 Mar 2023 17:17:58 +0800
Subject: [PATCH] draft.

---
 server/src/handlers/error.rs    |   3 +
 server/src/handlers/influxdb.rs | 334 +++++++++++++++++++++++++++++++-
 sql/src/influxql/planner.rs     |   2 +-
 3 files changed, 331 insertions(+), 8 deletions(-)

diff --git a/server/src/handlers/error.rs b/server/src/handlers/error.rs
index 16c2355a53..f644f9c7dd 100644
--- a/server/src/handlers/error.rs
+++ b/server/src/handlers/error.rs
@@ -75,6 +75,9 @@ pub enum Error {
 
     #[snafu(display("InfluxDb handler failed, msg:{}, source:{}", msg, source))]
     InfluxDbHandler { msg: String, source: GenericError },
+
+    #[snafu(display("InfluxDb handler failed, msg:{}.\nBacktrace:\n{}", msg, backtrace))]
+    InfluxDbHandlerInternal { msg: String, backtrace: Backtrace },
 }
 
 define_result!(Error);
diff --git a/server/src/handlers/influxdb.rs b/server/src/handlers/influxdb.rs
index cb10fd7b4c..e998bfe421 100644
--- a/server/src/handlers/influxdb.rs
+++ b/server/src/handlers/influxdb.rs
@@ -10,21 +10,35 @@ use bytes::Bytes;
 use ceresdbproto::storage::{
     value, Field, FieldGroup, Tag, Value, WriteSeriesEntry, WriteTableRequest,
 };
-use common_types::{request_id::RequestId, time::Timestamp};
+use common_types::{
+    column_schema::{self, ColumnSchema},
+    datum::{Datum, DatumKind},
+    record_batch::RecordBatch,
+    request_id::RequestId,
+    schema::RecordSchema,
+    time::Timestamp,
+};
 use common_util::error::BoxError;
 use handlers::{
     error::{InfluxDbHandler, Result},
     query::QueryRequest,
 };
 use influxdb_line_protocol::FieldValue;
+use interpreters::interpreter::Output;
 use log::debug;
 use query_engine::executor::Executor as QueryExecutor;
-use snafu::ResultExt;
+use serde::{
+    ser::{SerializeMap, SerializeSeq},
+    Serialize,
+};
+use snafu::{ensure, OptionExt, ResultExt};
+use sql::influxql::planner::CERESDB_MEASUREMENT_COLUMN_NAME;
 use warp::{reject, reply, Rejection, Reply};
 
 use crate::{
     context::RequestContext,
     handlers,
+    handlers::error::InfluxDbHandlerInternal,
     instance::InstanceRef,
     proxy::grpc::write::{execute_insert_plan, write_request_to_insert_plan, WriteContext},
     schema_config_provider::SchemaConfigProviderRef,
@@ -71,6 +85,289 @@ impl From<Bytes> for WriteRequest {
 
 pub type WriteResponse = ();
 
+/// One result in a group (defined by the GROUP BY clause)
+///
+/// InfluxDB calls each result in the result set a series, so each result in
+/// the set is named `OneSeries` here. The response format is like:
+/// {
+///     "results": [{
+///         "statement_id": 0,
+///         "series": [{
+///             "name": "home",
+///             "tags": {
+///                 "room": "Living Room"
+///             },
+///             "columns": ["time", "co", "hum", "temp"],
+///             "values": [["2022-01-01T08:00:00Z", 0, 35.9, 21.1], ...]
+///         }, ...]
+///     }, ...]
+/// }
+#[derive(Debug, Serialize)]
+pub struct InfluxqlResponse {
+    results: Vec<InfluxqlResult>,
+}
+
+#[derive(Debug, Serialize)]
+pub struct InfluxqlResult {
+    statement_id: u32,
+    series: Vec<OneSeries>,
+}
+
+#[derive(Debug)]
+pub struct OneSeries {
+    name: String,
+    tags: Option<Tags>,
+    columns: Vec<String>,
+    values: Vec<Vec<Datum>>,
+}
+
+impl Serialize for OneSeries {
+    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        // The `tags` entry is skipped when absent, so the length hint must
+        // account for it.
+        let len = if self.tags.is_some() { 4 } else { 3 };
+        let mut one_series = serializer.serialize_map(Some(len))?;
+        one_series.serialize_entry("name", &self.name)?;
+        if let Some(tags) = &self.tags {
+            one_series.serialize_entry("tags", &tags)?;
+        }
+        one_series.serialize_entry("columns", &self.columns)?;
+        one_series.serialize_entry("values", &self.values)?;
+
+        one_series.end()
+    }
+}
+
+#[derive(Debug)]
+struct Tags(Vec<(String, String)>);
+
+impl Serialize for Tags {
+    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        let mut tags = serializer.serialize_map(Some(self.0.len()))?;
+
+        for (tagk, tagv) in &self.0 {
+            tags.serialize_entry(tagk, tagv)?;
+        }
+
+        tags.end()
+    }
+}
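+
+// For example (a sketch of the serialized form), a `OneSeries` with name
+// "home", one tag pair ("room", "Living Room") and columns ["time", "co"]
+// serializes to:
+//   {"name":"home","tags":{"room":"Living Room"},"columns":["time","co"],"values":[...]}
+// where each entry of `values` is one row of datums.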
] +/// } +#[derive(Debug, Serialize)] +pub struct InfluxqlResponse { + results: Vec, +} + +#[derive(Debug, Serialize)] +pub struct InfluxqlResult { + statement_id: u32, + series: Vec, +} + +#[derive(Debug)] +pub struct OneSeries { + name: String, + tags: Option, + columns: Vec, + values: Vec>, +} + +impl Serialize for OneSeries { + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + let mut one_series = serializer.serialize_map(Some(4))?; + one_series.serialize_entry("name", &self.name)?; + if let Some(tags) = &self.tags { + one_series.serialize_entry("tags", &tags)?; + } + one_series.serialize_entry("columns", &self.columns)?; + one_series.serialize_entry("values", &self.values)?; + + one_series.end() + } +} + +#[derive(Debug)] +struct Tags(Vec<(String, String)>); + +impl Serialize for Tags { + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + let mut tags = serializer.serialize_map(Some(self.0.len()))?; + + for (tagk, tagv) in &self.0 { + tags.serialize_entry(tagk, tagv)?; + } + + tags.end() + } +} + +/// Influxql response builder +// #[derive(Serialize)] +// #[serde(rename_all = "snake_case")] +#[derive(Default)] +pub struct InfluxqlResultBuilder { + statement_id: u32, + + /// Schema of influxql query result + /// + /// Its format is like: measurement | tag_1..tag_n(defined by group by + /// clause) | time | value_column_1..value_column_n + column_schemas: Vec, + + /// Tags part in schema + group_by_tag_col_idxs: Vec, + + /// Value columns part in schema(include `time`) + value_col_idxs: Vec, + + /// Mapping series key(measurement + tag values) to column data + group_key_to_idx: HashMap, + + /// Column datas + value_groups: Vec>>, +} + +impl InfluxqlResultBuilder { + fn new(record_schema: &RecordSchema, statement_id: u32) -> Result { + let column_schemas = record_schema.columns().to_owned(); + + // Find the tags part and columns part from schema. + let mut group_by_col_idxs = Vec::new(); + let mut value_col_idxs = Vec::new(); + + let mut col_iter = column_schemas.iter().enumerate(); + // The first column may be measurement column in normal. + ensure!(col_iter.next().unwrap().1.name == CERESDB_MEASUREMENT_COLUMN_NAME, InfluxDbHandlerInternal { + msg: format!("invalid schema whose first column is not measurement column, schema:{column_schemas:?}"), + }); + + // The group by tags will be placed after measurement and before time column. + let mut searching_group_by_tags = true; + while let Some((idx, col)) = col_iter.next() { + if col.data_type.is_timestamp() { + searching_group_by_tags = false; + } + + if searching_group_by_tags { + group_by_col_idxs.push(idx); + } else { + value_col_idxs.push(idx); + } + } + + Ok(Self { + statement_id, + column_schemas, + group_by_tag_col_idxs: group_by_col_idxs, + value_col_idxs, + group_key_to_idx: HashMap::new(), + value_groups: Vec::new(), + }) + } + + fn add_record_batch(mut self, record_batch: RecordBatch) -> Result { + // Check schema's compatibility. + ensure!( + record_batch.schema().columns() == &self.column_schemas, + InfluxDbHandlerInternal { + msg: format!( + "conflict schema, origin:{:?}, new:{:?}", + self.column_schemas, + record_batch.schema().columns() + ), + } + ); + + let row_num = record_batch.num_rows(); + for row_idx in 0..row_num { + // Get measurement + group by tags. 
+        let mut searching_group_by_tags = true;
+        for (idx, col) in col_iter {
+            if col.data_type.is_timestamp() {
+                searching_group_by_tags = false;
+            }
+
+            if searching_group_by_tags {
+                group_by_col_idxs.push(idx);
+            } else {
+                value_col_idxs.push(idx);
+            }
+        }
+
+        Ok(Self {
+            statement_id,
+            column_schemas,
+            group_by_tag_col_idxs: group_by_col_idxs,
+            value_col_idxs,
+            group_key_to_idx: HashMap::new(),
+            value_groups: Vec::new(),
+        })
+    }
+
+    fn add_record_batch(mut self, record_batch: RecordBatch) -> Result<Self> {
+        // Check the schema's compatibility.
+        ensure!(
+            record_batch.schema().columns() == &self.column_schemas,
+            InfluxDbHandlerInternal {
+                msg: format!(
+                    "conflict schema, origin:{:?}, new:{:?}",
+                    self.column_schemas,
+                    record_batch.schema().columns()
+                ),
+            }
+        );
+
+        let row_num = record_batch.num_rows();
+        for row_idx in 0..row_num {
+            // Get the group key (measurement + group by tags) and the values.
+            let group_key = self.extract_group_key(&record_batch, row_idx)?;
+            let value_group = self.extract_value_group(&record_batch, row_idx)?;
+
+            let value_groups = if let Some(idx) = self.group_key_to_idx.get(&group_key) {
+                self.value_groups.get_mut(*idx).unwrap()
+            } else {
+                self.value_groups.push(Vec::new());
+                self.group_key_to_idx
+                    .insert(group_key, self.value_groups.len() - 1);
+                self.value_groups.last_mut().unwrap()
+            };
+
+            value_groups.push(value_group);
+        }
+
+        Ok(self)
+    }
+
+    fn build(self) -> InfluxqlResult {
+        // Recover the group keys in their insertion order.
+        let ordered_group_keys = {
+            let mut ordered_pairs = self
+                .group_key_to_idx
+                .clone()
+                .into_iter()
+                .collect::<Vec<_>>();
+            ordered_pairs.sort_by(|a, b| a.1.cmp(&b.1));
+            ordered_pairs
+                .into_iter()
+                .map(|(key, _)| key)
+                .collect::<Vec<_>>()
+        };
+
+        let series = ordered_group_keys
+            .into_iter()
+            .zip(self.value_groups.into_iter())
+            .map(|(group_key, value_group)| {
+                let name = group_key.measurement;
+                let tags = group_key
+                    .group_by_tag_values
+                    .into_iter()
+                    .enumerate()
+                    .map(|(tagk_idx, tagv)| {
+                        let tagk_col_idx = self.group_by_tag_col_idxs[tagk_idx];
+                        let tagk = self.column_schemas[tagk_col_idx].name.clone();
+
+                        (tagk, tagv)
+                    })
+                    .collect::<Vec<_>>();
+
+                let columns = self
+                    .value_col_idxs
+                    .iter()
+                    .map(|idx| self.column_schemas[*idx].name.clone())
+                    .collect::<Vec<_>>();
+
+                OneSeries {
+                    name,
+                    tags: Some(Tags(tags)),
+                    columns,
+                    values: value_group,
+                }
+            })
+            .collect();
+
+        InfluxqlResult {
+            series,
+            statement_id: self.statement_id,
+        }
+    }
+
+    fn extract_group_key(&self, record_batch: &RecordBatch, row_idx: usize) -> Result<GroupKey> {
+        let mut group_by_tag_values = Vec::with_capacity(self.group_by_tag_col_idxs.len());
+        let measurement = {
+            let measurement = record_batch.column(0).datum(row_idx);
+            if let Datum::String(m) = measurement {
+                m.to_string()
+            } else {
+                return InfluxDbHandlerInternal {
+                    msg: "measurement column should be a string datum",
+                }
+                .fail();
+            }
+        };
+
+        for col_idx in &self.group_by_tag_col_idxs {
+            let tag = {
+                let tag_datum = record_batch.column(*col_idx).datum(row_idx);
+                match tag_datum {
+                    Datum::Null => "".to_string(),
+                    Datum::String(tag) => tag.to_string(),
+                    _ => {
+                        return InfluxDbHandlerInternal {
+                            msg: "tag column should be a string datum",
+                        }
+                        .fail()
+                    }
+                }
+            };
+            group_by_tag_values.push(tag);
+        }
+
+        Ok(GroupKey {
+            measurement,
+            group_by_tag_values,
+        })
+    }
+
+    fn extract_value_group(
+        &self,
+        record_batch: &RecordBatch,
+        row_idx: usize,
+    ) -> Result<Vec<Datum>> {
+        let mut value_group = Vec::with_capacity(self.value_col_idxs.len());
+        for col_idx in &self.value_col_idxs {
+            let value = record_batch.column(*col_idx).datum(row_idx);
+
+            value_group.push(value);
+        }
+
+        Ok(value_group)
+    }
+}
+
+#[derive(Hash, PartialEq, Eq, Clone)]
+struct GroupKey {
+    measurement: String,
+    group_by_tag_values: Vec<String>,
+}
+
+#[derive(Hash, PartialEq, Eq)]
+struct TagKv {
+    key: String,
+    value: String,
+}
+
+struct Columns {
+    names: Vec<String>,
+    data: Vec<Datum>,
+}
+
 impl<Q: QueryExecutor + 'static> InfluxDb<Q> {
     pub fn new(instance: InstanceRef<Q>, schema_config_provider: SchemaConfigProviderRef) -> Self {
         Self {
@@ -229,6 +526,10 @@ fn convert_influx_value(field_value: FieldValue) -> Value {
     Value { value: Some(v) }
 }
 
+// fn convert_query_result(output: Output) -> {
+
+// }
+
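+// A sketch of how the commented-out stub above might be filled in. It assumes
+// a single-statement query (one `InfluxqlResult` per response) and that the
+// `Output` enum exposes `Records`/`AffectedRows` variants carrying the record
+// batches / affected row count; not wired into the query path yet.
+#[allow(dead_code)]
+fn convert_query_result(output: Output, statement_id: u32) -> Result<InfluxqlResponse> {
+    let batches = match output {
+        Output::Records(batches) => batches,
+        Output::AffectedRows(_) => {
+            return InfluxDbHandlerInternal {
+                msg: "influxql query should return record batches",
+            }
+            .fail()
+        }
+    };
+
+    let result = if batches.is_empty() {
+        InfluxqlResult {
+            statement_id,
+            series: Vec::new(),
+        }
+    } else {
+        // All batches are expected to share one schema, which
+        // `add_record_batch` checks.
+        let mut builder = InfluxqlResultBuilder::new(batches[0].schema(), statement_id)?;
+        for batch in batches {
+            builder = builder.add_record_batch(batch)?;
+        }
+        builder.build()
+    };
+
+    Ok(InfluxqlResponse {
+        results: vec![result],
+    })
+}
+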
 // TODO: Request and response type don't match influxdb's API now.
 pub async fn query<Q: QueryExecutor + 'static>(
     ctx: RequestContext,
@@ -255,16 +556,18 @@ pub async fn write<Q: QueryExecutor + 'static>(
 
 #[cfg(test)]
 mod tests {
+    use common_types::tests::build_schema;
+
     use super::*;
 
     #[test]
     fn test_convert_influxdb_write_req() {
         let lines = r#"
-demo,tag1=t1,tag2=t2 field1=90,field2=100 1678675992000
-demo,tag1=t1,tag2=t2 field1=91,field2=101 1678675993000
-demo,tag1=t11,tag2=t22 field1=900,field2=1000 1678675992000
-demo,tag1=t11,tag2=t22 field1=901,field2=1001 1678675993000
-"#
+        demo,tag1=t1,tag2=t2 field1=90,field2=100 1678675992000
+        demo,tag1=t1,tag2=t2 field1=91,field2=101 1678675993000
+        demo,tag1=t11,tag2=t22 field1=900,field2=1000 1678675992000
+        demo,tag1=t11,tag2=t22 field1=901,field2=1001 1678675993000
+        "#
         .to_string();
         let req = WriteRequest {
             lines,
@@ -366,4 +669,21 @@ demo,tag1=t11,tag2=t22 field1=901,field2=1001 1678675993000
             }
         );
     }
+
+    #[test]
+    fn test_print() {
+        let one_series = OneSeries {
+            name: "test".to_string(),
+            tags: None,
+            columns: vec!["column1".to_string(), "column2".to_string()],
+            values: vec![
+                vec![Datum::Int32(1), Datum::Int32(2)],
+                vec![Datum::Int32(2), Datum::Int32(2)],
+            ],
+        };
+
+        println!("{}", serde_json::to_string(&one_series).unwrap());
+    }
 }
diff --git a/sql/src/influxql/planner.rs b/sql/src/influxql/planner.rs
index f4dcbc73ce..7ce67347dd 100644
--- a/sql/src/influxql/planner.rs
+++ b/sql/src/influxql/planner.rs
@@ -19,7 +19,7 @@ use crate::{
     provider::{ContextProviderAdapter, MetaProvider},
 };
 
-const CERESDB_MEASUREMENT_COLUMN_NAME: &str = "ceresdb::measurement";
+pub const CERESDB_MEASUREMENT_COLUMN_NAME: &str = "ceresdb::measurement";
 
 pub(crate) struct Planner<'a, P: MetaProvider> {
     context_provider: ContextProviderAdapter<'a, P>,