diff --git a/test/balancer_test.go b/test/balancer_test.go index c45a380115b..a94e25a8faf 100644 --- a/test/balancer_test.go +++ b/test/balancer_test.go @@ -150,7 +150,7 @@ func (s) TestCredsBundleFromBalancer(t *testing.T) { te.customServerOptions = []grpc.ServerOption{ grpc.Creds(creds), } - te.startServer(&testServer{}) + te.startServer((&testServer{}).Svc()) defer te.tearDown() cc := te.clientConn() @@ -179,7 +179,7 @@ func testPickExtraMetadata(t *testing.T, e env) { grpc.WithBalancerName(testBalancerName), grpc.WithUserAgent(testUserAgent), } - te.startServer(&testServer{security: e.security}) + te.startServer((&testServer{security: e.security}).Svc()) defer te.tearDown() // Set resolver to xds to trigger the extra metadata code path. @@ -228,7 +228,7 @@ func testDoneInfo(t *testing.T, e env) { grpc.WithBalancerName(testBalancerName), } te.userAgent = failAppUA - te.startServer(&testServer{security: e.security}) + te.startServer((&testServer{security: e.security}).Svc()) defer te.tearDown() cc := te.clientConn() @@ -498,7 +498,7 @@ func (s) TestAddressAttributesInNewSubConn(t *testing.T) { } s := grpc.NewServer() - testpb.RegisterTestServiceService(s, testpb.NewTestServiceService(&testServer{})) + testpb.RegisterTestServiceService(s, testServer{}.Svc()) go s.Serve(lis) defer s.Stop() t.Logf("Started gRPC server at %s...", lis.Addr().String()) @@ -556,12 +556,12 @@ func (s) TestServersSwap(t *testing.T) { t.Fatalf("Error while listening. Err: %v", err) } s := grpc.NewServer() - ts := &funcServer{ - unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + ts := &testpb.TestServiceService{ + UnaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { return &testpb.SimpleResponse{Username: username}, nil }, } - testpb.RegisterTestServiceService(s, testpb.NewTestServiceService(ts)) + testpb.RegisterTestServiceService(s, ts) go s.Serve(lis) return lis.Addr().String(), s.Stop } @@ -616,12 +616,12 @@ func (s) TestEmptyAddrs(t *testing.T) { s := grpc.NewServer() defer s.Stop() const one = "1" - ts := &funcServer{ - unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + ts := &testpb.TestServiceService{ + UnaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { return &testpb.SimpleResponse{Username: one}, nil }, } - testpb.RegisterTestServiceService(s, testpb.NewTestServiceService(ts)) + testpb.RegisterTestServiceService(s, ts) go s.Serve(lis) // Initialize pickfirst client @@ -705,12 +705,12 @@ func (s) TestWaitForReady(t *testing.T) { s := grpc.NewServer() defer s.Stop() const one = "1" - ts := &funcServer{ - unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + ts := &testpb.TestServiceService{ + UnaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { return &testpb.SimpleResponse{Username: one}, nil }, } - testpb.RegisterTestServiceService(s, testpb.NewTestServiceService(ts)) + testpb.RegisterTestServiceService(s, ts) go s.Serve(lis) // Initialize client diff --git a/test/channelz_linux_go110_test.go b/test/channelz_linux_go110_test.go index dea374bfc08..35fd2e82680 100644 --- a/test/channelz_linux_go110_test.go +++ b/test/channelz_linux_go110_test.go @@ -43,7 +43,7 @@ func testCZSocketMetricsSocketOption(t *testing.T, e env) { czCleanup := channelz.NewChannelzStorage() defer czCleanupWrapper(czCleanup, t) te := newTest(t, e) - te.startServer(&testServer{security: e.security}) + te.startServer((&testServer{security: e.security}).Svc()) defer te.tearDown() cc := te.clientConn() tc := testpb.NewTestServiceClient(cc) diff --git a/test/channelz_test.go b/test/channelz_test.go index db510d4c6df..1f2197849e6 100644 --- a/test/channelz_test.go +++ b/test/channelz_test.go @@ -85,7 +85,7 @@ func (s) TestCZServerRegistrationAndDeletion(t *testing.T) { defer czCleanupWrapper(czCleanup, t) e := tcpClearRREnv te := newTest(t, e) - te.startServers(&testServer{security: e.security}, c.total) + te.startServers(testServer{security: e.security}.Svc(), c.total) ss, end := channelz.GetServers(c.start, c.max) if int64(len(ss)) != c.length || end != c.end { @@ -104,7 +104,7 @@ func (s) TestCZGetServer(t *testing.T) { defer czCleanupWrapper(czCleanup, t) e := tcpClearRREnv te := newTest(t, e) - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() ss, _ := channelz.GetServers(0, 0) @@ -253,7 +253,7 @@ func (s) TestCZClientSubChannelSocketRegistrationAndDeletion(t *testing.T) { num := 3 // number of backends te := newTest(t, e) var svrAddrs []resolver.Address - te.startServers(&testServer{security: e.security}, num) + te.startServers(testServer{security: e.security}.Svc(), num) r := manual.NewBuilderWithScheme("whatever") for _, a := range te.srvAddrs { svrAddrs = append(svrAddrs, resolver.Address{Addr: a}) @@ -339,7 +339,7 @@ func (s) TestCZServerSocketRegistrationAndDeletion(t *testing.T) { defer czCleanupWrapper(czCleanup, t) e := tcpClearRREnv te := newTest(t, e) - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) var ccs []*grpc.ClientConn for i := 0; i < c.total; i++ { cc := te.clientConn() @@ -504,7 +504,7 @@ func (s) TestCZChannelMetrics(t *testing.T) { te := newTest(t, e) te.maxClientSendMsgSize = newInt(8) var svrAddrs []resolver.Address - te.startServers(&testServer{security: e.security}, num) + te.startServers(testServer{security: e.security}.Svc(), num) r := manual.NewBuilderWithScheme("whatever") for _, a := range te.srvAddrs { svrAddrs = append(svrAddrs, resolver.Address{Addr: a}) @@ -590,7 +590,7 @@ func (s) TestCZServerMetrics(t *testing.T) { e := tcpClearRREnv te := newTest(t, e) te.maxServerReceiveMsgSize = newInt(8) - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() cc := te.clientConn() tc := testpb.NewTestServiceClient(cc) @@ -861,7 +861,7 @@ func (s) TestCZClientSocketMetricsStreamsAndMessagesCount(t *testing.T) { te := newTest(t, e) te.maxServerReceiveMsgSize = newInt(20) te.maxClientReceiveMsgSize = newInt(20) - rcw := te.startServerWithConnControl(&testServer{security: e.security}) + rcw := te.startServerWithConnControl(testServer{security: e.security}.Svc()) defer te.tearDown() cc := te.clientConn() tc := &testServiceClientWrapper{TestServiceClient: testpb.NewTestServiceClient(cc)} @@ -963,7 +963,7 @@ func (s) TestCZClientAndServerSocketMetricsStreamsCountFlowControlRSTStream(t *t // Avoid overflowing connection level flow control window, which will lead to // transport being closed. te.serverInitialConnWindowSize = 65536 * 2 - ts := &funcServer{fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error { + ts := &testpb.TestServiceService{FullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error { stream.Send(&testpb.StreamingOutputCallResponse{}) <-stream.Context().Done() return status.Errorf(codes.DeadlineExceeded, "deadline exceeded or cancelled") @@ -1048,7 +1048,7 @@ func (s) TestCZClientAndServerSocketMetricsFlowControl(t *testing.T) { te.serverInitialConnWindowSize = 65536 te.clientInitialWindowSize = 65536 te.clientInitialConnWindowSize = 65536 - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() cc := te.clientConn() tc := testpb.NewTestServiceClient(cc) @@ -1169,7 +1169,7 @@ func (s) TestCZClientSocketMetricsKeepAlive(t *testing.T) { MinTime: 500 * time.Millisecond, PermitWithoutStream: true, })) - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) te.clientConn() // Dial the server defer te.tearDown() if err := verifyResultWithDelay(func() (bool, error) { @@ -1211,7 +1211,7 @@ func (s) TestCZServerSocketMetricsStreamsAndMessagesCount(t *testing.T) { te := newTest(t, e) te.maxServerReceiveMsgSize = newInt(20) te.maxClientReceiveMsgSize = newInt(20) - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() cc, _ := te.clientConnWithConnControl() tc := &testServiceClientWrapper{TestServiceClient: testpb.NewTestServiceClient(cc)} @@ -1282,7 +1282,7 @@ func (s) TestCZServerSocketMetricsKeepAlive(t *testing.T) { Timeout: 100 * time.Millisecond, }) te.customServerOptions = append(te.customServerOptions, kpOption) - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() cc := te.clientConn() tc := testpb.NewTestServiceClient(cc) @@ -1342,7 +1342,7 @@ func (s) TestCZSocketGetSecurityValueTLS(t *testing.T) { defer czCleanupWrapper(czCleanup, t) e := tcpTLSRREnv te := newTest(t, e) - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() te.clientConn() if err := verifyResultWithDelay(func() (bool, error) { @@ -1467,7 +1467,7 @@ func (s) TestCZSubChannelTraceCreationDeletion(t *testing.T) { defer czCleanupWrapper(czCleanup, t) e := tcpClearRREnv te := newTest(t, e) - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) r := manual.NewBuilderWithScheme("whatever") r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}}) te.resolverScheme = r.Scheme() @@ -1560,7 +1560,7 @@ func (s) TestCZChannelAddressResolutionChange(t *testing.T) { e := tcpClearRREnv e.balancer = "" te := newTest(t, e) - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) r := manual.NewBuilderWithScheme("whatever") addrs := []resolver.Address{{Addr: te.srvAddr}} r.InitialState(resolver.State{Addresses: addrs}) @@ -1663,7 +1663,7 @@ func (s) TestCZSubChannelPickedNewAddress(t *testing.T) { e := tcpClearRREnv e.balancer = "" te := newTest(t, e) - te.startServers(&testServer{security: e.security}, 3) + te.startServers(testServer{security: e.security}.Svc(), 3) r := manual.NewBuilderWithScheme("whatever") var svrAddrs []resolver.Address for _, a := range te.srvAddrs { @@ -1722,7 +1722,7 @@ func (s) TestCZSubChannelConnectivityState(t *testing.T) { defer czCleanupWrapper(czCleanup, t) e := tcpClearRREnv te := newTest(t, e) - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) r := manual.NewBuilderWithScheme("whatever") r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}}) te.resolverScheme = r.Scheme() @@ -1816,7 +1816,7 @@ func (s) TestCZChannelConnectivityState(t *testing.T) { defer czCleanupWrapper(czCleanup, t) e := tcpClearRREnv te := newTest(t, e) - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) r := manual.NewBuilderWithScheme("whatever") r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}}) te.resolverScheme = r.Scheme() @@ -1939,7 +1939,7 @@ func (s) TestCZTraceOverwriteSubChannelDeletion(t *testing.T) { te := newTest(t, e) channelz.SetMaxTraceEntry(1) defer channelz.ResetMaxTraceEntryToDefault() - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) r := manual.NewBuilderWithScheme("whatever") r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}}) te.resolverScheme = r.Scheme() @@ -1997,7 +1997,7 @@ func (s) TestCZTraceTopChannelDeletionTraceClear(t *testing.T) { defer czCleanupWrapper(czCleanup, t) e := tcpClearRREnv te := newTest(t, e) - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) r := manual.NewBuilderWithScheme("whatever") r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}}) te.resolverScheme = r.Scheme() diff --git a/test/creds_test.go b/test/creds_test.go index 46bdd30dc85..a4d1e6cda0c 100644 --- a/test/creds_test.go +++ b/test/creds_test.go @@ -87,7 +87,7 @@ func (s) TestCredsBundleBoth(t *testing.T) { te.customServerOptions = []grpc.ServerOption{ grpc.Creds(creds), } - te.startServer(&testServer{}) + te.startServer(testServer{}.Svc()) defer te.tearDown() cc := te.clientConn() @@ -109,7 +109,7 @@ func (s) TestCredsBundleTransportCredentials(t *testing.T) { te.customServerOptions = []grpc.ServerOption{ grpc.Creds(creds), } - te.startServer(&testServer{}) + te.startServer(testServer{}.Svc()) defer te.tearDown() cc := te.clientConn() @@ -125,7 +125,7 @@ func (s) TestCredsBundlePerRPCCredentials(t *testing.T) { te.customDialOptions = []grpc.DialOption{ grpc.WithCredentialsBundle(&testCredsBundle{t: t, mode: bundlePerRPCOnly}), } - te.startServer(&testServer{}) + te.startServer(testServer{}.Svc()) defer te.tearDown() cc := te.clientConn() @@ -159,7 +159,7 @@ func (c *clientTimeoutCreds) Clone() credentials.TransportCredentials { func (s) TestNonFailFastRPCSucceedOnTimeoutCreds(t *testing.T) { te := newTest(t, env{name: "timeout-cred", network: "tcp", security: "empty"}) te.userAgent = testAppUA - te.startServer(&testServer{security: te.e.security}) + te.startServer(testServer{security: te.e.security}.Svc()) defer te.tearDown() cc := te.clientConn(grpc.WithTransportCredentials(&clientTimeoutCreds{})) @@ -183,7 +183,7 @@ func (s) TestGRPCMethodAccessibleToCredsViaContextRequestInfo(t *testing.T) { const wantMethod = "/grpc.testing.TestService/EmptyCall" te := newTest(t, env{name: "context-request-info", network: "tcp"}) te.userAgent = testAppUA - te.startServer(&testServer{security: te.e.security}) + te.startServer(testServer{security: te.e.security}.Svc()) defer te.tearDown() cc := te.clientConn(grpc.WithPerRPCCredentials(&methodTestCreds{})) @@ -218,7 +218,7 @@ func (c clientAlwaysFailCred) Clone() credentials.TransportCredentials { func (s) TestFailFastRPCErrorOnBadCertificates(t *testing.T) { te := newTest(t, env{name: "bad-cred", network: "tcp", security: "empty", balancer: "round_robin"}) - te.startServer(&testServer{security: te.e.security}) + te.startServer(testServer{security: te.e.security}.Svc()) defer te.tearDown() opts := []grpc.DialOption{grpc.WithTransportCredentials(clientAlwaysFailCred{})} @@ -246,7 +246,7 @@ func (s) TestFailFastRPCErrorOnBadCertificates(t *testing.T) { func (s) TestWaitForReadyRPCErrorOnBadCertificates(t *testing.T) { te := newTest(t, env{name: "bad-cred", network: "tcp", security: "empty", balancer: "round_robin"}) - te.startServer(&testServer{security: te.e.security}) + te.startServer(testServer{security: te.e.security}.Svc()) defer te.tearDown() opts := []grpc.DialOption{grpc.WithTransportCredentials(clientAlwaysFailCred{})} @@ -312,7 +312,7 @@ func testPerRPCCredentialsViaDialOptions(t *testing.T, e env) { te := newTest(t, e) te.tapHandle = authHandle te.perRPCCreds = testPerRPCCredentials{} - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() cc := te.clientConn() @@ -331,7 +331,7 @@ func (s) TestPerRPCCredentialsViaCallOptions(t *testing.T) { func testPerRPCCredentialsViaCallOptions(t *testing.T, e env) { te := newTest(t, e) te.tapHandle = authHandle - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() cc := te.clientConn() @@ -371,7 +371,7 @@ func testPerRPCCredentialsViaDialOptionsAndCallOptions(t *testing.T, e env) { } return ctx, nil } - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() cc := te.clientConn() diff --git a/test/end2end_test.go b/test/end2end_test.go index f26997ea808..b0fa10b7872 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -136,6 +136,20 @@ type testServer struct { var _ testpb.UnstableTestServiceService = (*testServer)(nil) +// Svc returns a registerable TestService for this testServer instances. +// Because `s` is passed by value for convenience, any subsequent changes to +// `s` are not recognized. +func (s testServer) Svc() *testpb.TestServiceService { + return &testpb.TestServiceService{ + EmptyCall: s.EmptyCall, + UnaryCall: s.UnaryCall, + StreamingOutputCall: s.StreamingOutputCall, + StreamingInputCall: s.StreamingInputCall, + FullDuplexCall: s.FullDuplexCall, + HalfDuplexCall: s.HalfDuplexCall, + } +} + func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { if md, ok := metadata.FromIncomingContext(ctx); ok { // For testing purpose, returns an error if user-agent is failAppUA. @@ -566,7 +580,7 @@ func newTest(t *testing.T, e env) *test { return te } -func (te *test) listenAndServe(ts interface{}, listen func(network, address string) (net.Listener, error)) net.Listener { +func (te *test) listenAndServe(ts *testpb.TestServiceService, listen func(network, address string) (net.Listener, error)) net.Listener { te.t.Helper() te.t.Logf("Running test in %s environment...", te.e.name) sopts := []grpc.ServerOption{grpc.MaxConcurrentStreams(te.maxStream)} @@ -626,7 +640,7 @@ func (te *test) listenAndServe(ts interface{}, listen func(network, address stri sopts = append(sopts, te.customServerOptions...) s := grpc.NewServer(sopts...) if ts != nil { - testpb.RegisterTestServiceService(s, testpb.NewTestServiceService(ts)) + testpb.RegisterTestServiceService(s, ts) } // Create a new default health server if enableHealthServer is set, or use @@ -691,20 +705,20 @@ func (w wrapHS) Stop() { w.s.Close() } -func (te *test) startServerWithConnControl(ts interface{}) *listenerWrapper { +func (te *test) startServerWithConnControl(ts *testpb.TestServiceService) *listenerWrapper { l := te.listenAndServe(ts, listenWithConnControl) return l.(*listenerWrapper) } // startServer starts a gRPC server exposing the provided TestService // implementation. Callers should defer a call to te.tearDown to clean up -func (te *test) startServer(ts interface{}) { +func (te *test) startServer(ts *testpb.TestServiceService) { te.t.Helper() te.listenAndServe(ts, net.Listen) } // startServers starts 'num' gRPC servers exposing the provided TestService. -func (te *test) startServers(ts interface{}, num int) { +func (te *test) startServers(ts *testpb.TestServiceService, num int) { for i := 0; i < num; i++ { te.startServer(ts) te.srvs = append(te.srvs, te.srv.(*grpc.Server)) @@ -912,7 +926,7 @@ func (s) TestContextDeadlineNotIgnored(t *testing.T) { } te := newTest(t, e) - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() cc := te.clientConn() @@ -946,7 +960,7 @@ func testTimeoutOnDeadServer(t *testing.T, e env) { "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", "grpc: addrConn.resetTransport failed to create client transport: connection error", ) - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() cc := te.clientConn() @@ -988,7 +1002,7 @@ func (s) TestServerGracefulStopIdempotent(t *testing.T) { func testServerGracefulStopIdempotent(t *testing.T, e env) { te := newTest(t, e) te.userAgent = testAppUA - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() for i := 0; i < 3; i++ { @@ -1008,7 +1022,7 @@ func (s) TestServerGoAway(t *testing.T) { func testServerGoAway(t *testing.T, e env) { te := newTest(t, e) te.userAgent = testAppUA - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() cc := te.clientConn() @@ -1060,7 +1074,7 @@ func testServerGoAwayPendingRPC(t *testing.T, e env) { "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", "grpc: addrConn.resetTransport failed to create client transport: connection error", ) - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() cc := te.clientConn() @@ -1134,7 +1148,7 @@ func testServerMultipleGoAwayPendingRPC(t *testing.T, e env) { "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", "grpc: addrConn.resetTransport failed to create client transport: connection error", ) - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() cc := te.clientConn() @@ -1221,7 +1235,7 @@ func testConcurrentClientConnCloseAndServerGoAway(t *testing.T, e env) { "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", "grpc: addrConn.resetTransport failed to create client transport: connection error", ) - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() cc := te.clientConn() @@ -1258,7 +1272,7 @@ func testConcurrentServerStopAndGoAway(t *testing.T, e env) { "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", "grpc: addrConn.resetTransport failed to create client transport: connection error", ) - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() cc := te.clientConn() @@ -1333,7 +1347,7 @@ func (s) TestClientConnCloseAfterGoAwayWithActiveStream(t *testing.T) { func testClientConnCloseAfterGoAwayWithActiveStream(t *testing.T, e env) { te := newTest(t, e) - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() cc := te.clientConn() tc := testpb.NewTestServiceClient(cc) @@ -1372,7 +1386,7 @@ func testFailFast(t *testing.T, e env) { "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", "grpc: addrConn.resetTransport failed to create client transport: connection error", ) - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() cc := te.clientConn() @@ -1731,7 +1745,7 @@ func (s) TestServiceConfigMaxMsgSize(t *testing.T) { te1.resolverScheme = r.Scheme() te1.nonBlockingDial = true - te1.startServer(&testServer{security: e.security}) + te1.startServer(testServer{security: e.security}.Svc()) cc1 := te1.clientConn(grpc.WithResolvers(r)) addrs := []resolver.Address{{Addr: te1.srvAddr}} @@ -1821,7 +1835,7 @@ func (s) TestServiceConfigMaxMsgSize(t *testing.T) { te2.maxClientReceiveMsgSize = newInt(1024) te2.maxClientSendMsgSize = newInt(1024) - te2.startServer(&testServer{security: e.security}) + te2.startServer(testServer{security: e.security}.Svc()) defer te2.tearDown() cc2 := te2.clientConn(grpc.WithResolvers(r)) r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: te2.srvAddr}}, ServiceConfig: sc}) @@ -1881,7 +1895,7 @@ func (s) TestServiceConfigMaxMsgSize(t *testing.T) { te3.maxClientReceiveMsgSize = newInt(4096) te3.maxClientSendMsgSize = newInt(4096) - te3.startServer(&testServer{security: e.security}) + te3.startServer(testServer{security: e.security}.Svc()) defer te3.tearDown() cc3 := te3.clientConn(grpc.WithResolvers(r)) @@ -1965,7 +1979,7 @@ func (s) TestServiceConfigMaxMsgSize(t *testing.T) { // test makes sure read from streaming RPC doesn't fail in this case. func (s) TestStreamingRPCWithTimeoutInServiceConfigRecv(t *testing.T) { te := testServiceConfigSetup(t, tcpClearRREnv) - te.startServer(&testServer{security: tcpClearRREnv.security}) + te.startServer(testServer{security: tcpClearRREnv.security}.Svc()) defer te.tearDown() r := manual.NewBuilderWithScheme("whatever") @@ -2047,7 +2061,7 @@ func testPreloaderClientSend(t *testing.T, e env) { "grpc: addrConn.resetTransport failed to create client transport: connection error", "Failed to dial : context canceled; please retry.", ) - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) @@ -2121,7 +2135,7 @@ func testMaxMsgSizeClientDefault(t *testing.T, e env) { "grpc: addrConn.resetTransport failed to create client transport: connection error", "Failed to dial : context canceled; please retry.", ) - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) @@ -2185,7 +2199,7 @@ func testMaxMsgSizeClientAPI(t *testing.T, e env) { "grpc: addrConn.resetTransport failed to create client transport: connection error", "Failed to dial : context canceled; please retry.", ) - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) @@ -2270,7 +2284,7 @@ func testMaxMsgSizeServerAPI(t *testing.T, e env) { "grpc: addrConn.resetTransport failed to create client transport: connection error", "Failed to dial : context canceled; please retry.", ) - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) @@ -2375,7 +2389,7 @@ func testTap(t *testing.T, e env) { "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", "grpc: addrConn.resetTransport failed to create client transport: connection error", ) - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() cc := te.clientConn() @@ -2470,7 +2484,7 @@ func (s) TestHealthCheckSuccess(t *testing.T) { func testHealthCheckSuccess(t *testing.T, e env) { te := newTest(t, e) te.enableHealthServer = true - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_SERVING) defer te.tearDown() @@ -2492,7 +2506,7 @@ func testHealthCheckFailure(t *testing.T, e env) { "grpc: the client connection is closing; please retry", ) te.enableHealthServer = true - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_SERVING) defer te.tearDown() @@ -2516,7 +2530,7 @@ func (s) TestHealthCheckOff(t *testing.T) { func testHealthCheckOff(t *testing.T, e env) { te := newTest(t, e) te.enableHealthServer = true - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() verifyHealthCheckErrCode(t, 1*time.Second, te.clientConn(), defaultHealthService, codes.NotFound) @@ -2533,7 +2547,7 @@ func (s) TestHealthWatchMultipleClients(t *testing.T) { func testHealthWatchMultipleClients(t *testing.T, e env) { te := newTest(t, e) te.enableHealthServer = true - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() cc := te.clientConn() @@ -2562,7 +2576,7 @@ func (s) TestHealthWatchSameStatus(t *testing.T) { func testHealthWatchSameStatus(t *testing.T, e env) { te := newTest(t, e) te.enableHealthServer = true - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() stream, cf := newHealthCheckStream(t, te.clientConn(), defaultHealthService) @@ -2590,7 +2604,7 @@ func testHealthWatchSetServiceStatusBeforeStartingServer(t *testing.T, e env) { te := newTest(t, e) te.healthServer = hs hs.SetServingStatus(defaultHealthService, healthpb.HealthCheckResponse_SERVING) - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() stream, cf := newHealthCheckStream(t, te.clientConn(), defaultHealthService) @@ -2611,7 +2625,7 @@ func (s) TestHealthWatchDefaultStatusChange(t *testing.T) { func testHealthWatchDefaultStatusChange(t *testing.T, e env) { te := newTest(t, e) te.enableHealthServer = true - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() stream, cf := newHealthCheckStream(t, te.clientConn(), defaultHealthService) @@ -2632,7 +2646,7 @@ func (s) TestHealthWatchSetServiceStatusBeforeClientCallsWatch(t *testing.T) { func testHealthWatchSetServiceStatusBeforeClientCallsWatch(t *testing.T, e env) { te := newTest(t, e) te.enableHealthServer = true - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_SERVING) defer te.tearDown() @@ -2652,7 +2666,7 @@ func (s) TestHealthWatchOverallServerHealthChange(t *testing.T) { func testHealthWatchOverallServerHealthChange(t *testing.T, e env) { te := newTest(t, e) te.enableHealthServer = true - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() stream, cf := newHealthCheckStream(t, te.clientConn(), "") @@ -2684,7 +2698,7 @@ func (s) TestUnknownHandler(t *testing.T) { func testUnknownHandler(t *testing.T, e env, unknownHandler grpc.StreamHandler) { te := newTest(t, e) te.unknownHandler = unknownHandler - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() verifyHealthCheckErrCode(t, 1*time.Second, te.clientConn(), "", codes.Unauthenticated) } @@ -2700,7 +2714,7 @@ func (s) TestHealthCheckServingStatus(t *testing.T) { func testHealthCheckServingStatus(t *testing.T, e env) { te := newTest(t, e) te.enableHealthServer = true - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() cc := te.clientConn() @@ -2721,7 +2735,7 @@ func (s) TestEmptyUnaryWithUserAgent(t *testing.T) { func testEmptyUnaryWithUserAgent(t *testing.T, e env) { te := newTest(t, e) te.userAgent = testAppUA - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() cc := te.clientConn() @@ -2752,7 +2766,7 @@ func (s) TestFailedEmptyUnary(t *testing.T) { func testFailedEmptyUnary(t *testing.T, e env) { te := newTest(t, e) te.userAgent = failAppUA - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) @@ -2771,7 +2785,7 @@ func (s) TestLargeUnary(t *testing.T) { func testLargeUnary(t *testing.T, e env) { te := newTest(t, e) - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) @@ -2810,7 +2824,7 @@ func testExceedMsgLimit(t *testing.T, e env) { te := newTest(t, e) maxMsgSize := 1024 te.maxServerMsgSize, te.maxClientMsgSize = newInt(maxMsgSize), newInt(maxMsgSize) - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) @@ -2889,7 +2903,7 @@ func (s) TestPeerClientSide(t *testing.T) { func testPeerClientSide(t *testing.T, e env) { te := newTest(t, e) te.userAgent = testAppUA - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) peer := new(peer.Peer) @@ -2927,7 +2941,7 @@ func (s) TestPeerNegative(t *testing.T) { func testPeerNegative(t *testing.T, e env) { te := newTest(t, e) - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() cc := te.clientConn() @@ -2947,7 +2961,7 @@ func (s) TestPeerFailedRPC(t *testing.T) { func testPeerFailedRPC(t *testing.T, e env) { te := newTest(t, e) te.maxServerReceiveMsgSize = newInt(1 * 1024) - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) @@ -3001,7 +3015,7 @@ func (s) TestMetadataUnaryRPC(t *testing.T) { func testMetadataUnaryRPC(t *testing.T, e env) { te := newTest(t, e) - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) @@ -3046,7 +3060,7 @@ func (s) TestMetadataOrderUnaryRPC(t *testing.T) { func testMetadataOrderUnaryRPC(t *testing.T, e env) { te := newTest(t, e) - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) @@ -3083,7 +3097,7 @@ func (s) TestMultipleSetTrailerUnaryRPC(t *testing.T) { func testMultipleSetTrailerUnaryRPC(t *testing.T, e env) { te := newTest(t, e) - te.startServer(&testServer{security: e.security, multipleSetTrailer: true}) + te.startServer(testServer{security: e.security, multipleSetTrailer: true}.Svc()) defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) @@ -3120,7 +3134,7 @@ func (s) TestMultipleSetTrailerStreamingRPC(t *testing.T) { func testMultipleSetTrailerStreamingRPC(t *testing.T, e env) { te := newTest(t, e) - te.startServer(&testServer{security: e.security, multipleSetTrailer: true}) + te.startServer(testServer{security: e.security, multipleSetTrailer: true}.Svc()) defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) @@ -3155,7 +3169,7 @@ func (s) TestSetAndSendHeaderUnaryRPC(t *testing.T) { // To test header metadata is sent on SendHeader(). func testSetAndSendHeaderUnaryRPC(t *testing.T, e env) { te := newTest(t, e) - te.startServer(&testServer{security: e.security, setAndSendHeader: true}) + te.startServer(testServer{security: e.security, setAndSendHeader: true}.Svc()) defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) @@ -3198,7 +3212,7 @@ func (s) TestMultipleSetHeaderUnaryRPC(t *testing.T) { // To test header metadata is sent when sending response. func testMultipleSetHeaderUnaryRPC(t *testing.T, e env) { te := newTest(t, e) - te.startServer(&testServer{security: e.security, setHeaderOnly: true}) + te.startServer(testServer{security: e.security, setHeaderOnly: true}.Svc()) defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) @@ -3242,7 +3256,7 @@ func (s) TestMultipleSetHeaderUnaryRPCError(t *testing.T) { // To test header metadata is sent when sending status. func testMultipleSetHeaderUnaryRPCError(t *testing.T, e env) { te := newTest(t, e) - te.startServer(&testServer{security: e.security, setHeaderOnly: true}) + te.startServer(testServer{security: e.security, setHeaderOnly: true}.Svc()) defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) @@ -3285,7 +3299,7 @@ func (s) TestSetAndSendHeaderStreamingRPC(t *testing.T) { // To test header metadata is sent on SendHeader(). func testSetAndSendHeaderStreamingRPC(t *testing.T, e env) { te := newTest(t, e) - te.startServer(&testServer{security: e.security, setAndSendHeader: true}) + te.startServer(testServer{security: e.security, setAndSendHeader: true}.Svc()) defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) @@ -3325,7 +3339,7 @@ func (s) TestMultipleSetHeaderStreamingRPC(t *testing.T) { // To test header metadata is sent when sending response. func testMultipleSetHeaderStreamingRPC(t *testing.T, e env) { te := newTest(t, e) - te.startServer(&testServer{security: e.security, setHeaderOnly: true}) + te.startServer(testServer{security: e.security, setHeaderOnly: true}.Svc()) defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) @@ -3389,7 +3403,7 @@ func (s) TestMultipleSetHeaderStreamingRPCError(t *testing.T) { // To test header metadata is sent when sending status. func testMultipleSetHeaderStreamingRPCError(t *testing.T, e env) { te := newTest(t, e) - te.startServer(&testServer{security: e.security, setHeaderOnly: true}) + te.startServer(testServer{security: e.security, setHeaderOnly: true}.Svc()) defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) @@ -3454,7 +3468,7 @@ func (s) TestMalformedHTTP2Metadata(t *testing.T) { func testMalformedHTTP2Metadata(t *testing.T, e env) { te := newTest(t, e) - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) @@ -3497,7 +3511,7 @@ func testTransparentRetry(t *testing.T, e env) { } return ctx, nil } - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() cc := te.clientConn() @@ -3546,7 +3560,7 @@ func (s) TestCancel(t *testing.T) { func testCancel(t *testing.T, e env) { te := newTest(t, e) te.declareLogNoise("grpc: the client connection is closing; please retry") - te.startServer(&testServer{security: e.security, unaryCallSleepTime: time.Second}) + te.startServer(testServer{security: e.security, unaryCallSleepTime: time.Second}.Svc()) defer te.tearDown() cc := te.clientConn() @@ -3583,7 +3597,7 @@ func testCancelNoIO(t *testing.T, e env) { te := newTest(t, e) te.declareLogNoise("http2Client.notifyError got notified that the client transport was broken") te.maxStream = 1 // Only allows 1 live stream per server transport. - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() cc := te.clientConn() @@ -3592,7 +3606,7 @@ func testCancelNoIO(t *testing.T, e env) { // Start one blocked RPC for which we'll never send streaming // input. This will consume the 1 maximum concurrent streams, // causing future RPCs to hang. - ctx, cancelFirst := context.WithCancel(context.Background()) + ctx, cancelFirst := context.WithTimeout(context.Background(), 5*time.Second) _, err := tc.StreamingInputCall(ctx) if err != nil { t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, ", tc, err) @@ -3605,6 +3619,9 @@ func testCancelNoIO(t *testing.T, e env) { // succeeding. // TODO(bradfitz): add internal test hook for this (Issue 534) for { + if ctx.Err() != nil { + t.Fatal("timed out waiting to get deadline exceeded error") + } ctx, cancelSecond := context.WithTimeout(context.Background(), 50*time.Millisecond) _, err := tc.StreamingInputCall(ctx) cancelSecond() @@ -3672,7 +3689,7 @@ func (s) TestPingPong(t *testing.T) { func testPingPong(t *testing.T, e env) { te := newTest(t, e) - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) @@ -3731,7 +3748,7 @@ func (s) TestMetadataStreamingRPC(t *testing.T) { func testMetadataStreamingRPC(t *testing.T, e env) { te := newTest(t, e) - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) @@ -3808,7 +3825,7 @@ func (s) TestServerStreaming(t *testing.T) { func testServerStreaming(t *testing.T, e env) { te := newTest(t, e) - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) @@ -3863,7 +3880,7 @@ func (s) TestFailedServerStreaming(t *testing.T) { func testFailedServerStreaming(t *testing.T, e env) { te := newTest(t, e) te.userAgent = failAppUA - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) @@ -3892,15 +3909,10 @@ func equalError(x, y error) bool { return x == y || (x != nil && y != nil && x.Error() == y.Error()) } -// concurrentSendServer is a TestServiceService whose -// StreamingOutputCall makes ten serial Send calls, sending payloads +// concurrentSendStreamingOutputCall makes ten serial Send calls, sending payloads // "0".."9", inclusive. TestServerStreamingConcurrent verifies they // were received in the correct order, and that there were no races. -// -// All other TestServiceService methods return unimplemented if called. -type concurrentSendServer struct{} - -func (s concurrentSendServer) StreamingOutputCall(args *testpb.StreamingOutputCallRequest, stream testpb.TestService_StreamingOutputCallServer) error { +func concurrentSendStreamingOutputCall(args *testpb.StreamingOutputCallRequest, stream testpb.TestService_StreamingOutputCallServer) error { for i := 0; i < 10; i++ { stream.Send(&testpb.StreamingOutputCallResponse{ Payload: &testpb.Payload{ @@ -3920,7 +3932,7 @@ func (s) TestServerStreamingConcurrent(t *testing.T) { func testServerStreamingConcurrent(t *testing.T, e env) { te := newTest(t, e) - te.startServer(concurrentSendServer{}) + te.startServer(&testpb.TestServiceService{StreamingOutputCall: concurrentSendStreamingOutputCall}) defer te.tearDown() cc := te.clientConn() @@ -4001,7 +4013,7 @@ func (s) TestClientStreaming(t *testing.T) { func testClientStreaming(t *testing.T, e env, sizes []int) { te := newTest(t, e) - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) @@ -4047,7 +4059,7 @@ func (s) TestClientStreamingError(t *testing.T) { func testClientStreamingError(t *testing.T, e env) { te := newTest(t, e) - te.startServer(&testServer{security: e.security, earlyFail: true}) + te.startServer(testServer{security: e.security, earlyFail: true}.Svc()) defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) @@ -4092,7 +4104,7 @@ func testExceedMaxStreamsLimit(t *testing.T, e env) { "grpc: the connection is closing", ) te.maxStream = 1 // Only allows 1 live stream per server transport. - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() cc := te.clientConn() @@ -4132,7 +4144,7 @@ func testStreamsQuotaRecovery(t *testing.T, e env) { "grpc: the connection is closing", ) te.maxStream = 1 // Allows 1 live stream. - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() cc := te.clientConn() @@ -4202,7 +4214,7 @@ func testCompressServerHasNoSupport(t *testing.T, e env) { te.serverCompression = false te.clientCompression = false te.clientNopCompression = true - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) @@ -4240,7 +4252,7 @@ func testCompressOK(t *testing.T, e env) { te := newTest(t, e) te.serverCompression = true te.clientCompression = true - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) @@ -4301,7 +4313,7 @@ func (s) TestIdentityEncoding(t *testing.T) { func testIdentityEncoding(t *testing.T, e env) { te := newTest(t, e) - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) @@ -4365,7 +4377,7 @@ func testUnaryClientInterceptor(t *testing.T, e env) { te := newTest(t, e) te.userAgent = testAppUA te.unaryClientInt = failOkayRPC - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) @@ -4391,7 +4403,7 @@ func failOkayStream(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientC func testStreamClientInterceptor(t *testing.T, e env) { te := newTest(t, e) te.streamClientInt = failOkayStream - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) @@ -4427,7 +4439,7 @@ func errInjector(ctx context.Context, req interface{}, info *grpc.UnaryServerInf func testUnaryServerInterceptor(t *testing.T, e env) { te := newTest(t, e) te.unaryServerInt = errInjector - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) @@ -4457,7 +4469,7 @@ func fullDuplexOnly(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServ func testStreamServerInterceptor(t *testing.T, e env) { te := newTest(t, e) te.streamServerInt = fullDuplexOnly - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) @@ -4494,28 +4506,6 @@ func testStreamServerInterceptor(t *testing.T, e env) { } } -// funcServer implements methods of TestServiceService using funcs, -// similar to an http.HandlerFunc. -// Any unimplemented method will return unimplemented. Tests implement the method(s) -// they need. -type funcServer struct { - unaryCall func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) - streamingInputCall func(stream testpb.TestService_StreamingInputCallServer) error - fullDuplexCall func(stream testpb.TestService_FullDuplexCallServer) error -} - -func (s *funcServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { - return s.unaryCall(ctx, in) -} - -func (s *funcServer) StreamingInputCall(stream testpb.TestService_StreamingInputCallServer) error { - return s.streamingInputCall(stream) -} - -func (s *funcServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error { - return s.fullDuplexCall(stream) -} - func (s) TestClientRequestBodyErrorUnexpectedEOF(t *testing.T) { for _, e := range listTestEnv() { testClientRequestBodyErrorUnexpectedEOF(t, e) @@ -4524,7 +4514,7 @@ func (s) TestClientRequestBodyErrorUnexpectedEOF(t *testing.T) { func testClientRequestBodyErrorUnexpectedEOF(t *testing.T, e env) { te := newTest(t, e) - ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + ts := &testpb.TestServiceService{UnaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { errUnexpectedCall := errors.New("unexpected call func server method") t.Error(errUnexpectedCall) return nil, errUnexpectedCall @@ -4548,7 +4538,7 @@ func (s) TestClientRequestBodyErrorCloseAfterLength(t *testing.T) { func testClientRequestBodyErrorCloseAfterLength(t *testing.T, e env) { te := newTest(t, e) te.declareLogNoise("Server.processUnaryRPC failed to write status") - ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + ts := &testpb.TestServiceService{UnaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { errUnexpectedCall := errors.New("unexpected call func server method") t.Error(errUnexpectedCall) return nil, errUnexpectedCall @@ -4572,7 +4562,7 @@ func (s) TestClientRequestBodyErrorCancel(t *testing.T) { func testClientRequestBodyErrorCancel(t *testing.T, e env) { te := newTest(t, e) gotCall := make(chan bool, 1) - ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + ts := &testpb.TestServiceService{UnaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { gotCall <- true return new(testpb.SimpleResponse), nil }} @@ -4608,7 +4598,7 @@ func (s) TestClientRequestBodyErrorCancelStreamingInput(t *testing.T) { func testClientRequestBodyErrorCancelStreamingInput(t *testing.T, e env) { te := newTest(t, e) recvErr := make(chan error, 1) - ts := &funcServer{streamingInputCall: func(stream testpb.TestService_StreamingInputCallServer) error { + ts := &testpb.TestServiceService{StreamingInputCall: func(stream testpb.TestService_StreamingInputCallServer) error { _, err := stream.Recv() recvErr <- err return nil @@ -4650,7 +4640,7 @@ func testClientInitialHeaderEndStream(t *testing.T, e env) { // checking. handlerDone := make(chan struct{}) te := newTest(t, e) - ts := &funcServer{streamingInputCall: func(stream testpb.TestService_StreamingInputCallServer) error { + ts := &testpb.TestServiceService{StreamingInputCall: func(stream testpb.TestService_StreamingInputCallServer) error { defer close(handlerDone) // Block on serverTester receiving RST_STREAM. This ensures server has closed // stream before stream.Recv(). @@ -4694,7 +4684,7 @@ func testClientSendDataAfterCloseSend(t *testing.T, e env) { // checking. handlerDone := make(chan struct{}) te := newTest(t, e) - ts := &funcServer{streamingInputCall: func(stream testpb.TestService_StreamingInputCallServer) error { + ts := &testpb.TestServiceService{StreamingInputCall: func(stream testpb.TestService_StreamingInputCallServer) error { defer close(handlerDone) // Block on serverTester receiving RST_STREAM. This ensures server has closed // stream before stream.Recv(). @@ -4746,7 +4736,7 @@ func (s) TestClientResourceExhaustedCancelFullDuplex(t *testing.T) { func testClientResourceExhaustedCancelFullDuplex(t *testing.T, e env) { te := newTest(t, e) recvErr := make(chan error, 1) - ts := &funcServer{fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error { + ts := &testpb.TestServiceService{FullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error { defer close(recvErr) _, err := stream.Recv() if err != nil { @@ -5484,7 +5474,7 @@ func testConfigurableWindowSize(t *testing.T, e env, wc windowSizeConfig) { te.clientInitialWindowSize = wc.clientStream te.clientInitialConnWindowSize = wc.clientConn - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() cc := te.clientConn() @@ -5534,7 +5524,7 @@ func (s) TestWaitForReadyConnection(t *testing.T) { func testWaitForReadyConnection(t *testing.T, e env) { te := newTest(t, e) te.userAgent = testAppUA - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() cc := te.clientConn() // Non-blocking dial. @@ -5585,7 +5575,7 @@ func testEncodeDoesntPanic(t *testing.T, e env) { te := newTest(t, e) erc := &errCodec{} te.customCodec = erc - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() te.customCodec = nil tc := testpb.NewTestServiceClient(te.clientConn()) @@ -5619,7 +5609,7 @@ func testSvrWriteStatusEarlyWrite(t *testing.T, e env) { if err != nil { t.Fatal(err) } - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) respParam := []*testpb.ResponseParameters{ @@ -5898,7 +5888,7 @@ func testServiceConfigMaxMsgSizeTD(t *testing.T, e env) { } // Case1: sc set maxReqSize to 2048 (send), maxRespSize to 2048 (recv). te1, ch1 := testServiceConfigSetupTD(t, e) - te1.startServer(&testServer{security: e.security}) + te1.startServer(testServer{security: e.security}.Svc()) defer te1.tearDown() ch1 <- sc @@ -5958,7 +5948,7 @@ func testServiceConfigMaxMsgSizeTD(t *testing.T, e env) { te2, ch2 := testServiceConfigSetupTD(t, e) te2.maxClientReceiveMsgSize = newInt(1024) te2.maxClientSendMsgSize = newInt(1024) - te2.startServer(&testServer{security: e.security}) + te2.startServer(testServer{security: e.security}.Svc()) defer te2.tearDown() ch2 <- sc tc = testpb.NewTestServiceClient(te2.clientConn()) @@ -6007,7 +5997,7 @@ func testServiceConfigMaxMsgSizeTD(t *testing.T, e env) { te3, ch3 := testServiceConfigSetupTD(t, e) te3.maxClientReceiveMsgSize = newInt(4096) te3.maxClientSendMsgSize = newInt(4096) - te3.startServer(&testServer{security: e.security}) + te3.startServer(testServer{security: e.security}.Svc()) defer te3.tearDown() ch3 <- sc tc = testpb.NewTestServiceClient(te3.clientConn()) @@ -6099,7 +6089,7 @@ func (s) TestMethodFromServerStream(t *testing.T) { func (s) TestInterceptorCanAccessCallOptions(t *testing.T) { e := tcpClearRREnv te := newTest(t, e) - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() type observedOptions struct { @@ -6210,7 +6200,7 @@ func testCompressorRegister(t *testing.T, e env) { te.serverCompression = false te.clientUseCompression = true - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) @@ -6385,7 +6375,7 @@ func testClientDoesntDeadlockWhileWritingErrornousLargeMessages(t *testing.T, e te.userAgent = testAppUA smallSize := 1024 te.maxServerReceiveMsgSize = &smallSize - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 1048576) @@ -6422,7 +6412,7 @@ func (s) TestRPCTimeout(t *testing.T) { func testRPCTimeout(t *testing.T, e env) { te := newTest(t, e) - te.startServer(&testServer{security: e.security, unaryCallSleepTime: 500 * time.Millisecond}) + te.startServer(testServer{security: e.security, unaryCallSleepTime: 500 * time.Millisecond}.Svc()) defer te.tearDown() cc := te.clientConn() @@ -6545,7 +6535,7 @@ func testServerMaxHeaderListSizeClientUserViolation(t *testing.T, e env) { te := newTest(t, e) te.maxServerHeaderListSize = new(uint32) *te.maxServerHeaderListSize = 216 - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() cc := te.clientConn() @@ -6577,7 +6567,7 @@ func testClientMaxHeaderListSizeServerUserViolation(t *testing.T, e env) { te := newTest(t, e) te.maxClientHeaderListSize = new(uint32) *te.maxClientHeaderListSize = 1 // any header server sends will violate - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() cc := te.clientConn() @@ -6608,7 +6598,7 @@ func testServerMaxHeaderListSizeClientIntentionalViolation(t *testing.T, e env) te := newTest(t, e) te.maxServerHeaderListSize = new(uint32) *te.maxServerHeaderListSize = 512 - te.startServer(&testServer{security: e.security}) + te.startServer(testServer{security: e.security}.Svc()) defer te.tearDown() cc, dw := te.clientConnWithConnControl() @@ -6650,7 +6640,7 @@ func testClientMaxHeaderListSizeServerIntentionalViolation(t *testing.T, e env) te := newTest(t, e) te.maxClientHeaderListSize = new(uint32) *te.maxClientHeaderListSize = 200 - lw := te.startServerWithConnControl(&testServer{security: e.security, setHeaderOnly: true}) + lw := te.startServerWithConnControl(testServer{security: e.security, setHeaderOnly: true}.Svc()) defer te.tearDown() cc, _ := te.clientConnWithConnControl() tc := &testServiceClientWrapper{TestServiceClient: testpb.NewTestServiceClient(cc)} @@ -6697,10 +6687,10 @@ func (s) TestNetPipeConn(t *testing.T) { pl := testutils.NewPipeListener() s := grpc.NewServer() defer s.Stop() - ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + ts := &testpb.TestServiceService{UnaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { return &testpb.SimpleResponse{}, nil }} - testpb.RegisterTestServiceService(s, testpb.NewTestServiceService(ts)) + testpb.RegisterTestServiceService(s, ts) go s.Serve(pl) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() @@ -6725,7 +6715,18 @@ func testLargeTimeout(t *testing.T, e env) { te := newTest(t, e) te.declareLogNoise("Server.processUnaryRPC failed to write status") - ts := &funcServer{} + maxTimeoutChan := make(chan time.Duration, 1) + ts := &testpb.TestServiceService{UnaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + maxTimeout := <-maxTimeoutChan + deadline, ok := ctx.Deadline() + timeout := time.Until(deadline) + minTimeout := maxTimeout - 5*time.Second + if !ok || timeout < minTimeout || timeout > maxTimeout { + t.Errorf("ctx.Deadline() = (now+%v), %v; want [%v, %v], true", timeout, ok, minTimeout, maxTimeout) + return nil, status.Error(codes.OutOfRange, "deadline error") + } + return &testpb.SimpleResponse{}, nil + }} te.startServer(ts) defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) @@ -6737,17 +6738,7 @@ func testLargeTimeout(t *testing.T, e env) { } for i, maxTimeout := range timeouts { - ts.unaryCall = func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { - deadline, ok := ctx.Deadline() - timeout := time.Until(deadline) - minTimeout := maxTimeout - 5*time.Second - if !ok || timeout < minTimeout || timeout > maxTimeout { - t.Errorf("ctx.Deadline() = (now+%v), %v; want [%v, %v], true", timeout, ok, minTimeout, maxTimeout) - return nil, status.Error(codes.OutOfRange, "deadline error") - } - return &testpb.SimpleResponse{}, nil - } - + maxTimeoutChan <- maxTimeout ctx, cancel := context.WithTimeout(context.Background(), maxTimeout) defer cancel() @@ -6770,11 +6761,11 @@ func (s) TestGoAwayThenClose(t *testing.T) { } s1 := grpc.NewServer() defer s1.Stop() - ts := &funcServer{ - unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + ts := &testpb.TestServiceService{ + UnaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { return &testpb.SimpleResponse{}, nil }, - fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error { + FullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error { // Wait forever. _, err := stream.Recv() if err == nil { @@ -6783,7 +6774,7 @@ func (s) TestGoAwayThenClose(t *testing.T) { return err }, } - testpb.RegisterTestServiceService(s1, testpb.NewTestServiceService(ts)) + testpb.RegisterTestServiceService(s1, ts) go s1.Serve(lis1) conn2Established := grpcsync.NewEvent() @@ -6793,7 +6784,7 @@ func (s) TestGoAwayThenClose(t *testing.T) { } s2 := grpc.NewServer() defer s2.Stop() - testpb.RegisterTestServiceService(s2, testpb.NewTestServiceService(ts)) + testpb.RegisterTestServiceService(s2, ts) go s2.Serve(lis2) r := manual.NewBuilderWithScheme("whatever") @@ -6863,7 +6854,7 @@ func (lis notifyingListener) Accept() (net.Conn, error) { func (s) TestRPCWaitsForResolver(t *testing.T) { te := testServiceConfigSetup(t, tcpClearRREnv) - te.startServer(&testServer{security: tcpClearRREnv.security}) + te.startServer(testServer{security: tcpClearRREnv.security}.Svc()) defer te.tearDown() r := manual.NewBuilderWithScheme("whatever") diff --git a/test/healthcheck_test.go b/test/healthcheck_test.go index 0137494efb8..c8a3796048b 100644 --- a/test/healthcheck_test.go +++ b/test/healthcheck_test.go @@ -140,7 +140,7 @@ func setupServer(sc *svrConfig) (s *grpc.Server, lis net.Listener, ts *testHealt ts = newTestHealthServer() } healthgrpc.RegisterHealthServer(s, ts) - testpb.RegisterTestServiceService(s, testpb.NewTestServiceService(&testServer{})) + testpb.RegisterTestServiceService(s, testServer{}.Svc()) go s.Serve(lis) return s, lis, ts, s.Stop, nil }