test(prombench): report points per sec from clickhouse
ernado committed Dec 16, 2023
1 parent a187fee commit 2bf6ba3
Showing 3 changed files with 84 additions and 3 deletions.
68 changes: 65 additions & 3 deletions cmd/prombench/main.go
@@ -6,7 +6,6 @@ import (
"bytes"
"context"
"crypto/sha256"
"errors"
"flag"
"fmt"
"io"
@@ -16,12 +15,16 @@ import (
"os"
"os/exec"
"path/filepath"
"slices"
"strings"
"sync/atomic"
"syscall"
"time"

"github.com/ClickHouse/ch-go"
"github.com/ClickHouse/ch-go/proto"
"github.com/cenkalti/backoff/v4"
"github.com/go-faster/errors"
"github.com/go-faster/sdk/app"
"github.com/go-faster/sdk/zctx"
"go.uber.org/zap"
@@ -33,6 +36,7 @@ type App struct {
node atomic.Pointer[[]byte]
cfg atomic.Pointer[config]
nodeExporterAddr string
clickhouseAddr string
agentAddr string
targetsCount int
scrapeInterval time.Duration
@@ -41,6 +45,7 @@
useVictoria bool
targets []string
metricsInfo atomic.Pointer[metricsInfo]
pointsPerSecond atomic.Uint64
}

type metricsInfo struct {
@@ -150,6 +155,53 @@ func (a *App) HandleNodeExporter(w http.ResponseWriter, _ *http.Request) {
_, _ = w.Write(*v)
}

func (a *App) fetchClickhouseStats(ctx context.Context) error {
client, err := ch.Dial(ctx, ch.Options{
Address: a.clickhouseAddr,
})
if err != nil {
return errors.Wrap(err, "dial")
}
defer func() {
_ = client.Close()
}()
var (
seconds proto.ColDateTime
points proto.ColUInt64
)
if err := client.Do(ctx, ch.Query{
Body: `SELECT toDateTime(toStartOfSecond(timestamp)) as ts, COUNT() as total
FROM metrics_points
WHERE timestamp < (now() - toIntervalSecond(3))
GROUP BY ts
ORDER BY ts DESC
LIMIT 15`,
Result: proto.Results{
{Name: "ts", Data: &seconds},
{Name: "total", Data: &points},
},
}); err != nil {
return errors.Wrap(err, "query")
}
// Guard against an empty result set (e.g. before any points are ingested),
// since slices.Max panics on an empty slice.
if len(points) > 0 {
	a.pointsPerSecond.Store(slices.Max(points))
}
return nil
}

func (a *App) RunClickhouseReporter(ctx context.Context) error {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
if err := a.fetchClickhouseStats(ctx); err != nil {
zctx.From(ctx).Error("cannot fetch clickhouse tats", zap.Error(err))
}
}
}
}

func (a *App) RunReporter(ctx context.Context) error {
ticker := time.NewTicker(time.Second * 2)
defer ticker.Stop()
@@ -163,12 +215,16 @@ func (a *App) RunReporter(ctx context.Context) error {
zctx.From(ctx).Info("no metrics info")
continue
}
zctx.From(ctx).Info("Reporting",
fields := []zap.Field{
zap.String("hash", info.Hash),
zap.Int("scraped.total", info.Count),
zap.Int("scraped.size", info.Size),
zap.Int("metrics.total", info.Count*a.targetsCount),
)
}
if a.clickhouseAddr != "" {
fields = append(fields, zap.Uint64("points_per_sec", a.pointsPerSecond.Load()))
}
zctx.From(ctx).Info("Reporting", fields...)
}
}
}
@@ -328,6 +384,11 @@ func (a *App) run(ctx context.Context, _ *zap.Logger, _ *app.Metrics) error {
g.Go(func() error {
return a.ProgressConfig(ctx)
})
if a.clickhouseAddr != "" {
g.Go(func() error {
return a.RunClickhouseReporter(ctx)
})
}
g.Go(func() error {
return a.RunReporter(ctx)
})
@@ -381,6 +442,7 @@ func main() {
flag.DurationVar(&a.scrapeInterval, "scrapeInterval", time.Second*5, "The scrape_interval to set at the scrape config returned from -httpListenAddr")
flag.DurationVar(&a.scrapeConfigUpdateInterval, "scrapeConfigUpdateInterval", time.Minute*10, "The -scrapeConfigUpdatePercent scrape targets are updated in the scrape config returned from -httpListenAddr every -scrapeConfigUpdateInterval")
flag.Float64Var(&a.scrapeConfigUpdatePercent, "scrapeConfigUpdatePercent", 1, "The -scrapeConfigUpdatePercent scrape targets are updated in the scrape config returned from -httpListenAddr every -scrapeConfigUpdateInterval")
flag.StringVar(&a.clickhouseAddr, "clickhouseAddr", "", "ClickHouse native protocol address to fetch actual ingestion stats from")
flag.BoolVar(&a.useVictoria, "useVictoria", true, "use vmagent instead of prometheus")
flag.Parse()
a.parseTargets()
18 changes: 18 additions & 0 deletions dev/local/ch-bench/README.md
@@ -50,4 +50,22 @@ WHERE timestamp < (now() - toIntervalSecond(5))
GROUP BY ts
ORDER BY ts DESC
LIMIT 15;
```

Or check the attacker logs for the `points_per_sec` attribute:

```console
$ docker compose logs attacker --no-log-prefix | grep Reporting | tail -n5 | jq -c
```
```json
{"level":"info","ts":1702747705.4254293,"caller":"prombench/main.go:227","msg":"Reporting","hash":"9659488dfc5b1296","scraped.total":1437,"scraped.size":102828,"metrics.total":143700,"points_per_sec":144300}
```

```console
$ docker compose logs attacker --no-log-prefix | grep Reporting | tail -n5 | jq -r .points_per_sec
144400
144400
144400
144353
144353
```
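
To follow the rate continuously, the same pipeline can be wrapped in `watch` (a minimal sketch, assuming `watch` is available on the host):

```console
$ watch -n 5 'docker compose logs attacker --no-log-prefix | grep Reporting | tail -n1 | jq -r .points_per_sec'
```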
1 change: 1 addition & 0 deletions dev/local/ch-bench/docker-compose.yml
@@ -23,6 +23,7 @@ services:
- --targetsCount=100
- --scrapeConfigUpdateInterval=5s
- --scrapeConfigUpdatePercent=3
- --clickhouseAddr=clickhouse:9000
- http://oteldb:19291
depends_on:
- clickhouse
