Skip to content

Commit

Permalink
Add support for anonymous in-process servers.
Browse files Browse the repository at this point in the history
Anonymous servers aren't registered statically, meaning they can't be referenced by name.
Only the InProcessSocketAddress, fetched via Server.getListenSockets()
can be used to connect to the server.

This is particularly useful for production Android usage of in-process servers,
where process startup latency is crucial, since a custom name resolver can be
used to create the server instance on demand without directly impacting
the startup latency of in-process gRPC clients.

This approach supports a more-standard approach to "OnDeviceServer" referenced in gRFC L73.
https://github.com/grpc/proposal/blob/master/L73-java-binderchannel.md#ondeviceserver
  • Loading branch information
markb74 committed Oct 10, 2021
1 parent 9266174 commit 8cd4bc5
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 45 deletions.
43 changes: 24 additions & 19 deletions core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java
Expand Up @@ -19,7 +19,6 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.errorprone.annotations.DoNotCall;
import io.grpc.ChannelCredentials;
import io.grpc.ChannelLogger;
import io.grpc.ExperimentalApi;
Expand Down Expand Up @@ -55,33 +54,37 @@ public final class InProcessChannelBuilder extends
* @return a new builder
*/
public static InProcessChannelBuilder forName(String name) {
return new InProcessChannelBuilder(name);
return forAddress(new InProcessSocketAddress(checkNotNull(name, "name")));
}

/**
* Always fails. Call {@link #forName} instead.
* Create a channel builder that will connect to the server referenced by the given target URI.
* Only intended for use with a custom name resolver.
*
* @param target the identity of the server to connect to
* @return a new builder
*/
@DoNotCall("Unsupported. Use forName() instead")
public static InProcessChannelBuilder forTarget(String target) {
throw new UnsupportedOperationException("call forName() instead");
return new InProcessChannelBuilder(null, checkNotNull(target, "target"));
}

