From babfb2bfefba8d0f63273cba6b96c3182a91a388 Mon Sep 17 00:00:00 2001 From: Mark Brophy Date: Sun, 10 Oct 2021 19:45:35 +0100 Subject: [PATCH] Add support for anonymous in-process servers. Anonymous servers aren't registered statically, meaning they can't be referenced by name. Only the InProcessSocketAddress, fetched via Server.getListenSockets() can be used to connect to the server. This is particularly useful for production Android usage of in-process servers, where process startup latency is crucial, since a custom name resolver can be used to create the server instance on demand without directly impacting the startup latency of in-process gRPC clients. This approach supports a more-standard approach to "OnDeviceServer" referenced in gRFC L73. https://github.com/grpc/proposal/blob/master/L73-java-binderchannel.md#ondeviceserver --- .../inprocess/InProcessChannelBuilder.java | 43 +++++++++++-------- .../io/grpc/inprocess/InProcessServer.java | 24 ++++++++--- .../inprocess/InProcessServerBuilder.java | 17 +++++++- .../inprocess/InProcessSocketAddress.java | 35 ++++++++++++++- .../io/grpc/inprocess/InProcessTransport.java | 38 ++++++++++------ .../inprocess/InProcessTransportTest.java | 4 +- 6 files changed, 116 insertions(+), 45 deletions(-) diff --git a/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java b/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java index 8a309408a94..be79c8dedd9 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java @@ -19,7 +19,6 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; -import com.google.errorprone.annotations.DoNotCall; import io.grpc.ChannelCredentials; import io.grpc.ChannelLogger; import io.grpc.ExperimentalApi; @@ -55,33 +54,37 @@ public final class InProcessChannelBuilder extends * @return a new builder */ public static InProcessChannelBuilder forName(String name) { - return new InProcessChannelBuilder(name); + return forAddress(new InProcessSocketAddress(checkNotNull(name, "name"))); } /** - * Always fails. Call {@link #forName} instead. + * Create a channel builder that will connect to the server referenced by the given target URI. + * Only intended for use with a custom name resolver. + * + * @param target the identity of the server to connect to + * @return a new builder */ - @DoNotCall("Unsupported. Use forName() instead") public static InProcessChannelBuilder forTarget(String target) { - throw new UnsupportedOperationException("call forName() instead"); + return new InProcessChannelBuilder(null, checkNotNull(target, "target")); } /** - * Always fails. Call {@link #forName} instead. + * Create a channel builder that will connect to the server referenced by the given address. + * + * @param address the address of the server to connect to + * @return a new builder */ - @DoNotCall("Unsupported. Use forName() instead") - public static InProcessChannelBuilder forAddress(String name, int port) { - throw new UnsupportedOperationException("call forName() instead"); + public static InProcessChannelBuilder forAddress(InProcessSocketAddress address) { + return new InProcessChannelBuilder(checkNotNull(address, "address"), null); } private final ManagedChannelImplBuilder managedChannelImplBuilder; - private final String name; private ScheduledExecutorService scheduledExecutorService; private int maxInboundMetadataSize = Integer.MAX_VALUE; private boolean transportIncludeStatusCause = false; - private InProcessChannelBuilder(String name) { - this.name = checkNotNull(name, "name"); + private InProcessChannelBuilder( + @Nullable InProcessSocketAddress directAddress, @Nullable String target) { final class InProcessChannelTransportFactoryBuilder implements ClientTransportFactoryBuilder { @Override @@ -90,8 +93,13 @@ public ClientTransportFactory buildClientTransportFactory() { } } - managedChannelImplBuilder = new ManagedChannelImplBuilder(new InProcessSocketAddress(name), - "localhost", new InProcessChannelTransportFactoryBuilder(), null); + if (directAddress != null) { + managedChannelImplBuilder = new ManagedChannelImplBuilder(directAddress, "localhost", + new InProcessChannelTransportFactoryBuilder(), null); + } else { + managedChannelImplBuilder = new ManagedChannelImplBuilder(target, + new InProcessChannelTransportFactoryBuilder(), null); + } // In-process transport should not record its traffic to the stats module. // https://github.com/grpc/grpc-java/issues/2284 @@ -204,7 +212,7 @@ public InProcessChannelBuilder propagateCauseWithStatus(boolean enable) { ClientTransportFactory buildTransportFactory() { return new InProcessClientTransportFactory( - name, scheduledExecutorService, maxInboundMetadataSize, transportIncludeStatusCause); + scheduledExecutorService, maxInboundMetadataSize, transportIncludeStatusCause); } void setStatsEnabled(boolean value) { @@ -215,7 +223,6 @@ void setStatsEnabled(boolean value) { * Creates InProcess transports. Exposed for internal use, as it should be private. */ static final class InProcessClientTransportFactory implements ClientTransportFactory { - private final String name; private final ScheduledExecutorService timerService; private final boolean useSharedTimer; private final int maxInboundMetadataSize; @@ -223,10 +230,8 @@ static final class InProcessClientTransportFactory implements ClientTransportFac private final boolean includeCauseWithStatus; private InProcessClientTransportFactory( - String name, @Nullable ScheduledExecutorService scheduledExecutorService, int maxInboundMetadataSize, boolean includeCauseWithStatus) { - this.name = name; useSharedTimer = scheduledExecutorService == null; timerService = useSharedTimer ? SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE) : scheduledExecutorService; @@ -242,7 +247,7 @@ public ConnectionClientTransport newClientTransport( } // TODO(carl-mastrangelo): Pass channelLogger in. return new InProcessTransport( - name, maxInboundMetadataSize, options.getAuthority(), options.getUserAgent(), + addr, maxInboundMetadataSize, options.getAuthority(), options.getUserAgent(), options.getEagAttributes(), includeCauseWithStatus); } diff --git a/core/src/main/java/io/grpc/inprocess/InProcessServer.java b/core/src/main/java/io/grpc/inprocess/InProcessServer.java index 7922ebd21a1..f576670dc72 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessServer.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessServer.java @@ -40,11 +40,20 @@ final class InProcessServer implements InternalServer { private static final ConcurrentMap registry = new ConcurrentHashMap<>(); - static InProcessServer findServer(String name) { - return registry.get(name); + static InProcessServer findServer(SocketAddress addr) { + if (addr instanceof InProcessSocketAddress) { + InProcessSocketAddress inProcessAddress = (InProcessSocketAddress) addr; + if (inProcessAddress.getServer() != null) { + return inProcessAddress.getServer(); + } else { + return registry.get(inProcessAddress.getName()); + } + } + return null; } private final String name; + private final boolean anonymous; private final int maxInboundMetadataSize; private final List streamTracerFactories; private ServerListener listener; @@ -61,6 +70,7 @@ static InProcessServer findServer(String name) { InProcessServerBuilder builder, List streamTracerFactories) { this.name = builder.name; + this.anonymous = builder.anonymous; this.schedulerPool = builder.schedulerPool; this.maxInboundMetadataSize = builder.maxInboundMetadataSize; this.streamTracerFactories = @@ -71,15 +81,17 @@ static InProcessServer findServer(String name) { public void start(ServerListener serverListener) throws IOException { this.listener = serverListener; this.scheduler = schedulerPool.getObject(); - // Must be last, as channels can start connecting after this point. - if (registry.putIfAbsent(name, this) != null) { - throw new IOException("name already registered: " + name); + if (!anonymous) { + // Must be last, as channels can start connecting after this point. + if (registry.putIfAbsent(name, this) != null) { + throw new IOException("name already registered: " + name); + } } } @Override public SocketAddress getListenSocketAddress() { - return new InProcessSocketAddress(name); + return new InProcessSocketAddress(name, anonymous ? this : null); } @Override diff --git a/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java b/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java index 6c68189fcc9..99ba0047d7b 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java @@ -81,7 +81,18 @@ public final class InProcessServerBuilder extends * @return a new builder */ public static InProcessServerBuilder forName(String name) { - return new InProcessServerBuilder(name); + return new InProcessServerBuilder(name, false); + } + + /** + * Create a server builder for an anonymous in-process server. + * Anonymouns servers can only be connected to via their listen address, + * and can't be referenced by name. + * @param name a server identifier used for logging purposes only. + * @return a new builder + */ + public static InProcessServerBuilder anonymous(String name) { + return new InProcessServerBuilder("anon:" + name, true); } /** @@ -101,12 +112,14 @@ public static String generateName() { private final ServerImplBuilder serverImplBuilder; final String name; + final boolean anonymous; int maxInboundMetadataSize = Integer.MAX_VALUE; ObjectPool schedulerPool = SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE); - private InProcessServerBuilder(String name) { + private InProcessServerBuilder(String name, boolean anonymous) { this.name = Preconditions.checkNotNull(name, "name"); + this.anonymous = anonymous; final class InProcessClientTransportServersBuilder implements ClientTransportServersBuilder { @Override diff --git a/core/src/main/java/io/grpc/inprocess/InProcessSocketAddress.java b/core/src/main/java/io/grpc/inprocess/InProcessSocketAddress.java index e5f0515f1d0..67e0945e846 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessSocketAddress.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessSocketAddress.java @@ -19,6 +19,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import java.net.SocketAddress; +import javax.annotation.Nullable; /** * Custom SocketAddress class for {@link InProcessTransport}. @@ -27,13 +28,25 @@ public final class InProcessSocketAddress extends SocketAddress { private static final long serialVersionUID = -2803441206326023474L; private final String name; + @Nullable + private final InProcessServer server; /** * @param name - The name of the inprocess channel or server. * @since 1.0.0 */ public InProcessSocketAddress(String name) { + this(name, null); + } + + /** + * @param name - The name of the inprocess channel or server. + * @param server - The concrete {@link InProcessServer} instance, Will be present on the listen + * address of an anonymous server. + */ + InProcessSocketAddress(String name, @Nullable InProcessServer server) { this.name = checkNotNull(name, "name"); + this.server = server; } /** @@ -45,6 +58,11 @@ public String getName() { return name; } + @Nullable + InProcessServer getServer() { + return server; + } + /** * @since 1.14.0 */ @@ -58,7 +76,13 @@ public String toString() { */ @Override public int hashCode() { - return name.hashCode(); + if (server != null) { + // Since there's a single canonical InProcessSocketAddress instance for + // an anonymous inprocess server, we can just use identity equality. + return super.hashCode(); + } else { + return name.hashCode(); + } } /** @@ -69,6 +93,13 @@ public boolean equals(Object obj) { if (!(obj instanceof InProcessSocketAddress)) { return false; } - return name.equals(((InProcessSocketAddress) obj).name); + InProcessSocketAddress addr = (InProcessSocketAddress) obj; + if (server == null && addr.server == null) { + return name.equals(addr.name); + } else { + // Since there's a single canonical InProcessSocketAddress instance for + // an anonymous inprocess server, we can just use identity equality. + return addr == this; + } } } diff --git a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java index 895b709559b..c46b6f24221 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java @@ -59,6 +59,7 @@ import io.grpc.internal.StatsTraceContext; import io.grpc.internal.StreamListener; import java.io.InputStream; +import java.net.SocketAddress; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collections; @@ -80,7 +81,7 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans private static final Logger log = Logger.getLogger(InProcessTransport.class.getName()); private final InternalLogId logId; - private final String name; + private final SocketAddress address; private final int clientMaxInboundMetadataSize; private final String authority; private final String userAgent; @@ -119,10 +120,10 @@ protected void handleNotInUse() { } }; - private InProcessTransport(String name, int maxInboundMetadataSize, String authority, + private InProcessTransport(SocketAddress address, int maxInboundMetadataSize, String authority, String userAgent, Attributes eagAttrs, Optional optionalServerListener, boolean includeCauseWithStatus) { - this.name = name; + this.address = address; this.clientMaxInboundMetadataSize = maxInboundMetadataSize; this.authority = authority; this.userAgent = GrpcUtil.getGrpcUserAgent("inprocess", userAgent); @@ -130,18 +131,18 @@ private InProcessTransport(String name, int maxInboundMetadataSize, String autho this.attributes = Attributes.newBuilder() .set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.PRIVACY_AND_INTEGRITY) .set(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS, eagAttrs) - .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, new InProcessSocketAddress(name)) - .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, new InProcessSocketAddress(name)) + .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, address) + .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, address) .build(); this.optionalServerListener = optionalServerListener; - logId = InternalLogId.allocate(getClass(), name); + logId = InternalLogId.allocate(getClass(), getServerName(address)); this.includeCauseWithStatus = includeCauseWithStatus; } public InProcessTransport( - String name, int maxInboundMetadataSize, String authority, String userAgent, + SocketAddress address, int maxInboundMetadataSize, String authority, String userAgent, Attributes eagAttrs, boolean includeCauseWithStatus) { - this(name, maxInboundMetadataSize, authority, userAgent, eagAttrs, + this(address, maxInboundMetadataSize, authority, userAgent, eagAttrs, Optional.absent(), includeCauseWithStatus); } @@ -150,7 +151,7 @@ public InProcessTransport( Attributes eagAttrs, ObjectPool serverSchedulerPool, List serverStreamTracerFactories, ServerListener serverListener) { - this(name, maxInboundMetadataSize, authority, userAgent, eagAttrs, + this(new InProcessSocketAddress(name), maxInboundMetadataSize, authority, userAgent, eagAttrs, Optional.of(serverListener), false); this.serverMaxInboundMetadataSize = maxInboundMetadataSize; this.serverSchedulerPool = serverSchedulerPool; @@ -165,7 +166,7 @@ public synchronized Runnable start(ManagedClientTransport.Listener listener) { serverScheduler = serverSchedulerPool.getObject(); serverTransportListener = optionalServerListener.get().transportCreated(this); } else { - InProcessServer server = InProcessServer.findServer(name); + InProcessServer server = InProcessServer.findServer(address); if (server != null) { serverMaxInboundMetadataSize = server.getMaxInboundMetadataSize(); serverSchedulerPool = server.getScheduledExecutorServicePool(); @@ -176,7 +177,8 @@ public synchronized Runnable start(ManagedClientTransport.Listener listener) { } } if (serverTransportListener == null) { - shutdownStatus = Status.UNAVAILABLE.withDescription("Could not find server: " + name); + shutdownStatus = + Status.UNAVAILABLE.withDescription("Could not find server: " + getServerName(address)); final Status localShutdownStatus = shutdownStatus; return new Runnable() { @Override @@ -194,8 +196,8 @@ public void run() { public void run() { synchronized (InProcessTransport.this) { Attributes serverTransportAttrs = Attributes.newBuilder() - .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, new InProcessSocketAddress(name)) - .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, new InProcessSocketAddress(name)) + .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, address) + .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, address) .build(); serverStreamAttributes = serverTransportListener.transportReady(serverTransportAttrs); clientTransportListener.transportReady(); @@ -307,7 +309,7 @@ public void shutdownNow(Status reason) { public String toString() { return MoreObjects.toStringHelper(this) .add("logId", logId.getId()) - .add("name", name) + .add("name", getServerName(address)) .toString(); } @@ -882,6 +884,14 @@ private static Status cleanStatus(Status status, boolean includeCauseWithStatus) return clientStatus; } + private static String getServerName(SocketAddress addr) { + if (addr instanceof InProcessSocketAddress) { + return ((InProcessSocketAddress) addr).getName(); + } else { + return "Bad Server Address: " + addr; + } + } + private static class SingleMessageProducer implements StreamListener.MessageProducer { private InputStream message; diff --git a/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java b/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java index 7325cda73cc..84c37acc4dc 100644 --- a/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java +++ b/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java @@ -82,8 +82,8 @@ protected String testAuthority(InternalServer server) { @Override protected ManagedClientTransport newClientTransport(InternalServer server) { return new InProcessTransport( - TRANSPORT_NAME, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, testAuthority(server), USER_AGENT, - eagAttrs(), false); + new InProcessSocketAddress(TRANSPORT_NAME), GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, + testAuthority(server), USER_AGENT, eagAttrs(), false); } @Override