Skip to content

Commit

Permalink
client: implement proper config selector interceptors (#4235)
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley committed Mar 5, 2021
1 parent 930c791 commit 61f0b5f
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 48 deletions.
75 changes: 56 additions & 19 deletions internal/resolver/config_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"sync"

"google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/resolver"
)

Expand Down Expand Up @@ -54,30 +55,66 @@ type RPCConfig struct {
Interceptor ClientInterceptor
}

// ClientStream will ultimately be a superset of grpc.ClientStream as
// operations become necessary to support.
// ClientStream is the same as grpc.ClientStream, but defined here for circular
// dependency reasons.
type ClientStream interface {
// Done is invoked when the RPC is finished using its connection, or could
// not be assigned a connection. RPC operations may still occur on
// ClientStream after done is called, since the interceptor is invoked by
// application-layer operations.
Done()
// Header returns the header metadata received from the server if there
// is any. It blocks if the metadata is not ready to read.
Header() (metadata.MD, error)
// Trailer returns the trailer metadata from the server, if there is any.
// It must only be called after stream.CloseAndRecv has returned, or
// stream.Recv has returned a non-nil error (including io.EOF).
Trailer() metadata.MD
// CloseSend closes the send direction of the stream. It closes the stream
// when non-nil error is met. It is also not safe to call CloseSend
// concurrently with SendMsg.
CloseSend() error
// Context returns the context for this stream.
//
// It should not be called until after Header or RecvMsg has returned. Once
// called, subsequent client-side retries are disabled.
Context() context.Context
// SendMsg is generally called by generated code. On error, SendMsg aborts
// the stream. If the error was generated by the client, the status is
// returned directly; otherwise, io.EOF is returned and the status of
// the stream may be discovered using RecvMsg.
//
// SendMsg blocks until:
// - There is sufficient flow control to schedule m with the transport, or
// - The stream is done, or
// - The stream breaks.
//
// SendMsg does not wait until the message is received by the server. An
// untimely stream closure may result in lost messages. To ensure delivery,
// users should ensure the RPC completed successfully using RecvMsg.
//
// It is safe to have a goroutine calling SendMsg and another goroutine
// calling RecvMsg on the same stream at the same time, but it is not safe
// to call SendMsg on the same stream in different goroutines. It is also
// not safe to call CloseSend concurrently with SendMsg.
SendMsg(m interface{}) error
// RecvMsg blocks until it receives a message into m or the stream is
// done. It returns io.EOF when the stream completes successfully. On
// any other error, the stream is aborted and the error contains the RPC
// status.
//
// It is safe to have a goroutine calling SendMsg and another goroutine
// calling RecvMsg on the same stream at the same time, but it is not
// safe to call RecvMsg on the same stream in different goroutines.
RecvMsg(m interface{}) error
}

// NOPClientStream is a ClientStream that does nothing
type NOPClientStream struct{}

// Done is a nop.
func (NOPClientStream) Done() {}

var _ ClientStream = NOPClientStream{}

// ClientInterceptor is an interceptor for gRPC client streams.
type ClientInterceptor interface {
// NewStream can intercept ClientStream calls. The provided ClientStream
// should not be used during NewStream. RPCInfo.Context should not be used
// (will be nil).
NewStream(context.Context, RPCInfo, ClientStream) (context.Context, ClientStream, error)
// NewStream produces a ClientStream for an RPC which may optionally use
// the provided function to produce a stream for delegation. Note:
// RPCInfo.Context should not be used (will be nil).
//
// done is invoked when the RPC is finished using its connection, or could
// not be assigned a connection. RPC operations may still occur on
// ClientStream after done is called, since the interceptor is invoked by
// application-layer operations. done must never be nil when called.
NewStream(ctx context.Context, ri RPCInfo, done func(), newStream func(ctx context.Context, done func()) (ClientStream, error)) (ClientStream, error)
}

// ServerInterceptor is unimplementable; do not use.
Expand Down
24 changes: 17 additions & 7 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,6 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
}
}()
}
c := defaultCallInfo()
// Provide an opportunity for the first RPC to see the first service config
// provided by the resolver.
if err := cc.waitForResolvedAddrs(ctx); err != nil {
Expand All @@ -175,12 +174,16 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth

var mc serviceconfig.MethodConfig
var onCommit func()
var newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, opts...)
}

rpcInfo := iresolver.RPCInfo{Context: ctx, Method: method}
rpcConfig, err := cc.safeConfigSelector.SelectConfig(rpcInfo)
if err != nil {
return nil, status.Convert(err).Err()
}
var doneFunc func()