/**
* Always fails. Call {@link #forName} instead.
* Create a channel builder that will connect to the server referenced by the given address.
*
* @param address the address of the server to connect to
* @return a new builder
*/
@DoNotCall("Unsupported. Use forName() instead")
public static InProcessChannelBuilder forAddress(String name, int port) {
throw new UnsupportedOperationException("call forName() instead");
public static InProcessChannelBuilder forAddress(InProcessSocketAddress address) {
return new InProcessChannelBuilder(checkNotNull(address, "address"), null);
}

private final ManagedChannelImplBuilder managedChannelImplBuilder;
private final String name;
private ScheduledExecutorService scheduledExecutorService;
private int maxInboundMetadataSize = Integer.MAX_VALUE;
private boolean transportIncludeStatusCause = false;

private InProcessChannelBuilder(String name) {
this.name = checkNotNull(name, "name");
private InProcessChannelBuilder(
@Nullable InProcessSocketAddress directAddress, @Nullable String target) {

final class InProcessChannelTransportFactoryBuilder implements ClientTransportFactoryBuilder {
@Override
Expand All @@ -90,8 +93,13 @@ public ClientTransportFactory buildClientTransportFactory() {
}
}

managedChannelImplBuilder = new ManagedChannelImplBuilder(new InProcessSocketAddress(name),
"localhost", new InProcessChannelTransportFactoryBuilder(), null);
if (directAddress != null) {
managedChannelImplBuilder = new ManagedChannelImplBuilder(directAddress, "localhost",
new InProcessChannelTransportFactoryBuilder(), null);
} else {
managedChannelImplBuilder = new ManagedChannelImplBuilder(target,
new InProcessChannelTransportFactoryBuilder(), null);
}

// In-process transport should not record its traffic to the stats module.
// https://github.com/grpc/grpc-java/issues/2284
Expand Down Expand Up @@ -204,7 +212,7 @@ public InProcessChannelBuilder propagateCauseWithStatus(boolean enable) {

ClientTransportFactory buildTransportFactory() {
return new InProcessClientTransportFactory(
name, scheduledExecutorService, maxInboundMetadataSize, transportIncludeStatusCause);
scheduledExecutorService, maxInboundMetadataSize, transportIncludeStatusCause);
}

void setStatsEnabled(boolean value) {
Expand All @@ -215,18 +223,15 @@ void setStatsEnabled(boolean value) {
* Creates InProcess transports. Exposed for internal use, as it should be private.
*/
static final class InProcessClientTransportFactory implements ClientTransportFactory {
private final String name;
private final ScheduledExecutorService timerService;
private final boolean useSharedTimer;
private final int maxInboundMetadataSize;
private boolean closed;
private final boolean includeCauseWithStatus;

private InProcessClientTransportFactory(
String name,
@Nullable ScheduledExecutorService scheduledExecutorService,
int maxInboundMetadataSize, boolean includeCauseWithStatus) {
this.name = name;
useSharedTimer = scheduledExecutorService == null;
timerService = useSharedTimer
? SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE) : scheduledExecutorService;
Expand All @@ -242,7 +247,7 @@ public ConnectionClientTransport newClientTransport(
}
// TODO(carl-mastrangelo): Pass channelLogger in.
return new InProcessTransport(
name, maxInboundMetadataSize, options.getAuthority(), options.getUserAgent(),
addr, maxInboundMetadataSize, options.getAuthority(), options.getUserAgent(),
options.getEagAttributes(), includeCauseWithStatus);
}

Expand Down
24 changes: 18 additions & 6 deletions core/src/main/java/io/grpc/inprocess/InProcessServer.java
Expand Up @@ -40,11 +40,20 @@ final class InProcessServer implements InternalServer {
private static final ConcurrentMap<String, InProcessServer> registry
= new ConcurrentHashMap<>();

static InProcessServer findServer(String name) {
return registry.get(name);
static InProcessServer findServer(SocketAddress addr) {
if (addr instanceof InProcessSocketAddress) {
InProcessSocketAddress inProcessAddress = (InProcessSocketAddress) addr;
if (inProcessAddress.getServer() != null) {
return inProcessAddress.getServer();
} else {
return registry.get(inProcessAddress.getName());
}
}
return null;
}

private final String name;
private final boolean anonymous;
private final int maxInboundMetadataSize;
private final List<ServerStreamTracer.Factory> streamTracerFactories;
private ServerListener listener;
Expand All @@ -61,6 +70,7 @@ static InProcessServer findServer(String name) {
InProcessServerBuilder builder,
List<? extends ServerStreamTracer.Factory> streamTracerFactories) {
this.name = builder.name;
this.anonymous = builder.anonymous;
this.schedulerPool = builder.schedulerPool;
this.maxInboundMetadataSize = builder.maxInboundMetadataSize;
this.streamTracerFactories =
Expand All @@ -71,15 +81,17 @@ static InProcessServer findServer(String name) {
public void start(ServerListener serverListener) throws IOException {
this.listener = serverListener;
this.scheduler = schedulerPool.getObject();
// Must be last, as channels can start connecting after this point.
if (registry.putIfAbsent(name, this) != null) {
throw new IOException("name already registered: " + name);
if (!anonymous) {
// Must be last, as channels can start connecting after this point.
if (registry.putIfAbsent(name, this) != null) {
throw new IOException("name already registered: " + name);
}
}
}

@Override
public SocketAddress getListenSocketAddress() {
return new InProcessSocketAddress(name);
return new InProcessSocketAddress(name, anonymous ? this : null);
}

@Override
Expand Down
17 changes: 15 additions & 2 deletions core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java
Expand Up @@ -81,7 +81,18 @@ public final class InProcessServerBuilder extends
* @return a new builder
*/
public static InProcessServerBuilder forName(String name) {
return new InProcessServerBuilder(name);
return new InProcessServerBuilder(name, false);
}

/**
* Create a server builder for an anonymous in-process server.
* Anonymouns servers can only be connected to via their listen address,
* and can't be referenced by name.
* @param name a server identifier used for logging purposes only.
* @return a new builder
*/
public static InProcessServerBuilder anonymous(String name) {
return new InProcessServerBuilder("anon:" + name, true);
}

/**
Expand All @@ -101,12 +112,14 @@ public static String generateName() {

private final ServerImplBuilder serverImplBuilder;
final String name;
final boolean anonymous;
int maxInboundMetadataSize = Integer.MAX_VALUE;
ObjectPool<ScheduledExecutorService> schedulerPool =
SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE);

private InProcessServerBuilder(String name) {
private InProcessServerBuilder(String name, boolean anonymous) {
this.name = Preconditions.checkNotNull(name, "name");
this.anonymous = anonymous;

final class InProcessClientTransportServersBuilder implements ClientTransportServersBuilder {
@Override
Expand Down
35 changes: 33 additions & 2 deletions core/src/main/java/io/grpc/inprocess/InProcessSocketAddress.java
Expand Up @@ -19,6 +19,7 @@
import static com.google.common.base.Preconditions.checkNotNull;

import java.net.SocketAddress;
import javax.annotation.Nullable;

/**
* Custom SocketAddress class for {@link InProcessTransport}.
Expand All @@ -27,13 +28,25 @@ public final class InProcessSocketAddress extends SocketAddress {
private static final long serialVersionUID = -2803441206326023474L;

private final String name;
@Nullable
private final InProcessServer server;

/**
* @param name - The name of the inprocess channel or server.
* @since 1.0.0
*/
public InProcessSocketAddress(String name) {
this(name, null);
}

/**
* @param name - The name of the inprocess channel or server.
* @param server - The concrete {@link InProcessServer} instance, Will be present on the listen
* address of an anonymous server.
*/
InProcessSocketAddress(String name, @Nullable InProcessServer server) {
this.name = checkNotNull(name, "name");
this.server = server;
}

/**
Expand All @@ -45,6 +58,11 @@ public String getName() {
return name;
}

@Nullable
InProcessServer getServer() {
return server;
}

/**
* @since 1.14.0
*/
Expand All @@ -58,7 +76,13 @@ public String toString() {
*/
@Override
public int hashCode() {
return name.hashCode();
if (server != null) {
// Since there's a single canonical InProcessSocketAddress instance for
// an anonymous inprocess server, we can just use identity equality.
return super.hashCode();
} else {
return name.hashCode();
}
}

/**
Expand All @@ -69,6 +93,13 @@ public boolean equals(Object obj) {
if (!(obj instanceof InProcessSocketAddress)) {
return false;
}
return name.equals(((InProcessSocketAddress) obj).name);
InProcessSocketAddress addr = (InProcessSocketAddress) obj;
if (server == null && addr.server == null) {
return name.equals(addr.name);
} else {
// Since there's a single canonical InProcessSocketAddress instance for
// an anonymous inprocess server, we can just use identity equality.
return addr == this;
}
}
}
38 changes: 24 additions & 14 deletions core/src/main/java/io/grpc/inprocess/InProcessTransport.java
Expand Up @@ -59,6 +59,7 @@
import io.grpc.internal.StatsTraceContext;
import io.grpc.internal.StreamListener;
import java.io.InputStream;
import java.net.SocketAddress;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -80,7 +81,7 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
private static final Logger log = Logger.getLogger(InProcessTransport.class.getName());

private final InternalLogId logId;
private final String name;
private final SocketAddress address;
private final int clientMaxInboundMetadataSize;
private final String authority;
private final String userAgent;
Expand Down Expand Up @@ -119,29 +120,29 @@ protected void handleNotInUse() {
}
};

private InProcessTransport(String name, int maxInboundMetadataSize, String authority,
private InProcessTransport(SocketAddress address, int maxInboundMetadataSize, String authority,
String userAgent, Attributes eagAttrs,
Optional<ServerListener> optionalServerListener, boolean includeCauseWithStatus) {
this.name = name;
this.address = address;
this.clientMaxInboundMetadataSize = maxInboundMetadataSize;
this.authority = authority;
this.userAgent = GrpcUtil.getGrpcUserAgent("inprocess", userAgent);
checkNotNull(eagAttrs, "eagAttrs");
this.attributes = Attributes.newBuilder()
.set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.PRIVACY_AND_INTEGRITY)
.set(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS, eagAttrs)
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, new InProcessSocketAddress(name))
.set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, new InProcessSocketAddress(name))
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, address)
.set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, address)
.build();
this.optionalServerListener = optionalServerListener;
logId = InternalLogId.allocate(getClass(), name);
logId = InternalLogId.allocate(getClass(), getServerName(address));
this.includeCauseWithStatus = includeCauseWithStatus;
}

public InProcessTransport(
String name, int maxInboundMetadataSize, String authority, String userAgent,
SocketAddress address, int maxInboundMetadataSize, String authority, String userAgent,
Attributes eagAttrs, boolean includeCauseWithStatus) {
this(name, maxInboundMetadataSize, authority, userAgent, eagAttrs,
this(address, maxInboundMetadataSize, authority, userAgent, eagAttrs,
Optional.<ServerListener>absent(), includeCauseWithStatus);
}

Expand All @@ -150,7 +151,7 @@ public InProcessTransport(
Attributes eagAttrs, ObjectPool<ScheduledExecutorService> serverSchedulerPool,
List<ServerStreamTracer.Factory> serverStreamTracerFactories,
ServerListener serverListener) {
this(name, maxInboundMetadataSize, authority, userAgent, eagAttrs,
this(new InProcessSocketAddress(name), maxInboundMetadataSize, authority, userAgent, eagAttrs,
Optional.of(serverListener), false);
this.serverMaxInboundMetadataSize = maxInboundMetadataSize;
this.serverSchedulerPool = serverSchedulerPool;
Expand All @@ -165,7 +166,7 @@ public synchronized Runnable start(ManagedClientTransport.Listener listener) {
serverScheduler = serverSchedulerPool.getObject();
serverTransportListener = optionalServerListener.get().transportCreated(this);
} else {
InProcessServer server = InProcessServer.findServer(name);
InProcessServer server = InProcessServer.findServer(address);
if (server != null) {
serverMaxInboundMetadataSize = server.getMaxInboundMetadataSize();
serverSchedulerPool = server.getScheduledExecutorServicePool();
Expand All @@ -176,7 +177,8 @@ public synchronized Runnable start(ManagedClientTransport.Listener listener) {
}
}
if (serverTransportListener == null) {
shutdownStatus = Status.UNAVAILABLE.withDescription("Could not find server: " + name);
shutdownStatus =
Status.UNAVAILABLE.withDescription("Could not find server: " + getServerName(address));
final Status localShutdownStatus = shutdownStatus;
return new Runnable() {
@Override
Expand All @@ -194,8 +196,8 @@ public void run() {
public void run() {
synchronized (InProcessTransport.this) {
Attributes serverTransportAttrs = Attributes.newBuilder()
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, new InProcessSocketAddress(name))
.set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, new InProcessSocketAddress(name))
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, address)
.set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, address)
.build();
serverStreamAttributes = serverTransportListener.transportReady(serverTransportAttrs);
clientTransportListener.transportReady();
Expand Down Expand Up @@ -307,7 +309,7 @@ public void shutdownNow(Status reason) {
public String toString() {
return MoreObjects.toStringHelper(this)
.add("logId", logId.getId())
.add("name", name)
.add("name", getServerName(address))
.toString();
}

Expand Down Expand Up @@ -882,6 +884,14 @@ private static Status cleanStatus(Status status, boolean includeCauseWithStatus)
return clientStatus;
}

private static String getServerName(SocketAddress addr) {
if (addr instanceof InProcessSocketAddress) {
return ((InProcessSocketAddress) addr).getName();
} else {
return "Bad Server Address: " + addr;
}
}

private static class SingleMessageProducer implements StreamListener.MessageProducer {
private InputStream message;

Expand Down

0 comments on commit 8cd4bc5

Please sign in to comment.