Skip to content

Commit

Permalink
feat: support opentsdb put api (apache#1037)
Browse files Browse the repository at this point in the history
## Rationale
Part of apache#904 

## Detailed Changes


## Test Plan
1. write some data points 
```
curl --location --request POST 'http://127.0.0.1:5440/opentsdb/api/put' --data-ascii '
[
{
    "metric": "sys.cpu.nice",
    "timestamp": 1687935743000,
    "value": 18,
    "tags": {
       "host": "web01",
       "dc": "lga"
    }
},
{
    "metric": "sys.cpu.nice",
    "timestamp": 1687935743000,
    "value": 18,
    "tags": {
       "host": "web01"
    }
}
]
'
```

2. select 
```
curl --location --request POST 'http://127.0.0.1:5440/sql' --data-ascii '
SELECT * from "sys.cpu.nice"
'
```

the response: 
```
{
  "rows": [
    {
      "tsid": 1890867319031064034,
      "timestamp": 1687935743000,
      "dc": null,
      "host": "web01",
      "value": 18.0
    },
    {
      "tsid": 7054964577922029584,
      "timestamp": 1687935743000,
      "dc": "lga",
      "host": "web01",
      "value": 18.0
    }
  ]
}
```
  • Loading branch information
zouxiang1993 authored and dust1 committed Aug 9, 2023
1 parent 9628772 commit 510ea32
Show file tree
Hide file tree
Showing 7 changed files with 290 additions and 0 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions common_util/src/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::{
};

use chrono::{DateTime, Utc};
use common_types::time::Timestamp;

pub trait DurationExt {
/// Convert into u64.
Expand Down Expand Up @@ -66,6 +67,17 @@ pub fn format_as_ymdhms(unix_timestamp: i64) -> String {
dt.format("%Y-%m-%d %H:%M:%S").to_string()
}

/// Normalizes a raw integer timestamp into a millisecond [`Timestamp`].
///
/// The accepted ranges follow https://help.aliyun.com/document_detail/60683.html:
/// - `4294968..=4294967295` is scaled by 1000 (i.e. the input is taken to be
///   in seconds);
/// - `4294967296..=9999999999999` is used unchanged (already milliseconds).
///
/// Returns `None` for any value outside both ranges, including negatives.
pub fn try_to_millis(ts: i64) -> Option<Timestamp> {
    let millis = match ts {
        4_294_968..=4_294_967_295 => ts * 1000,
        4_294_967_296..=9_999_999_999_999 => ts,
        _ => return None,
    };
    Some(Timestamp::new(millis))
}

#[cfg(test)]
mod tests {
use std::thread;
Expand Down
1 change: 1 addition & 0 deletions proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ futures = { workspace = true }
http = "0.2"
influxdb-line-protocol = "1.0"
interpreters = { workspace = true }
itertools = { workspace = true }
lazy_static = { workspace = true }
log = { workspace = true }
logger = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub mod http;
pub mod influxdb;
pub mod instance;
pub mod limiter;
pub mod opentsdb;
mod read;
pub mod schema_config_provider;
mod util;
Expand Down
50 changes: 50 additions & 0 deletions proxy/src/opentsdb/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! This module implements [put][1] for OpenTSDB
//!
//! [1]: http://opentsdb.net/docs/build/html/api_http/put.html
use ceresdbproto::storage::{
RequestContext as GrpcRequestContext, WriteRequest as GrpcWriteRequest,
};
use log::debug;
use query_engine::executor::Executor as QueryExecutor;

use crate::{
context::RequestContext,
error::Result,
opentsdb::types::{convert_put_request, PutRequest, PutResponse},
Context, Proxy,
};

pub mod types;

impl<Q: QueryExecutor + 'static> Proxy<Q> {
    /// Handles an OpenTSDB `put` request.
    ///
    /// The request body is converted into per-table write requests, wrapped
    /// in a gRPC write request targeting `ctx.schema`, and executed on the
    /// write runtime through `handle_write_internal`.
    ///
    /// # Errors
    ///
    /// Returns an error if the body cannot be converted (see
    /// `convert_put_request`) or if the internal write fails.
    pub async fn handle_opentsdb_put(
        &self,
        ctx: RequestContext,
        req: PutRequest,
    ) -> Result<PutResponse> {
        // Reject malformed payloads before building any write context.
        let table_requests = convert_put_request(req)?;
        let write_request = GrpcWriteRequest {
            context: Some(GrpcRequestContext {
                database: ctx.schema.clone(),
            }),
            table_requests,
        };

        let write_context = Context {
            timeout: ctx.timeout,
            runtime: self.engine_runtimes.write_runtime.clone(),
            enable_partition_table_access: false,
            forwarded_from: None,
        };

        // NOTE: the binding must stay named `result`; the log format string
        // below captures it by name.
        let result = self
            .handle_write_internal(write_context, write_request)
            .await?;

        debug!(
            "OpenTSDB write finished, catalog:{}, schema:{}, result:{result:?}",
            ctx.catalog, ctx.schema
        );

        Ok(())
    }
}
199 changes: 199 additions & 0 deletions proxy/src/opentsdb/types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.

use std::{
collections::{HashMap, HashSet},
fmt::Debug,
};

use bytes::Bytes;
use ceresdbproto::storage::{
value, Field, FieldGroup, Tag, Value as ProtoValue, WriteSeriesEntry, WriteTableRequest,
};
use common_util::{error::BoxError, time::try_to_millis};
use http::StatusCode;
use serde::Deserialize;
use serde_json::from_slice;
use snafu::{OptionExt, ResultExt};

use crate::error::{ErrNoCause, ErrWithCause, Result};

/// Name of the single field column that every converted OpenTSDB point's
/// value is written to.
const OPENTSDB_DEFAULT_FIELD: &str = "value";

#[derive(Debug)]
pub struct PutRequest {
pub points: Bytes,

pub summary: Option<String>,
pub details: Option<String>,
pub sync: Option<String>,
pub sync_timeout: i32,
}

impl PutRequest {
pub fn new(points: Bytes, params: PutParams) -> Self {
PutRequest {
points,
summary: params.summary,
details: params.details,
sync: params.sync,
sync_timeout: params.sync_timeout,
}
}
}

/// Response of the put api; currently carries no payload.
pub type PutResponse = ();

/// Query string parameters for put api
///
/// It's derived from query string parameters of put described in
/// doc of OpenTSDB 2.4:
/// http://opentsdb.net/docs/build/html/api_http/put.html#requests
///
/// NOTE:
/// - none of these parameters is implemented yet; they are parsed and
///   carried along in `PutRequest`, but do not affect how points are written.
/// - because of `#[serde(default)]`, parameters missing from the query
///   string take their `Default` value (`None` / `0`).
#[derive(Debug, Default, Deserialize)]
#[serde(default)]
pub struct PutParams {
pub summary: Option<String>,
pub details: Option<String>,
pub sync: Option<String>,
pub sync_timeout: i32,
}

/// A single data point as deserialized from the JSON body of a put request.
#[derive(Debug, Deserialize)]
pub struct Point {
// Becomes the target table name when the point is converted.
pub metric: String,
// Raw timestamp; normalized with `try_to_millis` during conversion and
// rejected if it falls outside the ranges that function accepts.
pub timestamp: i64,
// Written to the single default field column.
pub value: Value,
// Tag name -> tag value; `validate` requires at least one tag and
// non-empty tag names.
pub tags: HashMap<String, String>,
}

/// Numeric value of a data point.
///
/// The enum is `#[serde(untagged)]`, so serde tries the variants in
/// declaration order: a JSON number that fits an `i64` becomes
/// `IntegerValue`, anything else that parses as a float becomes `F64Value`.
/// Keep the variant order as is.
#[derive(Debug, Deserialize)]
#[serde(untagged)]
pub enum Value {
IntegerValue(i64),
F64Value(f64),
}

pub(crate) fn convert_put_request(req: PutRequest) -> Result<Vec<WriteTableRequest>> {
let points = {
// multi points represent as json array
let parse_array = from_slice::<Vec<Point>>(&req.points);
match parse_array {
Ok(points) => Ok(points),
Err(_e) => {
// single point represent as json object
let parse_object = from_slice::<Point>(&req.points);
match parse_object {
Ok(point) => Ok(vec![point]),
Err(e) => Err(e),
}
}
}
};
let points = points.box_err().with_context(|| ErrWithCause {
code: StatusCode::BAD_REQUEST,
msg: "Json parse error".to_string(),
})?;
validate(&points)?;

let mut points_per_metric = HashMap::with_capacity(100);
for point in points {
points_per_metric
.entry(point.metric.clone())
.or_insert(Vec::new())
.push(point);
}

let mut requests = Vec::with_capacity(points_per_metric.len());
for (metric, points) in points_per_metric {
let mut tag_names_set = HashSet::with_capacity(points[0].tags.len() * 2);
for point in &points {
for tag_name in point.tags.keys() {
tag_names_set.insert(tag_name.clone());
}
}

let mut tag_name_to_tag_index: HashMap<String, u32> =
HashMap::with_capacity(tag_names_set.len());
let mut tag_names = Vec::with_capacity(tag_names_set.len());
for (idx, tag_name) in tag_names_set.into_iter().enumerate() {
tag_name_to_tag_index.insert(tag_name.clone(), idx as u32);
tag_names.push(tag_name);
}

let mut req = WriteTableRequest {
table: metric,
tag_names,
field_names: vec![String::from(OPENTSDB_DEFAULT_FIELD)],
entries: Vec::with_capacity(points.len()),
};

for point in points {
let timestamp = point.timestamp;
let timestamp = try_to_millis(timestamp)
.with_context(|| ErrNoCause {
code: StatusCode::BAD_REQUEST,
msg: format!("Invalid timestamp: {}", point.timestamp),
})?
.as_i64();

let mut tags = Vec::with_capacity(point.tags.len());
for (tag_name, tag_value) in point.tags {
let &tag_index = tag_name_to_tag_index.get(&tag_name).unwrap();
tags.push(Tag {
name_index: tag_index,
value: Some(ProtoValue {
value: Some(value::Value::StringValue(tag_value)),
}),
});
}

let value = match point.value {
Value::IntegerValue(v) => value::Value::Int64Value(v),
Value::F64Value(v) => value::Value::Float64Value(v),
};
let fields = vec![Field {
name_index: 0,
value: Some(ProtoValue { value: Some(value) }),
}];

let field_groups = vec![FieldGroup { timestamp, fields }];

req.entries.push(WriteSeriesEntry { tags, field_groups });
}
requests.push(req);
}

Ok(requests)
}

pub(crate) fn validate(points: &[Point]) -> Result<()> {
for point in points {
if point.metric.is_empty() {
return ErrNoCause {
code: StatusCode::BAD_REQUEST,
msg: "Metric must not be empty",
}
.fail();
}
if point.tags.is_empty() {
return ErrNoCause {
code: StatusCode::BAD_REQUEST,
msg: "At least one tag must be supplied",
}
.fail();
}
for tag_name in point.tags.keys() {
if tag_name.is_empty() {
return ErrNoCause {
code: StatusCode::BAD_REQUEST,
msg: "Tag name must not be empty",
}
.fail();
}
}
}

Ok(())
}
26 changes: 26 additions & 0 deletions server/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use proxy::{
http::sql::{convert_output, Request},
influxdb::types::{InfluxqlParams, InfluxqlRequest, WriteParams, WriteRequest},
instance::InstanceRef,
opentsdb::types::{PutParams, PutRequest},
Proxy,
};
use query_engine::executor::Executor as QueryExecutor;
Expand Down Expand Up @@ -183,6 +184,7 @@ impl<Q: QueryExecutor + 'static> Service<Q> {
.or(self.metrics())
.or(self.sql())
.or(self.influxdb_api())
.or(self.opentsdb_api())
.or(self.prom_api())
.or(self.route())
// admin APIs
Expand Down Expand Up @@ -344,6 +346,30 @@ impl<Q: QueryExecutor + 'static> Service<Q> {
warp::path!("influxdb" / "v1" / ..).and(write_api.or(query_api))
}

// Routes under /opentsdb/api:
//   POST /opentsdb/api/put
//
// The body size is capped by `config.max_body_size`. A successful write is
// answered with `204 No Content`; failures are turned into a custom rejection.
fn opentsdb_api(
    &self,
) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
    let put_api = warp::path!("put")
        .and(warp::post())
        .and(warp::body::content_length_limit(self.config.max_body_size))
        .and(self.with_context())
        .and(warp::query::<PutParams>())
        .and(warp::body::bytes())
        .and(self.with_proxy())
        .and_then(|ctx, params, points, proxy: Arc<Proxy<Q>>| async move {
            proxy
                .handle_opentsdb_put(ctx, PutRequest::new(points, params))
                .await
                .map(|_res| reply::with_status(warp::reply(), StatusCode::NO_CONTENT))
                .map_err(reject::custom)
        });

    warp::path!("opentsdb" / "api" / ..).and(put_api)
}

// POST /debug/flush_memtable
fn flush_memtable(
&self,
Expand Down

0 comments on commit 510ea32

Please sign in to comment.