Skip to content

Commit

Permalink
Revert "core: allow per-service/method executor (grpc#8266)"
Browse files Browse the repository at this point in the history
This reverts commit c540229.
  • Loading branch information
YifeiZhuang committed Jun 25, 2021
1 parent 72ae12b commit 47a32bf
Show file tree
Hide file tree
Showing 9 changed files with 43 additions and 378 deletions.
6 changes: 0 additions & 6 deletions api/src/main/java/io/grpc/ForwardingServerBuilder.java
Expand Up @@ -61,12 +61,6 @@ public T executor(@Nullable Executor executor) {
return thisT();
}

@Override
public T callExecutor(ServerCallExecutorSupplier executorSupplier) {
delegate().callExecutor(executorSupplier);
return thisT();
}

@Override
public T addService(ServerServiceDefinition service) {
delegate().addService(service);
Expand Down
24 changes: 0 additions & 24 deletions api/src/main/java/io/grpc/ServerBuilder.java
Expand Up @@ -74,30 +74,6 @@ public static ServerBuilder<?> forPort(int port) {
*/
public abstract T executor(@Nullable Executor executor);


/**
* Allows for defining a way to provide a custom executor to handle the server call.
* This executor is the result of calling
* {@link ServerCallExecutorSupplier#getExecutor(ServerCall, Metadata)} per RPC.
*
* <p>It's an optional parameter. If it is provided, the {@link #executor(Executor)} would still
* run necessary tasks before the {@link ServerCallExecutorSupplier} is ready to be called, then
* it switches over.
*
* <p>If it is provided, {@link #directExecutor()} optimization is disabled. But if calling
* {@link ServerCallExecutorSupplier} returns null, the server call is still handled by the
* default {@link #executor(Executor)} as a fallback.
*
* @param executorSupplier the server call executor provider
* @return this
* @since 1.39.0
*
* */
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/8274")
public T callExecutor(ServerCallExecutorSupplier executorSupplier) {
return thisT();
}

/**
* Adds a service implementation to the handler registry.
*
Expand Down
34 changes: 0 additions & 34 deletions api/src/main/java/io/grpc/ServerCallExecutorSupplier.java

This file was deleted.

Expand Up @@ -24,7 +24,6 @@
import io.grpc.HandlerRegistry;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerCallExecutorSupplier;
import io.grpc.ServerInterceptor;
import io.grpc.ServerServiceDefinition;
import io.grpc.ServerStreamTracer;
Expand Down Expand Up @@ -68,12 +67,6 @@ public T directExecutor() {
return thisT();
}

@Override
public T callExecutor(ServerCallExecutorSupplier executorSupplier) {
delegate().callExecutor(executorSupplier);
return thisT();
}

@Override
public T executor(@Nullable Executor executor) {
delegate().executor(executor);
Expand Down
14 changes: 2 additions & 12 deletions core/src/main/java/io/grpc/internal/SerializingExecutor.java
Expand Up @@ -59,7 +59,7 @@ private static AtomicHelper getAtomicHelper() {
private static final int RUNNING = -1;

/** Underlying executor that all submitted Runnable objects are run on. */
private Executor executor;
private final Executor executor;

/** A list of Runnables to be run in order. */
private final Queue<Runnable> runQueue = new ConcurrentLinkedQueue<>();
Expand All @@ -76,15 +76,6 @@ public SerializingExecutor(Executor executor) {
this.executor = executor;
}

/**
* Only call this from this SerializingExecutor Runnable, so that the executor is immediately
* visible to this SerializingExecutor executor.
* */
public void setExecutor(Executor executor) {
Preconditions.checkNotNull(executor, "'executor' must not be null.");
this.executor = executor;
}

/**
* Runs the given runnable strictly after all Runnables that were submitted
* before it, and using the {@code executor} passed to the constructor. .
Expand Down Expand Up @@ -127,8 +118,7 @@ private void schedule(@Nullable Runnable removable) {
public void run() {
Runnable r;
try {
Executor oldExecutor = executor;
while (oldExecutor == executor && (r = runQueue.poll()) != null ) {
while ((r = runQueue.poll()) != null) {
try {
r.run();
} catch (RuntimeException e) {
Expand Down
155 changes: 41 additions & 114 deletions core/src/main/java/io/grpc/internal/ServerImpl.java
Expand Up @@ -46,14 +46,12 @@
import io.grpc.InternalServerInterceptors;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallExecutorSupplier;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.ServerMethodDefinition;
import io.grpc.ServerServiceDefinition;
import io.grpc.ServerTransportFilter;
import io.grpc.Status;
import io.grpc.StatusException;
import io.perfmark.Link;
import io.perfmark.PerfMark;
import io.perfmark.Tag;
Expand Down Expand Up @@ -127,7 +125,6 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
private final InternalChannelz channelz;
private final CallTracer serverCallTracer;
private final Deadline.Ticker ticker;
private final ServerCallExecutorSupplier executorSupplier;

/**
* Construct a server.
Expand Down Expand Up @@ -162,7 +159,6 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
this.serverCallTracer = builder.callTracerFactory.create();
this.ticker = checkNotNull(builder.ticker, "ticker");
channelz.addServer(this);
this.executorSupplier = builder.executorSupplier;
}

/**
Expand Down Expand Up @@ -473,11 +469,11 @@ private void streamCreatedInternal(
final Executor wrappedExecutor;
// This is a performance optimization that avoids the synchronization and queuing overhead
// that comes with SerializingExecutor.
if (executorSupplier != null || executor != directExecutor()) {
wrappedExecutor = new SerializingExecutor(executor);
} else {
if (executor == directExecutor()) {
wrappedExecutor = new SerializeReentrantCallsDirectExecutor();
stream.optimizeForDirectExecutor();
} else {
wrappedExecutor = new SerializingExecutor(executor);
}

if (headers.containsKey(MESSAGE_ENCODING_KEY)) {
Expand All @@ -503,124 +499,52 @@ private void streamCreatedInternal(

final JumpToApplicationThreadServerStreamListener jumpListener
= new JumpToApplicationThreadServerStreamListener(
wrappedExecutor, executor, stream, context, tag);
wrappedExecutor, executor, stream, context, tag);
stream.setListener(jumpListener);
final SettableFuture<ServerCallParameters<?,?>> future = SettableFuture.create();
// Run in serializing executor so jumpListener.setListener() is called before any callbacks
// are delivered, including any errors. MethodLookup() and HandleServerCall() are proactively
// queued before any callbacks are queued at serializing executor.
// MethodLookup() runs on the default executor.
// When executorSupplier is enabled, MethodLookup() may set/change the executor in the
// SerializingExecutor before it finishes running.
// Then HandleServerCall() and callbacks would switch to the executorSupplier executor.
// Otherwise, they all run on the default executor.

final class MethodLookup extends ContextRunnable {
MethodLookup() {
// Run in wrappedExecutor so jumpListener.setListener() is called before any callbacks
// are delivered, including any errors. Callbacks can still be triggered, but they will be
// queued.

final class StreamCreated extends ContextRunnable {
StreamCreated() {
super(context);
}

@Override
public void runInContext() {
PerfMark.startTask("ServerTransportListener$MethodLookup.startCall", tag);
PerfMark.startTask("ServerTransportListener$StreamCreated.startCall", tag);
PerfMark.linkIn(link);
try {
runInternal();
} finally {
PerfMark.stopTask("ServerTransportListener$MethodLookup.startCall", tag);
PerfMark.stopTask("ServerTransportListener$StreamCreated.startCall", tag);
}
}

private void runInternal() {
ServerMethodDefinition<?, ?> wrapMethod;
ServerCallParameters<?, ?> callParams;
ServerStreamListener listener = NOOP_LISTENER;
try {
ServerMethodDefinition<?, ?> method = registry.lookupMethod(methodName);
if (method == null) {
method = fallbackRegistry.lookupMethod(methodName, stream.getAuthority());
}
if (method == null) {
Status status = Status.UNIMPLEMENTED.withDescription(
"Method not found: " + methodName);
"Method not found: " + methodName);
// TODO(zhangkun83): this error may be recorded by the tracer, and if it's kept in
// memory as a map whose key is the method name, this would allow a misbehaving
// client to blow up the server in-memory stats storage by sending large number of
// distinct unimplemented method
// names. (https://github.com/grpc/grpc-java/issues/2285)
stream.close(status, new Metadata());
context.cancel(null);
future.cancel(false);
return;
}
wrapMethod = wrapMethod(stream, method, statsTraceCtx);
callParams = maySwitchExecutor(wrapMethod, stream, headers, context, tag);
future.set(callParams);
listener = startCall(stream, methodName, method, headers, context, statsTraceCtx, tag);
} catch (Throwable t) {
stream.close(Status.fromThrowable(t), new Metadata());
context.cancel(null);
future.cancel(false);
throw t;
}
}

private <ReqT, RespT> ServerCallParameters<ReqT, RespT> maySwitchExecutor(
final ServerMethodDefinition<ReqT, RespT> methodDef,
final ServerStream stream,
final Metadata headers,
final Context.CancellableContext context,
final Tag tag) {
final ServerCallImpl<ReqT, RespT> call = new ServerCallImpl<>(
stream,
methodDef.getMethodDescriptor(),
headers,
context,
decompressorRegistry,
compressorRegistry,
serverCallTracer,
tag);
if (executorSupplier != null) {
Executor switchingExecutor = executorSupplier.getExecutor(call, headers);
if (switchingExecutor != null) {
((SerializingExecutor)wrappedExecutor).setExecutor(switchingExecutor);
}
}
return new ServerCallParameters<>(call, methodDef.getServerCallHandler());
}
}

final class HandleServerCall extends ContextRunnable {
HandleServerCall() {
super(context);
}

@Override
public void runInContext() {
PerfMark.startTask("ServerTransportListener$HandleServerCall.startCall", tag);
PerfMark.linkIn(link);
try {
runInternal();
} finally {
PerfMark.stopTask("ServerTransportListener$HandleServerCall.startCall", tag);
}
}

private void runInternal() {
ServerStreamListener listener = NOOP_LISTENER;
ServerCallParameters<?,?> callParameters;
try {
if (future.isCancelled()) {
return;
}
if (!future.isDone() || (callParameters = future.get()) == null) {
Status status = Status.INTERNAL.withDescription(
"Unexpected failure retrieving server call parameters.");
throw new StatusException(status);
}
listener = startWrappedCall(methodName, callParameters, headers);
} catch (Throwable ex) {
stream.close(Status.fromThrowable(ex), new Metadata());
context.cancel(null);
throw new IllegalStateException(ex);
} finally {
jumpListener.setListener(listener);
}
Expand All @@ -644,8 +568,7 @@ public void cancelled(Context context) {
}
}

wrappedExecutor.execute(new MethodLookup());
wrappedExecutor.execute(new HandleServerCall());
wrappedExecutor.execute(new StreamCreated());
}

private Context.CancellableContext createContext(
Expand All @@ -670,8 +593,9 @@ private Context.CancellableContext createContext(
}

/** Never returns {@code null}. */
private <ReqT, RespT> ServerMethodDefinition<?,?> wrapMethod(ServerStream stream,
ServerMethodDefinition<ReqT, RespT> methodDef, StatsTraceContext statsTraceCtx) {
private <ReqT, RespT> ServerStreamListener startCall(ServerStream stream, String fullMethodName,
ServerMethodDefinition<ReqT, RespT> methodDef, Metadata headers,
Context.CancellableContext context, StatsTraceContext statsTraceCtx, Tag tag) {
// TODO(ejona86): should we update fullMethodName to have the canonical path of the method?
statsTraceCtx.serverCallStarted(
new ServerCallInfoImpl<>(
Expand All @@ -685,31 +609,34 @@ private <ReqT, RespT> ServerMethodDefinition<?,?> wrapMethod(ServerStream stream
ServerMethodDefinition<ReqT, RespT> interceptedDef = methodDef.withServerCallHandler(handler);
ServerMethodDefinition<?, ?> wMethodDef = binlog == null
? interceptedDef : binlog.wrapMethodDefinition(interceptedDef);
return wMethodDef;
}

private final class ServerCallParameters<ReqT, RespT> {
ServerCallImpl<ReqT, RespT> call;
ServerCallHandler<ReqT, RespT> callHandler;

public ServerCallParameters(ServerCallImpl<ReqT, RespT> call,
ServerCallHandler<ReqT, RespT> callHandler) {
this.call = call;
this.callHandler = callHandler;
}
return startWrappedCall(fullMethodName, wMethodDef, stream, headers, context, tag);
}

private <WReqT, WRespT> ServerStreamListener startWrappedCall(
String fullMethodName,
ServerCallParameters<WReqT, WRespT> params,
Metadata headers) {
ServerCall.Listener<WReqT> callListener =
params.callHandler.startCall(params.call, headers);
if (callListener == null) {
ServerMethodDefinition<WReqT, WRespT> methodDef,
ServerStream stream,
Metadata headers,
Context.CancellableContext context,
Tag tag) {

ServerCallImpl<WReqT, WRespT> call = new ServerCallImpl<>(
stream,
methodDef.getMethodDescriptor(),
headers,
context,
decompressorRegistry,
compressorRegistry,
serverCallTracer,
tag);

ServerCall.Listener<WReqT> listener =
methodDef.getServerCallHandler().startCall(call, headers);
if (listener == null) {
throw new NullPointerException(
"startCall() returned a null listener for method " + fullMethodName);
"startCall() returned a null listener for method " + fullMethodName);
}
return params.call.newServerStreamListener(callListener);
return call.newServerStreamListener(listener);
}
}

Expand Down

0 comments on commit 47a32bf

Please sign in to comment.