/
client.go
478 lines (426 loc) · 15.8 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package client implements a full-fledged gRPC client for the xDS API
// used by the xds resolver and balancer implementations.
package client
import (
"context"
"errors"
"fmt"
"sync"
"time"
v2corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
"github.com/golang/protobuf/proto"
"google.golang.org/grpc/xds/internal/client/load"
"google.golang.org/grpc"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/buffer"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/client/bootstrap"
"google.golang.org/grpc/xds/internal/version"
)
// m is the registry of API client builders, keyed by the xDS transport API
// version each builder reports. It is written by RegisterAPIClientBuilder and
// read by getAPIClientBuilder; no lock guards it.
var (
m = make(map[version.TransportAPI]APIClientBuilder)
)
// RegisterAPIClientBuilder stores b in the package-level builder registry
// under the xDS transport protocol version reported by b.Version().
//
// NOTE: call this only at initialization time (i.e. from an init() function);
// it is not thread-safe. Registering a second builder for the same version
// overwrites the first, so the builder registered last takes effect.
func RegisterAPIClientBuilder(b APIClientBuilder) {
	apiVersion := b.Version()
	m[apiVersion] = b
}
// getAPIClientBuilder looks up the client builder registered for the given
// xDS transport API version. It returns nil when no builder has been
// registered for that version.
func getAPIClientBuilder(version version.TransportAPI) APIClientBuilder {
	// Indexing a map with a missing key yields the zero value, which for an
	// interface type is nil.
	return m[version]
}
// BuildOptions contains options to be passed to client builders.
//
// New populates every field of this struct when it builds the version
// specific API client.
type BuildOptions struct {
// Parent is a top-level xDS client or server which has the intelligence to
// take appropriate action based on xDS responses received from the
// management server.
Parent UpdateHandler
// NodeProto contains the Node proto to be used in xDS requests. The actual
// type depends on the transport protocol version used.
NodeProto proto.Message
// Backoff returns the amount of time to backoff before retrying broken
// streams.
// NOTE(review): the int argument is presumably the number of retries so
// far — confirm against the backoff package.
Backoff func(int) time.Duration
// Logger provides enhanced logging capabilities.
Logger *grpclog.PrefixLogger
}
// APIClientBuilder creates an xDS client for a specific xDS transport protocol
// version. Builders are made available to this package through
// RegisterAPIClientBuilder.
type APIClientBuilder interface {
// Build builds a transport protocol specific implementation of the xDS
// client based on the provided clientConn to the management server and the
// provided options.
Build(*grpc.ClientConn, BuildOptions) (APIClient, error)
// Version returns the xDS transport protocol version used by clients built
// using this builder. It is the key under which the builder is registered.
Version() version.TransportAPI
}
// APIClient represents the functionality provided by transport protocol
// version specific implementations of the xDS client.
type APIClient interface {
// AddWatch adds a watch for an xDS resource given its type and name.
AddWatch(ResourceType, string)
// RemoveWatch cancels an already registered watch for an xDS resource
// given its type and name.
RemoveWatch(ResourceType, string)
// ReportLoad starts an LRS stream to periodically report load using the
// provided ClientConn, which represents a connection to the management
// server.
ReportLoad(ctx context.Context, cc *grpc.ClientConn, opts LoadReportingOptions)
// Close cleans up resources allocated by the API client.
Close()
}
// LoadReportingOptions contains configuration knobs for reporting load data.
// It is passed to APIClient.ReportLoad.
type LoadReportingOptions struct {
// loadStore is unexported, so it can only be set from within this package.
// NOTE(review): presumably the store from which reported load data is
// read — confirm against the LRS implementation.
loadStore *load.Store
}
// UpdateHandler receives and processes (by taking appropriate actions) xDS
// resource updates from an APIClient for a specific version.
//
// NOTE(review): the string keys of the maps passed to each method are
// presumably resource names — confirm against the versioned API clients.
type UpdateHandler interface {
// NewListeners handles updates to xDS listener resources.
NewListeners(map[string]ListenerUpdate)
// NewRouteConfigs handles updates to xDS RouteConfiguration resources.
NewRouteConfigs(map[string]RouteConfigUpdate)
// NewClusters handles updates to xDS Cluster resources.
NewClusters(map[string]ClusterUpdate)
// NewEndpoints handles updates to xDS ClusterLoadAssignment (or tersely
// referred to as Endpoints) resources.
NewEndpoints(map[string]EndpointsUpdate)
}
// ListenerUpdate contains information received in an LDS response, which is of
// interest to the registered LDS watcher. Values of this type are handed to
// UpdateHandler.NewListeners.
type ListenerUpdate struct {
// RouteConfigName is the route configuration name corresponding to the
// target which is being watched through LDS.
RouteConfigName string
}
// RouteConfigUpdate contains information received in an RDS response, which is
// of interest to the registered RDS watcher. Values of this type are handed to
// UpdateHandler.NewRouteConfigs.
type RouteConfigUpdate struct {
// Routes contains a list of routes, each containing matchers and
// corresponding action.
Routes []*Route
}
// Route is both a specification of how to match a request as well as an
// indication of the action to take upon match.
type Route struct {
// Path, Prefix and Regex are optional (pointer-typed) path matchers.
// NOTE(review): presumably at most one of them is set per route — confirm
// against the RDS response handling code.
Path, Prefix, Regex *string
// Headers lists the header matchers of this route.
Headers []*HeaderMatcher
// Fraction is optional (nil when unset).
// NOTE(review): presumably the fraction of requests this route applies
// to; the denominator is not visible here — confirm.
Fraction *uint32
// Action is weighted clusters.
// NOTE(review): presumably maps cluster name to weight — confirm.
Action map[string]uint32
}
// HeaderMatcher represents header matchers.
//
// Every field other than Name is a pointer tagged with omitempty, so a nil
// field is left out of the JSON encoding.
// NOTE(review): presumably exactly one of the *Match fields is set per
// matcher, with InvertMatch negating the result — confirm against the code
// that builds and evaluates these.
type HeaderMatcher struct {
Name string `json:"name"`
InvertMatch *bool `json:"invertMatch,omitempty"`
ExactMatch *string `json:"exactMatch,omitempty"`
RegexMatch *string `json:"regexMatch,omitempty"`
PrefixMatch *string `json:"prefixMatch,omitempty"`
SuffixMatch *string `json:"suffixMatch,omitempty"`
RangeMatch *Int64Range `json:"rangeMatch,omitempty"`
PresentMatch *bool `json:"presentMatch,omitempty"`
}
// Int64Range is a range for header range match. It is the type of
// HeaderMatcher.RangeMatch.
// NOTE(review): whether End is inclusive or exclusive is not visible here —
// confirm against the matcher implementation.
type Int64Range struct {
Start int64 `json:"start"`
End int64 `json:"end"`
}
// ServiceUpdate contains information received from LDS and RDS responses,
// which is of interest to the registered service watcher.
type ServiceUpdate struct {
// Routes contain matchers+actions to route RPCs. The element type is the
// same *Route used by RouteConfigUpdate.
Routes []*Route
}
// ClusterUpdate contains information from a received CDS response, which is of
// interest to the registered CDS watcher. Values of this type are handed to
// UpdateHandler.NewClusters.
type ClusterUpdate struct {
// ServiceName is the service name corresponding to the clusterName which
// is being watched for through CDS.
ServiceName string
// EnableLRS indicates whether or not load should be reported through LRS.
EnableLRS bool
}
// OverloadDropConfig contains the config to drop overloads. It is the element
// type of EndpointsUpdate.Drops.
// NOTE(review): presumably requests in Category are dropped at a rate of
// Numerator/Denominator — confirm against the code that applies drops.
type OverloadDropConfig struct {
Category string
Numerator uint32
Denominator uint32
}
// EndpointHealthStatus represents the health status of an endpoint.
type EndpointHealthStatus int32
// The constants below are numbered consecutively by iota, starting at 0 for
// EndpointHealthStatusUnknown and ending at 5 for EndpointHealthStatusDegraded.
// Their order therefore determines their numeric values.
const (
// EndpointHealthStatusUnknown represents HealthStatus UNKNOWN.
EndpointHealthStatusUnknown EndpointHealthStatus = iota
// EndpointHealthStatusHealthy represents HealthStatus HEALTHY.
EndpointHealthStatusHealthy
// EndpointHealthStatusUnhealthy represents HealthStatus UNHEALTHY.
EndpointHealthStatusUnhealthy
// EndpointHealthStatusDraining represents HealthStatus DRAINING.
EndpointHealthStatusDraining
// EndpointHealthStatusTimeout represents HealthStatus TIMEOUT.
EndpointHealthStatusTimeout
// EndpointHealthStatusDegraded represents HealthStatus DEGRADED.
EndpointHealthStatusDegraded
)
// Endpoint contains information of an endpoint. It is the element type of
// Locality.Endpoints.
type Endpoint struct {
// Address is the endpoint's address.
// NOTE(review): the exact format (e.g. host:port) is not visible here —
// confirm against the EDS response handling code.
Address string
// HealthStatus is one of the EndpointHealthStatus constants.
HealthStatus EndpointHealthStatus
// Weight is the endpoint's weight.
// NOTE(review): presumably relative to other endpoints in the same
// locality — confirm.
Weight uint32
}
// Locality contains information of a locality. It is the element type of
// EndpointsUpdate.Localities.
type Locality struct {
// Endpoints lists the endpoints that belong to this locality.
Endpoints []Endpoint
// ID identifies the locality.
ID internal.LocalityID
// Priority is the locality's priority.
// NOTE(review): presumably a lower value means a higher priority —
// confirm against the balancer code.
Priority uint32
// Weight is the locality's weight.
Weight uint32
}
// EndpointsUpdate contains an EDS update. Values of this type are handed to
// UpdateHandler.NewEndpoints.
type EndpointsUpdate struct {
// Drops lists the overload drop configurations carried by the update.
Drops []OverloadDropConfig
// Localities lists the localities, each with its endpoints, carried by the
// update.
Localities []Locality
}
// Options provides all parameters required for the creation of an xDS client.
type Options struct {
// Config contains a fully populated bootstrap config. It is the
// responsibility of the caller to use some sane defaults here if the
// bootstrap process returned with certain fields left unspecified.
//
// New returns an error if Config.BalancerName is empty, or if Config.Creds
// or Config.NodeProto is nil.
Config bootstrap.Config
// DialOpts contains dial options to be used when dialing the xDS server.
// New appends them after the credentials and keepalive options it sets
// itself.
DialOpts []grpc.DialOption
// TargetName is the target of the parent ClientConn.
TargetName string
// WatchExpiryTimeout is the amount of time the client is willing to wait
// for the first response from the server for any resource being watched.
// Expiry will not cause cancellation of the watch. It will only trigger the
// invocation of the registered callback and it is left up to the caller to
// decide whether or not they want to cancel the watch.
//
// If this field is left unspecified, a default value of 15 seconds will be
// used. This is based on the default value of the initial_fetch_timeout
// field in corepb.ConfigSource proto.
WatchExpiryTimeout time.Duration
}
// newAPIClient builds the version specific API client for apiVersion using the
// builder registered for that version. It returns an error when no builder is
// registered.
//
// It is a variable rather than a plain function so that tests can override it.
var newAPIClient = func(apiVersion version.TransportAPI, cc *grpc.ClientConn, opts BuildOptions) (APIClient, error) {
	if builder := getAPIClientBuilder(apiVersion); builder != nil {
		return builder.Build(cc, opts)
	}
	return nil, fmt.Errorf("no client builder for xDS API version: %v", apiVersion)
}
// Client is a full fledged gRPC client which queries a set of discovery APIs
// (collectively termed as xDS) on a remote management server, to discover
// various dynamic resources.
//
// A single client object will be shared by the xds resolver and balancer
// implementations. But the same client can only be shared by the same parent
// ClientConn.
//
// Implements UpdateHandler interface.
// TODO(easwars): Make a wrapper struct which implements this interface in the
// style of ccBalancerWrapper so that the Client type does not implement these
// exported methods.
type Client struct {
// done is fired by Close; the run goroutine exits once it has fired.
done *grpcsync.Event
// opts holds the options passed to New, with WatchExpiryTimeout defaulted.
opts Options
cc *grpc.ClientConn // Connection to the xDS server
// apiClient is the transport version specific client built in New.
apiClient APIClient
logger *grpclog.PrefixLogger
// updateCh feeds the run goroutine, which invokes watcher callbacks.
updateCh *buffer.Unbounded // chan *watcherInfoWithUpdate
// NOTE(review): mu presumably guards the watcher and cache maps below —
// the code that takes it is not visible here; confirm.
mu sync.Mutex
// One watcher map and one cache map per resource type (LDS, RDS, CDS,
// EDS).
// NOTE(review): the outer string key is presumably the resource name —
// confirm against the watch code.
ldsWatchers map[string]map[*watchInfo]bool
ldsCache map[string]ListenerUpdate
rdsWatchers map[string]map[*watchInfo]bool
rdsCache map[string]RouteConfigUpdate
cdsWatchers map[string]map[*watchInfo]bool
cdsCache map[string]ClusterUpdate
edsWatchers map[string]map[*watchInfo]bool
edsCache map[string]EndpointsUpdate
// NOTE(review): lrsMu presumably guards lrsClients — confirm against the
// load reporting code.
lrsMu sync.Mutex
lrsClients map[string]*lrsClient
}
// New returns a new xdsClient configured with opts.
//
// It validates opts, dials the xDS server named by opts.Config.BalancerName,
// builds the API client for opts.Config.TransportAPI, and starts the
// goroutine that delivers watcher callbacks.
//
// An error is returned when:
//   - opts.Config.BalancerName is empty, or opts.Config.Creds or
//     opts.Config.NodeProto is nil;
//   - the transport API is v2 or v3 and the Node proto is not of the matching
//     type;
//   - dialing the xDS server fails;
//   - the version specific API client cannot be built, in which case the
//     connection dialed for it is closed before returning.
func New(opts Options) (*Client, error) {
	switch {
	case opts.Config.BalancerName == "":
		return nil, errors.New("xds: no xds_server name provided in options")
	case opts.Config.Creds == nil:
		return nil, errors.New("xds: no credentials provided in options")
	case opts.Config.NodeProto == nil:
		return nil, errors.New("xds: no node_proto provided in options")
	}

	// The Node proto must be of the type that matches the transport version.
	// Other versions are not checked here; they fail later if no builder is
	// registered for them.
	switch opts.Config.TransportAPI {
	case version.TransportV2:
		if _, ok := opts.Config.NodeProto.(*v2corepb.Node); !ok {
			return nil, fmt.Errorf("xds: Node proto type (%T) does not match API version: %v", opts.Config.NodeProto, opts.Config.TransportAPI)
		}
	case version.TransportV3:
		if _, ok := opts.Config.NodeProto.(*v3corepb.Node); !ok {
			return nil, fmt.Errorf("xds: Node proto type (%T) does not match API version: %v", opts.Config.NodeProto, opts.Config.TransportAPI)
		}
	}

	// Caller supplied dial options come last, after the credentials and
	// keepalive parameters set here.
	dopts := []grpc.DialOption{
		opts.Config.Creds,
		grpc.WithKeepaliveParams(keepalive.ClientParameters{
			Time:    5 * time.Minute,
			Timeout: 20 * time.Second,
		}),
	}
	dopts = append(dopts, opts.DialOpts...)

	if opts.WatchExpiryTimeout == 0 {
		// This is based on the default value of the initial_fetch_timeout field
		// in corepb.ConfigSource proto.
		opts.WatchExpiryTimeout = 15 * time.Second
	}

	c := &Client{
		done:     grpcsync.NewEvent(),
		opts:     opts,
		updateCh: buffer.NewUnbounded(),

		ldsWatchers: make(map[string]map[*watchInfo]bool),
		ldsCache:    make(map[string]ListenerUpdate),
		rdsWatchers: make(map[string]map[*watchInfo]bool),
		rdsCache:    make(map[string]RouteConfigUpdate),
		cdsWatchers: make(map[string]map[*watchInfo]bool),
		cdsCache:    make(map[string]ClusterUpdate),
		edsWatchers: make(map[string]map[*watchInfo]bool),
		edsCache:    make(map[string]EndpointsUpdate),
		lrsClients:  make(map[string]*lrsClient),
	}

	cc, err := grpc.Dial(opts.Config.BalancerName, dopts...)
	if err != nil {
		// An error from a non-blocking dial indicates something serious.
		return nil, fmt.Errorf("xds: failed to dial balancer {%s}: %v", opts.Config.BalancerName, err)
	}
	c.cc = cc
	c.logger = prefixLogger(c)
	c.logger.Infof("Created ClientConn to xDS server: %s", opts.Config.BalancerName)

	apiClient, err := newAPIClient(opts.Config.TransportAPI, cc, BuildOptions{
		Parent:    c,
		NodeProto: opts.Config.NodeProto,
		Backoff:   backoff.DefaultExponential.Backoff,
		Logger:    c.logger,
	})
	if err != nil {
		// The caller never receives c, so nobody else can close the
		// connection dialed above. Close it here to avoid leaking it.
		cc.Close()
		return nil, err
	}
	c.apiClient = apiClient
	c.logger.Infof("Created")

	go c.run()
	return c, nil
}
// run is the goroutine on which all watcher callbacks are invoked. It exits
// once the client's done event has fired.
//
// A callback may be triggered from watch() when the item is already in the
// cache. Invoking it inline there could deadlock user code, and a plain
// `go callback()` is not an option either because callbacks must be delivered
// in order. Funnelling them through updateCh onto this single goroutine
// addresses both concerns.
func (c *Client) run() {
	for {
		select {
		case <-c.done.Done():
			return
		case item := <-c.updateCh.Get():
			// NOTE(review): Load is called after every receive, presumably to
			// make the next buffered item available on Get() — confirm
			// against buffer.Unbounded.
			c.updateCh.Load()
			// Do not deliver a callback once the client has been closed.
			if c.done.HasFired() {
				return
			}
			update := item.(*watcherInfoWithUpdate)
			c.callCallback(update)
		}
	}
}
// Close shuts the client down: it fires the done event (which stops the run
// goroutine), closes the API client and closes the gRPC connection to the xDS
// server.
//
// Only the first call has any effect, including when calls are made
// concurrently; later calls return immediately.
func (c *Client) Close() {
	// Fire reports whether this call is the one that fired the event. Using
	// its result, rather than a separate HasFired check followed by Fire,
	// leaves no window in which two concurrent callers both proceed to close
	// the API client and the connection.
	if !c.done.Fire() {
		return
	}
	// TODO: Should we invoke the registered callbacks here with an error that
	// the client is closed?
	c.apiClient.Close()
	c.cc.Close()
	c.logger.Infof("Shutdown")
}
// ResourceType identifies an xDS resource kind independently of the transport
// protocol version. Version agnostic code works with these values, and the
// versioned API clients map them to the appropriate version URLs.
type ResourceType int

