Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

orca: use atomic pointer instead of mutex in server metrics recorder to improve performance #6799

Merged
merged 2 commits into from Dec 16, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion orca/call_metrics.go
Expand Up @@ -91,7 +91,7 @@ func (rw *recorderWrapper) recorder() CallMetricsRecorder {
func (rw *recorderWrapper) setTrailerMetadata(ctx context.Context) {
var sm *ServerMetrics
if rw.smp != nil {
sm = rw.smp.ServerMetrics()
sm = copyServerMetrics(rw.smp.ServerMetrics())
danielzhaotongliu marked this conversation as resolved.
Show resolved Hide resolved
sm.merge(rw.r.ServerMetrics())
} else {
sm = rw.r.ServerMetrics()
Expand Down
153 changes: 77 additions & 76 deletions orca/server_metrics.go
Expand Up @@ -19,7 +19,7 @@
package orca

import (
"sync"
"sync/atomic"

v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
)
Expand Down Expand Up @@ -142,8 +142,7 @@
}

type serverMetricsRecorder struct {
mu sync.Mutex // protects state
state *ServerMetrics // the current metrics
state atomic.Pointer[ServerMetrics] // latest snapshot of the current metrics
}

// NewServerMetricsRecorder returns an in-memory store for ServerMetrics and
Expand All @@ -154,34 +153,23 @@
}

func newServerMetricsRecorder() *serverMetricsRecorder {
return &serverMetricsRecorder{
state: &ServerMetrics{
CPUUtilization: -1,
MemUtilization: -1,
AppUtilization: -1,
QPS: -1,
EPS: -1,
Utilization: make(map[string]float64),
RequestCost: make(map[string]float64),
NamedMetrics: make(map[string]float64),
},
}
s := new(serverMetricsRecorder)
s.state.Store(&ServerMetrics{
CPUUtilization: -1,
MemUtilization: -1,
AppUtilization: -1,
QPS: -1,
EPS: -1,
Utilization: make(map[string]float64),
RequestCost: make(map[string]float64),
NamedMetrics: make(map[string]float64),
})
return s
}

// ServerMetrics returns a copy of the current ServerMetrics.
// ServerMetrics returns a pointer to the latest snapshot of ServerMetrics.
func (s *serverMetricsRecorder) ServerMetrics() *ServerMetrics {
s.mu.Lock()
defer s.mu.Unlock()
return &ServerMetrics{
CPUUtilization: s.state.CPUUtilization,
MemUtilization: s.state.MemUtilization,
AppUtilization: s.state.AppUtilization,
QPS: s.state.QPS,
EPS: s.state.EPS,
Utilization: copyMap(s.state.Utilization),
RequestCost: copyMap(s.state.RequestCost),
NamedMetrics: copyMap(s.state.NamedMetrics),
}
return s.state.Load()
}

func copyMap(m map[string]float64) map[string]float64 {
Expand All @@ -192,6 +180,19 @@
return ret
}

func copyServerMetrics(sm *ServerMetrics) *ServerMetrics {
return &ServerMetrics{
CPUUtilization: sm.CPUUtilization,
MemUtilization: sm.MemUtilization,
AppUtilization: sm.AppUtilization,
QPS: sm.QPS,
EPS: sm.EPS,
Utilization: copyMap(sm.Utilization),
RequestCost: copyMap(sm.RequestCost),
NamedMetrics: copyMap(sm.NamedMetrics),
}
}

// SetCPUUtilization records a measurement for the CPU utilization metric.
func (s *serverMetricsRecorder) SetCPUUtilization(val float64) {
if val < 0 {
Expand All @@ -200,17 +201,17 @@
}
return
}
s.mu.Lock()
defer s.mu.Unlock()
s.state.CPUUtilization = val
smCopy := copyServerMetrics(s.state.Load())
smCopy.CPUUtilization = val
s.state.Store(smCopy)
}

// DeleteCPUUtilization deletes the relevant server metric to prevent it from
// being sent.
func (s *serverMetricsRecorder) DeleteCPUUtilization() {
s.mu.Lock()
defer s.mu.Unlock()
s.state.CPUUtilization = -1
smCopy := copyServerMetrics(s.state.Load())
smCopy.CPUUtilization = -1
s.state.Store(smCopy)
}

// SetMemoryUtilization records a measurement for the memory utilization metric.
Expand All @@ -221,17 +222,17 @@
}
return
}
s.mu.Lock()
defer s.mu.Unlock()
s.state.MemUtilization = val
smCopy := copyServerMetrics(s.state.Load())
smCopy.MemUtilization = val
s.state.Store(smCopy)
}

// DeleteMemoryUtilization deletes the relevant server metric to prevent it
// from being sent.
func (s *serverMetricsRecorder) DeleteMemoryUtilization() {
s.mu.Lock()
defer s.mu.Unlock()
s.state.MemUtilization = -1
smCopy := copyServerMetrics(s.state.Load())
smCopy.MemUtilization = -1
s.state.Store(smCopy)
}

