Skip to content

Commit

Permalink
Merge branch 'master' of github.com:grpc/grpc-java into impl/grpclb_s…
Browse files Browse the repository at this point in the history
…election_change_take_one
  • Loading branch information
voidzcy committed Jan 25, 2020
2 parents 5ca8054 + e9882ec commit 4e33ec5
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 22 deletions.
Expand Up @@ -208,17 +208,17 @@ public void run() {
CallOptions.DEFAULT.withDeadlineAfter(rpcTimeoutSec, TimeUnit.SECONDS));
call.start(
new ClientCall.Listener<SimpleResponse>() {
private String serverId;
private String hostname;

@Override
public void onMessage(SimpleResponse response) {
serverId = response.getServerId();
hostname = response.getHostname();
// TODO(ericgribkoff) Currently some test environments cannot access the stats RPC
// service and rely on parsing stdout.
if (printResponse) {
System.out.println(
"Greeting: Hello world, this is "
+ response.getHostname()
+ hostname
+ ", from "
+ call.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR));
}
Expand All @@ -227,7 +227,7 @@ public void onMessage(SimpleResponse response) {
@Override
public void onClose(Status status, Metadata trailers) {
for (XdsStatsWatcher watcher : savedWatchers) {
watcher.rpcCompleted(requestId, serverId);
watcher.rpcCompleted(requestId, hostname);
}
}
},
Expand Down Expand Up @@ -295,14 +295,14 @@ private XdsStatsWatcher(long startId, long endId) {
this.endId = endId;
}

void rpcCompleted(long requestId, @Nullable String serverId) {
void rpcCompleted(long requestId, @Nullable String hostname) {
synchronized (lock) {
if (startId <= requestId && requestId < endId) {
if (serverId != null) {
if (rpcsByPeer.containsKey(serverId)) {
rpcsByPeer.put(serverId, rpcsByPeer.get(serverId) + 1);
if (hostname != null) {
if (rpcsByPeer.containsKey(hostname)) {
rpcsByPeer.put(hostname, rpcsByPeer.get(hostname) + 1);
} else {
rpcsByPeer.put(serverId, 1);
rpcsByPeer.put(hostname, 1);
}
} else {
noRemotePeer += 1;
Expand Down
Expand Up @@ -124,13 +124,14 @@ private void blockUntilShutdown() throws InterruptedException {
}

private class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase {
private String host = "";
private final String host;

private TestServiceImpl() {
try {
host = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
logger.log(Level.WARNING, "Failed to get host", e);
logger.log(Level.SEVERE, "Failed to get host", e);
throw new RuntimeException(e);
}
}

Expand Down
9 changes: 4 additions & 5 deletions xds/src/main/java/io/grpc/xds/XdsNameResolver.java
Expand Up @@ -25,7 +25,6 @@
import io.envoyproxy.envoy.api.v2.core.Node;
import io.grpc.Attributes;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancerRegistry;
import io.grpc.NameResolver;
import io.grpc.Status;
import io.grpc.Status.Code;
Expand Down Expand Up @@ -64,6 +63,7 @@ final class XdsNameResolver extends NameResolver {
private final XdsChannelFactory channelFactory;
private final SynchronizationContext syncContext;
private final ScheduledExecutorService timeService;
private final ServiceConfigParser serviceConfigParser;
private final BackoffPolicy.Provider backoffPolicyProvider;
private final Supplier<Stopwatch> stopwatchSupplier;
private final Bootstrapper bootstrapper;
Expand All @@ -89,6 +89,7 @@ final class XdsNameResolver extends NameResolver {
this.channelFactory = checkNotNull(channelFactory, "channelFactory");
this.syncContext = checkNotNull(args.getSynchronizationContext(), "syncContext");
this.timeService = checkNotNull(args.getScheduledExecutorService(), "timeService");
this.serviceConfigParser = checkNotNull(args.getServiceConfigParser(), "serviceConfigParser");
this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
this.bootstrapper = checkNotNull(bootstrapper, "bootstrapper");
Expand Down Expand Up @@ -158,14 +159,12 @@ public void onConfigChanged(ConfigUpdate update) {
.set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, config)
.set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPool)
.build();
ConfigOrError xdsServiceConfig =
XdsLoadBalancerProvider
.parseLoadBalancingConfigPolicy(config, LoadBalancerRegistry.getDefaultRegistry());
ConfigOrError parsedServiceConfig = serviceConfigParser.parseServiceConfig(config);
ResolutionResult result =
ResolutionResult.newBuilder()
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setAttributes(attrs)
.setServiceConfig(xdsServiceConfig)
.setServiceConfig(parsedServiceConfig)
.build();
listener.onResult(result);
}
Expand Down
22 changes: 16 additions & 6 deletions xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java
Expand Up @@ -39,6 +39,7 @@
import io.grpc.ChannelLogger;
import io.grpc.ManagedChannel;
import io.grpc.NameResolver;
import io.grpc.NameResolver.ConfigOrError;
import io.grpc.NameResolver.ResolutionResult;
import io.grpc.NameResolver.ServiceConfigParser;
import io.grpc.Status;
Expand Down Expand Up @@ -86,7 +87,7 @@ public class XdsNameResolverTest {
public final MockitoRule mocks = MockitoJUnit.rule();
@Rule
public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule();

private final SynchronizationContext syncContext = new SynchronizationContext(
new Thread.UncaughtExceptionHandler() {
@Override
Expand All @@ -96,19 +97,24 @@ public void uncaughtException(Thread t, Throwable e) {
});

private final FakeClock fakeClock = new FakeClock();
private final Queue<StreamObserver<DiscoveryResponse>> responseObservers = new ArrayDeque<>();
private final ServiceConfigParser serviceConfigParser = new ServiceConfigParser() {
@Override
public ConfigOrError parseServiceConfig(Map<String, ?> rawServiceConfig) {
return ConfigOrError.fromConfig(rawServiceConfig);
}
};

private final NameResolver.Args args =
NameResolver.Args.newBuilder()
.setDefaultPort(8080)
.setProxyDetector(GrpcUtil.NOOP_PROXY_DETECTOR)
.setSynchronizationContext(syncContext)
.setServiceConfigParser(mock(ServiceConfigParser.class))
.setServiceConfigParser(serviceConfigParser)
.setScheduledExecutorService(fakeClock.getScheduledExecutorService())
.setChannelLogger(mock(ChannelLogger.class))
.build();


private final Queue<StreamObserver<DiscoveryResponse>> responseObservers = new ArrayDeque<>();

@Mock
private BackoffPolicy.Provider backoffPolicyProvider;
@Mock
Expand Down Expand Up @@ -226,7 +232,7 @@ public BootstrapInfo readBootstrap() throws IOException {
}

@Test
public void resolve_passxdsClientPoolInResult() {
public void resolve_passXdsClientPoolInResult() {
xdsNameResolver.start(mockListener);
assertThat(responseObservers).hasSize(1);
StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
Expand Down Expand Up @@ -260,6 +266,7 @@ public void resolve_foundResource() {
assertThat(result.getAddresses()).isEmpty();
Map<String, ?> serviceConfig =
result.getAttributes().get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG);
assertThat(result.getServiceConfig().getConfig()).isEqualTo(serviceConfig);
@SuppressWarnings("unchecked")
List<Map<String, ?>> rawLbConfigs =
(List<Map<String, ?>>) serviceConfig.get("loadBalancingConfig");
Expand Down Expand Up @@ -306,6 +313,7 @@ public void resolve_resourceUpdated() {
assertThat(result.getAddresses()).isEmpty();
Map<String, ?> serviceConfig =
result.getAttributes().get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG);
assertThat(result.getServiceConfig().getConfig()).isEqualTo(serviceConfig);

List<Map<String, ?>> rawLbConfigs =
(List<Map<String, ?>>) serviceConfig.get("loadBalancingConfig");
Expand All @@ -331,6 +339,7 @@ public void resolve_resourceUpdated() {
result = resolutionResultCaptor.getValue();
assertThat(result.getAddresses()).isEmpty();
serviceConfig = result.getAttributes().get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG);
assertThat(result.getServiceConfig().getConfig()).isEqualTo(serviceConfig);
rawLbConfigs = (List<Map<String, ?>>) serviceConfig.get("loadBalancingConfig");
lbConfig = Iterables.getOnlyElement(rawLbConfigs);
assertThat(lbConfig.keySet()).containsExactly("cds_experimental");
Expand Down Expand Up @@ -366,6 +375,7 @@ public void resolve_resourceNewlyAdded() {
assertThat(result.getAddresses()).isEmpty();
Map<String, ?> serviceConfig =
result.getAttributes().get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG);
assertThat(result.getServiceConfig().getConfig()).isEqualTo(serviceConfig);
@SuppressWarnings("unchecked")
List<Map<String, ?>> rawLbConfigs =
(List<Map<String, ?>>) serviceConfig.get("loadBalancingConfig");
Expand Down

0 comments on commit 4e33ec5

Please sign in to comment.