Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(new sink): Adding greptimedb metrics sink #17198

Merged
merged 37 commits into from
Jul 11, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
e82ed07
feat: add greptimedb sink (wip)
sunng87 Feb 27, 2023
5e951fb
feat: configure greptimedb as git dependency
sunng87 Mar 7, 2023
4efa4c0
feat: transforming vector metric to greptimedb insert request
sunng87 Mar 15, 2023
52094b8
feat: address issues in latest greptimedb client
sunng87 Mar 21, 2023
a080f30
feat: add grpc auth support
sunng87 Mar 21, 2023
9e8d354
refactor: default value for catalog and schema, rename column fn
sunng87 Mar 21, 2023
7c13cea
refactor: small tweaks
sunng87 Mar 22, 2023
6f7add6
test: add integration test wip
sunng87 Mar 24, 2023
c51c2ef
feat: update greptimedb grpc client, dbname and healthcheck for grpc
sunng87 Mar 31, 2023
e766b62
feat: change default timestmap column name to ts
sunng87 Apr 6, 2023
2c5cafc
test: add integration tests for greptimedb
sunng87 Apr 11, 2023
64704f7
test: correct column count in test cases
sunng87 May 4, 2023
945cf90
test: resolve lint and test errors
sunng87 May 4, 2023
9c37da8
chore: switch to compact greptimedb rust client
sunng87 May 5, 2023
a888bf9
docs: add website docs
sunng87 May 22, 2023
2bfbce3
chore: update 3rd-party license file
sunng87 May 22, 2023
b4774ff
fix: add greptimedb-integration-tests feature
sunng87 May 24, 2023
31107a4
chore: remove network section from integration tests
sunng87 May 25, 2023
2dc19f5
fix: resolve new lint warnings
sunng87 Jun 2, 2023
352bd13
test: fix compose file by adding command
sunng87 Jun 7, 2023
b320d61
feat: use latest client protocol
sunng87 Jun 6, 2023
26a7a61
refactor: use stream sink apis
sunng87 Jun 11, 2023
828bebc
fix: integration tests
sunng87 Jun 12, 2023
8fb9943
feat: add retry logic back
sunng87 Jun 12, 2023
542a820
fix: change default column name to val instead of value
sunng87 Jun 15, 2023
2d39446
refactor: minor tunes
sunng87 Jun 26, 2023
156a35e
docs: update cue files
sunng87 Jun 26, 2023
3cdeb51
feat: add some tlsconfig support for custom ca and client cert
sunng87 Jul 1, 2023
930f97d
docs: add greptime.md for website
sunng87 Jul 1, 2023
1e6b6df
Merge remote-tracking branch 'upstream/master' into feature/greptimedb
sunng87 Jul 1, 2023
d590c7b
refactor: adopt latest master changes
sunng87 Jul 1, 2023
7cde107
fix: avoid panic for tlsconfig error
sunng87 Jul 3, 2023
74ba665
Merge branch 'master' into feature/greptimedb
sunng87 Jul 4, 2023
c16cc6d
Merge branch 'master' into feature/greptimedb
sunng87 Jul 6, 2023
2968854
Apply suggestions from code review
sunng87 Jul 8, 2023
289f4a8
Update src/sinks/greptimedb/service.rs
sunng87 Jul 8, 2023
e247b68
refactor: address review issues
sunng87 Jul 8, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ hex = { version = "0.4.3", default-features = false, optional = true }
sha2 = { version = "0.10.7", default-features = false, optional = true }

# GreptimeDB
greptimedb-client = { git = "https://github.com/GreptimeTeam/greptimedb-client-rust.git", rev = "0841134b004cfcf228af038b2b338dccaf743644", optional = true }
greptimedb-client = { git = "https://github.com/GreptimeTeam/greptimedb-client-rust.git", rev = "bc32362adf0df17a41a95bae4221d6d8f1775656", optional = true }

# VRL Lang
vrl = { package = "vrl", version = "0.4.0", features = ["cli", "test"] }
Expand Down
10 changes: 6 additions & 4 deletions src/sinks/greptimedb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@ use tower::ServiceBuilder;
use vector_common::sensitive_string::SensitiveString;
use vector_config::configurable_component;
use vector_core::config::{AcknowledgementsConfig, Input};
use vector_core::tls::TlsConfig;

