Skip to content

Commit

Permalink
[lrs_stream_report] handle multiple clusters in LRS stream
Browse files Browse the repository at this point in the history
- xdsclient.ReportLoad
  - create one LRS stream and one load.Store for each server address
  - to take just the server address, and return the load.Store
  - update EDS and LRS balancing policy to recreate LRS stream only when server address changes
- LRS stream (v2 and v3)
  - set feature `send_all_clusters` in `node`
  - handle `resp.Clusters` and only send load for those clusters
  - handle `resp.SendAllClusters` and send load for all clusters
  • Loading branch information
menghanl committed Oct 7, 2020
1 parent 5af6040 commit 985f6e5
Show file tree
Hide file tree
Showing 17 changed files with 208 additions and 180 deletions.
11 changes: 7 additions & 4 deletions examples/go.sum
Expand Up @@ -33,15 +33,17 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/census-instrumentation/opencensus-proto v0.2.1 h1:glEXhBS5PSLLv4IXzLA5yPRVX4bilULVyxxbrfOtDAk=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f h1:WBZRG4aNOuI15bLRrCgN8fCq8E5Xuty6jGbmSNEvSsU=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cncf/udpa/go v0.0.0-20200313221541-5f7e5dd04533/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/cncf/udpa/go v0.0.0-20200909154343-1f710aca26a9 h1:cQ58MWbYGnI4x6Gk6FUzirMcMYUgvYOLa9fiO7chY1A=
github.com/cncf/udpa/go v0.0.0-20200909154343-1f710aca26a9/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/envoyproxy/go-control-plane v0.9.4 h1:rEvIZUSZ3fx39WIi3JkQqQBitGwpELBIYWeBVh6wn+E=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
github.com/envoyproxy/go-control-plane v0.9.6 h1:GgblEiDzxf5ajlAZY4aC8xp7DwkrGfauFNMGdB2bBv0=
github.com/envoyproxy/go-control-plane v0.9.6/go.mod h1:GFqM7v0B62MraO4PWRedIbhThr/Rf7ev6aHOOPXeaDA=
github.com/envoyproxy/protoc-gen-validate v0.1.0 h1:EQciDnbrYxy13PgWoY8AqoxGiPrpgBZ1R8UNe3ddc+A=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
Expand Down Expand Up @@ -110,6 +112,7 @@ github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Expand Up @@ -3,10 +3,10 @@ module google.golang.org/grpc
go 1.11

