From 79b9740ef99bd446a5d52881765b183354eb0cff Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sat, 10 Jul 2021 11:32:51 -0500 Subject: [PATCH] collect metrics in a separate go routine --- metrics.go | 84 ++++++++++++++++++++++++++++++++---------------------- 1 file changed, 50 insertions(+), 34 deletions(-) diff --git a/metrics.go b/metrics.go index 4db7271..b0326ce 100644 --- a/metrics.go +++ b/metrics.go @@ -20,6 +20,8 @@ var ( bytesRcvdDesc *prometheus.Desc ) +const collectFrequency = 10 * time.Second + var collector *aggregatingCollector func init() { @@ -54,10 +56,12 @@ func init() { type aggregatingCollector struct { mutex sync.Mutex - highestID uint64 - conns map[uint64] /* id */ *tracingConn - rtts prometheus.Histogram - connDurations prometheus.Histogram + highestID uint64 + conns map[uint64] /* id */ *tracingConn + rtts prometheus.Histogram + connDurations prometheus.Histogram + segsSent, segsRcvd uint64 + bytesSent, bytesRcvd uint64 } var _ prometheus.Collector = &aggregatingCollector{} @@ -103,45 +107,57 @@ func (c *aggregatingCollector) Describe(descs chan<- *prometheus.Desc) { } } -func (c *aggregatingCollector) Collect(metrics chan<- prometheus.Metric) { - now := time.Now() - c.mutex.Lock() - var segsSent, segsRcvd uint64 - var bytesSent, bytesRcvd uint64 - for _, conn := range c.conns { - info, err := conn.getTCPInfo() - if err != nil { - if strings.Contains(err.Error(), "use of closed network connection") { - c.closedConn(conn) +func (c *aggregatingCollector) cron() { + ticker := time.NewTicker(collectFrequency) + defer ticker.Stop() + + for now := range ticker.C { + c.mutex.Lock() + c.segsSent = 0 + c.segsRcvd = 0 + c.bytesSent = 0 + c.bytesRcvd = 0 + for _, conn := range c.conns { + info, err := conn.getTCPInfo() + if err != nil { + if strings.Contains(err.Error(), "use of closed network connection") { + c.closedConn(conn) + continue + } + log.Errorf("Failed to get TCP info: %s", err) continue } - log.Errorf("Failed to get TCP info: %s", err) - continue - } - if hasSegmentCounter { - segsSent += getSegmentsSent(info) - segsRcvd += getSegmentsRcvd(info) - } - if hasByteCounter { - bytesSent += getBytesSent(info) - bytesRcvd += getBytesRcvd(info) - } - c.rtts.Observe(info.RTT.Seconds()) - c.connDurations.Observe(now.Sub(conn.startTime).Seconds()) - if info.State == tcpinfo.Closed { - c.closedConn(conn) + if hasSegmentCounter { + c.segsSent += getSegmentsSent(info) + c.segsRcvd += getSegmentsRcvd(info) + } + if hasByteCounter { + c.bytesSent += getBytesSent(info) + c.bytesRcvd += getBytesRcvd(info) + } + c.rtts.Observe(info.RTT.Seconds()) + c.connDurations.Observe(now.Sub(conn.startTime).Seconds()) + if info.State == tcpinfo.Closed { + c.closedConn(conn) + } } + c.mutex.Unlock() } - c.mutex.Unlock() +} + +func (c *aggregatingCollector) Collect(metrics chan<- prometheus.Metric) { + c.mutex.Lock() + defer c.mutex.Unlock() + metrics <- c.rtts metrics <- c.connDurations if hasSegmentCounter { - segsSentMetric, err := prometheus.NewConstMetric(segsSentDesc, prometheus.CounterValue, float64(segsSent)) + segsSentMetric, err := prometheus.NewConstMetric(segsSentDesc, prometheus.CounterValue, float64(c.segsSent)) if err != nil { log.Errorf("creating tcp_sent_segments_total metric failed: %v", err) return } - segsRcvdMetric, err := prometheus.NewConstMetric(segsRcvdDesc, prometheus.CounterValue, float64(segsRcvd)) + segsRcvdMetric, err := prometheus.NewConstMetric(segsRcvdDesc, prometheus.CounterValue, float64(c.segsRcvd)) if err != nil { log.Errorf("creating tcp_rcvd_segments_total metric failed: %v", err) return @@ -150,12 +166,12 @@ func (c *aggregatingCollector) Collect(metrics chan<- prometheus.Metric) { metrics <- segsRcvdMetric } if hasByteCounter { - bytesSentMetric, err := prometheus.NewConstMetric(bytesSentDesc, prometheus.CounterValue, float64(bytesSent)) + bytesSentMetric, err := prometheus.NewConstMetric(bytesSentDesc, prometheus.CounterValue, float64(c.bytesSent)) if err != nil { log.Errorf("creating tcp_sent_bytes metric failed: %v", err) return } - bytesRcvdMetric, err := prometheus.NewConstMetric(bytesRcvdDesc, prometheus.CounterValue, float64(bytesRcvd)) + bytesRcvdMetric, err := prometheus.NewConstMetric(bytesRcvdDesc, prometheus.CounterValue, float64(c.bytesRcvd)) if err != nil { log.Errorf("creating tcp_rcvd_bytes metric failed: %v", err) return