This repository has been archived by the owner on May 26, 2022. It is now read-only.

collect metrics in a separate go routine #82

Merged 3 commits on Jul 15, 2021
Changes from 1 commit
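This PR moves the per-connection stats gathering out of Collect and into a dedicated goroutine: newAggregatingCollector now starts a cron loop that wakes every collectFrequency (10s), walks the tracked connections under the mutex, refreshes the cached segment/byte counters and the RTT and connection-duration histograms from getTCPInfo(), and drops closed connections. Collect then only takes the mutex and reports the cached values. Below is a minimal, self-contained sketch of that pattern; the type and metric names are illustrative only, not this repo's actual ones (see the diff for those).

package main

import (
	"sync"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

const collectFrequency = 10 * time.Second

// cachingCollector illustrates the pattern: a background goroutine refreshes
// cached counters on a fixed interval, and Collect only reports the cache.
type cachingCollector struct {
	mutex     sync.Mutex
	bytesSent uint64
	desc      *prometheus.Desc
}

func newCachingCollector() *cachingCollector {
	c := &cachingCollector{
		desc: prometheus.NewDesc("example_sent_bytes", "bytes sent (cached)", nil, nil),
	}
	go c.cron() // do the expensive work off the scrape path
	return c
}

// cron periodically recomputes the cached counters under the mutex.
func (c *cachingCollector) cron() {
	ticker := time.NewTicker(collectFrequency)
	defer ticker.Stop()
	for range ticker.C {
		c.mutex.Lock()
		c.bytesSent += 42 // stand-in for aggregating real per-connection stats
		c.mutex.Unlock()
	}
}

func (c *cachingCollector) Describe(descs chan<- *prometheus.Desc) { descs <- c.desc }

func (c *cachingCollector) Collect(metrics chan<- prometheus.Metric) {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	m, err := prometheus.NewConstMetric(c.desc, prometheus.CounterValue, float64(c.bytesSent))
	if err != nil {
		return
	}
	metrics <- m
}

func main() {
	prometheus.MustRegister(newCachingCollector())
	select {} // a real program would expose /metrics via an HTTP server
}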
88 changes: 53 additions & 35 deletions metrics.go
@@ -20,6 +20,8 @@ var (
bytesRcvdDesc *prometheus.Desc
)

const collectFrequency = 10 * time.Second

var collector *aggregatingCollector

func init() {
@@ -54,16 +56,18 @@ func init() {
type aggregatingCollector struct {
mutex sync.Mutex

highestID uint64
conns map[uint64] /* id */ *tracingConn
rtts prometheus.Histogram
connDurations prometheus.Histogram
highestID uint64
conns map[uint64] /* id */ *tracingConn
rtts prometheus.Histogram
connDurations prometheus.Histogram
segsSent, segsRcvd uint64
bytesSent, bytesRcvd uint64
}

var _ prometheus.Collector = &aggregatingCollector{}

func newAggregatingCollector() *aggregatingCollector {
return &aggregatingCollector{
c := &aggregatingCollector{
conns: make(map[uint64]*tracingConn),
rtts: prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "tcp_rtt",
@@ -76,6 +80,8 @@ func newAggregatingCollector() *aggregatingCollector {
Buckets: prometheus.ExponentialBuckets(1, 1.5, 40), // 1s to ~12 weeks
}),
}
go c.cron()
return c
}

func (c *aggregatingCollector) AddConn(t *tracingConn) uint64 {
@@ -103,45 +109,57 @@ func (c *aggregatingCollector) Describe(descs chan<- *prometheus.Desc) {
}
}

func (c *aggregatingCollector) Collect(metrics chan<- prometheus.Metric) {
now := time.Now()
c.mutex.Lock()
var segsSent, segsRcvd uint64
var bytesSent, bytesRcvd uint64
for _, conn := range c.conns {
info, err := conn.getTCPInfo()
if err != nil {
if strings.Contains(err.Error(), "use of closed network connection") {
c.closedConn(conn)
func (c *aggregatingCollector) cron() {
ticker := time.NewTicker(collectFrequency)
defer ticker.Stop()

for now := range ticker.C {
c.mutex.Lock()
c.segsSent = 0
c.segsRcvd = 0
c.bytesSent = 0
c.bytesRcvd = 0
for _, conn := range c.conns {
info, err := conn.getTCPInfo()
if err != nil {
if strings.Contains(err.Error(), "use of closed network connection") {
c.closedConn(conn)
continue
}
log.Errorf("Failed to get TCP info: %s", err)
continue
}
Is it a problem that, if conn.getTCPInfo() errors differently than we expect, we'll never close the connection?

I've seen this:
2021-07-08T21:39:47.925-0400 ERROR tcp-tpt go-tcp-transport@v0.2.2/metrics.go:118 Failed to get TCP info: raw-control tcp 192.168.1.6:4001: getsockopt: not implemented on Windows
so I'm concerned this could cause us problems.

Contributor Author

I tried it in my Windows VM, and I'm getting the same error. Should we disable metrics collection on Windows?
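(Not part of this PR, just a sketch of one way to do that: only start the background loop on platforms where the stats are expected to work. supportsTCPInfo below is an illustrative helper, not an existing function in this repo.)

// Sketch only, assuming this repo's package name.
package tcp

import "runtime"

// supportsTCPInfo reports whether per-connection TCP_INFO stats are expected
// to be available. The log above shows getsockopt is "not implemented on
// Windows", so collection would be skipped there.
func supportsTCPInfo() bool {
	return runtime.GOOS != "windows"
}

newAggregatingCollector could then guard the goroutine start with if supportsTCPInfo() { go c.cron() }.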

log.Errorf("Failed to get TCP info: %s", err)
continue
}
if hasSegmentCounter {
segsSent += getSegmentsSent(info)
segsRcvd += getSegmentsRcvd(info)
}
if hasByteCounter {
bytesSent += getBytesSent(info)
bytesRcvd += getBytesRcvd(info)
}
c.rtts.Observe(info.RTT.Seconds())
c.connDurations.Observe(now.Sub(conn.startTime).Seconds())
if info.State == tcpinfo.Closed {
c.closedConn(conn)
if hasSegmentCounter {
c.segsSent += getSegmentsSent(info)
c.segsRcvd += getSegmentsRcvd(info)
}
if hasByteCounter {
c.bytesSent += getBytesSent(info)
c.bytesRcvd += getBytesRcvd(info)
}
c.rtts.Observe(info.RTT.Seconds())
c.connDurations.Observe(now.Sub(conn.startTime).Seconds())
if info.State == tcpinfo.Closed {
c.closedConn(conn)
}
}
c.mutex.Unlock()
}
c.mutex.Unlock()
}

func (c *aggregatingCollector) Collect(metrics chan<- prometheus.Metric) {
c.mutex.Lock()
defer c.mutex.Unlock()

metrics <- c.rtts
metrics <- c.connDurations
if hasSegmentCounter {
segsSentMetric, err := prometheus.NewConstMetric(segsSentDesc, prometheus.CounterValue, float64(segsSent))
segsSentMetric, err := prometheus.NewConstMetric(segsSentDesc, prometheus.CounterValue, float64(c.segsSent))
if err != nil {
log.Errorf("creating tcp_sent_segments_total metric failed: %v", err)
return
}
segsRcvdMetric, err := prometheus.NewConstMetric(segsRcvdDesc, prometheus.CounterValue, float64(segsRcvd))
segsRcvdMetric, err := prometheus.NewConstMetric(segsRcvdDesc, prometheus.CounterValue, float64(c.segsRcvd))
if err != nil {
log.Errorf("creating tcp_rcvd_segments_total metric failed: %v", err)
return
@@ -150,12 +168,12 @@ func (c *aggregatingCollector) Collect(metrics chan<- prometheus.Metric) {
metrics <- segsRcvdMetric
}
if hasByteCounter {
bytesSentMetric, err := prometheus.NewConstMetric(bytesSentDesc, prometheus.CounterValue, float64(bytesSent))
bytesSentMetric, err := prometheus.NewConstMetric(bytesSentDesc, prometheus.CounterValue, float64(c.bytesSent))
if err != nil {
log.Errorf("creating tcp_sent_bytes metric failed: %v", err)
return
}
bytesRcvdMetric, err := prometheus.NewConstMetric(bytesRcvdDesc, prometheus.CounterValue, float64(bytesRcvd))
bytesRcvdMetric, err := prometheus.NewConstMetric(bytesRcvdDesc, prometheus.CounterValue, float64(c.bytesRcvd))
if err != nil {
log.Errorf("creating tcp_rcvd_bytes metric failed: %v", err)
return