Skip to content

Commit

Permalink
[lrs_stream_report] test for lrs client
Browse files Browse the repository at this point in the history
  • Loading branch information
menghanl committed Oct 8, 2020
1 parent d9b3cee commit a7dd78c
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 2 deletions.
1 change: 1 addition & 0 deletions xds/internal/client/client.go
Expand Up @@ -354,6 +354,7 @@ func New(opts Options) (*Client, error) {
cdsCache: make(map[string]ClusterUpdate),
edsWatchers: make(map[string]map[*watchInfo]bool),
edsCache: make(map[string]EndpointsUpdate),
lrsClients: make(map[string]*lrsClient),
}

cc, err := grpc.Dial(opts.Config.BalancerName, dopts...)
Expand Down
128 changes: 128 additions & 0 deletions xds/internal/client/client_loadreport_test.go
@@ -0,0 +1,128 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package client_test

import (
"context"
"testing"
"time"

v2corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
lrspb "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v2"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/status"
"google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/client/bootstrap"
"google.golang.org/grpc/xds/internal/testutils/fakeserver"
"google.golang.org/grpc/xds/internal/version"

_ "google.golang.org/grpc/xds/internal/client/v2" // Register the v2 xDS API client.
)

const (
defaultTestTimeout = 5 * time.Second
defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen.
)

type s struct {
grpctest.Tester
}

func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}

func (s) TestLRSClient(t *testing.T) {
fs, sCleanup, err := fakeserver.StartServer()
if err != nil {
t.Fatalf("failed to start fake xDS server: %v", err)
}
defer sCleanup()

xdsC, err := client.New(client.Options{
Config: bootstrap.Config{
BalancerName: fs.Address,
Creds: grpc.WithInsecure(),
NodeProto: &v2corepb.Node{},
TransportAPI: version.TransportV2,
},
})
if err != nil {
t.Fatalf("failed to create xds client: %v", err)
}
defer xdsC.Close()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if u, err := fs.NewConnChan.Receive(ctx); err != nil {
t.Errorf("unexpected timeout: %v, %v, want NewConn", u, err)
}

// Report to the same address should not create new ClientConn.
store1, lrsCancel1 := xdsC.ReportLoad(fs.Address)
defer lrsCancel1()
ctx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer cancel()
if u, err := fs.NewConnChan.Receive(ctx); err != context.DeadlineExceeded {
t.Errorf("unexpected NewConn: %v, %v, want channel recv timeout", u, err)
}

fs2, sCleanup2, err := fakeserver.StartServer()
if err != nil {
t.Fatalf("failed to start fake xDS server: %v", err)
}
defer sCleanup2()

// Report to a different address should create new ClientConn.
store2, lrsCancel2 := xdsC.ReportLoad(fs2.Address)
defer lrsCancel2()
ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if u, err := fs2.NewConnChan.Receive(ctx); err != nil {
t.Errorf("unexpected timeout: %v, %v, want NewConn", u, err)
}

if store1 == store2 {
t.Fatalf("got same store for different servers, want different")
}

ctx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer cancel()
if u, err := fs2.LRSRequestChan.Receive(ctx); err != nil {
t.Errorf("unexpected timeout: %v, %v, want NewConn", u, err)
}

// Send one resp to the client.
fs2.LRSResponseChan <- &fakeserver.Response{
Resp: &lrspb.LoadStatsResponse{SendAllClusters: true},
}
// Wait for the send to finish, so server will wait for the next request.
time.Sleep(defaultTestShortTimeout)

// Cancel this load reporting stream, server should see error canceled.
lrsCancel2()

// Server should receive a stream canceled error.
ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if u, err := fs2.LRSRequestChan.Receive(ctx); err != nil || status.Code(u.(*fakeserver.Request).Err) != codes.Canceled {
t.Errorf("unexpected LRS request: %v, %v, want error canceled", u, err)
}
}
4 changes: 2 additions & 2 deletions xds/internal/testutils/fakeserver/server.go
Expand Up @@ -203,10 +203,10 @@ type lrsServer struct {

func (lrsS *lrsServer) StreamLoadStats(s lrsgrpc.LoadReportingService_StreamLoadStatsServer) error {
req, err := s.Recv()
lrsS.reqChan.Send(&Request{req, err})
if err != nil {
return err
}
lrsS.reqChan.Send(&Request{req, err})

select {
case r := <-lrsS.respChan:
Expand All @@ -222,12 +222,12 @@ func (lrsS *lrsServer) StreamLoadStats(s lrsgrpc.LoadReportingService_StreamLoad

for {
req, err := s.Recv()
lrsS.reqChan.Send(&Request{req, err})
if err != nil {
if err == io.EOF {
return nil
}
return err
}
lrsS.reqChan.Send(&Request{req, err})
}
}

0 comments on commit a7dd78c

Please sign in to comment.