diff --git a/.github/actions/spelling/allow.txt b/.github/actions/spelling/allow.txt index d9b13fcf742c0..21e2e10c80ec7 100644 --- a/.github/actions/spelling/allow.txt +++ b/.github/actions/spelling/allow.txt @@ -295,6 +295,9 @@ gpg gql grafana graphiql +greptime +greptimecloud +greptimedb gvisor gws hadoop diff --git a/.github/workflows/changes.yml b/.github/workflows/changes.yml index db6c4225073e0..653d3395bdfda 100644 --- a/.github/workflows/changes.yml +++ b/.github/workflows/changes.yml @@ -70,6 +70,8 @@ on: value: ${{ jobs.int_tests.outputs.fluent }} gcp: value: ${{ jobs.int_tests.outputs.gcp }} + greptimedb: + value: ${{ jobs.int_tests.outputs.greptimedb }} humio: value: ${{ jobs.int_tests.outputs.humio }} http-client: @@ -194,6 +196,7 @@ jobs: eventstoredb: ${{ steps.filter.outputs.eventstoredb }} fluent: ${{ steps.filter.outputs.fluent }} gcp: ${{ steps.filter.outputs.gcp }} + greptimedb: ${{ steps.filter.outputs.greptimedb }} humio: ${{ steps.filter.outputs.humio }} http-client: ${{ steps.filter.outputs.http-client }} influxdb: ${{ steps.filter.outputs.influxdb }} diff --git a/.github/workflows/integration-comment.yml b/.github/workflows/integration-comment.yml index 8089f69bccc86..4393d9c117a13 100644 --- a/.github/workflows/integration-comment.yml +++ b/.github/workflows/integration-comment.yml @@ -221,6 +221,14 @@ jobs: max_attempts: 3 command: bash scripts/ci-integration-test.sh gcp + - name: greptimedb + if: ${{ contains(github.event.comment.body, '/ci-run-integration-greptimedb') || contains(github.event.comment.body, '/ci-run-all') }} + uses: nick-fields/retry@v2 + with: + timeout_minutes: 30 + max_attempts: 3 + command: bash scripts/ci-integration-test.sh greptimedb + - name: humio if: ${{ contains(github.event.comment.body, '/ci-run-integration-humio') || contains(github.event.comment.body, '/ci-run-all') }} uses: nick-fields/retry@v2 diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml index 9ffb06d8c45d4..dddec5af7e673 100644 --- a/.github/workflows/integration.yml +++ b/.github/workflows/integration.yml @@ -67,6 +67,7 @@ jobs: || needs.changes.outputs.eventstoredb == 'true' || needs.changes.outputs.fluent == 'true' || needs.changes.outputs.gcp == 'true' + || needs.changes.outputs.greptimedb == 'true' || needs.changes.outputs.humio == 'true' || needs.changes.outputs.http-client == 'true' || needs.changes.outputs.influxdb == 'true' @@ -233,6 +234,14 @@ jobs: max_attempts: 3 command: bash scripts/ci-integration-test.sh gcp + - if: ${{ github.event_name == 'merge_group' || needs.changes.outputs.all-int == 'true' || needs.changes.outputs.greptimedb == 'true' }} + name: greptimedb + uses: nick-fields/retry@v2 + with: + timeout_minutes: 30 + max_attempts: 3 + command: bash scripts/ci-integration-test.sh greptimedb + - if: ${{ github.event_name == 'merge_group' || needs.changes.outputs.all-int == 'true' || needs.changes.outputs.humio == 'true' }} name: humio uses: nick-fields/retry@v2 diff --git a/Cargo.lock b/Cargo.lock index b4643639d00ee..04bf16ff6d16b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3557,6 +3557,39 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "greptime-proto" +version = "0.1.0" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?tag=0.2.1#4398d20c56d5f7939cc2960789cb1fa7dd18e6fe" +dependencies = [ + "prost", + "serde", + "serde_json", + "tonic", + "tonic-build", +] + +[[package]] +name = "greptimedb-client" +version = "0.1.0" +source = "git+https://github.com/GreptimeTeam/greptimedb-client-rust.git?rev=bc32362adf0df17a41a95bae4221d6d8f1775656#bc32362adf0df17a41a95bae4221d6d8f1775656" +dependencies = [ + "dashmap", + "enum_dispatch", + "futures 0.3.28", + "futures-util", + "greptime-proto", + "parking_lot", + "prost", + "rand 0.8.5", + "snafu", + "tokio", + "tokio-stream", + "tonic", + "tonic-build", + "tower", +] + [[package]] name = "grok" version = "2.0.0" @@ -9259,6 +9292,7 @@ dependencies = [ "glob", "goauth", "governor", + "greptimedb-client", "grok", "h2", "hash_hasher", diff --git a/Cargo.toml b/Cargo.toml index dd4227faab120..d3fe89000945c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -233,6 +233,9 @@ tui = { version = "0.19.0", optional = true, default-features = false, features hex = { version = "0.4.3", default-features = false, optional = true } sha2 = { version = "0.10.7", default-features = false, optional = true } +# GreptimeDB +greptimedb-client = { git = "https://github.com/GreptimeTeam/greptimedb-client-rust.git", rev = "bc32362adf0df17a41a95bae4221d6d8f1775656", optional = true } + # External libs arc-swap = { version = "1.6", default-features = false, optional = true } async-compression = { version = "0.4.0", default-features = false, features = ["tokio", "gzip", "zstd"], optional = true } @@ -646,6 +649,7 @@ sinks-metrics = [ "sinks-blackhole", "sinks-console", "sinks-datadog_metrics", + "sinks-greptimedb", "sinks-humio", "sinks-influxdb", "sinks-kafka", @@ -679,6 +683,7 @@ sinks-datadog_traces = ["protobuf-build", "dep:rmpv", "dep:rmp-serde", "dep:serd sinks-elasticsearch = ["aws-core", "transforms-metric_to_log"] sinks-file = ["dep:async-compression"] sinks-gcp = ["dep:base64", "gcp"] +sinks-greptimedb = ["dep:greptimedb-client"] sinks-honeycomb = [] sinks-http = [] sinks-humio = ["sinks-splunk_hec", "transforms-metric_to_log"] @@ -739,6 +744,7 @@ all-integration-tests = [ "gcp-cloud-storage-integration-tests", "gcp-integration-tests", "gcp-pubsub-integration-tests", + "greptimedb-integration-tests", "http-client-integration-tests", "humio-integration-tests", "influxdb-integration-tests", @@ -800,6 +806,7 @@ fluent-integration-tests = ["docker", "sources-fluent"] gcp-cloud-storage-integration-tests = ["sinks-gcp"] gcp-integration-tests = ["sinks-gcp"] gcp-pubsub-integration-tests = ["sinks-gcp", "sources-gcp_pubsub"] +greptimedb-integration-tests = ["sinks-greptimedb"] humio-integration-tests = ["sinks-humio"] http-client-integration-tests = ["sources-http_client"] influxdb-integration-tests = ["sinks-influxdb"] diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index fd759a75ab82a..c6fec46f59732 100644 --- a/LICENSE-3rdparty.csv +++ b/LICENSE-3rdparty.csv @@ -225,6 +225,8 @@ graphql-parser,https://github.com/graphql-rust/graphql-parser,MIT OR Apache-2.0, graphql_client,https://github.com/graphql-rust/graphql-client,Apache-2.0 OR MIT,Tom Houlé graphql_client_codegen,https://github.com/graphql-rust/graphql-client,Apache-2.0 OR MIT,Tom Houlé graphql_query_derive,https://github.com/graphql-rust/graphql-client,Apache-2.0 OR MIT,Tom Houlé +greptime-proto,https://github.com/GreptimeTeam/greptime-proto,Apache-2.0,The greptime-proto Authors +greptimedb-client,https://github.com/GreptimeTeam/greptimedb-client-rust,Apache-2.0,The greptimedb-client Authors grok,https://github.com/daschl/grok,Apache-2.0,Michael Nitschinger h2,https://github.com/hyperium/h2,MIT,"Carl Lerche , Sean McArthur " hash_hasher,https://github.com/Fraser999/Hash-Hasher,Apache-2.0 OR MIT,Fraser Hutchison diff --git a/Makefile b/Makefile index d988725b612f8..c1c2970767305 100644 --- a/Makefile +++ b/Makefile @@ -332,7 +332,7 @@ test-behavior: test-behavior-transforms test-behavior-formats test-behavior-conf test-integration: ## Runs all integration tests test-integration: test-integration-amqp test-integration-appsignal test-integration-aws test-integration-axiom test-integration-azure test-integration-chronicle test-integration-clickhouse test-integration: test-integration-databend test-integration-docker-logs test-integration-elasticsearch -test-integration: test-integration-eventstoredb test-integration-fluent test-integration-gcp test-integration-humio test-integration-http-client test-integration-influxdb +test-integration: test-integration-eventstoredb test-integration-fluent test-integration-gcp test-integration-greptimedb test-integration-humio test-integration-http-client test-integration-influxdb test-integration: test-integration-kafka test-integration-logstash test-integration-loki test-integration-mongodb test-integration-nats test-integration: test-integration-nginx test-integration-opentelemetry test-integration-postgres test-integration-prometheus test-integration-pulsar test-integration: test-integration-redis test-integration-splunk test-integration-dnstap test-integration-datadog-agent test-integration-datadog-logs diff --git a/scripts/integration/greptimedb/compose.yaml b/scripts/integration/greptimedb/compose.yaml new file mode 100644 index 0000000000000..7cd131240b49d --- /dev/null +++ b/scripts/integration/greptimedb/compose.yaml @@ -0,0 +1,8 @@ +version: '3' + +services: + greptimedb: + image: docker.io/greptime/greptimedb:${CONFIG_VERSION} + command: "standalone start --http-addr=0.0.0.0:4000 --rpc-addr=0.0.0.0:4001" + healthcheck: + test: "curl -f localhost:4000/health || exit 1" diff --git a/scripts/integration/greptimedb/test.yaml b/scripts/integration/greptimedb/test.yaml new file mode 100644 index 0000000000000..b428b37a03ed2 --- /dev/null +++ b/scripts/integration/greptimedb/test.yaml @@ -0,0 +1,12 @@ +features: +- greptimedb-integration-tests + +test_filter: '::greptimedb::' + +runner: + env: + GREPTIMEDB_ENDPOINT: greptimedb:4001 + GREPTIMEDB_HTTP: http://greptimedb:4000 + +matrix: + version: ['latest'] diff --git a/src/sinks/greptimedb/batch.rs b/src/sinks/greptimedb/batch.rs new file mode 100644 index 0000000000000..af4ef01096dfb --- /dev/null +++ b/src/sinks/greptimedb/batch.rs @@ -0,0 +1,48 @@ +use vector_core::{ + event::{Metric, MetricValue}, + stream::batcher::limiter::ItemBatchSize, +}; + +use super::request_builder::{ + DISTRIBUTION_QUANTILES, DISTRIBUTION_STAT_FIELD_COUNT, SUMMARY_STAT_FIELD_COUNT, +}; + +const F64_BYTE_SIZE: usize = 8; +const I64_BYTE_SIZE: usize = 8; + +#[derive(Default)] +pub(super) struct GreptimeDBBatchSizer; + +impl GreptimeDBBatchSizer { + pub(super) fn estimated_size_of(&self, item: &Metric) -> usize { + // Metric name. + item.series().name().name().len() + // Metric namespace, with an additional 1 to account for the namespace separator. + + item.series().name().namespace().map(|s| s.len() + 1).unwrap_or(0) + // Metric tags, with an additional 1 per tag to account for the tag key/value separator. + + item.series().tags().map(|t| { + t.iter_all().map(|(k, v)| { + k.len() + 1 + v.map(|v| v.len()).unwrap_or(0) + }) + .sum() + }) + .unwrap_or(0) + // timestamp + + I64_BYTE_SIZE + + + // value size + match item.value() { + MetricValue::Counter { .. } | MetricValue::Gauge { .. } | MetricValue::Set { ..} => F64_BYTE_SIZE, + MetricValue::Distribution { .. } => F64_BYTE_SIZE * (DISTRIBUTION_QUANTILES.len() + DISTRIBUTION_STAT_FIELD_COUNT), + MetricValue::AggregatedHistogram { buckets, .. } => F64_BYTE_SIZE * (buckets.len() + SUMMARY_STAT_FIELD_COUNT), + MetricValue::AggregatedSummary { quantiles, .. } => F64_BYTE_SIZE * (quantiles.len() + SUMMARY_STAT_FIELD_COUNT), + MetricValue::Sketch { .. } => F64_BYTE_SIZE * (DISTRIBUTION_QUANTILES.len() + DISTRIBUTION_STAT_FIELD_COUNT), + } + } +} + +impl ItemBatchSize for GreptimeDBBatchSizer { + fn size(&self, item: &Metric) -> usize { + self.estimated_size_of(item) + } +} diff --git a/src/sinks/greptimedb/integration_tests.rs b/src/sinks/greptimedb/integration_tests.rs new file mode 100644 index 0000000000000..598c49e1b1e23 --- /dev/null +++ b/src/sinks/greptimedb/integration_tests.rs @@ -0,0 +1,88 @@ +use chrono::{DateTime, Duration, Utc}; +use futures::stream; +use vector_core::event::{Event, Metric, MetricKind, MetricValue}; +use vector_core::metric_tags; + +use crate::sinks::util::test::load_sink; +use crate::{ + config::{SinkConfig, SinkContext}, + test_util::{ + components::{run_and_assert_sink_compliance, SINK_TAGS}, + trace_init, + }, +}; + +use super::GreptimeDBConfig; + +#[tokio::test] +async fn test_greptimedb_sink() { + trace_init(); + let cfg = format!( + r#"endpoint= "{}" +"#, + std::env::var("GREPTIMEDB_ENDPOINT").unwrap_or_else(|_| "localhost:4001".to_owned()) + ); + + let (config, _) = load_sink::(&cfg).unwrap(); + let (sink, _hc) = config.build(SinkContext::default()).await.unwrap(); + + let query_client = query_client(); + + // Drop the table and data inside + let _ = query_client + .get(&format!( + "{}/v1/sql", + std::env::var("GREPTIMEDB_HTTP").unwrap_or_else(|_| "http://localhost:4000".to_owned()) + )) + .query(&[("sql", "DROP TABLE ns_my_counter")]) + .send() + .await + .unwrap(); + + let base_time = Utc::now(); + let events: Vec<_> = (0..10).map(|idx| create_event(idx, base_time)).collect(); + run_and_assert_sink_compliance(sink, stream::iter(events), &SINK_TAGS).await; + + let query_response = query_client + .get(&format!( + "{}/v1/sql", + std::env::var("GREPTIMEDB_HTTP").unwrap_or_else(|_| "http://localhost:4000".to_owned()) + )) + .query(&[("sql", "SELECT region, val FROM ns_my_counter")]) + .send() + .await + .unwrap() + .text() + .await + .expect("Fetch json from greptimedb failed"); + let result: serde_json::Value = + serde_json::from_str(&query_response).expect("Invalid json returned from greptimedb query"); + assert_eq!( + result + .pointer("/output/0/records/rows") + .and_then(|v| v.as_array()) + .expect("Error getting greptimedb response array") + .len(), + 10 + ) +} + +fn query_client() -> reqwest::Client { + reqwest::Client::builder().build().unwrap() +} + +fn create_event(i: i32, base_time: DateTime) -> Event { + Event::Metric( + Metric::new( + "my_counter".to_owned(), + MetricKind::Incremental, + MetricValue::Counter { value: i as f64 }, + ) + .with_namespace(Some("ns")) + .with_tags(Some(metric_tags!( + "region" => "us-west-1", + "production" => "true", + ))) + .with_timestamp(Some(base_time + Duration::seconds(i as i64))), + ) +} diff --git a/src/sinks/greptimedb/mod.rs b/src/sinks/greptimedb/mod.rs new file mode 100644 index 0000000000000..af0588e34b12b --- /dev/null +++ b/src/sinks/greptimedb/mod.rs @@ -0,0 +1,175 @@ +//! `GreptimeDB` sink for vector. +//! +//! This sink writes Vector's metric data into +//! [GreptimeDB](https://github.com/greptimeteam/greptimedb), a cloud-native +//! time-series database. It uses GreptimeDB's [gRPC +//! API](https://docs.greptime.com/user-guide/write-data/grpc) and GreptimeDB's +//! [rust client](https://github.com/GreptimeTeam/greptimedb-client-rust). +//! +//! This sink transforms metrics into GreptimeDB table using following rules: +//! +//! - Table name: `{namespace}_{metric_name}`. If the metric doesn't have a +//! namespace, we will use metric_name for table name. +//! - Timestamp: timestamp is stored as a column called `ts`. +//! - Tags: metric tags are stored as string columns with its name as column +//! name +//! - Counter and Gauge: the value of counter and gauge are stored in a column +//! called `val` +//! - Set: the number of set items is stored in a column called `val`. +//! - Distribution, Histogram and Summary, Sketch: Statistical attributes like +//! `sum`, `count`, "max", "min", quantiles and buckets are stored as columns. +//! +use greptimedb_client::Client; +use snafu::Snafu; +use vector_common::sensitive_string::SensitiveString; + +use crate::sinks::prelude::*; + +use self::service::GreptimeDBRetryLogic; + +mod batch; +#[cfg(all(test, feature = "greptimedb-integration-tests"))] +mod integration_tests; +mod request_builder; +mod service; +mod sink; + +#[derive(Clone, Copy, Debug, Default)] +pub struct GreptimeDBDefaultBatchSettings; + +impl SinkBatchSettings for GreptimeDBDefaultBatchSettings { + const MAX_EVENTS: Option = Some(20); + const MAX_BYTES: Option = None; + const TIMEOUT_SECS: f64 = 1.0; +} + +fn default_dbname() -> String { + greptimedb_client::DEFAULT_SCHEMA_NAME.to_string() +} + +/// Configuration items for GreptimeDB +#[configurable_component(sink("greptimedb", "Ingest metrics data into GreptimeDB."))] +#[derive(Clone, Debug, Derivative)] +#[derivative(Default)] +#[serde(deny_unknown_fields)] +pub struct GreptimeDBConfig { + /// The GreptimeDB [database][database] name to connect. + /// + /// Default to `public`, the default database of GreptimeDB. + /// + /// Database can be created via `create database` statement on + /// GreptimeDB. If you are using GreptimeCloud, use `dbname` from the + /// connection information of your instance. + /// + /// [database]: https://docs.greptime.com/user-guide/concepts/key-concepts#database + #[configurable(metadata(docs::examples = "public"))] + #[derivative(Default(value = "default_dbname()"))] + #[serde(default = "default_dbname")] + pub dbname: String, + /// The host and port of GreptimeDB gRPC service. + /// + /// This sink uses GreptimeDB's gRPC interface for data ingestion. By + /// default, GreptimeDB listens to port 4001 for gRPC protocol. + /// + /// The address _must_ include a port. + #[configurable(metadata(docs::examples = "example.com:4001"))] + #[configurable(metadata( + docs::examples = "1nge17d2r3ns.ap-southeast-1.aws.greptime.cloud:4001" + ))] + pub endpoint: String, + /// The username for your GreptimeDB instance. + /// + /// This is required if your instance has authentication enabled. + #[configurable(metadata(docs::examples = "username"))] + #[serde(default)] + pub username: Option, + /// The password for your GreptimeDB instance. + /// + /// This is required if your instance has authentication enabled. + #[configurable(metadata(docs::examples = "password"))] + #[serde(default)] + pub password: Option, + + #[configurable(derived)] + #[serde(default)] + pub request: TowerRequestConfig, + + #[configurable(derived)] + #[serde(default)] + pub batch: BatchConfig, + + #[configurable(derived)] + #[serde( + default, + deserialize_with = "crate::serde::bool_or_struct", + skip_serializing_if = "crate::serde::skip_serializing_if_default" + )] + pub acknowledgements: AcknowledgementsConfig, + + #[configurable(derived)] + pub tls: Option, +} + +impl_generate_config_from_default!(GreptimeDBConfig); + +#[typetag::serde(name = "greptimedb")] +#[async_trait::async_trait] +impl SinkConfig for GreptimeDBConfig { + async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { + let request_settings = self.request.unwrap_with(&TowerRequestConfig::default()); + let service = ServiceBuilder::new() + .settings(request_settings, GreptimeDBRetryLogic) + .service(service::GreptimeDBService::try_new(self)?); + let sink = sink::GreptimeDBSink { + service, + batch_settings: self.batch.into_batcher_settings()?, + }; + + let healthcheck = healthcheck(self)?; + Ok((VectorSink::from_event_streamsink(sink), healthcheck)) + } + + fn input(&self) -> Input { + Input::metric() + } + + fn acknowledgements(&self) -> &AcknowledgementsConfig { + &self.acknowledgements + } +} + +fn healthcheck(config: &GreptimeDBConfig) -> crate::Result { + let client = Client::with_urls(vec![&config.endpoint]); + + Ok(async move { client.health_check().await.map_err(|error| error.into()) }.boxed()) +} + +#[derive(Debug, Snafu)] +pub enum GreptimeDBConfigError { + #[snafu(display("greptimedb TLS Config Error: missing key"))] + TlsMissingKey, + #[snafu(display("greptimedb TLS Config Error: missing cert"))] + TlsMissingCert, +} + +#[cfg(test)] +mod tests { + use indoc::indoc; + + use super::*; + + #[test] + fn generate_config() { + crate::test_util::test_generate_config::(); + } + + #[test] + fn test_config_with_username() { + let config = indoc! {r#" + endpoint = "foo-bar.ap-southeast-1.aws.greptime.cloud:4001" + dbname = "foo-bar" + "#}; + + toml::from_str::(config).unwrap(); + } +} diff --git a/src/sinks/greptimedb/request_builder.rs b/src/sinks/greptimedb/request_builder.rs new file mode 100644 index 0000000000000..06c92059c6f4e --- /dev/null +++ b/src/sinks/greptimedb/request_builder.rs @@ -0,0 +1,363 @@ +use chrono::Utc; +use greptimedb_client::api::v1::column::*; +use greptimedb_client::api::v1::*; +use vector_core::event::metric::{Bucket, MetricSketch, Quantile, Sample}; +use vector_core::event::{Metric, MetricValue}; +use vector_core::metrics::AgentDDSketch; + +use crate::sinks::util::statistic::DistributionStatistic; + +pub(super) const DISTRIBUTION_QUANTILES: [f64; 5] = [0.5, 0.75, 0.90, 0.95, 0.99]; +pub(super) const DISTRIBUTION_STAT_FIELD_COUNT: usize = 5; +pub(super) const SUMMARY_STAT_FIELD_COUNT: usize = 2; + +fn f64_field(name: &str, value: f64) -> Column { + Column { + column_name: name.to_owned(), + values: Some(column::Values { + f64_values: vec![value], + ..Default::default() + }), + semantic_type: SemanticType::Field as i32, + datatype: ColumnDataType::Float64 as i32, + ..Default::default() + } +} + +fn ts_column(name: &str, value: i64) -> Column { + Column { + column_name: name.to_owned(), + values: Some(column::Values { + ts_millisecond_values: vec![value], + ..Default::default() + }), + semantic_type: SemanticType::Timestamp as i32, + datatype: ColumnDataType::TimestampMillisecond as i32, + ..Default::default() + } +} + +fn tag_column(name: &str, value: &str) -> Column { + Column { + column_name: name.to_owned(), + values: Some(column::Values { + string_values: vec![value.to_owned()], + ..Default::default() + }), + semantic_type: SemanticType::Tag as i32, + datatype: ColumnDataType::String as i32, + ..Default::default() + } +} + +pub(super) fn metric_to_insert_request(metric: Metric) -> InsertRequest { + let ns = metric.namespace(); + let metric_name = metric.name(); + let table_name = if let Some(ns) = ns { + format!("{ns}_{metric_name}") + } else { + metric_name.to_owned() + }; + + let mut columns = Vec::new(); + // timestamp + let timestamp = metric + .timestamp() + .map(|t| t.timestamp_millis()) + .unwrap_or_else(|| Utc::now().timestamp_millis()); + columns.push(ts_column("ts", timestamp)); + + // tags + if let Some(tags) = metric.tags() { + for (key, value) in tags.iter_single() { + columns.push(tag_column(key, value)); + } + } + + // fields + match metric.value() { + MetricValue::Counter { value } => columns.push(f64_field("val", *value)), + MetricValue::Gauge { value } => columns.push(f64_field("val", *value)), + MetricValue::Set { values } => columns.push(f64_field("val", values.len() as f64)), + MetricValue::Distribution { samples, .. } => { + encode_distribution(samples, &mut columns); + } + + MetricValue::AggregatedHistogram { + buckets, + count, + sum, + } => { + encode_histogram(buckets.as_ref(), &mut columns); + columns.push(f64_field("count", *count as f64)); + columns.push(f64_field("sum", *sum)); + } + MetricValue::AggregatedSummary { + quantiles, + count, + sum, + } => { + encode_quantiles(quantiles.as_ref(), &mut columns); + columns.push(f64_field("count", *count as f64)); + columns.push(f64_field("sum", *sum)); + } + MetricValue::Sketch { sketch } => { + let MetricSketch::AgentDDSketch(sketch) = sketch; + encode_sketch(sketch, &mut columns); + } + } + + InsertRequest { + table_name, + columns, + row_count: 1, + ..Default::default() + } +} + +fn encode_distribution(samples: &[Sample], columns: &mut Vec) { + if let Some(stats) = DistributionStatistic::from_samples(samples, &DISTRIBUTION_QUANTILES) { + columns.push(f64_field("min", stats.min)); + columns.push(f64_field("max", stats.max)); + columns.push(f64_field("avg", stats.avg)); + columns.push(f64_field("sum", stats.sum)); + columns.push(f64_field("count", stats.count as f64)); + + for (quantile, value) in stats.quantiles { + columns.push(f64_field(&format!("p{:02}", quantile * 100f64), value)); + } + } +} + +fn encode_histogram(buckets: &[Bucket], columns: &mut Vec) { + for bucket in buckets { + let column_name = format!("b{}", bucket.upper_limit); + columns.push(f64_field(&column_name, bucket.count as f64)); + } +} + +fn encode_quantiles(quantiles: &[Quantile], columns: &mut Vec) { + for quantile in quantiles { + let column_name = format!("p{:02}", quantile.quantile * 100f64); + columns.push(f64_field(&column_name, quantile.value)); + } +} + +fn encode_sketch(sketch: &AgentDDSketch, columns: &mut Vec) { + columns.push(f64_field("count", sketch.count() as f64)); + if let Some(min) = sketch.min() { + columns.push(f64_field("min", min)); + } + + if let Some(max) = sketch.max() { + columns.push(f64_field("max", max)); + } + + if let Some(sum) = sketch.sum() { + columns.push(f64_field("sum", sum)); + } + + if let Some(avg) = sketch.avg() { + columns.push(f64_field("avg", avg)); + } + + for q in DISTRIBUTION_QUANTILES { + if let Some(quantile) = sketch.quantile(q) { + let column_name = format!("p{:02}", q * 100f64); + columns.push(f64_field(&column_name, quantile)); + } + } +} + +#[cfg(test)] +mod tests { + + use similar_asserts::assert_eq; + + use super::*; + use crate::event::metric::{MetricKind, StatisticKind}; + + fn get_column(columns: &[Column], name: &str) -> f64 { + let col = columns.iter().find(|c| c.column_name == name).unwrap(); + *(col.values.as_ref().unwrap().f64_values.first().unwrap()) + } + + #[test] + fn test_metric_data_to_insert_request() { + let metric = Metric::new( + "load1", + MetricKind::Absolute, + MetricValue::Gauge { value: 1.1 }, + ) + .with_namespace(Some("ns")) + .with_tags(Some([("host".to_owned(), "my_host".to_owned())].into())) + .with_timestamp(Some(Utc::now())); + + let insert = metric_to_insert_request(metric); + + assert_eq!(insert.table_name, "ns_load1"); + assert_eq!(insert.row_count, 1); + assert_eq!(insert.columns.len(), 3); + + let column_names = insert + .columns + .iter() + .map(|c| c.column_name.as_ref()) + .collect::>(); + assert!(column_names.contains(&"ts")); + assert!(column_names.contains(&"host")); + assert!(column_names.contains(&"val")); + + assert_eq!(get_column(&insert.columns, "val"), 1.1); + + let metric2 = Metric::new( + "load1", + MetricKind::Absolute, + MetricValue::Gauge { value: 1.1 }, + ); + let insert2 = metric_to_insert_request(metric2); + assert_eq!(insert2.table_name, "load1"); + } + + #[test] + fn test_counter() { + let metric = Metric::new( + "cpu_seconds_total", + MetricKind::Incremental, + MetricValue::Counter { value: 1.1 }, + ); + let insert = metric_to_insert_request(metric); + assert_eq!(insert.columns.len(), 2); + + assert_eq!(get_column(&insert.columns, "val"), 1.1); + } + + #[test] + fn test_set() { + let metric = Metric::new( + "cpu_seconds_total", + MetricKind::Absolute, + MetricValue::Set { + values: ["foo".to_owned(), "bar".to_owned()].into_iter().collect(), + }, + ); + let insert = metric_to_insert_request(metric); + assert_eq!(insert.columns.len(), 2); + + assert_eq!(get_column(&insert.columns, "val"), 2.0); + } + + #[test] + fn test_distribution() { + let metric = Metric::new( + "cpu_seconds_total", + MetricKind::Incremental, + MetricValue::Distribution { + samples: vector_core::samples![1.0 => 2, 2.0 => 4, 3.0 => 2], + statistic: StatisticKind::Histogram, + }, + ); + let insert = metric_to_insert_request(metric); + assert_eq!( + insert.columns.len(), + 1 + DISTRIBUTION_STAT_FIELD_COUNT + DISTRIBUTION_QUANTILES.len() + ); + + assert_eq!(get_column(&insert.columns, "max"), 3.0); + assert_eq!(get_column(&insert.columns, "min"), 1.0); + assert_eq!(get_column(&insert.columns, "avg"), 2.0); + assert_eq!(get_column(&insert.columns, "sum"), 16.0); + assert_eq!(get_column(&insert.columns, "count"), 8.0); + assert_eq!(get_column(&insert.columns, "p50"), 2.0); + assert_eq!(get_column(&insert.columns, "p75"), 2.0); + assert_eq!(get_column(&insert.columns, "p90"), 3.0); + assert_eq!(get_column(&insert.columns, "p95"), 3.0); + assert_eq!(get_column(&insert.columns, "p99"), 3.0); + } + + #[test] + fn test_histogram() { + let buckets = vector_core::buckets![1.0 => 1, 2.0 => 2, 3.0 => 1]; + let buckets_len = buckets.len(); + let metric = Metric::new( + "cpu_seconds_total", + MetricKind::Incremental, + MetricValue::AggregatedHistogram { + buckets, + count: 4, + sum: 8.0, + }, + ); + let insert = metric_to_insert_request(metric); + assert_eq!( + insert.columns.len(), + 1 + SUMMARY_STAT_FIELD_COUNT + buckets_len + ); + + assert_eq!(get_column(&insert.columns, "b1"), 1.0); + assert_eq!(get_column(&insert.columns, "b2"), 2.0); + assert_eq!(get_column(&insert.columns, "b3"), 1.0); + assert_eq!(get_column(&insert.columns, "count"), 4.0); + assert_eq!(get_column(&insert.columns, "sum"), 8.0); + } + + #[test] + fn test_summary() { + let quantiles = vector_core::quantiles![0.01 => 1.5, 0.5 => 2.0, 0.99 => 3.0]; + let quantiles_len = quantiles.len(); + let metric = Metric::new( + "cpu_seconds_total", + MetricKind::Incremental, + MetricValue::AggregatedSummary { + quantiles, + count: 6, + sum: 12.0, + }, + ); + + let insert = metric_to_insert_request(metric); + assert_eq!( + insert.columns.len(), + 1 + SUMMARY_STAT_FIELD_COUNT + quantiles_len + ); + + assert_eq!(get_column(&insert.columns, "p01"), 1.5); + assert_eq!(get_column(&insert.columns, "p50"), 2.0); + assert_eq!(get_column(&insert.columns, "p99"), 3.0); + assert_eq!(get_column(&insert.columns, "count"), 6.0); + assert_eq!(get_column(&insert.columns, "sum"), 12.0); + } + + #[test] + fn test_sketch() { + let mut sketch = AgentDDSketch::with_agent_defaults(); + let samples = 10; + for i in 0..samples { + sketch.insert(i as f64); + } + + let metric = Metric::new( + "cpu_seconds_total", + MetricKind::Incremental, + MetricValue::Sketch { + sketch: MetricSketch::AgentDDSketch(sketch), + }, + ); + + let insert = metric_to_insert_request(metric); + assert_eq!( + insert.columns.len(), + 1 + DISTRIBUTION_QUANTILES.len() + DISTRIBUTION_STAT_FIELD_COUNT + ); + + assert!(get_column(&insert.columns, "p50") <= 4.0); + assert!(get_column(&insert.columns, "p95") > 8.0); + assert!(get_column(&insert.columns, "p95") <= 9.0); + assert!(get_column(&insert.columns, "p99") > 8.0); + assert!(get_column(&insert.columns, "p99") <= 9.0); + assert_eq!(get_column(&insert.columns, "count"), samples as f64); + assert_eq!(get_column(&insert.columns, "sum"), 45.0); + assert_eq!(get_column(&insert.columns, "max"), 9.0); + assert_eq!(get_column(&insert.columns, "min"), 0.0); + } +} diff --git a/src/sinks/greptimedb/service.rs b/src/sinks/greptimedb/service.rs new file mode 100644 index 0000000000000..2aab611af669e --- /dev/null +++ b/src/sinks/greptimedb/service.rs @@ -0,0 +1,190 @@ +use std::num::NonZeroUsize; +use std::sync::Arc; +use std::task::Poll; + +use greptimedb_client::api::v1::auth_header::AuthScheme; +use greptimedb_client::api::v1::*; +use greptimedb_client::channel_manager::*; +use greptimedb_client::{Client, Database, Error as GreptimeError}; +use vector_core::event::Metric; + +use crate::sinks::prelude::*; + +use super::batch::GreptimeDBBatchSizer; +use super::request_builder::metric_to_insert_request; +use super::{GreptimeDBConfig, GreptimeDBConfigError}; + +#[derive(Clone, Default)] +pub(super) struct GreptimeDBRetryLogic; + +impl RetryLogic for GreptimeDBRetryLogic { + type Response = GreptimeDBBatchOutput; + type Error = GreptimeError; + + fn is_retriable_error(&self, error: &Self::Error) -> bool { + error.is_retriable() + } +} + +#[derive(Clone)] +pub(super) struct GreptimeDBRequest { + items: Vec, + finalizers: EventFinalizers, + metadata: RequestMetadata, +} + +impl GreptimeDBRequest { + pub(super) fn from_metrics(metrics: Vec) -> Self { + let mut items = Vec::with_capacity(metrics.len()); + let mut finalizers = EventFinalizers::default(); + let mut request_metadata_builder = RequestMetadataBuilder::default(); + + let sizer = GreptimeDBBatchSizer::default(); + let mut estimated_request_size = 0; + for mut metric in metrics.into_iter() { + finalizers.merge(metric.take_finalizers()); + estimated_request_size += sizer.estimated_size_of(&metric); + + request_metadata_builder.track_event(metric.clone()); + + items.push(metric_to_insert_request(metric)); + } + + let request_size = + NonZeroUsize::new(estimated_request_size).expect("request should never be zero length"); + + GreptimeDBRequest { + items, + finalizers, + metadata: request_metadata_builder.with_request_size(request_size), + } + } +} + +impl Finalizable for GreptimeDBRequest { + fn take_finalizers(&mut self) -> EventFinalizers { + std::mem::take(&mut self.finalizers) + } +} + +impl MetaDescriptive for GreptimeDBRequest { + fn get_metadata(&self) -> &RequestMetadata { + &self.metadata + } + + fn metadata_mut(&mut self) -> &mut RequestMetadata { + &mut self.metadata + } +} + +#[derive(Debug)] +pub struct GreptimeDBBatchOutput { + pub item_count: u32, + pub metadata: RequestMetadata, +} + +impl DriverResponse for GreptimeDBBatchOutput { + fn event_status(&self) -> EventStatus { + EventStatus::Delivered + } + + fn events_sent(&self) -> &GroupedCountByteSize { + self.metadata.events_estimated_json_encoded_byte_size() + } + + fn bytes_sent(&self) -> Option { + Some(self.metadata.request_wire_size()) + } +} + +#[derive(Debug, Clone)] +pub struct GreptimeDBService { + /// the client that connects to greptimedb + client: Arc, +} + +impl GreptimeDBService { + pub fn try_new(config: &GreptimeDBConfig) -> crate::Result { + let grpc_client = if let Some(tls_config) = &config.tls { + let channel_config = ChannelConfig { + client_tls: Self::try_from_tls_config(tls_config)?, + ..Default::default() + }; + Client::with_manager_and_urls( + ChannelManager::with_tls_config(channel_config).map_err(Box::new)?, + vec![&config.endpoint], + ) + } else { + Client::with_urls(vec![&config.endpoint]) + }; + + let mut client = Database::new_with_dbname(&config.dbname, grpc_client); + + if let (Some(username), Some(password)) = (&config.username, &config.password) { + client.set_auth(AuthScheme::Basic(Basic { + username: username.to_owned(), + password: password.clone().into(), + })) + }; + + Ok(GreptimeDBService { + client: Arc::new(client), + }) + } + + fn try_from_tls_config(tls_config: &TlsConfig) -> crate::Result> { + if let Some(ca_path) = tls_config.ca_file.as_ref() { + let cert_path = tls_config + .crt_file + .as_ref() + .ok_or(GreptimeDBConfigError::TlsMissingCert)?; + let key_path = tls_config + .key_file + .as_ref() + .ok_or(GreptimeDBConfigError::TlsMissingKey)?; + + if tls_config.key_pass.is_some() + || tls_config.alpn_protocols.is_some() + || tls_config.verify_certificate.is_some() + || tls_config.verify_hostname.is_some() + { + warn!( + message = "TlsConfig: key_pass, alpn_protocols, verify_certificate and verify_hostname are not supported by greptimedb client at the moment." + ); + } + + Ok(Some(ClientTlsOption { + server_ca_cert_path: ca_path.clone(), + client_key_path: key_path.clone(), + client_cert_path: cert_path.clone(), + })) + } else { + Ok(None) + } + } +} + +impl Service for GreptimeDBService { + type Response = GreptimeDBBatchOutput; + type Error = GreptimeError; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, _cx: &mut std::task::Context) -> Poll> { + Poll::Ready(Ok(())) + } + + // Convert vector metrics into GreptimeDB format and send them in batch + fn call(&mut self, req: GreptimeDBRequest) -> Self::Future { + let client = Arc::clone(&self.client); + + Box::pin(async move { + let metadata = req.metadata; + let result = client.insert(req.items).await?; + + Ok(GreptimeDBBatchOutput { + item_count: result, + metadata, + }) + }) + } +} diff --git a/src/sinks/greptimedb/sink.rs b/src/sinks/greptimedb/sink.rs new file mode 100644 index 0000000000000..2b28e50755248 --- /dev/null +++ b/src/sinks/greptimedb/sink.rs @@ -0,0 +1,55 @@ +use async_trait::async_trait; + +use futures::StreamExt; +use futures_util::stream::BoxStream; +use vector_core::event::{Metric, MetricValue}; + +use crate::sinks::prelude::*; +use crate::sinks::util::buffer::metrics::MetricNormalize; +use crate::sinks::util::buffer::metrics::MetricSet; + +use super::batch::GreptimeDBBatchSizer; +use super::service::{GreptimeDBRequest, GreptimeDBRetryLogic, GreptimeDBService}; + +#[derive(Clone, Debug, Default)] +pub struct GreptimeDBMetricNormalize; + +impl MetricNormalize for GreptimeDBMetricNormalize { + fn normalize(&mut self, state: &mut MetricSet, metric: Metric) -> Option { + match (metric.kind(), &metric.value()) { + (_, MetricValue::Counter { .. }) => state.make_absolute(metric), + (_, MetricValue::Gauge { .. }) => state.make_absolute(metric), + // All others are left as-is + _ => Some(metric), + } + } +} + +pub struct GreptimeDBSink { + pub(super) service: Svc, + pub(super) batch_settings: BatcherSettings, +} + +impl GreptimeDBSink { + async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { + input + .map(|event| event.into_metric()) + .normalized_with_default::() + .batched( + self.batch_settings + .into_item_size_config(GreptimeDBBatchSizer::default()), + ) + .map(GreptimeDBRequest::from_metrics) + .into_driver(self.service) + .protocol("grpc") + .run() + .await + } +} + +#[async_trait] +impl StreamSink for GreptimeDBSink { + async fn run(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { + self.run_inner(input).await + } +} diff --git a/src/sinks/mod.rs b/src/sinks/mod.rs index 194f1539e2011..960e60a3fdd9c 100644 --- a/src/sinks/mod.rs +++ b/src/sinks/mod.rs @@ -53,6 +53,8 @@ pub mod file; pub mod gcp; #[cfg(any(feature = "sinks-gcp"))] pub mod gcs_common; +#[cfg(feature = "sinks-greptimedb")] +pub mod greptimedb; #[cfg(feature = "sinks-honeycomb")] pub mod honeycomb; #[cfg(feature = "sinks-http")] diff --git a/website/content/en/docs/reference/configuration/sinks/greptimedb.md b/website/content/en/docs/reference/configuration/sinks/greptimedb.md new file mode 100644 index 0000000000000..57aebf227d2f4 --- /dev/null +++ b/website/content/en/docs/reference/configuration/sinks/greptimedb.md @@ -0,0 +1,14 @@ +--- +title: GreptimeDB +description: Writes metric data to [GreptimeDB](https://github.com/greptimeteam/greptimedb) +kind: sink +layout: component +tags: ["greptimedb", "component", "sink", "storage", "time-series", "metrics"] +--- + +{{/* +This doc is generated using: + +1. The template in layouts/docs/component.html +2. The relevant CUE data in cue/reference/components/... +*/}} diff --git a/website/cue/reference/components/sinks/base/greptimedb.cue b/website/cue/reference/components/sinks/base/greptimedb.cue new file mode 100644 index 0000000000000..6f25208e64bad --- /dev/null +++ b/website/cue/reference/components/sinks/base/greptimedb.cue @@ -0,0 +1,340 @@ +package metadata + +base: components: sinks: greptimedb: configuration: { + acknowledgements: { + description: """ + Controls how acknowledgements are handled for this sink. + + See [End-to-end Acknowledgements][e2e_acks] for more information on how event acknowledgement is handled. + + [e2e_acks]: https://vector.dev/docs/about/under-the-hood/architecture/end-to-end-acknowledgements/ + """ + required: false + type: object: options: enabled: { + description: """ + Whether or not end-to-end acknowledgements are enabled. + + When enabled for a sink, any source connected to that sink, where the source supports + end-to-end acknowledgements as well, waits for events to be acknowledged by the sink + before acknowledging them at the source. + + Enabling or disabling acknowledgements at the sink level takes precedence over any global + [`acknowledgements`][global_acks] configuration. + + [global_acks]: https://vector.dev/docs/reference/configuration/global-options/#acknowledgements + """ + required: false + type: bool: {} + } + } + batch: { + description: "Event batching behavior." + required: false + type: object: options: { + max_bytes: { + description: """ + The maximum size of a batch that is processed by a sink. + + This is based on the uncompressed size of the batched events, before they are + serialized/compressed. + """ + required: false + type: uint: unit: "bytes" + } + max_events: { + description: "The maximum size of a batch before it is flushed." + required: false + type: uint: { + default: 20 + unit: "events" + } + } + timeout_secs: { + description: "The maximum age of a batch before it is flushed." + required: false + type: float: { + default: 1.0 + unit: "seconds" + } + } + } + } + dbname: { + description: """ + The GreptimeDB [database][database] name to connect. + + Default to `public`, the default database of GreptimeDB. + + Database can be created via `create database` statement on + GreptimeDB. If you are using GreptimeCloud, use `dbname` from the + connection information of your instance. + + [database]: https://docs.greptime.com/user-guide/concepts/key-concepts#database + """ + required: false + type: string: { + default: "public" + examples: [ + "public", + ] + } + } + endpoint: { + description: """ + The host and port of GreptimeDB gRPC service. + + This sink uses GreptimeDB's gRPC interface for data ingestion. By + default, GreptimeDB listens to port 4001 for gRPC protocol. + + The address _must_ include a port. + """ + required: true + type: string: examples: ["example.com:4001", "1nge17d2r3ns.ap-southeast-1.aws.greptime.cloud:4001"] + } + password: { + description: """ + The password for your GreptimeDB instance. + + This is required if your instance has authentication enabled. + """ + required: false + type: string: examples: ["password"] + } + request: { + description: """ + Middleware settings for outbound requests. + + Various settings can be configured, such as concurrency and rate limits, timeouts, etc. + """ + required: false + type: object: options: { + adaptive_concurrency: { + description: """ + Configuration of adaptive concurrency parameters. + + These parameters typically do not require changes from the default, and incorrect values can lead to meta-stable or + unstable performance and sink behavior. Proceed with caution. + """ + required: false + type: object: options: { + decrease_ratio: { + description: """ + The fraction of the current value to set the new concurrency limit when decreasing the limit. + + Valid values are greater than `0` and less than `1`. Smaller values cause the algorithm to scale back rapidly + when latency increases. + + Note that the new limit is rounded down after applying this ratio. + """ + required: false + type: float: default: 0.9 + } + ewma_alpha: { + description: """ + The weighting of new measurements compared to older measurements. + + Valid values are greater than `0` and less than `1`. + + ARC uses an exponentially weighted moving average (EWMA) of past RTT measurements as a reference to compare with + the current RTT. Smaller values cause this reference to adjust more slowly, which may be useful if a service has + unusually high response variability. + """ + required: false + type: float: default: 0.4 + } + rtt_deviation_scale: { + description: """ + Scale of RTT deviations which are not considered anomalous. + + Valid values are greater than or equal to `0`, and we expect reasonable values to range from `1.0` to `3.0`. + + When calculating the past RTT average, we also compute a secondary “deviation” value that indicates how variable + those values are. We use that deviation when comparing the past RTT average to the current measurements, so we + can ignore increases in RTT that are within an expected range. This factor is used to scale up the deviation to + an appropriate range. Larger values cause the algorithm to ignore larger increases in the RTT. + """ + required: false + type: float: default: 2.5 + } + } + } + concurrency: { + description: "Configuration for outbound request concurrency." + required: false + type: { + string: { + default: "none" + enum: { + adaptive: """ + Concurrency will be managed by Vector's [Adaptive Request Concurrency][arc] feature. + + [arc]: https://vector.dev/docs/about/under-the-hood/networking/arc/ + """ + none: """ + A fixed concurrency of 1. + + Only one request can be outstanding at any given time. + """ + } + } + uint: {} + } + } + rate_limit_duration_secs: { + description: "The time window used for the `rate_limit_num` option." + required: false + type: uint: { + default: 1 + unit: "seconds" + } + } + rate_limit_num: { + description: "The maximum number of requests allowed within the `rate_limit_duration_secs` time window." + required: false + type: uint: { + default: 9223372036854775807 + unit: "requests" + } + } + retry_attempts: { + description: """ + The maximum number of retries to make for failed requests. + + The default, for all intents and purposes, represents an infinite number of retries. + """ + required: false + type: uint: { + default: 9223372036854775807 + unit: "retries" + } + } + retry_initial_backoff_secs: { + description: """ + The amount of time to wait before attempting the first retry for a failed request. + + After the first retry has failed, the fibonacci sequence is used to select future backoffs. + """ + required: false + type: uint: { + default: 1 + unit: "seconds" + } + } + retry_max_duration_secs: { + description: "The maximum amount of time to wait between retries." + required: false + type: uint: { + default: 3600 + unit: "seconds" + } + } + timeout_secs: { + description: """ + The time a request can take before being aborted. + + Datadog highly recommends that you do not lower this value below the service's internal timeout, as this could + create orphaned requests, pile on retries, and result in duplicate data downstream. + """ + required: false + type: uint: { + default: 60 + unit: "seconds" + } + } + } + } + tls: { + description: "TLS configuration." + required: false + type: object: options: { + alpn_protocols: { + description: """ + Sets the list of supported ALPN protocols. + + Declare the supported ALPN protocols, which are used during negotiation with peer. They are prioritized in the order + that they are defined. + """ + required: false + type: array: items: type: string: examples: ["h2"] + } + ca_file: { + description: """ + Absolute path to an additional CA certificate file. + + The certificate must be in the DER or PEM (X.509) format. Additionally, the certificate can be provided as an inline string in PEM format. + """ + required: false + type: string: examples: ["/path/to/certificate_authority.crt"] + } + crt_file: { + description: """ + Absolute path to a certificate file used to identify this server. + + The certificate must be in DER, PEM (X.509), or PKCS#12 format. Additionally, the certificate can be provided as + an inline string in PEM format. + + If this is set, and is not a PKCS#12 archive, `key_file` must also be set. + """ + required: false + type: string: examples: ["/path/to/host_certificate.crt"] + } + key_file: { + description: """ + Absolute path to a private key file used to identify this server. + + The key must be in DER or PEM (PKCS#8) format. Additionally, the key can be provided as an inline string in PEM format. + """ + required: false + type: string: examples: ["/path/to/host_certificate.key"] + } + key_pass: { + description: """ + Passphrase used to unlock the encrypted key file. + + This has no effect unless `key_file` is set. + """ + required: false + type: string: examples: ["${KEY_PASS_ENV_VAR}", "PassWord1"] + } + verify_certificate: { + description: """ + Enables certificate verification. + + If enabled, certificates must not be expired and must be issued by a trusted + issuer. This verification operates in a hierarchical manner, checking that the leaf certificate (the + certificate presented by the client/server) is not only valid, but that the issuer of that certificate is also valid, and + so on until the verification process reaches a root certificate. + + Relevant for both incoming and outgoing connections. + + Do NOT set this to `false` unless you understand the risks of not verifying the validity of certificates. + """ + required: false + type: bool: {} + } + verify_hostname: { + description: """ + Enables hostname verification. + + If enabled, the hostname used to connect to the remote host must be present in the TLS certificate presented by + the remote host, either as the Common Name or as an entry in the Subject Alternative Name extension. + + Only relevant for outgoing connections. + + Do NOT set this to `false` unless you understand the risks of not verifying the remote hostname. + """ + required: false + type: bool: {} + } + } + } + username: { + description: """ + The username for your GreptimeDB instance. + + This is required if your instance has authentication enabled. + """ + required: false + type: string: examples: ["username"] + } +} diff --git a/website/cue/reference/components/sinks/greptimedb.cue b/website/cue/reference/components/sinks/greptimedb.cue new file mode 100644 index 0000000000000..d538538bb432e --- /dev/null +++ b/website/cue/reference/components/sinks/greptimedb.cue @@ -0,0 +1,90 @@ +package metadata + +components: sinks: greptimedb: { + title: "GreptimeDB" + + classes: { + commonly_used: true + delivery: "at_least_once" + development: "beta" + egress_method: "batch" + service_providers: ["GreptimeDB"] + stateful: false + } + + features: { + auto_generated: true + acknowledgements: true + healthcheck: enabled: true + send: { + batch: { + enabled: true + common: false + max_bytes: 10_000_000 + timeout_secs: 1.0 + } + compression: enabled: false + encoding: { + enabled: true + codec: enabled: false + } + request: { + enabled: true + headers: false + } + tls: { + enabled: true + can_verify_certificate: false + can_verify_hostname: true + enabled_default: false + enabled_by_scheme: false + } + to: { + service: services.greptimedb + + interface: { + socket: { + api: { + title: "GreptimeDB gRPC API" + url: urls.greptimedb_grpc + } + direction: "outgoing" + protocols: ["http"] + ssl: "optional" + } + } + } + } + } + + support: { + requirements: [] + warnings: [] + notices: [] + } + + configuration: base.components.sinks.greptimedb.configuration + + input: { + logs: false + metrics: { + counter: true + distribution: true + gauge: true + histogram: true + set: true + summary: true + } + traces: false + } + + how_it_works: { + setup: { + title: "Setup" + body: """ + 1. Start your own [GreptimeDB](\(urls.greptimedb)) or create an instance on [GreptimeCloud](\(urls.greptimecloud)). + 2. Configure gRPC endpoint(host:port) and optional dbname and authentication information. + """ + } + } +} diff --git a/website/cue/reference/services/greptimedb.cue b/website/cue/reference/services/greptimedb.cue new file mode 100644 index 0000000000000..e152397da7307 --- /dev/null +++ b/website/cue/reference/services/greptimedb.cue @@ -0,0 +1,10 @@ +package metadata + +services: greptimedb: { + name: "GreptimeDB" + thing: "a \(name) database" + url: urls.greptimedb + versions: null + + description: "[GreptimeDB](\(urls.greptimedb)) is an open-source cloud-native time-series database. It combines time-series and analytic workload into one database, and allows query via both SQL and PromQL. GreptimeDB works seamlessly with modern infrastructure like Kubernetes and object storage. It's also available on [Cloud](\(urls.greptimecloud))." +} diff --git a/website/cue/reference/urls.cue b/website/cue/reference/urls.cue index 2f6921ba39eea..73ff61e5b3ec8 100644 --- a/website/cue/reference/urls.cue +++ b/website/cue/reference/urls.cue @@ -235,6 +235,9 @@ urls: { graphql: "https://graphql.org" graphql_playground: "\(github)/graphql/graphql-playground" graphviz: "https://graphviz.org/" + greptimecloud: "https://greptime.cloud" + greptimedb: "https://github.com/greptimeteam/greptimedb" + greptimedb_grpc: "https://docs.greptime.com/" grok: "https://grokdebug.herokuapp.com/" grok_debugger: "https://grokdebug.herokuapp.com/" grok_patterns: "\(github)/daschl/grok/tree/master/patterns"