xds: Fix xDS Server in the case of dynamic RDS #6889

Closed
wants to merge 18 commits into from


Contributor

@zasweq zasweq commented Dec 21, 2023

Fix the xDS Server to align with the spec in https://github.com/grpc/proposal/blob/master/A36-xds-for-servers.md.

xds/internal/server/listener_wrapper:

  • Wait for all resources (LDS and RDS) to be received before transitioning into serving; while not serving, accept() connections and then close() them. If another LDS configuration is received, keep serving with the old LDS configuration until the new one has all of its RDS resources (i.e. each RDS resource has received an update or an error), then gracefully drain the connections that were accepted and configured with the old LDS configuration, and switch to the new LDS configuration for newly accepted connections.
  • Build all routing configurations for Filter Chains when LDS goes READY.
  • Build routing configuration dynamically using LDS + RDS on any RDS update.
  • Handle mode switches inline: drain server transports on any transition into not serving, and accept() + close() connections while not serving.

xds/internal/server/conn_wrapper: Read routing configuration dynamically through an atomic load of a pointer. Also add an L7 error representation.

xds/internal/server/rds_handler: Persist RDS updates and error conditions for each specific route resource being watched. Add a helper that determines whether all required dynamic RDS resources have been received. An RDS error counts as the resource having received configuration; it triggers failure at the L7 routing layer for incoming RPCs.

xds/server: Don’t block on a “good update”; immediately start serving on the lis passed into Serve once it is wrapped. If xDS resources aren’t ready or LDS returns resource not found, Accept() and Close() connections in not serving mode. Log any L7 routing errors caused by xDS configuration problems.

server: Call a callback with the server transport once it’s created on the Conn. This gives access to the server transport to xDS layer, which will be gracefully closed on transitions into not serving and transitions to a new LDS configuration. It also guarantees at some point the server transport will be gracefully drained. This replaces the Drain() operation previously present.
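
One way to picture the callback approach is the sketch below. It is an illustration under assumptions, not the PR's code: serverTransport is a hypothetical one-method subset of the real transport interface, and the draining flag is an assumed detail. The point is that the drain request and the transport handoff take the same mutex, so whichever happens second still results in a drain.

```go
package main

import (
	"fmt"
	"sync"
)

// serverTransport is a hypothetical one-method subset of the transport that
// the server creates on top of an accepted Conn.
type serverTransport interface {
	Drain(debugData string)
}

// connWrapper is a simplified stand-in for the xDS conn wrapper.
type connWrapper struct {
	mu       sync.Mutex
	st       serverTransport // nil until the server calls Callback
	draining bool            // set if Drain ran before Callback
}

// Callback is invoked by the server once the transport exists. If a drain
// was already requested, the transport is drained right away.
func (c *connWrapper) Callback(st serverTransport) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.draining {
		st.Drain("draining")
		return
	}
	c.st = st
}

// Drain is invoked by the xDS layer on a transition into not serving or on
// a switch to a new LDS configuration.
func (c *connWrapper) Drain() {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.st == nil {
		c.draining = true
		return
	}
	c.st.Drain("draining")
}

// fakeTransport counts Drain calls for the demonstration below.
type fakeTransport struct{ drains int }

func (f *fakeTransport) Drain(string) { f.drains++ }

func main() {
	// Transport created first, drain requested later.
	a := &fakeTransport{}
	c1 := &connWrapper{}
	c1.Callback(a)
	c1.Drain()

	// Drain requested before the transport exists.
	b := &fakeTransport{}
	c2 := &connWrapper{}
	c2.Drain()
	c2.Callback(b)

	fmt.Println(a.drains, b.drains) // both transports are drained once
}
```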

All LDS + RDS callbacks from the client are synchronous. The events that can happen asynchronously are the accepting of a Conn (protected by a mutex) and an RPC on the Conn (protected by an atomic pointer). Thus, this drops the spawned event goroutines handleServingModeChanges() in the xDS Server and run() on the listener_wrapper, handles all xDS resources inline, and syncs data structures/behaviors using the above methods. L7 error = fail RPC with status code UNAVAILABLE (don’t give back more information due to security risk).

Fixes #6788.

RELEASE NOTES:

  • xds/server: Fix xDS Server RDS handling

@zasweq zasweq requested a review from easwars December 21, 2023 21:43
@zasweq zasweq added this to the 1.61 Release milestone Dec 21, 2023
@zasweq zasweq changed the title Fix xDS Server in the case of dynamic RDS xds: Fix xDS Server in the case of dynamic RDS Dec 21, 2023
Contributor

@easwars easwars left a comment

Made as detailed a pass as I could. Hopefully this is helpful while you wait for Doug to return.

// DefaultServerListenerWithRouteConfigName returns a basic xds Listener
// resource to be used on the server side. The returned Listener resource
// contains a RouteConfiguration resource name that needs to be resolved.
func DefaultServerListenerWithRouteConfigName(host string, port uint32, secLevel SecurityLevel, routeName string) *v3listenerpb.Listener {

Contributor

The existing function DefaultServerListener calls defaultServerListenerCommon, whose last parameter indicates whether the route configuration is to be inlined or not. Can the implementation of this function be replaced with a one-liner: return defaultServerListenerCommon(host, port, secLevel, routeName, false)?

Contributor Author

Ah, good catch. This was left over from when I wrote the e2e test to trigger failure.

// specifies to route to a route specifying non forwarding action. This is
// intended to be used on the server side for RDS requests, and corresponds to
// the inline route configuration in DefaultServerListener.
func RouteConfigNonForwardingTarget(routeName string) *v3routepb.RouteConfiguration {

Contributor

We currently have a RouteConfigOptions struct that is passed to RouteConfigResourceWithOptions. Can the options struct be extended to include a field which specifies the type of route action instead of having separate functions for each type of route action?

Contributor Author

I tried this, and I think what I have now is better/simpler. The issue is that the route config's route action is currently coupled to the cluster specifier type, and also has another layer of knobs for each specific route action. This is just the cluster specifier type cluster, but adding a knob that only applies to one cluster specifier plugin feels weird to me. Thank you for the suggestion though.

Comment on lines +1291 to +1294
// CallbackConn is a conn with a callback function.
type CallbackConn interface {
	Callback(ServerTransport)
}

Contributor

See: go/go-style/decisions#interfaces

Go interfaces generally belong in the package that consumes values of the interface type, 
not a package that implements the interface type.

Why does this interface have to be defined in the internal/transport package? I don't see this package consuming this interface or implementing it.

Also, this interface is utterly underspecified. It does not explain any of the following:

  • when is this method called?
  • is it only ever called once, or can it be called multiple times?
  • who calls this method?
  • who implements the interface?
  • what are implementations supposed to do in this callback?
  • do implementations have to be non-blocking in this callback?

From the PR description:

server: Call a callback with the server transport once it’s created on the Conn.
This gives access to the server transport to xDS layer, which will be gracefully
closed on transitions into not serving and transitions to a new LDS configuration.
It also guarantees at some point the server transport will be gracefully drained.
This replaces the Drain() operation previously present.

I don't see the problem with the existing approach of draining server transports that this new method overcomes.

Contributor Author

This method guarantees synchronization between the Conns closing and a new Conn being accepted. Previously, if the thread yielded after an Accept, there was no way to synchronize with the possibility of a new Conn being added to the map, so it raced. This guarantees synchronization between those components. Will document this further. (You don't have the server transport object until the accepted conn is wrapped with the http2_server, so we were doing it wrong.) Previously, the server would drain all the conns.

// Accept() for all incoming connections, but writes happen rarely (when we
// get a Listener resource update).
// mu guards access to the current serving mode and the active filter chain
// manager.
mu sync.RWMutex

Contributor

I see that this is defined as a read-write mutex in existing code as well. But I don't see a reason why this needs to be a read-write mutex. Currently, the only place where a write lock is acquired is from the for loop in Accept. There cannot be more than one goroutine executing that for loop concurrently.

Also, using a read-write mutex needs to be done with extreme caution. We don't have tools that prevent us from writing to the data that is being protected by this mutex when holding only a read lock. I personally debugged a race due to a bug where we were writing while only holding the read lock, and it was not fun.

Contributor Author

The write lock is also held in maybeUpdateFilterChains, which is called in both the LDS and RDS flows, and those can arrive asynchronously. It is also potentially held at the beginning of LDS processing, and on an LDS Resource Not Found.

// Filter chains received as part of the last good update.
filterChains *xdsresource.FilterChainManager
// Filter chain manager currently serving.
activeFilterChainManager *xdsresource.FilterChainManager

Contributor

I see that this field is being accessed from handleRDSUpdate where we are not holding the lock.

It is also accessed from instantiateFilterChainRoutingConfigurations which does not hold the lock. I see that it is currently being called with the lock held, but that doesn't guarantee that it will always be called with the lock held in the future as well. Please mark functions with a Locked suffix to indicate that they need to be called while holding a mutex.

Contributor Author

Added the Locked suffix. It is OK to read that field in handleRDSUpdate because it can only be written to in instantiateFilterChainRoutingConfigurations, which is guaranteed to be sync (it is also called within an xDS update, and those are all sync).
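
For readers unfamiliar with the naming convention discussed in this thread, here is a small sketch. The type, fields, and method names are hypothetical and only loosely modeled on the listener wrapper; strings stand in for filter chain managers.

```go
package main

import (
	"fmt"
	"sync"
)

// listenerWrapper is a simplified stand-in; strings replace the filter chain
// managers of the real type.
type listenerWrapper struct {
	mu      sync.Mutex
	active  string // configuration currently serving
	pending string // configuration waiting on RDS resources
}

// switchToPendingLocked promotes the pending configuration.
//
// The Locked suffix signals that the caller must already hold l.mu.
func (l *listenerWrapper) switchToPendingLocked() {
	l.active = l.pending
	l.pending = ""
}

// handleUpdate is the lock-acquiring entry point. It calls the Locked helper
// only while holding the mutex.
func (l *listenerWrapper) handleUpdate(pending string, ready bool) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.pending = pending
	if ready {
		l.switchToPendingLocked()
	}
}

func main() {
	l := &listenerWrapper{active: "lds-v1"}
	l.handleUpdate("lds-v2", false)
	fmt.Println(l.active, l.pending)
	l.handleUpdate("lds-v2", true)
	fmt.Println(l.active, l.pending)
}
```

The suffix does not enforce anything at compile time; it only makes a call site that skips the lock easier to spot in review.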

waitForFailedRPCWithStatusCode(ctx, t, cc, status.New(codes.Unavailable, "the incoming RPC matched to a route that was not of action type non forwarding"))
}

// waitForFailedRPCWithStatusCode makes Unary RPC's until it receives the

Contributor

Nit: s/Unary/unary/

Contributor Author

Done.

t.Helper()

c := testgrpc.NewTestServiceClient(cc)
ticker := time.NewTicker(1 * time.Second)

Contributor

One second is a lot of time. Something like 10ms should be fine.

Contributor Author

@zasweq zasweq Jan 2, 2024

Switched. This was one second in master; I wonder why. (I switched it to expect a certain status and error, and to use a ticker.)

for {
	select {
	case <-ctx.Done():
		t.Fatalf("failure when waiting for RPCs to fail with certain status %v: %v. most recent error received from RPC: %v", sts, ctx.Err(), err.Error())

Contributor

This error message puts want before got. See: go/go-style/decisions#got-before-want

Contributor Author

sts is the want, so I think this is correct here.

_, err = c.EmptyCall(ctx, &testpb.Empty{})
for _, st := range sts {
	if status.Code(err) == st.Code() && strings.Contains(err.Error(), st.Message()) {
		t.Logf("most recent error happy case: %v", err.Error())

Contributor

Is this logline useful when the test fails?

Contributor Author

Yes, but it is part of the t.Fatalf argument above.

// Close any new connections. After it has received the resource not found
// error, the server should move to serving, successfully Accept Connections,
// and fail at the L7 level with resource not found specified.
func (s) TestResourceNotFoundRDS(t *testing.T) {

Contributor

Looks like a lot of WIP tests from this point on. So, I'm stopping here.

Contributor Author

Added.

@easwars easwars assigned zasweq and unassigned easwars Dec 26, 2023

codecov bot commented Jan 3, 2024

Codecov Report

Merging #6889 (3d35b6d) into master (bb0d32f) will increase coverage by 0.10%.
Report is 7 commits behind head on master.
The diff coverage is 80.99%.

Additional details and impacted files
@@            Coverage Diff             @@
##           master    #6889      +/-   ##
==========================================
+ Coverage   83.65%   83.76%   +0.10%     
==========================================
  Files         286      287       +1     
  Lines       30756    30928     +172     
==========================================
+ Hits        25730    25907     +177     
- Misses       3963     3965       +2     
+ Partials     1063     1056       -7     
| Files | Coverage Δ |
| --- | --- |
| internal/transport/http2_server.go | 89.85% <ø> (+0.56%) ⬆️ |
| server.go | 81.11% <100.00%> (-0.35%) ⬇️ |
| xds/internal/xdsclient/client_new.go | 85.89% <100.00%> (+2.82%) ⬆️ |
| xds/xds.go | 34.61% <ø> (ø) |
| xds/internal/server/conn_wrapper.go | 73.80% <84.21%> (+2.16%) ⬆️ |
| xds/server.go | 82.73% <80.00%> (-3.77%) ⬇️ |
| xds/internal/xdsclient/xdsresource/filter_chain.go | 92.78% <91.66%> (+0.81%) ⬆️ |
| xds/internal/xdsclient/clientimpl_watchers.go | 76.36% <53.84%> (-6.97%) ⬇️ |
| ...ds/internal/xdsclient/xdsresource/resource_type.go | 78.57% <71.42%> (-21.43%) ⬇️ |
| xds/internal/xdsclient/authority.go | 89.92% <57.14%> (-1.89%) ⬇️ |
... and 2 more

... and 21 files with indirect coverage changes

@ginayeh ginayeh requested a review from dfawley January 3, 2024 18:10
@zasweq zasweq assigned dfawley and unassigned zasweq Jan 8, 2024
Contributor Author

zasweq commented Jan 10, 2024

Continued in #6915. The main thing about this PR is the diff with respect to Easwar's comments, as I did it in one commit :).

@zasweq zasweq closed this Jan 10, 2024
Development

Successfully merging this pull request may close these issues.

Support dynamic RDS server side
3 participants