Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(outputs.quix): Add plugin #16144

Merged
merged 23 commits into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions plugins/outputs/all/quix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
//go:build !custom || outputs || outputs.quix

package all

import _ "github.com/influxdata/telegraf/plugins/outputs/quix" // register plugin
58 changes: 58 additions & 0 deletions plugins/outputs/quix/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Quix Output Plugin

This plugin writes metrics to a [Quix][quix] endpoint.

Please consult Quix's [official documentation][docs] for more details on the
Quix platform architecture and concepts.

⭐ Telegraf v1.33.0
🏷️ cloud, messaging
💻 all

[quix]: https://quix.io
[docs]: https://quix.io/docs/

## Global configuration options <!-- @/docs/includes/plugin_config.md -->

In addition to the plugin-specific configuration settings, plugins support
additional global and plugin configuration settings. These settings are used to
modify metrics, tags, and field or create aliases and configure ordering, etc.
See the [CONFIGURATION.md][CONFIGURATION.md] for more details.

[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins

## Secret-store support

This plugin supports secrets from secret-stores for the `token` option.
See the [secret-store documentation][SECRETSTORE] for more details on how
to use them.

[SECRETSTORE]: ../../../docs/CONFIGURATION.md#secret-store-secrets

## Configuration

```toml @sample.conf
# Send metrics to a Quix data processing pipeline
[[outputs.quix]]
## Endpoint for providing the configuration
# url = "https://portal-api.platform.quix.io"

## Workspace and topics to send the metrics to
workspace = "your_workspace"
topic = "your_topic"

## Authentication token created in Quix
token = "your_auth_token"

## Amount of time allowed to complete the HTTP request for fetching the config
# timeout = "5s"
```

The plugin requires a [SDK token][token] for authentication with Quix. You can
generate the `token` in settings under the `API and tokens` section.

Furthermore, the `workspace` parameter must be set to the `Workspace ID` or the
`Environment ID` of your Quix project. Those values can be found in settings
under the `General settings` section.

[token]: https://quix.io/docs/develop/authentication/personal-access-token.html
81 changes: 81 additions & 0 deletions plugins/outputs/quix/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package quix

import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"net/http"
)

type brokerConfig struct {
BootstrapServers string `json:"bootstrap.servers"`
SaslMechanism string `json:"sasl.mechanism"`
SaslUsername string `json:"sasl.username"`
SaslPassword string `json:"sasl.password"`
SecurityProtocol string `json:"security.protocol"`
SSLCertBase64 string `json:"ssl.ca.cert"`

cert []byte
}

func (q *Quix) fetchBrokerConfig() (*brokerConfig, error) {
// Create request
endpoint := fmt.Sprintf("%s/workspaces/%s/broker/librdkafka", q.APIURL, q.Workspace)
req, err := http.NewRequest("GET", endpoint, nil)
if err != nil {
return nil, fmt.Errorf("creating request failed: %w", err)
}

// Setup authentication
token, err := q.Token.Get()
if err != nil {
return nil, fmt.Errorf("getting token failed: %w", err)
}
req.Header.Set("Authorization", "Bearer "+token.String())
req.Header.Set("Accept", "application/json")
token.Destroy()

// Query the broker configuration from the Quix API
client, err := q.HTTPClientConfig.CreateClient(context.Background(), q.Log)
if err != nil {
return nil, fmt.Errorf("creating client failed: %w", err)
}
defer client.CloseIdleConnections()

resp, err := client.Do(req)
if err != nil {
return nil, fmt.Errorf("executing request failed: %w", err)
}
defer resp.Body.Close()

// Read the body as we need it both in case of an error as well as for
// decoding the config in case of success
body, err := io.ReadAll(resp.Body)
if err != nil {
q.Log.Errorf("Reading message body failed: %v", err)
}

if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected response %q (%d): %s",
http.StatusText(resp.StatusCode),
resp.StatusCode,
string(body),
)
}

// Decode the broker and the returned certificate
var cfg brokerConfig
if err := json.Unmarshal(body, &cfg); err != nil {
return nil, fmt.Errorf("decoding body failed: %w", err)
}

cert, err := base64.StdEncoding.DecodeString(cfg.SSLCertBase64)
if err != nil {
return nil, fmt.Errorf("decoding certificate failed: %w", err)
}
cfg.cert = cert

return &cfg, nil
}
169 changes: 169 additions & 0 deletions plugins/outputs/quix/quix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
//go:generate ../../../tools/readme_config_includer/generator
package quix