require (
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f
github.com/envoyproxy/go-control-plane v0.9.4
github.com/cncf/udpa/go v0.0.0-20200909154343-1f710aca26a9
github.com/envoyproxy/go-control-plane v0.9.6
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
github.com/golang/protobuf v1.3.3
github.com/golang/protobuf v1.4.2
github.com/google/go-cmp v0.4.0
github.com/google/uuid v1.1.2
golang.org/x/net v0.0.0-20190311183353-d8887717615a
Expand Down
33 changes: 27 additions & 6 deletions go.sum
Expand Up @@ -5,11 +5,14 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/client9/misspell v0.3.4 h1:ta993UF76GwbvJcIo3Y68y/M3WxlpEHPWIGDkJYwzJI=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f h1:WBZRG4aNOuI15bLRrCgN8fCq8E5Xuty6jGbmSNEvSsU=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cncf/udpa/go v0.0.0-20200313221541-5f7e5dd04533/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/cncf/udpa/go v0.0.0-20200909154343-1f710aca26a9 h1:cQ58MWbYGnI4x6Gk6FUzirMcMYUgvYOLa9fiO7chY1A=
github.com/cncf/udpa/go v0.0.0-20200909154343-1f710aca26a9/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4 h1:rEvIZUSZ3fx39WIi3JkQqQBitGwpELBIYWeBVh6wn+E=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.6 h1:GgblEiDzxf5ajlAZY4aC8xp7DwkrGfauFNMGdB2bBv0=
github.com/envoyproxy/go-control-plane v0.9.6/go.mod h1:GFqM7v0B62MraO4PWRedIbhThr/Rf7ev6aHOOPXeaDA=
github.com/envoyproxy/protoc-gen-validate v0.1.0 h1:EQciDnbrYxy13PgWoY8AqoxGiPrpgBZ1R8UNe3ddc+A=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
Expand All @@ -19,15 +22,24 @@ github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfb
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.3 h1:gyjaxf+svBWX08ZjK86iN9geUJF0H6gp2IRKX6Nf6/I=
github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/google/go-cmp v0.2.0 h1:+dTQ8DZQJz0Mb/HjFlkptS1FeQ4cWSnN941F8aEG4SQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
Expand Down Expand Up @@ -65,6 +77,15 @@ google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc h1:/hemPrYIhOhy8zYrNj+069zDB68us2sMGsfkFJO0iZs=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
6 changes: 4 additions & 2 deletions security/advancedtls/go.sum
Expand Up @@ -36,9 +36,10 @@ github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cncf/udpa/go v0.0.0-20200313221541-5f7e5dd04533/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/cncf/udpa/go v0.0.0-20200909154343-1f710aca26a9/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
github.com/envoyproxy/go-control-plane v0.9.6/go.mod h1:GFqM7v0B62MraO4PWRedIbhThr/Rf7ev6aHOOPXeaDA=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
Expand Down Expand Up @@ -104,6 +105,7 @@ github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down
2 changes: 1 addition & 1 deletion xds/internal/balancer/edsbalancer/eds_impl_test.go
Expand Up @@ -690,7 +690,7 @@ func (s) TestEDS_LoadReport(t *testing.T) {
lsWrapper := &loadStoreWrapper{}
lsWrapper.update(loadStore, testClusterNames[0])
cw := &xdsClientWrapper{
load: lsWrapper,
loadWrapper: lsWrapper,
}

cc := testutils.NewTestClientConn(t)
Expand Down
38 changes: 25 additions & 13 deletions xds/internal/balancer/edsbalancer/xds_client_wrapper.go
Expand Up @@ -35,8 +35,7 @@ import (
// balancer. It's defined so we can override xdsclientNew function in tests.
type xdsClientInterface interface {
WatchEndpoints(clusterName string, edsCb func(xdsclient.EndpointsUpdate, error)) (cancel func())
LoadStore() *load.Store
ReportLoad(server string, clusterName string) (cancel func())
ReportLoad(server string) (loadStore *load.Store, cancel func())
Close()
}

Expand Down Expand Up @@ -102,7 +101,12 @@ type xdsClientWrapper struct {
// xdsClient could come from attributes, or created with balancerName.
xdsClient xdsClientInterface

load *loadStoreWrapper
// loadWrapper is a wrapper with loadOriginal, with clusterName and
// edsServiceName. It's used children to report loads.
loadWrapper *loadStoreWrapper
// loadOriginal is the load.Store for reporting loads to lrsServerName. It's
// returned by the client.
loadOriginal *load.Store
// edsServiceName is the edsServiceName currently being watched, not
// necessary the edsServiceName from service config.
//
Expand All @@ -127,7 +131,7 @@ func newXDSClientWrapper(newEDSUpdate func(xdsclient.EndpointsUpdate, error), bb
logger: logger,
newEDSUpdate: newEDSUpdate,
bbo: bbo,
load: &loadStoreWrapper{},
loadWrapper: &loadStoreWrapper{},
}
}

Expand Down Expand Up @@ -248,43 +252,51 @@ func (c *xdsClientWrapper) startLoadReport(loadReportServer *string) {
}
c.loadReportServer = loadReportServer
if c.loadReportServer != nil {
c.cancelLoadReport = c.xdsClient.ReportLoad(*c.loadReportServer, c.edsServiceName)
c.loadOriginal, c.cancelLoadReport = c.xdsClient.ReportLoad(*c.loadReportServer)
}
}

func (c *xdsClientWrapper) loadStore() load.PerClusterReporter {
if c == nil || c.load.store == nil {
if c == nil || c.loadWrapper.store == nil {
return nil
}
return c.load
return c.loadWrapper
}

// handleUpdate applies the service config and attributes updates to the client,
// including updating the xds_client to use, and updating the EDS name to watch.
func (c *xdsClientWrapper) handleUpdate(config *EDSConfig, attr *attributes.Attributes) {
clientChanged := c.updateXDSClient(config, attr)

var updateLoadStore bool

// Need to restart EDS watch when one of the following happens:
// - the xds_client is updated
// - the xds_client didn't change, but the edsServiceName changed
if clientChanged || c.edsServiceName != config.EDSServiceName {
c.edsServiceName = config.EDSServiceName
c.startEndpointsWatch()
updateLoadStore = true
}

// Only need to restart load reporting when:
// - the loadReportServer name changed
if !equalStringPointers(c.loadReportServer, config.LrsLoadReportingServerName) {
c.startLoadReport(config.LrsLoadReportingServerName)
updateLoadStore = true
}

if updateLoadStore {
// TODO: this update for the LRS service name is too early. It should
// only apply to the new EDS response. But this is applied to the RPCs
// before the new EDS response. To fully fix this, the EDS balancer
// needs to do a graceful switch to another EDS implementation.
//
// This is OK for now, because we don't actually expect edsServiceName
// to change. Fix this (a bigger change) will happen later.
c.load.update(c.xdsClient.LoadStore(), c.edsServiceName)
c.loadWrapper.update(c.loadOriginal, c.edsServiceName)
}

// Only need to restart load reporting when:
// - the loadReportServer name changed
if !equalStringPointers(c.loadReportServer, config.LrsLoadReportingServerName) {
c.startLoadReport(config.LrsLoadReportingServerName)
}
}

func (c *xdsClientWrapper) cancelWatch() {
Expand Down
4 changes: 2 additions & 2 deletions xds/internal/balancer/edsbalancer/xds_lrs_test.go
Expand Up @@ -66,7 +66,7 @@ func (s) TestXDSLoadReporting(t *testing.T) {
if err != nil {
t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
}
if got.Server != "" || got.Cluster != testEDSClusterName {
t.Fatalf("xdsClient.ReportLoad called with {%v, %v}: want {\"\", %v}", got.Server, got.Cluster, testEDSClusterName)
if got.Server != "" {
t.Fatalf("xdsClient.ReportLoad called with {%v}: want {\"\"}", got.Server)
}
}
46 changes: 26 additions & 20 deletions xds/internal/balancer/lrs/balancer.go
Expand Up @@ -144,8 +144,7 @@ func (ccw *ccWrapper) UpdateState(s balancer.State) {
// xdsClientInterface contains only the xds_client methods needed by LRS
// balancer. It's defined so we can override xdsclient in tests.
type xdsClientInterface interface {
LoadStore() *load.Store
ReportLoad(server string, clusterName string) func()
ReportLoad(server string) (*load.Store, func())
Close()
}

Expand Down Expand Up @@ -199,12 +198,17 @@ type xdsClientWrapper struct {
clusterName string
edsServiceName string
lrsServerName string
load *loadStoreWrapper
// loadOriginal is the load.Store for reporting loads to lrsServerName. It's
// returned by the client.
loadOriginal *load.Store
// loadWrapper is a wrapper with loadOriginal, with clusterName and
// edsServiceName. It's used children to report loads.
loadWrapper *loadStoreWrapper
}

func newXDSClientWrapper() *xdsClientWrapper {
return &xdsClientWrapper{
load: &loadStoreWrapper{},
loadWrapper: &loadStoreWrapper{},
}
}

Expand All @@ -220,7 +224,6 @@ func (w *xdsClientWrapper) update(newConfig *lbConfig, attr *attributes.Attribut
if w.c != clientFromAttr {
// xds client is different, restart.
restartLoadReport = true
updateLoadStore = true
w.c = clientFromAttr
}
}
Expand All @@ -244,34 +247,37 @@ func (w *xdsClientWrapper) update(newConfig *lbConfig, attr *attributes.Attribut
w.lrsServerName = newConfig.LrsLoadReportingServerName
}

// This updates the clusterName and serviceName that will reported for the
// loads. The update here is too early, the perfect timing is when the
// picker is updated with the new connection. But from this balancer's point
// of view, it's impossible to tell.
//
// On the other hand, this will almost never happen. Each LRS policy
// shouldn't get updated config. The parent should do a graceful switch when
// the clusterName or serviceName is changed.
if updateLoadStore {
w.load.update(w.c.LoadStore(), w.clusterName, w.edsServiceName)
}

if restartLoadReport {
updateLoadStore = true
if w.cancelLoadReport != nil {
w.cancelLoadReport()
w.cancelLoadReport = nil
}
if w.c != nil {
w.cancelLoadReport = w.c.ReportLoad(w.lrsServerName, w.clusterName)
w.loadOriginal, w.cancelLoadReport = w.c.ReportLoad(w.lrsServerName)
}
}

if updateLoadStore {
// This updates the clusterName and serviceName that will reported for the
// loads. The update here is too early, the perfect timing is when the
// picker is updated with the new connection. But from this balancer's point
// of view, it's impossible to tell.
//
// On the other hand, this will almost never happen. Each LRS policy
// shouldn't get updated config. The parent should do a graceful switch when
// the clusterName or serviceName is changed.
if updateLoadStore {
w.loadWrapper.update(w.loadOriginal, w.clusterName, w.edsServiceName)
}
}
}

func (w *xdsClientWrapper) loadStore() load.PerClusterReporter {
if w.load.store == nil {
if w.loadWrapper.store == nil {
return nil
}
return w.load
return w.loadWrapper
}

func (w *xdsClientWrapper) close() {
Expand Down
4 changes: 2 additions & 2 deletions xds/internal/balancer/lrs/balancer_test.go
Expand Up @@ -84,8 +84,8 @@ func TestLoadReporting(t *testing.T) {
if err != nil {
t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
}
if got.Server != testLRSServerName || got.Cluster != testClusterName {
t.Fatalf("xdsClient.ReportLoad called with {%q, %q}: want {%q, %q}", got.Server, got.Cluster, testLRSServerName, testClusterName)
if got.Server != testLRSServerName {
t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerName)
}

sc1 := <-cc.NewSubConnCh
Expand Down

0 comments on commit 985f6e5

Please sign in to comment.