From 544cba296f41754504ce1a0d649e431fc900b8e5 Mon Sep 17 00:00:00 2001
From: Marten Seemann
Date: Sat, 10 Jul 2021 11:32:51 -0500
Subject: [PATCH 1/3] collect metrics in a separate goroutine

---
 metrics.go | 88 ++++++++++++++++++++++++++++++++----------------------
 1 file changed, 53 insertions(+), 35 deletions(-)

diff --git a/metrics.go b/metrics.go
index 4db7271..69a83e6 100644
--- a/metrics.go
+++ b/metrics.go
@@ -20,6 +20,8 @@ var (
 	bytesRcvdDesc *prometheus.Desc
 )
 
+const collectFrequency = 10 * time.Second
+
 var collector *aggregatingCollector
 
 func init() {
@@ -54,16 +56,18 @@ func init() {
 type aggregatingCollector struct {
 	mutex sync.Mutex
 
-	highestID     uint64
-	conns         map[uint64] /* id */ *tracingConn
-	rtts          prometheus.Histogram
-	connDurations prometheus.Histogram
+	highestID            uint64
+	conns                map[uint64] /* id */ *tracingConn
+	rtts                 prometheus.Histogram
+	connDurations        prometheus.Histogram
+	segsSent, segsRcvd   uint64
+	bytesSent, bytesRcvd uint64
 }
 
 var _ prometheus.Collector = &aggregatingCollector{}
 
 func newAggregatingCollector() *aggregatingCollector {
-	return &aggregatingCollector{
+	c := &aggregatingCollector{
 		conns: make(map[uint64]*tracingConn),
 		rtts: prometheus.NewHistogram(prometheus.HistogramOpts{
 			Name: "tcp_rtt",
@@ -76,6 +80,8 @@ func newAggregatingCollector() *aggregatingCollector {
 			Buckets: prometheus.ExponentialBuckets(1, 1.5, 40), // 1s to ~12 weeks
 		}),
 	}
+	go c.cron()
+	return c
 }
 
 func (c *aggregatingCollector) AddConn(t *tracingConn) uint64 {
@@ -103,45 +109,57 @@ func (c *aggregatingCollector) Describe(descs chan<- *prometheus.Desc) {
 	}
 }
 
-func (c *aggregatingCollector) Collect(metrics chan<- prometheus.Metric) {
-	now := time.Now()
-	c.mutex.Lock()
-	var segsSent, segsRcvd uint64
-	var bytesSent, bytesRcvd uint64
-	for _, conn := range c.conns {
-		info, err := conn.getTCPInfo()
-		if err != nil {
-			if strings.Contains(err.Error(), "use of closed network connection") {
-				c.closedConn(conn)
+func (c *aggregatingCollector) cron() {
+	ticker := time.NewTicker(collectFrequency)
+	defer ticker.Stop()
+
+	for now := range ticker.C {
+		c.mutex.Lock()
+		c.segsSent = 0
+		c.segsRcvd = 0
+		c.bytesSent = 0
+		c.bytesRcvd = 0
+		for _, conn := range c.conns {
+			info, err := conn.getTCPInfo()
+			if err != nil {
+				if strings.Contains(err.Error(), "use of closed network connection") {
+					c.closedConn(conn)
+					continue
+				}
+				log.Errorf("Failed to get TCP info: %s", err)
 				continue
 			}
-			log.Errorf("Failed to get TCP info: %s", err)
-			continue
-		}
-		if hasSegmentCounter {
-			segsSent += getSegmentsSent(info)
-			segsRcvd += getSegmentsRcvd(info)
-		}
-		if hasByteCounter {
-			bytesSent += getBytesSent(info)
-			bytesRcvd += getBytesRcvd(info)
-		}
-		c.rtts.Observe(info.RTT.Seconds())
-		c.connDurations.Observe(now.Sub(conn.startTime).Seconds())
-		if info.State == tcpinfo.Closed {
-			c.closedConn(conn)
+			if hasSegmentCounter {
+				c.segsSent += getSegmentsSent(info)
+				c.segsRcvd += getSegmentsRcvd(info)
+			}
+			if hasByteCounter {
+				c.bytesSent += getBytesSent(info)
+				c.bytesRcvd += getBytesRcvd(info)
+			}
+			c.rtts.Observe(info.RTT.Seconds())
+			c.connDurations.Observe(now.Sub(conn.startTime).Seconds())
+			if info.State == tcpinfo.Closed {
+				c.closedConn(conn)
+			}
 		}
+		c.mutex.Unlock()
 	}
-	c.mutex.Unlock()
+}
+
+func (c *aggregatingCollector) Collect(metrics chan<- prometheus.Metric) {
+	c.mutex.Lock()
+	defer c.mutex.Unlock()
+
 	metrics <- c.rtts
 	metrics <- c.connDurations
 	if hasSegmentCounter {
-		segsSentMetric, err := prometheus.NewConstMetric(segsSentDesc, prometheus.CounterValue, float64(segsSent))
+		segsSentMetric, err := prometheus.NewConstMetric(segsSentDesc, prometheus.CounterValue, float64(c.segsSent))
 		if err != nil {
 			log.Errorf("creating tcp_sent_segments_total metric failed: %v", err)
 			return
 		}
-		segsRcvdMetric, err := prometheus.NewConstMetric(segsRcvdDesc, prometheus.CounterValue, float64(segsRcvd))
+		segsRcvdMetric, err := prometheus.NewConstMetric(segsRcvdDesc, prometheus.CounterValue, float64(c.segsRcvd))
 		if err != nil {
 			log.Errorf("creating tcp_rcvd_segments_total metric failed: %v", err)
 			return
@@ -150,12 +168,12 @@ func (c *aggregatingCollector) Collect(metrics chan<- prometheus.Metric) {
 		metrics <- segsRcvdMetric
 	}
 	if hasByteCounter {
-		bytesSentMetric, err := prometheus.NewConstMetric(bytesSentDesc, prometheus.CounterValue, float64(bytesSent))
+		bytesSentMetric, err := prometheus.NewConstMetric(bytesSentDesc, prometheus.CounterValue, float64(c.bytesSent))
 		if err != nil {
 			log.Errorf("creating tcp_sent_bytes metric failed: %v", err)
 			return
 		}
-		bytesRcvdMetric, err := prometheus.NewConstMetric(bytesRcvdDesc, prometheus.CounterValue, float64(bytesRcvd))
+		bytesRcvdMetric, err := prometheus.NewConstMetric(bytesRcvdDesc, prometheus.CounterValue, float64(c.bytesRcvd))
 		if err != nil {
 			log.Errorf("creating tcp_rcvd_bytes metric failed: %v", err)
 			return

From 4fe80557bb8a57375b8c091c031745c8beb6eeb9 Mon Sep 17 00:00:00 2001
From: Marten Seemann
Date: Wed, 14 Jul 2021 19:15:36 -0400
Subject: [PATCH 2/3] use conn.Close() to remove closed connections from tracer

---
 metrics.go | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/metrics.go b/metrics.go
index 69a83e6..ec3fbfd 100644
--- a/metrics.go
+++ b/metrics.go
@@ -123,7 +123,6 @@ func (c *aggregatingCollector) cron() {
 			info, err := conn.getTCPInfo()
 			if err != nil {
 				if strings.Contains(err.Error(), "use of closed network connection") {
-					c.closedConn(conn)
 					continue
 				}
 				log.Errorf("Failed to get TCP info: %s", err)
@@ -139,9 +138,6 @@ func (c *aggregatingCollector) cron() {
 			}
 			c.rtts.Observe(info.RTT.Seconds())
 			c.connDurations.Observe(now.Sub(conn.startTime).Seconds())
-			if info.State == tcpinfo.Closed {
-				c.closedConn(conn)
-			}
 		}
 		c.mutex.Unlock()
 	}
@@ -183,9 +179,11 @@ func (c *aggregatingCollector) Collect(metrics chan<- prometheus.Metric) {
 	}
 }
 
-func (c *aggregatingCollector) closedConn(conn *tracingConn) {
+func (c *aggregatingCollector) ClosedConn(conn *tracingConn, direction string) {
+	c.mutex.Lock()
 	collector.removeConn(conn.id)
-	closedConns.WithLabelValues(conn.getDirection()).Inc()
+	c.mutex.Unlock()
+	closedConns.WithLabelValues(direction).Inc()
 }
 
 type tracingConn struct {
@@ -222,6 +220,7 @@ func (c *tracingConn) getDirection() string {
 }
 
 func (c *tracingConn) Close() error {
+	collector.ClosedConn(c, c.getDirection())
 	return c.Conn.Close()
 }
 

From d611dc78e1345005034c9ad50faf1c7704b133d8 Mon Sep 17 00:00:00 2001
From: Marten Seemann
Date: Wed, 14 Jul 2021 19:16:28 -0400
Subject: [PATCH 3/3] start collecting RTT and bandwidth metrics when Collect is called

---
 metrics.go | 61 ++++++++++++++++++++++++++++++++----------------------
 1 file changed, 36 insertions(+), 25 deletions(-)

diff --git a/metrics.go b/metrics.go
index ec3fbfd..179978f 100644
--- a/metrics.go
+++ b/metrics.go
@@ -54,8 +54,9 @@ func init() {
 }
 
 type aggregatingCollector struct {
-	mutex sync.Mutex
-
+	cronOnce sync.Once
+
+	mutex                sync.Mutex
 	highestID            uint64
 	conns                map[uint64] /* id */ *tracingConn
 	rtts                 prometheus.Histogram
@@ -80,7 +81,6 @@ func newAggregatingCollector() *aggregatingCollector {
 			Buckets: prometheus.ExponentialBuckets(1, 1.5, 40), // 1s to ~12 weeks
 		}),
 	}
-	go c.cron()
 	return c
 }
 
@@ -114,36 +114,47 @@ func (c *aggregatingCollector) cron() {
 	defer ticker.Stop()
 
 	for now := range ticker.C {
-		c.mutex.Lock()
-		c.segsSent = 0
-		c.segsRcvd = 0
-		c.bytesSent = 0
-		c.bytesRcvd = 0
-		for _, conn := range c.conns {
-			info, err := conn.getTCPInfo()
-			if err != nil {
-				if strings.Contains(err.Error(), "use of closed network connection") {
-					continue
-				}
-				log.Errorf("Failed to get TCP info: %s", err)
+		c.gatherMetrics(now)
+	}
+}
+
+func (c *aggregatingCollector) gatherMetrics(now time.Time) {
+	c.mutex.Lock()
+	defer c.mutex.Unlock()
+
+	c.segsSent = 0
+	c.segsRcvd = 0
+	c.bytesSent = 0
+	c.bytesRcvd = 0
+	for _, conn := range c.conns {
+		info, err := conn.getTCPInfo()
+		if err != nil {
+			if strings.Contains(err.Error(), "use of closed network connection") {
 				continue
 			}
-			if hasSegmentCounter {
-				c.segsSent += getSegmentsSent(info)
-				c.segsRcvd += getSegmentsRcvd(info)
-			}
-			if hasByteCounter {
-				c.bytesSent += getBytesSent(info)
-				c.bytesRcvd += getBytesRcvd(info)
-			}
-			c.rtts.Observe(info.RTT.Seconds())
-			c.connDurations.Observe(now.Sub(conn.startTime).Seconds())
+			log.Errorf("Failed to get TCP info: %s", err)
+			continue
 		}
-		c.mutex.Unlock()
+		if hasSegmentCounter {
+			c.segsSent += getSegmentsSent(info)
+			c.segsRcvd += getSegmentsRcvd(info)
+		}
+		if hasByteCounter {
+			c.bytesSent += getBytesSent(info)
+			c.bytesRcvd += getBytesRcvd(info)
+		}
+		c.rtts.Observe(info.RTT.Seconds())
+		c.connDurations.Observe(now.Sub(conn.startTime).Seconds())
 	}
 }
 
 func (c *aggregatingCollector) Collect(metrics chan<- prometheus.Metric) {
+	// Start the metrics collection loop the first time Collect is called.
+	c.cronOnce.Do(func() {
+		c.gatherMetrics(time.Now())
+		go c.cron()
+	})
+
 	c.mutex.Lock()
 	defer c.mutex.Unlock()