diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java index fe23b99138e..ebc8c774be9 100644 --- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java @@ -151,16 +151,16 @@ public boolean shouldAccept(Runnable command) { private SubchannelPool subchannelPool; private final ArrayList logs = new ArrayList<>(); private final ChannelLogger channelLogger = new ChannelLogger() { - @Override - public void log(ChannelLogLevel level, String msg) { - logs.add(level + ": " + msg); - } + @Override + public void log(ChannelLogLevel level, String msg) { + logs.add(level + ": " + msg); + } - @Override - public void log(ChannelLogLevel level, String template, Object... args) { - log(level, MessageFormat.format(template, args)); - } - }; + @Override + public void log(ChannelLogLevel level, String template, Object... args) { + log(level, MessageFormat.format(template, args)); + } + }; private SubchannelPicker currentPicker; private LoadBalancerGrpc.LoadBalancerImplBase mockLbService; @Captor @@ -207,12 +207,12 @@ public StreamObserver balanceLoad( StreamObserver requestObserver = mock(StreamObserver.class); Answer closeRpc = new Answer() { - @Override - public Void answer(InvocationOnMock invocation) { - responseObserver.onCompleted(); - return null; - } - }; + @Override + public Void answer(InvocationOnMock invocation) { + responseObserver.onCompleted(); + return null; + } + }; doAnswer(closeRpc).when(requestObserver).onCompleted(); lbRequestObservers.add(requestObserver); return requestObserver; @@ -221,63 +221,63 @@ public Void answer(InvocationOnMock invocation) { fakeLbServer = InProcessServerBuilder.forName("fakeLb") .directExecutor().addService(mockLbService).build().start(); doAnswer(new Answer() { - @Override - public ManagedChannel answer(InvocationOnMock invocation) throws Throwable { - String authority = (String) invocation.getArguments()[1]; - ManagedChannel channel; - if (failingLbAuthorities.contains(authority)) { - channel = InProcessChannelBuilder.forName("nonExistFakeLb").directExecutor() - .overrideAuthority(authority).build(); - } else { - channel = InProcessChannelBuilder.forName("fakeLb").directExecutor() - .overrideAuthority(authority).build(); + @Override + public ManagedChannel answer(InvocationOnMock invocation) throws Throwable { + String authority = (String) invocation.getArguments()[1]; + ManagedChannel channel; + if (failingLbAuthorities.contains(authority)) { + channel = InProcessChannelBuilder.forName("nonExistFakeLb").directExecutor() + .overrideAuthority(authority).build(); + } else { + channel = InProcessChannelBuilder.forName("fakeLb").directExecutor() + .overrideAuthority(authority).build(); + } + fakeOobChannels.add(channel); + oobChannelTracker.add(channel); + return channel; } - fakeOobChannels.add(channel); - oobChannelTracker.add(channel); - return channel; - } - }).when(helper).createOobChannel(any(EquivalentAddressGroup.class), any(String.class)); + }).when(helper).createOobChannel(any(EquivalentAddressGroup.class), any(String.class)); doAnswer(new Answer() { - @Override - public Subchannel answer(InvocationOnMock invocation) throws Throwable { - Subchannel subchannel = mock(Subchannel.class); - EquivalentAddressGroup eag = (EquivalentAddressGroup) invocation.getArguments()[0]; - Attributes attrs = (Attributes) invocation.getArguments()[1]; - when(subchannel.getAllAddresses()).thenReturn(Arrays.asList(eag)); - when(subchannel.getAttributes()).thenReturn(attrs); - mockSubchannels.add(subchannel); - pooledSubchannelTracker.add(subchannel); - return subchannel; - } - }).when(subchannelPool).takeOrCreateSubchannel( - any(EquivalentAddressGroup.class), any(Attributes.class)); + @Override + public Subchannel answer(InvocationOnMock invocation) throws Throwable { + Subchannel subchannel = mock(Subchannel.class); + EquivalentAddressGroup eag = (EquivalentAddressGroup) invocation.getArguments()[0]; + Attributes attrs = (Attributes) invocation.getArguments()[1]; + when(subchannel.getAllAddresses()).thenReturn(Arrays.asList(eag)); + when(subchannel.getAttributes()).thenReturn(attrs); + mockSubchannels.add(subchannel); + pooledSubchannelTracker.add(subchannel); + return subchannel; + } + }).when(subchannelPool).takeOrCreateSubchannel( + any(EquivalentAddressGroup.class), any(Attributes.class)); doAnswer(new Answer() { - @Override - public Subchannel answer(InvocationOnMock invocation) throws Throwable { - Subchannel subchannel = mock(Subchannel.class); - List eagList = - (List) invocation.getArguments()[0]; - Attributes attrs = (Attributes) invocation.getArguments()[1]; - when(subchannel.getAllAddresses()).thenReturn(eagList); - when(subchannel.getAttributes()).thenReturn(attrs); - mockSubchannels.add(subchannel); - unpooledSubchannelTracker.add(subchannel); - return subchannel; - } - // TODO(zhangkun83): remove the deprecation suppression on this method once migrated to - // the new createSubchannel(). - }).when(helper).createSubchannel(any(List.class), any(Attributes.class)); + @Override + public Subchannel answer(InvocationOnMock invocation) throws Throwable { + Subchannel subchannel = mock(Subchannel.class); + List eagList = + (List) invocation.getArguments()[0]; + Attributes attrs = (Attributes) invocation.getArguments()[1]; + when(subchannel.getAllAddresses()).thenReturn(eagList); + when(subchannel.getAttributes()).thenReturn(attrs); + mockSubchannels.add(subchannel); + unpooledSubchannelTracker.add(subchannel); + return subchannel; + } + // TODO(zhangkun83): remove the deprecation suppression on this method once migrated to + // the new createSubchannel(). + }).when(helper).createSubchannel(any(List.class), any(Attributes.class)); when(helper.getSynchronizationContext()).thenReturn(syncContext); when(helper.getScheduledExecutorService()).thenReturn(fakeClock.getScheduledExecutorService()); when(helper.getChannelLogger()).thenReturn(channelLogger); doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - currentPicker = (SubchannelPicker) invocation.getArguments()[1]; - return null; - } - }).when(helper).updateBalancingState( - any(ConnectivityState.class), any(SubchannelPicker.class)); + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + currentPicker = (SubchannelPicker) invocation.getArguments()[1]; + return null; + } + }).when(helper).updateBalancingState( + any(ConnectivityState.class), any(SubchannelPicker.class)); when(helper.getAuthority()).thenReturn(SERVICE_AUTHORITY); when(backoffPolicy1.nextBackoffNanos()).thenReturn(10L, 100L); when(backoffPolicy2.nextBackoffNanos()).thenReturn(10L, 100L); @@ -293,11 +293,11 @@ public void tearDown() { try { if (balancer != null) { syncContext.execute(new Runnable() { - @Override - public void run() { - balancer.shutdown(); - } - }); + @Override + public void run() { + balancer.shutdown(); + } + }); } for (ManagedChannel channel : oobChannelTracker) { assertTrue(channel + " is shutdown", channel.isShutdown()); diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestClient.java b/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestClient.java index 67ad40674eb..66d061b766d 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestClient.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestClient.java @@ -208,17 +208,17 @@ public void run() { CallOptions.DEFAULT.withDeadlineAfter(rpcTimeoutSec, TimeUnit.SECONDS)); call.start( new ClientCall.Listener() { - private String serverId; + private String hostname; @Override public void onMessage(SimpleResponse response) { - serverId = response.getServerId(); + hostname = response.getHostname(); // TODO(ericgribkoff) Currently some test environments cannot access the stats RPC // service and rely on parsing stdout. if (printResponse) { System.out.println( "Greeting: Hello world, this is " - + response.getHostname() + + hostname + ", from " + call.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR)); } @@ -227,7 +227,7 @@ public void onMessage(SimpleResponse response) { @Override public void onClose(Status status, Metadata trailers) { for (XdsStatsWatcher watcher : savedWatchers) { - watcher.rpcCompleted(requestId, serverId); + watcher.rpcCompleted(requestId, hostname); } } }, @@ -295,14 +295,14 @@ private XdsStatsWatcher(long startId, long endId) { this.endId = endId; } - void rpcCompleted(long requestId, @Nullable String serverId) { + void rpcCompleted(long requestId, @Nullable String hostname) { synchronized (lock) { if (startId <= requestId && requestId < endId) { - if (serverId != null) { - if (rpcsByPeer.containsKey(serverId)) { - rpcsByPeer.put(serverId, rpcsByPeer.get(serverId) + 1); + if (hostname != null) { + if (rpcsByPeer.containsKey(hostname)) { + rpcsByPeer.put(hostname, rpcsByPeer.get(hostname) + 1); } else { - rpcsByPeer.put(serverId, 1); + rpcsByPeer.put(hostname, 1); } } else { noRemotePeer += 1; diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestServer.java b/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestServer.java index e3ae481142f..915be98a409 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestServer.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestServer.java @@ -124,13 +124,14 @@ private void blockUntilShutdown() throws InterruptedException { } private class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase { - private String host = ""; + private final String host; private TestServiceImpl() { try { host = InetAddress.getLocalHost().getHostName(); } catch (UnknownHostException e) { - logger.log(Level.WARNING, "Failed to get host", e); + logger.log(Level.SEVERE, "Failed to get host", e); + throw new RuntimeException(e); } } diff --git a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java index 207db3473ce..6451df79588 100644 --- a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java +++ b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java @@ -25,7 +25,6 @@ import io.envoyproxy.envoy.api.v2.core.Node; import io.grpc.Attributes; import io.grpc.EquivalentAddressGroup; -import io.grpc.LoadBalancerRegistry; import io.grpc.NameResolver; import io.grpc.Status; import io.grpc.Status.Code; @@ -64,6 +63,7 @@ final class XdsNameResolver extends NameResolver { private final XdsChannelFactory channelFactory; private final SynchronizationContext syncContext; private final ScheduledExecutorService timeService; + private final ServiceConfigParser serviceConfigParser; private final BackoffPolicy.Provider backoffPolicyProvider; private final Supplier stopwatchSupplier; private final Bootstrapper bootstrapper; @@ -89,6 +89,7 @@ final class XdsNameResolver extends NameResolver { this.channelFactory = checkNotNull(channelFactory, "channelFactory"); this.syncContext = checkNotNull(args.getSynchronizationContext(), "syncContext"); this.timeService = checkNotNull(args.getScheduledExecutorService(), "timeService"); + this.serviceConfigParser = checkNotNull(args.getServiceConfigParser(), "serviceConfigParser"); this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider"); this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier"); this.bootstrapper = checkNotNull(bootstrapper, "bootstrapper"); @@ -158,14 +159,12 @@ public void onConfigChanged(ConfigUpdate update) { .set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, config) .set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPool) .build(); - ConfigOrError xdsServiceConfig = - XdsLoadBalancerProvider - .parseLoadBalancingConfigPolicy(config, LoadBalancerRegistry.getDefaultRegistry()); + ConfigOrError parsedServiceConfig = serviceConfigParser.parseServiceConfig(config); ResolutionResult result = ResolutionResult.newBuilder() .setAddresses(ImmutableList.of()) .setAttributes(attrs) - .setServiceConfig(xdsServiceConfig) + .setServiceConfig(parsedServiceConfig) .build(); listener.onResult(result); } diff --git a/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java b/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java index ad80f79bc6d..cb82406dae5 100644 --- a/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java @@ -39,6 +39,7 @@ import io.grpc.ChannelLogger; import io.grpc.ManagedChannel; import io.grpc.NameResolver; +import io.grpc.NameResolver.ConfigOrError; import io.grpc.NameResolver.ResolutionResult; import io.grpc.NameResolver.ServiceConfigParser; import io.grpc.Status; @@ -86,7 +87,7 @@ public class XdsNameResolverTest { public final MockitoRule mocks = MockitoJUnit.rule(); @Rule public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule(); - + private final SynchronizationContext syncContext = new SynchronizationContext( new Thread.UncaughtExceptionHandler() { @Override @@ -96,19 +97,24 @@ public void uncaughtException(Thread t, Throwable e) { }); private final FakeClock fakeClock = new FakeClock(); + private final Queue> responseObservers = new ArrayDeque<>(); + private final ServiceConfigParser serviceConfigParser = new ServiceConfigParser() { + @Override + public ConfigOrError parseServiceConfig(Map rawServiceConfig) { + return ConfigOrError.fromConfig(rawServiceConfig); + } + }; + private final NameResolver.Args args = NameResolver.Args.newBuilder() .setDefaultPort(8080) .setProxyDetector(GrpcUtil.NOOP_PROXY_DETECTOR) .setSynchronizationContext(syncContext) - .setServiceConfigParser(mock(ServiceConfigParser.class)) + .setServiceConfigParser(serviceConfigParser) .setScheduledExecutorService(fakeClock.getScheduledExecutorService()) .setChannelLogger(mock(ChannelLogger.class)) .build(); - - private final Queue> responseObservers = new ArrayDeque<>(); - @Mock private BackoffPolicy.Provider backoffPolicyProvider; @Mock @@ -226,7 +232,7 @@ public BootstrapInfo readBootstrap() throws IOException { } @Test - public void resolve_passxdsClientPoolInResult() { + public void resolve_passXdsClientPoolInResult() { xdsNameResolver.start(mockListener); assertThat(responseObservers).hasSize(1); StreamObserver responseObserver = responseObservers.poll(); @@ -260,6 +266,7 @@ public void resolve_foundResource() { assertThat(result.getAddresses()).isEmpty(); Map serviceConfig = result.getAttributes().get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG); + assertThat(result.getServiceConfig().getConfig()).isEqualTo(serviceConfig); @SuppressWarnings("unchecked") List> rawLbConfigs = (List>) serviceConfig.get("loadBalancingConfig"); @@ -306,6 +313,7 @@ public void resolve_resourceUpdated() { assertThat(result.getAddresses()).isEmpty(); Map serviceConfig = result.getAttributes().get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG); + assertThat(result.getServiceConfig().getConfig()).isEqualTo(serviceConfig); List> rawLbConfigs = (List>) serviceConfig.get("loadBalancingConfig"); @@ -331,6 +339,7 @@ public void resolve_resourceUpdated() { result = resolutionResultCaptor.getValue(); assertThat(result.getAddresses()).isEmpty(); serviceConfig = result.getAttributes().get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG); + assertThat(result.getServiceConfig().getConfig()).isEqualTo(serviceConfig); rawLbConfigs = (List>) serviceConfig.get("loadBalancingConfig"); lbConfig = Iterables.getOnlyElement(rawLbConfigs); assertThat(lbConfig.keySet()).containsExactly("cds_experimental"); @@ -366,6 +375,7 @@ public void resolve_resourceNewlyAdded() { assertThat(result.getAddresses()).isEmpty(); Map serviceConfig = result.getAttributes().get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG); + assertThat(result.getServiceConfig().getConfig()).isEqualTo(serviceConfig); @SuppressWarnings("unchecked") List> rawLbConfigs = (List>) serviceConfig.get("loadBalancingConfig");