diff --git a/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java b/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java index 8a309408a943..be79c8dedd9c 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java @@ -19,7 +19,6 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; -import com.google.errorprone.annotations.DoNotCall; import io.grpc.ChannelCredentials; import io.grpc.ChannelLogger; import io.grpc.ExperimentalApi; @@ -55,33 +54,37 @@ public final class InProcessChannelBuilder extends * @return a new builder */ public static InProcessChannelBuilder forName(String name) { - return new InProcessChannelBuilder(name); + return forAddress(new InProcessSocketAddress(checkNotNull(name, "name"))); } /** - * Always fails. Call {@link #forName} instead. + * Create a channel builder that will connect to the server referenced by the given target URI. + * Only intended for use with a custom name resolver. + * + * @param target the identity of the server to connect to + * @return a new builder */ - @DoNotCall("Unsupported. Use forName() instead") public static InProcessChannelBuilder forTarget(String target) { - throw new UnsupportedOperationException("call forName() instead"); + return new InProcessChannelBuilder(null, checkNotNull(target, "target")); } /** - * Always fails. Call {@link #forName} instead. + * Create a channel builder that will connect to the server referenced by the given address. + * + * @param address the address of the server to connect to + * @return a new builder */ - @DoNotCall("Unsupported. Use forName() instead") - public static InProcessChannelBuilder forAddress(String name, int port) { - throw new UnsupportedOperationException("call forName() instead"); + public static InProcessChannelBuilder forAddress(InProcessSocketAddress address) { + return new InProcessChannelBuilder(checkNotNull(address, "address"), null); } private final ManagedChannelImplBuilder managedChannelImplBuilder; - private final String name; private ScheduledExecutorService scheduledExecutorService; private int maxInboundMetadataSize = Integer.MAX_VALUE; private boolean transportIncludeStatusCause = false; - private InProcessChannelBuilder(String name) { - this.name = checkNotNull(name, "name"); + private InProcessChannelBuilder( + @Nullable InProcessSocketAddress directAddress, @Nullable String target) { final class InProcessChannelTransportFactoryBuilder implements ClientTransportFactoryBuilder { @Override @@ -90,8 +93,13 @@ public ClientTransportFactory buildClientTransportFactory() { } } - managedChannelImplBuilder = new ManagedChannelImplBuilder(new InProcessSocketAddress(name), - "localhost", new InProcessChannelTransportFactoryBuilder(), null); + if (directAddress != null) { + managedChannelImplBuilder = new ManagedChannelImplBuilder(directAddress, "localhost", + new InProcessChannelTransportFactoryBuilder(), null); + } else { + managedChannelImplBuilder = new ManagedChannelImplBuilder(target, + new InProcessChannelTransportFactoryBuilder(), null); + } // In-process transport should not record its traffic to the stats module. // https://github.com/grpc/grpc-java/issues/2284 @@ -204,7 +212,7 @@ public InProcessChannelBuilder propagateCauseWithStatus(boolean enable) { ClientTransportFactory buildTransportFactory() { return new InProcessClientTransportFactory( - name, scheduledExecutorService, maxInboundMetadataSize, transportIncludeStatusCause); + scheduledExecutorService, maxInboundMetadataSize, transportIncludeStatusCause); } void setStatsEnabled(boolean value) { @@ -215,7 +223,6 @@ void setStatsEnabled(boolean value) { * Creates InProcess transports. Exposed for internal use, as it should be private. */ static final class InProcessClientTransportFactory implements ClientTransportFactory { - private final String name; private final ScheduledExecutorService timerService; private final boolean useSharedTimer; private final int maxInboundMetadataSize; @@ -223,10 +230,8 @@ static final class InProcessClientTransportFactory implements ClientTransportFac private final boolean includeCauseWithStatus; private InProcessClientTransportFactory( - String name, @Nullable ScheduledExecutorService scheduledExecutorService, int maxInboundMetadataSize, boolean includeCauseWithStatus) { - this.name = name; useSharedTimer = scheduledExecutorService == null; timerService = useSharedTimer ? SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE) : scheduledExecutorService; @@ -242,7 +247,7 @@ public ConnectionClientTransport newClientTransport( } // TODO(carl-mastrangelo): Pass channelLogger in. return new InProcessTransport( - name, maxInboundMetadataSize, options.getAuthority(), options.getUserAgent(), + addr, maxInboundMetadataSize, options.getAuthority(), options.getUserAgent(), options.getEagAttributes(), includeCauseWithStatus); } diff --git a/core/src/main/java/io/grpc/inprocess/InProcessServer.java b/core/src/main/java/io/grpc/inprocess/InProcessServer.java index 7922ebd21a10..f576670dc725 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessServer.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessServer.java @@ -40,11 +40,20 @@ final class InProcessServer implements InternalServer { private static final ConcurrentMap registry = new ConcurrentHashMap<>(); - static InProcessServer findServer(String name) { - return registry.get(name); + static InProcessServer findServer(SocketAddress addr) { + if (addr instanceof InProcessSocketAddress) { + InProcessSocketAddress inProcessAddress = (InProcessSocketAddress) addr; + if (inProcessAddress.getServer() != null) { + return inProcessAddress.getServer(); + } else { + return registry.get(inProcessAddress.getName()); + } + } + return null; } private final String name; + private final boolean anonymous; private final int maxInboundMetadataSize; private final List streamTracerFactories; private ServerListener listener; @@ -61,6 +70,7 @@ static InProcessServer findServer(String name) { InProcessServerBuilder builder, List streamTracerFactories) { this.name = builder.name; + this.anonymous = builder.anonymous; this.schedulerPool = builder.schedulerPool; this.maxInboundMetadataSize = builder.maxInboundMetadataSize; this.streamTracerFactories = @@ -71,15 +81,17 @@ static InProcessServer findServer(String name) { public void start(ServerListener serverListener) throws IOException { this.listener = serverListener; this.scheduler = schedulerPool.getObject(); - // Must be last, as channels can start connecting after this point. - if (registry.putIfAbsent(name, this) != null) { - throw new IOException("name already registered: " + name); + if (!anonymous) { + // Must be last, as channels can start connecting after this point. + if (registry.putIfAbsent(name, this) != null) { + throw new IOException("name already registered: " + name); + } } } @Override public SocketAddress getListenSocketAddress() { - return new InProcessSocketAddress(name); + return new InProcessSocketAddress(name, anonymous ? this : null); } @Override diff --git a/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java b/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java index 6c68189fcc94..99ba0047d7bb 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java @@ -81,7 +81,18 @@ public final class InProcessServerBuilder extends * @return a new builder */ public static InProcessServerBuilder forName(String name) { - return new InProcessServerBuilder(name); + return new InProcessServerBuilder(name, false); + } + + /** + * Create a server builder for an anonymous in-process server. + * Anonymouns servers can only be connected to via their listen address, + * and can't be referenced by name. + * @param name a server identifier used for logging purposes only. + * @return a new builder + */ + public static InProcessServerBuilder anonymous(String name) { + return new InProcessServerBuilder("anon:" + name, true); } /** @@ -101,12 +112,14 @@ public static String generateName() { private final ServerImplBuilder serverImplBuilder; final String name; + final boolean anonymous; int maxInboundMetadataSize = Integer.MAX_VALUE; ObjectPool schedulerPool = SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE); - private InProcessServerBuilder(String name) { + private InProcessServerBuilder(String name, boolean anonymous) { this.name = Preconditions.checkNotNull(name, "name"); + this.anonymous = anonymous; final class InProcessClientTransportServersBuilder implements ClientTransportServersBuilder { @Override diff --git a/core/src/main/java/io/grpc/inprocess/InProcessSocketAddress.java b/core/src/main/java/io/grpc/inprocess/InProcessSocketAddress.java index e5f0515f1d0b..164917a3dfc4 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessSocketAddress.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessSocketAddress.java @@ -19,6 +19,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import java.net.SocketAddress; +import javax.annotation.Nullable; /** * Custom SocketAddress class for {@link InProcessTransport}. @@ -27,13 +28,25 @@ public final class InProcessSocketAddress extends SocketAddress { private static final long serialVersionUID = -2803441206326023474L; private final String name; + @Nullable + private final InProcessServer server; /** * @param name - The name of the inprocess channel or server. * @since 1.0.0 */ public InProcessSocketAddress(String name) { + this(name, null); + } + + /** + * @param name - The name of the inprocess channel or server. + * @param server - The concrete {@link InProcessServer} instance, Will be present on the listen + * address of an anonymous server. + */ + InProcessSocketAddress(String name, @Nullable InProcessServer server) { this.name = checkNotNull(name, "name"); + this.server = server; } /** @@ -45,6 +58,11 @@ public String getName() { return name; } + @Nullable + InProcessServer getServer() { + return server; + } + /** * @since 1.14.0 */ @@ -58,7 +76,13 @@ public String toString() { */ @Override public int hashCode() { - return name.hashCode(); + if (server != null) { + // Since there's a single canonical InProcessSocketAddress instance for + // an anonymous inprocess server, we can just use identity equality. + return super.hashCode(); + } else { + return name.hashCode(); + } } /** @@ -69,6 +93,13 @@ public boolean equals(Object obj) { if (!(obj instanceof InProcessSocketAddress)) { return false; } - return name.equals(((InProcessSocketAddress) obj).name); + InProcessSocketAddress addr = (InProcessSocketAddress) obj; + if (server == null && addr.server == null) { + return name.equals(addr.name); + } else { + // Since there's a single canonical InProcessSocketAddress instance for + // an anonymous inprocess server, we can just use identity equality. + return addr == this; + } } } diff --git a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java index 895b709559ba..c46b6f24221c 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java @@ -59,6 +59,7 @@ import io.grpc.internal.StatsTraceContext; import io.grpc.internal.StreamListener; import java.io.InputStream; +import java.net.SocketAddress; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collections; @@ -80,7 +81,7 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans private static final Logger log = Logger.getLogger(InProcessTransport.class.getName()); private final InternalLogId logId; - private final String name; + private final SocketAddress address; private final int clientMaxInboundMetadataSize; private final String authority; private final String userAgent; @@ -119,10 +120,10 @@ protected void handleNotInUse() { } }; - private InProcessTransport(String name, int maxInboundMetadataSize, String authority, + private InProcessTransport(SocketAddress address, int maxInboundMetadataSize, String authority, String userAgent, Attributes eagAttrs, Optional optionalServerListener, boolean includeCauseWithStatus) { - this.name = name; + this.address = address; this.clientMaxInboundMetadataSize = maxInboundMetadataSize; this.authority = authority; this.userAgent = GrpcUtil.getGrpcUserAgent("inprocess", userAgent); @@ -130,18 +131,18 @@ private InProcessTransport(String name, int maxInboundMetadataSize, String autho this.attributes = Attributes.newBuilder() .set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.PRIVACY_AND_INTEGRITY) .set(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS, eagAttrs) - .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, new InProcessSocketAddress(name)) - .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, new InProcessSocketAddress(name)) + .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, address) + .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, address) .build(); this.optionalServerListener = optionalServerListener; - logId = InternalLogId.allocate(getClass(), name); + logId = InternalLogId.allocate(getClass(), getServerName(address)); this.includeCauseWithStatus = includeCauseWithStatus; } public InProcessTransport( - String name, int maxInboundMetadataSize, String authority, String userAgent, + SocketAddress address, int maxInboundMetadataSize, String authority, String userAgent, Attributes eagAttrs, boolean includeCauseWithStatus) { - this(name, maxInboundMetadataSize, authority, userAgent, eagAttrs, + this(address, maxInboundMetadataSize, authority, userAgent, eagAttrs, Optional.absent(), includeCauseWithStatus); } @@ -150,7 +151,7 @@ public InProcessTransport( Attributes eagAttrs, ObjectPool serverSchedulerPool, List serverStreamTracerFactories, ServerListener serverListener) { - this(name, maxInboundMetadataSize, authority, userAgent, eagAttrs, + this(new InProcessSocketAddress(name), maxInboundMetadataSize, authority, userAgent, eagAttrs, Optional.of(serverListener), false); this.serverMaxInboundMetadataSize = maxInboundMetadataSize; this.serverSchedulerPool = serverSchedulerPool; @@ -165,7 +166,7 @@ public synchronized Runnable start(ManagedClientTransport.Listener listener) { serverScheduler = serverSchedulerPool.getObject(); serverTransportListener = optionalServerListener.get().transportCreated(this); } else { - InProcessServer server = InProcessServer.findServer(name); + InProcessServer server = InProcessServer.findServer(address); if (server != null) { serverMaxInboundMetadataSize = server.getMaxInboundMetadataSize(); serverSchedulerPool = server.getScheduledExecutorServicePool(); @@ -176,7 +177,8 @@ public synchronized Runnable start(ManagedClientTransport.Listener listener) { } } if (serverTransportListener == null) { - shutdownStatus = Status.UNAVAILABLE.withDescription("Could not find server: " + name); + shutdownStatus = + Status.UNAVAILABLE.withDescription("Could not find server: " + getServerName(address)); final Status localShutdownStatus = shutdownStatus; return new Runnable() { @Override @@ -194,8 +196,8 @@ public void run() { public void run() { synchronized (InProcessTransport.this) { Attributes serverTransportAttrs = Attributes.newBuilder() - .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, new InProcessSocketAddress(name)) - .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, new InProcessSocketAddress(name)) + .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, address) + .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, address) .build(); serverStreamAttributes = serverTransportListener.transportReady(serverTransportAttrs); clientTransportListener.transportReady(); @@ -307,7 +309,7 @@ public void shutdownNow(Status reason) { public String toString() { return MoreObjects.toStringHelper(this) .add("logId", logId.getId()) - .add("name", name) + .add("name", getServerName(address)) .toString(); } @@ -882,6 +884,14 @@ private static Status cleanStatus(Status status, boolean includeCauseWithStatus) return clientStatus; } + private static String getServerName(SocketAddress addr) { + if (addr instanceof InProcessSocketAddress) { + return ((InProcessSocketAddress) addr).getName(); + } else { + return "Bad Server Address: " + addr; + } + } + private static class SingleMessageProducer implements StreamListener.MessageProducer { private InputStream message; diff --git a/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java b/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java index 7325cda73ccb..84c37acc4dc6 100644 --- a/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java +++ b/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java @@ -82,8 +82,8 @@ protected String testAuthority(InternalServer server) { @Override protected ManagedClientTransport newClientTransport(InternalServer server) { return new InProcessTransport( - TRANSPORT_NAME, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, testAuthority(server), USER_AGENT, - eagAttrs(), false); + new InProcessSocketAddress(TRANSPORT_NAME), GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, + testAuthority(server), USER_AGENT, eagAttrs(), false); } @Override