// Version agnostic resource type constants. They are numbered by iota starting
// at 0, so their order determines their values.
const (
	UnknownResource ResourceType = iota
	ListenerResource
	HTTPConnManagerResource
	RouteConfigResource
	ClusterResource
	EndpointsResource
)

// String returns the name of the constant that r equals. UnknownResource and
// any value outside the declared range yield "UnknownResource".
func (r ResourceType) String() string {
	// Table indexed by the constant values. Slot 0 (UnknownResource) is left
	// empty because the range check below routes it to the fallback.
	names := [...]string{
		ListenerResource:        "ListenerResource",
		HTTPConnManagerResource: "HTTPConnManagerResource",
		RouteConfigResource:     "RouteConfigResource",
		ClusterResource:         "ClusterResource",
		EndpointsResource:       "EndpointsResource",
	}
	if r > UnknownResource && int(r) < len(names) {
		return names[r]
	}
	return "UnknownResource"
}
// IsListenerResource reports whether the provided URL is the v2 or the v3
// type URL of an xDS Listener resource.
func IsListenerResource(url string) bool {
	switch url {
	case version.V2ListenerURL, version.V3ListenerURL:
		return true
	}
	return false
}
// IsHTTPConnManagerResource reports whether the provided URL is the v2 or the
// v3 type URL of an xDS HTTPConnManager resource.
func IsHTTPConnManagerResource(url string) bool {
	switch url {
	case version.V2HTTPConnManagerURL, version.V3HTTPConnManagerURL:
		return true
	}
	return false
}
// IsRouteConfigResource reports whether the provided URL is the v2 or the v3
// type URL of an xDS RouteConfig resource.
func IsRouteConfigResource(url string) bool {
	switch url {
	case version.V2RouteConfigURL, version.V3RouteConfigURL:
		return true
	}
	return false
}
// IsClusterResource reports whether the provided URL is the v2 or the v3 type
// URL of an xDS Cluster resource.
func IsClusterResource(url string) bool {
	switch url {
	case version.V2ClusterURL, version.V3ClusterURL:
		return true
	}
	return false
}
// IsEndpointsResource reports whether the provided URL is the v2 or the v3
// type URL of an xDS Endpoints resource.
func IsEndpointsResource(url string) bool {
	switch url {
	case version.V2EndpointsURL, version.V3EndpointsURL:
		return true
	}
	return false
}