/
client.go
281 lines (253 loc) · 9.5 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package v2 provides xDS v2 transport protocol specific functionality.
package v2
import (
"context"
"fmt"
"sync"
"github.com/golang/protobuf/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/internal/grpclog"
xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/client/load"
"google.golang.org/grpc/xds/internal/version"
v2xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
v2corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
v2adsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
statuspb "google.golang.org/genproto/googleapis/rpc/status"
)
func init() {
xdsclient.RegisterAPIClientBuilder(clientBuilder{})
}
var (
resourceTypeToURL = map[xdsclient.ResourceType]string{
xdsclient.ListenerResource: version.V2ListenerURL,
xdsclient.RouteConfigResource: version.V2RouteConfigURL,
xdsclient.ClusterResource: version.V2ClusterURL,
xdsclient.EndpointsResource: version.V2EndpointsURL,
}
)
type clientBuilder struct{}
func (clientBuilder) Build(cc *grpc.ClientConn, opts xdsclient.BuildOptions) (xdsclient.APIClient, error) {
return newClient(cc, opts)
}
func (clientBuilder) Version() version.TransportAPI {
return version.TransportV2
}
func newClient(cc *grpc.ClientConn, opts xdsclient.BuildOptions) (xdsclient.APIClient, error) {
nodeProto, ok := opts.NodeProto.(*v2corepb.Node)
if !ok {
return nil, fmt.Errorf("xds: unsupported Node proto type: %T, want %T", opts.NodeProto, (*v2corepb.Node)(nil))
}
v2c := &client{
cc: cc,
parent: opts.Parent,
nodeProto: nodeProto,
loadStore: opts.LoadStore,
logger: opts.Logger,
}
v2c.ctx, v2c.cancelCtx = context.WithCancel(context.Background())
v2c.TransportHelper = xdsclient.NewTransportHelper(v2c, opts.Logger, opts.Backoff)
return v2c, nil
}
type adsStream v2adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesClient
// client performs the actual xDS RPCs using the xDS v2 API. It creates a
// single ADS stream on which the different types of xDS requests and responses
// are multiplexed.
type client struct {
*xdsclient.TransportHelper
ctx context.Context
cancelCtx context.CancelFunc
parent xdsclient.UpdateHandler
loadStore *load.Store
logger *grpclog.PrefixLogger
// ClientConn to the xDS gRPC server. Owned by the parent xdsClient.
cc *grpc.ClientConn
nodeProto *v2corepb.Node
mu sync.Mutex
// ldsResourceName is the LDS resource_name to watch. It is set to the first
// LDS resource_name to watch, and removed when the LDS watch is canceled.
//
// It's from the dial target of the parent ClientConn. RDS resource
// processing needs this to do the host matching.
ldsResourceName string
ldsWatchCount int
}
// AddWatch overrides the transport helper's AddWatch to save the LDS
// resource_name. This is required when handling an RDS response to perform host
// matching.
func (v2c *client) AddWatch(rType xdsclient.ResourceType, rName string) {
v2c.mu.Lock()
// Special handling for LDS, because RDS needs the LDS resource_name for
// response host matching.
if rType == xdsclient.ListenerResource {
// Set hostname to the first LDS resource_name, and reset it when the
// last LDS watch is removed. The upper level Client isn't expected to
// watchLDS more than once.
v2c.ldsWatchCount++
if v2c.ldsWatchCount == 1 {
v2c.ldsResourceName = rName
}
}
v2c.mu.Unlock()
v2c.TransportHelper.AddWatch(rType, rName)
}
// RemoveWatch overrides the transport helper's RemoveWatch to clear the LDS
// resource_name when the last watch is removed.
func (v2c *client) RemoveWatch(rType xdsclient.ResourceType, rName string) {
v2c.mu.Lock()
// Special handling for LDS, because RDS needs the LDS resource_name for
// response host matching.
if rType == xdsclient.ListenerResource {
// Set hostname to the first LDS resource_name, and reset it when the
// last LDS watch is removed. The upper level Client isn't expected to
// watchLDS more than once.
v2c.ldsWatchCount--
if v2c.ldsWatchCount == 0 {
v2c.ldsResourceName = ""
}
}
v2c.mu.Unlock()
v2c.TransportHelper.RemoveWatch(rType, rName)
}
func (v2c *client) NewStream(ctx context.Context) (grpc.ClientStream, error) {
return v2adsgrpc.NewAggregatedDiscoveryServiceClient(v2c.cc).StreamAggregatedResources(v2c.ctx, grpc.WaitForReady(true))
}
// sendRequest sends out a DiscoveryRequest for the given resourceNames, of type
// rType, on the provided stream.
//
// version is the ack version to be sent with the request
// - If this is the new request (not an ack/nack), version will be empty.
// - If this is an ack, version will be the version from the response.
// - If this is a nack, version will be the previous acked version (from
// versionMap). If there was no ack before, it will be empty.
func (v2c *client) SendRequest(s grpc.ClientStream, resourceNames []string, rType xdsclient.ResourceType, version, nonce, errMsg string) error {
stream, ok := s.(adsStream)
if !ok {
return fmt.Errorf("xds: Attempt to send request on unsupported stream type: %T", s)
}
req := &v2xdspb.DiscoveryRequest{
Node: v2c.nodeProto,
TypeUrl: resourceTypeToURL[rType],
ResourceNames: resourceNames,
VersionInfo: version,
ResponseNonce: nonce,
}
if errMsg != "" {
req.ErrorDetail = &statuspb.Status{
Code: int32(codes.InvalidArgument), Message: errMsg,
}
}
if err := stream.Send(req); err != nil {
return fmt.Errorf("xds: stream.Send(%+v) failed: %v", req, err)
}
v2c.logger.Debugf("ADS request sent: %v", req)
return nil
}
// RecvResponse blocks on the receipt of one response message on the provided
// stream.
func (v2c *client) RecvResponse(s grpc.ClientStream) (proto.Message, error) {
stream, ok := s.(adsStream)
if !ok {
return nil, fmt.Errorf("xds: Attempt to receive response on unsupported stream type: %T", s)
}
resp, err := stream.Recv()
if err != nil {
// TODO: call watch callbacks with error when stream is broken.
return nil, fmt.Errorf("xds: stream.Recv() failed: %v", err)
}
v2c.logger.Infof("ADS response received, type: %v", resp.GetTypeUrl())
v2c.logger.Debugf("ADS response received: %v", resp)
return resp, nil
}
func (v2c *client) HandleResponse(r proto.Message) (xdsclient.ResourceType, string, string, error) {
rType := xdsclient.UnknownResource
resp, ok := r.(*v2xdspb.DiscoveryResponse)
if !ok {
return rType, "", "", fmt.Errorf("xds: unsupported message type: %T", resp)
}
// Note that the xDS transport protocol is versioned independently of
// the resource types, and it is supported to transfer older versions
// of resource types using new versions of the transport protocol, or
// vice-versa. Hence we need to handle v3 type_urls as well here.
var err error
url := resp.GetTypeUrl()
switch {
case xdsclient.IsListenerResource(url):
err = v2c.handleLDSResponse(resp)
rType = xdsclient.ListenerResource
case xdsclient.IsRouteConfigResource(url):
err = v2c.handleRDSResponse(resp)
rType = xdsclient.RouteConfigResource
case xdsclient.IsClusterResource(url):
err = v2c.handleCDSResponse(resp)
rType = xdsclient.ClusterResource
case xdsclient.IsEndpointsResource(url):
err = v2c.handleEDSResponse(resp)
rType = xdsclient.EndpointsResource
default:
return rType, "", "", xdsclient.ErrResourceTypeUnsupported{
ErrStr: fmt.Sprintf("Resource type %v unknown in response from server", resp.GetTypeUrl()),
}
}
return rType, resp.GetVersionInfo(), resp.GetNonce(), err
}
// handleLDSResponse processes an LDS response received from the xDS server. On
// receipt of a good response, it also invokes the registered watcher callback.
func (v2c *client) handleLDSResponse(resp *v2xdspb.DiscoveryResponse) error {
update, err := xdsclient.UnmarshalListener(resp.GetResources(), v2c.logger)
if err != nil {
return err
}
v2c.parent.NewListeners(update)
return nil
}
// handleRDSResponse processes an RDS response received from the xDS server. On
// receipt of a good response, it caches validated resources and also invokes
// the registered watcher callback.
func (v2c *client) handleRDSResponse(resp *v2xdspb.DiscoveryResponse) error {
v2c.mu.Lock()
hostname := v2c.ldsResourceName
v2c.mu.Unlock()
update, err := xdsclient.UnmarshalRouteConfig(resp.GetResources(), hostname, v2c.logger)
if err != nil {
return err
}
v2c.parent.NewRouteConfigs(update)
return nil
}
// handleCDSResponse processes an CDS response received from the xDS server. On
// receipt of a good response, it also invokes the registered watcher callback.
func (v2c *client) handleCDSResponse(resp *v2xdspb.DiscoveryResponse) error {
update, err := xdsclient.UnmarshalCluster(resp.GetResources(), v2c.logger)
if err != nil {
return err
}
v2c.parent.NewClusters(update)
return nil
}
func (v2c *client) handleEDSResponse(resp *v2xdspb.DiscoveryResponse) error {
update, err := xdsclient.UnmarshalEndpoints(resp.GetResources(), v2c.logger)
if err != nil {
return err
}
v2c.parent.NewEndpoints(update)
return nil
}