if rpcConfig != nil {
if rpcConfig.Context != nil {
ctx = rpcConfig.Context
Expand All @@ -189,15 +192,22 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
onCommit = rpcConfig.OnCommitted
if rpcConfig.Interceptor != nil {
rpcInfo.Context = nil
newCtx, cs, err := rpcConfig.Interceptor.NewStream(ctx, rpcInfo, iresolver.NOPClientStream{})
if err != nil {
return nil, status.Convert(err).Err()
ns := newStream
newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
cs, err := rpcConfig.Interceptor.NewStream(ctx, rpcInfo, done, ns)
if err != nil {
return nil, status.Convert(err).Err()
}
return cs, nil
}
ctx = newCtx
doneFunc = cs.Done
}
}

return newStream(ctx, func() {})
}

func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), opts ...CallOption) (_ iresolver.ClientStream, err error) {
c := defaultCallInfo()
if mc.WaitForReady != nil {
c.failFast = !*mc.WaitForReady
}
Expand Down
17 changes: 7 additions & 10 deletions xds/internal/resolver/serviceconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,16 +314,13 @@ type interceptorList struct {
interceptors []iresolver.ClientInterceptor
}

func (il *interceptorList) NewStream(ctx context.Context, ri iresolver.RPCInfo, cs iresolver.ClientStream) (context.Context, iresolver.ClientStream, error) {
for _, i := range il.interceptors {
var err error
newCTX, newCS, err := i.NewStream(ctx, ri, cs)
if err != nil {
cs.Done()
return nil, nil, err
func (il *interceptorList) NewStream(ctx context.Context, ri iresolver.RPCInfo, done func(), newStream func(ctx context.Context, done func()) (iresolver.ClientStream, error)) (iresolver.ClientStream, error) {
for i := len(il.interceptors) - 1; i >= 0; i-- {
ns := newStream
interceptor := il.interceptors[i]
newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
return interceptor.NewStream(ctx, ri, done, ns)
}
cs = newCS
ctx = newCTX
}
return ctx, cs, nil
return newStream(ctx, func() {})
}
31 changes: 19 additions & 12 deletions xds/internal/resolver/xds_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1057,25 +1057,28 @@ type filterInterceptor struct {
err error
}

func (fi *filterInterceptor) NewStream(ctx context.Context, i iresolver.RPCInfo, cs iresolver.ClientStream) (context.Context, iresolver.ClientStream, error) {
func (fi *filterInterceptor) NewStream(ctx context.Context, ri iresolver.RPCInfo, done func(), newStream func(ctx context.Context, done func()) (iresolver.ClientStream, error)) (iresolver.ClientStream, error) {
*fi.path = append(*fi.path, "newstream:"+fi.s)
if fi.err != nil {
return nil, nil, fi.err
return nil, fi.err
}
return ctx, &clientStream{cs: cs, path: fi.path, s: fi.s}, nil
d := func() {
*fi.path = append(*fi.path, "done:"+fi.s)
done()
}
cs, err := newStream(ctx, d)
if err != nil {
return nil, err
}
return &clientStream{ClientStream: cs, path: fi.path, s: fi.s}, nil
}

type clientStream struct {
cs iresolver.ClientStream
iresolver.ClientStream
path *[]string
s string
}

func (cs *clientStream) Done() {
*cs.path = append(*cs.path, "done:"+cs.s)
cs.cs.Done()
}

type filterCfg struct {
httpfilter.FilterConfig
s string
Expand Down Expand Up @@ -1249,14 +1252,18 @@ func (s) TestXDSResolverHTTPFilters(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error from cs.SelectConfig(_): %v", err)
}
_, cs, err := res.Interceptor.NewStream(context.Background(), iresolver.RPCInfo{}, iresolver.NOPClientStream{})
var doneFunc func()
_, err = res.Interceptor.NewStream(context.Background(), iresolver.RPCInfo{}, func() {}, func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
doneFunc = done
return nil, nil
})
if tc.newStreamErr != "" {
if err == nil || !strings.Contains(err.Error(), tc.newStreamErr) {
t.Errorf("NewStream(...) = _, %v; want _, Contains(%v)", err, tc.newStreamErr)
}
if err == nil {
res.OnCommitted()
cs.Done()
doneFunc()
}
continue
}
Expand All @@ -1265,7 +1272,7 @@ func (s) TestXDSResolverHTTPFilters(t *testing.T) {

}
res.OnCommitted()
cs.Done()
doneFunc()

// Confirm the desired path is found in remainingWant, and remove it.
pass := false
Expand Down

0 comments on commit 61f0b5f

Please sign in to comment.