feat(prombench): print detailed stats
ernado committed Dec 16, 2023
1 parent bc46b2a commit 9e8dfea
Showing 1 changed file with 167 additions and 26 deletions.
cmd/prombench/main.go: 193 changes (167 additions & 26 deletions)
@@ -24,6 +24,7 @@ import (
"github.com/ClickHouse/ch-go"
"github.com/ClickHouse/ch-go/proto"
"github.com/cenkalti/backoff/v4"
"github.com/dustin/go-humanize"
"github.com/go-faster/errors"
"github.com/go-faster/sdk/app"
"github.com/go-faster/sdk/zctx"
@@ -45,7 +46,7 @@ type App struct {
useVictoria bool
targets []string
metricsInfo atomic.Pointer[metricsInfo]
- pointsPerSecond atomic.Uint64
storageInfo atomic.Pointer[storageInfo]
}

type metricsInfo struct {
@@ -155,6 +156,18 @@ func (a *App) HandleNodeExporter(w http.ResponseWriter, _ *http.Request) {
_, _ = w.Write(*v)
}
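// storageInfo is a point-in-time snapshot of ClickHouse-side stats for the
// metrics_points table: the time range covered, row and size counters from the
// system tables (sizes in bytes), the SQL-computed compression ratio, and the
// peak observed ingest rate.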

type storageInfo struct {
Start time.Time
End time.Time
Rows int
DiskSizeBytes int
PrimaryKeySize int
CompressedSize int
UncompressedSize int
CompressRatio float64
PointsPerSecond int
}

func (a *App) fetchClickhouseStats(ctx context.Context) error {
client, err := ch.Dial(ctx, ch.Options{
Address: a.clickhouseAddr,
@@ -165,29 +178,108 @@ func (a *App) fetchClickhouseStats(ctx context.Context) error {
defer func() {
_ = client.Close()
}()
- var (
- seconds proto.ColDateTime
- points proto.ColUInt64
- )
- if err := client.Do(ctx, ch.Query{
- Body: `SELECT toDateTime(toStartOfSecond(timestamp)) as ts, COUNT() as total
var info storageInfo
{
var start, end proto.ColDateTime64
if err := client.Do(ctx, ch.Query{
Body: `SELECT min(timestamp) as start, max(timestamp) as end FROM metrics_points`,
Result: proto.Results{
{Name: "start", Data: &start},
{Name: "end", Data: &end},
},
}); err != nil {
return errors.Wrap(err, "query")
}
info.Start = start.Row(0)
info.End = end.Row(0)
}
{
var (
seconds proto.ColDateTime
points proto.ColUInt64
)
if err := client.Do(ctx, ch.Query{
Body: `SELECT toDateTime(toStartOfSecond(timestamp)) as ts, COUNT() as total
FROM metrics_points
WHERE timestamp < (now() - toIntervalSecond(3))
GROUP BY ts
ORDER BY ts DESC
LIMIT 15`,
- Result: proto.Results{
- {Name: "ts", Data: &seconds},
- {Name: "total", Data: &points},
- },
- }); err != nil {
- return errors.Wrap(err, "query")
Result: proto.Results{
{Name: "ts", Data: &seconds},
{Name: "total", Data: &points},
},
}); err != nil {
return errors.Wrap(err, "query")
}
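// PointsPerSecond is the peak of the last 15 completed one-second buckets;
// the query skips the newest ~3 seconds, which may still be receiving writes.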
if len(points) > 0 {
info.PointsPerSecond = int(slices.Max(points))
}
}
- if len(points) == 0 {
- a.pointsPerSecond.Store(0)
- } else {
- a.pointsPerSecond.Store(slices.Max(points))
{
var (
table proto.ColStr
rows proto.ColUInt64
diskSize proto.ColUInt64
primaryKeysSize proto.ColUInt64
compressedSize proto.ColUInt64
uncompressedSize proto.ColUInt64
compressRatio proto.ColFloat64
)
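// One row per table in the default database: row count, on-disk size and
// primary-key size from active parts, joined with per-table compressed and
// uncompressed column totals. Only the metrics_points row is kept below.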
const query = `select parts.*,
columns.compressed_size,
columns.uncompressed_size,
columns.ratio
from (
select table,
sum(data_uncompressed_bytes) AS uncompressed_size,
sum(data_compressed_bytes) AS compressed_size,
round(sum(data_uncompressed_bytes) / sum(data_compressed_bytes), 2) AS ratio
from system.columns
where database = 'default'
group by table
) columns
right join (
select table,
sum(rows) as rows,
sum(bytes) as disk_size,
sum(primary_key_bytes_in_memory) as primary_keys_size
from system.parts
where active and database = 'default'
group by database, table
) parts on columns.table = parts.table
order by parts.disk_size desc`
if err := client.Do(ctx, ch.Query{
Body: query,
Result: proto.Results{
{Name: "parts.table", Data: &table},
{Name: "rows", Data: &rows},
{Name: "disk_size", Data: &diskSize},
{Name: "primary_keys_size", Data: &primaryKeysSize},
{Name: "compressed_size", Data: &compressedSize},
{Name: "uncompressed_size", Data: &uncompressedSize},
{Name: "ratio", Data: &compressRatio},
},
}); err != nil {
return errors.Wrap(err, "query")
}
for i := 0; i < len(rows); i++ {
switch table.Row(i) {
case "metrics_points":
info.Rows = int(rows.Row(i))
info.DiskSizeBytes = int(diskSize.Row(i))
info.PrimaryKeySize = int(primaryKeysSize.Row(i))
info.CompressedSize = int(compressedSize.Row(i))
info.UncompressedSize = int(uncompressedSize.Row(i))
info.CompressRatio = compressRatio.Row(i)
default:
continue
}
}
}

a.storageInfo.Store(&info)

return nil
}
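
Design note, not part of this change: the query above aggregates every table in the default database and the Go loop then keeps only metrics_points. A hypothetical single-table variant (same system.parts and system.columns columns, with the filter pushed into SQL) could look like this sketch:

const metricsPointsStats = `
select parts.rows, parts.disk_size, parts.primary_keys_size,
       columns.compressed_size, columns.uncompressed_size, columns.ratio
from (
    select sum(data_uncompressed_bytes) as uncompressed_size,
           sum(data_compressed_bytes) as compressed_size,
           round(sum(data_uncompressed_bytes) / sum(data_compressed_bytes), 2) as ratio
    from system.columns
    where database = 'default' and table = 'metrics_points'
) columns
cross join (
    select sum(rows) as rows,
           sum(bytes) as disk_size,
           sum(primary_key_bytes_in_memory) as primary_keys_size
    from system.parts
    where active and database = 'default' and table = 'metrics_points'
) parts`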

@@ -200,15 +292,28 @@ func (a *App) RunClickhouseReporter(ctx context.Context) error {
return ctx.Err()
case <-ticker.C:
if err := a.fetchClickhouseStats(ctx); err != nil {
zctx.From(ctx).Error("cannot fetch clickhouse tats", zap.Error(err))
zctx.From(ctx).Error("cannot fetch clickhouse stats", zap.Error(err))
}
}
}
}

func fmtInt(v int) string {
s := humanize.SIWithDigits(float64(v), 0, "")
s = strings.ReplaceAll(s, " ", "")
return s
}

func compactBytes(v int) string {
s := humanize.Bytes(uint64(v))
s = strings.ReplaceAll(s, " ", "")
return s
}
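
// Rough usage sketch for the two helpers above (not part of the commit);
// exact rounding comes from go-humanize, so the outputs are approximate.
func exampleCompactFormatting() {
	fmt.Println(fmtInt(98_000))           // ~ "98k"
	fmt.Println(fmtInt(1_200_000))        // ~ "1M"
	fmt.Println(compactBytes(82_854_982)) // ~ "83MB"
}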

func (a *App) RunReporter(ctx context.Context) error {
ticker := time.NewTicker(time.Second * 2)
defer ticker.Stop()
var lastHash string
for {
select {
case <-ctx.Done():
@@ -219,16 +324,52 @@ func (a *App) RunReporter(ctx context.Context) error {
zctx.From(ctx).Info("no metrics info")
continue
}
- fields := []zap.Field{
- zap.String("hash", info.Hash),
- zap.Int("scraped.total", info.Count),
- zap.Int("scraped.size", info.Size),
- zap.Int("metrics.total", info.Count*a.targetsCount),
{
if lastHash == info.Hash {
zctx.From(ctx).Warn("Last hash is the same, node exporter stalled or not working?", zap.String("hash", info.Hash))
}
lastHash = info.Hash
}
if a.clickhouseAddr != "" {
fields = append(fields, zap.Uint64("points_per_sec", a.pointsPerSecond.Load()))
var s strings.Builder
s.WriteString(fmt.Sprintf("m=%s", fmtInt(info.Count*a.targetsCount)))
if v := a.storageInfo.Load(); v != nil && a.clickhouseAddr != "" {
now := time.Now()
s.WriteString(" ")
s.WriteString(fmt.Sprintf("uptime=%s", now.Sub(v.Start).Round(time.Second)))
s.WriteString(" ")
s.WriteString(fmt.Sprintf("lag=%s", now.Sub(v.End).Round(time.Millisecond)))
s.WriteString(" ")
s.WriteString(fmt.Sprintf("pps=%s", fmtInt(v.PointsPerSecond)))
s.WriteString(" ")
s.WriteString(fmt.Sprintf("rows=%s", fmtInt(v.Rows)))
s.WriteString(" ")
s.WriteString(
fmt.Sprintf("%s -> %s (%.0fx)",
compactBytes(v.CompressedSize),
compactBytes(v.UncompressedSize),
v.CompressRatio,
),
)
bytesPerPoint := float64(v.CompressedSize) / float64(v.Rows)
s.WriteString(" ")
s.WriteString(fmt.Sprintf("%.1f b/point", bytesPerPoint))

type metric struct {
Name string
Seconds int
}
for _, m := range []metric{
{Name: "d", Seconds: 60 * 60 * 24},
{Name: "w", Seconds: 60 * 60 * 24 * 7},
{Name: "m", Seconds: 60 * 60 * 24 * 30},
} {
rowsPerDay := v.PointsPerSecond * m.Seconds
dataPerDay := float64(rowsPerDay) / float64(v.Rows) * float64(v.CompressedSize)
s.WriteString(" ")
s.WriteString(fmt.Sprintf("%s/%s", compactBytes(int(dataPerDay)), m.Name))
}
}
zctx.From(ctx).Info("Reporting", fields...)
fmt.Println(s.String())
}
}
}
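
To make the projection at the end concrete, here is a worked example with made-up numbers (hypothetical, not measured anywhere): at a peak of 100k points/s, with 88M rows stored in 132 MB of compressed data, the loop's dataPerDay = pointsPerSecond * seconds / rows * compressedSize reduces to points-per-period times bytes-per-point:

func projectionExample() {
	const (
		pointsPerSecond = 100_000     // hypothetical peak ingest rate
		rows            = 88_000_000  // hypothetical rows stored so far
		compressedSize  = 132_000_000 // hypothetical compressed bytes on disk
	)
	bytesPerPoint := float64(compressedSize) / float64(rows) // 1.5 b/point
	perDay := float64(pointsPerSecond) * 60 * 60 * 24 * bytesPerPoint
	fmt.Printf("%.1f b/point, %s/d %s/w %s/m\n",
		bytesPerPoint,
		compactBytes(int(perDay)),
		compactBytes(int(perDay*7)),
		compactBytes(int(perDay*30)),
	)
	// Prints roughly: 1.5 b/point, 13GB/d 91GB/w 389GB/m
}

With numbers like these, the printed report line would end in roughly "1.5 b/point 13GB/d 91GB/w 389GB/m".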