import (
"crypto/tls"
"crypto/x509"
_ "embed"
"errors"
"fmt"
"strings"
"time"

"github.com/IBM/sarama"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
common_http "github.com/influxdata/telegraf/plugins/common/http"
common_kafka "github.com/influxdata/telegraf/plugins/common/kafka"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/plugins/serializers/json"
)

srebhan marked this conversation as resolved.
Show resolved Hide resolved
//go:embed sample.conf
var sampleConfig string

type Quix struct {
APIURL string `toml:"url"`
Workspace string `toml:"workspace"`
Topic string `toml:"topic"`
Token config.Secret `toml:"token"`
Log telegraf.Logger `toml:"-"`
common_http.HTTPClientConfig

producer sarama.SyncProducer
serializer serializers.Serializer
kakfaTopic string
}

func (*Quix) SampleConfig() string {
return sampleConfig
}

func (q *Quix) Init() error {
// Set defaults
if q.APIURL == "" {
q.APIURL = "https://portal-api.platform.quix.io"
}
q.APIURL = strings.TrimSuffix(q.APIURL, "/")

srebhan marked this conversation as resolved.
Show resolved Hide resolved
// Check input parameters
if q.Topic == "" {
return errors.New("option 'topic' must be set")
}
if q.Workspace == "" {
return errors.New("option 'workspace' must be set")
}
if q.Token.Empty() {
return errors.New("option 'token' must be set")
}
q.kakfaTopic = q.Workspace + "-" + q.Topic

// Create a JSON serializer for the output
q.serializer = &json.Serializer{
TimestampUnits: config.Duration(time.Nanosecond), // Hardcoded nanoseconds precision
}

return nil
}

func (q *Quix) Connect() error {
// Fetch the Kafka broker configuration from the Quix HTTP endpoint
quixConfig, err := q.fetchBrokerConfig()
if err != nil {
return fmt.Errorf("fetching broker config failed: %w", err)
}
brokers := strings.Split(quixConfig.BootstrapServers, ",")
if len(brokers) == 0 {
return errors.New("no brokers received")
}

srebhan marked this conversation as resolved.
Show resolved Hide resolved
// Setup the Kakfa producer config
cfg := sarama.NewConfig()
cfg.Producer.Return.Successes = true

switch quixConfig.SecurityProtocol {
case "SASL_SSL":
cfg.Net.SASL.Enable = true
cfg.Net.SASL.User = quixConfig.SaslUsername
cfg.Net.SASL.Password = quixConfig.SaslPassword
cfg.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256
cfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &common_kafka.XDGSCRAMClient{HashGeneratorFcn: common_kafka.SHA256}
}

switch quixConfig.SaslMechanism {
case "SCRAM-SHA-512":
cfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &common_kafka.XDGSCRAMClient{HashGeneratorFcn: common_kafka.SHA512}
}
cfg.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
case "SCRAM-SHA-256":
cfg.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256
cfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &common_kafka.XDGSCRAMClient{HashGeneratorFcn: common_kafka.SHA256}
}
case "PLAIN":
cfg.Net.SASL.Mechanism = sarama.SASLTypePlaintext
default:
return fmt.Errorf("unsupported SASL mechanism: %s", quixConfig.SaslMechanism)
}

// Certificate
certPool := x509.NewCertPool()
if !certPool.AppendCertsFromPEM(quixConfig.cert) {
return errors.New("appending CA cert to pool failed")
}
cfg.Net.TLS.Enable = true
cfg.Net.TLS.Config = &tls.Config{RootCAs: certPool}
case "PLAINTEXT":
// No additional configuration required for plaintext communication
default:
return fmt.Errorf("unsupported security protocol: %s", quixConfig.SecurityProtocol)
}

// Setup the Kakfa producer itself
producer, err := sarama.NewSyncProducer(brokers, cfg)
if err != nil {
return fmt.Errorf("creating producer failed: %w", err)
}
q.producer = producer

return nil
}

func (q *Quix) Write(metrics []telegraf.Metric) error {
for _, m := range metrics {
serialized, err := q.serializer.Serialize(m)
if err != nil {
q.Log.Errorf("Error serializing metric: %v", err)
continue
}

msg := &sarama.ProducerMessage{
Topic: q.kakfaTopic,
Value: sarama.ByteEncoder(serialized),
Timestamp: m.Time(),
Key: sarama.StringEncoder("telegraf"),
}

if _, _, err = q.producer.SendMessage(msg); err != nil {
q.Log.Errorf("Error sending message to Kafka: %v", err)
continue
}
}

return nil
}

func (q *Quix) Close() error {
if q.producer != nil {
return q.producer.Close()
}
return nil
}

func init() {
outputs.Add("quix", func() telegraf.Output { return &Quix{} })
}
Loading
Loading