diff --git a/Dockerfile b/Dockerfile index 7f10616..be615ad 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,21 @@ +FROM golang:1.20-alpine as build + +ADD . /src/ + +RUN cd /src && \ + go mod download && \ + go build -o /ripley . && \ + go build -o /dummyweb etc/dummyweb.go && \ + go build -o /linegen etc/linegen.go + +################################## # Start fresh from a smaller image -FROM alpine:3.15.0 +################################## +FROM alpine:latest RUN apk add ca-certificates -COPY ripley /usr/bin/ripley -ENTRYPOINT ["/usr/bin/ripley"] \ No newline at end of file +COPY --from=build /ripley /usr/bin/ripley +COPY --from=build /linegen /usr/bin/linegen +COPY --from=build /dummyweb /usr/bin/dummyweb + +ENTRYPOINT ["/usr/bin/ripley"] diff --git a/README.md b/README.md index db15c8e..db4b659 100644 --- a/README.md +++ b/README.md @@ -129,3 +129,11 @@ cat etc/requests.jsonl | ./ripley -pace "30s@1" -dry-run ```bash go test pkg/*.go ``` + +## Memory profiling + +``` +./ripley -metricsServerEnable -printStat -memprofile dist/mem.pprof -pace 1m@10 -workers 100 < etc/requests.jsonl + +go tool pprof --alloc_objects ripley dist/mem.pprof +```bash diff --git a/etc/dummyweb.go b/etc/dummyweb.go index b05898f..04a8e99 100644 --- a/etc/dummyweb.go +++ b/etc/dummyweb.go @@ -1,23 +1,100 @@ package main import ( + "encoding/json" + "flag" + "fmt" "log" - "net/http" - "net/http/httputil" + "os" + "os/signal" + "syscall" "time" + + "github.com/valyala/fasthttp" + "github.com/valyala/fasthttp/reuseport" ) -func handler(w http.ResponseWriter, r *http.Request) { - dump, err := httputil.DumpRequest(r, true) +var silent bool + +func main() { + flag.BoolVar(&silent, "silent", false, "whether to silence output") + addr := flag.String("addr", "localhost:8080", "server listen address. Default: localhost:8080") + flag.Parse() + + // Create a new listener on the given address using port reuse + ln, err := reuseport.Listen("tcp4", *addr) if err != nil { - panic(err) + log.Fatalf("error creating listener: %v", err) } + defer ln.Close() - log.Printf("%v: %s\n", time.Now().Format(time.UnixDate), string(dump)) - w.Write([]byte("hi\n")) + // Create a new fasthttp server + server := &fasthttp.Server{ + TCPKeepalive: true, + LogAllErrors: true, + ReadBufferSize: 1024 * 1024, + WriteBufferSize: 1024 * 1024, + ReadTimeout: 90 * time.Second, + WriteTimeout: 5 * time.Second, + Handler: requestHandler, + } + + // Start the server in a goroutine + go func() { + if err := server.Serve(ln); err != nil { + log.Fatalf("error starting server: %v", err) + } + }() + + // Wait for a signal to stop the server + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGTERM) + <-sig + + // Stop the server + server.Shutdown() } -func main() { - http.HandleFunc("/", handler) - log.Fatal(http.ListenAndServe(":8080", nil)) +func requestToJSON(req *fasthttp.Request) ([]byte, error) { + type requestJSON struct { + URI string `json:"uri"` + Method string `json:"method"` + Headers map[string]string `json:"headers"` + ContentType string `json:"content_type"` + Body string `json:"body"` + } + + // Get the request URI, method, headers, content type, and body + uri := string(req.URI().FullURI()) + method := string(req.Header.Method()) + headers := make(map[string]string) + req.Header.VisitAll(func(k, v []byte) { + headers[string(k)] = string(v) + }) + contentType := string(req.Header.ContentType()) + body := string(req.Body()) + + // Create a requestJSON struct and marshal it to JSON + reqJSON := &requestJSON{ + URI: uri, + Method: method, + Headers: headers, + ContentType: contentType, + Body: body, + } + return json.Marshal(reqJSON) +} + +func requestHandler(ctx *fasthttp.RequestCtx) { + jsonData, _ := requestToJSON(&ctx.Request) + + if !silent { + fmt.Println(string(jsonData)) + } + + ctx.SetContentType("application/json") + ctx.Response.Header.SetContentLength(len(jsonData)) + // ctx.Response.Header.Set("Connection", "keep-alive") + ctx.SetStatusCode(fasthttp.StatusOK) + ctx.Write(jsonData) } diff --git a/etc/linegen.go b/etc/linegen.go new file mode 100644 index 0000000..bf399bd --- /dev/null +++ b/etc/linegen.go @@ -0,0 +1,29 @@ +package main + +import ( + "bufio" + "flag" + "fmt" + "os" + "time" +) + +func main() { + rate := flag.Int("rate", 10, "lines per second") + flag.Parse() + + tickDuration := time.Duration((1000000 / (*rate))) * time.Microsecond + + ticker := time.NewTicker(tickDuration) + defer ticker.Stop() + + scanner := bufio.NewScanner(os.Stdin) + if !scanner.Scan() { + return + } + line := scanner.Text() + + for range ticker.C { + fmt.Println(line) + } +} diff --git a/etc/requests.jsonl b/etc/requests.jsonl index 165da08..1fd1d8f 100644 --- a/etc/requests.jsonl +++ b/etc/requests.jsonl @@ -1,15 +1,12 @@ -{"url": "http://localhost:8080/", "method": "GET", "timestamp": "2021-11-08T18:59:50.9Z"} -{"url": "http://localhost:8080/", "method": "GET", "timestamp": "2021-11-08T18:59:51.9Z"} -{"url": "http://localhost:8080/", "method": "GET", "timestamp": "2021-11-08T18:59:52.9Z"} -{"url": "http://localhost:8080/", "method": "GET", "timestamp": "2021-11-08T18:59:53.9Z"} -{"url": "http://localhost:8080/", "method": "GET", "timestamp": "2021-11-08T18:59:54.9Z"} -{"url": "http://localhost:8080/", "method": "GET", "timestamp": "2021-11-08T18:59:55.9Z"} -{"url": "http://localhost:8080/", "method": "GET", "timestamp": "2021-11-08T18:59:56.9Z"} -{"url": "http://localhost:8080/", "method": "GET", "timestamp": "2021-11-08T18:59:57.9Z"} -{"url": "http://localhost:8080/", "method": "POST", "body": "{\"foo\": \"bar\"}", "headers": {"Accept": "text/plain"}, "timestamp": "2021-11-08T18:59:58.9Z"} +{"url": "http://localhost:8080/localhost:8080", "method": "GET", "timestamp": "2021-11-08T18:59:50.9Z", "headers": {"Content-Type": "application/json", "Host": "example.net", "Cookies":"cookie=1234567890", "User-Agent":"Mozilla/5.0", "x-custom-header":"x-custom-header-value"}} +{"url": "http://localhost:8081/localhost:8081", "method": "GET", "timestamp": "2021-11-08T18:59:50.9Z", "headers": {"Content-Type": "application/json", "Host": "example.net", "Cookies":"cookie=1234567890", "User-Agent":"Mozilla/5.0", "x-custom-header":"x-custom-header-value"}} +{"url": "http://localhost:8080/localhost:8080", "method": "POST", "body": "{\"foo\": \"bar\"}", "headers": {"Accept": "text/plain"}, "timestamp": "2021-11-08T18:59:58.9Z"} +{"url": "http://localhost:8081/localhost:8081", "method": "POST", "body": "{\"foo\": \"bar\"}", "headers": {"Accept": "text/plain"}, "timestamp": "2021-11-08T18:59:58.9Z"} +{"url": "http://localhost:8081/", "method": "GET", "headers": {"Accept": "text/plain"}, "timestamp": "2021-11-08T18:59:59.9Z"} {"url": "http://localhost:8080/", "method": "GET", "headers": {"Accept": "text/plain"}, "timestamp": "2021-11-08T18:59:59.9Z"} -{"url": "http://localhost:8080/", "method": "HEAD", "timestamp": "2021-11-08T19:00:00.00Z"} +{"url": "http://localhost:8080/", "method": "HEAD", "timestamp": "2021-11-08T19:00:00.00Z", "headers": {"Connection": "close" }} {"url": "http://localhost:8080/", "method": "OPTIONS", "timestamp": "2021-11-08T19:00:00.01Z"} {"url": "http://localhost:8080/", "method": "TRACE", "timestamp": "2021-11-08T19:00:00.02Z"} +{"url": "http://localhost:8080/", "method": "GET", "timestamp": "2021-11-08T19:00:00.04Z"} +{"url": "http://localhost:8080/", "method": "GET", "timestamp": "2021-11-08T19:00:00.04Z"}}} {"url": "http://localhost:8080/", "method": "PROPFIND", "timestamp": "2021-11-08T19:00:00.03Z"} -{"url": "http://localhost:8080/", "method": "GET", "timestamp": "2021-11-08T19:00:00.04Z"} \ No newline at end of file diff --git a/go.mod b/go.mod index ed2334f..d6ac7f6 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,20 @@ module github.com/loveholidays/ripley -go 1.17 +go 1.20 + +require ( + github.com/VictoriaMetrics/metrics v1.23.1 + github.com/montanaflynn/stats v0.7.0 + github.com/valyala/fasthttp v1.45.0 + github.com/valyala/fastjson v1.6.4 +) + +require ( + github.com/andybalholm/brotli v1.0.5 // indirect + github.com/klauspost/compress v1.16.3 // indirect + github.com/valyala/bytebufferpool v1.0.0 // indirect + github.com/valyala/fastrand v1.1.0 // indirect + github.com/valyala/histogram v1.2.0 // indirect + github.com/valyala/tcplisten v1.0.0 // indirect + golang.org/x/sys v0.6.0 // indirect +) diff --git a/go.sum b/go.sum index e69de29..805eb81 100644 --- a/go.sum +++ b/go.sum @@ -0,0 +1,22 @@ +github.com/VictoriaMetrics/metrics v1.23.1 h1:/j8DzeJBxSpL2qSIdqnRFLvQQhbJyJbbEi22yMm7oL0= +github.com/VictoriaMetrics/metrics v1.23.1/go.mod h1:rAr/llLpEnAdTehiNlUxKgnjcOuROSzpw0GvjpEbvFc= +github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= +github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= +github.com/klauspost/compress v1.16.3 h1:XuJt9zzcnaz6a16/OU53ZjWp/v7/42WcR5t2a0PcNQY= +github.com/klauspost/compress v1.16.3/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/montanaflynn/stats v0.7.0 h1:r3y12KyNxj/Sb/iOE46ws+3mS1+MZca1wlHQFPsY/JU= +github.com/montanaflynn/stats v0.7.0/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= +github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +github.com/valyala/fasthttp v1.45.0 h1:zPkkzpIn8tdHZUrVa6PzYd0i5verqiPSkgTd3bSUcpA= +github.com/valyala/fasthttp v1.45.0/go.mod h1:k2zXd82h/7UZc3VOdJ2WaUqt1uZ/XpXAfE9i+HBC3lA= +github.com/valyala/fastjson v1.6.4 h1:uAUNq9Z6ymTgGhcm0UynUAB6tlbakBrz6CQFax3BXVQ= +github.com/valyala/fastjson v1.6.4/go.mod h1:CLCAqky6SMuOcxStkYQvblddUtoRxhYMGLrsQns1aXY= +github.com/valyala/fastrand v1.1.0 h1:f+5HkLW4rsgzdNoleUOB69hyT9IlD2ZQh9GyDMfb5G8= +github.com/valyala/fastrand v1.1.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ= +github.com/valyala/histogram v1.2.0 h1:wyYGAZZt3CpwUiIb9AU/Zbllg1llXyrtApRS815OLoQ= +github.com/valyala/histogram v1.2.0/go.mod h1:Hb4kBwb4UxsaNbbbh+RRz8ZR6pdodR57tzWUS3BUzXY= +github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8= +github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc= +golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/main.go b/main.go index 1753070..c36a6b0 100644 --- a/main.go +++ b/main.go @@ -20,29 +20,49 @@ package main import ( "flag" + "fmt" + _ "net/http/pprof" "os" + "os/signal" "runtime" "runtime/pprof" + "syscall" ripley "github.com/loveholidays/ripley/pkg" ) func main() { - paceStr := flag.String("pace", "10s@1", `[duration]@[rate], e.g. "1m@1 30s@1.5 1h@2"`) - silent := flag.Bool("silent", false, "Suppress output") - dryRun := flag.Bool("dry-run", false, "Consume input but do not send HTTP requests to targets") - timeout := flag.Int("timeout", 10, "HTTP client timeout in seconds") - strict := flag.Bool("strict", false, "Panic on bad input") - memprofile := flag.String("memprofile", "", "Write memory profile to `file` before exit") - numWorkers := flag.Int("workers", 1000, "Number of client workers to use") + var opts ripley.Options + + flag.StringVar(&opts.Pace, "pace", "10s@1", `[duration]@[rate], e.g. "1m@1 30s@1.5 1h@2"`) + flag.BoolVar(&opts.Silent, "silent", false, "Suppress output") + flag.BoolVar(&opts.SilentHttpError, "silentHttpError", false, "Suppress HTTP errors (http codes 5xx) output") + + flag.BoolVar(&opts.DryRun, "dry-run", false, "Consume input but do not send HTTP requests to targets") + flag.IntVar(&opts.Timeout, "timeout", 10, "HTTP client request timeout in seconds") + flag.IntVar(&opts.TimeoutConnection, "timeoutConnection", 3, "HTTP client connetion timeout in seconds") + flag.BoolVar(&opts.Strict, "strict", false, "Panic on bad input") + flag.StringVar(&opts.Memprofile, "memprofile", "", "Write memory profile to `file` before exit") + flag.IntVar(&opts.NumWorkers, "workers", 10, "Number of client workers to use") + + flag.BoolVar(&opts.PrintStat, "printStat", false, "Print statistics to stdout at the end") + flag.BoolVar(&opts.MetricsServerEnable, "metricsServerEnable", false, "Enable metrics server. Server prometheus statistics on /metrics endpoint") + flag.StringVar(&opts.MetricsServerAddr, "metricsServerAddr", "0.0.0.0:8081", "Metrics server listen address") + + flag.IntVar(&opts.PrintNSlowest, "printNSlowest", 0, "Print N slowest Requests") + + flag.Usage = func() { + fmt.Fprintf(os.Stderr, "Usage: %s -target string\n", os.Args[0]) + flag.PrintDefaults() + } flag.Parse() - exitCode := ripley.Replay(*paceStr, *silent, *dryRun, *timeout, *strict, *numWorkers) + exitCode := ripley.Replay(&opts) defer os.Exit(exitCode) - if *memprofile != "" { - f, err := os.Create(*memprofile) + if opts.Memprofile != "" { + f, err := os.Create(opts.Memprofile) if err != nil { panic(err) @@ -54,5 +74,10 @@ func main() { if err := pprof.WriteHeapProfile(f); err != nil { panic(err) } + + // Wait for a signal to stop the server + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGTERM) + <-sig } } diff --git a/pkg/client.go b/pkg/client.go index b2cee5c..743707a 100644 --- a/pkg/client.go +++ b/pkg/client.go @@ -19,68 +19,55 @@ along with this program. If not, see . package ripley import ( - "io" - "net/http" + "sync" "time" + + "github.com/valyala/fasthttp" ) -type Result struct { - StatusCode int `json:"statusCode"` - Latency time.Duration `json:"latency"` - Request *request `json:"request"` - ErrorMsg string `json:"error"` +type HttpClientsPool struct { + pool sync.Map } -func startClientWorkers(numWorkers int, requests <-chan *request, results chan<- *Result, dryRun bool, timeout int) { - client := &http.Client{ - Timeout: time.Duration(timeout) * time.Second, - CheckRedirect: func(req *http.Request, via []*http.Request) error { - return http.ErrUseLastResponse - }, - } +var httpClientsPool HttpClientsPool + +func startClientWorkers(opts *Options, requests chan *Request, results chan *Result, slowestResults *SlowestResults, metricsRequestReceived chan<- bool) { + go metricMeasureChannelCapacityAndLengh(requests, results) + go handleResult(opts, results, slowestResults) + go metricsServer(opts, metricsRequestReceived) - for i := 0; i <= numWorkers; i++ { - go doHttpRequest(client, requests, results, dryRun) + for i := 0; i < opts.NumWorkers; i++ { + go doHttpRequest(opts, requests, results) } } -func doHttpRequest(client *http.Client, requests <-chan *request, results chan<- *Result, dryRun bool) { - for req := range requests { - latencyStart := time.Now() - - if dryRun { - sendResult(req, &http.Response{}, latencyStart, "", results) - } else { - go func() { - httpReq, err := req.httpRequest() - - if err != nil { - sendResult(req, &http.Response{}, latencyStart, err.Error(), results) - return - } - - resp, err := client.Do(httpReq) - - if err != nil { - sendResult(req, &http.Response{}, latencyStart, err.Error(), results) - return - } - - _, err = io.ReadAll(resp.Body) - defer resp.Body.Close() - - if err != nil { - sendResult(req, &http.Response{}, latencyStart, err.Error(), results) - return - } - - sendResult(req, resp, latencyStart, "", results) - }() - } +func getOrCreateHttpClient(opts *Options, req *Request) (*fasthttp.HostClient, error) { + if val, ok := httpClientsPool.pool.Load(req.Address); ok { + return val.(*fasthttp.HostClient), nil } + + // If another goroutine has stored a value before us, + // use the stored value instead of the one we just created + val, _ := httpClientsPool.pool.LoadOrStore(req.Address, httpClientsPool.createHttpClient(opts, req)) + return val.(*fasthttp.HostClient), nil } -func sendResult(req *request, resp *http.Response, latencyStart time.Time, err string, results chan<- *Result) { - latency := time.Now().Sub(latencyStart) - results <- &Result{StatusCode: resp.StatusCode, Latency: latency, Request: req, ErrorMsg: err} +func (h *HttpClientsPool) createHttpClient(opts *Options, req *Request) interface{} { + return &fasthttp.HostClient{ + Addr: req.Address, + Name: "ripley", + MaxConns: opts.NumWorkers, + ReadBufferSize: 512 * 1024, + WriteBufferSize: 128 * 1024, + ConnPoolStrategy: fasthttp.LIFO, + IsTLS: req.IsTLS, + MaxConnWaitTimeout: time.Duration(opts.Timeout) * time.Second, + MaxConnDuration: time.Duration(opts.Timeout) * time.Second, + MaxIdleConnDuration: time.Duration(opts.Timeout) * time.Second, + ReadTimeout: time.Duration(opts.Timeout) * time.Second, + WriteTimeout: time.Duration(opts.Timeout) * time.Second, + Dial: CountingDialer(opts), + DisablePathNormalizing: true, + DisableHeaderNamesNormalizing: true, + } } diff --git a/pkg/dialer.go b/pkg/dialer.go new file mode 100644 index 0000000..0667b14 --- /dev/null +++ b/pkg/dialer.go @@ -0,0 +1,70 @@ +package ripley + +import ( + "net" + "time" + + "github.com/VictoriaMetrics/metrics" + "github.com/valyala/fasthttp" +) + +type CountingConnection struct { + net.Conn + failedConnections *metrics.Counter + openConnections *metrics.Counter + closedConnections *metrics.Counter + writeBytes *metrics.Counter + readBytes *metrics.Counter +} + +func CountingDialer(opts *Options) fasthttp.DialFunc { + return func(addr string) (net.Conn, error) { + failedConnections := getOrCreateFailedConnectionsCounter(addr) + openConnections := getOrCreateOpenConnectionsCounter(addr) + closedConnections := getOrCreateClosedConnectionsCounter(addr) + writeBytes := getOrCreateWriteBytesCounter(addr) + readBytes := getOrCreateReadBytesCounter(addr) + + tcpDialer := &fasthttp.TCPDialer{Concurrency: opts.NumWorkers, DNSCacheDuration: 24 * time.Hour} + conn, err := tcpDialer.DialTimeout(addr, time.Duration(opts.TimeoutConnection)*time.Second) + if err != nil { + failedConnections.Inc() + return nil, err + } + + openConnections.Inc() + return &CountingConnection{ + conn, + failedConnections, + openConnections, + closedConnections, + writeBytes, + readBytes, + }, nil + } +} + +func (c *CountingConnection) Close() error { + err := c.Conn.Close() + c.closedConnections.Inc() + return err +} + +func (c *CountingConnection) Read(b []byte) (n int, err error) { + n, err = c.Conn.Read(b) + if err == nil { + c.readBytes.Add(n) + } + + return +} + +func (c *CountingConnection) Write(b []byte) (n int, err error) { + n, err = c.Conn.Write(b) + + if err == nil { + c.writeBytes.Add(n) + } + + return +} diff --git a/pkg/metrics.go b/pkg/metrics.go new file mode 100644 index 0000000..6fd0f22 --- /dev/null +++ b/pkg/metrics.go @@ -0,0 +1,105 @@ +package ripley + +import ( + "fmt" + "net/http" + "time" + + _ "net/http/pprof" + + "github.com/VictoriaMetrics/metrics" +) + +const metricsDefaultSummaryWindow = 5 * time.Minute + +var metricsDefaultSummaryQuantiles = []float64{0.5, 0.9, 0.95, 0.99, 1} + +func metricsServer(opts *Options, metricsRequestReceived chan<- bool) { + if !opts.MetricsServerEnable { + return + } + + // Expose the registered metrics at `/metrics` path. + http.HandleFunc("/metrics", func(w http.ResponseWriter, req *http.Request) { + metrics.WritePrometheus(w, true) + select { + case metricsRequestReceived <- true: + default: + } + }) + + if err := http.ListenAndServe(opts.MetricsServerAddr, nil); err != nil { + panic(err) + } +} + +func getOrCreatePacerPhaseTimeCounter(phase string) *metrics.Counter { + return metrics.GetOrCreateCounter(fmt.Sprintf(`pacer_phases{phase="%s"}`, phase)) +} + +func getOrCreateChannelLengthCounter(name string) *metrics.Counter { + return metrics.GetOrCreateCounter(fmt.Sprintf(`channel_length{channel="%s"}`, name)) +} + +func getOrCreateChannelCapacityCounter(name string) *metrics.Counter { + return metrics.GetOrCreateCounter(fmt.Sprintf(`channel_capacity{channel="%s"}`, name)) +} + +func getOrCreateRequestDurationSummary(addr string) *metrics.Summary { + return metrics.GetOrCreateSummaryExt(fmt.Sprintf(`requests_duration_seconds{addr="%s"}`, addr), metricsDefaultSummaryWindow, metricsDefaultSummaryQuantiles) +} + +func getOrCreateResponseCodeCounter(code int, addr string) *metrics.Counter { + return metrics.GetOrCreateCounter(fmt.Sprintf(`response_code{status="%d", addr="%s"}`, code, addr)) +} + +func getOrCreateFailedConnectionsCounter(addr string) *metrics.Counter { + return metrics.GetOrCreateCounter(fmt.Sprintf(`connections_failed{addr="%s"}`, addr)) +} + +func getOrCreateOpenConnectionsCounter(addr string) *metrics.Counter { + return metrics.GetOrCreateCounter(fmt.Sprintf(`connections_opened{addr="%s"}`, addr)) +} + +func getOrCreateClosedConnectionsCounter(addr string) *metrics.Counter { + return metrics.GetOrCreateCounter(fmt.Sprintf(`connections_closed{addr="%s"}`, addr)) +} + +func getOrCreateWriteBytesCounter(addr string) *metrics.Counter { + return metrics.GetOrCreateCounter(fmt.Sprintf(`connections_write_bytes{addr="%s"}`, addr)) +} + +func getOrCreateReadBytesCounter(addr string) *metrics.Counter { + return metrics.GetOrCreateCounter(fmt.Sprintf(`connections_read_bytes{addr="%s"}`, addr)) +} + +func updatePacerMetrics(p *phase) { + metrics_pacer_phases := getOrCreatePacerPhaseTimeCounter(fmt.Sprintf("%s@%.3f", p.duration, p.rate)) + metrics_pacer_phases.Set(uint64(time.Now().Unix())) +} + +func metricHandleResult(result *Result) { + requests_duration_seconds := getOrCreateRequestDurationSummary(result.Request.Address) + requests_duration_seconds.Update(result.Latency.Seconds()) + + response_code := getOrCreateResponseCodeCounter(result.StatusCode, result.Request.Address) + response_code.Inc() +} + +func metricMeasureChannelCapacityAndLengh(requests chan *Request, results chan *Result) { + ticker := time.Tick(time.Second) + + requests_channel_length := getOrCreateChannelLengthCounter("requests") + requests_channel_capacity := getOrCreateChannelCapacityCounter("requests") + + results_channel_length := getOrCreateChannelLengthCounter("results") + results_channel_capacity := getOrCreateChannelCapacityCounter("results") + + for range ticker { + requests_channel_length.Set(uint64(len(requests))) + requests_channel_capacity.Set(uint64(cap(requests))) + + results_channel_length.Set(uint64(len(results))) + results_channel_capacity.Set(uint64(cap(results))) + } +} diff --git a/pkg/pace.go b/pkg/pace.go index 15c2aaf..21ecf3f 100644 --- a/pkg/pace.go +++ b/pkg/pace.go @@ -21,6 +21,7 @@ package ripley import ( "strconv" "strings" + "sync" "time" ) @@ -28,6 +29,7 @@ type pacer struct { phases []*phase lastRequestTime time.Time done bool + mutex sync.RWMutex } type phase struct { @@ -35,7 +37,7 @@ type phase struct { rate float64 } -func newPacer(phasesStr string) (*pacer, error) { +func NewPacer(phasesStr string) (*pacer, error) { phases, err := parsePhases(phasesStr) if err != nil { @@ -47,22 +49,37 @@ func newPacer(phasesStr string) (*pacer, error) { func (p *pacer) start() { // Run a timer for the first phase's duration + updatePacerMetrics(p.phases[0]) + time.AfterFunc(p.phases[0].duration, p.onPhaseElapsed) } func (p *pacer) onPhaseElapsed() { // Pop phase + p.mutex.Lock() + defer p.mutex.Unlock() + p.phases = p.phases[1:] if len(p.phases) == 0 { p.done = true + updatePacerMetrics(&phase{duration: 0, rate: 0}) } else { + updatePacerMetrics(p.phases[0]) + // Create a timer with next phase time.AfterFunc(p.phases[0].duration, p.onPhaseElapsed) } } func (p *pacer) waitDuration(t time.Time) time.Duration { + p.mutex.RLock() + defer p.mutex.RUnlock() + // Need to check as time.AfterFunc updates phases lengh + if len(p.phases) == 0 { + return 0 + } + // If there are no more phases left, continue with the last phase's rate if p.lastRequestTime.IsZero() { p.lastRequestTime = t diff --git a/pkg/pace_test.go b/pkg/pace_test.go index 980bcef..9e0f119 100644 --- a/pkg/pace_test.go +++ b/pkg/pace_test.go @@ -74,7 +74,7 @@ func TestParseManyPhases(t *testing.T) { } func TestWaitDuration(t *testing.T) { - pacer, err := newPacer("30s@1") + pacer, err := NewPacer("30s@1") if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -97,7 +97,7 @@ func TestWaitDuration(t *testing.T) { } func TestWaitDuration5X(t *testing.T) { - pacer, err := newPacer("30s@10") + pacer, err := NewPacer("30s@10") if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -120,7 +120,7 @@ func TestWaitDuration5X(t *testing.T) { } func TestPacerDoneOnLastPhaseElapsed(t *testing.T) { - pacer, err := newPacer("30s@10") + pacer, err := NewPacer("30s@10") if err != nil { t.Fatalf("Unexpected error: %v", err) diff --git a/pkg/replay.go b/pkg/replay.go index e6cb08d..85f1140 100644 --- a/pkg/replay.go +++ b/pkg/replay.go @@ -20,97 +20,133 @@ package ripley import ( "bufio" - "encoding/json" "fmt" "os" + "os/signal" "sync" + "syscall" "time" + "unsafe" + + "github.com/VictoriaMetrics/metrics" ) -func Replay(phasesStr string, silent, dryRun bool, timeout int, strict bool, numWorkers int) int { +type Options struct { + Pace string + Silent bool + SilentHttpError bool + DryRun bool + Timeout int + TimeoutConnection int + Strict bool + Memprofile string + NumWorkers int + PrintStat bool + MetricsServerEnable bool + MetricsServerAddr string + PrintNSlowest int +} + +// Ensures we have handled all HTTP request results before exiting +var waitGroupResults sync.WaitGroup + +func Replay(opts *Options) int { // Default exit code var exitCode int = 0 - // Ensures we have handled all HTTP request results before exiting - var waitGroup sync.WaitGroup + var slowestResults = NewSlowestResults(opts) + var metricsRequestReceived = make(chan bool) + + // Print metrics and slowest results at the end + defer printStats(slowestResults, opts) // Send requests for the HTTP client workers to pick up on this channel - requests := make(chan *request) + var requests = make(chan *Request, opts.NumWorkers) defer close(requests) // HTTP client workers will send their results on this channel - results := make(chan *Result) + var results = make(chan *Result, opts.NumWorkers) defer close(results) // The pacer controls the rate of replay - pacer, err := newPacer(phasesStr) - + pacer, err := NewPacer(opts.Pace) if err != nil { panic(err) } // Read request JSONL input from STDIN - scanner := bufio.NewScanner(os.Stdin) + reader := bufio.NewReaderSize(os.Stdin, 1024*1024) + scanner := bufio.NewScanner(reader) // Start HTTP client goroutine pool - startClientWorkers(numWorkers, requests, results, dryRun, timeout) + startClientWorkers(opts, requests, results, slowestResults, metricsRequestReceived) pacer.start() + // Channel to handle interrupt signal + interruptChan := make(chan os.Signal, 1) + signal.Notify(interruptChan, syscall.SIGINT, syscall.SIGTERM) + + // Print metrics and slowest results by signal + go func() { + <-interruptChan + printStats(slowestResults, opts) + os.Exit(exitCode) + }() + for scanner.Scan() { - req, err := unmarshalRequest(scanner.Bytes()) + if pacer.done { + break + } + waitGroupResults.Add(1) + + b := scanner.Bytes() + req, err := unmarshalRequest(&b) if err != nil { exitCode = 126 - result, _ := json.Marshal(Result{ - StatusCode: 0, + res := &Result{ + StatusCode: -2, Latency: 0, Request: req, ErrorMsg: fmt.Sprintf("%v", err), - }) - fmt.Println(string(result)) + } - if strict { + if opts.Strict { panic(err) } + + results <- res continue } - if pacer.done { - break - } + duration := pacer.waitDuration(req.Timestamp) + time.Sleep(duration) - // The pacer decides how long to wait between requests - waitDuration := pacer.waitDuration(req.Timestamp) - time.Sleep(waitDuration) requests <- req - waitGroup.Add(1) - - // Goroutine to handle the HTTP client result - go func() { - defer waitGroup.Done() + } - result := <-results + if err := scanner.Err(); err != nil { + panic(err) + } - // If there's a panic elsewhere, this channel can return nil - if result == nil { - return - } + waitGroupResults.Wait() - jsonResult, err := json.Marshal(result) + // Wait until the latest Prometheus metrics are received by exporter + select { + case <-metricsRequestReceived: + case <-time.After(time.Duration(2) * time.Second): + } - if err != nil { - panic(err) - } + return exitCode +} - if !silent { - fmt.Println(string(jsonResult)) - } - }() +func printStats(slowestResults *SlowestResults, opts *Options) { + if opts.PrintStat { + metrics.WritePrometheus(os.Stdout, true) } - - if scanner.Err() != nil { - panic(scanner.Err()) + for _, slowestResult := range slowestResults.results { + fmt.Println(slowestResult.toJson()) } +} - waitGroup.Wait() - - return exitCode +func b2s(b []byte) string { + return unsafe.String(unsafe.SliceData(b), len(b)) } diff --git a/pkg/request.go b/pkg/request.go index d798307..9ecceae 100644 --- a/pkg/request.go +++ b/pkg/request.go @@ -19,63 +19,97 @@ along with this program. If not, see . package ripley import ( - "bytes" - "encoding/json" "fmt" - "net/http" + "net/url" "time" + + "github.com/valyala/fasthttp" + "github.com/valyala/fastjson" ) var ( validMethods = [9]string{"GET", "HEAD", "POST", "PUT", "DELETE", "CONNECT", "OPTIONS", "TRACE", "PATCH"} ) -type request struct { - Method string `json:"method"` - Url string `json:"url"` - Body string `json:"body"` - Timestamp time.Time `json:"timestamp"` - Headers map[string]string `json:"headers"` +type Request struct { + Address string `json:"Address"` + IsTLS bool `json:"IsTLS"` + Method string `json:"Method"` + Url string `json:"Url"` + Body string `json:"Body"` + Timestamp time.Time `json:"Timestamp"` + Headers map[string]string `json:"Headers"` } -func (r *request) httpRequest() (*http.Request, error) { - req, err := http.NewRequest(r.Method, r.Url, bytes.NewReader([]byte(r.Body))) +func (r *Request) fasthttpRequest() *fasthttp.Request { + req := fasthttp.AcquireRequest() - if err != nil { - return nil, err - } + req.Header.SetMethod(r.Method) + req.SetRequestURI(r.Url) + req.SetBody([]byte(r.Body)) for k, v := range r.Headers { - req.Header.Add(k, v) + req.Header.Set(k, v) } - if host := req.Header.Get("Host"); host != "" { - req.Host = host + if host := req.Header.Peek("Host"); len(host) > 0 { + req.SetHost(b2s(host)) } - return req, nil + return req } -func unmarshalRequest(jsonRequest []byte) (*request, error) { - req := &request{} - err := json.Unmarshal(jsonRequest, &req) +func unmarshalRequest(jsonRequest *[]byte) (*Request, error) { + var p fastjson.Parser + v, err := p.ParseBytes(*jsonRequest) + if err != nil { + return &Request{}, err + } + + req := &Request{ + Method: b2s(v.GetStringBytes("method")), + Url: b2s(v.GetStringBytes("url")), + Body: b2s(v.GetStringBytes("body")), + Headers: make(map[string]string), + } + + if req.Url == "" { + return req, fmt.Errorf("missing required key: url") + } + // Parse URL + up, err := url.Parse(req.Url) if err != nil { return req, err } + req.Address = up.Host + if up.Port() == "" { + req.Address += ":80" + } + req.IsTLS = up.Scheme == "https" // Validate if !validMethod(req.Method) { return req, fmt.Errorf("Invalid method: %s", req.Method) } - if req.Url == "" { - return req, fmt.Errorf("Missing required key: url") + // Parse headers + headers := v.GetObject("headers") + headers.Visit(func(k []byte, v *fastjson.Value) { + req.Headers[b2s(k)] = b2s(v.GetStringBytes()) + }) + + timestampVal := v.GetStringBytes("timestamp") + if timestampVal == nil { + return req, fmt.Errorf("missing required key: timestamp") + } - if req.Timestamp.IsZero() { - return req, fmt.Errorf("Missing required key: timestamp") + timestamp, err := time.Parse(time.RFC3339Nano, b2s(timestampVal)) + if err != nil { + return req, fmt.Errorf("invalid timestamp: %v", timestamp) } + req.Timestamp = timestamp return req, nil } @@ -88,3 +122,27 @@ func validMethod(requestMethod string) bool { } return false } + +func doHttpRequest(opts *Options, requests <-chan *Request, results chan<- *Result) { + for req := range requests { + latencyStart := time.Now() + if opts.DryRun { + sendToResult(opts, req, &fasthttp.Response{}, latencyStart, nil, results) + } else { + httpReq := req.fasthttpRequest() + httpResp := fasthttp.AcquireResponse() + + client, err := getOrCreateHttpClient(opts, req) + if err != nil { + sendToResult(opts, req, httpResp, latencyStart, err, results) + return + } + + err = client.DoTimeout(httpReq, httpResp, time.Duration(opts.Timeout)*time.Second) + sendToResult(opts, req, httpResp, latencyStart, err, results) + + fasthttp.ReleaseRequest(httpReq) + fasthttp.ReleaseResponse(httpResp) + } + } +} diff --git a/pkg/request_test.go b/pkg/request_test.go index 44f26c2..b44e48e 100644 --- a/pkg/request_test.go +++ b/pkg/request_test.go @@ -23,18 +23,27 @@ import ( "time" ) +func TestParseInvalidJson(t *testing.T) { + jsonRequest := []byte(`{}}`) + _, err := unmarshalRequest(&jsonRequest) + + if err.Error() != `unexpected tail: "}"` { + t.Errorf(`err.Error() = %v; want "unexpected tail: "}""`, err.Error()) + } +} + func TestUnrmarshalInvalidMethod(t *testing.T) { - jsonRequest := `{"method": "WHAT"}` - _, err := unmarshalRequest([]byte(jsonRequest)) + jsonRequest := []byte(`{"method": "WHAT", "url": "http://example.com"}`) + _, err := unmarshalRequest(&jsonRequest) - if err.Error() != "Invalid method: WHAT" { - t.Errorf(`err.Error() = %v; want "Invalid method: WHAT"`, err.Error()) + if err.Error() != "invalid method: WHAT" { + t.Errorf(`err.Error() = %v; want "invalid method: WHAT"`, err.Error()) } } func TestUnrmarshalValid(t *testing.T) { - jsonRequest := `{"method": "GET", "url": "http://example.com", "timestamp": "2021-11-08T18:59:59.9Z"}` - req, err := unmarshalRequest([]byte(jsonRequest)) + jsonRequest := []byte(`{"method": "GET", "url": "http://example.com", "timestamp": "2021-11-08T18:59:59.9Z"}`) + req, err := unmarshalRequest(&jsonRequest) if err != nil { t.Errorf("err = %v; want nil", err) @@ -58,3 +67,54 @@ func TestUnrmarshalValid(t *testing.T) { t.Errorf("req.Timestamp = %v; want %v", req.Timestamp, expectedTime) } } + +func TestFasthttpRequest(t *testing.T) { + jsonRequest := []byte(` + { + "method": "GET", "url": "http://example.com", "body":"body", "timestamp": "2021-11-08T18:59:59.9Z", + "headers": {"Content-Type": "application/json", "Host": "example.net", "Cookies":"cookie=1234567890", "User-Agent":"Mozilla/5.0", "x-custom-header":"x-custom-header-value"} + } + `) + req, err := unmarshalRequest(&jsonRequest) + if err != nil { + t.Errorf("err = %v; want nil", err) + } + + fr := req.fasthttpRequest() + + if string(fr.Header.Method()) != "GET" { + t.Errorf("Method = %v; want GET", req.Method) + } + + if string(fr.Body()) != "body" { + t.Errorf("Body = %v; want body", req.Method) + } + + if string(fr.Header.ContentType()) != "application/json" { + t.Errorf("ContentType = %s; want application/json", fr.Header.ContentType()) + } + + if string(fr.Header.UserAgent()) != "Mozilla/5.0" { + t.Errorf("UserAgent = %s; want Mozilla/5.0", fr.Header.UserAgent()) + } + + if string(fr.Header.Peek("x-custom-header")) != "x-custom-header-value" { + t.Errorf("UserAgent = %s; want Mozilla/5.0", fr.Header.UserAgent()) + } + + if string(fr.Header.Peek("Host")) != "example.net" { + t.Errorf("Host = %s; want example.net", fr.Header.Peek("Host")) + } + + if string(fr.Host()) != "example.net" { + t.Errorf("Host = %s; want example.net", fr.Host()) + } + + if string(fr.Header.RequestURI()) != "http://example.com" { + t.Errorf("RequestURI = %s; want http://example.com", fr.Header.RequestURI()) + } + + if string(fr.RequestURI()) != "/" { + t.Errorf("RequestURI = %s; want /", fr.RequestURI()) + } +} diff --git a/pkg/result.go b/pkg/result.go new file mode 100644 index 0000000..ebd41b4 --- /dev/null +++ b/pkg/result.go @@ -0,0 +1,94 @@ +package ripley + +import ( + "encoding/json" + "fmt" + "os" + "time" + + "github.com/valyala/fasthttp" +) + +type Response struct { + StatusCode int `json:"StatusCode"` + Headers map[string][]string `json:"Headers"` + RAddr string `json:"RAddr"` + LAddr string `json:"LAddr"` +} + +type Result struct { + StatusCode int `json:"StatusCode"` + Latency time.Duration `json:"Latency"` + Request *Request `json:"Request"` + Response *Response `json:"Response"` + ErrorMsg string `json:"Error"` +} + +func (r *Result) toJson() string { + j, err := json.Marshal(r) + + if err != nil { + panic(err) + } + + return b2s(j) +} + +func sendToResult(opts *Options, req *Request, resp *fasthttp.Response, latencyStart time.Time, err error, results chan<- *Result) { + latency := time.Since(latencyStart) + + var errorMsg string + var raddr, laddr string + var statusCode int = resp.StatusCode() + + if err != nil { + statusCode = -1 + errorMsg = err.Error() + } + + respHeaders := make(map[string][]string) + resp.Header.VisitAll(func(key, value []byte) { + k := b2s(key) + v := b2s(value) + + respHeaders[k] = append(respHeaders[k], v) + }) + + if adr := resp.RemoteAddr(); adr != nil { + raddr = adr.String() + } + if adr := resp.LocalAddr(); adr != nil { + laddr = adr.String() + } + + results <- &Result{ + StatusCode: statusCode, + Latency: latency, + Request: req, + Response: &Response{ + StatusCode: statusCode, + Headers: respHeaders, + RAddr: raddr, + LAddr: laddr, + }, + ErrorMsg: errorMsg, + } +} + +// TODO: Consider rewriting the code to use a Result Broker with multi-channel and broadcast functionality in order to improve its scalability. +func handleResult(opts *Options, results <-chan *Result, slowestResults *SlowestResults) { + for result := range results { + metricHandleResult(result) + slowestResults.store(result) + + if !opts.Silent { + fmt.Println(result.toJson()) + } + + if !opts.SilentHttpError && result.StatusCode < 0 || (result.StatusCode >= 500 && result.StatusCode <= 599) { + fmt.Fprintln(os.Stderr, result.toJson()) + } + + waitGroupResults.Done() + } +} diff --git a/pkg/result_slowest_results.go b/pkg/result_slowest_results.go new file mode 100644 index 0000000..a9371fc --- /dev/null +++ b/pkg/result_slowest_results.go @@ -0,0 +1,29 @@ +package ripley + +import "sort" + +type SlowestResults struct { + results []Result + nSlowestResults int +} + +func (h *SlowestResults) store(result *Result) { + if h.nSlowestResults == 0 { + return + } + + h.results = append(h.results, *result) + if len(h.results) > h.nSlowestResults { + sort.Slice(h.results, func(i, j int) bool { + return h.results[i].Latency > h.results[j].Latency + }) + h.results = h.results[:h.nSlowestResults] + } +} + +func NewSlowestResults(opts *Options) *SlowestResults { + return &SlowestResults{ + nSlowestResults: opts.PrintNSlowest, + results: make([]Result, 0, opts.PrintNSlowest), + } +}