-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
RpcBehaviorLoadBalancerProvider.java
172 lines (145 loc) · 5.52 KB
/
RpcBehaviorLoadBalancerProvider.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
/*
* Copyright 2022 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.testing.integration;
import io.grpc.ConnectivityState;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.LoadBalancerProvider;
import io.grpc.LoadBalancerRegistry;
import io.grpc.Metadata;
import io.grpc.NameResolver.ConfigOrError;
import io.grpc.Status;
import io.grpc.internal.JsonUtil;
import io.grpc.util.ForwardingLoadBalancer;
import io.grpc.util.ForwardingLoadBalancerHelper;
import java.util.Map;
import javax.annotation.Nonnull;
/**
* Provides an xDS interop test {@link LoadBalancer} designed to work with {@link XdsTestServer}. It
* looks for an "rpcBehavior" field in its configuration and includes the value in the
* "rpc-behavior" metadata entry that is sent to the server. This will cause the test server to
* behave in a predefined way. Endpoint picking logic is delegated to the {@code round_robin}
* policy obtained from the default {@link LoadBalancerRegistry}.
*
* <p>Initial use case is to prove that a custom load balancer can be configured by the control
* plane via xDS. An interop test will configure this LB and then verify it has been correctly
* configured by observing a specific RPC behavior by the server(s).
*
* <p>For more details on what behaviors can be specified, please see:
* https://github.com/grpc/grpc/blob/master/doc/xds-test-descriptions.md#server
*/
public class RpcBehaviorLoadBalancerProvider extends LoadBalancerProvider {
@Override
public ConfigOrError parseLoadBalancingPolicyConfig(Map<String, ?> rawLoadBalancingPolicyConfig) {
String rpcBehavior = JsonUtil.getString(rawLoadBalancingPolicyConfig, "rpcBehavior");
if (rpcBehavior == null) {
return ConfigOrError.fromError(
Status.INVALID_ARGUMENT.withDescription("no 'rpcBehavior' defined"));
}
return ConfigOrError.fromConfig(new RpcBehaviorConfig(rpcBehavior));
}
@Override
public LoadBalancer newLoadBalancer(Helper helper) {
return new RpcBehaviorLoadBalancer(helper,
LoadBalancerRegistry.getDefaultRegistry().getProvider("round_robin")
.newLoadBalancer(helper));
}
@Override
public boolean isAvailable() {
return true;
}
@Override
public int getPriority() {
return 5;
}
@Override
public String getPolicyName() {
return "test.RpcBehaviorLoadBalancer";
}
static class RpcBehaviorConfig {
final String rpcBehavior;
RpcBehaviorConfig(String rpcBehavior) {
this.rpcBehavior = rpcBehavior;
}
}
/**
* Delegates all calls to another LB and wraps the given helper in {@link RpcBehaviorHelper} that
* assures that the rpc-behavior metadata header gets added to all calls.
*/
static class RpcBehaviorLoadBalancer extends ForwardingLoadBalancer {
private final RpcBehaviorHelper helper;
private final LoadBalancer delegateLb;
RpcBehaviorLoadBalancer(Helper helper, LoadBalancer delegateLb) {
this.helper = new RpcBehaviorHelper(helper);
this.delegateLb = delegateLb;
}
@Override
protected LoadBalancer delegate() {
return delegateLb;
}
@Override
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
helper.setRpcBehavior(
((RpcBehaviorConfig) resolvedAddresses.getLoadBalancingPolicyConfig()).rpcBehavior);
delegateLb.handleResolvedAddresses(resolvedAddresses);
}
}
/**
* Wraps the picker that is provided when the balancing change updates with the {@link
* RpcBehaviorPicker} that injects the rpc-behavior metadata entry.
*/
static class RpcBehaviorHelper extends ForwardingLoadBalancerHelper {
private final Helper delegateHelper;
private String rpcBehavior;
RpcBehaviorHelper(Helper delegateHelper) {
this.delegateHelper = delegateHelper;
}
void setRpcBehavior(String rpcBehavior) {
this.rpcBehavior = rpcBehavior;
}
@Override
protected Helper delegate() {
return delegateHelper;
}
@Override
public void updateBalancingState(@Nonnull ConnectivityState newState,
@Nonnull SubchannelPicker newPicker) {
delegateHelper.updateBalancingState(newState, new RpcBehaviorPicker(newPicker, rpcBehavior));
}
}
/**
* Includes the rpc-behavior metadata entry on each subchannel pick.
*/
static class RpcBehaviorPicker extends SubchannelPicker {
private static final String RPC_BEHAVIOR_HEADER_KEY = "rpc-behavior";
private final SubchannelPicker delegatePicker;
private final String rpcBehavior;
RpcBehaviorPicker(SubchannelPicker delegatePicker, String rpcBehavior) {
this.delegatePicker = delegatePicker;
this.rpcBehavior = rpcBehavior;
}
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
args.getHeaders()
.put(Metadata.Key.of(RPC_BEHAVIOR_HEADER_KEY, Metadata.ASCII_STRING_MARSHALLER),
rpcBehavior);
return delegatePicker.pickSubchannel(args);
}
}
}