diff --git a/cmd/prombench/LICENSE b/cmd/prombench/LICENSE
new file mode 100644
index 00000000..6b553f66
--- /dev/null
+++ b/cmd/prombench/LICENSE
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2021 VictoriaMetrics
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/cmd/prombench/cfg.go b/cmd/prombench/cfg.go
new file mode 100644
index 00000000..0fde591f
--- /dev/null
+++ b/cmd/prombench/cfg.go
@@ -0,0 +1,86 @@
+package main
+
+import (
+	"fmt"
+	"time"
+
+	"gopkg.in/yaml.v2"
+)
+
+func (c *config) marshalYAML() []byte {
+	data, err := yaml.Marshal(c)
+	if err != nil {
+		panic(err)
+	}
+	return data
+}
+
+func newConfig(targetsCount int, scrapeInterval time.Duration, targetAddr string) *config {
+	scs := make([]*staticConfig, 0, targetsCount)
+	for i := 0; i < targetsCount; i++ {
+		scs = append(scs, &staticConfig{
+			Targets: []string{targetAddr},
+			Labels: map[string]string{
+				"instance": fmt.Sprintf("host-%d", i),
+				"revision": "r0",
+			},
+		})
+	}
+	return &config{
+		Global: globalConfig{
+			ScrapeInterval: scrapeInterval,
+		},
+		ScrapeConfigs: []*scrapeConfig{
+			{
+				JobName:       "node_exporter",
+				StaticConfigs: scs,
+				Path:          "/node",
+			},
+		},
+	}
+}
+
+// config represents essential parts from Prometheus config defined at https://prometheus.io/docs/prometheus/latest/configuration/configuration/
+type config struct {
+	Global        globalConfig         `yaml:"global"`
+	ScrapeConfigs []*scrapeConfig      `yaml:"scrape_configs,omitempty"`
+	RemoteWrites  []*remoteWriteConfig `yaml:"remote_write,omitempty"`
+}
+
+// globalConfig represents essential parts for `global` section of Prometheus config.
+//
+// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/
+type globalConfig struct {
+	ScrapeInterval time.Duration `yaml:"scrape_interval"`
+}
+
+// scrapeConfig represents essential parts for `scrape_config` section of Prometheus config.
+//
+// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#scrape_config
+type scrapeConfig struct {
+	JobName       string          `yaml:"job_name"`
+	Path          string          `yaml:"metrics_path,omitempty"`
+	StaticConfigs []*staticConfig `yaml:"static_configs"`
+}
+
+// staticConfig represents essential parts for `static_config` section of Prometheus config.
+//
+// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#static_config
+type staticConfig struct {
+	Targets []string          `yaml:"targets"`
+	Labels  map[string]string `yaml:"labels"`
+}
+
+// remoteWriteConfig represents essential parts for `remote_write` section of Prometheus config.
+//
+// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write
+type remoteWriteConfig struct {
+	URL      string                     `yaml:"url"`
+	Name     string                     `yaml:"name,omitempty"`
+	Metadata *remoteWriteMetadataConfig `yaml:"metadata_config,omitempty"`
+}
+
+type remoteWriteMetadataConfig struct {
+	Send         bool          `yaml:"send"`
+	SendInterval time.Duration `yaml:"send_interval,omitempty"`
+}
diff --git a/cmd/prombench/main.go b/cmd/prombench/main.go
new file mode 100644
index 00000000..f317850a
--- /dev/null
+++ b/cmd/prombench/main.go
@@ -0,0 +1,350 @@
+package main
+
+import (
+	"bufio"
+	"bytes"
+	"context"
+	"crypto/sha256"
+	"errors"
+	"flag"
+	"fmt"
+	"io"
+	"math/rand"
+	"net/http"
+	"net/url"
+	"os"
+	"os/exec"
+	"path/filepath"
+	"strings"
+	"sync/atomic"
+	"syscall"
+	"time"
+
+	"github.com/go-faster/sdk/app"
+	"github.com/go-faster/sdk/zctx"
+	"go.uber.org/zap"
+	"golang.org/x/sync/errgroup"
+)
+
+type App struct {
+	addr                       string
+	node                       atomic.Pointer[[]byte]
+	cfg                        atomic.Pointer[config]
+	nodeExporterAddr           string
+	agentAddr                  string
+	targetsCount               int
+	scrapeInterval             time.Duration
+	scrapeConfigUpdateInterval time.Duration
+	scrapeConfigUpdatePercent  float64
+	useVictoria                bool
+	targets                    []string
+	metricsInfo                atomic.Pointer[metricsInfo]
+}
+
+type metricsInfo struct {
+	Count int
+	Size  int
+	Hash  string
+}
+
+func (a *App) PollNodeExporter(ctx context.Context) {
+	ticker := time.NewTicker(time.Second)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			if err := a.fetchNodeExporter(ctx); err != nil {
+				zctx.From(ctx).Error("cannot fetch node exporter", zap.Error(err))
+			}
+		}
+	}
+}
+
+func (a *App) fetchNodeExporter(ctx context.Context) error {
+	u := &url.URL{
+		Scheme: "http",
+		Path:   "/metrics",
+		Host:   a.nodeExporterAddr,
+	}
+	req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), http.NoBody)
+	if err != nil {
+		return err
+	}
+	res, err := http.DefaultClient.Do(req)
+	if err != nil {
+		return err
+	}
+	defer func() {
+		_ = res.Body.Close()
+	}()
+	data, err := io.ReadAll(res.Body)
+	if err != nil {
+		return err
+	}
+	a.node.Store(&data)
+
+	// Count metrics.
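+	// The Prometheus text exposition format mixes sample lines with blank
+	// lines and "#"-prefixed comment lines (HELP/TYPE); only sample lines
+	// are counted here.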
+	var metricsCount int
+	s := bufio.NewScanner(bytes.NewReader(data))
+	for s.Scan() {
+		text := strings.TrimSpace(s.Text())
+		if text == "" || strings.HasPrefix(text, "#") {
+			continue
+		}
+		metricsCount++
+	}
+	d := sha256.Sum256(data)
+	h := fmt.Sprintf("%x", d[:8])
+	a.metricsInfo.Store(&metricsInfo{
+		Count: metricsCount,
+		Size:  len(data),
+		Hash:  h,
+	})
+	return nil
+}
+
+func (a *App) HandleConfig(w http.ResponseWriter, _ *http.Request) {
+	w.Header().Set("Content-Type", "text/plain; charset=utf-8")
+	v := a.cfg.Load()
+	_, _ = w.Write(v.marshalYAML())
+}
+
+func (a *App) ProgressConfig(ctx context.Context) error {
+	rev := 0
+	r := rand.New(rand.NewSource(1)) // #nosec G404
+	p := a.scrapeConfigUpdatePercent / 100
+	ticker := time.NewTicker(a.scrapeConfigUpdateInterval)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-ticker.C:
+			rev++
+			revStr := fmt.Sprintf("r%d", rev)
+			// Build a deep copy instead of mutating the stored config in place:
+			// HandleConfig and the prometheus config updater may be marshalling
+			// the current config concurrently.
+			old := a.cfg.Load()
+			next := *old
+			next.ScrapeConfigs = make([]*scrapeConfig, 0, len(old.ScrapeConfigs))
+			for _, sc := range old.ScrapeConfigs {
+				scCopy := *sc
+				scCopy.StaticConfigs = make([]*staticConfig, 0, len(sc.StaticConfigs))
+				for _, stc := range sc.StaticConfigs {
+					stcCopy := *stc
+					stcCopy.Labels = make(map[string]string, len(stc.Labels))
+					for k, v := range stc.Labels {
+						stcCopy.Labels[k] = v
+					}
+					if r.Float64() < p {
+						stcCopy.Labels["revision"] = revStr
+					}
+					scCopy.StaticConfigs = append(scCopy.StaticConfigs, &stcCopy)
+				}
+				next.ScrapeConfigs = append(next.ScrapeConfigs, &scCopy)
+			}
+			a.cfg.Store(&next)
+		}
+	}
+}
+
+func (a *App) HandleNodeExporter(w http.ResponseWriter, _ *http.Request) {
+	w.Header().Set("Content-Type", "text/plain; charset=utf-8")
+	v := a.node.Load()
+	if v == nil {
+		_, _ = w.Write([]byte("# no data"))
+		return
+	}
+	_, _ = w.Write(*v)
+}
+
+func (a *App) RunReporter(ctx context.Context) error {
+	ticker := time.NewTicker(time.Second * 2)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-ticker.C:
+			info := a.metricsInfo.Load()
+			if info == nil {
+				zctx.From(ctx).Info("no metrics info")
+				continue
+			}
+			zctx.From(ctx).Info("Reporting",
+				zap.String("hash", info.Hash),
+				zap.Int("scraped.total", info.Count),
+				zap.Int("scraped.size", info.Size),
+				zap.Int("metrics.total", info.Count*a.targetsCount),
+			)
+		}
+	}
+}
+
+func (a *App) RunNodeExporter(ctx context.Context) error {
+	args := []string{
+		"--no-collector.wifi",
+		"--no-collector.hwmon",
+		"--no-collector.time",
+		"--no-collector.timex",
+		"--no-collector.arp",
+		"--no-collector.netdev",
+		"--no-collector.netstat",
+		"--collector.processes",
+		"--web.max-requests=40",
+		"--web.listen-address=" + a.nodeExporterAddr,
+		"--log.format=json",
+	}
+	// #nosec G204
+	cmd := exec.CommandContext(ctx, "node_exporter", args...)
+	cmd.Stdout = os.Stdout
+	cmd.Stderr = os.Stderr
+	return cmd.Run()
+}
+
+func (a *App) RunAgent(ctx context.Context) error {
+	if len(a.targets) != 1 {
+		return errors.New("expected one target")
+	}
+	arg := []string{
+		"--httpListenAddr=" + a.agentAddr,
+		"--loggerFormat=json",
+		"--remoteWrite.showURL",
+		"--promscrape.config=http://" + a.addr + "/config",
+		"--remoteWrite.url=" + a.targets[0],
+	}
+	// #nosec G204
+	cmd := exec.CommandContext(ctx, "vmagent", arg...)
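+	// vmagent pulls its scrape config from this process's /config endpoint
+	// and forwards the scraped samples to the single remote write target.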
+	cmd.Stdout = os.Stdout
+	cmd.Stderr = os.Stderr
+	return cmd.Run()
+}
+
+func (a *App) RunPrometheus(ctx context.Context, dir string) error {
+	defer func() {
+		_ = os.RemoveAll(dir)
+	}()
+	prometheusConfigFile := filepath.Join(dir, "prometheus.yml")
+	if err := os.WriteFile(prometheusConfigFile, a.cfg.Load().marshalYAML(), 0o600); err != nil {
+		return err
+	}
+	// #nosec G204
+	cmd := exec.CommandContext(ctx, "prometheus",
+		"--config.file="+prometheusConfigFile,
+		"--web.listen-address="+a.agentAddr,
+		"--enable-feature=agent",
+		"--enable-feature=new-service-discovery-manager",
+		"--log.format=json",
+	)
+	cmd.Stdout = os.Stdout
+	cmd.Stderr = os.Stderr
+	cmd.Dir = dir
+	go func() {
+		// Periodically update the config.
+		ticker := time.NewTicker(time.Second)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-ticker.C:
+				if err := os.WriteFile(prometheusConfigFile, a.cfg.Load().marshalYAML(), 0o600); err != nil {
+					zctx.From(ctx).Error("cannot update prometheus config", zap.Error(err))
+				}
+				if err := cmd.Process.Signal(syscall.SIGHUP); err != nil {
+					zctx.From(ctx).Error("cannot send SIGHUP to prometheus", zap.Error(err))
+				}
+			}
+		}
+	}()
+	return cmd.Run()
+}
+
+func (a *App) prometheusConfig() *config {
+	cfg := newConfig(a.targetsCount, a.scrapeInterval, a.addr)
+	if !a.useVictoria {
+		var remotes []*remoteWriteConfig
+		for i, target := range a.targets {
+			remotes = append(remotes, &remoteWriteConfig{
+				URL:  target,
+				Name: fmt.Sprintf("target-%d", i),
+				Metadata: &remoteWriteMetadataConfig{
+					Send:         true,
+					SendInterval: time.Second,
+				},
+			})
+		}
+		cfg.RemoteWrites = remotes
+	}
+	return cfg
+}
+
+func (a *App) parseTargets() {
+	for _, arg := range flag.Args() {
+		u, err := url.Parse(arg)
+		if err != nil {
+			fmt.Fprintln(os.Stderr, "invalid target:", err)
+			os.Exit(1)
+		}
+		a.targets = append(a.targets, u.String())
+	}
+	if len(a.targets) == 0 {
+		fmt.Fprintln(os.Stderr, "no targets specified")
+		os.Exit(1)
+	}
+}
+
+func (a *App) run(ctx context.Context, _ *zap.Logger, _ *app.Metrics) error {
+	a.cfg.Store(a.prometheusConfig())
+
+	g, ctx := errgroup.WithContext(ctx)
+	g.Go(func() error {
+		return a.ProgressConfig(ctx)
+	})
+	g.Go(func() error {
+		return a.RunReporter(ctx)
+	})
+	if a.useVictoria {
+		g.Go(func() error {
+			return a.RunAgent(ctx)
+		})
+	} else {
+		prometheusDir, err := os.MkdirTemp("", "prometheus")
+		if err != nil {
+			return err
+		}
+		g.Go(func() error {
+			return a.RunPrometheus(ctx, prometheusDir)
+		})
+	}
+	g.Go(func() error {
+		return a.RunNodeExporter(ctx)
+	})
+	g.Go(func() error {
+		a.PollNodeExporter(ctx)
+		return nil
+	})
+	g.Go(func() error {
+		mux := http.NewServeMux()
+		mux.HandleFunc("/node", a.HandleNodeExporter)
+		mux.HandleFunc("/config", a.HandleConfig)
+		srv := &http.Server{
+			Addr:              a.addr,
+			Handler:           mux,
+			ReadHeaderTimeout: time.Second * 5,
+		}
+		go func() {
+			<-ctx.Done()
+			_ = srv.Close()
+		}()
+		if err := srv.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
+			return err
+		}
+		return nil
+	})
+	return g.Wait()
+}
+
+func main() {
+	var a App
+	flag.StringVar(&a.nodeExporterAddr, "nodeExporterAddr", "127.0.0.1:9301", "address for node_exporter to listen on")
+	flag.StringVar(&a.addr, "addr", "127.0.0.1:8428", "address to listen on")
+	flag.StringVar(&a.agentAddr, "agentAddr", "127.0.0.1:8429", "address for the agent (vmagent or prometheus) to listen on")
+	flag.IntVar(&a.targetsCount, "targetsCount", 100, "The number of scrape targets to return from the /config endpoint at -addr. Each target points at this process's /node endpoint")
+	flag.DurationVar(&a.scrapeInterval, "scrapeInterval", time.Second*5, "The scrape_interval to set in the scrape config returned from the /config endpoint")
+	flag.DurationVar(&a.scrapeConfigUpdateInterval, "scrapeConfigUpdateInterval", time.Minute*10, "How often to update -scrapeConfigUpdatePercent of the scrape targets in the config returned from the /config endpoint")
+	flag.Float64Var(&a.scrapeConfigUpdatePercent, "scrapeConfigUpdatePercent", 1, "The percent of scrape targets to update in the config returned from the /config endpoint every -scrapeConfigUpdateInterval")
+	flag.BoolVar(&a.useVictoria, "useVictoria", true, "use vmagent instead of prometheus")
+	flag.Parse()
+	a.parseTargets()
+	app.Run(a.run)
+}
diff --git a/dev/local/ch-bench/README.md b/dev/local/ch-bench/README.md
new file mode 100644
index 00000000..036645b1
--- /dev/null
+++ b/dev/local/ch-bench/README.md
@@ -0,0 +1,38 @@
+# ch-bench
+
+This docker compose project runs a [prometheus-benchmark](https://github.com/VictoriaMetrics/prometheus-benchmark)
+composed as a single binary (see ./cmd/prombench) against a single instance of oteldb.
+
+The `prombench` binary runs `node_exporter` and `vmagent`, acting both as a dynamic configuration source
+for `vmagent` scrape targets and as a caching proxy in front of `node_exporter`, while also logging some metrics.
+This generates load: `vmagent` scrapes the generated targets and sends the samples to the configured
+remote write address every `scrapeInterval`.
+
+The total generated load is controlled by the following arguments:
+- `-targetsCount`, the number of targets to generate (i.e. "virtual" node_exporter instances)
+- `-scrapeInterval`, the interval between scrapes of each target
+
+So the total number of metrics per second is `(targetsCount * scrapedMetrics) / scrapeInterval`, where
+`scrapedMetrics` is the number of metrics scraped from node_exporter.
+
+The `scrapedMetrics` value depends on the machine where node_exporter runs, but it averages around 1400-1500.
+
+The `scrapeConfigUpdateInterval` and `scrapeConfigUpdatePercent` arguments control how often the `revision`
+label on targets is rotated. This does not impact metrics per second, but generates more unique time series.
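+
+For example, with the settings used by this compose project (`-targetsCount=100`, `-scrapeInterval=1s`)
+and roughly 1450 scraped metrics, the expected ingest rate is about `100 * 1450 / 1s ≈ 145,000`
+samples per second.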
+
+To start:
+
+```
+docker compose up -d
+```
+
+To collect profiles:
+```
+./prof.sh
+```
+
+This will emit profiles as `cpu.out` and `mem.out`, which can be inspected with:
+
+```
+go tool pprof -alloc_space mem.out
+go tool pprof cpu.out
+```
\ No newline at end of file
diff --git a/dev/local/ch-bench/clickhouse.xml b/dev/local/ch-bench/clickhouse.xml
new file mode 100644
index 00000000..e4f4b22e
--- /dev/null
+++ b/dev/local/ch-bench/clickhouse.xml
@@ -0,0 +1,3 @@
+<clickhouse>
+    <listen_host>0.0.0.0</listen_host>
+</clickhouse>
\ No newline at end of file
diff --git a/dev/local/ch-bench/docker-compose.yml b/dev/local/ch-bench/docker-compose.yml
new file mode 100644
index 00000000..f5e027ec
--- /dev/null
+++ b/dev/local/ch-bench/docker-compose.yml
@@ -0,0 +1,48 @@
+version: "3"
+
+services:
+  clickhouse:
+    image: clickhouse/clickhouse-server:23.10
+    ports:
+      - "9000:9000"
+      - "8123:8123"
+    volumes:
+      - ./clickhouse.xml:/etc/clickhouse-server/config.d/monitoring.xml
+    healthcheck:
+      test: ['CMD', 'wget', '--spider', '-q', '127.0.0.1:8123/ping']
+      interval: 1s
+      timeout: 1s
+      retries: 30
+
+  attacker:
+    build:
+      context: ../../../
+      dockerfile: prombench.Dockerfile
+    command:
+      - --scrapeInterval=1s
+      - --targetsCount=100
+      - --scrapeConfigUpdateInterval=5s
+      - --scrapeConfigUpdatePercent=3
+      - http://oteldb:19291
+    depends_on:
+      - clickhouse
+      - oteldb
+
+  oteldb:
+    build:
+      context: ../../../
+      dockerfile: Dockerfile
+    environment:
+      - OTELDB_STORAGE=ch
+      - CH_DSN=clickhouse://clickhouse:9000
+      - METRICS_ADDR=:3201
+      - OTEL_LOG_LEVEL=warn
+      - OTEL_METRICS_EXPORTER=none
+      - OTEL_LOGS_EXPORTER=none
+      - OTEL_TRACES_EXPORTER=none
+      - OTEL_RESOURCE_ATTRIBUTES=service.name=oteldb
+      - PPROF_ADDR=:9010
+    ports:
+      - "9010:9010" # pprof http://localhost:9010/debug/pprof/
+    depends_on:
+      - clickhouse
diff --git a/dev/local/ch-bench/prof.sh b/dev/local/ch-bench/prof.sh
new file mode 100755
index 00000000..72801d8d
--- /dev/null
+++ b/dev/local/ch-bench/prof.sh
@@ -0,0 +1,9 @@
+#!/bin/bash
+
+set -ex
+
+# CPU
+curl -o cpu.out http://localhost:9010/debug/pprof/profile?seconds=10
+
+# Memory
+curl -o mem.out http://localhost:9010/debug/pprof/heap?seconds=10
diff --git a/prombench.Dockerfile b/prombench.Dockerfile
new file mode 100644
index 00000000..d44b8bb0
--- /dev/null
+++ b/prombench.Dockerfile
@@ -0,0 +1,21 @@
+FROM golang:latest as builder
+
+WORKDIR /app
+
+COPY go.mod go.sum ./
+RUN go mod download
+
+COPY . ./
+RUN CGO_ENABLED=0 GOOS=linux go build -o /prombench ./cmd/prombench
+
+FROM alpine:latest
+RUN apk --no-cache add ca-certificates
+
+# Dependencies that prombench launches via os/exec:
+COPY --from=victoriametrics/vmagent /vmagent-prod /bin/vmagent
+COPY --from=prom/prometheus /bin/prometheus /bin/prometheus
+COPY --from=quay.io/prometheus/node-exporter /bin/node_exporter /bin/node_exporter
+
+COPY --from=builder /prombench /bin/prombench
+
+ENTRYPOINT ["/bin/prombench"]
\ No newline at end of file