This repository has been archived by the owner on May 26, 2022. It is now read-only.

collect metrics in a separate goroutine #82

Merged
3 commits merged on Jul 15, 2021
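What the change does, in outline: instead of reading TCP stats for every connection inside the Prometheus Collect callback (which runs on every scrape, while holding the collector's mutex), the collector now starts a background goroutine in its constructor that polls all connections on a fixed 10-second ticker and caches the aggregated totals; Collect then only snapshots the cached values. A minimal stdlib-only sketch of that pattern, with invented names (statsCache, sample), not code from this PR:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// statsCache caches a total that is expensive to compute. A background
// goroutine refreshes it on a ticker; readers only copy the cached value.
type statsCache struct {
	mu        sync.Mutex
	bytesSent uint64
}

func newStatsCache(sample func() uint64, every time.Duration) *statsCache {
	c := &statsCache{}
	go func() {
		ticker := time.NewTicker(every)
		defer ticker.Stop()
		for range ticker.C {
			v := sample() // expensive work happens off the scrape path
			c.mu.Lock()
			c.bytesSent = v
			c.mu.Unlock()
		}
	}()
	return c
}

// BytesSent is what a Collect implementation would call: cheap, lock-scoped.
func (c *statsCache) BytesSent() uint64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.bytesSent
}

func main() {
	c := newStatsCache(func() uint64 { return 42 }, 10*time.Millisecond)
	time.Sleep(50 * time.Millisecond)
	fmt.Println(c.BytesSent())
}
```

The trade-off: scrapes become cheap and bounded, at the cost of metrics being up to one collectFrequency interval stale.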
metrics.go: 88 changes (53 additions, 35 deletions)
@@ -20,6 +20,8 @@ var (
 	bytesRcvdDesc *prometheus.Desc
 )
 
+const collectFrequency = 10 * time.Second
+
 var collector *aggregatingCollector
 
 func init() {
@@ -54,16 +56,18 @@ func init() {
 type aggregatingCollector struct {
 	mutex sync.Mutex
 
-	highestID     uint64
-	conns         map[uint64] /* id */ *tracingConn
-	rtts          prometheus.Histogram
-	connDurations prometheus.Histogram
+	highestID            uint64
+	conns                map[uint64] /* id */ *tracingConn
+	rtts                 prometheus.Histogram
+	connDurations        prometheus.Histogram
+	segsSent, segsRcvd   uint64
+	bytesSent, bytesRcvd uint64
 }
 
 var _ prometheus.Collector = &aggregatingCollector{}
 
 func newAggregatingCollector() *aggregatingCollector {
-	return &aggregatingCollector{
+	c := &aggregatingCollector{
 		conns: make(map[uint64]*tracingConn),
 		rtts: prometheus.NewHistogram(prometheus.HistogramOpts{
 			Name: "tcp_rtt",
@@ -76,6 +80,8 @@ func newAggregatingCollector() *aggregatingCollector {
 			Buckets: prometheus.ExponentialBuckets(1, 1.5, 40), // 1s to ~12 weeks
 		}),
 	}
+	go c.cron()
+	return c
 }

func (c *aggregatingCollector) AddConn(t *tracingConn) uint64 {
Expand Down Expand Up @@ -103,45 +109,57 @@ func (c *aggregatingCollector) Describe(descs chan<- *prometheus.Desc) {
}
}

func (c *aggregatingCollector) Collect(metrics chan<- prometheus.Metric) {
now := time.Now()
c.mutex.Lock()
var segsSent, segsRcvd uint64
var bytesSent, bytesRcvd uint64
for _, conn := range c.conns {
info, err := conn.getTCPInfo()
if err != nil {
if strings.Contains(err.Error(), "use of closed network connection") {
c.closedConn(conn)
+func (c *aggregatingCollector) cron() {
+	ticker := time.NewTicker(collectFrequency)
+	defer ticker.Stop()
+
+	for now := range ticker.C {
+		c.mutex.Lock()
+		c.segsSent = 0
+		c.segsRcvd = 0
+		c.bytesSent = 0
+		c.bytesRcvd = 0
+		for _, conn := range c.conns {
+			info, err := conn.getTCPInfo()
+			if err != nil {
+				if strings.Contains(err.Error(), "use of closed network connection") {
+					c.closedConn(conn)
+					continue
+				}
+				log.Errorf("Failed to get TCP info: %s", err)
+				continue
+			}
@aschmahmann commented on Jul 13, 2021:

Is it a problem that conn.getTCPInfo() could error differently from what we expect, so that we never close the connection?

I've seen 2021-07-08T21:39:47.925-0400 ERROR tcp-tpt [email protected]/metrics.go:118 Failed to get TCP info: raw-control tcp 192.168.1.6:4001: getsockopt: not implemented on Windows, so I'm concerned this could cause us problems.

Contributor Author replied:

I tried it in my Windows VM, and I'm getting the same error. Should we disable metrics collection on Windows?
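One hypothetical way to address the open question above, sketched under the assumption that the Windows failure is a platform capability issue rather than a transient error; supportsTCPInfo and its placement are invented for illustration, not code from this PR:

```go
package tcp // hypothetical placement alongside metrics.go

import "runtime"

// supportsTCPInfo reports whether polling per-connection TCP stats via
// getsockopt is expected to work on this platform; a build-tag split
// (e.g. a metrics_windows.go stub) would achieve the same effect.
func supportsTCPInfo() bool {
	return runtime.GOOS != "windows"
}
```

newAggregatingCollector could then guard the goroutine with `if supportsTCPInfo() { go c.cron() }`, leaving the cached counters at zero on unsupported platforms.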

log.Errorf("Failed to get TCP info: %s", err)
continue
}
if hasSegmentCounter {
segsSent += getSegmentsSent(info)
segsRcvd += getSegmentsRcvd(info)
}
if hasByteCounter {
bytesSent += getBytesSent(info)
bytesRcvd += getBytesRcvd(info)
}
c.rtts.Observe(info.RTT.Seconds())
c.connDurations.Observe(now.Sub(conn.startTime).Seconds())
if info.State == tcpinfo.Closed {
c.closedConn(conn)
if hasSegmentCounter {
c.segsSent += getSegmentsSent(info)
c.segsRcvd += getSegmentsRcvd(info)
}
if hasByteCounter {
c.bytesSent += getBytesSent(info)
c.bytesRcvd += getBytesRcvd(info)
}
c.rtts.Observe(info.RTT.Seconds())
c.connDurations.Observe(now.Sub(conn.startTime).Seconds())
if info.State == tcpinfo.Closed {
c.closedConn(conn)
}
}
c.mutex.Unlock()
}
c.mutex.Unlock()
}

+func (c *aggregatingCollector) Collect(metrics chan<- prometheus.Metric) {
+	c.mutex.Lock()
+	defer c.mutex.Unlock()
+
 	metrics <- c.rtts
 	metrics <- c.connDurations
 	if hasSegmentCounter {
-		segsSentMetric, err := prometheus.NewConstMetric(segsSentDesc, prometheus.CounterValue, float64(segsSent))
+		segsSentMetric, err := prometheus.NewConstMetric(segsSentDesc, prometheus.CounterValue, float64(c.segsSent))
 		if err != nil {
 			log.Errorf("creating tcp_sent_segments_total metric failed: %v", err)
 			return
 		}
-		segsRcvdMetric, err := prometheus.NewConstMetric(segsRcvdDesc, prometheus.CounterValue, float64(segsRcvd))
+		segsRcvdMetric, err := prometheus.NewConstMetric(segsRcvdDesc, prometheus.CounterValue, float64(c.segsRcvd))
 		if err != nil {
 			log.Errorf("creating tcp_rcvd_segments_total metric failed: %v", err)
 			return
@@ -150,12 +168,12 @@ func (c *aggregatingCollector) Collect(metrics chan<- prometheus.Metric) {
 		metrics <- segsRcvdMetric
 	}
 	if hasByteCounter {
-		bytesSentMetric, err := prometheus.NewConstMetric(bytesSentDesc, prometheus.CounterValue, float64(bytesSent))
+		bytesSentMetric, err := prometheus.NewConstMetric(bytesSentDesc, prometheus.CounterValue, float64(c.bytesSent))
 		if err != nil {
 			log.Errorf("creating tcp_sent_bytes metric failed: %v", err)
 			return
 		}
-		bytesRcvdMetric, err := prometheus.NewConstMetric(bytesRcvdDesc, prometheus.CounterValue, float64(bytesRcvd))
+		bytesRcvdMetric, err := prometheus.NewConstMetric(bytesRcvdDesc, prometheus.CounterValue, float64(c.bytesRcvd))
 		if err != nil {
 			log.Errorf("creating tcp_rcvd_bytes metric failed: %v", err)
 			return
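For context on the Collect side of the diff: the collector follows client_golang's custom-collector pattern, where Describe advertises descriptors and Collect emits point-in-time const metrics built with prometheus.NewConstMetric. A runnable sketch of that pattern with invented names (snapshotCollector, demo_total), not this repo's code:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// snapshotCollector emits one counter whose value is read from a cached
// source at scrape time, mirroring how the diff's Collect reads c.segsSent
// and friends under the mutex.
type snapshotCollector struct {
	desc  *prometheus.Desc
	value func() float64
}

func (c *snapshotCollector) Describe(ch chan<- *prometheus.Desc) { ch <- c.desc }

func (c *snapshotCollector) Collect(ch chan<- prometheus.Metric) {
	m, err := prometheus.NewConstMetric(c.desc, prometheus.CounterValue, c.value())
	if err != nil {
		return // the real code logs the error instead
	}
	ch <- m
}

func main() {
	c := &snapshotCollector{
		desc:  prometheus.NewDesc("demo_total", "demo counter", nil, nil),
		value: func() float64 { return 1 },
	}
	reg := prometheus.NewRegistry()
	reg.MustRegister(c)
	if mfs, err := reg.Gather(); err == nil {
		fmt.Println(mfs[0].GetName()) // demo_total
	}
}
```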