Skip to content

Commit

Permalink
feat: add influxdb write (#723)
Browse files Browse the repository at this point in the history
* refactor: add influxdb module

* add convert write request

* impl write handler

* merge field group for same series

* add convert unittest

* fix CR

* fix naming
  • Loading branch information
jiacai2050 authored Mar 13, 2023
1 parent aee2e70 commit 8837aa6
Show file tree
Hide file tree
Showing 7 changed files with 461 additions and 78 deletions.
80 changes: 46 additions & 34 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 2 additions & 9 deletions integration_tests/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use ceresdb_client::{
RpcContext,
};
use reqwest::ClientBuilder;
use serde::Serialize;
use sql::{
ast::{Statement, TableName},
parser::Parser,
Expand Down Expand Up @@ -46,11 +45,6 @@ struct HttpClient {
endpoint: String,
}

#[derive(Clone, Serialize)]
struct InfluxQLRequest {
query: String,
}

impl HttpClient {
fn new(endpoint: String) -> Self {
let client = ClientBuilder::new()
Expand Down Expand Up @@ -178,12 +172,11 @@ impl CeresDB {
}

async fn execute_influxql(query: String, http_client: HttpClient) -> Box<dyn Display> {
let url = format!("http://{}/influxql", http_client.endpoint);
let query_request = InfluxQLRequest { query };
let url = format!("http://{}/influxdb/v1/query", http_client.endpoint);
let resp = http_client
.client
.post(url)
.json(&query_request)
.body(query)
.send()
.await
.unwrap();
Expand Down
1 change: 1 addition & 0 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ datafusion-expr = { workspace = true }
df_operator = { workspace = true }
futures = { workspace = true }
http = "0.2"
influxdb_line_protocol = { git = "https://github.com/jiacai2050/influxdb_line_protocol" }
interpreters = { workspace = true }
lazy_static = { workspace = true }
log = { workspace = true }
Expand Down
7 changes: 7 additions & 0 deletions server/src/handlers/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

//! Error of handlers
use common_util::error::GenericError;
use snafu::{Backtrace, Snafu};
use warp::reject::Reject;

use crate::limiter;
// TODO(yingwen): Avoid printing huge sql string
Expand Down Expand Up @@ -70,6 +72,11 @@ pub enum Error {
source: tokio::time::error::Elapsed,
backtrace: Backtrace,
},

#[snafu(display("InfluxDb handler failed, msg:{}, source:{}", msg, source))]
InfluxDbHandler { msg: String, source: GenericError },
}

define_result!(Error);

impl Reject for Error {}
Loading

0 comments on commit 8837aa6

Please sign in to comment.