Skip to content

Commit

Permalink
perf(promrw): implement data load
Browse files Browse the repository at this point in the history
Ref: #283
  • Loading branch information
ernado committed Jan 5, 2024
1 parent ea237dc commit 8984a81
Show file tree
Hide file tree
Showing 4 changed files with 199 additions and 4 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ secret.yaml
/otelfaker
/chotel

vmagent-remotewrite-data

# goreleaser
dist
coverage.txt
Expand Down
23 changes: 23 additions & 0 deletions cmd/promrw/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# promrw

## Recording

Start listener:
```bash
go run ./cmd/promrw -f /tmp/remotewrite.gob.zstd --listen -d 10m --addr="http://127.0.0.1:8080"
```

Start load generator:
```bash
go run ./cmd/prombench --targetsCount=100 --scrapeInterval=1s http://127.0.0.1:8080
```

Prometheus remote write requests will be recorded to `/tmp/remotewrite.gob.zstd` file.

## Replay

Use `ch-bench-read` docker compose and run:

```bash
go run ./cmd/promrw -f /tmp/remotewrite.gob.zstd --addr="http://127.0.0.1:19291"
```
142 changes: 138 additions & 4 deletions cmd/promrw/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package main

import (
"bytes"
"context"
"encoding/gob"
"flag"
Expand All @@ -13,9 +14,13 @@ import (
"sync/atomic"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/cheggaaa/pb/v3"
"github.com/go-faster/errors"
"github.com/go-faster/sdk/app"
"github.com/golang/snappy"
"github.com/klauspost/compress/zstd"
"github.com/prometheus/prometheus/prompb"
"go.uber.org/multierr"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
Expand All @@ -31,21 +36,45 @@ func disableTelemetry() {
}
}

func decode(r *http.Request) ([]byte, error) {
if t := r.Header.Get("Content-Type"); t != "application/x-protobuf" {
return nil, errors.Errorf("unsupported content type %q", t)
}
switch e := r.Header.Get("Content-Encoding"); e {
case "zstd":
dec, err := zstd.NewReader(r.Body)
if err != nil {
return nil, errors.Wrap(err, "create decoder")
}
defer dec.Close()
return io.ReadAll(dec)
case "snappy":
rd := snappy.NewReader(r.Body)
return io.ReadAll(rd)
default:
return nil, errors.Errorf("unsupported encoding %q", e)
}
}

func main() {
disableTelemetry()
var arg struct {
Listen bool
Addr string
Data string
Duration time.Duration
Workers int
}
flag.BoolVar(&arg.Listen, "listen", false, "Listen mode")
flag.StringVar(&arg.Addr, "addr", ":8080", "Address")
flag.StringVar(&arg.Data, "f", "rw.gob.zstd", "Data file")
flag.IntVar(&arg.Workers, "j", 8, "Workers")
flag.DurationVar(&arg.Duration, "d", time.Minute, "Duration in seconds of recorded data")
flag.Parse()

if arg.Listen {
// Write gob-encoded series of byte slices to zstd-compressed file.
// Byte slice is protobuf-encoded prompb.WriteRequest.
app.Run(func(ctx context.Context, lg *zap.Logger, m *app.Metrics) (rerr error) {
f, err := os.Create(arg.Data)
if err != nil {
Expand Down Expand Up @@ -74,23 +103,34 @@ func main() {
Addr: arg.Addr,
ReadHeaderTimeout: time.Second,
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
data, err := io.ReadAll(r.Body)
if r.Method == http.MethodGet {
w.WriteHeader(http.StatusOK)
return
}
data, err := decode(r)
if err != nil {
lg.Error("read", zap.Error(err))
lg.Error("Read", zap.Error(err))
w.WriteHeader(http.StatusBadRequest)
return
}
now := time.Now()
start.CompareAndSwap(nil, &now)
duration := now.Sub(*start.Load())
if duration > arg.Duration {
cancel()
w.WriteHeader(http.StatusAccepted)
cancel()
return
}
// Check if we have a valid request.
var writeRequest prompb.WriteRequest
if err := writeRequest.Unmarshal(data); err != nil {
lg.Error("Unmarshal", zap.Error(err))
w.WriteHeader(http.StatusBadRequest)
return
}
if err := e.Encode(data); err != nil {
lg.Error("Write", zap.Error(err))
w.WriteHeader(http.StatusInternalServerError)
cancel()
return
}
w.WriteHeader(http.StatusAccepted)
Expand Down Expand Up @@ -128,4 +168,98 @@ func main() {
return g.Wait()
})
}
app.Run(func(ctx context.Context, lg *zap.Logger, m *app.Metrics) error {
f, err := os.Open(arg.Data)
if err != nil {
return errors.Wrap(err, "open file")
}
defer func() {
if err := f.Close(); err != nil {
lg.Error("close file", zap.Error(err))
}
}()
stat, err := f.Stat()
if err != nil {
return errors.Wrap(err, "stat file")
}

b := pb.New64(stat.Size())
b.Start()
defer b.Finish()

r, err := zstd.NewReader(b.NewProxyReader(f))
if err != nil {
return errors.Wrap(err, "create decoder")
}
defer r.Close()
d := gob.NewDecoder(r)
client := &http.Client{}

fn := func(data []byte) error {
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()

req, err := http.NewRequestWithContext(ctx, http.MethodPost, arg.Addr, bytes.NewReader(data))
if err != nil {
return errors.Wrap(err, "create request")
}
res, err := client.Do(req)
if err != nil {
return errors.Wrap(err, "do request")
}
defer func() {
_ = res.Body.Close()
}()
out, _ := io.ReadAll(io.LimitReader(res.Body, 512))
if len(out) == 0 {
out = []byte("empty")
}
if res.StatusCode != http.StatusAccepted {
return errors.Errorf("%s: %s", res.Status, out)
}
return nil
}

g, ctx := errgroup.WithContext(ctx)
inputs := make(chan []byte, arg.Workers)
for i := 0; i < arg.Workers; i++ {
g.Go(func() error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case data := <-inputs:
bo := backoff.NewExponentialBackOff()
bo.MaxElapsedTime = time.Second * 5
notify := func(err error, duration time.Duration) {
lg.Error("Retry", zap.Duration("duration", duration), zap.Error(err))
}
if err := backoff.RetryNotify(func() error {
return fn(data)
}, backoff.WithContext(bo, ctx), notify); err != nil {
return errors.Wrap(err, "retry")
}
}
}
})
}
g.Go(func() error {
defer close(inputs)
for {
var data []byte
if err := d.Decode(&data); err != nil {
if errors.Is(err, io.EOF) {
return nil
}
return errors.Wrap(err, "decode")
}
select {
case <-ctx.Done():
return ctx.Err()
case inputs <- data:
}
}
})
return g.Wait()
})
}
36 changes: 36 additions & 0 deletions dev/local/ch-bench-read/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
version: "3"

volumes:
prom:

services:
clickhouse:
image: clickhouse/clickhouse-server:23.10
ports:
- "127.0.0.1:9000:9000"
- "127.0.0.1:8123:8123"
volumes:
- ../clickhouse.xml:/etc/clickhouse-server/config.d/monitoring.xml
healthcheck:
test: ['CMD', 'wget', '--spider', '-q', '127.0.0.1:8123/ping']
interval: 1s
timeout: 1s
retries: 30

oteldb:
build:
context: ../../../
dockerfile: Dockerfile
environment:
- OTELDB_STORAGE=ch
- CH_DSN=clickhouse://clickhouse:9000
- OTEL_LOG_LEVEL=debug
- OTEL_METRICS_EXPORTER=none
- OTEL_LOGS_EXPORTER=none
- OTEL_TRACES_EXPORTER=none
- OTEL_RESOURCE_ATTRIBUTES=service.name=go-faster.oteldb
ports:
- "127.0.0.1:9090:9090"
- "127.0.0.1:19291:19291"
depends_on:
- clickhouse

0 comments on commit 8984a81

Please sign in to comment.