From 1ce7ef09f20e97ff358c1e3ff4c9d0c4d5cd05f9 Mon Sep 17 00:00:00 2001 From: Matt Reiferson Date: Fri, 14 Dec 2012 10:23:14 -0500 Subject: [PATCH] nsq_to_http: per handler logging information --- examples/nsq_to_http/nsq_to_http.go | 64 +++++++++++++++++++++++++++-- 1 file changed, 61 insertions(+), 3 deletions(-) diff --git a/examples/nsq_to_http/nsq_to_http.go b/examples/nsq_to_http/nsq_to_http.go index d8c9e9c66..052ea834e 100644 --- a/examples/nsq_to_http/nsq_to_http.go +++ b/examples/nsq_to_http/nsq_to_http.go @@ -1,4 +1,5 @@ -// This is a client that writes out to a http endpoint +// This is an NSQ client that reads the specified topic/channel +// and performs HTTP requests (GET/POST) to the specified endpoints package main @@ -10,12 +11,15 @@ import ( "flag" "fmt" "log" + "math" "math/rand" "net/url" "os" "os/signal" + "sort" "strings" "syscall" + "time" ) const ( @@ -33,6 +37,7 @@ var ( roundRobin = flag.Bool("round-robin", false, "enable round robin mode") throttleFraction = flag.Float64("throttle-fraction", 1.0, "publish only a fraction of messages") httpTimeoutMs = flag.Int("http-timeout-ms", 20000, "timeout for HTTP connect/read/write (each)") + statusEvery = flag.Int("status-every", 250, "the # of requests between logging status (per handler), 0 disables") getAddrs = util.StringArray{} postAddrs = util.StringArray{} nsqdTCPAddrs = util.StringArray{} @@ -46,6 +51,20 @@ func init() { flag.Var(&lookupdHTTPAddrs, "lookupd-http-address", "lookupd HTTP address (may be given multiple times)") } +type Durations []time.Duration + +func (s Durations) Len() int { + return len(s) +} + +func (s Durations) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} + +func (s Durations) Less(i, j int) bool { + return s[i] < s[j] +} + type Publisher interface { Publish(string, []byte) error } @@ -55,14 +74,21 @@ type PublishHandler struct { addresses util.StringArray counter uint64 mode int + reqs Durations + id int } func (ph *PublishHandler) HandleMessage(m *nsq.Message) error { - // skip messages if rand float is greater than throttle - // short-circuit to avoid needless calls to rand + var startTime time.Time + if *throttleFraction < 1.0 && rand.Float64() > *throttleFraction { return nil } + + if *statusEvery > 0 { + startTime = time.Now() + } + switch ph.mode { case ModeAll: for _, addr := range ph.addresses { @@ -80,9 +106,39 @@ func (ph *PublishHandler) HandleMessage(m *nsq.Message) error { ph.counter++ } + if *statusEvery > 0 { + duration := time.Now().Sub(startTime) + ph.reqs = append(ph.reqs, duration) + } + + if *statusEvery > 0 && len(ph.reqs) >= *statusEvery { + var total time.Duration + for _, v := range ph.reqs { + total += v + } + avgMs := (total.Seconds() * 1000) / float64(len(ph.reqs)) + + sort.Sort(ph.reqs) + p95Ms := percentile(95.0, ph.reqs, len(ph.reqs)).Seconds() * 1000 + p99Ms := percentile(99.0, ph.reqs, len(ph.reqs)).Seconds() * 1000 + + log.Printf("handler(%d): finished %d requests - 99th: %.02fms - 95th: %.02fms - avg: %.02fms", + ph.id, *statusEvery, p99Ms, p95Ms, avgMs) + + ph.reqs = ph.reqs[:0] + } + return nil } +func percentile(perc float64, arr []time.Duration, length int) time.Duration { + indexOfPerc := int(math.Ceil(((perc / 100.0) * float64(length)) + 0.5)) + if indexOfPerc >= length { + indexOfPerc = length - 1 + } + return arr[indexOfPerc] +} + type PostPublisher struct{} func (p *PostPublisher) Publish(addr string, msg []byte) error { @@ -187,6 +243,8 @@ func main() { Publisher: publisher, addresses: addresses, mode: mode, + reqs: make(Durations, 0, *statusEvery), + id: i, } r.AddHandler(handler) }