Skip to content

Commit

Permalink
Replace custom name resolver loader with standard grpc-java one
Browse files Browse the repository at this point in the history
  • Loading branch information
lburgazzoli committed Aug 27, 2020
1 parent fc3fa50 commit ee0153b
Show file tree
Hide file tree
Showing 23 changed files with 420 additions and 817 deletions.
8 changes: 0 additions & 8 deletions jetcd-all/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,6 @@
<groupId>io.etcd</groupId>
<artifactId>jetcd-extensions</artifactId>
</dependency>
<dependency>
<groupId>io.etcd</groupId>
<artifactId>jetcd-resolver</artifactId>
</dependency>
<dependency>
<groupId>io.etcd</groupId>
<artifactId>jetcd-resolver-dns-srv</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
9 changes: 5 additions & 4 deletions jetcd-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@
<groupId>${project.groupId}</groupId>
<artifactId>jetcd-common</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>jetcd-resolver</artifactId>
</dependency>

<dependency>
<groupId>io.grpc</groupId>
Expand Down Expand Up @@ -68,6 +64,11 @@
<artifactId>failsafe</artifactId>
</dependency>

<dependency>
<groupId>com.google.auto.service</groupId>
<artifactId>auto-service-annotations</artifactId>
</dependency>

<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
Expand Down
36 changes: 17 additions & 19 deletions jetcd-core/src/main/java/io/etcd/jetcd/ClientBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@