// SetApplicationUtilization records a measurement for a generic utilization
Expand All @@ -243,17 +244,17 @@
}
return
}
s.mu.Lock()
defer s.mu.Unlock()
s.state.AppUtilization = val
smCopy := copyServerMetrics(s.state.Load())
smCopy.AppUtilization = val
s.state.Store(smCopy)
}

// DeleteApplicationUtilization deletes the relevant server metric to prevent
// it from being sent.
func (s *serverMetricsRecorder) DeleteApplicationUtilization() {
s.mu.Lock()
defer s.mu.Unlock()
s.state.AppUtilization = -1
smCopy := copyServerMetrics(s.state.Load())
smCopy.AppUtilization = -1
s.state.Store(smCopy)
}

// SetQPS records a measurement for the QPS metric.
Expand All @@ -264,16 +265,16 @@
}
return
}
s.mu.Lock()
defer s.mu.Unlock()
s.state.QPS = val
smCopy := copyServerMetrics(s.state.Load())
smCopy.QPS = val
s.state.Store(smCopy)
}

// DeleteQPS deletes the relevant server metric to prevent it from being sent.
func (s *serverMetricsRecorder) DeleteQPS() {
s.mu.Lock()
defer s.mu.Unlock()
s.state.QPS = -1
smCopy := copyServerMetrics(s.state.Load())
smCopy.QPS = -1
s.state.Store(smCopy)
}

// SetEPS records a measurement for the EPS metric.
Expand All @@ -284,16 +285,16 @@
}
return
}
s.mu.Lock()
defer s.mu.Unlock()
s.state.EPS = val
smCopy := copyServerMetrics(s.state.Load())
smCopy.EPS = val
s.state.Store(smCopy)
}

// DeleteEPS deletes the relevant server metric to prevent it from being sent.
func (s *serverMetricsRecorder) DeleteEPS() {
s.mu.Lock()
defer s.mu.Unlock()
s.state.EPS = -1
smCopy := copyServerMetrics(s.state.Load())
smCopy.EPS = -1
s.state.Store(smCopy)
}

// SetNamedUtilization records a measurement for a utilization metric uniquely
Expand All @@ -305,47 +306,47 @@
}
return
}
s.mu.Lock()
defer s.mu.Unlock()
s.state.Utilization[name] = val
smCopy := copyServerMetrics(s.state.Load())
smCopy.Utilization[name] = val
s.state.Store(smCopy)
}

// DeleteNamedUtilization deletes any previously recorded measurement for a
// utilization metric uniquely identifiable by name.
func (s *serverMetricsRecorder) DeleteNamedUtilization(name string) {
s.mu.Lock()
defer s.mu.Unlock()
delete(s.state.Utilization, name)
smCopy := copyServerMetrics(s.state.Load())
delete(smCopy.Utilization, name)
s.state.Store(smCopy)
}

// SetRequestCost records a measurement for a utilization metric uniquely
// identifiable by name.
func (s *serverMetricsRecorder) SetRequestCost(name string, val float64) {
s.mu.Lock()
defer s.mu.Unlock()
s.state.RequestCost[name] = val
smCopy := copyServerMetrics(s.state.Load())
smCopy.RequestCost[name] = val
s.state.Store(smCopy)
}

// DeleteRequestCost deletes any previously recorded measurement for a
// utilization metric uniquely identifiable by name.
func (s *serverMetricsRecorder) DeleteRequestCost(name string) {
s.mu.Lock()
defer s.mu.Unlock()
delete(s.state.RequestCost, name)
smCopy := copyServerMetrics(s.state.Load())
delete(smCopy.RequestCost, name)
s.state.Store(smCopy)

Check warning on line 335 in orca/server_metrics.go

View check run for this annotation

Codecov / codecov/patch

orca/server_metrics.go#L333-L335

Added lines #L333 - L335 were not covered by tests
}

// SetNamedMetric records a measurement for a utilization metric uniquely
// identifiable by name.
func (s *serverMetricsRecorder) SetNamedMetric(name string, val float64) {
s.mu.Lock()
defer s.mu.Unlock()
s.state.NamedMetrics[name] = val
smCopy := copyServerMetrics(s.state.Load())
smCopy.NamedMetrics[name] = val
s.state.Store(smCopy)

Check warning on line 343 in orca/server_metrics.go

View check run for this annotation

Codecov / codecov/patch

orca/server_metrics.go#L341-L343

Added lines #L341 - L343 were not covered by tests
}

// DeleteNamedMetric deletes any previously recorded measurement for a
// utilization metric uniquely identifiable by name.
func (s *serverMetricsRecorder) DeleteNamedMetric(name string) {
s.mu.Lock()
defer s.mu.Unlock()
delete(s.state.NamedMetrics, name)
smCopy := copyServerMetrics(s.state.Load())
delete(smCopy.NamedMetrics, name)
s.state.Store(smCopy)

Check warning on line 351 in orca/server_metrics.go

View check run for this annotation

Codecov / codecov/patch

orca/server_metrics.go#L349-L351

Added lines #L349 - L351 were not covered by tests
}