Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nsqd: close connections on exit (without another map) #1262

Merged
merged 4 commits into from
Mar 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion bench/bench_reader/bench_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"bufio"
"flag"
"fmt"
"log"
"net"
"runtime"
Expand Down Expand Up @@ -75,7 +76,9 @@ func subWorker(td time.Duration, workers int, tcpAddr string, topic string, chan
conn.Write(nsq.MagicV2)
rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
ci := make(map[string]interface{})
ci["client_id"] = "test"
ci["client_id"] = "reader"
ci["hostname"] = "reader"
ci["user_agent"] = fmt.Sprintf("bench_reader/%s", nsq.VERSION)
cmd, _ := nsq.Identify(ci)
cmd.WriteTo(rw)
nsq.Subscribe(topic, channel).WriteTo(rw)
Expand Down
9 changes: 9 additions & 0 deletions bench/bench_writer/bench_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"bufio"
"flag"
"fmt"
"log"
"net"
"runtime"
Expand Down Expand Up @@ -77,8 +78,16 @@ func pubWorker(td time.Duration, tcpAddr string, batchSize int, batch [][]byte,
}
conn.Write(nsq.MagicV2)
rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
ci := make(map[string]interface{})
ci["client_id"] = "writer"
ci["hostname"] = "writer"
ci["user_agent"] = fmt.Sprintf("bench_writer/%s", nsq.VERSION)
cmd, _ := nsq.Identify(ci)
cmd.WriteTo(rw)
rdyChan <- 1
<-goChan
rw.Flush()
nsq.ReadResponse(rw)
var msgCount int64
endTime := time.Now().Add(td)
for {
Expand Down
7 changes: 6 additions & 1 deletion internal/protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,14 @@ import (
"net"
)

type Client interface {
Close() error
}

// Protocol describes the basic behavior of any protocol in the system
type Protocol interface {
IOLoop(conn net.Conn) error
NewClient(net.Conn) Client
IOLoop(Client) error
}

// SendResponse is a server side utility function to prefix data with a length header
Expand Down
2 changes: 1 addition & 1 deletion nsqd/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type Consumer interface {
Pause()
Close() error
TimedOutMessage()
Stats() ClientStats
Stats(string) ClientStats
Empty()
}

Expand Down
4 changes: 2 additions & 2 deletions nsqd/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,14 +162,14 @@ func TestChannelEmptyConsumer(t *testing.T) {
}

for _, cl := range channel.clients {
stats := cl.Stats()
stats := cl.Stats("").(ClientV2Stats)
test.Equal(t, int64(25), stats.InFlightCount)
}

channel.Empty()

for _, cl := range channel.clients {
stats := cl.Stats()
stats := cl.Stats("").(ClientV2Stats)
test.Equal(t, int64(0), stats.InFlightCount)
}
}
Expand Down
98 changes: 89 additions & 9 deletions nsqd/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"crypto/tls"
"fmt"
"net"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -47,6 +48,78 @@ type identifyEvent struct {
MsgTimeout time.Duration
}

type PubCount struct {
Topic string `json:"topic"`
Count uint64 `json:"count"`
}

type ClientV2Stats struct {
ClientID string `json:"client_id"`
Hostname string `json:"hostname"`
Version string `json:"version"`
RemoteAddress string `json:"remote_address"`
State int32 `json:"state"`
ReadyCount int64 `json:"ready_count"`
InFlightCount int64 `json:"in_flight_count"`
MessageCount uint64 `json:"message_count"`
FinishCount uint64 `json:"finish_count"`
RequeueCount uint64 `json:"requeue_count"`
ConnectTime int64 `json:"connect_ts"`
SampleRate int32 `json:"sample_rate"`
Deflate bool `json:"deflate"`
Snappy bool `json:"snappy"`
UserAgent string `json:"user_agent"`
Authed bool `json:"authed,omitempty"`
AuthIdentity string `json:"auth_identity,omitempty"`
AuthIdentityURL string `json:"auth_identity_url,omitempty"`

PubCounts []PubCount `json:"pub_counts,omitempty"`

TLS bool `json:"tls"`
CipherSuite string `json:"tls_cipher_suite"`
TLSVersion string `json:"tls_version"`
TLSNegotiatedProtocol string `json:"tls_negotiated_protocol"`
TLSNegotiatedProtocolIsMutual bool `json:"tls_negotiated_protocol_is_mutual"`
}

func (s ClientV2Stats) String() string {
connectTime := time.Unix(s.ConnectTime, 0)
duration := time.Since(connectTime).Truncate(time.Second)

_, port, _ := net.SplitHostPort(s.RemoteAddress)
id := fmt.Sprintf("%s:%s %s", s.Hostname, port, s.UserAgent)

// producer
if len(s.PubCounts) > 0 {
var total uint64
var topicOut []string
for _, v := range s.PubCounts {
total += v.Count
topicOut = append(topicOut, fmt.Sprintf("%s=%d", v.Topic, v.Count))
}
return fmt.Sprintf("[%s %-21s] msgs: %-8d topics: %s connected: %s",
s.Version,
id,
total,
strings.Join(topicOut, ","),
duration,
)
}

// consumer
return fmt.Sprintf("[%s %-21s] state: %d inflt: %-4d rdy: %-4d fin: %-8d re-q: %-8d msgs: %-8d connected: %s",
s.Version,
id,
s.State,
s.InFlightCount,
s.ReadyCount,
s.FinishCount,
s.RequeueCount,
s.MessageCount,
duration,
)
}

type clientV2 struct {
// 64bit atomic vars need to be first for proper alignment on 32bit platforms
ReadyCount int64
Expand Down Expand Up @@ -154,6 +227,16 @@ func (c *clientV2) String() string {
return c.RemoteAddr().String()
}

func (c *clientV2) Type() int {
c.metaLock.RLock()
hasPublished := len(c.pubCounts) > 0
c.metaLock.RUnlock()
if hasPublished {
return typeProducer
}
return typeConsumer
}

func (c *clientV2) Identify(data identifyDataV2) error {
c.nsqd.logf(LOG_INFO, "[%s] IDENTIFY: %+v", c, data)

Expand Down Expand Up @@ -199,7 +282,7 @@ func (c *clientV2) Identify(data identifyDataV2) error {
return nil
}

func (c *clientV2) Stats() ClientStats {
func (c *clientV2) Stats(topicName string) ClientStats {
c.metaLock.RLock()
clientID := c.ClientID
hostname := c.Hostname
Expand All @@ -212,13 +295,17 @@ func (c *clientV2) Stats() ClientStats {
}
pubCounts := make([]PubCount, 0, len(c.pubCounts))
for topic, count := range c.pubCounts {
if len(topicName) > 0 && topic != topicName {
continue
}
pubCounts = append(pubCounts, PubCount{
Topic: topic,
Count: count,
})
break
}
c.metaLock.RUnlock()
stats := ClientStats{
stats := ClientV2Stats{
Version: "V2",
RemoteAddress: c.RemoteAddr().String(),
ClientID: clientID,
Expand Down Expand Up @@ -250,13 +337,6 @@ func (c *clientV2) Stats() ClientStats {
return stats
}

func (c *clientV2) IsProducer() bool {
c.metaLock.RLock()
retval := len(c.pubCounts) > 0
c.metaLock.RUnlock()
return retval
}

// struct to convert from integers to the human readable strings
type prettyConnectionState struct {
tls.ConnectionState
Expand Down
100 changes: 11 additions & 89 deletions nsqd/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,8 +471,6 @@ func (s *httpServer) doPauseChannel(w http.ResponseWriter, req *http.Request, ps
}

func (s *httpServer) doStats(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
var producerStats []ClientStats

reqParams, err := http_api.NewReqParams(req)
if err != nil {
s.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err)
Expand All @@ -493,81 +491,36 @@ func (s *httpServer) doStats(w http.ResponseWriter, req *http.Request, ps httpro
if !ok {
includeMem = true
}
if includeClients {
producerStats = s.nsqd.GetProducerStats()
}

stats := s.nsqd.GetStats(topicName, channelName, includeClients)
health := s.nsqd.GetHealth()
startTime := s.nsqd.GetStartTime()
uptime := time.Since(startTime)

// filter by topic (if specified)
if len(topicName) > 0 {
for _, topicStats := range stats {
if topicStats.TopicName == topicName {
// filter by channel (if specified)
if len(channelName) > 0 {
for _, channelStats := range topicStats.Channels {
if channelStats.ChannelName == channelName {
topicStats.Channels = []ChannelStats{channelStats}
break
}
}
}
stats = []TopicStats{topicStats}
break
}
}

filteredProducerStats := make([]ClientStats, 0)
for _, clientStat := range producerStats {
var found bool
var count uint64
for _, v := range clientStat.PubCounts {
if v.Topic == topicName {
count = v.Count
found = true
break
}
}
if !found {
continue
}
clientStat.PubCounts = []PubCount{PubCount{
Topic: topicName,
Count: count,
}}
filteredProducerStats = append(filteredProducerStats, clientStat)
}
producerStats = filteredProducerStats
}

var ms *memStats
if includeMem {
m := getMemStats()
ms = &m
}
if !jsonFormat {
return s.printStats(stats, producerStats, ms, health, startTime, uptime), nil
return s.printStats(stats, ms, health, startTime, uptime), nil
}

// TODO: should producer stats be hung off topics?
return struct {
Version string `json:"version"`
Health string `json:"health"`
StartTime int64 `json:"start_time"`
Topics []TopicStats `json:"topics"`
Memory *memStats `json:"memory,omitempty"`
Producers []ClientStats `json:"producers"`
}{version.Binary, health, startTime.Unix(), stats, ms, producerStats}, nil
}{version.Binary, health, startTime.Unix(), stats.Topics, ms, stats.Producers}, nil
}

func (s *httpServer) printStats(stats []TopicStats, producerStats []ClientStats, ms *memStats, health string, startTime time.Time, uptime time.Duration) []byte {
func (s *httpServer) printStats(stats Stats, ms *memStats, health string, startTime time.Time, uptime time.Duration) []byte {
var buf bytes.Buffer
w := &buf

now := time.Now()

fmt.Fprintf(w, "%s\n", version.String("nsqd"))
fmt.Fprintf(w, "start_time %v\n", startTime.Format(time.RFC3339))
fmt.Fprintf(w, "uptime %s\n", uptime)
Expand All @@ -587,13 +540,13 @@ func (s *httpServer) printStats(stats []TopicStats, producerStats []ClientStats,
fmt.Fprintf(w, " %-25s\t%d\n", "gc_total_runs", ms.GCTotalRuns)
}

if len(stats) == 0 {
if len(stats.Topics) == 0 {
fmt.Fprintf(w, "\nTopics: None\n")
} else {
fmt.Fprintf(w, "\nTopics:")
}

for _, t := range stats {
for _, t := range stats.Topics {
var pausedPrefix string
if t.Paused {
pausedPrefix = "*P "
Expand Down Expand Up @@ -627,48 +580,17 @@ func (s *httpServer) printStats(stats []TopicStats, producerStats []ClientStats,
c.E2eProcessingLatency,
)
for _, client := range c.Clients {
connectTime := time.Unix(client.ConnectTime, 0)
// truncate to the second
duration := time.Duration(int64(now.Sub(connectTime).Seconds())) * time.Second
fmt.Fprintf(w, " [%s %-21s] state: %d inflt: %-4d rdy: %-4d fin: %-8d re-q: %-8d msgs: %-8d connected: %s\n",
client.Version,
client.ClientID,
client.State,
client.InFlightCount,
client.ReadyCount,
client.FinishCount,
client.RequeueCount,
client.MessageCount,
duration,
)
fmt.Fprintf(w, " %s\n", client)
}
}
}

if len(producerStats) == 0 {
if len(stats.Producers) == 0 {
fmt.Fprintf(w, "\nProducers: None\n")
} else {
fmt.Fprintf(w, "\nProducers:")
for _, client := range producerStats {
connectTime := time.Unix(client.ConnectTime, 0)
// truncate to the second
duration := time.Duration(int64(now.Sub(connectTime).Seconds())) * time.Second
var totalPubCount uint64
for _, v := range client.PubCounts {
totalPubCount += v.Count
}
fmt.Fprintf(w, "\n [%s %-21s] msgs: %-8d connected: %s\n",
client.Version,
client.ClientID,
totalPubCount,
duration,
)
for _, v := range client.PubCounts {
fmt.Fprintf(w, " [%-15s] msgs: %-8d\n",
v.Topic,
v.Count,
)
}
fmt.Fprintf(w, "\nProducers:\n")
for _, client := range stats.Producers {
fmt.Fprintf(w, " %s\n", client)
}
}

Expand Down
Loading