diff --git a/Dockerfile b/Dockerfile
index 7f10616..be615ad 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,6 +1,21 @@
+FROM golang:1.20-alpine as build
+
+ADD . /src/
+
+RUN cd /src && \
+ go mod download && \
+ go build -o /ripley . && \
+ go build -o /dummyweb etc/dummyweb.go && \
+ go build -o /linegen etc/linegen.go
+
+##################################
# Start fresh from a smaller image
-FROM alpine:3.15.0
+##################################
+FROM alpine:latest
RUN apk add ca-certificates
-COPY ripley /usr/bin/ripley
-ENTRYPOINT ["/usr/bin/ripley"]
\ No newline at end of file
+COPY --from=build /ripley /usr/bin/ripley
+COPY --from=build /linegen /usr/bin/linegen
+COPY --from=build /dummyweb /usr/bin/dummyweb
+
+ENTRYPOINT ["/usr/bin/ripley"]
diff --git a/README.md b/README.md
index db15c8e..db4b659 100644
--- a/README.md
+++ b/README.md
@@ -129,3 +129,11 @@ cat etc/requests.jsonl | ./ripley -pace "30s@1" -dry-run
```bash
go test pkg/*.go
```
+
+## Memory profiling
+
+```
+./ripley -metricsServerEnable -printStat -memprofile dist/mem.pprof -pace 1m@10 -workers 100 < etc/requests.jsonl
+
+go tool pprof --alloc_objects ripley dist/mem.pprof
+```bash
diff --git a/etc/dummyweb.go b/etc/dummyweb.go
index b05898f..04a8e99 100644
--- a/etc/dummyweb.go
+++ b/etc/dummyweb.go
@@ -1,23 +1,100 @@
package main
import (
+ "encoding/json"
+ "flag"
+ "fmt"
"log"
- "net/http"
- "net/http/httputil"
+ "os"
+ "os/signal"
+ "syscall"
"time"
+
+ "github.com/valyala/fasthttp"
+ "github.com/valyala/fasthttp/reuseport"
)
-func handler(w http.ResponseWriter, r *http.Request) {
- dump, err := httputil.DumpRequest(r, true)
+var silent bool
+
+func main() {
+ flag.BoolVar(&silent, "silent", false, "whether to silence output")
+ addr := flag.String("addr", "localhost:8080", "server listen address. Default: localhost:8080")
+ flag.Parse()
+
+ // Create a new listener on the given address using port reuse
+ ln, err := reuseport.Listen("tcp4", *addr)
if err != nil {
- panic(err)
+ log.Fatalf("error creating listener: %v", err)
}
+ defer ln.Close()
- log.Printf("%v: %s\n", time.Now().Format(time.UnixDate), string(dump))
- w.Write([]byte("hi\n"))
+ // Create a new fasthttp server
+ server := &fasthttp.Server{
+ TCPKeepalive: true,
+ LogAllErrors: true,
+ ReadBufferSize: 1024 * 1024,
+ WriteBufferSize: 1024 * 1024,
+ ReadTimeout: 90 * time.Second,
+ WriteTimeout: 5 * time.Second,
+ Handler: requestHandler,
+ }
+
+ // Start the server in a goroutine
+ go func() {
+ if err := server.Serve(ln); err != nil {
+ log.Fatalf("error starting server: %v", err)
+ }
+ }()
+
+ // Wait for a signal to stop the server
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGTERM)
+ <-sig
+
+ // Stop the server
+ server.Shutdown()
}
-func main() {
- http.HandleFunc("/", handler)
- log.Fatal(http.ListenAndServe(":8080", nil))
+func requestToJSON(req *fasthttp.Request) ([]byte, error) {
+ type requestJSON struct {
+ URI string `json:"uri"`
+ Method string `json:"method"`
+ Headers map[string]string `json:"headers"`
+ ContentType string `json:"content_type"`
+ Body string `json:"body"`
+ }
+
+ // Get the request URI, method, headers, content type, and body
+ uri := string(req.URI().FullURI())
+ method := string(req.Header.Method())
+ headers := make(map[string]string)
+ req.Header.VisitAll(func(k, v []byte) {
+ headers[string(k)] = string(v)
+ })
+ contentType := string(req.Header.ContentType())
+ body := string(req.Body())
+
+ // Create a requestJSON struct and marshal it to JSON
+ reqJSON := &requestJSON{
+ URI: uri,
+ Method: method,
+ Headers: headers,
+ ContentType: contentType,
+ Body: body,
+ }
+ return json.Marshal(reqJSON)
+}
+
+func requestHandler(ctx *fasthttp.RequestCtx) {
+ jsonData, _ := requestToJSON(&ctx.Request)
+
+ if !silent {
+ fmt.Println(string(jsonData))
+ }
+
+ ctx.SetContentType("application/json")
+ ctx.Response.Header.SetContentLength(len(jsonData))
+ // ctx.Response.Header.Set("Connection", "keep-alive")
+ ctx.SetStatusCode(fasthttp.StatusOK)
+ ctx.Write(jsonData)
}
diff --git a/etc/linegen.go b/etc/linegen.go
new file mode 100644
index 0000000..bf399bd
--- /dev/null
+++ b/etc/linegen.go
@@ -0,0 +1,29 @@
+package main
+
+import (
+ "bufio"
+ "flag"
+ "fmt"
+ "os"
+ "time"
+)
+
+func main() {
+ rate := flag.Int("rate", 10, "lines per second")
+ flag.Parse()
+
+ tickDuration := time.Duration((1000000 / (*rate))) * time.Microsecond
+
+ ticker := time.NewTicker(tickDuration)
+ defer ticker.Stop()
+
+ scanner := bufio.NewScanner(os.Stdin)
+ if !scanner.Scan() {
+ return
+ }
+ line := scanner.Text()
+
+ for range ticker.C {
+ fmt.Println(line)
+ }
+}
diff --git a/etc/requests.jsonl b/etc/requests.jsonl
index 165da08..1fd1d8f 100644
--- a/etc/requests.jsonl
+++ b/etc/requests.jsonl
@@ -1,15 +1,12 @@
-{"url": "http://localhost:8080/", "method": "GET", "timestamp": "2021-11-08T18:59:50.9Z"}
-{"url": "http://localhost:8080/", "method": "GET", "timestamp": "2021-11-08T18:59:51.9Z"}
-{"url": "http://localhost:8080/", "method": "GET", "timestamp": "2021-11-08T18:59:52.9Z"}
-{"url": "http://localhost:8080/", "method": "GET", "timestamp": "2021-11-08T18:59:53.9Z"}
-{"url": "http://localhost:8080/", "method": "GET", "timestamp": "2021-11-08T18:59:54.9Z"}
-{"url": "http://localhost:8080/", "method": "GET", "timestamp": "2021-11-08T18:59:55.9Z"}
-{"url": "http://localhost:8080/", "method": "GET", "timestamp": "2021-11-08T18:59:56.9Z"}
-{"url": "http://localhost:8080/", "method": "GET", "timestamp": "2021-11-08T18:59:57.9Z"}
-{"url": "http://localhost:8080/", "method": "POST", "body": "{\"foo\": \"bar\"}", "headers": {"Accept": "text/plain"}, "timestamp": "2021-11-08T18:59:58.9Z"}
+{"url": "http://localhost:8080/localhost:8080", "method": "GET", "timestamp": "2021-11-08T18:59:50.9Z", "headers": {"Content-Type": "application/json", "Host": "example.net", "Cookies":"cookie=1234567890", "User-Agent":"Mozilla/5.0", "x-custom-header":"x-custom-header-value"}}
+{"url": "http://localhost:8081/localhost:8081", "method": "GET", "timestamp": "2021-11-08T18:59:50.9Z", "headers": {"Content-Type": "application/json", "Host": "example.net", "Cookies":"cookie=1234567890", "User-Agent":"Mozilla/5.0", "x-custom-header":"x-custom-header-value"}}
+{"url": "http://localhost:8080/localhost:8080", "method": "POST", "body": "{\"foo\": \"bar\"}", "headers": {"Accept": "text/plain"}, "timestamp": "2021-11-08T18:59:58.9Z"}
+{"url": "http://localhost:8081/localhost:8081", "method": "POST", "body": "{\"foo\": \"bar\"}", "headers": {"Accept": "text/plain"}, "timestamp": "2021-11-08T18:59:58.9Z"}
+{"url": "http://localhost:8081/", "method": "GET", "headers": {"Accept": "text/plain"}, "timestamp": "2021-11-08T18:59:59.9Z"}
{"url": "http://localhost:8080/", "method": "GET", "headers": {"Accept": "text/plain"}, "timestamp": "2021-11-08T18:59:59.9Z"}
-{"url": "http://localhost:8080/", "method": "HEAD", "timestamp": "2021-11-08T19:00:00.00Z"}
+{"url": "http://localhost:8080/", "method": "HEAD", "timestamp": "2021-11-08T19:00:00.00Z", "headers": {"Connection": "close" }}
{"url": "http://localhost:8080/", "method": "OPTIONS", "timestamp": "2021-11-08T19:00:00.01Z"}
{"url": "http://localhost:8080/", "method": "TRACE", "timestamp": "2021-11-08T19:00:00.02Z"}
+{"url": "http://localhost:8080/", "method": "GET", "timestamp": "2021-11-08T19:00:00.04Z"}
+{"url": "http://localhost:8080/", "method": "GET", "timestamp": "2021-11-08T19:00:00.04Z"}}}
{"url": "http://localhost:8080/", "method": "PROPFIND", "timestamp": "2021-11-08T19:00:00.03Z"}
-{"url": "http://localhost:8080/", "method": "GET", "timestamp": "2021-11-08T19:00:00.04Z"}
\ No newline at end of file
diff --git a/go.mod b/go.mod
index ed2334f..d6ac7f6 100644
--- a/go.mod
+++ b/go.mod
@@ -1,3 +1,20 @@
module github.com/loveholidays/ripley
-go 1.17
+go 1.20
+
+require (
+ github.com/VictoriaMetrics/metrics v1.23.1
+ github.com/montanaflynn/stats v0.7.0
+ github.com/valyala/fasthttp v1.45.0
+ github.com/valyala/fastjson v1.6.4
+)
+
+require (
+ github.com/andybalholm/brotli v1.0.5 // indirect
+ github.com/klauspost/compress v1.16.3 // indirect
+ github.com/valyala/bytebufferpool v1.0.0 // indirect
+ github.com/valyala/fastrand v1.1.0 // indirect
+ github.com/valyala/histogram v1.2.0 // indirect
+ github.com/valyala/tcplisten v1.0.0 // indirect
+ golang.org/x/sys v0.6.0 // indirect
+)
diff --git a/go.sum b/go.sum
index e69de29..805eb81 100644
--- a/go.sum
+++ b/go.sum
@@ -0,0 +1,22 @@
+github.com/VictoriaMetrics/metrics v1.23.1 h1:/j8DzeJBxSpL2qSIdqnRFLvQQhbJyJbbEi22yMm7oL0=
+github.com/VictoriaMetrics/metrics v1.23.1/go.mod h1:rAr/llLpEnAdTehiNlUxKgnjcOuROSzpw0GvjpEbvFc=
+github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
+github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
+github.com/klauspost/compress v1.16.3 h1:XuJt9zzcnaz6a16/OU53ZjWp/v7/42WcR5t2a0PcNQY=
+github.com/klauspost/compress v1.16.3/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
+github.com/montanaflynn/stats v0.7.0 h1:r3y12KyNxj/Sb/iOE46ws+3mS1+MZca1wlHQFPsY/JU=
+github.com/montanaflynn/stats v0.7.0/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow=
+github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
+github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
+github.com/valyala/fasthttp v1.45.0 h1:zPkkzpIn8tdHZUrVa6PzYd0i5verqiPSkgTd3bSUcpA=
+github.com/valyala/fasthttp v1.45.0/go.mod h1:k2zXd82h/7UZc3VOdJ2WaUqt1uZ/XpXAfE9i+HBC3lA=
+github.com/valyala/fastjson v1.6.4 h1:uAUNq9Z6ymTgGhcm0UynUAB6tlbakBrz6CQFax3BXVQ=
+github.com/valyala/fastjson v1.6.4/go.mod h1:CLCAqky6SMuOcxStkYQvblddUtoRxhYMGLrsQns1aXY=
+github.com/valyala/fastrand v1.1.0 h1:f+5HkLW4rsgzdNoleUOB69hyT9IlD2ZQh9GyDMfb5G8=
+github.com/valyala/fastrand v1.1.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ=
+github.com/valyala/histogram v1.2.0 h1:wyYGAZZt3CpwUiIb9AU/Zbllg1llXyrtApRS815OLoQ=
+github.com/valyala/histogram v1.2.0/go.mod h1:Hb4kBwb4UxsaNbbbh+RRz8ZR6pdodR57tzWUS3BUzXY=
+github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8=
+github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc=
+golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ=
+golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
diff --git a/main.go b/main.go
index 1753070..c36a6b0 100644
--- a/main.go
+++ b/main.go
@@ -20,29 +20,49 @@ package main
import (
"flag"
+ "fmt"
+ _ "net/http/pprof"
"os"
+ "os/signal"
"runtime"
"runtime/pprof"
+ "syscall"
ripley "github.com/loveholidays/ripley/pkg"
)
func main() {
- paceStr := flag.String("pace", "10s@1", `[duration]@[rate], e.g. "1m@1 30s@1.5 1h@2"`)
- silent := flag.Bool("silent", false, "Suppress output")
- dryRun := flag.Bool("dry-run", false, "Consume input but do not send HTTP requests to targets")
- timeout := flag.Int("timeout", 10, "HTTP client timeout in seconds")
- strict := flag.Bool("strict", false, "Panic on bad input")
- memprofile := flag.String("memprofile", "", "Write memory profile to `file` before exit")
- numWorkers := flag.Int("workers", 1000, "Number of client workers to use")
+ var opts ripley.Options
+
+ flag.StringVar(&opts.Pace, "pace", "10s@1", `[duration]@[rate], e.g. "1m@1 30s@1.5 1h@2"`)
+ flag.BoolVar(&opts.Silent, "silent", false, "Suppress output")
+ flag.BoolVar(&opts.SilentHttpError, "silentHttpError", false, "Suppress HTTP errors (http codes 5xx) output")
+
+ flag.BoolVar(&opts.DryRun, "dry-run", false, "Consume input but do not send HTTP requests to targets")
+ flag.IntVar(&opts.Timeout, "timeout", 10, "HTTP client request timeout in seconds")
+ flag.IntVar(&opts.TimeoutConnection, "timeoutConnection", 3, "HTTP client connetion timeout in seconds")
+ flag.BoolVar(&opts.Strict, "strict", false, "Panic on bad input")
+ flag.StringVar(&opts.Memprofile, "memprofile", "", "Write memory profile to `file` before exit")
+ flag.IntVar(&opts.NumWorkers, "workers", 10, "Number of client workers to use")
+
+ flag.BoolVar(&opts.PrintStat, "printStat", false, "Print statistics to stdout at the end")
+ flag.BoolVar(&opts.MetricsServerEnable, "metricsServerEnable", false, "Enable metrics server. Server prometheus statistics on /metrics endpoint")
+ flag.StringVar(&opts.MetricsServerAddr, "metricsServerAddr", "0.0.0.0:8081", "Metrics server listen address")
+
+ flag.IntVar(&opts.PrintNSlowest, "printNSlowest", 0, "Print N slowest Requests")
+
+ flag.Usage = func() {
+ fmt.Fprintf(os.Stderr, "Usage: %s -target string\n", os.Args[0])
+ flag.PrintDefaults()
+ }
flag.Parse()
- exitCode := ripley.Replay(*paceStr, *silent, *dryRun, *timeout, *strict, *numWorkers)
+ exitCode := ripley.Replay(&opts)
defer os.Exit(exitCode)
- if *memprofile != "" {
- f, err := os.Create(*memprofile)
+ if opts.Memprofile != "" {
+ f, err := os.Create(opts.Memprofile)
if err != nil {
panic(err)
@@ -54,5 +74,10 @@ func main() {
if err := pprof.WriteHeapProfile(f); err != nil {
panic(err)
}
+
+ // Wait for a signal to stop the server
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGTERM)
+ <-sig
}
}
diff --git a/pkg/client.go b/pkg/client.go
index b2cee5c..743707a 100644
--- a/pkg/client.go
+++ b/pkg/client.go
@@ -19,68 +19,55 @@ along with this program. If not, see .
package ripley
import (
- "io"
- "net/http"
+ "sync"
"time"
+
+ "github.com/valyala/fasthttp"
)
-type Result struct {
- StatusCode int `json:"statusCode"`
- Latency time.Duration `json:"latency"`
- Request *request `json:"request"`
- ErrorMsg string `json:"error"`
+type HttpClientsPool struct {
+ pool sync.Map
}
-func startClientWorkers(numWorkers int, requests <-chan *request, results chan<- *Result, dryRun bool, timeout int) {
- client := &http.Client{
- Timeout: time.Duration(timeout) * time.Second,
- CheckRedirect: func(req *http.Request, via []*http.Request) error {
- return http.ErrUseLastResponse
- },
- }
+var httpClientsPool HttpClientsPool
+
+func startClientWorkers(opts *Options, requests chan *Request, results chan *Result, slowestResults *SlowestResults, metricsRequestReceived chan<- bool) {
+ go metricMeasureChannelCapacityAndLengh(requests, results)
+ go handleResult(opts, results, slowestResults)
+ go metricsServer(opts, metricsRequestReceived)
- for i := 0; i <= numWorkers; i++ {
- go doHttpRequest(client, requests, results, dryRun)
+ for i := 0; i < opts.NumWorkers; i++ {
+ go doHttpRequest(opts, requests, results)
}
}
-func doHttpRequest(client *http.Client, requests <-chan *request, results chan<- *Result, dryRun bool) {
- for req := range requests {
- latencyStart := time.Now()
-
- if dryRun {
- sendResult(req, &http.Response{}, latencyStart, "", results)
- } else {
- go func() {
- httpReq, err := req.httpRequest()
-
- if err != nil {
- sendResult(req, &http.Response{}, latencyStart, err.Error(), results)
- return
- }
-
- resp, err := client.Do(httpReq)
-
- if err != nil {
- sendResult(req, &http.Response{}, latencyStart, err.Error(), results)
- return
- }
-
- _, err = io.ReadAll(resp.Body)
- defer resp.Body.Close()
-
- if err != nil {
- sendResult(req, &http.Response{}, latencyStart, err.Error(), results)
- return
- }
-
- sendResult(req, resp, latencyStart, "", results)
- }()
- }
+func getOrCreateHttpClient(opts *Options, req *Request) (*fasthttp.HostClient, error) {
+ if val, ok := httpClientsPool.pool.Load(req.Address); ok {
+ return val.(*fasthttp.HostClient), nil
}
+
+ // If another goroutine has stored a value before us,
+ // use the stored value instead of the one we just created
+ val, _ := httpClientsPool.pool.LoadOrStore(req.Address, httpClientsPool.createHttpClient(opts, req))
+ return val.(*fasthttp.HostClient), nil
}
-func sendResult(req *request, resp *http.Response, latencyStart time.Time, err string, results chan<- *Result) {
- latency := time.Now().Sub(latencyStart)
- results <- &Result{StatusCode: resp.StatusCode, Latency: latency, Request: req, ErrorMsg: err}
+func (h *HttpClientsPool) createHttpClient(opts *Options, req *Request) interface{} {
+ return &fasthttp.HostClient{
+ Addr: req.Address,
+ Name: "ripley",
+ MaxConns: opts.NumWorkers,
+ ReadBufferSize: 512 * 1024,
+ WriteBufferSize: 128 * 1024,
+ ConnPoolStrategy: fasthttp.LIFO,
+ IsTLS: req.IsTLS,
+ MaxConnWaitTimeout: time.Duration(opts.Timeout) * time.Second,
+ MaxConnDuration: time.Duration(opts.Timeout) * time.Second,
+ MaxIdleConnDuration: time.Duration(opts.Timeout) * time.Second,
+ ReadTimeout: time.Duration(opts.Timeout) * time.Second,
+ WriteTimeout: time.Duration(opts.Timeout) * time.Second,
+ Dial: CountingDialer(opts),
+ DisablePathNormalizing: true,
+ DisableHeaderNamesNormalizing: true,
+ }
}
diff --git a/pkg/dialer.go b/pkg/dialer.go
new file mode 100644
index 0000000..0667b14
--- /dev/null
+++ b/pkg/dialer.go
@@ -0,0 +1,70 @@
+package ripley
+
+import (
+ "net"
+ "time"
+
+ "github.com/VictoriaMetrics/metrics"
+ "github.com/valyala/fasthttp"
+)
+
+type CountingConnection struct {
+ net.Conn
+ failedConnections *metrics.Counter
+ openConnections *metrics.Counter
+ closedConnections *metrics.Counter
+ writeBytes *metrics.Counter
+ readBytes *metrics.Counter
+}
+
+func CountingDialer(opts *Options) fasthttp.DialFunc {
+ return func(addr string) (net.Conn, error) {
+ failedConnections := getOrCreateFailedConnectionsCounter(addr)
+ openConnections := getOrCreateOpenConnectionsCounter(addr)
+ closedConnections := getOrCreateClosedConnectionsCounter(addr)
+ writeBytes := getOrCreateWriteBytesCounter(addr)
+ readBytes := getOrCreateReadBytesCounter(addr)
+
+ tcpDialer := &fasthttp.TCPDialer{Concurrency: opts.NumWorkers, DNSCacheDuration: 24 * time.Hour}
+ conn, err := tcpDialer.DialTimeout(addr, time.Duration(opts.TimeoutConnection)*time.Second)
+ if err != nil {
+ failedConnections.Inc()
+ return nil, err
+ }
+
+ openConnections.Inc()
+ return &CountingConnection{
+ conn,
+ failedConnections,
+ openConnections,
+ closedConnections,
+ writeBytes,
+ readBytes,
+ }, nil
+ }
+}
+
+func (c *CountingConnection) Close() error {
+ err := c.Conn.Close()
+ c.closedConnections.Inc()
+ return err
+}
+
+func (c *CountingConnection) Read(b []byte) (n int, err error) {
+ n, err = c.Conn.Read(b)
+ if err == nil {
+ c.readBytes.Add(n)
+ }
+
+ return
+}
+
+func (c *CountingConnection) Write(b []byte) (n int, err error) {
+ n, err = c.Conn.Write(b)
+
+ if err == nil {
+ c.writeBytes.Add(n)
+ }
+
+ return
+}
diff --git a/pkg/metrics.go b/pkg/metrics.go
new file mode 100644
index 0000000..6fd0f22
--- /dev/null
+++ b/pkg/metrics.go
@@ -0,0 +1,105 @@
+package ripley
+
+import (
+ "fmt"
+ "net/http"
+ "time"
+
+ _ "net/http/pprof"
+
+ "github.com/VictoriaMetrics/metrics"
+)
+
+const metricsDefaultSummaryWindow = 5 * time.Minute
+
+var metricsDefaultSummaryQuantiles = []float64{0.5, 0.9, 0.95, 0.99, 1}
+
+func metricsServer(opts *Options, metricsRequestReceived chan<- bool) {
+ if !opts.MetricsServerEnable {
+ return
+ }
+
+ // Expose the registered metrics at `/metrics` path.
+ http.HandleFunc("/metrics", func(w http.ResponseWriter, req *http.Request) {
+ metrics.WritePrometheus(w, true)
+ select {
+ case metricsRequestReceived <- true:
+ default:
+ }
+ })
+
+ if err := http.ListenAndServe(opts.MetricsServerAddr, nil); err != nil {
+ panic(err)
+ }
+}
+
+func getOrCreatePacerPhaseTimeCounter(phase string) *metrics.Counter {
+ return metrics.GetOrCreateCounter(fmt.Sprintf(`pacer_phases{phase="%s"}`, phase))
+}
+
+func getOrCreateChannelLengthCounter(name string) *metrics.Counter {
+ return metrics.GetOrCreateCounter(fmt.Sprintf(`channel_length{channel="%s"}`, name))
+}
+
+func getOrCreateChannelCapacityCounter(name string) *metrics.Counter {
+ return metrics.GetOrCreateCounter(fmt.Sprintf(`channel_capacity{channel="%s"}`, name))
+}
+
+func getOrCreateRequestDurationSummary(addr string) *metrics.Summary {
+ return metrics.GetOrCreateSummaryExt(fmt.Sprintf(`requests_duration_seconds{addr="%s"}`, addr), metricsDefaultSummaryWindow, metricsDefaultSummaryQuantiles)
+}
+
+func getOrCreateResponseCodeCounter(code int, addr string) *metrics.Counter {
+ return metrics.GetOrCreateCounter(fmt.Sprintf(`response_code{status="%d", addr="%s"}`, code, addr))
+}
+
+func getOrCreateFailedConnectionsCounter(addr string) *metrics.Counter {
+ return metrics.GetOrCreateCounter(fmt.Sprintf(`connections_failed{addr="%s"}`, addr))
+}
+
+func getOrCreateOpenConnectionsCounter(addr string) *metrics.Counter {
+ return metrics.GetOrCreateCounter(fmt.Sprintf(`connections_opened{addr="%s"}`, addr))
+}
+
+func getOrCreateClosedConnectionsCounter(addr string) *metrics.Counter {
+ return metrics.GetOrCreateCounter(fmt.Sprintf(`connections_closed{addr="%s"}`, addr))
+}
+
+func getOrCreateWriteBytesCounter(addr string) *metrics.Counter {
+ return metrics.GetOrCreateCounter(fmt.Sprintf(`connections_write_bytes{addr="%s"}`, addr))
+}
+
+func getOrCreateReadBytesCounter(addr string) *metrics.Counter {
+ return metrics.GetOrCreateCounter(fmt.Sprintf(`connections_read_bytes{addr="%s"}`, addr))
+}
+
+func updatePacerMetrics(p *phase) {
+ metrics_pacer_phases := getOrCreatePacerPhaseTimeCounter(fmt.Sprintf("%s@%.3f", p.duration, p.rate))
+ metrics_pacer_phases.Set(uint64(time.Now().Unix()))
+}
+
+func metricHandleResult(result *Result) {
+ requests_duration_seconds := getOrCreateRequestDurationSummary(result.Request.Address)
+ requests_duration_seconds.Update(result.Latency.Seconds())
+
+ response_code := getOrCreateResponseCodeCounter(result.StatusCode, result.Request.Address)
+ response_code.Inc()
+}
+
+func metricMeasureChannelCapacityAndLengh(requests chan *Request, results chan *Result) {
+ ticker := time.Tick(time.Second)
+
+ requests_channel_length := getOrCreateChannelLengthCounter("requests")
+ requests_channel_capacity := getOrCreateChannelCapacityCounter("requests")
+
+ results_channel_length := getOrCreateChannelLengthCounter("results")
+ results_channel_capacity := getOrCreateChannelCapacityCounter("results")
+
+ for range ticker {
+ requests_channel_length.Set(uint64(len(requests)))
+ requests_channel_capacity.Set(uint64(cap(requests)))
+
+ results_channel_length.Set(uint64(len(results)))
+ results_channel_capacity.Set(uint64(cap(results)))
+ }
+}
diff --git a/pkg/pace.go b/pkg/pace.go
index 15c2aaf..21ecf3f 100644
--- a/pkg/pace.go
+++ b/pkg/pace.go
@@ -21,6 +21,7 @@ package ripley
import (
"strconv"
"strings"
+ "sync"
"time"
)
@@ -28,6 +29,7 @@ type pacer struct {
phases []*phase
lastRequestTime time.Time
done bool
+ mutex sync.RWMutex
}
type phase struct {
@@ -35,7 +37,7 @@ type phase struct {
rate float64
}
-func newPacer(phasesStr string) (*pacer, error) {
+func NewPacer(phasesStr string) (*pacer, error) {
phases, err := parsePhases(phasesStr)
if err != nil {
@@ -47,22 +49,37 @@ func newPacer(phasesStr string) (*pacer, error) {
func (p *pacer) start() {
// Run a timer for the first phase's duration
+ updatePacerMetrics(p.phases[0])
+
time.AfterFunc(p.phases[0].duration, p.onPhaseElapsed)
}
func (p *pacer) onPhaseElapsed() {
// Pop phase
+ p.mutex.Lock()
+ defer p.mutex.Unlock()
+
p.phases = p.phases[1:]
if len(p.phases) == 0 {
p.done = true
+ updatePacerMetrics(&phase{duration: 0, rate: 0})
} else {
+ updatePacerMetrics(p.phases[0])
+
// Create a timer with next phase
time.AfterFunc(p.phases[0].duration, p.onPhaseElapsed)
}
}
func (p *pacer) waitDuration(t time.Time) time.Duration {
+ p.mutex.RLock()
+ defer p.mutex.RUnlock()
+ // Need to check as time.AfterFunc updates phases lengh
+ if len(p.phases) == 0 {
+ return 0
+ }
+
// If there are no more phases left, continue with the last phase's rate
if p.lastRequestTime.IsZero() {
p.lastRequestTime = t
diff --git a/pkg/pace_test.go b/pkg/pace_test.go
index 980bcef..9e0f119 100644
--- a/pkg/pace_test.go
+++ b/pkg/pace_test.go
@@ -74,7 +74,7 @@ func TestParseManyPhases(t *testing.T) {
}
func TestWaitDuration(t *testing.T) {
- pacer, err := newPacer("30s@1")
+ pacer, err := NewPacer("30s@1")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
@@ -97,7 +97,7 @@ func TestWaitDuration(t *testing.T) {
}
func TestWaitDuration5X(t *testing.T) {
- pacer, err := newPacer("30s@10")
+ pacer, err := NewPacer("30s@10")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
@@ -120,7 +120,7 @@ func TestWaitDuration5X(t *testing.T) {
}
func TestPacerDoneOnLastPhaseElapsed(t *testing.T) {
- pacer, err := newPacer("30s@10")
+ pacer, err := NewPacer("30s@10")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
diff --git a/pkg/replay.go b/pkg/replay.go
index e6cb08d..85f1140 100644
--- a/pkg/replay.go
+++ b/pkg/replay.go
@@ -20,97 +20,133 @@ package ripley
import (
"bufio"
- "encoding/json"
"fmt"
"os"
+ "os/signal"
"sync"
+ "syscall"
"time"
+ "unsafe"
+
+ "github.com/VictoriaMetrics/metrics"
)
-func Replay(phasesStr string, silent, dryRun bool, timeout int, strict bool, numWorkers int) int {
+type Options struct {
+ Pace string
+ Silent bool
+ SilentHttpError bool
+ DryRun bool
+ Timeout int
+ TimeoutConnection int
+ Strict bool
+ Memprofile string
+ NumWorkers int
+ PrintStat bool
+ MetricsServerEnable bool
+ MetricsServerAddr string
+ PrintNSlowest int
+}
+
+// Ensures we have handled all HTTP request results before exiting
+var waitGroupResults sync.WaitGroup
+
+func Replay(opts *Options) int {
// Default exit code
var exitCode int = 0
- // Ensures we have handled all HTTP request results before exiting
- var waitGroup sync.WaitGroup
+ var slowestResults = NewSlowestResults(opts)
+ var metricsRequestReceived = make(chan bool)
+
+ // Print metrics and slowest results at the end
+ defer printStats(slowestResults, opts)
// Send requests for the HTTP client workers to pick up on this channel
- requests := make(chan *request)
+ var requests = make(chan *Request, opts.NumWorkers)
defer close(requests)
// HTTP client workers will send their results on this channel
- results := make(chan *Result)
+ var results = make(chan *Result, opts.NumWorkers)
defer close(results)
// The pacer controls the rate of replay
- pacer, err := newPacer(phasesStr)
-
+ pacer, err := NewPacer(opts.Pace)
if err != nil {
panic(err)
}
// Read request JSONL input from STDIN
- scanner := bufio.NewScanner(os.Stdin)
+ reader := bufio.NewReaderSize(os.Stdin, 1024*1024)
+ scanner := bufio.NewScanner(reader)
// Start HTTP client goroutine pool
- startClientWorkers(numWorkers, requests, results, dryRun, timeout)
+ startClientWorkers(opts, requests, results, slowestResults, metricsRequestReceived)
pacer.start()
+ // Channel to handle interrupt signal
+ interruptChan := make(chan os.Signal, 1)
+ signal.Notify(interruptChan, syscall.SIGINT, syscall.SIGTERM)
+
+ // Print metrics and slowest results by signal
+ go func() {
+ <-interruptChan
+ printStats(slowestResults, opts)
+ os.Exit(exitCode)
+ }()
+
for scanner.Scan() {
- req, err := unmarshalRequest(scanner.Bytes())
+ if pacer.done {
+ break
+ }
+ waitGroupResults.Add(1)
+
+ b := scanner.Bytes()
+ req, err := unmarshalRequest(&b)
if err != nil {
exitCode = 126
- result, _ := json.Marshal(Result{
- StatusCode: 0,
+ res := &Result{
+ StatusCode: -2,
Latency: 0,
Request: req,
ErrorMsg: fmt.Sprintf("%v", err),
- })
- fmt.Println(string(result))
+ }
- if strict {
+ if opts.Strict {
panic(err)
}
+
+ results <- res
continue
}
- if pacer.done {
- break
- }
+ duration := pacer.waitDuration(req.Timestamp)
+ time.Sleep(duration)
- // The pacer decides how long to wait between requests
- waitDuration := pacer.waitDuration(req.Timestamp)
- time.Sleep(waitDuration)
requests <- req
- waitGroup.Add(1)
-
- // Goroutine to handle the HTTP client result
- go func() {
- defer waitGroup.Done()
+ }
- result := <-results
+ if err := scanner.Err(); err != nil {
+ panic(err)
+ }
- // If there's a panic elsewhere, this channel can return nil
- if result == nil {
- return
- }
+ waitGroupResults.Wait()
- jsonResult, err := json.Marshal(result)
+ // Wait until the latest Prometheus metrics are received by exporter
+ select {
+ case <-metricsRequestReceived:
+ case <-time.After(time.Duration(2) * time.Second):
+ }
- if err != nil {
- panic(err)
- }
+ return exitCode
+}
- if !silent {
- fmt.Println(string(jsonResult))
- }
- }()
+func printStats(slowestResults *SlowestResults, opts *Options) {
+ if opts.PrintStat {
+ metrics.WritePrometheus(os.Stdout, true)
}
-
- if scanner.Err() != nil {
- panic(scanner.Err())
+ for _, slowestResult := range slowestResults.results {
+ fmt.Println(slowestResult.toJson())
}
+}
- waitGroup.Wait()
-
- return exitCode
+func b2s(b []byte) string {
+ return unsafe.String(unsafe.SliceData(b), len(b))
}
diff --git a/pkg/request.go b/pkg/request.go
index d798307..9ecceae 100644
--- a/pkg/request.go
+++ b/pkg/request.go
@@ -19,63 +19,97 @@ along with this program. If not, see .
package ripley
import (
- "bytes"
- "encoding/json"
"fmt"
- "net/http"
+ "net/url"
"time"
+
+ "github.com/valyala/fasthttp"
+ "github.com/valyala/fastjson"
)
var (
validMethods = [9]string{"GET", "HEAD", "POST", "PUT", "DELETE", "CONNECT", "OPTIONS", "TRACE", "PATCH"}
)
-type request struct {
- Method string `json:"method"`
- Url string `json:"url"`
- Body string `json:"body"`
- Timestamp time.Time `json:"timestamp"`
- Headers map[string]string `json:"headers"`
+type Request struct {
+ Address string `json:"Address"`
+ IsTLS bool `json:"IsTLS"`
+ Method string `json:"Method"`
+ Url string `json:"Url"`
+ Body string `json:"Body"`
+ Timestamp time.Time `json:"Timestamp"`
+ Headers map[string]string `json:"Headers"`
}
-func (r *request) httpRequest() (*http.Request, error) {
- req, err := http.NewRequest(r.Method, r.Url, bytes.NewReader([]byte(r.Body)))
+func (r *Request) fasthttpRequest() *fasthttp.Request {
+ req := fasthttp.AcquireRequest()
- if err != nil {
- return nil, err
- }
+ req.Header.SetMethod(r.Method)
+ req.SetRequestURI(r.Url)
+ req.SetBody([]byte(r.Body))
for k, v := range r.Headers {
- req.Header.Add(k, v)
+ req.Header.Set(k, v)
}
- if host := req.Header.Get("Host"); host != "" {
- req.Host = host
+ if host := req.Header.Peek("Host"); len(host) > 0 {
+ req.SetHost(b2s(host))
}
- return req, nil
+ return req
}
-func unmarshalRequest(jsonRequest []byte) (*request, error) {
- req := &request{}
- err := json.Unmarshal(jsonRequest, &req)
+func unmarshalRequest(jsonRequest *[]byte) (*Request, error) {
+ var p fastjson.Parser
+ v, err := p.ParseBytes(*jsonRequest)
+ if err != nil {
+ return &Request{}, err
+ }
+
+ req := &Request{
+ Method: b2s(v.GetStringBytes("method")),
+ Url: b2s(v.GetStringBytes("url")),
+ Body: b2s(v.GetStringBytes("body")),
+ Headers: make(map[string]string),
+ }
+
+ if req.Url == "" {
+ return req, fmt.Errorf("missing required key: url")
+ }
+ // Parse URL
+ up, err := url.Parse(req.Url)
if err != nil {
return req, err
}
+ req.Address = up.Host
+ if up.Port() == "" {
+ req.Address += ":80"
+ }
+ req.IsTLS = up.Scheme == "https"
// Validate
if !validMethod(req.Method) {
return req, fmt.Errorf("Invalid method: %s", req.Method)
}
- if req.Url == "" {
- return req, fmt.Errorf("Missing required key: url")
+ // Parse headers
+ headers := v.GetObject("headers")
+ headers.Visit(func(k []byte, v *fastjson.Value) {
+ req.Headers[b2s(k)] = b2s(v.GetStringBytes())
+ })
+
+ timestampVal := v.GetStringBytes("timestamp")
+ if timestampVal == nil {
+ return req, fmt.Errorf("missing required key: timestamp")
+
}
- if req.Timestamp.IsZero() {
- return req, fmt.Errorf("Missing required key: timestamp")
+ timestamp, err := time.Parse(time.RFC3339Nano, b2s(timestampVal))
+ if err != nil {
+ return req, fmt.Errorf("invalid timestamp: %v", timestamp)
}
+ req.Timestamp = timestamp
return req, nil
}
@@ -88,3 +122,27 @@ func validMethod(requestMethod string) bool {
}
return false
}
+
+func doHttpRequest(opts *Options, requests <-chan *Request, results chan<- *Result) {
+ for req := range requests {
+ latencyStart := time.Now()
+ if opts.DryRun {
+ sendToResult(opts, req, &fasthttp.Response{}, latencyStart, nil, results)
+ } else {
+ httpReq := req.fasthttpRequest()
+ httpResp := fasthttp.AcquireResponse()
+
+ client, err := getOrCreateHttpClient(opts, req)
+ if err != nil {
+ sendToResult(opts, req, httpResp, latencyStart, err, results)
+ return
+ }
+
+ err = client.DoTimeout(httpReq, httpResp, time.Duration(opts.Timeout)*time.Second)
+ sendToResult(opts, req, httpResp, latencyStart, err, results)
+
+ fasthttp.ReleaseRequest(httpReq)
+ fasthttp.ReleaseResponse(httpResp)
+ }
+ }
+}
diff --git a/pkg/request_test.go b/pkg/request_test.go
index 44f26c2..b44e48e 100644
--- a/pkg/request_test.go
+++ b/pkg/request_test.go
@@ -23,18 +23,27 @@ import (
"time"
)
+func TestParseInvalidJson(t *testing.T) {
+ jsonRequest := []byte(`{}}`)
+ _, err := unmarshalRequest(&jsonRequest)
+
+ if err.Error() != `unexpected tail: "}"` {
+ t.Errorf(`err.Error() = %v; want "unexpected tail: "}""`, err.Error())
+ }
+}
+
func TestUnrmarshalInvalidMethod(t *testing.T) {
- jsonRequest := `{"method": "WHAT"}`
- _, err := unmarshalRequest([]byte(jsonRequest))
+ jsonRequest := []byte(`{"method": "WHAT", "url": "http://example.com"}`)
+ _, err := unmarshalRequest(&jsonRequest)
- if err.Error() != "Invalid method: WHAT" {
- t.Errorf(`err.Error() = %v; want "Invalid method: WHAT"`, err.Error())
+ if err.Error() != "invalid method: WHAT" {
+ t.Errorf(`err.Error() = %v; want "invalid method: WHAT"`, err.Error())
}
}
func TestUnrmarshalValid(t *testing.T) {
- jsonRequest := `{"method": "GET", "url": "http://example.com", "timestamp": "2021-11-08T18:59:59.9Z"}`
- req, err := unmarshalRequest([]byte(jsonRequest))
+ jsonRequest := []byte(`{"method": "GET", "url": "http://example.com", "timestamp": "2021-11-08T18:59:59.9Z"}`)
+ req, err := unmarshalRequest(&jsonRequest)
if err != nil {
t.Errorf("err = %v; want nil", err)
@@ -58,3 +67,54 @@ func TestUnrmarshalValid(t *testing.T) {
t.Errorf("req.Timestamp = %v; want %v", req.Timestamp, expectedTime)
}
}
+
+func TestFasthttpRequest(t *testing.T) {
+ jsonRequest := []byte(`
+ {
+ "method": "GET", "url": "http://example.com", "body":"body", "timestamp": "2021-11-08T18:59:59.9Z",
+ "headers": {"Content-Type": "application/json", "Host": "example.net", "Cookies":"cookie=1234567890", "User-Agent":"Mozilla/5.0", "x-custom-header":"x-custom-header-value"}
+ }
+ `)
+ req, err := unmarshalRequest(&jsonRequest)
+ if err != nil {
+ t.Errorf("err = %v; want nil", err)
+ }
+
+ fr := req.fasthttpRequest()
+
+ if string(fr.Header.Method()) != "GET" {
+ t.Errorf("Method = %v; want GET", req.Method)
+ }
+
+ if string(fr.Body()) != "body" {
+ t.Errorf("Body = %v; want body", req.Method)
+ }
+
+ if string(fr.Header.ContentType()) != "application/json" {
+ t.Errorf("ContentType = %s; want application/json", fr.Header.ContentType())
+ }
+
+ if string(fr.Header.UserAgent()) != "Mozilla/5.0" {
+ t.Errorf("UserAgent = %s; want Mozilla/5.0", fr.Header.UserAgent())
+ }
+
+ if string(fr.Header.Peek("x-custom-header")) != "x-custom-header-value" {
+ t.Errorf("UserAgent = %s; want Mozilla/5.0", fr.Header.UserAgent())
+ }
+
+ if string(fr.Header.Peek("Host")) != "example.net" {
+ t.Errorf("Host = %s; want example.net", fr.Header.Peek("Host"))
+ }
+
+ if string(fr.Host()) != "example.net" {
+ t.Errorf("Host = %s; want example.net", fr.Host())
+ }
+
+ if string(fr.Header.RequestURI()) != "http://example.com" {
+ t.Errorf("RequestURI = %s; want http://example.com", fr.Header.RequestURI())
+ }
+
+ if string(fr.RequestURI()) != "/" {
+ t.Errorf("RequestURI = %s; want /", fr.RequestURI())
+ }
+}
diff --git a/pkg/result.go b/pkg/result.go
new file mode 100644
index 0000000..ebd41b4
--- /dev/null
+++ b/pkg/result.go
@@ -0,0 +1,94 @@
+package ripley
+
+import (
+ "encoding/json"
+ "fmt"
+ "os"
+ "time"
+
+ "github.com/valyala/fasthttp"
+)
+
+type Response struct {
+ StatusCode int `json:"StatusCode"`
+ Headers map[string][]string `json:"Headers"`
+ RAddr string `json:"RAddr"`
+ LAddr string `json:"LAddr"`
+}
+
+type Result struct {
+ StatusCode int `json:"StatusCode"`
+ Latency time.Duration `json:"Latency"`
+ Request *Request `json:"Request"`
+ Response *Response `json:"Response"`
+ ErrorMsg string `json:"Error"`
+}
+
+func (r *Result) toJson() string {
+ j, err := json.Marshal(r)
+
+ if err != nil {
+ panic(err)
+ }
+
+ return b2s(j)
+}
+
+func sendToResult(opts *Options, req *Request, resp *fasthttp.Response, latencyStart time.Time, err error, results chan<- *Result) {
+ latency := time.Since(latencyStart)
+
+ var errorMsg string
+ var raddr, laddr string
+ var statusCode int = resp.StatusCode()
+
+ if err != nil {
+ statusCode = -1
+ errorMsg = err.Error()
+ }
+
+ respHeaders := make(map[string][]string)
+ resp.Header.VisitAll(func(key, value []byte) {
+ k := b2s(key)
+ v := b2s(value)
+
+ respHeaders[k] = append(respHeaders[k], v)
+ })
+
+ if adr := resp.RemoteAddr(); adr != nil {
+ raddr = adr.String()
+ }
+ if adr := resp.LocalAddr(); adr != nil {
+ laddr = adr.String()
+ }
+
+ results <- &Result{
+ StatusCode: statusCode,
+ Latency: latency,
+ Request: req,
+ Response: &Response{
+ StatusCode: statusCode,
+ Headers: respHeaders,
+ RAddr: raddr,
+ LAddr: laddr,
+ },
+ ErrorMsg: errorMsg,
+ }
+}
+
+// TODO: Consider rewriting the code to use a Result Broker with multi-channel and broadcast functionality in order to improve its scalability.
+func handleResult(opts *Options, results <-chan *Result, slowestResults *SlowestResults) {
+ for result := range results {
+ metricHandleResult(result)
+ slowestResults.store(result)
+
+ if !opts.Silent {
+ fmt.Println(result.toJson())
+ }
+
+ if !opts.SilentHttpError && result.StatusCode < 0 || (result.StatusCode >= 500 && result.StatusCode <= 599) {
+ fmt.Fprintln(os.Stderr, result.toJson())
+ }
+
+ waitGroupResults.Done()
+ }
+}
diff --git a/pkg/result_slowest_results.go b/pkg/result_slowest_results.go
new file mode 100644
index 0000000..a9371fc
--- /dev/null
+++ b/pkg/result_slowest_results.go
@@ -0,0 +1,29 @@
+package ripley
+
+import "sort"
+
+type SlowestResults struct {
+ results []Result
+ nSlowestResults int
+}
+
+func (h *SlowestResults) store(result *Result) {
+ if h.nSlowestResults == 0 {
+ return
+ }
+
+ h.results = append(h.results, *result)
+ if len(h.results) > h.nSlowestResults {
+ sort.Slice(h.results, func(i, j int) bool {
+ return h.results[i].Latency > h.results[j].Latency
+ })
+ h.results = h.results[:h.nSlowestResults]
+ }
+}
+
+func NewSlowestResults(opts *Options) *SlowestResults {
+ return &SlowestResults{
+ nSlowestResults: opts.PrintNSlowest,
+ results: make([]Result, 0, opts.PrintNSlowest),
+ }
+}