diff --git a/cmd/otelbench/record.go b/cmd/otelbench/record.go index 2cac670c..82c1b7a1 100644 --- a/cmd/otelbench/record.go +++ b/cmd/otelbench/record.go @@ -12,6 +12,7 @@ import ( "github.com/dustin/go-humanize" "github.com/go-faster/errors" + "github.com/klauspost/compress/snappy" "github.com/klauspost/compress/zstd" "github.com/prometheus/prometheus/prompb" "github.com/spf13/cobra" @@ -25,6 +26,7 @@ type Record struct { Duration time.Duration Output string Validate bool + Encoding string points atomic.Uint64 requests atomic.Uint64 @@ -35,8 +37,8 @@ func (r *Record) read(req *http.Request) ([]byte, error) { if t := req.Header.Get("Content-Type"); t != "application/x-protobuf" { return nil, errors.Errorf("unsupported content type %q", t) } - if e := req.Header.Get("Content-Encoding"); e != "zstd" { - return nil, errors.Errorf("unsupported encoding %q", e) + if e := req.Header.Get("Content-Encoding"); e != r.Encoding { + return nil, errors.Errorf("unexpected encoding %q", e) } compressedData, err := io.ReadAll(req.Body) @@ -44,16 +46,24 @@ func (r *Record) read(req *http.Request) ([]byte, error) { return nil, errors.Wrap(err, "read compressed data") } - d, err := zstd.NewReader(bytes.NewReader(compressedData)) - if err != nil { - return nil, errors.Wrap(err, "create decoder") - } - defer d.Close() - if r.Validate { - data, err := io.ReadAll(d) - if err != nil { - return nil, errors.Wrap(err, "read data") + var data []byte + switch r.Encoding { + case "zstd": + reader, err := zstd.NewReader(bytes.NewReader(compressedData)) + if err != nil { + return nil, errors.Wrap(err, "create decoder") + } + defer reader.Close() + if data, err = io.ReadAll(reader); err != nil { + return nil, errors.Wrap(err, "read data") + } + case "snappy": + if data, err = snappy.Decode(data, compressedData); err != nil { + return nil, errors.Wrap(err, "decode data") + } + default: + return nil, errors.Errorf("unsupported encoding %q", r.Encoding) } var writeRequest prompb.WriteRequest if err := writeRequest.Unmarshal(data); err != nil { @@ -100,6 +110,7 @@ func (r *Record) Run(ctx context.Context) (rerr error) { } compressedData, err := r.read(req) if err != nil { + fmt.Println("> bad request:", err) w.WriteHeader(http.StatusBadRequest) return } @@ -181,5 +192,6 @@ func newRecordCommand() *cobra.Command { cmd.Flags().DurationVarP(&recorder.Duration, "duration", "d", time.Minute*5, "Duration to record") cmd.Flags().StringVarP(&recorder.Output, "output", "o", "requests.rwq", "Output file") cmd.Flags().BoolVar(&recorder.Validate, "validate", true, "Validate requests") + cmd.Flags().StringVar(&recorder.Encoding, "encoding", "zstd", "Encoding to use") return cmd } diff --git a/cmd/otelbench/replay.go b/cmd/otelbench/replay.go index cfd65b2c..456ef832 100644 --- a/cmd/otelbench/replay.go +++ b/cmd/otelbench/replay.go @@ -18,9 +18,10 @@ import ( ) type Replay struct { - Target string - Source string - Workers int + Target string + Source string + Encoding string + Workers int } func (r *Replay) Run(ctx context.Context) error { @@ -55,7 +56,7 @@ func (r *Replay) Run(ctx context.Context) error { return errors.Wrap(err, "create request") } req.Header.Set("Content-Type", "application/x-protobuf") - req.Header.Set("Content-Encoding", "zstd") + req.Header.Set("Content-Encoding", r.Encoding) res, err := client.Do(req) if err != nil { return errors.Wrap(err, "do request") @@ -67,7 +68,7 @@ func (r *Replay) Run(ctx context.Context) error { if len(out) == 0 { out = []byte("empty") } - if res.StatusCode != http.StatusAccepted { + if !(res.StatusCode == http.StatusAccepted || res.StatusCode == http.StatusNoContent || res.StatusCode == http.StatusOK) { return errors.Errorf("%s: %s", res.Status, out) } return nil @@ -135,5 +136,6 @@ func newReplayCommand() *cobra.Command { cmd.Flags().StringVar(&replay.Target, "target", "http://127.0.0.1:19291", "Target server") cmd.Flags().StringVarP(&replay.Source, "input", "i", "requests.rwq", "Source file") cmd.Flags().IntVarP(&replay.Workers, "workers", "j", 8, "Number of workers") + cmd.Flags().StringVar(&replay.Encoding, "encoding", "zstd", "Encoding") return cmd }