Skip to content
This repository has been archived by the owner on May 26, 2022. It is now read-only.

collect metrics in a separate go routine #82

Merged
merged 3 commits into from Jul 15, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
78 changes: 53 additions & 25 deletions metrics.go
Expand Up @@ -20,6 +20,8 @@ var (
bytesRcvdDesc *prometheus.Desc
)

const collectFrequency = 10 * time.Second

var collector *aggregatingCollector

func init() {
Expand Down Expand Up @@ -52,18 +54,21 @@ func init() {
}

type aggregatingCollector struct {
mutex sync.Mutex
cronOnce sync.Once

highestID uint64
conns map[uint64] /* id */ *tracingConn
rtts prometheus.Histogram
connDurations prometheus.Histogram
mutex sync.Mutex
highestID uint64
conns map[uint64] /* id */ *tracingConn
rtts prometheus.Histogram
connDurations prometheus.Histogram
segsSent, segsRcvd uint64
bytesSent, bytesRcvd uint64
}

var _ prometheus.Collector = &aggregatingCollector{}

func newAggregatingCollector() *aggregatingCollector {
return &aggregatingCollector{
c := &aggregatingCollector{
conns: make(map[uint64]*tracingConn),
rtts: prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "tcp_rtt",
Expand All @@ -76,6 +81,7 @@ func newAggregatingCollector() *aggregatingCollector {
Buckets: prometheus.ExponentialBuckets(1, 1.5, 40), // 1s to ~12 weeks
}),
}
return c
}

func (c *aggregatingCollector) AddConn(t *tracingConn) uint64 {
Expand Down Expand Up @@ -103,45 +109,64 @@ func (c *aggregatingCollector) Describe(descs chan<- *prometheus.Desc) {
}
}

func (c *aggregatingCollector) Collect(metrics chan<- prometheus.Metric) {
now := time.Now()
func (c *aggregatingCollector) cron() {
ticker := time.NewTicker(collectFrequency)
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
defer ticker.Stop()

for now := range ticker.C {
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
c.gatherMetrics(now)
}
}

func (c *aggregatingCollector) gatherMetrics(now time.Time) {
c.mutex.Lock()
var segsSent, segsRcvd uint64
var bytesSent, bytesRcvd uint64
defer c.mutex.Unlock()

c.segsSent = 0
c.segsRcvd = 0
c.bytesSent = 0
c.bytesRcvd = 0
for _, conn := range c.conns {
info, err := conn.getTCPInfo()
if err != nil {
if strings.Contains(err.Error(), "use of closed network connection") {
c.closedConn(conn)
continue
}
log.Errorf("Failed to get TCP info: %s", err)
continue
}
if hasSegmentCounter {
segsSent += getSegmentsSent(info)
segsRcvd += getSegmentsRcvd(info)
c.segsSent += getSegmentsSent(info)
c.segsRcvd += getSegmentsRcvd(info)
}
if hasByteCounter {
bytesSent += getBytesSent(info)
bytesRcvd += getBytesRcvd(info)
c.bytesSent += getBytesSent(info)
c.bytesRcvd += getBytesRcvd(info)
}
c.rtts.Observe(info.RTT.Seconds())
c.connDurations.Observe(now.Sub(conn.startTime).Seconds())
if info.State == tcpinfo.Closed {
c.closedConn(conn)
}
}
c.mutex.Unlock()
}

func (c *aggregatingCollector) Collect(metrics chan<- prometheus.Metric) {
// Start collecting the metrics collection the first time Collect is called.
c.cronOnce.Do(func() {
c.gatherMetrics(time.Now())
go c.cron()
})

c.mutex.Lock()
defer c.mutex.Unlock()

metrics <- c.rtts
metrics <- c.connDurations
if hasSegmentCounter {
segsSentMetric, err := prometheus.NewConstMetric(segsSentDesc, prometheus.CounterValue, float64(segsSent))
segsSentMetric, err := prometheus.NewConstMetric(segsSentDesc, prometheus.CounterValue, float64(c.segsSent))
if err != nil {
log.Errorf("creating tcp_sent_segments_total metric failed: %v", err)
return
}
segsRcvdMetric, err := prometheus.NewConstMetric(segsRcvdDesc, prometheus.CounterValue, float64(segsRcvd))
segsRcvdMetric, err := prometheus.NewConstMetric(segsRcvdDesc, prometheus.CounterValue, float64(c.segsRcvd))
if err != nil {
log.Errorf("creating tcp_rcvd_segments_total metric failed: %v", err)
return
Expand All @@ -150,12 +175,12 @@ func (c *aggregatingCollector) Collect(metrics chan<- prometheus.Metric) {
metrics <- segsRcvdMetric
}
if hasByteCounter {
bytesSentMetric, err := prometheus.NewConstMetric(bytesSentDesc, prometheus.CounterValue, float64(bytesSent))
bytesSentMetric, err := prometheus.NewConstMetric(bytesSentDesc, prometheus.CounterValue, float64(c.bytesSent))
if err != nil {
log.Errorf("creating tcp_sent_bytes metric failed: %v", err)
return
}
bytesRcvdMetric, err := prometheus.NewConstMetric(bytesRcvdDesc, prometheus.CounterValue, float64(bytesRcvd))
bytesRcvdMetric, err := prometheus.NewConstMetric(bytesRcvdDesc, prometheus.CounterValue, float64(c.bytesRcvd))
if err != nil {
log.Errorf("creating tcp_rcvd_bytes metric failed: %v", err)
return
Expand All @@ -165,9 +190,11 @@ func (c *aggregatingCollector) Collect(metrics chan<- prometheus.Metric) {
}
}

func (c *aggregatingCollector) closedConn(conn *tracingConn) {
func (c *aggregatingCollector) ClosedConn(conn *tracingConn, direction string) {
c.mutex.Lock()
collector.removeConn(conn.id)
closedConns.WithLabelValues(conn.getDirection()).Inc()
c.mutex.Unlock()
closedConns.WithLabelValues(direction).Inc()
}

type tracingConn struct {
Expand Down Expand Up @@ -204,6 +231,7 @@ func (c *tracingConn) getDirection() string {
}

func (c *tracingConn) Close() error {
collector.ClosedConn(c, c.getDirection())
return c.Conn.Close()
}

Expand Down