Skip to content


xds: Server implementation (#3999)
Browse files Browse the repository at this point in the history
* xds: Server implementation.

* Remove security related code.

* Add a blocking newListenerWrapper() method.

* Fix some comments.

* Use non-blocking dial.

* Use WaitForReady().

* Use localhost instead of

* Another attempt to make the tests happy on GA.

* Make vet happy.

* Add a missing return.
  • Loading branch information
easwars committed Nov 6, 2020
1 parent bc01f3f commit 9c2f82d
Show file tree
Hide file tree
Showing 4 changed files with 936 additions and 2 deletions.
193 changes: 193 additions & 0 deletions xds/internal/test/xds_server_integration_test.go
@@ -0,0 +1,193 @@
* Copyright 2020 gRPC authors.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.

// Package xds_test contains e2e tests for xDS use on the server.
package xds_test

import (


v2discoverypb ""
v3corepb ""
v3listenerpb ""
v3httppb ""
testpb ""

const (
defaultTestTimeout = 10 * time.Second
localAddress = "localhost:9999"
listenerName = "grpc/server?udpa.resource.listening_address=localhost:9999"

type s struct {

func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})

func setupListenerResponse(respCh chan *fakeserver.Response, name string) {
respCh <- &fakeserver.Response{
Resp: &v2discoverypb.DiscoveryResponse{
Resources: []*anypb.Any{
TypeUrl: version.V2ListenerURL,
Value: func() []byte {
l := &v3listenerpb.Listener{
// This needs to match the name we are querying for.
Name: listenerName,
ApiListener: &v3listenerpb.ApiListener{
ApiListener: &anypb.Any{
TypeUrl: version.V2HTTPConnManagerURL,
Value: func() []byte {
cm := &v3httppb.HttpConnectionManager{
RouteSpecifier: &v3httppb.HttpConnectionManager_Rds{
Rds: &v3httppb.Rds{
ConfigSource: &v3corepb.ConfigSource{
ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{Ads: &v3corepb.AggregatedConfigSource{}},
RouteConfigName: "route-config",
mcm, _ := proto.Marshal(cm)
return mcm
ml, _ := proto.Marshal(l)
return ml
TypeUrl: version.V2ListenerURL,

func setupBootstrapFile(t *testing.T, serverURI string) func() {
// Create a bootstrap file in a temporary directory.
tmpdir, err := ioutil.TempDir("", "xds-server-test*")
if err != nil {
t.Fatalf("failed to create tempdir: %v", err)
bootstrapContents := fmt.Sprintf(`
"node": {
"id": "ENVOY_NODE_ID",
"metadata": {
"xds_servers" : [{
"server_uri": "%s",
"channel_creds": [
{ "type": "insecure" }
}`, serverURI)
bootstrapFileName := path.Join(tmpdir, "bootstrap")
if err := ioutil.WriteFile(bootstrapFileName, []byte(bootstrapContents), os.ModePerm); err != nil {
t.Fatalf("failed to write bootstrap file: %v", err)

origBootstrapFileName := env.BootstrapFileName
env.BootstrapFileName = bootstrapFileName
t.Logf("Create bootstrap file at %s with contents\n%s", bootstrapFileName, bootstrapContents)
return func() { env.BootstrapFileName = origBootstrapFileName }

// TestServerSideXDS is an e2e tests for xDS use on the server. This does not
// use any xDS features because we have not implemented any on the server side.
func (s) TestServerSideXDS(t *testing.T) {
// Spin up a fake xDS management server on a local port.
// TODO(easwars): Switch to using the server from envoy-go-control-plane.
fs, cleanup, err := fakeserver.StartServer()
if err != nil {
t.Fatalf("failed to start fake xDS server: %v", err)
defer cleanup()
t.Logf("Started xDS management server at %s", fs.Address)

// Setup the fakeserver to respond with a Listener resource.
setupListenerResponse(fs.XDSResponseChan, listenerName)
// Create a bootstrap file in a temporary directory.
defer setupBootstrapFile(t, fs.Address)()

// Initialize a gRPC server which uses xDS, and register stubServer on it.
server := xds.NewGRPCServer()
testpb.RegisterTestServiceServer(server, &testService{})

errCh := make(chan error, 1)
go func() {
defer server.Stop()

// Create a clientconn and make a successful RPC
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc, err := grpc.DialContext(ctx, localAddress, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
errCh <- fmt.Errorf("failed to dial local test server: %v", err)
defer cc.Close()

client := testpb.NewTestServiceClient(cc)
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
errCh <- fmt.Errorf("rpc EmptyCall() failed: %v", err)
errCh <- nil

opts := xds.ServeOptions{Network: "tcp", Address: localAddress}
if err := server.Serve(opts); err != nil {
t.Fatalf("Serve(%+v) failed: %v", opts, err)

if err := <-errCh; err != nil {

type testService struct {

func (*testService) EmptyCall(context.Context, *testpb.Empty) (*testpb.Empty, error) {
return &testpb.Empty{}, nil

0 comments on commit 9c2f82d

Please sign in to comment.