Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: allow per-service/method executor #8266

Merged
merged 4 commits into from Jun 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions api/src/main/java/io/grpc/ForwardingServerBuilder.java
Expand Up @@ -61,6 +61,12 @@ public T executor(@Nullable Executor executor) {
return thisT();
}

@Override
public T callExecutor(ServerCallExecutorSupplier executorSupplier) {
delegate().callExecutor(executorSupplier);
return thisT();
}

@Override
public T addService(ServerServiceDefinition service) {
delegate().addService(service);
Expand Down
24 changes: 24 additions & 0 deletions api/src/main/java/io/grpc/ServerBuilder.java
Expand Up @@ -74,6 +74,30 @@ public static ServerBuilder<?> forPort(int port) {
*/
public abstract T executor(@Nullable Executor executor);


/**
* Allows for defining a way to provide a custom executor to handle the server call.
* This executor is the result of calling
* {@link ServerCallExecutorSupplier#getExecutor(ServerCall, Metadata)} per RPC.
*
* <p>It's an optional parameter. If it is provided, the {@link #executor(Executor)} would still
* run necessary tasks before the {@link ServerCallExecutorSupplier} is ready to be called, then
* it switches over.
*
* <p>If it is provided, {@link #directExecutor()} optimization is disabled. But if calling
* {@link ServerCallExecutorSupplier} returns null, the server call is still handled by the
* default {@link #executor(Executor)} as a fallback.
*
* @param executorSupplier the server call executor provider
* @return this
* @since 1.39.0
*
* */
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/8274")
public T callExecutor(ServerCallExecutorSupplier executorSupplier) {
return thisT();
}

/**
* Adds a service implementation to the handler registry.
*
Expand Down
34 changes: 34 additions & 0 deletions api/src/main/java/io/grpc/ServerCallExecutorSupplier.java
@@ -0,0 +1,34 @@
/*
* Copyright 2021 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc;

import java.util.concurrent.Executor;
import javax.annotation.Nullable;

/**
* Defines what executor handles the server call, based on each RPC call information at runtime.
* */
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/8274")
public interface ServerCallExecutorSupplier {

/**
* Returns an executor to handle the server call.
* It should never throw. It should return null to fallback to the default executor.
* */
@Nullable
<ReqT, RespT> Executor getExecutor(ServerCall<ReqT, RespT> call, Metadata metadata);
}
Expand Up @@ -24,6 +24,7 @@
import io.grpc.HandlerRegistry;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerCallExecutorSupplier;
import io.grpc.ServerInterceptor;
import io.grpc.ServerServiceDefinition;
import io.grpc.ServerStreamTracer;
Expand Down Expand Up @@ -67,6 +68,12 @@ public T directExecutor() {
return thisT();
}

@Override
public T callExecutor(ServerCallExecutorSupplier executorSupplier) {
delegate().callExecutor(executorSupplier);
return thisT();
}

@Override
public T executor(@Nullable Executor executor) {
delegate().executor(executor);
Expand Down
14 changes: 12 additions & 2 deletions core/src/main/java/io/grpc/internal/SerializingExecutor.java
Expand Up @@ -59,7 +59,7 @@ private static AtomicHelper getAtomicHelper() {
private static final int RUNNING = -1;

/** Underlying executor that all submitted Runnable objects are run on. */
private final Executor executor;
private Executor executor;

/** A list of Runnables to be run in order. */
private final Queue<Runnable> runQueue = new ConcurrentLinkedQueue<>();
Expand All @@ -76,6 +76,15 @@ public SerializingExecutor(Executor executor) {
this.executor = executor;
}

/**
* Only call this from this SerializingExecutor Runnable, so that the executor is immediately
* visible to this SerializingExecutor executor.
* */
public void setExecutor(Executor executor) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Document that this can only be called from a Runnable running within the SerializingExecutor.

Preconditions.checkNotNull(executor, "'executor' must not be null.");
this.executor = executor;
}

/**
* Runs the given runnable strictly after all Runnables that were submitted
* before it, and using the {@code executor} passed to the constructor. .
Expand Down Expand Up @@ -118,7 +127,8 @@ private void schedule(@Nullable Runnable removable) {
public void run() {
Runnable r;
try {
while ((r = runQueue.poll()) != null) {
Executor oldExecutor = executor;
while (oldExecutor == executor && (r = runQueue.poll()) != null ) {
try {
r.run();
} catch (RuntimeException e) {
Expand Down
155 changes: 114 additions & 41 deletions core/src/main/java/io/grpc/internal/ServerImpl.java
Expand Up @@ -46,12 +46,14 @@
import io.grpc.InternalServerInterceptors;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallExecutorSupplier;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.ServerMethodDefinition;
import io.grpc.ServerServiceDefinition;
import io.grpc.ServerTransportFilter;
import io.grpc.Status;
import io.grpc.StatusException;
import io.perfmark.Link;
import io.perfmark.PerfMark;
import io.perfmark.Tag;
Expand Down Expand Up @@ -125,6 +127,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
private final InternalChannelz channelz;
private final CallTracer serverCallTracer;
private final Deadline.Ticker ticker;
private final ServerCallExecutorSupplier executorSupplier;

/**
* Construct a server.
Expand Down Expand Up @@ -159,6 +162,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
this.serverCallTracer = builder.callTracerFactory.create();
this.ticker = checkNotNull(builder.ticker, "ticker");
channelz.addServer(this);
this.executorSupplier = builder.executorSupplier;
}

/**
Expand Down Expand Up @@ -469,11 +473,11 @@ private void streamCreatedInternal(
final Executor wrappedExecutor;
// This is a performance optimization that avoids the synchronization and queuing overhead
// that comes with SerializingExecutor.
if (executor == directExecutor()) {
if (executorSupplier != null || executor != directExecutor()) {
wrappedExecutor = new SerializingExecutor(executor);
} else {
wrappedExecutor = new SerializeReentrantCallsDirectExecutor();
stream.optimizeForDirectExecutor();
} else {
wrappedExecutor = new SerializingExecutor(executor);
}

if (headers.containsKey(MESSAGE_ENCODING_KEY)) {
Expand All @@ -499,52 +503,124 @@ private void streamCreatedInternal(

final JumpToApplicationThreadServerStreamListener jumpListener
= new JumpToApplicationThreadServerStreamListener(
wrappedExecutor, executor, stream, context, tag);
wrappedExecutor, executor, stream, context, tag);
stream.setListener(jumpListener);
// Run in wrappedExecutor so jumpListener.setListener() is called before any callbacks
// are delivered, including any errors. Callbacks can still be triggered, but they will be
// queued.

final class StreamCreated extends ContextRunnable {
StreamCreated() {
final SettableFuture<ServerCallParameters<?,?>> future = SettableFuture.create();
// Run in serializing executor so jumpListener.setListener() is called before any callbacks
// are delivered, including any errors. MethodLookup() and HandleServerCall() are proactively
// queued before any callbacks are queued at serializing executor.
// MethodLookup() runs on the default executor.
// When executorSupplier is enabled, MethodLookup() may set/change the executor in the
// SerializingExecutor before it finishes running.
// Then HandleServerCall() and callbacks would switch to the executorSupplier executor.
// Otherwise, they all run on the default executor.

final class MethodLookup extends ContextRunnable {
MethodLookup() {
super(context);
}

@Override
public void runInContext() {
PerfMark.startTask("ServerTransportListener$StreamCreated.startCall", tag);
PerfMark.startTask("ServerTransportListener$MethodLookup.startCall", tag);
PerfMark.linkIn(link);
try {
runInternal();
} finally {
PerfMark.stopTask("ServerTransportListener$StreamCreated.startCall", tag);
PerfMark.stopTask("ServerTransportListener$MethodLookup.startCall", tag);
}
}

private void runInternal() {
ServerStreamListener listener = NOOP_LISTENER;
ServerMethodDefinition<?, ?> wrapMethod;
ServerCallParameters<?, ?> callParams;
try {
ServerMethodDefinition<?, ?> method = registry.lookupMethod(methodName);
if (method == null) {
method = fallbackRegistry.lookupMethod(methodName, stream.getAuthority());
}
if (method == null) {
Status status = Status.UNIMPLEMENTED.withDescription(
"Method not found: " + methodName);
"Method not found: " + methodName);
// TODO(zhangkun83): this error may be recorded by the tracer, and if it's kept in
// memory as a map whose key is the method name, this would allow a misbehaving
// client to blow up the server in-memory stats storage by sending large number of
// distinct unimplemented method
// names. (https://github.com/grpc/grpc-java/issues/2285)
stream.close(status, new Metadata());
context.cancel(null);
future.cancel(false);
return;
}
listener = startCall(stream, methodName, method, headers, context, statsTraceCtx, tag);
wrapMethod = wrapMethod(stream, method, statsTraceCtx);
callParams = maySwitchExecutor(wrapMethod, stream, headers, context, tag);
future.set(callParams);
} catch (Throwable t) {
stream.close(Status.fromThrowable(t), new Metadata());
context.cancel(null);
future.cancel(false);
throw t;
}
}

private <ReqT, RespT> ServerCallParameters<ReqT, RespT> maySwitchExecutor(
final ServerMethodDefinition<ReqT, RespT> methodDef,
final ServerStream stream,
final Metadata headers,
final Context.CancellableContext context,
final Tag tag) {
final ServerCallImpl<ReqT, RespT> call = new ServerCallImpl<>(
stream,
methodDef.getMethodDescriptor(),
headers,
context,
decompressorRegistry,
compressorRegistry,
serverCallTracer,
tag);
if (executorSupplier != null) {
Executor switchingExecutor = executorSupplier.getExecutor(call, headers);
if (switchingExecutor != null) {
((SerializingExecutor)wrappedExecutor).setExecutor(switchingExecutor);
}
}
return new ServerCallParameters<>(call, methodDef.getServerCallHandler());
}
}

final class HandleServerCall extends ContextRunnable {
HandleServerCall() {
super(context);
}

@Override
public void runInContext() {
PerfMark.startTask("ServerTransportListener$HandleServerCall.startCall", tag);
PerfMark.linkIn(link);
try {
runInternal();
} finally {
PerfMark.stopTask("ServerTransportListener$HandleServerCall.startCall", tag);
}
}

private void runInternal() {
ServerStreamListener listener = NOOP_LISTENER;
ServerCallParameters<?,?> callParameters;
try {
if (future.isCancelled()) {
return;
}
if (!future.isDone() || (callParameters = future.get()) == null) {
Status status = Status.INTERNAL.withDescription(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this should throw this as an exception? If it's an exception then it will be logged (and the catch will handle it). Seems that would make it more likely we'd learn about the bug.

"Unexpected failure retrieving server call parameters.");
throw new StatusException(status);
}
listener = startWrappedCall(methodName, callParameters, headers);
} catch (Throwable ex) {
stream.close(Status.fromThrowable(ex), new Metadata());
context.cancel(null);
throw new IllegalStateException(ex);
} finally {
jumpListener.setListener(listener);
}
Expand All @@ -568,7 +644,8 @@ public void cancelled(Context context) {
}
}

wrappedExecutor.execute(new StreamCreated());
wrappedExecutor.execute(new MethodLookup());
wrappedExecutor.execute(new HandleServerCall());
}

private Context.CancellableContext createContext(
Expand All @@ -593,9 +670,8 @@ private Context.CancellableContext createContext(
}

/** Never returns {@code null}. */
private <ReqT, RespT> ServerStreamListener startCall(ServerStream stream, String fullMethodName,
ServerMethodDefinition<ReqT, RespT> methodDef, Metadata headers,
Context.CancellableContext context, StatsTraceContext statsTraceCtx, Tag tag) {
private <ReqT, RespT> ServerMethodDefinition<?,?> wrapMethod(ServerStream stream,
ServerMethodDefinition<ReqT, RespT> methodDef, StatsTraceContext statsTraceCtx) {
// TODO(ejona86): should we update fullMethodName to have the canonical path of the method?
statsTraceCtx.serverCallStarted(
new ServerCallInfoImpl<>(
Expand All @@ -609,34 +685,31 @@ private <ReqT, RespT> ServerStreamListener startCall(ServerStream stream, String
ServerMethodDefinition<ReqT, RespT> interceptedDef = methodDef.withServerCallHandler(handler);
ServerMethodDefinition<?, ?> wMethodDef = binlog == null
? interceptedDef : binlog.wrapMethodDefinition(interceptedDef);
return startWrappedCall(fullMethodName, wMethodDef, stream, headers, context, tag);
return wMethodDef;
}

private final class ServerCallParameters<ReqT, RespT> {
ServerCallImpl<ReqT, RespT> call;
ServerCallHandler<ReqT, RespT> callHandler;

public ServerCallParameters(ServerCallImpl<ReqT, RespT> call,
ServerCallHandler<ReqT, RespT> callHandler) {
this.call = call;
this.callHandler = callHandler;
}
}

private <WReqT, WRespT> ServerStreamListener startWrappedCall(
String fullMethodName,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you are interested: Looks like we can remove this argument by using params.call.getMethodDescriptor().getFullMethodName(). Looks like ServerCallHandled then wouldn't need to look at the fullMethodName at all (only MethodLookup would).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh looks this is only for logging the original fullMethodName. And this is an existing code path.

ServerMethodDefinition<WReqT, WRespT> methodDef,
ServerStream stream,
Metadata headers,
Context.CancellableContext context,
Tag tag) {

ServerCallImpl<WReqT, WRespT> call = new ServerCallImpl<>(
stream,
methodDef.getMethodDescriptor(),
headers,
context,
decompressorRegistry,
compressorRegistry,
serverCallTracer,
tag);

ServerCall.Listener<WReqT> listener =
methodDef.getServerCallHandler().startCall(call, headers);
if (listener == null) {
ServerCallParameters<WReqT, WRespT> params,
Metadata headers) {
ServerCall.Listener<WReqT> callListener =
params.callHandler.startCall(params.call, headers);
if (callListener == null) {
throw new NullPointerException(
"startCall() returned a null listener for method " + fullMethodName);
"startCall() returned a null listener for method " + fullMethodName);
}
return call.newServerStreamListener(listener);
return params.call.newServerStreamListener(callListener);
}
}

Expand Down