Skip to content

Commit

Permalink
Merge branch 'master' of github.com:grpc/grpc-java into impl/grpclb_s…
Browse files Browse the repository at this point in the history
…election_change_take_one
  • Loading branch information
voidzcy committed Jan 27, 2020
2 parents 5ca8054 + e9882ec commit c090767
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 62 deletions.
80 changes: 40 additions & 40 deletions grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java
Expand Up @@ -238,46 +238,46 @@ public ManagedChannel answer(InvocationOnMock invocation) throws Throwable {
}
}).when(helper).createOobChannel(any(EquivalentAddressGroup.class), any(String.class));
doAnswer(new Answer<Subchannel>() {
@Override
public Subchannel answer(InvocationOnMock invocation) throws Throwable {
Subchannel subchannel = mock(Subchannel.class);
EquivalentAddressGroup eag = (EquivalentAddressGroup) invocation.getArguments()[0];
Attributes attrs = (Attributes) invocation.getArguments()[1];
when(subchannel.getAllAddresses()).thenReturn(Arrays.asList(eag));
when(subchannel.getAttributes()).thenReturn(attrs);
mockSubchannels.add(subchannel);
pooledSubchannelTracker.add(subchannel);
return subchannel;
}
}).when(subchannelPool).takeOrCreateSubchannel(
any(EquivalentAddressGroup.class), any(Attributes.class));
@Override
public Subchannel answer(InvocationOnMock invocation) throws Throwable {
Subchannel subchannel = mock(Subchannel.class);
EquivalentAddressGroup eag = (EquivalentAddressGroup) invocation.getArguments()[0];
Attributes attrs = (Attributes) invocation.getArguments()[1];
when(subchannel.getAllAddresses()).thenReturn(Arrays.asList(eag));
when(subchannel.getAttributes()).thenReturn(attrs);
mockSubchannels.add(subchannel);
pooledSubchannelTracker.add(subchannel);
return subchannel;
}
}).when(subchannelPool).takeOrCreateSubchannel(
any(EquivalentAddressGroup.class), any(Attributes.class));
doAnswer(new Answer<Subchannel>() {
@Override
public Subchannel answer(InvocationOnMock invocation) throws Throwable {
Subchannel subchannel = mock(Subchannel.class);
List<EquivalentAddressGroup> eagList =
(List<EquivalentAddressGroup>) invocation.getArguments()[0];
Attributes attrs = (Attributes) invocation.getArguments()[1];
when(subchannel.getAllAddresses()).thenReturn(eagList);
when(subchannel.getAttributes()).thenReturn(attrs);
mockSubchannels.add(subchannel);
unpooledSubchannelTracker.add(subchannel);
return subchannel;
}
// TODO(zhangkun83): remove the deprecation suppression on this method once migrated to
// the new createSubchannel().
}).when(helper).createSubchannel(any(List.class), any(Attributes.class));
@Override
public Subchannel answer(InvocationOnMock invocation) throws Throwable {
Subchannel subchannel = mock(Subchannel.class);
List<EquivalentAddressGroup> eagList =
(List<EquivalentAddressGroup>) invocation.getArguments()[0];
Attributes attrs = (Attributes) invocation.getArguments()[1];
when(subchannel.getAllAddresses()).thenReturn(eagList);
when(subchannel.getAttributes()).thenReturn(attrs);
mockSubchannels.add(subchannel);
unpooledSubchannelTracker.add(subchannel);
return subchannel;
}
// TODO(zhangkun83): remove the deprecation suppression on this method once migrated to
// the new createSubchannel().
}).when(helper).createSubchannel(any(List.class), any(Attributes.class));
when(helper.getSynchronizationContext()).thenReturn(syncContext);
when(helper.getScheduledExecutorService()).thenReturn(fakeClock.getScheduledExecutorService());
when(helper.getChannelLogger()).thenReturn(channelLogger);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
currentPicker = (SubchannelPicker) invocation.getArguments()[1];
return null;
}
}).when(helper).updateBalancingState(
any(ConnectivityState.class), any(SubchannelPicker.class));
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
currentPicker = (SubchannelPicker) invocation.getArguments()[1];
return null;
}
}).when(helper).updateBalancingState(
any(ConnectivityState.class), any(SubchannelPicker.class));
when(helper.getAuthority()).thenReturn(SERVICE_AUTHORITY);
when(backoffPolicy1.nextBackoffNanos()).thenReturn(10L, 100L);
when(backoffPolicy2.nextBackoffNanos()).thenReturn(10L, 100L);
Expand All @@ -293,11 +293,11 @@ public void tearDown() {
try {
if (balancer != null) {
syncContext.execute(new Runnable() {
@Override
public void run() {
balancer.shutdown();
}
});
@Override
public void run() {
balancer.shutdown();
}
});
}
for (ManagedChannel channel : oobChannelTracker) {
assertTrue(channel + " is shutdown", channel.isShutdown());
Expand Down
Expand Up @@ -208,17 +208,17 @@ public void run() {
CallOptions.DEFAULT.withDeadlineAfter(rpcTimeoutSec, TimeUnit.SECONDS));
call.start(
new ClientCall.Listener<SimpleResponse>() {
private String serverId;
private String hostname;

@Override
public void onMessage(SimpleResponse response) {
serverId = response.getServerId();
hostname = response.getHostname();
// TODO(ericgribkoff) Currently some test environments cannot access the stats RPC
// service and rely on parsing stdout.
if (printResponse) {
System.out.println(
"Greeting: Hello world, this is "
+ response.getHostname()
+ hostname
+ ", from "
+ call.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR));
}
Expand All @@ -227,7 +227,7 @@ public void onMessage(SimpleResponse response) {
@Override
public void onClose(Status status, Metadata trailers) {
for (XdsStatsWatcher watcher : savedWatchers) {
watcher.rpcCompleted(requestId, serverId);
watcher.rpcCompleted(requestId, hostname);
}
}
},
Expand Down Expand Up @@ -295,14 +295,14 @@ private XdsStatsWatcher(long startId, long endId) {
this.endId = endId;
}

void rpcCompleted(long requestId, @Nullable String serverId) {
void rpcCompleted(long requestId, @Nullable String hostname) {
synchronized (lock) {
if (startId <= requestId && requestId < endId) {
if (serverId != null) {
if (rpcsByPeer.containsKey(serverId)) {
rpcsByPeer.put(serverId, rpcsByPeer.get(serverId) + 1);
if (hostname != null) {
if (rpcsByPeer.containsKey(hostname)) {
rpcsByPeer.put(hostname, rpcsByPeer.get(hostname) + 1);
} else {
rpcsByPeer.put(serverId, 1);
rpcsByPeer.put(hostname, 1);
}
} else {
noRemotePeer += 1;
Expand Down
Expand Up @@ -124,13 +124,14 @@ private void blockUntilShutdown() throws InterruptedException {
}

private class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase {
private String host = "";
private final String host;

private TestServiceImpl() {
try {
host = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
logger.log(Level.WARNING, "Failed to get host", e);
logger.log(Level.SEVERE, "Failed to get host", e);
throw new RuntimeException(e);
}
}

Expand Down
9 changes: 4 additions & 5 deletions xds/src/main/java/io/grpc/xds/XdsNameResolver.java
Expand Up @@ -25,7 +25,6 @@
import io.envoyproxy.envoy.api.v2.core.Node;
import io.grpc.Attributes;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancerRegistry;
import io.grpc.NameResolver;
import io.grpc.Status;
import io.grpc.Status.Code;
Expand Down Expand Up @@ -64,6 +63,7 @@ final class XdsNameResolver extends NameResolver {
private final XdsChannelFactory channelFactory;
private final SynchronizationContext syncContext;
private final ScheduledExecutorService timeService;
private final ServiceConfigParser serviceConfigParser;
private final BackoffPolicy.Provider backoffPolicyProvider;
private final Supplier<Stopwatch> stopwatchSupplier;
private final Bootstrapper bootstrapper;
Expand All @@ -89,6 +89,7 @@ final class XdsNameResolver extends NameResolver {
this.channelFactory = checkNotNull(channelFactory, "channelFactory");
this.syncContext = checkNotNull(args.getSynchronizationContext(), "syncContext");
this.timeService = checkNotNull(args.getScheduledExecutorService(), "timeService");
this.serviceConfigParser = checkNotNull(args.getServiceConfigParser(), "serviceConfigParser");
this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
this.bootstrapper = checkNotNull(bootstrapper, "bootstrapper");
Expand Down Expand Up @@ -158,14 +159,12 @@ public void onConfigChanged(ConfigUpdate update) {
.set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, config)
.set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPool)
.build();
ConfigOrError xdsServiceConfig =
XdsLoadBalancerProvider
.parseLoadBalancingConfigPolicy(config, LoadBalancerRegistry.getDefaultRegistry());
ConfigOrError parsedServiceConfig = serviceConfigParser.parseServiceConfig(config);
ResolutionResult result =
ResolutionResult.newBuilder()
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setAttributes(attrs)
.setServiceConfig(xdsServiceConfig)
.setServiceConfig(parsedServiceConfig)
.build();
listener.onResult(result);
}
Expand Down
22 changes: 16 additions & 6 deletions xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java
Expand Up @@ -39,6 +39,7 @@
import io.grpc.ChannelLogger;
import io.grpc.ManagedChannel;
import io.grpc.NameResolver;
import io.grpc.NameResolver.ConfigOrError;
import io.grpc.NameResolver.ResolutionResult;
import io.grpc.NameResolver.ServiceConfigParser;
import io.grpc.Status;
Expand Down Expand Up @@ -86,7 +87,7 @@ public class XdsNameResolverTest {
public final MockitoRule mocks = MockitoJUnit.rule();
@Rule
public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule();

private final SynchronizationContext syncContext = new SynchronizationContext(
new Thread.UncaughtExceptionHandler() {
@Override
Expand All @@ -96,19 +97,24 @@ public void uncaughtException(Thread t, Throwable e) {
});

private final FakeClock fakeClock = new FakeClock();
private final Queue<StreamObserver<DiscoveryResponse>> responseObservers = new ArrayDeque<>();
private final ServiceConfigParser serviceConfigParser = new ServiceConfigParser() {
@Override
public ConfigOrError parseServiceConfig(Map<String, ?> rawServiceConfig) {
return ConfigOrError.fromConfig(rawServiceConfig);
}
};

private final NameResolver.Args args =
NameResolver.Args.newBuilder()
.setDefaultPort(8080)
.setProxyDetector(GrpcUtil.NOOP_PROXY_DETECTOR)
.setSynchronizationContext(syncContext)
.setServiceConfigParser(mock(ServiceConfigParser.class))
.setServiceConfigParser(serviceConfigParser)
.setScheduledExecutorService(fakeClock.getScheduledExecutorService())
.setChannelLogger(mock(ChannelLogger.class))
.build();


private final Queue<StreamObserver<DiscoveryResponse>> responseObservers = new ArrayDeque<>();

@Mock
private BackoffPolicy.Provider backoffPolicyProvider;
@Mock
Expand Down Expand Up @@ -226,7 +232,7 @@ public BootstrapInfo readBootstrap() throws IOException {
}

@Test
public void resolve_passxdsClientPoolInResult() {
public void resolve_passXdsClientPoolInResult() {
xdsNameResolver.start(mockListener);
assertThat(responseObservers).hasSize(1);
StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
Expand Down Expand Up @@ -260,6 +266,7 @@ public void resolve_foundResource() {
assertThat(result.getAddresses()).isEmpty();
Map<String, ?> serviceConfig =
result.getAttributes().get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG);
assertThat(result.getServiceConfig().getConfig()).isEqualTo(serviceConfig);
@SuppressWarnings("unchecked")
List<Map<String, ?>> rawLbConfigs =
(List<Map<String, ?>>) serviceConfig.get("loadBalancingConfig");
Expand Down Expand Up @@ -306,6 +313,7 @@ public void resolve_resourceUpdated() {
assertThat(result.getAddresses()).isEmpty();
Map<String, ?> serviceConfig =
result.getAttributes().get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG);
assertThat(result.getServiceConfig().getConfig()).isEqualTo(serviceConfig);

List<Map<String, ?>> rawLbConfigs =
(List<Map<String, ?>>) serviceConfig.get("loadBalancingConfig");
Expand All @@ -331,6 +339,7 @@ public void resolve_resourceUpdated() {
result = resolutionResultCaptor.getValue();
assertThat(result.getAddresses()).isEmpty();
serviceConfig = result.getAttributes().get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG);
assertThat(result.getServiceConfig().getConfig()).isEqualTo(serviceConfig);
rawLbConfigs = (List<Map<String, ?>>) serviceConfig.get("loadBalancingConfig");
lbConfig = Iterables.getOnlyElement(rawLbConfigs);
assertThat(lbConfig.keySet()).containsExactly("cds_experimental");
Expand Down Expand Up @@ -366,6 +375,7 @@ public void resolve_resourceNewlyAdded() {
assertThat(result.getAddresses()).isEmpty();
Map<String, ?> serviceConfig =
result.getAttributes().get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG);
assertThat(result.getServiceConfig().getConfig()).isEqualTo(serviceConfig);
@SuppressWarnings("unchecked")
List<Map<String, ?>> rawLbConfigs =
(List<Map<String, ?>>) serviceConfig.get("loadBalancingConfig");
Expand Down

0 comments on commit c090767

Please sign in to comment.