Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add a metric that can be used to notice stuck worker threads #70884

Merged
merged 8 commits into from
Nov 13, 2018
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 24 additions & 0 deletions pkg/util/workqueue/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,30 @@ func (prometheusMetricsProvider) NewWorkDurationMetric(name string) workqueue.Su
return workDuration
}

func (prometheusMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) workqueue.SettableGaugeMetric {
unfinished := prometheus.NewGauge(prometheus.GaugeOpts{
Subsystem: name,
Name: "unfinished_work_seconds",
Help: "How many seconds of work " + name + " has done that " +
"is in progress and hasn't been observed by work_duration. Large " +
"values indicate stuck threads. One can deduce the number of stuck " +
"threads by observing the rate at which this increases.",
})
prometheus.Register(unfinished)
return unfinished
}

func (prometheusMetricsProvider) NewLongestRunningProcessorMicrosecondsMetric(name string) workqueue.SettableGaugeMetric {
unfinished := prometheus.NewGauge(prometheus.GaugeOpts{
Subsystem: name,
Name: "longest_running_procesor_microseconds",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

*processor

Help: "How many microseconds has the longest running " +
"processor for " + name + " been running.",
})
prometheus.Register(unfinished)
return unfinished
}

func (prometheusMetricsProvider) NewRetriesMetric(name string) workqueue.CounterMetric {
retries := prometheus.NewCounter(prometheus.CounterOpts{
Subsystem: name,
Expand Down
1 change: 1 addition & 0 deletions staging/src/k8s.io/client-go/util/workqueue/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_test(
srcs = [
"default_rate_limiters_test.go",
"delaying_queue_test.go",
"metrics_test.go",
"queue_test.go",
"rate_limitting_queue_test.go",
],
Expand Down
113 changes: 88 additions & 25 deletions staging/src/k8s.io/client-go/util/workqueue/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package workqueue
import (
"sync"
"time"

"k8s.io/apimachinery/pkg/util/clock"
)

// This file provides abstractions for setting the provider (e.g., prometheus)
Expand All @@ -28,6 +30,7 @@ type queueMetrics interface {
add(item t)
get(item t)
done(item t)
updateUnfinishedWork()
}

// GaugeMetric represents a single numerical value that can arbitrarily go up
Expand All @@ -37,6 +40,12 @@ type GaugeMetric interface {
Dec()
}

// SettableGaugeMetric represents a single numerical value that can arbitrarily go up
// and down. (Separate from GaugeMetric to preserve backwards compatibility.)
type SettableGaugeMetric interface {
Set(float64)
}

// CounterMetric represents a single numerical value that only ever
// goes up.
type CounterMetric interface {
Expand All @@ -52,9 +61,13 @@ type noopMetric struct{}

func (noopMetric) Inc() {}
func (noopMetric) Dec() {}
func (noopMetric) Set(float64) {}
func (noopMetric) Observe(float64) {}

// defaultQueueMetrics expects the caller to lock before setting any metrics.
type defaultQueueMetrics struct {
clock clock.Clock

// current depth of a workqueue
depth GaugeMetric
// total number of adds handled by a workqueue
Expand All @@ -65,6 +78,10 @@ type defaultQueueMetrics struct {
workDuration SummaryMetric
addTimes map[t]time.Time
processingStartTimes map[t]time.Time

// how long have current threads been working?
unfinishedWorkSeconds SettableGaugeMetric
longestRunningProcessor SettableGaugeMetric
}

func (m *defaultQueueMetrics) add(item t) {
Expand All @@ -75,7 +92,7 @@ func (m *defaultQueueMetrics) add(item t) {
m.adds.Inc()
m.depth.Inc()
if _, exists := m.addTimes[item]; !exists {
m.addTimes[item] = time.Now()
m.addTimes[item] = m.clock.Now()
}
}

Expand All @@ -85,9 +102,9 @@ func (m *defaultQueueMetrics) get(item t) {
}

m.depth.Dec()
m.processingStartTimes[item] = time.Now()
m.processingStartTimes[item] = m.clock.Now()
if startTime, exists := m.addTimes[item]; exists {
m.latency.Observe(sinceInMicroseconds(startTime))
m.latency.Observe(m.sinceInMicroseconds(startTime))
delete(m.addTimes, item)
}
}
Expand All @@ -98,14 +115,39 @@ func (m *defaultQueueMetrics) done(item t) {
}

if startTime, exists := m.processingStartTimes[item]; exists {
m.workDuration.Observe(sinceInMicroseconds(startTime))
m.workDuration.Observe(m.sinceInMicroseconds(startTime))
delete(m.processingStartTimes, item)
}
}

func (m *defaultQueueMetrics) updateUnfinishedWork() {
// Note that a summary metric would be better for this, but prometheus
// doesn't seem to have non-hacky ways to reset the summary metrics.
var total float64
var oldest float64
for _, t := range m.processingStartTimes {
age := m.sinceInMicroseconds(t)
total += age
if age > oldest {
oldest = age
}
}
// Convert to seconds; microseconds is unhelpfully granular for this.
total /= 1000000
m.unfinishedWorkSeconds.Set(total)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just "Set(total / 1000000)"?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure, if I make another change I'll update this otherwise it doesn't seem worth it :)

Thanks for the review!

m.longestRunningProcessor.Set(oldest) // in microseconds.
}

type noMetrics struct{}

func (noMetrics) add(item t) {}
func (noMetrics) get(item t) {}
func (noMetrics) done(item t) {}
func (noMetrics) updateUnfinishedWork() {}

// Gets the time since the specified start in microseconds.
func sinceInMicroseconds(start time.Time) float64 {
return float64(time.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds())
func (m *defaultQueueMetrics) sinceInMicroseconds(start time.Time) float64 {
return float64(m.clock.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds())
}

type retryMetrics interface {
Expand All @@ -130,6 +172,8 @@ type MetricsProvider interface {
NewAddsMetric(name string) CounterMetric
NewLatencyMetric(name string) SummaryMetric
NewWorkDurationMetric(name string) SummaryMetric
NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric
NewLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric
NewRetriesMetric(name string) CounterMetric
}

Expand All @@ -151,29 +195,49 @@ func (_ noopMetricsProvider) NewWorkDurationMetric(name string) SummaryMetric {
return noopMetric{}
}

func (_ noopMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric {
return noopMetric{}
}

func (_ noopMetricsProvider) NewLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric {
return noopMetric{}
}

func (_ noopMetricsProvider) NewRetriesMetric(name string) CounterMetric {
return noopMetric{}
}

var metricsFactory = struct {
metricsProvider MetricsProvider
setProviders sync.Once
}{
var globalMetricsFactory = queueMetricsFactory{
metricsProvider: noopMetricsProvider{},
}

func newQueueMetrics(name string) queueMetrics {
var ret *defaultQueueMetrics
if len(name) == 0 {
return ret
type queueMetricsFactory struct {
metricsProvider MetricsProvider

onlyOnce sync.Once
}

func (f *queueMetricsFactory) setProvider(mp MetricsProvider) {
f.onlyOnce.Do(func() {
f.metricsProvider = mp
})
}

func (f *queueMetricsFactory) newQueueMetrics(name string, clock clock.Clock) queueMetrics {
mp := f.metricsProvider
if len(name) == 0 || mp == (noopMetricsProvider{}) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does mp == (noopMetricsProvider{}) work? I would have though you'd have to do a type-check.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Surprisingly, go does the comparison correctly (otherwise the test wouldn't pass / compile).

return noMetrics{}
}
return &defaultQueueMetrics{
depth: metricsFactory.metricsProvider.NewDepthMetric(name),
adds: metricsFactory.metricsProvider.NewAddsMetric(name),
latency: metricsFactory.metricsProvider.NewLatencyMetric(name),
workDuration: metricsFactory.metricsProvider.NewWorkDurationMetric(name),
addTimes: map[t]time.Time{},
processingStartTimes: map[t]time.Time{},
clock: clock,
depth: mp.NewDepthMetric(name),
adds: mp.NewAddsMetric(name),
latency: mp.NewLatencyMetric(name),
workDuration: mp.NewWorkDurationMetric(name),
unfinishedWorkSeconds: mp.NewUnfinishedWorkSecondsMetric(name),
longestRunningProcessor: mp.NewLongestRunningProcessorMicrosecondsMetric(name),
addTimes: map[t]time.Time{},
processingStartTimes: map[t]time.Time{},
}
}

Expand All @@ -183,13 +247,12 @@ func newRetryMetrics(name string) retryMetrics {
return ret
}
return &defaultRetryMetrics{
retries: metricsFactory.metricsProvider.NewRetriesMetric(name),
retries: globalMetricsFactory.metricsProvider.NewRetriesMetric(name),
}
}

// SetProvider sets the metrics provider of the metricsFactory.
// SetProvider sets the metrics provider for all subsequently created work
// queues. Only the first call has an effect.
func SetProvider(metricsProvider MetricsProvider) {
metricsFactory.setProviders.Do(func() {
metricsFactory.metricsProvider = metricsProvider
})
globalMetricsFactory.setProvider(metricsProvider)
}