From cdfbacd26183f9076b2630f39fb186613a04f65d Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Fri, 10 Mar 2023 16:46:24 +0800 Subject: [PATCH 1/7] refactor: add influxdb module --- integration_tests/src/database.rs | 2 +- server/src/handlers/error.rs | 3 ++ server/src/handlers/influxdb.rs | 84 +++++++++++++++++++++++++++++ server/src/handlers/mod.rs | 1 + server/src/http.rs | 88 ++++++++++++++++--------------- 5 files changed, 134 insertions(+), 44 deletions(-) create mode 100644 server/src/handlers/influxdb.rs diff --git a/integration_tests/src/database.rs b/integration_tests/src/database.rs index 06aa2f1f17..40f8eed14b 100644 --- a/integration_tests/src/database.rs +++ b/integration_tests/src/database.rs @@ -178,7 +178,7 @@ impl CeresDB { } async fn execute_influxql(query: String, http_client: HttpClient) -> Box { - let url = format!("http://{}/influxql", http_client.endpoint); + let url = format!("http://{}/influxdb/v1/query", http_client.endpoint); let query_request = InfluxQLRequest { query }; let resp = http_client .client diff --git a/server/src/handlers/error.rs b/server/src/handlers/error.rs index 1b0362623e..42d6ad1c1c 100644 --- a/server/src/handlers/error.rs +++ b/server/src/handlers/error.rs @@ -3,6 +3,7 @@ //! Error of handlers use snafu::{Backtrace, Snafu}; +use warp::reject::Reject; use crate::limiter; // TODO(yingwen): Avoid printing huge sql string @@ -73,3 +74,5 @@ pub enum Error { } define_result!(Error); + +impl Reject for Error {} diff --git a/server/src/handlers/influxdb.rs b/server/src/handlers/influxdb.rs new file mode 100644 index 0000000000..0d028a9f82 --- /dev/null +++ b/server/src/handlers/influxdb.rs @@ -0,0 +1,84 @@ +// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. + +//! This module implements [write][1] and [query][2] for InfluxDB. +//! [1]: https://docs.influxdata.com/influxdb/v1.8/tools/api/#write-http-endpoint +//! [2]: https://docs.influxdata.com/influxdb/v1.8/tools/api/#query-http-endpoint + +use std::sync::Arc; + +use bytes::Bytes; +use handlers::{error::Result, query::QueryRequest}; +use query_engine::executor::Executor as QueryExecutor; +use warp::{reject, reply, Rejection, Reply}; + +use crate::{ + context::RequestContext, handlers, instance::InstanceRef, + schema_config_provider::SchemaConfigProviderRef, +}; + +pub struct Influxdb { + instance: InstanceRef, + #[allow(dead_code)] + schema_config_provider: SchemaConfigProviderRef, +} + +/// Line protocol +pub struct WriteRequest { + pub payload: String, +} + +impl From for WriteRequest { + fn from(bytes: Bytes) -> Self { + WriteRequest { + payload: String::from_utf8_lossy(&bytes).to_string(), + } + } +} + +#[allow(dead_code)] +type WriteResponse = String; + +impl Influxdb { + pub fn new(instance: InstanceRef, schema_config_provider: SchemaConfigProviderRef) -> Self { + Self { + instance, + schema_config_provider, + } + } + + async fn query( + &self, + ctx: RequestContext, + req: QueryRequest, + ) -> Result { + handlers::query::handle_query(&ctx, self.instance.clone(), req) + .await + .map(handlers::query::convert_output) + } + + async fn write(&self, _ctx: RequestContext, _req: WriteRequest) -> Result { + todo!() + } +} + +// TODO: Request and response type don't match influxdb's API now. +pub async fn query( + ctx: RequestContext, + db: Arc>, + req: QueryRequest, +) -> std::result::Result { + db.query(ctx, req) + .await + .map_err(reject::custom) + .map(|v| reply::json(&v)) +} + +// TODO: Request and response type don't match influxdb's API now. 
+#[allow(dead_code)] +pub async fn write( + ctx: RequestContext, + db: Arc>, + req: WriteRequest, +) -> std::result::Result { + db.write(ctx, req).await.map_err(reject::custom) +} diff --git a/server/src/handlers/mod.rs b/server/src/handlers/mod.rs index 229f00c2c0..cf0f264c67 100644 --- a/server/src/handlers/mod.rs +++ b/server/src/handlers/mod.rs @@ -4,6 +4,7 @@ pub mod admin; pub mod error; +pub mod influxdb; pub mod prom; pub mod query; diff --git a/server/src/http.rs b/server/src/http.rs index c5201fd25b..951ac115b0 100644 --- a/server/src/http.rs +++ b/server/src/http.rs @@ -31,7 +31,12 @@ use crate::{ consts, context::RequestContext, error_util, - handlers::{self, prom::CeresDBStorage, query::Request}, + handlers::{ + self, + influxdb::{self, Influxdb}, + prom::CeresDBStorage, + query::Request, + }, instance::InstanceRef, metrics, schema_config_provider::SchemaConfigProviderRef, @@ -109,6 +114,7 @@ pub struct Service { instance: InstanceRef, profiler: Arc, prom_remote_storage: RemoteStorageRef, + influxdb: Arc>, tx: Sender<()>, config: HttpConfig, } @@ -124,15 +130,20 @@ impl Service { fn routes( &self, ) -> impl Filter + Clone { - self.home() - .or(self.metrics()) - .or(self.sql()) - .or(self.influxql()) - .or(self.heap_profile()) - .or(self.admin_block()) - .or(self.flush_memtable()) - .or(self.update_log_level()) - .or(self.prom_api()) + warp::body::content_length_limit(self.config.max_body_size).and( + self.home() + // public APIs + .or(self.metrics()) + .or(self.sql()) + .or(self.influxdb_api()) + .or(self.prom_api()) + // admin APIs + .or(self.admin_block()) + // debug APIs + .or(self.flush_memtable()) + .or(self.update_log_level()) + .or(self.heap_profile()), + ) } /// Expose `/prom/v1/read` and `/prom/v1/write` to serve Prometheus remote @@ -178,7 +189,6 @@ impl Service { warp::path!("sql") .and(warp::post()) - .and(warp::body::content_length_limit(self.config.max_body_size)) .and(extract_request) .and(self.with_context()) .and(self.with_instance()) @@ -200,41 +210,24 @@ impl Service { }) } - // POST /influxql - // this request type is not what influxdb API expected, the one in influxdb: - // https://docs.influxdata.com/influxdb/v1.8/tools/api/#query-http-endpoint - fn influxql( + /// POST `/influxdb/v1/query` and `/influxdb/v1/write` + fn influxdb_api( &self, ) -> impl Filter + Clone { - // accept json or plain text - let extract_request = warp::body::json() - .or(warp::body::bytes().map(Request::from)) - .unify(); + let write_api = warp::path!("write") + .and(self.with_context()) + .and(self.with_influxdb()) + .and(warp::body::bytes().map(influxdb::WriteRequest::from)) + .and_then(influxdb::write); + let query_api = warp::path!("query") + .and(self.with_context()) + .and(self.with_influxdb()) + .and(warp::body::bytes().map(|bytes| QueryRequest::Influxql(Request::from(bytes)))) + .and_then(influxdb::query); - warp::path!("influxql") + warp::path!("influxdb" / "v1" / ..) .and(warp::post()) - .and(warp::body::content_length_limit(self.config.max_body_size)) - .and(extract_request) - .and(self.with_context()) - .and(self.with_instance()) - .and_then(|req, ctx, instance| async move { - let req = QueryRequest::Influxql(req); - let result = handlers::query::handle_query(&ctx, instance, req) - .await - // TODO: the sql's `convert_output` function may be not suitable to influxql. - // We should implement influxql's related function in later. 
- .map(handlers::query::convert_output) - .map_err(|e| { - // TODO(yingwen): Maybe truncate and print the sql - error!("Http service Failed to handle sql, err:{}", e); - Box::new(e) - }) - .context(HandleRequest); - match result { - Ok(res) => Ok(reply::json(&res)), - Err(e) => Err(reject::custom(e)), - } - }) + .and(write_api.or(query_api)) } // POST /debug/flush_memtable @@ -407,6 +400,13 @@ impl Service { warp::any().map(move || profiler.clone()) } + fn with_influxdb( + &self, + ) -> impl Filter>,), Error = Infallible> + Clone { + let influxdb = self.influxdb.clone(); + warp::any().map(move || influxdb.clone()) + } + fn with_instance( &self, ) -> impl Filter,), Error = Infallible> + Clone { @@ -474,8 +474,9 @@ impl Builder { .context(MissingSchemaConfigProvider)?; let prom_remote_storage = Arc::new(CeresDBStorage::new( instance.clone(), - schema_config_provider, + schema_config_provider.clone(), )); + let influxdb = Arc::new(Influxdb::new(instance.clone(), schema_config_provider)); let (tx, rx) = oneshot::channel(); let service = Service { @@ -483,6 +484,7 @@ impl Builder { log_runtime, instance, prom_remote_storage, + influxdb, profiler: Arc::new(Profiler::default()), tx, config: self.config.clone(), From cfa31c034ea2be491110478c12bf06fc3ee57ee3 Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Fri, 10 Mar 2023 17:54:26 +0800 Subject: [PATCH 2/7] add convert write request --- Cargo.lock | 80 +++++++++++++++------------ server/Cargo.toml | 1 + server/src/handlers/error.rs | 4 ++ server/src/handlers/influxdb.rs | 98 ++++++++++++++++++++++++++++++--- 4 files changed, 140 insertions(+), 43 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index add93025d3..be6df5f0e8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -83,7 +83,7 @@ dependencies = [ "arrow 32.0.0", "async-trait", "base64 0.13.0", - "bytes 1.2.1", + "bytes 1.4.0", "ceresdbproto", "common_types", "common_util", @@ -558,7 +558,7 @@ dependencies = [ "async-trait", "axum-core", "bitflags", - "bytes 1.2.1", + "bytes 1.4.0", "futures-util", "http", "http-body", @@ -585,7 +585,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9f0c0a60006f2a293d82d571f635042a72edf927539b7685bd62d361963839b" dependencies = [ "async-trait", - "bytes 1.2.1", + "bytes 1.4.0", "futures-util", "http", "http-body", @@ -921,15 +921,15 @@ dependencies = [ [[package]] name = "bytes" -version = "1.2.1" +version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec8a7b6a70fde80372154c65702f00a0f56f3e1c36abbc6c440484be248856db" +checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be" [[package]] name = "bytes_ext" version = "1.0.0" dependencies = [ - "bytes 1.2.1", + "bytes 1.4.0", "snafu 0.6.10", ] @@ -1738,7 +1738,7 @@ dependencies = [ "arrow 32.0.0", "async-compression", "async-trait", - "bytes 1.2.1", + "bytes 1.4.0", "bzip2", "chrono", "dashmap 5.4.0", @@ -2481,7 +2481,7 @@ version = "0.3.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37a82c6d637fc9515a4694bbf1cb2457b79d81ce52b3108bdeea58b07dd34a57" dependencies = [ - "bytes 1.2.1", + "bytes 1.4.0", "fnv", "futures-core", "futures-sink", @@ -2542,7 +2542,7 @@ checksum = "4cff78e5788be1e0ab65b04d306b2ed5092c815ec97ec70f4ebd5aee158aa55d" dependencies = [ "base64 0.13.0", "bitflags", - "bytes 1.2.1", + "bytes 1.4.0", "headers-core", "http", "httpdate", @@ -2607,7 +2607,7 @@ version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"75f43d41e26995c17e71ee126451dd3941010b0514a81a9d11f3b341debc2399" dependencies = [ - "bytes 1.2.1", + "bytes 1.4.0", "fnv", "itoa 1.0.3", ] @@ -2618,7 +2618,7 @@ version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" dependencies = [ - "bytes 1.2.1", + "bytes 1.4.0", "http", "pin-project-lite", ] @@ -2656,7 +2656,7 @@ version = "0.14.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "02c929dc5c39e335a03c405292728118860721b10190d98c2a0f0efd5baafbac" dependencies = [ - "bytes 1.2.1", + "bytes 1.4.0", "futures-channel", "futures-core", "futures-util", @@ -2791,6 +2791,17 @@ dependencies = [ "once_cell", ] +[[package]] +name = "influxdb_line_protocol" +version = "0.1.0" +source = "git+https://github.com/jiacai2050/influxdb_line_protocol#14e00a3dbc99a5edff226b92e3496314b086acf4" +dependencies = [ + "bytes 1.4.0", + "nom 7.1.1", + "smallvec", + "snafu 0.7.1", +] + [[package]] name = "instant" version = "0.1.12" @@ -3499,7 +3510,7 @@ dependencies = [ "bitflags", "bitvec", "byteorder", - "bytes 1.2.1", + "bytes 1.4.0", "cc", "chrono", "cmake", @@ -3743,7 +3754,7 @@ checksum = "e1ea8f683b4f89a64181393742c041520a1a87e9775e6b4c0dd5a3281af05fc6" dependencies = [ "async-trait", "base64 0.21.0", - "bytes 1.2.1", + "bytes 1.4.0", "chrono", "futures 0.3.25", "itertools", @@ -3767,7 +3778,7 @@ name = "object_store" version = "1.0.0" dependencies = [ "async-trait", - "bytes 1.2.1", + "bytes 1.4.0", "ceresdbproto", "chrono", "clru", @@ -3886,7 +3897,7 @@ checksum = "a0d9aab6ebed77bd0998c728fbef20d6afc63db38c8fe85e0923b624c1b6bfab" dependencies = [ "async-trait", "base64 0.13.0", - "bytes 1.2.1", + "bytes 1.4.0", "chrono", "derive_more", "hmac", @@ -3961,7 +3972,7 @@ dependencies = [ "arrow-select", "base64 0.21.0", "brotli", - "bytes 1.2.1", + "bytes 1.4.0", "chrono", "flate2", "futures 0.3.25", @@ -4018,7 +4029,7 @@ dependencies = [ "arrow 32.0.0", "arrow_ext", "async-trait", - "bytes 1.2.1", + "bytes 1.4.0", "common_util", "datafusion", "datafusion-expr", @@ -4424,7 +4435,7 @@ version = "0.11.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "21dc42e00223fc37204bd4aa177e69420c604ca4a183209a8f9de30c6d934698" dependencies = [ - "bytes 1.2.1", + "bytes 1.4.0", "prost-derive", ] @@ -4434,7 +4445,7 @@ version = "0.11.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a3f8ad728fb08fe212df3c05169e940fbb6d9d16a877ddde14644a983ba2012e" dependencies = [ - "bytes 1.2.1", + "bytes 1.4.0", "heck 0.4.0", "itertools", "lazy_static", @@ -4469,7 +4480,7 @@ version = "0.11.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a5e0526209433e96d83d750dd81a99118edbc55739e7e61a46764fd2ad537788" dependencies = [ - "bytes 1.2.1", + "bytes 1.4.0", "prost", ] @@ -4916,7 +4927,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68cc60575865c7831548863cc02356512e3f1dc2f3f82cb837d7fc4cc8f3c97c" dependencies = [ "base64 0.13.0", - "bytes 1.2.1", + "bytes 1.4.0", "encoding_rs", "futures-core", "futures-util", @@ -5004,7 +5015,7 @@ version = "0.3.0" source = "git+https://github.com/influxdata/rskafka.git?rev=00988a564b1db0249d858065fc110476c075efad#00988a564b1db0249d858065fc110476c075efad" dependencies = [ "async-trait", - "bytes 1.2.1", + "bytes 1.4.0", "chrono", "crc32c", "flate2", @@ -5310,7 +5321,7 @@ dependencies = [ "arrow 32.0.0", "arrow_ext", "async-trait", - 
"bytes 1.2.1", + "bytes 1.4.0", "catalog", "ceresdbproto", "cluster", @@ -5321,6 +5332,7 @@ dependencies = [ "df_operator", "futures 0.3.25", "http", + "influxdb_line_protocol", "interpreters", "lazy_static", "log", @@ -5469,7 +5481,7 @@ name = "skiplist" version = "1.0.0" dependencies = [ "arena", - "bytes 1.2.1", + "bytes 1.4.0", "criterion", "rand 0.7.3", "yatp", @@ -5539,9 +5551,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.9.0" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2fd0db749597d91ff862fd1d55ea87f7855a744a8425a64695b6fca237d1dad1" +checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0" [[package]] name = "snafu" @@ -6101,7 +6113,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c8e00990ebabbe4c14c08aca901caed183ecd5c09562a12c824bb53d3c3fd3af" dependencies = [ "autocfg 1.1.0", - "bytes 1.2.1", + "bytes 1.4.0", "libc", "memchr", "mio", @@ -6186,7 +6198,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "53474327ae5e166530d17f2d956afcb4f8a004de581b3cae10f12006bc8163e3" dependencies = [ "async-stream", - "bytes 1.2.1", + "bytes 1.4.0", "futures-core", "tokio", "tokio-stream", @@ -6210,7 +6222,7 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0bb2e075f03b3d66d8d8785356224ba688d2906a371015e225beeb65ca92c740" dependencies = [ - "bytes 1.2.1", + "bytes 1.4.0", "futures-core", "futures-sink", "pin-project-lite", @@ -6237,7 +6249,7 @@ dependencies = [ "async-trait", "axum", "base64 0.13.0", - "bytes 1.2.1", + "bytes 1.4.0", "futures-core", "futures-util", "h2", @@ -6317,7 +6329,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c530c8675c1dbf98facee631536fa116b5fb6382d7dd6dc1b118d970eafe3ba" dependencies = [ "bitflags", - "bytes 1.2.1", + "bytes 1.4.0", "futures-core", "futures-util", "http", @@ -6464,7 +6476,7 @@ checksum = "e27992fd6a8c29ee7eef28fc78349aa244134e10ad447ce3b9f0ac0ed0fa4ce0" dependencies = [ "base64 0.13.0", "byteorder", - "bytes 1.2.1", + "bytes 1.4.0", "http", "httparse", "log", @@ -6695,7 +6707,7 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed7b8be92646fc3d18b06147664ebc5f48d222686cb11a8755e561a735aacc6d" dependencies = [ - "bytes 1.2.1", + "bytes 1.4.0", "futures-channel", "futures-util", "headers", diff --git a/server/Cargo.toml b/server/Cargo.toml index 8aa7275a35..97e2c82baa 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -26,6 +26,7 @@ datafusion-expr = { workspace = true } df_operator = { workspace = true } futures = { workspace = true } http = "0.2" +influxdb_line_protocol = { git = "https://github.com/jiacai2050/influxdb_line_protocol" } interpreters = { workspace = true } lazy_static = { workspace = true } log = { workspace = true } diff --git a/server/src/handlers/error.rs b/server/src/handlers/error.rs index 42d6ad1c1c..1e6a6ad24d 100644 --- a/server/src/handlers/error.rs +++ b/server/src/handlers/error.rs @@ -2,6 +2,7 @@ //! 
Error of handlers +use common_util::error::GenericError; use snafu::{Backtrace, Snafu}; use warp::reject::Reject; @@ -71,6 +72,9 @@ pub enum Error { source: tokio::time::error::Elapsed, backtrace: Backtrace, }, + + #[snafu(display("Influxdb handler failed, msg:{}, source:{}", msg, source))] + InfluxdbHandler { msg: String, source: GenericError }, } define_result!(Error); diff --git a/server/src/handlers/influxdb.rs b/server/src/handlers/influxdb.rs index 0d028a9f82..3c71935f25 100644 --- a/server/src/handlers/influxdb.rs +++ b/server/src/handlers/influxdb.rs @@ -4,11 +4,18 @@ //! [1]: https://docs.influxdata.com/influxdb/v1.8/tools/api/#write-http-endpoint //! [2]: https://docs.influxdata.com/influxdb/v1.8/tools/api/#query-http-endpoint -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; use bytes::Bytes; -use handlers::{error::Result, query::QueryRequest}; +use ceresdbproto::storage::{value, FieldGroup, Tag, Value, WriteSeriesEntry, WriteTableRequest}; +use common_types::time::Timestamp; +use common_util::error::BoxError; +use handlers::{ + error::{InfluxdbHandler, Result}, + query::QueryRequest, +}; use query_engine::executor::Executor as QueryExecutor; +use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu}; use warp::{reject, reply, Rejection, Reply}; use crate::{ @@ -22,21 +29,38 @@ pub struct Influxdb { schema_config_provider: SchemaConfigProviderRef, } +#[derive(Debug, Default)] +pub enum Precision { + #[default] + Millisecond, + Second, +} + +impl Precision { + fn normalize(&self, ts: i64) -> i64 { + match self { + Self::Millisecond => ts, + Self::Second => ts * 1000, + } + } +} + /// Line protocol pub struct WriteRequest { - pub payload: String, + pub lines: String, + pub precision: Precision, } impl From for WriteRequest { fn from(bytes: Bytes) -> Self { WriteRequest { - payload: String::from_utf8_lossy(&bytes).to_string(), + lines: String::from_utf8_lossy(&bytes).to_string(), + precision: Default::default(), } } } -#[allow(dead_code)] -type WriteResponse = String; +pub type WriteResponse = (); impl Influxdb { pub fn new(instance: InstanceRef, schema_config_provider: SchemaConfigProviderRef) -> Self { @@ -56,11 +80,65 @@ impl Influxdb { .map(handlers::query::convert_output) } - async fn write(&self, _ctx: RequestContext, _req: WriteRequest) -> Result { + async fn write(&self, ctx: RequestContext, req: WriteRequest) -> Result { todo!() } } +fn convert_write_req(req: WriteRequest) -> Result> { + let mut req_by_measurement = HashMap::new(); + let default_ts = Timestamp::now().as_i64(); + for line in influxdb_line_protocol::parse_lines(&req.lines) { + let mut line = line + .box_err() + .with_context(|| InfluxdbHandler { msg: "valid line" })?; + + let timestamp = line + .timestamp + .map_or_else(|| default_ts, |ts| req.precision.normalize(ts)); + let mut tag_set = line.series.tag_set.unwrap_or_default(); + // sort by tag key + tag_set.sort_unstable_by(|a, b| a.0.cmp(&b.0)); + // sort by field key + line.field_set.sort_unstable_by(|a, b| a.0.cmp(&b.0)); + + req_by_measurement + .entry(line.series.measurement.to_string()) + .or_insert_with(|| WriteTableRequest { + table: line.series.measurement.to_string(), + tag_names: tag_set.iter().map(|(tagk, _)| tagk.to_string()).collect(), + field_names: line + .field_set + .iter() + .map(|(tagk, _)| tagk.to_string()) + .collect(), + entries: Vec::new(), + }) + .entries + .push(WriteSeriesEntry { + tags: tag_set + .iter() + .enumerate() + .map(|(idx, (_, tagv))| Tag { + name_index: idx as u32, + value: Some(Value { + value: 
Some(value::Value::StringValue(tagv.to_string())), + }), + }) + .collect(), + field_groups: line + .field_set + .iter() + .map(|(_, fieldv)| FieldGroup { + timestamp, + fields: vec![], + }) + .collect(), + }); + } + todo!() +} + // TODO: Request and response type don't match influxdb's API now. pub async fn query( ctx: RequestContext, @@ -74,11 +152,13 @@ pub async fn query( } // TODO: Request and response type don't match influxdb's API now. -#[allow(dead_code)] pub async fn write( ctx: RequestContext, db: Arc>, req: WriteRequest, ) -> std::result::Result { - db.write(ctx, req).await.map_err(reject::custom) + db.write(ctx, req) + .await + .map_err(reject::custom) + .map(|_| reply::reply()) } From 8e5592b94a1024bec698eefdc3d761c9b23daab3 Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Mon, 13 Mar 2023 11:37:35 +0800 Subject: [PATCH 3/7] impl write handler --- server/src/handlers/influxdb.rs | 116 ++++++++++++++++++++++++++------ server/src/http.rs | 29 ++++---- 2 files changed, 110 insertions(+), 35 deletions(-) diff --git a/server/src/handlers/influxdb.rs b/server/src/handlers/influxdb.rs index 3c71935f25..124889bb2b 100644 --- a/server/src/handlers/influxdb.rs +++ b/server/src/handlers/influxdb.rs @@ -4,28 +4,31 @@ //! [1]: https://docs.influxdata.com/influxdb/v1.8/tools/api/#write-http-endpoint //! [2]: https://docs.influxdata.com/influxdb/v1.8/tools/api/#query-http-endpoint -use std::{collections::HashMap, sync::Arc}; +use std::{collections::HashMap, sync::Arc, time::Instant}; use bytes::Bytes; -use ceresdbproto::storage::{value, FieldGroup, Tag, Value, WriteSeriesEntry, WriteTableRequest}; -use common_types::time::Timestamp; +use ceresdbproto::storage::{ + value, Field, FieldGroup, Tag, Value, WriteSeriesEntry, WriteTableRequest, +}; +use common_types::{request_id::RequestId, time::Timestamp}; use common_util::error::BoxError; use handlers::{ error::{InfluxdbHandler, Result}, query::QueryRequest, }; +use influxdb_line_protocol::FieldValue; +use log::debug; use query_engine::executor::Executor as QueryExecutor; -use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu}; +use snafu::ResultExt; use warp::{reject, reply, Rejection, Reply}; use crate::{ - context::RequestContext, handlers, instance::InstanceRef, - schema_config_provider::SchemaConfigProviderRef, + context::RequestContext, grpc::storage_service::write::WriteContext, handlers, + instance::InstanceRef, schema_config_provider::SchemaConfigProviderRef, }; pub struct Influxdb { instance: InstanceRef, - #[allow(dead_code)] schema_config_provider: SchemaConfigProviderRef, } @@ -33,6 +36,8 @@ pub struct Influxdb { pub enum Precision { #[default] Millisecond, + // TODO: parse precision `second` from HTTP API + #[allow(dead_code)] Second, } @@ -46,6 +51,7 @@ impl Precision { } /// Line protocol +#[derive(Debug)] pub struct WriteRequest { pub lines: String, pub precision: Precision, @@ -81,17 +87,66 @@ impl Influxdb { } async fn write(&self, ctx: RequestContext, req: WriteRequest) -> Result { - todo!() + let request_id = RequestId::next_id(); + let deadline = ctx.timeout.map(|t| Instant::now() + t); + let catalog = &ctx.catalog; + self.instance.catalog_manager.default_catalog_name(); + let schema = &ctx.schema; + let schema_config = self + .schema_config_provider + .schema_config(schema) + .box_err() + .with_context(|| InfluxdbHandler { + msg: format!("get schema config failed, schema:{schema}"), + })?; + + let write_context = + WriteContext::new(request_id, deadline, catalog.clone(), schema.clone()); + + let plans = 
crate::grpc::storage_service::write::write_request_to_insert_plan( + self.instance.clone(), + convert_write_request(req)?, + schema_config, + write_context, + ) + .await + .box_err() + .with_context(|| InfluxdbHandler { + msg: "write request to insert plan", + })?; + + let mut success = 0; + for insert_plan in plans { + success += crate::grpc::storage_service::write::execute_plan( + request_id, + catalog, + schema, + self.instance.clone(), + insert_plan, + deadline, + ) + .await + .box_err() + .with_context(|| InfluxdbHandler { + msg: "execute plan", + })?; + } + debug!( + "Remote write finished, catalog:{}, schema:{}, success:{}", + catalog, schema, success + ); + + Ok(()) } } -fn convert_write_req(req: WriteRequest) -> Result> { +fn convert_write_request(req: WriteRequest) -> Result> { let mut req_by_measurement = HashMap::new(); let default_ts = Timestamp::now().as_i64(); for line in influxdb_line_protocol::parse_lines(&req.lines) { - let mut line = line - .box_err() - .with_context(|| InfluxdbHandler { msg: "valid line" })?; + let mut line = line.box_err().with_context(|| InfluxdbHandler { + msg: "invalid line", + })?; let timestamp = line .timestamp @@ -126,17 +181,36 @@ fn convert_write_req(req: WriteRequest) -> Result> { }), }) .collect(), - field_groups: line - .field_set - .iter() - .map(|(_, fieldv)| FieldGroup { - timestamp, - fields: vec![], - }) - .collect(), + // TODO: merge field group for same series + field_groups: vec![FieldGroup { + timestamp, + fields: line + .field_set + .into_iter() + .enumerate() + .map(|(idx, (_, fieldv))| Field { + name_index: idx as u32, + value: Some(convert_influx_value(fieldv)), + }) + .collect(), + }], }); } - todo!() + + Ok(req_by_measurement.into_values().collect()) +} + +/// Convert influxdb's FieldValue to ceresdbproto's Value +fn convert_influx_value(field_value: FieldValue) -> Value { + let v = match field_value { + FieldValue::I64(v) => value::Value::Int64Value(v), + FieldValue::U64(v) => value::Value::Uint64Value(v), + FieldValue::F64(v) => value::Value::Float64Value(v), + FieldValue::String(v) => value::Value::StringValue(v.to_string()), + FieldValue::Boolean(v) => value::Value::BoolValue(v), + }; + + Value { value: Some(v) } } // TODO: Request and response type don't match influxdb's API now. diff --git a/server/src/http.rs b/server/src/http.rs index 951ac115b0..7cd2b1b51a 100644 --- a/server/src/http.rs +++ b/server/src/http.rs @@ -130,20 +130,18 @@ impl Service { fn routes( &self, ) -> impl Filter + Clone { - warp::body::content_length_limit(self.config.max_body_size).and( - self.home() - // public APIs - .or(self.metrics()) - .or(self.sql()) - .or(self.influxdb_api()) - .or(self.prom_api()) - // admin APIs - .or(self.admin_block()) - // debug APIs - .or(self.flush_memtable()) - .or(self.update_log_level()) - .or(self.heap_profile()), - ) + self.home() + // public APIs + .or(self.metrics()) + .or(self.sql()) + .or(self.influxdb_api()) + .or(self.prom_api()) + // admin APIs + .or(self.admin_block()) + // debug APIs + .or(self.flush_memtable()) + .or(self.update_log_level()) + .or(self.heap_profile()) } /// Expose `/prom/v1/read` and `/prom/v1/write` to serve Prometheus remote @@ -168,6 +166,7 @@ impl Service { warp::path!("prom" / "v1" / ..) 
.and(warp::post()) + .and(warp::body::content_length_limit(self.config.max_body_size)) .and(write_api.or(query_api)) } @@ -189,6 +188,7 @@ impl Service { warp::path!("sql") .and(warp::post()) + .and(warp::body::content_length_limit(self.config.max_body_size)) .and(extract_request) .and(self.with_context()) .and(self.with_instance()) @@ -227,6 +227,7 @@ impl Service { warp::path!("influxdb" / "v1" / ..) .and(warp::post()) + .and(warp::body::content_length_limit(self.config.max_body_size)) .and(write_api.or(query_api)) } From d70bc238cc5ad92aa283047651c7ae6b74d5ec86 Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Mon, 13 Mar 2023 12:09:04 +0800 Subject: [PATCH 4/7] merge field group for same series --- server/src/handlers/influxdb.rs | 67 ++++++++++++++++++++------------- 1 file changed, 40 insertions(+), 27 deletions(-) diff --git a/server/src/handlers/influxdb.rs b/server/src/handlers/influxdb.rs index 124889bb2b..78b37f7bf3 100644 --- a/server/src/handlers/influxdb.rs +++ b/server/src/handlers/influxdb.rs @@ -157,7 +157,7 @@ fn convert_write_request(req: WriteRequest) -> Result> { // sort by field key line.field_set.sort_unstable_by(|a, b| a.0.cmp(&b.0)); - req_by_measurement + let req_for_one_measurement = req_by_measurement .entry(line.series.measurement.to_string()) .or_insert_with(|| WriteTableRequest { table: line.series.measurement.to_string(), @@ -168,33 +168,46 @@ fn convert_write_request(req: WriteRequest) -> Result> { .map(|(tagk, _)| tagk.to_string()) .collect(), entries: Vec::new(), - }) - .entries - .push(WriteSeriesEntry { - tags: tag_set - .iter() - .enumerate() - .map(|(idx, (_, tagv))| Tag { - name_index: idx as u32, - value: Some(Value { - value: Some(value::Value::StringValue(tagv.to_string())), - }), - }) - .collect(), - // TODO: merge field group for same series - field_groups: vec![FieldGroup { - timestamp, - fields: line - .field_set - .into_iter() - .enumerate() - .map(|(idx, (_, fieldv))| Field { - name_index: idx as u32, - value: Some(convert_influx_value(fieldv)), - }) - .collect(), - }], }); + + let tags: Vec<_> = tag_set + .iter() + .enumerate() + .map(|(idx, (_, tagv))| Tag { + name_index: idx as u32, + value: Some(Value { + value: Some(value::Value::StringValue(tagv.to_string())), + }), + }) + .collect(); + let field_group = FieldGroup { + timestamp, + fields: line + .field_set + .iter() + .cloned() + .enumerate() + .map(|(idx, (_, fieldv))| Field { + name_index: idx as u32, + value: Some(convert_influx_value(fieldv)), + }) + .collect(), + }; + let mut found = false; + for entry in &mut req_for_one_measurement.entries { + if entry.tags == tags { + // TODO: remove clone? 
+ entry.field_groups.push(field_group.clone()); + found = true; + break; + } + } + if !found { + req_for_one_measurement.entries.push(WriteSeriesEntry { + tags, + field_groups: vec![field_group], + }) + } } Ok(req_by_measurement.into_values().collect()) From 753bec8bd455a97be18210868a7c2a84b2f3c845 Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Mon, 13 Mar 2023 14:44:01 +0800 Subject: [PATCH 5/7] add convert unittest --- integration_tests/src/database.rs | 9 +-- server/src/handlers/influxdb.rs | 115 ++++++++++++++++++++++++++++++ 2 files changed, 116 insertions(+), 8 deletions(-) diff --git a/integration_tests/src/database.rs b/integration_tests/src/database.rs index 40f8eed14b..4addf9c817 100644 --- a/integration_tests/src/database.rs +++ b/integration_tests/src/database.rs @@ -18,7 +18,6 @@ use ceresdb_client::{ RpcContext, }; use reqwest::ClientBuilder; -use serde::Serialize; use sql::{ ast::{Statement, TableName}, parser::Parser, @@ -46,11 +45,6 @@ struct HttpClient { endpoint: String, } -#[derive(Clone, Serialize)] -struct InfluxQLRequest { - query: String, -} - impl HttpClient { fn new(endpoint: String) -> Self { let client = ClientBuilder::new() @@ -179,11 +173,10 @@ impl CeresDB { async fn execute_influxql(query: String, http_client: HttpClient) -> Box { let url = format!("http://{}/influxdb/v1/query", http_client.endpoint); - let query_request = InfluxQLRequest { query }; let resp = http_client .client .post(url) - .json(&query_request) + .body(query) .send() .await .unwrap(); diff --git a/server/src/handlers/influxdb.rs b/server/src/handlers/influxdb.rs index 78b37f7bf3..435304fb0a 100644 --- a/server/src/handlers/influxdb.rs +++ b/server/src/handlers/influxdb.rs @@ -249,3 +249,118 @@ pub async fn write( .map_err(reject::custom) .map(|_| reply::reply()) } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_convert_influxdb_write_req() { + let lines = r#" +demo,tag1=t1,tag2=t2 field1=90,field2=100 1678675992000 +demo,tag1=t1,tag2=t2 field1=91,field2=101 1678675993000 +demo,tag1=t11,tag2=t22 field1=900,field2=1000 1678675992000 +demo,tag1=t11,tag2=t22 field1=901,field2=1001 1678675993000 +"# + .to_string(); + let req = WriteRequest { + lines, + precision: Precision::Millisecond, + }; + + let pb_req = convert_write_request(req).unwrap(); + assert_eq!(1, pb_req.len()); + assert_eq!( + pb_req[0], + WriteTableRequest { + table: "demo".to_string(), + tag_names: vec!["tag1".to_string(), "tag2".to_string()], + field_names: vec!["field1".to_string(), "field2".to_string()], + entries: vec![ + // First series + WriteSeriesEntry { + tags: vec![ + Tag { + name_index: 0, + value: Some(convert_influx_value(FieldValue::String("t1".into()))), + }, + Tag { + name_index: 1, + value: Some(convert_influx_value(FieldValue::String("t2".into()))), + }, + ], + field_groups: vec![ + FieldGroup { + timestamp: 1678675992000, + fields: vec![ + Field { + name_index: 0, + value: Some(convert_influx_value(FieldValue::F64(90.0))), + }, + Field { + name_index: 1, + value: Some(convert_influx_value(FieldValue::F64(100.0))), + } + ] + }, + FieldGroup { + timestamp: 1678675993000, + fields: vec![ + Field { + name_index: 0, + value: Some(convert_influx_value(FieldValue::F64(91.0))), + }, + Field { + name_index: 1, + value: Some(convert_influx_value(FieldValue::F64(101.0))), + } + ] + }, + ] + }, + // Second series + WriteSeriesEntry { + tags: vec![ + Tag { + name_index: 0, + value: Some(convert_influx_value(FieldValue::String("t11".into()))), + }, + Tag { + name_index: 1, + value: 
Some(convert_influx_value(FieldValue::String("t22".into()))), + }, + ], + field_groups: vec![ + FieldGroup { + timestamp: 1678675992000, + fields: vec![ + Field { + name_index: 0, + value: Some(convert_influx_value(FieldValue::F64(900.0))), + }, + Field { + name_index: 1, + value: Some(convert_influx_value(FieldValue::F64(1000.0))), + } + ] + }, + FieldGroup { + timestamp: 1678675993000, + fields: vec![ + Field { + name_index: 0, + value: Some(convert_influx_value(FieldValue::F64(901.0))), + }, + Field { + name_index: 1, + value: Some(convert_influx_value(FieldValue::F64(1001.0))), + } + ] + }, + ] + } + ] + } + ); + } +} From b2f7dd980ccd5fd87f31b7b3fd03e78293cc1962 Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Mon, 13 Mar 2023 15:45:24 +0800 Subject: [PATCH 6/7] fix CR --- server/src/handlers/error.rs | 4 ++-- server/src/handlers/influxdb.rs | 14 +++++++------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/server/src/handlers/error.rs b/server/src/handlers/error.rs index 1e6a6ad24d..16c2355a53 100644 --- a/server/src/handlers/error.rs +++ b/server/src/handlers/error.rs @@ -73,8 +73,8 @@ pub enum Error { backtrace: Backtrace, }, - #[snafu(display("Influxdb handler failed, msg:{}, source:{}", msg, source))] - InfluxdbHandler { msg: String, source: GenericError }, + #[snafu(display("InfluxDb handler failed, msg:{}, source:{}", msg, source))] + InfluxDbHandler { msg: String, source: GenericError }, } define_result!(Error); diff --git a/server/src/handlers/influxdb.rs b/server/src/handlers/influxdb.rs index 435304fb0a..31dca245bc 100644 --- a/server/src/handlers/influxdb.rs +++ b/server/src/handlers/influxdb.rs @@ -13,7 +13,7 @@ use ceresdbproto::storage::{ use common_types::{request_id::RequestId, time::Timestamp}; use common_util::error::BoxError; use handlers::{ - error::{InfluxdbHandler, Result}, + error::{InfluxDbHandler, Result}, query::QueryRequest, }; use influxdb_line_protocol::FieldValue; @@ -96,7 +96,7 @@ impl Influxdb { .schema_config_provider .schema_config(schema) .box_err() - .with_context(|| InfluxdbHandler { + .with_context(|| InfluxDbHandler { msg: format!("get schema config failed, schema:{schema}"), })?; @@ -111,7 +111,7 @@ impl Influxdb { ) .await .box_err() - .with_context(|| InfluxdbHandler { + .with_context(|| InfluxDbHandler { msg: "write request to insert plan", })?; @@ -127,12 +127,12 @@ impl Influxdb { ) .await .box_err() - .with_context(|| InfluxdbHandler { + .with_context(|| InfluxDbHandler { msg: "execute plan", })?; } debug!( - "Remote write finished, catalog:{}, schema:{}, success:{}", + "Influxdb write finished, catalog:{}, schema:{}, success:{}", catalog, schema, success ); @@ -144,7 +144,7 @@ fn convert_write_request(req: WriteRequest) -> Result> { let mut req_by_measurement = HashMap::new(); let default_ts = Timestamp::now().as_i64(); for line in influxdb_line_protocol::parse_lines(&req.lines) { - let mut line = line.box_err().with_context(|| InfluxdbHandler { + let mut line = line.box_err().with_context(|| InfluxDbHandler { msg: "invalid line", })?; @@ -247,7 +247,7 @@ pub async fn write( db.write(ctx, req) .await .map_err(reject::custom) - .map(|_| reply::reply()) + .map(|_| warp::http::StatusCode::NO_CONTENT) } #[cfg(test)] From e88784fd299b00bd0b176900edb8ca7b7f26ff9c Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Mon, 13 Mar 2023 17:15:29 +0800 Subject: [PATCH 7/7] fix naming --- server/src/handlers/influxdb.rs | 8 ++++---- server/src/http.rs | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git 
a/server/src/handlers/influxdb.rs b/server/src/handlers/influxdb.rs index 31dca245bc..8a057a2e5c 100644 --- a/server/src/handlers/influxdb.rs +++ b/server/src/handlers/influxdb.rs @@ -27,7 +27,7 @@ use crate::{ instance::InstanceRef, schema_config_provider::SchemaConfigProviderRef, }; -pub struct Influxdb { +pub struct InfluxDb { instance: InstanceRef, schema_config_provider: SchemaConfigProviderRef, } @@ -68,7 +68,7 @@ impl From for WriteRequest { pub type WriteResponse = (); -impl Influxdb { +impl InfluxDb { pub fn new(instance: InstanceRef, schema_config_provider: SchemaConfigProviderRef) -> Self { Self { instance, @@ -229,7 +229,7 @@ fn convert_influx_value(field_value: FieldValue) -> Value { // TODO: Request and response type don't match influxdb's API now. pub async fn query( ctx: RequestContext, - db: Arc>, + db: Arc>, req: QueryRequest, ) -> std::result::Result { db.query(ctx, req) @@ -241,7 +241,7 @@ pub async fn query( // TODO: Request and response type don't match influxdb's API now. pub async fn write( ctx: RequestContext, - db: Arc>, + db: Arc>, req: WriteRequest, ) -> std::result::Result { db.write(ctx, req) diff --git a/server/src/http.rs b/server/src/http.rs index 7cd2b1b51a..f54db35dc6 100644 --- a/server/src/http.rs +++ b/server/src/http.rs @@ -33,7 +33,7 @@ use crate::{ error_util, handlers::{ self, - influxdb::{self, Influxdb}, + influxdb::{self, InfluxDb}, prom::CeresDBStorage, query::Request, }, @@ -114,7 +114,7 @@ pub struct Service { instance: InstanceRef, profiler: Arc, prom_remote_storage: RemoteStorageRef, - influxdb: Arc>, + influxdb: Arc>, tx: Sender<()>, config: HttpConfig, } @@ -403,7 +403,7 @@ impl Service { fn with_influxdb( &self, - ) -> impl Filter>,), Error = Infallible> + Clone { + ) -> impl Filter>,), Error = Infallible> + Clone { let influxdb = self.influxdb.clone(); warp::any().map(move || influxdb.clone()) } @@ -477,7 +477,7 @@ impl Builder { instance.clone(), schema_config_provider.clone(), )); - let influxdb = Arc::new(Influxdb::new(instance.clone(), schema_config_provider)); + let influxdb = Arc::new(InfluxDb::new(instance.clone(), schema_config_provider)); let (tx, rx) = oneshot::channel(); let service = Service {
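
Taken together, the series exposes two InfluxDB v1-compatible endpoints: POST /influxdb/v1/write, which accepts raw line protocol (timestamps interpreted as milliseconds by the default Precision::Millisecond, answered with 204 No Content after patch 6), and POST /influxdb/v1/query, which accepts a raw InfluxQL statement, as exercised by the updated integration test. The sketch below shows how a client might drive both endpoints. It is a minimal illustration modeled on the reqwest helper in integration_tests/src/database.rs; the server address, table name, and tokio runtime wiring are assumptions for illustration, not anything fixed by the patches.

// Hypothetical client for the endpoints added by this series; assumes a
// CeresDB server listening locally (the port 5440 is an assumption) and the
// `reqwest` and `tokio` crates already used by the integration tests.
use reqwest::Client;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let client = Client::new();
    let endpoint = "127.0.0.1:5440"; // assumed address, adjust as needed

    // Write: the body is raw InfluxDB line protocol. The handler parses it
    // with influxdb_line_protocol::parse_lines and groups rows into one
    // WriteTableRequest per measurement ("demo" is an illustrative table).
    let resp = client
        .post(format!("http://{endpoint}/influxdb/v1/write"))
        .body("demo,tag1=t1,tag2=t2 field1=90,field2=100 1678675992000")
        .send()
        .await?;
    // Patch 6 makes a successful write reply with 204 NO_CONTENT.
    assert!(resp.status().is_success());

    // Query: the body is a raw InfluxQL statement, matching the updated
    // integration test, which posts the plain query string rather than the
    // old JSON wrapper.
    let resp = client
        .post(format!("http://{endpoint}/influxdb/v1/query"))
        .body("SELECT * FROM demo")
        .send()
        .await?;
    println!("{}", resp.text().await?);

    Ok(())
}

As the TODOs in server/src/handlers/influxdb.rs note, the query response still uses the generic convert_output shape rather than InfluxDB's JSON format, so the printed body will not yet match what an InfluxDB client expects.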