diff --git a/metrics.go b/metrics.go index 4db7271..179978f 100644 --- a/metrics.go +++ b/metrics.go @@ -20,6 +20,8 @@ var ( bytesRcvdDesc *prometheus.Desc ) +const collectFrequency = 10 * time.Second + var collector *aggregatingCollector func init() { @@ -52,18 +54,21 @@ func init() { } type aggregatingCollector struct { - mutex sync.Mutex + cronOnce sync.Once - highestID uint64 - conns map[uint64] /* id */ *tracingConn - rtts prometheus.Histogram - connDurations prometheus.Histogram + mutex sync.Mutex + highestID uint64 + conns map[uint64] /* id */ *tracingConn + rtts prometheus.Histogram + connDurations prometheus.Histogram + segsSent, segsRcvd uint64 + bytesSent, bytesRcvd uint64 } var _ prometheus.Collector = &aggregatingCollector{} func newAggregatingCollector() *aggregatingCollector { - return &aggregatingCollector{ + c := &aggregatingCollector{ conns: make(map[uint64]*tracingConn), rtts: prometheus.NewHistogram(prometheus.HistogramOpts{ Name: "tcp_rtt", @@ -76,6 +81,7 @@ func newAggregatingCollector() *aggregatingCollector { Buckets: prometheus.ExponentialBuckets(1, 1.5, 40), // 1s to ~12 weeks }), } + return c } func (c *aggregatingCollector) AddConn(t *tracingConn) uint64 { @@ -103,45 +109,64 @@ func (c *aggregatingCollector) Describe(descs chan<- *prometheus.Desc) { } } -func (c *aggregatingCollector) Collect(metrics chan<- prometheus.Metric) { - now := time.Now() +func (c *aggregatingCollector) cron() { + ticker := time.NewTicker(collectFrequency) + defer ticker.Stop() + + for now := range ticker.C { + c.gatherMetrics(now) + } +} + +func (c *aggregatingCollector) gatherMetrics(now time.Time) { c.mutex.Lock() - var segsSent, segsRcvd uint64 - var bytesSent, bytesRcvd uint64 + defer c.mutex.Unlock() + + c.segsSent = 0 + c.segsRcvd = 0 + c.bytesSent = 0 + c.bytesRcvd = 0 for _, conn := range c.conns { info, err := conn.getTCPInfo() if err != nil { if strings.Contains(err.Error(), "use of closed network connection") { - c.closedConn(conn) continue } log.Errorf("Failed to get TCP info: %s", err) continue } if hasSegmentCounter { - segsSent += getSegmentsSent(info) - segsRcvd += getSegmentsRcvd(info) + c.segsSent += getSegmentsSent(info) + c.segsRcvd += getSegmentsRcvd(info) } if hasByteCounter { - bytesSent += getBytesSent(info) - bytesRcvd += getBytesRcvd(info) + c.bytesSent += getBytesSent(info) + c.bytesRcvd += getBytesRcvd(info) } c.rtts.Observe(info.RTT.Seconds()) c.connDurations.Observe(now.Sub(conn.startTime).Seconds()) - if info.State == tcpinfo.Closed { - c.closedConn(conn) - } } - c.mutex.Unlock() +} + +func (c *aggregatingCollector) Collect(metrics chan<- prometheus.Metric) { + // Start collecting the metrics collection the first time Collect is called. + c.cronOnce.Do(func() { + c.gatherMetrics(time.Now()) + go c.cron() + }) + + c.mutex.Lock() + defer c.mutex.Unlock() + metrics <- c.rtts metrics <- c.connDurations if hasSegmentCounter { - segsSentMetric, err := prometheus.NewConstMetric(segsSentDesc, prometheus.CounterValue, float64(segsSent)) + segsSentMetric, err := prometheus.NewConstMetric(segsSentDesc, prometheus.CounterValue, float64(c.segsSent)) if err != nil { log.Errorf("creating tcp_sent_segments_total metric failed: %v", err) return } - segsRcvdMetric, err := prometheus.NewConstMetric(segsRcvdDesc, prometheus.CounterValue, float64(segsRcvd)) + segsRcvdMetric, err := prometheus.NewConstMetric(segsRcvdDesc, prometheus.CounterValue, float64(c.segsRcvd)) if err != nil { log.Errorf("creating tcp_rcvd_segments_total metric failed: %v", err) return @@ -150,12 +175,12 @@ func (c *aggregatingCollector) Collect(metrics chan<- prometheus.Metric) { metrics <- segsRcvdMetric } if hasByteCounter { - bytesSentMetric, err := prometheus.NewConstMetric(bytesSentDesc, prometheus.CounterValue, float64(bytesSent)) + bytesSentMetric, err := prometheus.NewConstMetric(bytesSentDesc, prometheus.CounterValue, float64(c.bytesSent)) if err != nil { log.Errorf("creating tcp_sent_bytes metric failed: %v", err) return } - bytesRcvdMetric, err := prometheus.NewConstMetric(bytesRcvdDesc, prometheus.CounterValue, float64(bytesRcvd)) + bytesRcvdMetric, err := prometheus.NewConstMetric(bytesRcvdDesc, prometheus.CounterValue, float64(c.bytesRcvd)) if err != nil { log.Errorf("creating tcp_rcvd_bytes metric failed: %v", err) return @@ -165,9 +190,11 @@ func (c *aggregatingCollector) Collect(metrics chan<- prometheus.Metric) { } } -func (c *aggregatingCollector) closedConn(conn *tracingConn) { +func (c *aggregatingCollector) ClosedConn(conn *tracingConn, direction string) { + c.mutex.Lock() collector.removeConn(conn.id) - closedConns.WithLabelValues(conn.getDirection()).Inc() + c.mutex.Unlock() + closedConns.WithLabelValues(direction).Inc() } type tracingConn struct { @@ -204,6 +231,7 @@ func (c *tracingConn) getDirection() string { } func (c *tracingConn) Close() error { + collector.ClosedConn(c, c.getDirection()) return c.Conn.Close() }