draft.

Rachelint committed Mar 23, 2023
1 parent 99f2cf4 commit e1fc6f9
Showing 3 changed files with 331 additions and 8 deletions.
3 changes: 3 additions & 0 deletions server/src/handlers/error.rs
@@ -75,6 +75,9 @@ pub enum Error {

#[snafu(display("InfluxDb handler failed, msg:{}, source:{}", msg, source))]
InfluxDbHandler { msg: String, source: GenericError },

#[snafu(display("InfluxDb handler failed, msg:{}.\nBacktrace:\n{}", msg, backtrace))]
InfluxDbHandlerInternal { msg: String, backtrace: Backtrace },
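
    // A sketch of typical construction via the snafu-generated context
    // selector (as done in server/src/handlers/influxdb.rs):
    //   InfluxDbHandlerInternal { msg: "..." }.fail()
    //   ensure!(cond, InfluxDbHandlerInternal { msg: "..." })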
}

define_result!(Error);
334 changes: 327 additions & 7 deletions server/src/handlers/influxdb.rs
@@ -10,21 +10,35 @@ use bytes::Bytes;
use ceresdbproto::storage::{
value, Field, FieldGroup, Tag, Value, WriteSeriesEntry, WriteTableRequest,
};
use common_types::{
column_schema::{self, ColumnSchema},
datum::{Datum, DatumKind},
record_batch::RecordBatch,
request_id::RequestId,
schema::RecordSchema,
time::Timestamp,
};
use common_util::error::BoxError;
use handlers::{
error::{InfluxDbHandler, Result},
query::QueryRequest,
};
use influxdb_line_protocol::FieldValue;
use interpreters::interpreter::Output;
use log::debug;
use query_engine::executor::Executor as QueryExecutor;
use serde::{
ser::{SerializeMap, SerializeSeq},
Serialize,
};
use snafu::{ensure, OptionExt, ResultExt};
use sql::influxql::planner::CERESDB_MEASUREMENT_COLUMN_NAME;
use warp::{reject, reply, Rejection, Reply};

use crate::{
context::RequestContext,
handlers,
handlers::error::InfluxDbHandlerInternal,
instance::InstanceRef,
proxy::grpc::write::{execute_insert_plan, write_request_to_insert_plan, WriteContext},
schema_config_provider::SchemaConfigProviderRef,
@@ -71,6 +85,289 @@ impl From<Bytes> for WriteRequest {

pub type WriteResponse = ();

/// One result in a group (defined by the group by clause)
///
/// InfluxDB calls each result in the result set a "series", so each result in
/// the set is named `OneSeries` here. Its format is like:
/// {
/// "results": [{
/// "statement_id": 0,
/// "series": [{
/// "name": "home",
/// "tags": {
/// "room": "Living Room"
/// },
/// "columns": ["time", "co", "hum", "temp"],
/// "values": [["2022-01-01T08:00:00Z", 0, 35.9, 21.1], ... ]
/// }, ... ]
/// }, ... ]
/// }
#[derive(Debug, Serialize)]
pub struct InfluxqlResponse {
results: Vec<InfluxqlResult>,
}

#[derive(Debug, Serialize)]
pub struct InfluxqlResult {
statement_id: u32,
series: Vec<OneSeries>,
}

#[derive(Debug)]
pub struct OneSeries {
name: String,
tags: Option<Tags>,
columns: Vec<String>,
values: Vec<Vec<Datum>>,
}

impl Serialize for OneSeries {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
        // The length hint must match the number of entries actually
        // serialized; the "tags" entry is skipped when absent.
        let entry_num = if self.tags.is_some() { 4 } else { 3 };
        let mut one_series = serializer.serialize_map(Some(entry_num))?;
one_series.serialize_entry("name", &self.name)?;
if let Some(tags) = &self.tags {
one_series.serialize_entry("tags", &tags)?;
}
one_series.serialize_entry("columns", &self.columns)?;
one_series.serialize_entry("values", &self.values)?;

one_series.end()
}
}

#[derive(Debug)]
struct Tags(Vec<(String, String)>);
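// A sketch of the serialized form: `Tags(vec![("room".into(), "Living Room".into())])`
// becomes `{"room":"Living Room"}`, matching the "tags" object in the sample
// response above.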

impl Serialize for Tags {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let mut tags = serializer.serialize_map(Some(self.0.len()))?;

for (tagk, tagv) in &self.0 {
tags.serialize_entry(tagk, tagv)?;
}

tags.end()
}
}

/// Influxql response builder
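///
/// A sketch of the expected usage (`record_schema` and `record_batches` here
/// are assumed to come from executing an influxql query):
/// ```ignore
/// let mut builder = InfluxqlResultBuilder::new(&record_schema, 0)?;
/// for batch in record_batches {
///     builder = builder.add_record_batch(batch)?;
/// }
/// let result = builder.build();
/// ```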
#[derive(Default)]
pub struct InfluxqlResultBuilder {
statement_id: u32,

/// Schema of influxql query result
///
    /// Its format is like: measurement | tag_1..tag_n (defined by the group
    /// by clause) | time | value_column_1..value_column_n
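    ///
    /// A hypothetical instance matching the JSON sample above:
    /// measurement | room | time | co | hum | temp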
column_schemas: Vec<ColumnSchema>,

/// Tags part in schema
group_by_tag_col_idxs: Vec<usize>,

    /// Value columns part in schema (including `time`)
value_col_idxs: Vec<usize>,

    /// Mapping from series key (measurement + tag values) to its index in
    /// `value_groups`
group_key_to_idx: HashMap<GroupKey, usize>,

    /// Rows of each group, indexed by `group_key_to_idx`
value_groups: Vec<Vec<Vec<Datum>>>,
}

impl InfluxqlResultBuilder {
fn new(record_schema: &RecordSchema, statement_id: u32) -> Result<Self> {
let column_schemas = record_schema.columns().to_owned();

// Find the tags part and columns part from schema.
let mut group_by_col_idxs = Vec::new();
let mut value_col_idxs = Vec::new();

        let mut col_iter = column_schemas.iter().enumerate();
        // In the normal case, the first column is the measurement column.
        let first_col = col_iter.next().context(InfluxDbHandlerInternal { msg: "empty schema" })?;
        ensure!(
            first_col.1.name == CERESDB_MEASUREMENT_COLUMN_NAME,
            InfluxDbHandlerInternal {
                msg: format!("invalid schema whose first column is not measurement column, schema:{column_schemas:?}"),
            }
        );

// The group by tags will be placed after measurement and before time column.
let mut searching_group_by_tags = true;
        for (idx, col) in col_iter {
if col.data_type.is_timestamp() {
searching_group_by_tags = false;
}

if searching_group_by_tags {
group_by_col_idxs.push(idx);
} else {
value_col_idxs.push(idx);
}
}

Ok(Self {
statement_id,
column_schemas,
group_by_tag_col_idxs: group_by_col_idxs,
value_col_idxs,
group_key_to_idx: HashMap::new(),
value_groups: Vec::new(),
})
}

fn add_record_batch(mut self, record_batch: RecordBatch) -> Result<Self> {
// Check schema's compatibility.
ensure!(
record_batch.schema().columns() == &self.column_schemas,
InfluxDbHandlerInternal {
msg: format!(
"conflict schema, origin:{:?}, new:{:?}",
self.column_schemas,
record_batch.schema().columns()
),
}
);

let row_num = record_batch.num_rows();
for row_idx in 0..row_num {
// Get measurement + group by tags.
let group_key = self.extract_group_key(&record_batch, row_idx)?;
let value_group = self.extract_value_group(&record_batch, row_idx)?;

let value_groups = if let Some(idx) = self.group_key_to_idx.get(&group_key) {
self.value_groups.get_mut(*idx).unwrap()
} else {
self.value_groups.push(Vec::new());
self.group_key_to_idx
.insert(group_key, self.value_groups.len() - 1);
self.value_groups.last_mut().unwrap()
};

value_groups.push(value_group);
}

Ok(self)
}

fn build(self) -> InfluxqlResult {
let ordered_group_keys = {
let mut ordered_pairs = self
.group_key_to_idx
.clone()
.into_iter()
.collect::<Vec<_>>();
ordered_pairs.sort_by(|a, b| a.1.cmp(&b.1));
ordered_pairs
.into_iter()
.map(|(key, _)| key)
.collect::<Vec<_>>()
};

let series = ordered_group_keys
.into_iter()
.zip(self.value_groups.into_iter())
.map(|(group_key, value_group)| {
let name = group_key.measurement;
let tags = group_key
.group_by_tag_values
.into_iter()
.enumerate()
.map(|(tagk_idx, tagv)| {
let tagk_col_idx = self.group_by_tag_col_idxs[tagk_idx];
let tagk = self.column_schemas[tagk_col_idx].name.clone();

(tagk, tagv)
})
.collect::<Vec<_>>();

let columns = self
.value_col_idxs
.iter()
.map(|idx| self.column_schemas[*idx].name.clone())
.collect::<Vec<_>>();

OneSeries {
name,
tags: Some(Tags(tags)),
columns,
values: value_group,
}
})
.collect();

InfluxqlResult {
series,
statement_id: self.statement_id,
}
}

fn extract_group_key(&self, record_batch: &RecordBatch, row_idx: usize) -> Result<GroupKey> {
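        // A sketch: for the sample response above, a row ("home", "Living Room",
        // ...) would yield GroupKey { measurement: "home", group_by_tag_values:
        // ["Living Room"] }.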
let mut group_by_tag_values = Vec::with_capacity(self.group_by_tag_col_idxs.len());
let measurement = {
let measurement = record_batch.column(0).datum(row_idx);
if let Datum::String(m) = measurement {
m.to_string()
} else {
return InfluxDbHandlerInternal { msg: "" }.fail();
}
};

for col_idx in &self.group_by_tag_col_idxs {
let tag = {
let tag_datum = record_batch.column(*col_idx).datum(row_idx);
match tag_datum {
Datum::Null => "".to_string(),
Datum::String(tag) => tag.to_string(),
_ => return InfluxDbHandlerInternal { msg: "" }.fail(),
}
};
group_by_tag_values.push(tag);
}

Ok(GroupKey {
measurement,
group_by_tag_values,
})
}

fn extract_value_group(
&self,
record_batch: &RecordBatch,
row_idx: usize,
) -> Result<Vec<Datum>> {
let mut value_group = Vec::with_capacity(self.value_col_idxs.len());
for col_idx in &self.value_col_idxs {
let value = record_batch.column(*col_idx).datum(row_idx);

value_group.push(value);
}

Ok(value_group)
}
}

#[derive(Hash, PartialEq, Eq, Clone)]
struct GroupKey {
measurement: String,
group_by_tag_values: Vec<String>,
}

// Currently unused; presumably scaffolding kept around for this draft.
#[derive(Hash, PartialEq, Eq)]
struct TagKv {
    key: String,
    value: String,
}

struct Columns {
    names: Vec<String>,
    data: Vec<Datum>,
}

impl<Q: QueryExecutor + 'static> InfluxDb<Q> {
pub fn new(instance: InstanceRef<Q>, schema_config_provider: SchemaConfigProviderRef) -> Self {
Self {
@@ -229,6 +526,10 @@ fn convert_influx_value(field_value: FieldValue) -> Value {
Value { value: Some(v) }
}

// TODO: convert the interpreter `Output` of an influxql query into an
// `InfluxqlResponse`.

// TODO: Request and response types don't match InfluxDB's API yet.
pub async fn query<Q: QueryExecutor + 'static>(
ctx: RequestContext,
@@ -255,16 +556,18 @@ pub async fn write<Q: QueryExecutor + 'static>(

#[cfg(test)]
mod tests {
use common_types::tests::build_schema;

use super::*;

#[test]
fn test_convert_influxdb_write_req() {
let lines = r#"
demo,tag1=t1,tag2=t2 field1=90,field2=100 1678675992000
demo,tag1=t1,tag2=t2 field1=91,field2=101 1678675993000
demo,tag1=t11,tag2=t22 field1=900,field2=1000 1678675992000
demo,tag1=t11,tag2=t22 field1=901,field2=1001 1678675993000
"#
demo,tag1=t1,tag2=t2 field1=90,field2=100 1678675992000
demo,tag1=t1,tag2=t2 field1=91,field2=101 1678675993000
demo,tag1=t11,tag2=t22 field1=900,field2=1000 1678675992000
demo,tag1=t11,tag2=t22 field1=901,field2=1001 1678675993000
"#
.to_string();
let req = WriteRequest {
lines,
Expand Down Expand Up @@ -366,4 +669,21 @@ demo,tag1=t11,tag2=t22 field1=901,field2=1001 1678675993000
}
);
}

#[test]
    fn test_one_series_serialization() {
let one_series = OneSeries {
name: "test".to_string(),
tags: None,
columns: vec!["column1".to_string(), "column2".to_string()],
values: vec![
vec![Datum::Int32(1), Datum::Int32(2)],
vec![Datum::Int32(2), Datum::Int32(2)],
],
};
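        // With `tags: None`, the "tags" entry is skipped entirely; assuming
        // `Datum` serializes as its inner value, this prints roughly:
        // {"name":"test","columns":["column1","column2"],"values":[[1,2],[2,2]]}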

println!("{}", serde_json::to_string(&one_series).unwrap());
}
}