use crate::config::{SinkConfig, SinkContext};
use crate::sinks::util::{BatchConfig, SinkBatchSettings};
use crate::sinks::{Healthcheck, VectorSink};
use crate::sinks::prelude::*;

use self::service::GreptimeDBRetryLogic;

Expand Down Expand Up @@ -111,6 +110,9 @@ pub struct GreptimeDBConfig {
skip_serializing_if = "crate::serde::skip_serializing_if_default"
)]
pub acknowledgements: AcknowledgementsConfig,

#[configurable(derived)]
pub tls: Option<TlsConfig>,
}

impl_generate_config_from_default!(GreptimeDBConfig);
Expand All @@ -122,7 +124,7 @@ impl SinkConfig for GreptimeDBConfig {
let request_settings = self.request.unwrap_with(&TowerRequestConfig::default());
let service = ServiceBuilder::new()
.settings(request_settings, GreptimeDBRetryLogic)
.service(service::GreptimeDBService::new(self));
.service(service::GreptimeDBService::try_new(self)?);
let sink = sink::GreptimeDBSink {
service,
batch_settings: self.batch.into_batcher_settings()?,
Expand Down
54 changes: 42 additions & 12 deletions src/sinks/greptimedb/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,14 @@ use std::sync::Arc;
use std::task::Poll;

use futures_util::future::BoxFuture;

use greptimedb_client::api::v1::auth_header::AuthScheme;
use greptimedb_client::api::v1::*;
use greptimedb_client::channel_manager::*;
use greptimedb_client::{Client, Database, Error as GreptimeError};
use tower::Service;
use vector_common::finalization::{EventFinalizers, EventStatus, Finalizable};
use vector_common::internal_event::CountByteSize;
use vector_common::request_metadata::{MetaDescriptive, RequestMetadata};
use vector_core::event::Metric;
use vector_core::stream::DriverResponse;

use crate::sinks::prelude::RetryLogic;
use crate::sinks::util::metadata::RequestMetadataBuilder;
use crate::sinks::prelude::*;

use super::batch::GreptimeDBBatchSizer;
use super::request_builder::metric_to_insert_request;
Expand Down Expand Up @@ -108,8 +103,20 @@ pub struct GreptimeDBService {
}

impl GreptimeDBService {
pub fn new(config: &GreptimeDBConfig) -> Self {
let grpc_client = Client::with_urls(vec![&config.endpoint]);
pub fn try_new(config: &GreptimeDBConfig) -> crate::Result<Self> {
let grpc_client = if let Some(tls_config) = &config.tls {
let channel_config = ChannelConfig {
client_tls: Self::try_from_tls_config(tls_config),
..Default::default()
};
Client::with_manager_and_urls(
ChannelManager::with_tls_config(channel_config).map_err(Box::new)?,
vec![&config.endpoint],
)
} else {
Client::with_urls(vec![&config.endpoint])
};

let mut client = Database::new_with_dbname(&config.dbname, grpc_client);

if let (Some(username), Some(password)) = (&config.username, &config.password) {
Expand All @@ -119,10 +126,33 @@ impl GreptimeDBService {
}))
};

// TODO: tls configuration

GreptimeDBService {
Ok(GreptimeDBService {
client: Arc::new(client),
})
}

fn try_from_tls_config(tls_config: &TlsConfig) -> Option<ClientTlsOption> {
if let Some(ca_path) = tls_config.ca_file.as_ref() {
let cert_path = tls_config.crt_file.as_ref().expect(
sunng87 marked this conversation as resolved.
Show resolved Hide resolved
"Client cert file is required for greptimedb sink when custom CA specified",
);
let key_path = tls_config
.key_file
.as_ref()
.expect("Client key file is required for greptimedb sink when custom CA specified");
if tls_config.key_pass.is_some() {
warn!(
message = "TLS key file with password is not supported by greptimedb client at the moment."
);
}
sunng87 marked this conversation as resolved.
Show resolved Hide resolved

Some(ClientTlsOption {
server_ca_cert_path: ca_path.clone(),
client_key_path: key_path.clone(),
client_cert_path: cert_path.clone(),
})
} else {
None
}
}
}
Expand Down
7 changes: 2 additions & 5 deletions src/sinks/greptimedb/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,11 @@ use async_trait::async_trait;

use futures::StreamExt;
use futures_util::stream::BoxStream;
use vector_core::event::{Event, Metric, MetricValue};
use vector_core::sink::StreamSink;
use vector_core::stream::BatcherSettings;
use vector_core::event::{Metric, MetricValue};

use crate::sinks::prelude::*;
use crate::sinks::util::buffer::metrics::MetricNormalize;
use crate::sinks::util::buffer::metrics::MetricSet;
use crate::sinks::util::service::Svc;
use crate::sinks::util::SinkBuilderExt;

use super::batch::GreptimeDBBatchSizer;
use super::service::{GreptimeDBRequest, GreptimeDBRetryLogic, GreptimeDBService};
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
---
title: GreptimeDB
description: Writes metric data to [GreptimeDB](https://github.com/greptimeteam/greptimedb)
kind: sink
layout: component
tags: ["greptimedb", "component", "sink", "storage", "time-series", "metrics"]
---

{{/*
This doc is generated using:

1. The template in layouts/docs/component.html
2. The relevant CUE data in cue/reference/components/...
*/}}
85 changes: 85 additions & 0 deletions website/cue/reference/components/sinks/base/greptimedb.cue
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,91 @@ base: components: sinks: greptimedb: configuration: {
}
}
}
tls: {
description: "TLS configuration."
required: false
type: object: options: {
alpn_protocols: {
description: """
Sets the list of supported ALPN protocols.

Declare the supported ALPN protocols, which are used during negotiation with peer. They are prioritized in the order
that they are defined.
"""
required: false
type: array: items: type: string: examples: ["h2"]
}
ca_file: {
description: """
Absolute path to an additional CA certificate file.

The certificate must be in the DER or PEM (X.509) format. Additionally, the certificate can be provided as an inline string in PEM format.
"""
required: false
type: string: examples: ["/path/to/certificate_authority.crt"]
}
crt_file: {
description: """
Absolute path to a certificate file used to identify this server.

The certificate must be in DER, PEM (X.509), or PKCS#12 format. Additionally, the certificate can be provided as
an inline string in PEM format.

If this is set, and is not a PKCS#12 archive, `key_file` must also be set.
"""
required: false
type: string: examples: ["/path/to/host_certificate.crt"]
}
key_file: {
description: """
Absolute path to a private key file used to identify this server.

The key must be in DER or PEM (PKCS#8) format. Additionally, the key can be provided as an inline string in PEM format.
"""
required: false
type: string: examples: ["/path/to/host_certificate.key"]
}
key_pass: {
description: """
Passphrase used to unlock the encrypted key file.

This has no effect unless `key_file` is set.
"""
required: false
type: string: examples: ["${KEY_PASS_ENV_VAR}", "PassWord1"]
}
verify_certificate: {
description: """
Enables certificate verification.

If enabled, certificates must not be expired and must be issued by a trusted
issuer. This verification operates in a hierarchical manner, checking that the leaf certificate (the
certificate presented by the client/server) is not only valid, but that the issuer of that certificate is also valid, and
so on until the verification process reaches a root certificate.

Relevant for both incoming and outgoing connections.

Do NOT set this to `false` unless you understand the risks of not verifying the validity of certificates.
"""
required: false
type: bool: {}
}
verify_hostname: {
description: """
Enables hostname verification.

If enabled, the hostname used to connect to the remote host must be present in the TLS certificate presented by
the remote host, either as the Common Name or as an entry in the Subject Alternative Name extension.

Only relevant for outgoing connections.

Do NOT set this to `false` unless you understand the risks of not verifying the remote hostname.
"""
required: false
type: bool: {}
}
}
}
username: {
description: """
The username for your GreptimeDB instance.
Expand Down