A80: PID LB policy
----
* Author(s): @s-matyukevich
* Approver:
* Status: Draft
* Implemented in: PoC in Go
* Last updated: 2024-04-26
* Discussion at: https://groups.google.com/g/grpc-io/c/eD2bE2JzQ2w

## Abstract

This document proposes a design for a new load balancing policy called pid. The term pid stands for [Proportional–integral–derivative controller](https://en.wikipedia.org/wiki/Proportional%E2%80%93integral%E2%80%93derivative_controller). This policy builds upon the [A58: weighted_round_robin LB policy (WRR)][A58] and requires direct load reporting from backends to clients. Similar to wrr, it utilizes client-side weighted round robin load balancing. However, unlike wrr, it does not determine weights deterministically. Instead, it employs a feedback loop with the pid controller to adjust the weights in a manner that allows the load on all backends to converge to the same value. The policy supports either per-call or periodic out-of-band load reporting as per [gRFC A51][A51].

## Background

The `wrr` policy uses the following formula to calculate subchannel weights, described in more detail in the "Subchannel Weights" section of [gRFC A58][A58]:

$$weight = \dfrac{qps}{utilization + \dfrac{eps}{qps} * error\\_utilization\\_penalty}$$

This formula is effective when backends that have different average CPU costs per request receive an identical number of connections. In such scenarios, `wrr` helps distribute requests fairly between backends, ensuring that more powerful backends receive more requests and less powerful backends receive fewer. However, `wrr` is not effective at correcting imbalances generated by the use of random subsetting, as described in [gRFC A68][A68]. Random subsetting leads to a situation where some backends receive more connections than others, and the number of connections a server receives does not affect its CPU cost per request, so more-connected backends end up receiving more requests than less-connected ones.
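
To make the limitation concrete, the following sketch (illustrative only, not part of the proposal; the backend numbers are made up for the example) applies the A58 weight formula to two backends with identical CPU cost per request but a different number of connections; the resulting weights come out equal, so the connection imbalance is left uncorrected.

```go
// Illustrative sketch: applying the gRFC A58 weight formula to two hypothetical
// backends that have the same CPU cost per request.
package main

import "fmt"

// a58Weight computes weight = qps / (utilization + eps/qps * errorUtilizationPenalty).
func a58Weight(qps, utilization, eps, errorUtilizationPenalty float64) float64 {
	return qps / (utilization + eps/qps*errorUtilizationPenalty)
}

func main() {
	// Backend A holds twice as many connections as backend B, so it receives
	// twice the QPS and, with identical CPU cost per request, reports twice
	// the utilization.
	weightA := a58Weight(200, 0.8, 0, 1.0)
	weightB := a58Weight(100, 0.4, 0, 1.0)

	// Both weights come out equal (250), so wrr keeps splitting each client's
	// requests evenly across its subchannels and the connection imbalance persists.
	fmt.Println(weightA, weightB)
}
```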

The `pid` balancer takes a different approach: instead of deterministically calculating weights based on a backend metric, it continuously adjusts weights at runtime. It utilizes a feedback loop based on the backend CPU metric to determine the direction and magnitude of every weight update.


### Related Proposals:
* [gRFC A51: Custom Backend Metrics Support][A51]
* [gRFC A58: `weighted_round_robin` LB policy][A58]
* [gRFC A68: Random subsetting with rendezvous hashing LB policy.][A68]

## Proposal

Introduce a new LB policy `pid`. This policy implements client-side load balancing with direct load reports from backends. It utilizes a feedback loop with a PID controller to dynamically adjust the weights. The policy is otherwise largely a superset of the existing policy `weighted_round_robin`.

### LB Policy Config and Parameters

The `pid` LB policy config will be as follows.

```textproto
message LoadBalancingConfig {
  oneof policy {
    PIDLbConfig pid = 20 [json_name = "pid"];
  }
}

message PIDLbConfig {
  // Configuration for the WRR load balancer as defined in [gRFC A58][A58].
  // The PID balancer is an extension of WRR and all settings applicable to WRR also apply to PID identically.
  WeightedRoundRobinLbConfig wrr_config = 1;

  // Threshold beyond which the balancer starts considering the ErrorUtilizationPenalty.
  // This helps avoid oscillations in cases where the server experiences a very high and spiky error rate.
  // We avoid eliminating the error_utilization_penalty entirely to prevent redirecting all traffic to an instance
  // that has low CPU usage but rejects all requests. Default is 0.5.
  google.protobuf.FloatValue error_utilization_threshold = 2;

  // Controls the convergence speed of the PID controller. Higher values accelerate convergence but may induce oscillations,
  // especially if server load changes more rapidly than the PID controller can adjust. Oscillations might also occur due to
  // significant delays in load report propagation or extremely spiky server load. To mitigate spiky loads, server owners should
  // employ a moving average to smooth the load reporting. Default is 0.1.
  google.protobuf.FloatValue proportional_gain = 3;

  // Adjusts the smoothness of the PID controller convergence. Higher values enhance smoothness but can decelerate convergence.
  // Default is 1.
  google.protobuf.FloatValue derivative_gain = 4;

  // Maximum allowable weight. Weights proposed by the PID controller exceeding this value will be capped.
  // This prevents infinite weight growth, which could occur if only a subset of clients uses PID and increasing weights
  // no longer effectively corrects the imbalance. Default is 10.
  google.protobuf.FloatValue max_weight = 5;

  // Minimum allowable weight. Weights proposed by the PID controller falling below this value will be capped.
  // This prevents weights from dropping to zero, which could occur if only a subset of clients uses PID and decreasing weights
  // no longer effectively corrects the imbalance. Default is 0.1.
  google.protobuf.FloatValue min_weight = 6;
}
```
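
For illustration, a client could select this policy through the default service config. The snippet below is a hypothetical sketch: it assumes a registered `pid` implementation and JSON field names derived from the standard proto3 JSON mapping of `PIDLbConfig` (camelCase, with the WRR options nested under `wrrConfig`); the exact names are defined by the implementation.

```go
// Hypothetical sketch of enabling the pid policy via the default service config.
package main

import (
	"log"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

const pidServiceConfig = `{
  "loadBalancingConfig": [{
    "pid": {
      "wrrConfig": { "weightUpdatePeriod": "1s" },
      "errorUtilizationThreshold": 0.5,
      "proportionalGain": 0.1,
      "derivativeGain": 1,
      "maxWeight": 10,
      "minWeight": 0.1
    }
  }]
}`

func main() {
	conn, err := grpc.Dial(
		"dns:///my-service:50051",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultServiceConfig(pidServiceConfig),
	)
	if err != nil {
		log.Fatalf("failed to create client connection: %v", err)
	}
	defer conn.Close()
}
```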

### PID controller

A PID controller is a control loop feedback mechanism that continuously calculates an error value as the difference between a desired setpoint (`referenceSignal`) and a measured process variable (`actualSignal`). It then applies a correction based on proportional, integral, and derivative terms (denoted P, I, and D respectively), hence the name.

In our implementation, we will not be using the integral part. The integral component is useful for speeding up convergence when the `referenceSignal` changes sharply. In our case, we will be converging the load on the subchannels to a mean value, which is mostly stable.
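
With the integral term omitted, the controller reduces to a discrete proportional-derivative (PD) law. Written out, each update computes

$$e_k = referenceSignal_k - actualSignal_k, \qquad controlSignal_k = K_p \, e_k + K_d \, \frac{e_k - e_{k-1}}{\Delta t}$$

where $K_p$ is `proportionalGain`, $K_d$ is `derivativeGain`, and $\Delta t$ is the sampling interval in seconds.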

Here is a sample implementation in pseudo-code:

```
pidController class {
    proportionalGain float
    derivativeGain float

    controlError float

    update(referenceSignal float, actualSignal float, samplingInterval duration) float {
        previousError = this.controlError
        // Save the latest controlError so we can use it to calculate the derivative during the next update
        this.controlError = referenceSignal - actualSignal
        controlErrorDerivative = (this.controlError - previousError) / samplingInterval.Seconds()
        controlSignal = this.proportionalGain * this.controlError +
            this.derivativeGain * controlErrorDerivative
        return controlSignal
    }
}
```

The `update` method is expected to be called on a regular basis, with `samplingInterval` being the duration since the last update. The return value is the control signal which, if applied to the system, should minimize the control error. In the next section, we'll discuss how this control signal is converted to `wrr` weight.

The `proportionalGain` and `derivativeGain` parameters are taken from the LB config. `proportionalGain` should additionally be scaled by the `WeightUpdatePeriod` value. This is necessary because the derivative of the error is calculated as `controlErrorDerivative = (this.controlError - previousError) / samplingInterval.Seconds()`, and dividing by a very small `samplingInterval` makes the result too large. `WeightUpdatePeriod` is roughly equal to `samplingInterval`, as we will be updating the PID state once per `WeightUpdatePeriod`.
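
As a non-normative illustration, the following self-contained Go sketch translates the pseudo-code above and calls it with sample inputs. The gain scaling by the update period follows the description above; the concrete constants are assumptions chosen for the example, not prescribed values.

```go
// Illustrative sketch of the PD controller above; constants are example values.
package main

import (
	"fmt"
	"time"
)

type pidController struct {
	proportionalGain float64
	derivativeGain   float64
	controlError     float64
}

// update returns the control signal for the given reference and measured values.
func (c *pidController) update(referenceSignal, actualSignal float64, samplingInterval time.Duration) float64 {
	previousError := c.controlError
	c.controlError = referenceSignal - actualSignal
	derivative := (c.controlError - previousError) / samplingInterval.Seconds()
	return c.proportionalGain*c.controlError + c.derivativeGain*derivative
}

func main() {
	weightUpdatePeriod := time.Second
	// Scale the proportional gain by the update period, as described above.
	ctrl := &pidController{
		proportionalGain: 0.1 * weightUpdatePeriod.Seconds(),
		derivativeGain:   1.0,
	}

	meanUtilization := 0.5 // the reference signal

	// A subchannel reporting utilization above the mean yields a negative
	// signal, i.e. its weight should be decreased.
	fmt.Println(ctrl.update(meanUtilization, 0.9, weightUpdatePeriod))

	// On the next update the subchannel is below the mean, so the signal is positive.
	fmt.Println(ctrl.update(meanUtilization, 0.4, weightUpdatePeriod))
}
```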

### Extending WRR balancer

The `pid` balancer can reuse roughly 90% of the `wrr` code. The proposal is to refactor the `wrr` codebase and introduce several hooks that allow other balancers, like `pid`, to reuse the code efficiently without the need for duplication. This approach is mostly language-specific, but the general plan is as follows:

* Add a `callbacks` object to the `wrr` balancer: this object will contain a set of callback functions that `wrr` invokes at various stages of its lifecycle.
* Introduce a `callbackData` object: this will be used by the callbacks to store any data that is reused across different callback functions. The `wrr` balancer will pass this object to all callbacks and treat it as an opaque blob of data.
* Add a `callbackConfig` object to the `wrr` balancer: this object will contain the PID-specific part of the user-provided config as defined in the "LB Policy Config and Parameters" section. The `wrr` balancer will pass this object to all callbacks and treat it as an opaque blob of data.

The `callbacks` object, which is to be provided by the balancer builder, will implement the following interface (expressed in pseudo-code):

```
wrrCallbacks interface {
    onSubchannelAdded(subchannelID int, data callbackData, conf callbackConfig)
    onSubchannelRemoved(subchannelID int, data callbackData, conf callbackConfig)

    // onLoadReport is called when a new load report is received for a given subchannel.
    // This function returns the new weight for a subchannel. If the returned value is -1,
    // the subchannel should keep using the old value.
    // onLoadReport won't be called during the blackout period.
    onLoadReport(subchannelID int, report loadReport, data callbackData, conf callbackConfig) float

    // onEDFSchedulerUpdate is called after the wrr balancer recreates the EDF scheduler.
    onEDFSchedulerUpdate(data callbackData, conf callbackConfig)
}
```

Here is how the `pid` balancer implements the `wrrCallbacks` interface (again in pseudo-code).
```
func onSubchannelAdded(subchannelID int, data callbackData, conf callbackConfig) {
    // Do nothing
}

func onSubchannelRemoved(subchannelID int, data callbackData, conf callbackConfig) {
    // Remove subchannelID from the maps that store the last reported utilization,
    // the last applied weight and the time of the last weight update per subchannel
    delete(data.utilizationPerSubchannel, subchannelID)
    delete(data.lastAppliedWeightPerSubchannel, subchannelID)
    delete(data.lastAppliedTimePerSubchannel, subchannelID)
}

func onLoadReport(subchannelID int, load loadReport, data callbackData, conf callbackConfig) float {
    utilization = load.ApplicationUtilization
    if utilization == 0 {
        utilization = load.CpuUtilization
    }
    if utilization == 0 || load.RpsFractional == 0 {
        // Ignore empty load
        return -1
    }
    errorRate = load.Eps / load.RpsFractional
    useErrPenalty = errorRate > conf.ErrorUtilizationThreshold
    if useErrPenalty {
        utilization += errorRate * conf.ErrorUtilizationPenalty
    }

    // Ensure at least WeightUpdatePeriod has passed since the last update for this subchannel.
    // This prevents corruption of the PID controller's internal state, which could happen in the following cases:
    // * If 2 updates are very close to each other in time, samplingInterval ~= 0 and signal ~= infinity.
    // * If multiple updates happened during a single WeightUpdatePeriod, the actual weights are not applied,
    //   but the PID controller keeps growing the weights and it may easily pass the balancing point.
    lastApplied = data.lastAppliedTimePerSubchannel[subchannelID]
    if time.Since(lastApplied) < conf.WeightUpdatePeriod {
        return -1
    }

    // Use the value calculated in the onEDFSchedulerUpdate method
    meanUtilization = data.meanUtilization

    // Call the PID controller to get the value of the control signal.
    controlSignal = data.pidController.update({
        referenceSignal: meanUtilization,
        actualSignal: utilization,
        samplingInterval: time.Since(lastApplied),
    })

    // Normalize the signal.
    // If meanUtilization ~= 0 the signal will be ~= 0 as well, and convergence will become painfully slow.
    // If meanUtilization >> 1 the signal may become very high, which could lead to oscillations.
    if meanUtilization > 0 {
        controlSignal *= 1 / meanUtilization
    }

    lastAppliedWeight = data.lastAppliedWeightPerSubchannel[subchannelID]

    // Use controlSignal to adjust the weight.
    // First calculate a multiplier that will be used to determine how much the weight should be changed.
    // The higher the absolute value of the controlSignal, the more we need to adjust the weight.
    if controlSignal >= 0 {
        // In this case mult belongs to the [1, inf) interval, so we will be increasing the weight.
        mult = 1.0 + controlSignal
    } else {
        // In this case mult belongs to the (0, 1) interval, so we will be decreasing the weight.
        mult = -1.0 / (controlSignal - 1.0)
    }
    weight = lastAppliedWeight * mult

    // Clamp the weight
    if weight > conf.MaxWeight {
        weight = conf.MaxWeight
    }
    if weight < conf.MinWeight {
        weight = conf.MinWeight
    }

    // Save the resulting utilization, weight and update time.
    data.utilizationPerSubchannel[subchannelID] = utilization
    data.lastAppliedWeightPerSubchannel[subchannelID] = weight
    data.lastAppliedTimePerSubchannel[subchannelID] = time.Now()

    return weight
}

func onEDFSchedulerUpdate(data callbackData, conf callbackConfig) {
    // Calculate the mean utilization across all subchannels
    count = len(data.utilizationPerSubchannel)
    if count == 0 {
        return
    }
    totalUtilization = 0
    for _, utilization in data.utilizationPerSubchannel {
        totalUtilization += utilization
    }
    data.meanUtilization = totalUtilization / count
}
```

The proposal is to make `wrrCallbacks` public. This has a number of significant benefits. Besides PID, there are other cases where one might need to extend `wrr`. For example, Spotify [demonstrates](https://www.youtube.com/watch?v=8E5zVdEfwi0) a gRPC load balancer that reduces cross-zone traffic, which can be implemented nicely in terms of `wrr` weights; we are considering the same, as well as incorporating signals like latency into our load balancing decisions. Existing ORCA extension points don't cover these use cases: we leverage ORCA for custom server utilization metrics, but we also need the ability to combine server and client metrics to generate the resulting weight. The alternative is to write our own balancer with a custom EDF scheduler and handle all the details related to subchannel management and interactions with resolvers. With this new API, use cases like these can be covered naturally, and users have full control over the end-to-end definition of weights; a sketch of such an extension follows below.
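
As a purely hypothetical sketch of such an extension (the types below are illustrative stand-ins for the callback types proposed above, and the zone-penalty logic is an assumption, not part of this proposal), a zone-aware balancer could implement `onLoadReport` to down-weight cross-zone subchannels while still reacting to reported utilization:

```go
// Hypothetical sketch of a zone-aware wrrCallbacks implementation.
package main

import "fmt"

type loadReport struct {
	ApplicationUtilization float64
	CpuUtilization         float64
}

type callbackData struct {
	subchannelZone map[int]string // assumed to be populated from resolver attributes
	localZone      string
}

type callbackConfig struct {
	crossZonePenalty float64 // e.g. 0.25 keeps most traffic in-zone
}

// onLoadReport computes a utilization-based weight and scales it down for
// subchannels that live in a different zone than the client.
func onLoadReport(subchannelID int, load loadReport, data callbackData, conf callbackConfig) float64 {
	utilization := load.ApplicationUtilization
	if utilization == 0 {
		utilization = load.CpuUtilization
	}
	if utilization == 0 {
		return -1 // keep the previously applied weight
	}
	weight := 1 / utilization
	if data.subchannelZone[subchannelID] != data.localZone {
		weight *= conf.crossZonePenalty
	}
	return weight
}

func main() {
	data := callbackData{
		subchannelZone: map[int]string{1: "us-east-1a", 2: "us-east-1b"},
		localZone:      "us-east-1a",
	}
	conf := callbackConfig{crossZonePenalty: 0.25}
	fmt.Println(onLoadReport(1, loadReport{CpuUtilization: 0.5}, data, conf)) // in-zone: 2
	fmt.Println(onLoadReport(2, loadReport{CpuUtilization: 0.5}, data, conf)) // cross-zone: 0.5
}
```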

### Dealing with Oscillations

One of the main challenges with the pid balancer is the potential for oscillations. Several factors influence this likelihood:

1. **Propagation Delay of Load Reports:**
* **Direct Load Reporting**: Here, the delay depends on the request frequency and the `WeightUpdatePeriod` setting. Typically, with the default `WeightUpdatePeriod` of 1 second, propagation is very fast, making this the preferred option when using the pid balancer.
* **OOB Load Reporting**: Users can control the delay by adjusting the `OobReportingPeriod` setting. While the delay is usually larger compared to direct reporting, achieving perfect convergence with OOB reporting is still possible on workloads with stable loads.
2. **Proportional Gain:**
* A high `ProportionalGain` can lead to significant weight adjustments, potentially overshooting the balancing point. The default value of 0.1 generally allows for fast convergence (typically faster than 30 seconds on workloads that are not spiky) while not generating oscillations.
3. **Stability of Server Load:**
* The pid balancer struggles with servers that exhibit spiky loads because the mean utilization is not stable, which disrupts the convergence direction for all subchannels. Unfortunately, this is one aspect users cannot directly control from the client side. To address this, the proposal includes implementing an "average window" mechanism on the server, which will be discussed in the next section.
4. **Number of Subchannels:**
* A larger number of subchannels generally stabilizes the mean utilization, leading to faster convergence and reducing the likelihood of oscillations. This is particularly relevant when considering the use of random subsetting, as discussed in [gRFC A68][A68]. A small subset size can hinder the `pid` balancer’s ability to converge load across backends, particularly if mean utilization is unstable and clients connect only to either overloaded or underloaded servers. However, we have achieved acceptable convergence on a spiky workload with a subset size as small as four, using a three-minute moving average window size for load reporting. A proposed default subset size of 20 typically ensures good convergence on any workload.

By understanding and addressing these factors, the `pid` balancer can be more effectively tuned to manage load balancing across different environments and usage scenarios, minimizing the risks associated with oscillations.

### Moving Average Window for Load Reporting

As outlined in the previous section, smoothing the utilization measurements in server load reports is essential for the `pid` balancer to achieve convergence on spiky workloads. To address this, we propose integrating a moving average window mechanism into the `MetricRecorder` component described in [gRFC A51][A51]. This involves adding a `MovingAverageWindowSize` parameter to the component. Instead of storing a single value per metric, `MetricRecorder` will now maintain the last `MovingAverageWindowSize` reported values in a circular buffer. The process is detailed in the following pseudo-code:

```
func recordMetricXXX(value float) {
    // Ensure updates are atomic to avoid corruption of the circular buffer
    lock.Lock()

    // If the buffer is full, the oldest value is about to be evicted, so remove it from the running sum
    if circularBufferForMetricXXX.isFull() {
        sum -= circularBufferForMetricXXX.oldest()
    }
    sum += value

    // Add the new value to the circular buffer, which automatically removes the oldest value if the buffer is full
    circularBufferForMetricXXX.add(value)

    // Report the average of the values in the circular buffer
    cXXXvalue = sum / circularBufferForMetricXXX.size()
    lock.Unlock()
}
```

Setting `MovingAverageWindowSize` to 1 mimics the current behavior and should remain the default setting.
This modification allows for more stable load reporting by averaging fluctuations over the specified window, thus providing the pid balancer with more consistent data to inform weight adjustments.


## Rationale
### Alternatives Considered:

The main driver for this proposal was the need to implement subsetting. We explored the possibility of using deterministic subsetting in https://github.com/grpc/proposal/pull/383 and received push-back for the reasons explained [here](https://github.com/grpc/proposal/pull/383#discussion_r1334587561).

Additionally, we considered the "scaled wrr" approach, which would adjust the imbalance created by random subsetting by multiplying the server utilization by the number of connections a server receives. Feedback on this approach suggested that it might be more beneficial to pursue more generic solutions that focus on achieving load convergence rather than attempting to tailor the `wrr` method specifically to fit subsetting use cases.

This feedback led us to explore broader, more adaptable strategies that could better address the complexities introduced by subsetting, culminating in the current proposal.

## Implementation
Datadog will provide Go and Java implementations.


[A51]: A51-custom-backend-metrics.md
[A58]: A58-client-side-weighted-round-robin-lb-policy.md
[A68]: https://github.com/grpc/proposal/pull/423