diff --git a/examples/go.sum b/examples/go.sum index fae9a831ae02..664ce40392fb 100644 --- a/examples/go.sum +++ b/examples/go.sum @@ -33,15 +33,17 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9 dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= +github.com/census-instrumentation/opencensus-proto v0.2.1 h1:glEXhBS5PSLLv4IXzLA5yPRVX4bilULVyxxbrfOtDAk= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= -github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f h1:WBZRG4aNOuI15bLRrCgN8fCq8E5Xuty6jGbmSNEvSsU= -github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= +github.com/cncf/udpa/go v0.0.0-20200313221541-5f7e5dd04533/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= +github.com/cncf/udpa/go v0.0.0-20200909154343-1f710aca26a9 h1:cQ58MWbYGnI4x6Gk6FUzirMcMYUgvYOLa9fiO7chY1A= +github.com/cncf/udpa/go v0.0.0-20200909154343-1f710aca26a9/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/envoyproxy/go-control-plane v0.9.4 h1:rEvIZUSZ3fx39WIi3JkQqQBitGwpELBIYWeBVh6wn+E= -github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= +github.com/envoyproxy/go-control-plane v0.9.6 h1:GgblEiDzxf5ajlAZY4aC8xp7DwkrGfauFNMGdB2bBv0= +github.com/envoyproxy/go-control-plane v0.9.6/go.mod h1:GFqM7v0B62MraO4PWRedIbhThr/Rf7ev6aHOOPXeaDA= github.com/envoyproxy/protoc-gen-validate v0.1.0 h1:EQciDnbrYxy13PgWoY8AqoxGiPrpgBZ1R8UNe3ddc+A= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= @@ -110,6 +112,7 @@ github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1: github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= diff --git a/go.mod b/go.mod index 0bcae7362db8..c77aa212a130 100644 --- a/go.mod +++ b/go.mod @@ -3,10 +3,10 @@ module google.golang.org/grpc go 1.11 require ( - github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f - github.com/envoyproxy/go-control-plane v0.9.4 + github.com/cncf/udpa/go v0.0.0-20200909154343-1f710aca26a9 + github.com/envoyproxy/go-control-plane v0.9.6 github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b - github.com/golang/protobuf v1.3.3 + github.com/golang/protobuf v1.4.2 github.com/google/go-cmp v0.4.0 github.com/google/uuid v1.1.2 golang.org/x/net v0.0.0-20190311183353-d8887717615a diff --git a/go.sum b/go.sum index bab616e439f1..05002ae60f58 100644 --- a/go.sum +++ b/go.sum @@ -5,11 +5,14 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/client9/misspell v0.3.4 h1:ta993UF76GwbvJcIo3Y68y/M3WxlpEHPWIGDkJYwzJI= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= -github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f h1:WBZRG4aNOuI15bLRrCgN8fCq8E5Xuty6jGbmSNEvSsU= -github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= +github.com/cncf/udpa/go v0.0.0-20200313221541-5f7e5dd04533/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= +github.com/cncf/udpa/go v0.0.0-20200909154343-1f710aca26a9 h1:cQ58MWbYGnI4x6Gk6FUzirMcMYUgvYOLa9fiO7chY1A= +github.com/cncf/udpa/go v0.0.0-20200909154343-1f710aca26a9/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= -github.com/envoyproxy/go-control-plane v0.9.4 h1:rEvIZUSZ3fx39WIi3JkQqQBitGwpELBIYWeBVh6wn+E= -github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= +github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.6 h1:GgblEiDzxf5ajlAZY4aC8xp7DwkrGfauFNMGdB2bBv0= +github.com/envoyproxy/go-control-plane v0.9.6/go.mod h1:GFqM7v0B62MraO4PWRedIbhThr/Rf7ev6aHOOPXeaDA= github.com/envoyproxy/protoc-gen-validate v0.1.0 h1:EQciDnbrYxy13PgWoY8AqoxGiPrpgBZ1R8UNe3ddc+A= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= @@ -19,15 +22,24 @@ github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfb github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.3.3 h1:gyjaxf+svBWX08ZjK86iN9geUJF0H6gp2IRKX6Nf6/I= -github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/google/go-cmp v0.2.0 h1:+dTQ8DZQJz0Mb/HjFlkptS1FeQ4cWSnN941F8aEG4SQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= @@ -65,6 +77,15 @@ google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98 google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= +google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc h1:/hemPrYIhOhy8zYrNj+069zDB68us2sMGsfkFJO0iZs= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/security/advancedtls/go.sum b/security/advancedtls/go.sum index f2ab78d92322..441c63c7fd17 100644 --- a/security/advancedtls/go.sum +++ b/security/advancedtls/go.sum @@ -36,9 +36,10 @@ github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= -github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= +github.com/cncf/udpa/go v0.0.0-20200313221541-5f7e5dd04533/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= +github.com/cncf/udpa/go v0.0.0-20200909154343-1f710aca26a9/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= +github.com/envoyproxy/go-control-plane v0.9.6/go.mod h1:GFqM7v0B62MraO4PWRedIbhThr/Rf7ev6aHOOPXeaDA= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= @@ -104,6 +105,7 @@ github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1: github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= diff --git a/xds/internal/balancer/edsbalancer/eds_impl_test.go b/xds/internal/balancer/edsbalancer/eds_impl_test.go index a56acb7e293d..3f46241a0866 100644 --- a/xds/internal/balancer/edsbalancer/eds_impl_test.go +++ b/xds/internal/balancer/edsbalancer/eds_impl_test.go @@ -690,7 +690,7 @@ func (s) TestEDS_LoadReport(t *testing.T) { lsWrapper := &loadStoreWrapper{} lsWrapper.update(loadStore, testClusterNames[0]) cw := &xdsClientWrapper{ - load: lsWrapper, + loadWrapper: lsWrapper, } cc := testutils.NewTestClientConn(t) diff --git a/xds/internal/balancer/edsbalancer/xds_client_wrapper.go b/xds/internal/balancer/edsbalancer/xds_client_wrapper.go index f22c1624a3b6..b75282179098 100644 --- a/xds/internal/balancer/edsbalancer/xds_client_wrapper.go +++ b/xds/internal/balancer/edsbalancer/xds_client_wrapper.go @@ -35,8 +35,7 @@ import ( // balancer. It's defined so we can override xdsclientNew function in tests. type xdsClientInterface interface { WatchEndpoints(clusterName string, edsCb func(xdsclient.EndpointsUpdate, error)) (cancel func()) - LoadStore() *load.Store - ReportLoad(server string, clusterName string) (cancel func()) + ReportLoad(server string) (loadStore *load.Store, cancel func()) Close() } @@ -102,7 +101,12 @@ type xdsClientWrapper struct { // xdsClient could come from attributes, or created with balancerName. xdsClient xdsClientInterface - load *loadStoreWrapper + // loadWrapper is a wrapper with loadOriginal, with clusterName and + // edsServiceName. It's used children to report loads. + loadWrapper *loadStoreWrapper + // loadOriginal is the load.Store for reporting loads to lrsServerName. It's + // returned by the client. + loadOriginal *load.Store // edsServiceName is the edsServiceName currently being watched, not // necessary the edsServiceName from service config. // @@ -127,7 +131,7 @@ func newXDSClientWrapper(newEDSUpdate func(xdsclient.EndpointsUpdate, error), bb logger: logger, newEDSUpdate: newEDSUpdate, bbo: bbo, - load: &loadStoreWrapper{}, + loadWrapper: &loadStoreWrapper{}, } } @@ -248,15 +252,15 @@ func (c *xdsClientWrapper) startLoadReport(loadReportServer *string) { } c.loadReportServer = loadReportServer if c.loadReportServer != nil { - c.cancelLoadReport = c.xdsClient.ReportLoad(*c.loadReportServer, c.edsServiceName) + c.loadOriginal, c.cancelLoadReport = c.xdsClient.ReportLoad(*c.loadReportServer) } } func (c *xdsClientWrapper) loadStore() load.PerClusterReporter { - if c == nil || c.load.store == nil { + if c == nil || c.loadWrapper.store == nil { return nil } - return c.load + return c.loadWrapper } // handleUpdate applies the service config and attributes updates to the client, @@ -264,12 +268,25 @@ func (c *xdsClientWrapper) loadStore() load.PerClusterReporter { func (c *xdsClientWrapper) handleUpdate(config *EDSConfig, attr *attributes.Attributes) { clientChanged := c.updateXDSClient(config, attr) + var updateLoadStore bool + // Need to restart EDS watch when one of the following happens: // - the xds_client is updated // - the xds_client didn't change, but the edsServiceName changed if clientChanged || c.edsServiceName != config.EDSServiceName { c.edsServiceName = config.EDSServiceName c.startEndpointsWatch() + updateLoadStore = true + } + + // Only need to restart load reporting when: + // - the loadReportServer name changed + if !equalStringPointers(c.loadReportServer, config.LrsLoadReportingServerName) { + c.startLoadReport(config.LrsLoadReportingServerName) + updateLoadStore = true + } + + if updateLoadStore { // TODO: this update for the LRS service name is too early. It should // only apply to the new EDS response. But this is applied to the RPCs // before the new EDS response. To fully fix this, the EDS balancer @@ -277,14 +294,9 @@ func (c *xdsClientWrapper) handleUpdate(config *EDSConfig, attr *attributes.Attr // // This is OK for now, because we don't actually expect edsServiceName // to change. Fix this (a bigger change) will happen later. - c.load.update(c.xdsClient.LoadStore(), c.edsServiceName) + c.loadWrapper.update(c.loadOriginal, c.edsServiceName) } - // Only need to restart load reporting when: - // - the loadReportServer name changed - if !equalStringPointers(c.loadReportServer, config.LrsLoadReportingServerName) { - c.startLoadReport(config.LrsLoadReportingServerName) - } } func (c *xdsClientWrapper) cancelWatch() { diff --git a/xds/internal/balancer/edsbalancer/xds_lrs_test.go b/xds/internal/balancer/edsbalancer/xds_lrs_test.go index 8d888ec6f3b0..955f54401c86 100644 --- a/xds/internal/balancer/edsbalancer/xds_lrs_test.go +++ b/xds/internal/balancer/edsbalancer/xds_lrs_test.go @@ -66,7 +66,7 @@ func (s) TestXDSLoadReporting(t *testing.T) { if err != nil { t.Fatalf("xdsClient.ReportLoad failed with error: %v", err) } - if got.Server != "" || got.Cluster != testEDSClusterName { - t.Fatalf("xdsClient.ReportLoad called with {%v, %v}: want {\"\", %v}", got.Server, got.Cluster, testEDSClusterName) + if got.Server != "" { + t.Fatalf("xdsClient.ReportLoad called with {%v}: want {\"\"}", got.Server) } } diff --git a/xds/internal/balancer/lrs/balancer.go b/xds/internal/balancer/lrs/balancer.go index 1361fb15728f..4983b30ff47f 100644 --- a/xds/internal/balancer/lrs/balancer.go +++ b/xds/internal/balancer/lrs/balancer.go @@ -144,8 +144,7 @@ func (ccw *ccWrapper) UpdateState(s balancer.State) { // xdsClientInterface contains only the xds_client methods needed by LRS // balancer. It's defined so we can override xdsclient in tests. type xdsClientInterface interface { - LoadStore() *load.Store - ReportLoad(server string, clusterName string) func() + ReportLoad(server string) (*load.Store, func()) Close() } @@ -199,12 +198,17 @@ type xdsClientWrapper struct { clusterName string edsServiceName string lrsServerName string - load *loadStoreWrapper + // loadOriginal is the load.Store for reporting loads to lrsServerName. It's + // returned by the client. + loadOriginal *load.Store + // loadWrapper is a wrapper with loadOriginal, with clusterName and + // edsServiceName. It's used children to report loads. + loadWrapper *loadStoreWrapper } func newXDSClientWrapper() *xdsClientWrapper { return &xdsClientWrapper{ - load: &loadStoreWrapper{}, + loadWrapper: &loadStoreWrapper{}, } } @@ -220,7 +224,6 @@ func (w *xdsClientWrapper) update(newConfig *lbConfig, attr *attributes.Attribut if w.c != clientFromAttr { // xds client is different, restart. restartLoadReport = true - updateLoadStore = true w.c = clientFromAttr } } @@ -244,34 +247,37 @@ func (w *xdsClientWrapper) update(newConfig *lbConfig, attr *attributes.Attribut w.lrsServerName = newConfig.LrsLoadReportingServerName } - // This updates the clusterName and serviceName that will reported for the - // loads. The update here is too early, the perfect timing is when the - // picker is updated with the new connection. But from this balancer's point - // of view, it's impossible to tell. - // - // On the other hand, this will almost never happen. Each LRS policy - // shouldn't get updated config. The parent should do a graceful switch when - // the clusterName or serviceName is changed. - if updateLoadStore { - w.load.update(w.c.LoadStore(), w.clusterName, w.edsServiceName) - } - if restartLoadReport { + updateLoadStore = true if w.cancelLoadReport != nil { w.cancelLoadReport() w.cancelLoadReport = nil } if w.c != nil { - w.cancelLoadReport = w.c.ReportLoad(w.lrsServerName, w.clusterName) + w.loadOriginal, w.cancelLoadReport = w.c.ReportLoad(w.lrsServerName) + } + } + + if updateLoadStore { + // This updates the clusterName and serviceName that will reported for the + // loads. The update here is too early, the perfect timing is when the + // picker is updated with the new connection. But from this balancer's point + // of view, it's impossible to tell. + // + // On the other hand, this will almost never happen. Each LRS policy + // shouldn't get updated config. The parent should do a graceful switch when + // the clusterName or serviceName is changed. + if updateLoadStore { + w.loadWrapper.update(w.loadOriginal, w.clusterName, w.edsServiceName) } } } func (w *xdsClientWrapper) loadStore() load.PerClusterReporter { - if w.load.store == nil { + if w.loadWrapper.store == nil { return nil } - return w.load + return w.loadWrapper } func (w *xdsClientWrapper) close() { diff --git a/xds/internal/balancer/lrs/balancer_test.go b/xds/internal/balancer/lrs/balancer_test.go index 789cfea0c00a..38dd573ef14b 100644 --- a/xds/internal/balancer/lrs/balancer_test.go +++ b/xds/internal/balancer/lrs/balancer_test.go @@ -84,8 +84,8 @@ func TestLoadReporting(t *testing.T) { if err != nil { t.Fatalf("xdsClient.ReportLoad failed with error: %v", err) } - if got.Server != testLRSServerName || got.Cluster != testClusterName { - t.Fatalf("xdsClient.ReportLoad called with {%q, %q}: want {%q, %q}", got.Server, got.Cluster, testLRSServerName, testClusterName) + if got.Server != testLRSServerName { + t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerName) } sc1 := <-cc.NewSubConnCh diff --git a/xds/internal/client/client.go b/xds/internal/client/client.go index b2afed0c0140..ecaa886c4d1a 100644 --- a/xds/internal/client/client.go +++ b/xds/internal/client/client.go @@ -30,6 +30,7 @@ import ( v2corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" "github.com/golang/protobuf/proto" + load2 "google.golang.org/grpc/xds/internal/client/load" "google.golang.org/grpc" "google.golang.org/grpc/internal/backoff" @@ -39,7 +40,6 @@ import ( "google.golang.org/grpc/keepalive" "google.golang.org/grpc/xds/internal" "google.golang.org/grpc/xds/internal/client/bootstrap" - "google.golang.org/grpc/xds/internal/client/load" "google.golang.org/grpc/xds/internal/version" ) @@ -78,9 +78,6 @@ type BuildOptions struct { // Backoff returns the amount of time to backoff before retrying broken // streams. Backoff func(int) time.Duration - // LoadStore contains load reports which need to be pushed to the management - // server. - LoadStore *load.Store // Logger provides enhanced logging capabilities. Logger *grpclog.PrefixLogger } @@ -118,10 +115,7 @@ type APIClient interface { // LoadReportingOptions contains configuration knobs for reporting load data. type LoadReportingOptions struct { - // ClusterName is the cluster name for which load is being reported. - ClusterName string - // TargetName is the target of the parent ClientConn. - TargetName string + load *load2.Store } // UpdateHandler receives and processes (by taking appropriate actions) xDS @@ -292,7 +286,6 @@ type Client struct { opts Options cc *grpc.ClientConn // Connection to the xDS server apiClient APIClient - loadStore *load.Store logger *grpclog.PrefixLogger @@ -306,6 +299,9 @@ type Client struct { cdsCache map[string]ClusterUpdate edsWatchers map[string]map[*watchInfo]bool edsCache map[string]EndpointsUpdate + + lrsMu sync.Mutex + lrsClients map[string]*lrsClient } // New returns a new xdsClient configured with opts. @@ -346,9 +342,8 @@ func New(opts Options) (*Client, error) { } c := &Client{ - done: grpcsync.NewEvent(), - opts: opts, - loadStore: load.NewStore(), + done: grpcsync.NewEvent(), + opts: opts, updateCh: buffer.NewUnbounded(), ldsWatchers: make(map[string]map[*watchInfo]bool), @@ -374,7 +369,6 @@ func New(opts Options) (*Client, error) { Parent: c, NodeProto: opts.Config.NodeProto, Backoff: backoff.DefaultExponential.Backoff, - LoadStore: c.loadStore, Logger: c.logger, }) if err != nil { diff --git a/xds/internal/client/client_loadreport.go b/xds/internal/client/client_loadreport.go index e52c3b93f90e..b331ca2629ef 100644 --- a/xds/internal/client/client_loadreport.go +++ b/xds/internal/client/client_loadreport.go @@ -22,30 +22,46 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/xds/internal/client/load" + load2 "google.golang.org/grpc/xds/internal/client/load" ) // NodeMetadataHostnameKey is the metadata key for specifying the target name in // the node proto of an LRS request. const NodeMetadataHostnameKey = "PROXYLESS_CLIENT_HOSTNAME" -// LoadStore returns the underlying load data store used by the xDS client. -func (c *Client) LoadStore() *load.Store { - return c.loadStore +// lrsClient maps to one lrsServer. It contains: +// - a ClientConn to this server (only if it's different from the xds server) +// - a load.Store that contains loads only for this server +type lrsClient struct { + refCount int + cancel func() + cc *grpc.ClientConn // nil if the server is same as the xds server + load *load2.Store } -// ReportLoad sends the load of the given clusterName to the given server. If -// the server is not an empty string, and is different from the xds server, a -// new ClientConn will be created. +// ReportLoad starts an load reporting stream to the given server. If the server +// is not an empty string, and is different from the xds server, a new +// ClientConn will be created. // // The same options used for creating the Client will be used (including // NodeProto, and dial options if necessary). // -// It returns a function to cancel the load reporting stream. If server is -// different from xds server, the ClientConn will also be closed. -func (c *Client) ReportLoad(server string, clusterName string) func() { +// It returns a Store for the user to report loads, a function to cancel the +// load reporting stream. +func (c *Client) ReportLoad(server string) (*load.Store, func()) { + c.lrsMu.Lock() + defer c.lrsMu.Unlock() + + // If there's already a client to this server, use it. + if c, ok := c.lrsClients[server]; ok { + c.refCount++ + return c.load, c.cancel + } + + // First reporting stream to this server. var ( - cc *grpc.ClientConn - closeCC bool + cc *grpc.ClientConn + newCC bool ) c.logger.Infof("Starting load report to server: %s", server) if server == "" || server == c.opts.Config.BalancerName { @@ -57,20 +73,35 @@ func (c *Client) ReportLoad(server string, clusterName string) func() { if err != nil { // An error from a non-blocking dial indicates something serious. c.logger.Infof("xds: failed to dial load report server {%s}: %v", server, err) - return func() {} + return nil, func() {} } cc = ccNew - closeCC = true + newCC = true } + + store := load.NewStore() ctx, cancel := context.WithCancel(context.Background()) go c.apiClient.ReportLoad(ctx, c.cc, LoadReportingOptions{ - ClusterName: clusterName, - TargetName: c.opts.TargetName, + load: store, }) - return func() { - cancel() - if closeCC { - cc.Close() + + lrsC := &lrsClient{ + refCount: 1, + cancel: cancel, + load: store, + } + if newCC { + lrsC.cc = cc + } + c.lrsClients[server] = lrsC + + return store, func() { + c.lrsMu.Lock() + defer c.lrsMu.Unlock() + lrsC.cancel() + lrsC.refCount-- + if lrsC.refCount == 0 && lrsC.cc != nil { + lrsC.cc.Close() } } } diff --git a/xds/internal/client/transport_helper.go b/xds/internal/client/transport_helper.go index 3ce1f8721b3b..1cf72c2740fe 100644 --- a/xds/internal/client/transport_helper.go +++ b/xds/internal/client/transport_helper.go @@ -24,6 +24,7 @@ import ( "time" "github.com/golang/protobuf/proto" + load2 "google.golang.org/grpc/xds/internal/client/load" "google.golang.org/grpc" "google.golang.org/grpc/internal/buffer" @@ -71,19 +72,21 @@ type VersionedClient interface { NewLoadStatsStream(ctx context.Context, cc *grpc.ClientConn) (grpc.ClientStream, error) // SendFirstLoadStatsRequest constructs and sends the first request on the - // LRS stream. This contains the node proto with appropriate metadata - // fields. - SendFirstLoadStatsRequest(s grpc.ClientStream, targetName string) error + // LRS stream. + SendFirstLoadStatsRequest(s grpc.ClientStream) error // HandleLoadStatsResponse receives the first response from the server which // contains the load reporting interval and the clusters for which the // server asks the client to report load for. - HandleLoadStatsResponse(s grpc.ClientStream, clusterName string) (time.Duration, error) + // + // If the response sets SendAllClusters to true, the returned clusters is + // nil. + HandleLoadStatsResponse(s grpc.ClientStream) (clusters []string, _ time.Duration, _ error) // SendLoadStatsRequest will be invoked at regular intervals to send load // report with load data reported since the last time this method was // invoked. - SendLoadStatsRequest(s grpc.ClientStream, clusterName string) error + SendLoadStatsRequest(s grpc.ClientStream, store *load2.Store, clusterNames []string) error } // TransportHelper contains all xDS transport protocol related functionality @@ -469,23 +472,23 @@ func (t *TransportHelper) ReportLoad(ctx context.Context, cc *grpc.ClientConn, o } logger.Infof("lrs: created LRS stream") - if err := t.vClient.SendFirstLoadStatsRequest(stream, opts.TargetName); err != nil { + if err := t.vClient.SendFirstLoadStatsRequest(stream); err != nil { logger.Warningf("lrs: failed to send first request: %v", err) continue } - interval, err := t.vClient.HandleLoadStatsResponse(stream, opts.ClusterName) + clusters, interval, err := t.vClient.HandleLoadStatsResponse(stream) if err != nil { logger.Warning(err) continue } retries = 0 - t.sendLoads(ctx, stream, opts.ClusterName, interval) + t.sendLoads(ctx, stream, opts.load, clusters, interval) } } -func (t *TransportHelper) sendLoads(ctx context.Context, stream grpc.ClientStream, clusterName string, interval time.Duration) { +func (t *TransportHelper) sendLoads(ctx context.Context, stream grpc.ClientStream, store *load2.Store, clusterNames []string, interval time.Duration) { tick := time.NewTicker(interval) defer tick.Stop() for { @@ -494,7 +497,7 @@ func (t *TransportHelper) sendLoads(ctx context.Context, stream grpc.ClientStrea case <-ctx.Done(): return } - if err := t.vClient.SendLoadStatsRequest(stream, clusterName); err != nil { + if err := t.vClient.SendLoadStatsRequest(stream, store, clusterNames); err != nil { logger.Warning(err) return } diff --git a/xds/internal/client/v2/client.go b/xds/internal/client/v2/client.go index c3de39c3a853..96bd5e9b5686 100644 --- a/xds/internal/client/v2/client.go +++ b/xds/internal/client/v2/client.go @@ -28,7 +28,6 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/internal/grpclog" xdsclient "google.golang.org/grpc/xds/internal/client" - "google.golang.org/grpc/xds/internal/client/load" "google.golang.org/grpc/xds/internal/version" v2xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" @@ -68,7 +67,6 @@ func newClient(cc *grpc.ClientConn, opts xdsclient.BuildOptions) (xdsclient.APIC cc: cc, parent: opts.Parent, nodeProto: nodeProto, - loadStore: opts.LoadStore, logger: opts.Logger, } v2c.ctx, v2c.cancelCtx = context.WithCancel(context.Background()) @@ -87,7 +85,6 @@ type client struct { ctx context.Context cancelCtx context.CancelFunc parent xdsclient.UpdateHandler - loadStore *load.Store logger *grpclog.PrefixLogger // ClientConn to the xDS gRPC server. Owned by the parent xdsClient. diff --git a/xds/internal/client/v2/loadreport.go b/xds/internal/client/v2/loadreport.go index a06dcb8e9f0d..78a3063e340b 100644 --- a/xds/internal/client/v2/loadreport.go +++ b/xds/internal/client/v2/loadreport.go @@ -26,17 +26,18 @@ import ( "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes" + "google.golang.org/grpc/xds/internal/client/load" v2corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" v2endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint" lrsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v2" lrspb "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v2" - structpb "github.com/golang/protobuf/ptypes/struct" "google.golang.org/grpc" "google.golang.org/grpc/xds/internal" - xdsclient "google.golang.org/grpc/xds/internal/client" ) +const clientFeatureLRSSendAllClusters = "envoy.lrs.supports_send_all_clusters" + type lrsStream lrsgrpc.LoadReportingService_StreamLoadStatsClient func (v2c *client) NewLoadStatsStream(ctx context.Context, cc *grpc.ClientConn) (grpc.ClientStream, error) { @@ -44,7 +45,7 @@ func (v2c *client) NewLoadStatsStream(ctx context.Context, cc *grpc.ClientConn) return c.StreamLoadStats(ctx) } -func (v2c *client) SendFirstLoadStatsRequest(s grpc.ClientStream, targetName string) error { +func (v2c *client) SendFirstLoadStatsRequest(s grpc.ClientStream) error { stream, ok := s.(lrsStream) if !ok { return fmt.Errorf("lrs: Attempt to send request on unsupported stream type: %T", s) @@ -53,71 +54,52 @@ func (v2c *client) SendFirstLoadStatsRequest(s grpc.ClientStream, targetName str if node == nil { node = &v2corepb.Node{} } - if node.Metadata == nil { - node.Metadata = &structpb.Struct{} - } - if node.Metadata.Fields == nil { - node.Metadata.Fields = make(map[string]*structpb.Value) - } - node.Metadata.Fields[xdsclient.NodeMetadataHostnameKey] = &structpb.Value{ - Kind: &structpb.Value_StringValue{StringValue: targetName}, - } + node.ClientFeatures = append(node.ClientFeatures, clientFeatureLRSSendAllClusters) req := &lrspb.LoadStatsRequest{Node: node} v2c.logger.Infof("lrs: sending init LoadStatsRequest: %v", req) return stream.Send(req) } -func (v2c *client) HandleLoadStatsResponse(s grpc.ClientStream, clusterName string) (time.Duration, error) { +func (v2c *client) HandleLoadStatsResponse(s grpc.ClientStream) ([]string, time.Duration, error) { stream, ok := s.(lrsStream) if !ok { - return 0, fmt.Errorf("lrs: Attempt to receive response on unsupported stream type: %T", s) + return nil, 0, fmt.Errorf("lrs: Attempt to receive response on unsupported stream type: %T", s) } resp, err := stream.Recv() if err != nil { - return 0, fmt.Errorf("lrs: failed to receive first response: %v", err) + return nil, 0, fmt.Errorf("lrs: failed to receive first response: %v", err) } v2c.logger.Infof("lrs: received first LoadStatsResponse: %+v", resp) interval, err := ptypes.Duration(resp.GetLoadReportingInterval()) if err != nil { - return 0, fmt.Errorf("lrs: failed to convert report interval: %v", err) + return nil, 0, fmt.Errorf("lrs: failed to convert report interval: %v", err) } - // The LRS client should join the clusters it knows with the cluster - // list from response, and send loads for them. - // - // But the LRS client now only supports one cluster. TODO: extend it to - // support multiple clusters. - var clusterFoundInResponse bool - for _, c := range resp.Clusters { - if c == clusterName { - clusterFoundInResponse = true - } - } - if !clusterFoundInResponse { - return 0, fmt.Errorf("lrs: received clusters %v does not contain expected {%v}", resp.Clusters, clusterName) - } if resp.ReportEndpointGranularity { // TODO: fixme to support per endpoint loads. - return 0, errors.New("lrs: endpoint loads requested, but not supported by current implementation") + return nil, 0, errors.New("lrs: endpoint loads requested, but not supported by current implementation") } - return interval, nil + clusters := resp.Clusters + if resp.SendAllClusters { + // Return nil to send stats for all clusters. + clusters = nil + } + + return clusters, interval, nil } -func (v2c *client) SendLoadStatsRequest(s grpc.ClientStream, clusterName string) error { +func (v2c *client) SendLoadStatsRequest(s grpc.ClientStream, loadStore *load.Store, clusterNames []string) error { stream, ok := s.(lrsStream) if !ok { return fmt.Errorf("lrs: Attempt to send request on unsupported stream type: %T", s) } - if v2c.loadStore == nil { - return errors.New("lrs: LoadStore is not initialized") - } var clusterStats []*v2endpointpb.ClusterStats - sds := v2c.loadStore.Stats([]string{clusterName}) + sds := loadStore.Stats(clusterNames) for _, sd := range sds { var ( droppedReqs []*v2endpointpb.ClusterStats_DroppedRequests diff --git a/xds/internal/client/v3/client.go b/xds/internal/client/v3/client.go index 9894280ede32..edc52694eab4 100644 --- a/xds/internal/client/v3/client.go +++ b/xds/internal/client/v3/client.go @@ -28,7 +28,6 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/internal/grpclog" xdsclient "google.golang.org/grpc/xds/internal/client" - "google.golang.org/grpc/xds/internal/client/load" "google.golang.org/grpc/xds/internal/version" v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" @@ -68,7 +67,6 @@ func newClient(cc *grpc.ClientConn, opts xdsclient.BuildOptions) (xdsclient.APIC cc: cc, parent: opts.Parent, nodeProto: nodeProto, - loadStore: opts.LoadStore, logger: opts.Logger, } v3c.ctx, v3c.cancelCtx = context.WithCancel(context.Background()) @@ -87,7 +85,6 @@ type client struct { ctx context.Context cancelCtx context.CancelFunc parent xdsclient.UpdateHandler - loadStore *load.Store logger *grpclog.PrefixLogger // ClientConn to the xDS gRPC server. Owned by the parent xdsClient. diff --git a/xds/internal/client/v3/loadreport.go b/xds/internal/client/v3/loadreport.go index beca34c49bfd..4e921f308bdc 100644 --- a/xds/internal/client/v3/loadreport.go +++ b/xds/internal/client/v3/loadreport.go @@ -26,17 +26,18 @@ import ( "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes" + "google.golang.org/grpc/xds/internal/client/load" v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" lrsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v3" lrspb "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v3" - structpb "github.com/golang/protobuf/ptypes/struct" "google.golang.org/grpc" "google.golang.org/grpc/xds/internal" - xdsclient "google.golang.org/grpc/xds/internal/client" ) +const clientFeatureLRSSendAllClusters = "envoy.lrs.supports_send_all_clusters" + type lrsStream lrsgrpc.LoadReportingService_StreamLoadStatsClient func (v3c *client) NewLoadStatsStream(ctx context.Context, cc *grpc.ClientConn) (grpc.ClientStream, error) { @@ -44,7 +45,7 @@ func (v3c *client) NewLoadStatsStream(ctx context.Context, cc *grpc.ClientConn) return c.StreamLoadStats(ctx) } -func (v3c *client) SendFirstLoadStatsRequest(s grpc.ClientStream, targetName string) error { +func (v3c *client) SendFirstLoadStatsRequest(s grpc.ClientStream) error { stream, ok := s.(lrsStream) if !ok { return fmt.Errorf("lrs: Attempt to send request on unsupported stream type: %T", s) @@ -53,71 +54,52 @@ func (v3c *client) SendFirstLoadStatsRequest(s grpc.ClientStream, targetName str if node == nil { node = &v3corepb.Node{} } - if node.Metadata == nil { - node.Metadata = &structpb.Struct{} - } - if node.Metadata.Fields == nil { - node.Metadata.Fields = make(map[string]*structpb.Value) - } - node.Metadata.Fields[xdsclient.NodeMetadataHostnameKey] = &structpb.Value{ - Kind: &structpb.Value_StringValue{StringValue: targetName}, - } + node.ClientFeatures = append(node.ClientFeatures, clientFeatureLRSSendAllClusters) req := &lrspb.LoadStatsRequest{Node: node} v3c.logger.Infof("lrs: sending init LoadStatsRequest: %v", req) return stream.Send(req) } -func (v3c *client) HandleLoadStatsResponse(s grpc.ClientStream, clusterName string) (time.Duration, error) { +func (v3c *client) HandleLoadStatsResponse(s grpc.ClientStream) ([]string, time.Duration, error) { stream, ok := s.(lrsStream) if !ok { - return 0, fmt.Errorf("lrs: Attempt to receive response on unsupported stream type: %T", s) + return nil, 0, fmt.Errorf("lrs: Attempt to receive response on unsupported stream type: %T", s) } resp, err := stream.Recv() if err != nil { - return 0, fmt.Errorf("lrs: failed to receive first response: %v", err) + return nil, 0, fmt.Errorf("lrs: failed to receive first response: %v", err) } v3c.logger.Infof("lrs: received first LoadStatsResponse: %+v", resp) interval, err := ptypes.Duration(resp.GetLoadReportingInterval()) if err != nil { - return 0, fmt.Errorf("lrs: failed to convert report interval: %v", err) + return nil, 0, fmt.Errorf("lrs: failed to convert report interval: %v", err) } - // The LRS client should join the clusters it knows with the cluster - // list from response, and send loads for them. - // - // But the LRS client now only supports one cluster. TODO: extend it to - // support multiple clusters. - var clusterFoundInResponse bool - for _, c := range resp.Clusters { - if c == clusterName { - clusterFoundInResponse = true - } - } - if !clusterFoundInResponse { - return 0, fmt.Errorf("lrs: received clusters %v does not contain expected {%v}", resp.Clusters, clusterName) - } if resp.ReportEndpointGranularity { // TODO: fixme to support per endpoint loads. - return 0, errors.New("lrs: endpoint loads requested, but not supported by current implementation") + return nil, 0, errors.New("lrs: endpoint loads requested, but not supported by current implementation") } - return interval, nil + clusters := resp.Clusters + if resp.SendAllClusters { + // Return nil to send stats for all clusters. + clusters = nil + } + + return clusters, interval, nil } -func (v3c *client) SendLoadStatsRequest(s grpc.ClientStream, clusterName string) error { +func (v3c *client) SendLoadStatsRequest(s grpc.ClientStream, loadStore *load.Store, clusterNames []string) error { stream, ok := s.(lrsStream) if !ok { return fmt.Errorf("lrs: Attempt to send request on unsupported stream type: %T", s) } - if v3c.loadStore == nil { - return errors.New("lrs: LoadStore is not initialized") - } var clusterStats []*v3endpointpb.ClusterStats - sds := v3c.loadStore.Stats([]string{clusterName}) + sds := loadStore.Stats(clusterNames) for _, sd := range sds { var ( droppedReqs []*v3endpointpb.ClusterStats_DroppedRequests diff --git a/xds/internal/testutils/fakeclient/client.go b/xds/internal/testutils/fakeclient/client.go index cd2710e612aa..408817c17846 100644 --- a/xds/internal/testutils/fakeclient/client.go +++ b/xds/internal/testutils/fakeclient/client.go @@ -156,14 +156,12 @@ func (xdsC *Client) WaitForCancelEDSWatch(ctx context.Context) error { type ReportLoadArgs struct { // Server is the name of the server to which the load is reported. Server string - // Cluster is the name of the cluster for which load is reported. - Cluster string } // ReportLoad starts reporting load about clusterName to server. -func (xdsC *Client) ReportLoad(server string, clusterName string) (cancel func()) { - xdsC.loadReportCh.Send(ReportLoadArgs{Server: server, Cluster: clusterName}) - return func() {} +func (xdsC *Client) ReportLoad(server string) (loadStore *load.Store, cancel func()) { + xdsC.loadReportCh.Send(ReportLoadArgs{Server: server}) + return xdsC.loadStore, func() {} } // LoadStore returns the underlying load data store.