Skip to content

Commit

Permalink
feat(otelbench.promrw): add compression protocol flag
Browse files Browse the repository at this point in the history
  • Loading branch information
ernado committed Jan 10, 2024
1 parent 9ab7574 commit d4042cf
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 16 deletions.
34 changes: 23 additions & 11 deletions cmd/otelbench/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/dustin/go-humanize"
"github.com/go-faster/errors"
"github.com/klauspost/compress/snappy"
"github.com/klauspost/compress/zstd"
"github.com/prometheus/prometheus/prompb"
"github.com/spf13/cobra"
Expand All @@ -25,6 +26,7 @@ type Record struct {
Duration time.Duration
Output string
Validate bool
Encoding string

points atomic.Uint64
requests atomic.Uint64
Expand All @@ -35,25 +37,33 @@ func (r *Record) read(req *http.Request) ([]byte, error) {
if t := req.Header.Get("Content-Type"); t != "application/x-protobuf" {
return nil, errors.Errorf("unsupported content type %q", t)
}
if e := req.Header.Get("Content-Encoding"); e != "zstd" {
return nil, errors.Errorf("unsupported encoding %q", e)
if e := req.Header.Get("Content-Encoding"); e != r.Encoding {
return nil, errors.Errorf("unexpected encoding %q", e)
}

compressedData, err := io.ReadAll(req.Body)
if err != nil {
return nil, errors.Wrap(err, "read compressed data")
}

d, err := zstd.NewReader(bytes.NewReader(compressedData))
if err != nil {
return nil, errors.Wrap(err, "create decoder")
}
defer d.Close()

if r.Validate {
data, err := io.ReadAll(d)
if err != nil {
return nil, errors.Wrap(err, "read data")
var data []byte
switch r.Encoding {
case "zstd":
reader, err := zstd.NewReader(bytes.NewReader(compressedData))
if err != nil {
return nil, errors.Wrap(err, "create decoder")
}
defer reader.Close()
if data, err = io.ReadAll(reader); err != nil {
return nil, errors.Wrap(err, "read data")
}
case "snappy":
if data, err = snappy.Decode(data, compressedData); err != nil {
return nil, errors.Wrap(err, "decode data")
}
default:
return nil, errors.Errorf("unsupported encoding %q", r.Encoding)
}
var writeRequest prompb.WriteRequest
if err := writeRequest.Unmarshal(data); err != nil {
Expand Down Expand Up @@ -100,6 +110,7 @@ func (r *Record) Run(ctx context.Context) (rerr error) {
}
compressedData, err := r.read(req)
if err != nil {
fmt.Println("> bad request:", err)
w.WriteHeader(http.StatusBadRequest)
return
}
Expand Down Expand Up @@ -181,5 +192,6 @@ func newRecordCommand() *cobra.Command {
cmd.Flags().DurationVarP(&recorder.Duration, "duration", "d", time.Minute*5, "Duration to record")
cmd.Flags().StringVarP(&recorder.Output, "output", "o", "requests.rwq", "Output file")
cmd.Flags().BoolVar(&recorder.Validate, "validate", true, "Validate requests")
cmd.Flags().StringVar(&recorder.Encoding, "encoding", "zstd", "Encoding to use")
return cmd
}
12 changes: 7 additions & 5 deletions cmd/otelbench/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ import (
)

type Replay struct {
Target string
Source string
Workers int
Target string
Source string
Encoding string
Workers int
}

func (r *Replay) Run(ctx context.Context) error {
Expand Down Expand Up @@ -55,7 +56,7 @@ func (r *Replay) Run(ctx context.Context) error {
return errors.Wrap(err, "create request")
}
req.Header.Set("Content-Type", "application/x-protobuf")
req.Header.Set("Content-Encoding", "zstd")
req.Header.Set("Content-Encoding", r.Encoding)
res, err := client.Do(req)
if err != nil {
return errors.Wrap(err, "do request")
Expand All @@ -67,7 +68,7 @@ func (r *Replay) Run(ctx context.Context) error {
if len(out) == 0 {
out = []byte("empty")
}
if res.StatusCode != http.StatusAccepted {
if !(res.StatusCode == http.StatusAccepted || res.StatusCode == http.StatusNoContent || res.StatusCode == http.StatusOK) {
return errors.Errorf("%s: %s", res.Status, out)
}
return nil
Expand Down Expand Up @@ -135,5 +136,6 @@ func newReplayCommand() *cobra.Command {
cmd.Flags().StringVar(&replay.Target, "target", "http://127.0.0.1:19291", "Target server")
cmd.Flags().StringVarP(&replay.Source, "input", "i", "requests.rwq", "Source file")
cmd.Flags().IntVarP(&replay.Workers, "workers", "j", 8, "Number of workers")
cmd.Flags().StringVar(&replay.Encoding, "encoding", "zstd", "Encoding")
return cmd
}

0 comments on commit d4042cf

Please sign in to comment.