Skip to content
This repository has been archived by the owner on May 26, 2022. It is now read-only.

Commit

Permalink
start collecting RTT and bandwidth metrics when Collect is calleed
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann committed Jul 15, 2021
1 parent 4fe8055 commit 660cdd6
Showing 1 changed file with 36 additions and 25 deletions.
61 changes: 36 additions & 25 deletions metrics.go
Expand Up @@ -54,8 +54,9 @@ func init() {
}

type aggregatingCollector struct {
mutex sync.Mutex
cronOnce sync.Once

mutex sync.Mutex
highestID uint64
conns map[uint64] /* id */ *tracingConn
rtts prometheus.Histogram
Expand All @@ -80,7 +81,6 @@ func newAggregatingCollector() *aggregatingCollector {
Buckets: prometheus.ExponentialBuckets(1, 1.5, 40), // 1s to ~12 weeks
}),
}
go c.cron()
return c
}

Expand Down Expand Up @@ -114,36 +114,47 @@ func (c *aggregatingCollector) cron() {
defer ticker.Stop()

for now := range ticker.C {
c.mutex.Lock()
c.segsSent = 0
c.segsRcvd = 0
c.bytesSent = 0
c.bytesRcvd = 0
for _, conn := range c.conns {
info, err := conn.getTCPInfo()
if err != nil {
if strings.Contains(err.Error(), "use of closed network connection") {
continue
}
log.Errorf("Failed to get TCP info: %s", err)
c.collectMetrics(now)
}
}

func (c *aggregatingCollector) collectMetrics(now time.Time) {
c.mutex.Lock()
defer c.mutex.Unlock()

c.segsSent = 0
c.segsRcvd = 0
c.bytesSent = 0
c.bytesRcvd = 0
for _, conn := range c.conns {
info, err := conn.getTCPInfo()
if err != nil {
if strings.Contains(err.Error(), "use of closed network connection") {
continue
}
if hasSegmentCounter {
c.segsSent += getSegmentsSent(info)
c.segsRcvd += getSegmentsRcvd(info)
}
if hasByteCounter {
c.bytesSent += getBytesSent(info)
c.bytesRcvd += getBytesRcvd(info)
}
c.rtts.Observe(info.RTT.Seconds())
c.connDurations.Observe(now.Sub(conn.startTime).Seconds())
log.Errorf("Failed to get TCP info: %s", err)
continue
}
c.mutex.Unlock()
if hasSegmentCounter {
c.segsSent += getSegmentsSent(info)
c.segsRcvd += getSegmentsRcvd(info)
}
if hasByteCounter {
c.bytesSent += getBytesSent(info)
c.bytesRcvd += getBytesRcvd(info)
}
c.rtts.Observe(info.RTT.Seconds())
c.connDurations.Observe(now.Sub(conn.startTime).Seconds())
}
}

func (c *aggregatingCollector) Collect(metrics chan<- prometheus.Metric) {
// Start collecting the metrics collection the first time Collect is called.
c.cronOnce.Do(func() {
c.collectMetrics(time.Now())
go c.cron()
})

c.mutex.Lock()
defer c.mutex.Unlock()

Expand Down

0 comments on commit 660cdd6

Please sign in to comment.