import io.etcd.jetcd.common.exception.EtcdException;
import io.etcd.jetcd.common.exception.EtcdExceptionFactory;
import io.etcd.jetcd.resolver.URIResolverLoader;
import io.grpc.ClientInterceptor;
import io.grpc.Metadata;
import io.grpc.netty.GrpcSslContexts;
Expand All @@ -57,7 +56,6 @@ public final class ClientBuilder implements Cloneable {
private String loadBalancerPolicy;
private SslContext sslContext;
private String authority;
private URIResolverLoader uriResolverLoader;
private Integer maxInboundMessageSize;
private Map<Metadata.Key<?>, Object> headers;
private List<ClientInterceptor> interceptors;
Expand All @@ -70,6 +68,7 @@ public final class ClientBuilder implements Cloneable {
private ChronoUnit retryChronoUnit = ChronoUnit.MILLIS;
private String retryMaxDuration;
private Integer connectTimeoutMs;
private boolean discovery;

ClientBuilder() {
}
Expand Down Expand Up @@ -281,22 +280,6 @@ public ClientBuilder authority(String authority) {
return this;
}

/**
* @return the uri resolver loader
*/
public URIResolverLoader uriResolverLoader() {
return uriResolverLoader;
}

/**
* @param loader the uri resolver loader
* @return this builder
*/
public ClientBuilder uriResolverLoader(URIResolverLoader loader) {
this.uriResolverLoader = loader;
return this;
}

/**
* @return the maximum message size allowed for a single gRPC frame.
*/
Expand Down Expand Up @@ -388,7 +371,6 @@ public long retryDelay() {
/**
* @param retryDelay The delay between retries.
* @return this builder
*
*/
public ClientBuilder retryDelay(long retryDelay) {
this.retryDelay = retryDelay;
Expand Down Expand Up @@ -507,6 +489,22 @@ public ClientBuilder connectTimeoutMs(Integer connectTimeoutMs) {
return this;
}

/**
* @return if the endpoint represent a discovery address using dns+srv.
*/
public boolean discovery() {
return discovery;
}

/**
* @param discovery if the endpoint represent a discovery address using dns+srv.
* @return this builder
*/
public ClientBuilder discovery(boolean discovery) {
this.discovery = discovery;
return this;
}

/**
* build a new Client.
*
Expand Down
54 changes: 36 additions & 18 deletions jetcd-core/src/main/java/io/etcd/jetcd/ClientConnectionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.net.URI;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
Expand All @@ -28,19 +29,22 @@
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.ByteString;
import io.etcd.jetcd.api.AuthGrpc;
import io.etcd.jetcd.api.AuthenticateRequest;
import io.etcd.jetcd.api.AuthenticateResponse;
import io.etcd.jetcd.common.exception.ErrorCode;
import io.etcd.jetcd.common.exception.EtcdExceptionFactory;
import io.etcd.jetcd.resolver.URIResolverLoader;
import io.etcd.jetcd.resolver.DnsSrvNameResolver;
import io.etcd.jetcd.resolver.EtcdNameResolver;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
Expand All @@ -64,7 +68,6 @@
import static io.etcd.jetcd.common.exception.EtcdExceptionFactory.handleInterrupt;
import static io.etcd.jetcd.common.exception.EtcdExceptionFactory.newEtcdException;
import static io.etcd.jetcd.common.exception.EtcdExceptionFactory.toEtcdException;
import static io.etcd.jetcd.resolver.SmartNameResolverFactory.forEndpoints;

final class ClientConnectionManager {

Expand Down Expand Up @@ -182,16 +185,10 @@ void close() {
<T extends AbstractStub<T>, R> CompletableFuture<R> withNewChannel(URI endpoint, Function<ManagedChannel, T> stubCustomizer,
Function<T, CompletableFuture<R>> stubConsumer) {

final ManagedChannel channel = defaultChannelBuilder().nameResolverFactory(
forEndpoints(
Util.supplyIfNull(builder.authority(), () -> ""),
Collections.singleton(endpoint),
Util.supplyIfNull(builder.uriResolverLoader(), URIResolverLoader::defaultLoader)))
.build();
final ManagedChannel channel = defaultChannelBuilder(Collections.singletonList(endpoint)).build();
final T stub = stubCustomizer.apply(channel);

try {
T stub = stubCustomizer.apply(channel);

return stubConsumer.apply(stub).whenComplete((r, t) -> channel.shutdown());
} catch (Exception e) {
channel.shutdown();
Expand All @@ -201,7 +198,35 @@ <T extends AbstractStub<T>, R> CompletableFuture<R> withNewChannel(URI endpoint,

@VisibleForTesting
protected ManagedChannelBuilder<?> defaultChannelBuilder() {
final NettyChannelBuilder channelBuilder = NettyChannelBuilder.forTarget("etcd");
return defaultChannelBuilder(builder.endpoints());
}

@VisibleForTesting
protected ManagedChannelBuilder<?> defaultChannelBuilder(Collection<URI> endpoints) {
if (endpoints.isEmpty()) {
throw new IllegalArgumentException("At least one endpoint should be provided");
}

final String target;

if (builder.discovery()) {
if (endpoints.size() != 1) {
throw new IllegalArgumentException("When configured for discovery, there should be only a single endpoint");
}

target = String.format(
"%s:///%s",
DnsSrvNameResolver.SCHEME,
Iterables.get(endpoints, 0));
} else {
target = String.format(
"%s://%s/%s",
EtcdNameResolver.SCHEME,
builder.authority() != null ? builder.authority() : "",
endpoints.stream().map(e -> e.getHost() + ":" + e.getPort()).collect(Collectors.joining(",")));
}

final NettyChannelBuilder channelBuilder = NettyChannelBuilder.forTarget(target);

if (builder.maxInboundMessageSize() != null) {
channelBuilder.maxInboundMessageSize(builder.maxInboundMessageSize());
Expand All @@ -226,13 +251,6 @@ protected ManagedChannelBuilder<?> defaultChannelBuilder() {
channelBuilder.withOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, builder.connectTimeoutMs());
}

channelBuilder.nameResolverFactory(
forEndpoints(
Util.supplyIfNull(builder.authority(), () -> ""),
builder.endpoints(),
Util.supplyIfNull(builder.uriResolverLoader(),
URIResolverLoader::defaultLoader)));

if (builder.loadBalancerPolicy() != null) {
channelBuilder.defaultLoadBalancingPolicy(builder.loadBalancerPolicy());
} else {
Expand Down

0 comments on commit ee0153b

Please sign in to comment.