-
Notifications
You must be signed in to change notification settings - Fork 38.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add a metric that can be used to notice stuck worker threads #70884
Changes from 7 commits
6195d10
5a8444c
74c50c0
44a87ba
578962d
fd77aa5
680ddd4
980242c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,8 @@ package workqueue | |
import ( | ||
"sync" | ||
"time" | ||
|
||
"k8s.io/apimachinery/pkg/util/clock" | ||
) | ||
|
||
// This file provides abstractions for setting the provider (e.g., prometheus) | ||
|
@@ -28,6 +30,7 @@ type queueMetrics interface { | |
add(item t) | ||
get(item t) | ||
done(item t) | ||
updateUnfinishedWork() | ||
} | ||
|
||
// GaugeMetric represents a single numerical value that can arbitrarily go up | ||
|
@@ -37,6 +40,12 @@ type GaugeMetric interface { | |
Dec() | ||
} | ||
|
||
// SettableGaugeMetric represents a single numerical value that can arbitrarily go up | ||
// and down. (Separate from GaugeMetric to preserve backwards compatibility.) | ||
type SettableGaugeMetric interface { | ||
Set(float64) | ||
} | ||
|
||
// CounterMetric represents a single numerical value that only ever | ||
// goes up. | ||
type CounterMetric interface { | ||
|
@@ -52,9 +61,13 @@ type noopMetric struct{} | |
|
||
func (noopMetric) Inc() {} | ||
func (noopMetric) Dec() {} | ||
func (noopMetric) Set(float64) {} | ||
func (noopMetric) Observe(float64) {} | ||
|
||
// defaultQueueMetrics expects the caller to lock before setting any metrics. | ||
type defaultQueueMetrics struct { | ||
clock clock.Clock | ||
|
||
// current depth of a workqueue | ||
depth GaugeMetric | ||
// total number of adds handled by a workqueue | ||
|
@@ -65,6 +78,10 @@ type defaultQueueMetrics struct { | |
workDuration SummaryMetric | ||
addTimes map[t]time.Time | ||
processingStartTimes map[t]time.Time | ||
|
||
// how long have current threads been working? | ||
unfinishedWorkSeconds SettableGaugeMetric | ||
longestRunningProcessor SettableGaugeMetric | ||
} | ||
|
||
func (m *defaultQueueMetrics) add(item t) { | ||
|
@@ -75,7 +92,7 @@ func (m *defaultQueueMetrics) add(item t) { | |
m.adds.Inc() | ||
m.depth.Inc() | ||
if _, exists := m.addTimes[item]; !exists { | ||
m.addTimes[item] = time.Now() | ||
m.addTimes[item] = m.clock.Now() | ||
} | ||
} | ||
|
||
|
@@ -85,9 +102,9 @@ func (m *defaultQueueMetrics) get(item t) { | |
} | ||
|
||
m.depth.Dec() | ||
m.processingStartTimes[item] = time.Now() | ||
m.processingStartTimes[item] = m.clock.Now() | ||
if startTime, exists := m.addTimes[item]; exists { | ||
m.latency.Observe(sinceInMicroseconds(startTime)) | ||
m.latency.Observe(m.sinceInMicroseconds(startTime)) | ||
delete(m.addTimes, item) | ||
} | ||
} | ||
|
@@ -98,14 +115,39 @@ func (m *defaultQueueMetrics) done(item t) { | |
} | ||
|
||
if startTime, exists := m.processingStartTimes[item]; exists { | ||
m.workDuration.Observe(sinceInMicroseconds(startTime)) | ||
m.workDuration.Observe(m.sinceInMicroseconds(startTime)) | ||
delete(m.processingStartTimes, item) | ||
} | ||
} | ||
|
||
func (m *defaultQueueMetrics) updateUnfinishedWork() { | ||
// Note that a summary metric would be better for this, but prometheus | ||
// doesn't seem to have non-hacky ways to reset the summary metrics. | ||
var total float64 | ||
var oldest float64 | ||
for _, t := range m.processingStartTimes { | ||
age := m.sinceInMicroseconds(t) | ||
total += age | ||
if age > oldest { | ||
oldest = age | ||
} | ||
} | ||
// Convert to seconds; microseconds is unhelpfully granular for this. | ||
total /= 1000000 | ||
m.unfinishedWorkSeconds.Set(total) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not just "Set(total / 1000000)"? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure, if I make another change I'll update this otherwise it doesn't seem worth it :) Thanks for the review! |
||
m.longestRunningProcessor.Set(oldest) // in microseconds. | ||
} | ||
|
||
type noMetrics struct{} | ||
|
||
func (noMetrics) add(item t) {} | ||
func (noMetrics) get(item t) {} | ||
func (noMetrics) done(item t) {} | ||
func (noMetrics) updateUnfinishedWork() {} | ||
|
||
// Gets the time since the specified start in microseconds. | ||
func sinceInMicroseconds(start time.Time) float64 { | ||
return float64(time.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds()) | ||
func (m *defaultQueueMetrics) sinceInMicroseconds(start time.Time) float64 { | ||
return float64(m.clock.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds()) | ||
} | ||
|
||
type retryMetrics interface { | ||
|
@@ -130,6 +172,8 @@ type MetricsProvider interface { | |
NewAddsMetric(name string) CounterMetric | ||
NewLatencyMetric(name string) SummaryMetric | ||
NewWorkDurationMetric(name string) SummaryMetric | ||
NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric | ||
NewLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric | ||
NewRetriesMetric(name string) CounterMetric | ||
} | ||
|
||
|
@@ -151,29 +195,49 @@ func (_ noopMetricsProvider) NewWorkDurationMetric(name string) SummaryMetric { | |
return noopMetric{} | ||
} | ||
|
||
func (_ noopMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric { | ||
return noopMetric{} | ||
} | ||
|
||
func (_ noopMetricsProvider) NewLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric { | ||
return noopMetric{} | ||
} | ||
|
||
func (_ noopMetricsProvider) NewRetriesMetric(name string) CounterMetric { | ||
return noopMetric{} | ||
} | ||
|
||
var metricsFactory = struct { | ||
metricsProvider MetricsProvider | ||
setProviders sync.Once | ||
}{ | ||
var globalMetricsFactory = queueMetricsFactory{ | ||
metricsProvider: noopMetricsProvider{}, | ||
} | ||
|
||
func newQueueMetrics(name string) queueMetrics { | ||
var ret *defaultQueueMetrics | ||
if len(name) == 0 { | ||
return ret | ||
type queueMetricsFactory struct { | ||
metricsProvider MetricsProvider | ||
|
||
onlyOnce sync.Once | ||
} | ||
|
||
func (f *queueMetricsFactory) setProvider(mp MetricsProvider) { | ||
f.onlyOnce.Do(func() { | ||
f.metricsProvider = mp | ||
}) | ||
} | ||
|
||
func (f *queueMetricsFactory) newQueueMetrics(name string, clock clock.Clock) queueMetrics { | ||
mp := f.metricsProvider | ||
if len(name) == 0 || mp == (noopMetricsProvider{}) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Surprisingly, go does the comparison correctly (otherwise the test wouldn't pass / compile). |
||
return noMetrics{} | ||
} | ||
return &defaultQueueMetrics{ | ||
depth: metricsFactory.metricsProvider.NewDepthMetric(name), | ||
adds: metricsFactory.metricsProvider.NewAddsMetric(name), | ||
latency: metricsFactory.metricsProvider.NewLatencyMetric(name), | ||
workDuration: metricsFactory.metricsProvider.NewWorkDurationMetric(name), | ||
addTimes: map[t]time.Time{}, | ||
processingStartTimes: map[t]time.Time{}, | ||
clock: clock, | ||
depth: mp.NewDepthMetric(name), | ||
adds: mp.NewAddsMetric(name), | ||
latency: mp.NewLatencyMetric(name), | ||
workDuration: mp.NewWorkDurationMetric(name), | ||
unfinishedWorkSeconds: mp.NewUnfinishedWorkSecondsMetric(name), | ||
longestRunningProcessor: mp.NewLongestRunningProcessorMicrosecondsMetric(name), | ||
addTimes: map[t]time.Time{}, | ||
processingStartTimes: map[t]time.Time{}, | ||
} | ||
} | ||
|
||
|
@@ -183,13 +247,12 @@ func newRetryMetrics(name string) retryMetrics { | |
return ret | ||
} | ||
return &defaultRetryMetrics{ | ||
retries: metricsFactory.metricsProvider.NewRetriesMetric(name), | ||
retries: globalMetricsFactory.metricsProvider.NewRetriesMetric(name), | ||
} | ||
} | ||
|
||
// SetProvider sets the metrics provider of the metricsFactory. | ||
// SetProvider sets the metrics provider for all subsequently created work | ||
// queues. Only the first call has an effect. | ||
func SetProvider(metricsProvider MetricsProvider) { | ||
metricsFactory.setProviders.Do(func() { | ||
metricsFactory.metricsProvider = metricsProvider | ||
}) | ||
globalMetricsFactory.setProvider(metricsProvider) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
*processor