Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move Prometheus remote write to k6 repository #4282

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ require (
github.com/fatih/color v1.18.0
github.com/go-sourcemap/sourcemap v2.1.4+incompatible
github.com/golang/protobuf v1.5.4
github.com/golang/snappy v0.0.4
github.com/google/uuid v1.6.0
github.com/gorilla/websocket v1.5.3
github.com/grafana/sobek v0.0.0-20241024150027-d91f02b05e9b
github.com/grafana/xk6-dashboard v0.7.5
github.com/grafana/xk6-output-opentelemetry v0.3.0
github.com/grafana/xk6-output-prometheus-remote v0.5.1
Expand All @@ -30,6 +33,8 @@ require (
github.com/mstoykov/envconfig v1.5.0
github.com/mstoykov/k6-taskqueue-lib v0.1.3
github.com/nu7hatch/gouuid v0.0.0-20131221200532-179d4d0c4d8d
github.com/prometheus/client_golang v1.16.0
github.com/prometheus/client_model v0.4.0
github.com/serenize/snaker v0.0.0-20201027110005-a7ad2135616e
github.com/sirupsen/logrus v1.9.3
github.com/spf13/afero v1.1.2
Expand All @@ -47,6 +52,7 @@ require (
golang.org/x/crypto v0.32.0
golang.org/x/crypto/x509roots/fallback v0.0.0-20250116161740-71d3a4cfdb03
golang.org/x/net v0.34.0
golang.org/x/sync v0.10.0
golang.org/x/term v0.28.0
golang.org/x/time v0.9.0
google.golang.org/grpc v1.69.4
Expand All @@ -57,7 +63,7 @@ require (

require (
buf.build/gen/go/gogo/protobuf/protocolbuffers/go v1.31.0-20210810001428-4df00b267f94.1 // indirect
buf.build/gen/go/prometheus/prometheus/protocolbuffers/go v1.31.0-20230627135113-9a12bc2590d2.1 // indirect
buf.build/gen/go/prometheus/prometheus/protocolbuffers/go v1.31.0-20230627135113-9a12bc2590d2.1
github.com/andybalholm/cascadia v1.3.2 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bufbuild/protocompile v0.14.1 // indirect
Expand All @@ -71,17 +77,13 @@ require (
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/google/pprof v0.0.0-20230728192033-2ba5b33183c6 // indirect
github.com/google/uuid v1.6.0
github.com/grafana/sobek v0.0.0-20241024150027-d91f02b05e9b
github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/nxadm/tail v1.4.11 // indirect
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.16.0 // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/procfs v0.10.1 // indirect
github.com/r3labs/sse/v2 v2.10.0 // indirect
Expand All @@ -93,7 +95,6 @@ require (
go.opentelemetry.io/otel/metric v1.31.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.31.0 // indirect
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
golang.org/x/sync v0.10.0
golang.org/x/sys v0.29.0 // indirect
golang.org/x/text v0.21.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20241015192408-796eee8c2d53 // indirect
Expand Down
2 changes: 1 addition & 1 deletion internal/cmd/outputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ import (
"go.k6.io/k6/internal/output/cloud"
"go.k6.io/k6/internal/output/influxdb"
"go.k6.io/k6/internal/output/json"
"go.k6.io/k6/internal/output/prometheusrw/remotewrite"
"go.k6.io/k6/lib"
"go.k6.io/k6/output"
"go.k6.io/k6/output/csv"

"github.com/grafana/xk6-dashboard/dashboard"
"github.com/grafana/xk6-output-opentelemetry/pkg/opentelemetry"
"github.com/grafana/xk6-output-prometheus-remote/pkg/remotewrite"
)

// builtinOutput marks the available builtin outputs.
Expand Down
142 changes: 142 additions & 0 deletions internal/output/prometheusrw/remote/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// Package remote implements the Prometheus remote write protocol.
package remote

import (
"bytes"
"context"
"crypto/tls"
"fmt"
"io"
"math"
"net/http"
"net/url"
"time"

"github.com/grafana/xk6-output-prometheus-remote/pkg/sigv4"

prompb "buf.build/gen/go/prometheus/prometheus/protocolbuffers/go"
"github.com/klauspost/compress/snappy"
"google.golang.org/protobuf/proto"
)

// HTTPConfig holds the config for the HTTP client.
type HTTPConfig struct {
Timeout time.Duration
TLSConfig *tls.Config
BasicAuth *BasicAuth
SigV4 *sigv4.Config
Headers http.Header
}

// BasicAuth holds the config for basic authentication.
type BasicAuth struct {
Username, Password string
}

// WriteClient is a client implementation of the Prometheus remote write protocol.
// It follows the specs defined by the official design document:
// https://docs.google.com/document/d/1LPhVRSFkGNSuU1fBd81ulhsCPR4hkSZyyBj1SZ8fWOM
type WriteClient struct {
hc *http.Client
url *url.URL
cfg *HTTPConfig
}

// NewWriteClient creates a new WriteClient.
func NewWriteClient(endpoint string, cfg *HTTPConfig) (*WriteClient, error) {
if cfg == nil {
cfg = &HTTPConfig{}
}
u, err := url.Parse(endpoint)
if err != nil {
return nil, err
}
wc := &WriteClient{
hc: &http.Client{
Timeout: cfg.Timeout,
},
url: u,
cfg: cfg,
}
if cfg.TLSConfig != nil {
wc.hc.Transport = &http.Transport{
TLSClientConfig: cfg.TLSConfig,
}
}
if cfg.SigV4 != nil {
tripper, err := sigv4.NewRoundTripper(cfg.SigV4, wc.hc.Transport)
if err != nil {
return nil, err
}
wc.hc.Transport = tripper
}
return wc, nil
}

// Store sends a batch of samples to the HTTP endpoint,
// the request is the proto marshaled and encoded.
func (c *WriteClient) Store(ctx context.Context, series []*prompb.TimeSeries) error {
b, err := newWriteRequestBody(series)
if err != nil {
return err
}
req, err := http.NewRequestWithContext(
ctx, http.MethodPost, c.url.String(), bytes.NewReader(b))
if err != nil {
return fmt.Errorf("create new HTTP request failed: %w", err)
}
if c.cfg.BasicAuth != nil {
req.SetBasicAuth(c.cfg.BasicAuth.Username, c.cfg.BasicAuth.Password)
}

if len(c.cfg.Headers) > 0 {
req.Header = c.cfg.Headers.Clone()
}

req.Header.Set("User-Agent", "k6-prometheus-rw-output")

// They are mostly defined by the specs
req.Header.Set("Content-Encoding", "snappy")
req.Header.Set("Content-Type", "application/x-protobuf")
req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")

resp, err := c.hc.Do(req)
if err != nil {
return fmt.Errorf("HTTP POST request failed: %w", err)
}
defer func() {
err = resp.Body.Close()
if err != nil {
panic(err)
}
}()

_, err = io.Copy(io.Discard, resp.Body)
if err != nil {
return err
}

return validateResponseStatus(resp.StatusCode)
}

func newWriteRequestBody(series []*prompb.TimeSeries) ([]byte, error) {
b, err := proto.Marshal(&prompb.WriteRequest{
Timeseries: series,
})
if err != nil {
return nil, fmt.Errorf("encoding series as protobuf write request failed: %w", err)
}
if snappy.MaxEncodedLen(len(b)) < 0 {
return nil, fmt.Errorf("the protobuf message is too large to be handled by Snappy encoder; "+
"size: %d, limit: %d", len(b), math.MaxUint32)
}
return snappy.Encode(nil, b), nil
}

func validateResponseStatus(code int) error {
if code >= http.StatusOK && code < 300 {
return nil
}

return fmt.Errorf("got status code: %d instead expected a 2xx successful status code", code)
}
Loading
Loading