Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: allow per-service/method executor #8266

Merged
merged 4 commits into from Jun 21, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 8 additions & 2 deletions core/src/main/java/io/grpc/internal/SerializingExecutor.java
Expand Up @@ -59,7 +59,7 @@ private static AtomicHelper getAtomicHelper() {
private static final int RUNNING = -1;

/** Underlying executor that all submitted Runnable objects are run on. */
private final Executor executor;
private volatile Executor executor;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: we don't actually need this to be volatile.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh is it because it is only set and execute from the same network thread(eventloop)?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is only called from within the Runnable currently being executed (r.run();). That is not the network thread. While a Runnable is running, runState == RUNNING and the network thread will not access executor.


/** A list of Runnables to be run in order. */
private final Queue<Runnable> runQueue = new ConcurrentLinkedQueue<>();
Expand All @@ -76,6 +76,11 @@ public SerializingExecutor(Executor executor) {
this.executor = executor;
}

public void setExecutor(Executor executor) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Document that this can only be called from a Runnable running within the SerializingExecutor.

Preconditions.checkNotNull(executor, "'executor' must not be null.");
this.executor = executor;
}

/**
* Runs the given runnable strictly after all Runnables that were submitted
* before it, and using the {@code executor} passed to the constructor. .
Expand Down Expand Up @@ -118,7 +123,8 @@ private void schedule(@Nullable Runnable removable) {
public void run() {
Runnable r;
try {
while ((r = runQueue.poll()) != null) {
Executor oldExecutor = executor;
while ((r = runQueue.poll()) != null && oldExecutor == executor ) {
try {
r.run();
} catch (RuntimeException e) {
Expand Down
227 changes: 175 additions & 52 deletions core/src/main/java/io/grpc/internal/ServerImpl.java
Expand Up @@ -52,6 +52,7 @@
import io.grpc.ServerServiceDefinition;
import io.grpc.ServerTransportFilter;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.perfmark.Link;
import io.perfmark.PerfMark;
import io.perfmark.Tag;
Expand Down Expand Up @@ -125,6 +126,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
private final InternalChannelz channelz;
private final CallTracer serverCallTracer;
private final Deadline.Ticker ticker;
private final ExecutorSupplier executorSupplier;

/**
* Construct a server.
Expand Down Expand Up @@ -159,6 +161,18 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
this.serverCallTracer = builder.callTracerFactory.create();
this.ticker = checkNotNull(builder.ticker, "ticker");
channelz.addServer(this);
this.executorSupplier = new ExecutorSupplier() {
@Override
public Executor getExecutor(ServerCall<?, ?> call,
Metadata header) {
return executor;
}
};
}


public interface ExecutorSupplier {
Executor getExecutor(ServerCall<?,?> call, Metadata metadata);
}

/**
Expand Down Expand Up @@ -466,14 +480,14 @@ public void streamCreated(ServerStream stream, String methodName, Metadata heade

private void streamCreatedInternal(
final ServerStream stream, final String methodName, final Metadata headers, final Tag tag) {
final Executor wrappedExecutor;
final Executor defaultExecutor;
// This is a performance optimization that avoids the synchronization and queuing overhead
// that comes with SerializingExecutor.
if (executor == directExecutor()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like we'll need to disable these two optimizations if an executor supplier is provided. I think the threading for optimizeForDirectExecutor requires it to be done inline before this method returns. It is safe to call it even if direct executor isn't used, but if/when migrating deframer gets enabled again getting it wrong hurts the non-direct case much more than it helps the direct case; that is to say, if we don't know which executor will be used, it is better to error on the side of not calling the method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But executor supplier is never used here when direct executor is enabled. It looks optimizeForDirectExecutor just means we should use SerializeReentrantCallsDirectExecutor, and that would run tasks inline but still queued if already executing, so, it is still safe when we would have two contextRunnables, because listener call backs would still be queued at SerializeReentrantCallsDirectExecutor.
How does migrating deframer hurt the non-direct case?
Did you mean we should not allow them to co-exist?
executor==directExecutor and executor_supplier != null validation error at server builder.
executor==directExecutor and executor_supplier == null: optimizeForDirectExecutor and use SerializeReentrantCallsDirectExecutor
executor!=directExecutor and executor_supplier != null: Just SerializingExecutor, no optimization.
executor!=directExecutor and executor_supplier == null: always use executor(I need to check and fix)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But executor supplier is never used here when direct executor is enabled.

But that is clearly not right/incomplete. The executor for lookups in the fallback registry may be direct executor and that does not seem to be cause to ignore a user's executor supplier. I was discussing how we'll go about mixing all this stuff together.

It looks optimizeForDirectExecutor just means we should use SerializeReentrantCallsDirectExecutor

SerializeReentrantCallsDirectExecutor is different from optimizeForDirectExecutor. optimizeForDirectExecutor was intended to disable MigratingDeframer when using direct executor, although MigratingDeframer is disabled for all cases at the moment.

How does migrating deframer hurt the non-direct case?

It helps the non-direct case. However, it adds overhead that can hurt the direct case, so the code was disabling it when direct executor was used. I was saying the overhead it adds to direct is pretty small compared to how much it helps the non-direct case; we can simply disable this optimization (i.e., not call optimizeForDirectExecutor) if an executor supplier is provided.

executor==directExecutor and executor_supplier != null validation error at server builder.

I see no reason to cause an error. The two conceptually can make sense. It is just a question of how many optimizations does it disable. I'm suggesting we'll treat this the same as executor!=directExecutor and executor_supplier != null.

wrappedExecutor = new SerializeReentrantCallsDirectExecutor();
defaultExecutor = new SerializeReentrantCallsDirectExecutor();
stream.optimizeForDirectExecutor();
} else {
wrappedExecutor = new SerializingExecutor(executor);
defaultExecutor = new SerializingExecutor(executor);
}

if (headers.containsKey(MESSAGE_ENCODING_KEY)) {
Expand All @@ -498,12 +512,13 @@ private void streamCreatedInternal(
final Link link = PerfMark.linkOut();

final JumpToApplicationThreadServerStreamListener jumpListener
= new JumpToApplicationThreadServerStreamListener(
wrappedExecutor, executor, stream, context, tag);
= new JumpToApplicationThreadServerStreamListener(null, executor, stream, context, tag);
stream.setListener(jumpListener);
// Run in wrappedExecutor so jumpListener.setListener() is called before any callbacks
// are delivered, including any errors. Callbacks can still be triggered, but they will be
// queued.
// Run in serializing executor so jumpListener.setListener() is called before any callbacks
// are delivered, including any errors. Callbacks can still be triggered.
// If callExecutor needs no executor switch, they will be queued at serializing executor.
// If callExecutor needs a switch due to executorSupplier, they will be queued at jumpListener
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks out-of-date. I guess the old comment isn't inaccurate now, although don't know if we want to describe more.

// first then delivered to serializing executor for the second queueing.

final class StreamCreated extends ContextRunnable {
StreamCreated() {
Expand All @@ -523,30 +538,25 @@ public void runInContext() {

private void runInternal() {
ServerStreamListener listener = NOOP_LISTENER;
ServerMethodDefinition<?, ?> wrapMethod;
Executor switchingExecutor = null;
try {
ServerMethodDefinition<?, ?> method = registry.lookupMethod(methodName);
if (method == null) {
method = fallbackRegistry.lookupMethod(methodName, stream.getAuthority());
wrapMethod = lookupMethod(stream, methodName, statsTraceCtx);
if (defaultExecutor instanceof SerializingExecutor && executorSupplier != null) {
switchingExecutor =
switchExecutorProcessCall(methodName, wrapMethod, stream, headers, context, tag);
}
if (method == null) {
Status status = Status.UNIMPLEMENTED.withDescription(
"Method not found: " + methodName);
// TODO(zhangkun83): this error may be recorded by the tracer, and if it's kept in
// memory as a map whose key is the method name, this would allow a misbehaving
// client to blow up the server in-memory stats storage by sending large number of
// distinct unimplemented method
// names. (https://github.com/grpc/grpc-java/issues/2285)
stream.close(status, new Metadata());
context.cancel(null);
return;
if (switchingExecutor == null) {
listener = processCall(methodName, wrapMethod, stream, headers, context, tag);
jumpListener.setListener(listener);
jumpListener.setCallExecutor(defaultExecutor);
}
listener = startCall(stream, methodName, method, headers, context, statsTraceCtx, tag);
} catch (Throwable t) {
stream.close(Status.fromThrowable(t), new Metadata());
context.cancel(null);
throw t;
} finally {
jumpListener.setListener(listener);
jumpListener.setCallExecutor(defaultExecutor);
throw t;
}

// An extremely short deadline may expire before stream.setListener(jumpListener).
Expand All @@ -566,9 +576,72 @@ public void cancelled(Context context) {

context.addListener(new ServerStreamCancellationListener(), directExecutor());
}

private <ReqT, RespT> ServerStreamListener processCall(final String fullMethodName,
final ServerMethodDefinition<ReqT, RespT> methodDef,
final ServerStream stream,
final Metadata headers,
final Context.CancellableContext context,
final Tag tag) {
final ServerCallImpl<ReqT, RespT> call = new ServerCallImpl<>(
stream,
methodDef.getMethodDescriptor(),
headers,
context,
decompressorRegistry,
compressorRegistry,
serverCallTracer,
tag);
return startWrappedCall(fullMethodName, methodDef, call, headers);
}

private <ReqT, RespT> Executor switchExecutorProcessCall(final String fullMethodName,
final ServerMethodDefinition<ReqT, RespT> methodDef,
final ServerStream stream,
final Metadata headers,
final Context.CancellableContext context,
final Tag tag) {
final ServerCallImpl<ReqT, RespT> call = new ServerCallImpl<>(
stream,
methodDef.getMethodDescriptor(),
headers,
context,
decompressorRegistry,
compressorRegistry,
serverCallTracer,
tag);
final Executor switchingExecutor = executorSupplier.getExecutor(call, headers);
if (switchingExecutor == null) {
return null;
}
final class CallHandled extends ContextRunnable {
CallHandled() {
super(context);
}

@Override
public void runInContext() {
ServerStreamListener listener = NOOP_LISTENER;
try {
listener = startWrappedCall(fullMethodName, methodDef, call, headers);
} catch (Throwable t) {
stream.close(Status.fromThrowable(t), new Metadata());
context.cancel(null);
throw t;
} finally {
jumpListener.setListener(listener);
jumpListener.setCallExecutor(defaultExecutor);
}
}
}

((SerializingExecutor)defaultExecutor).setExecutor(switchingExecutor);
defaultExecutor.execute(new CallHandled());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In our discussion I thought we decided this approach was broken because this Runnable might be scheduled behind other runnables that are going to use the listener. For example, if the client cancels the call before this line runs.

Although, I realize now a way to resolve that. We can unconditionally enqueue this Runnable at the very beginning.

wrappedExecutor.execute(new LookupMethod()); // creates Call and calls setExecutor() if necessary
wrappedExecutor.execute(new StreamCreated()); // calls jumpListener.setListener()

If we want to optimize that, we avoid LookupMethod when unnecessary. Before we were trying to avoid enqueuing the later Runnables, but it seems much easier to avoid enqueuing the earlier runnables (if we care).

The main annoyance is we'll want to pass the created ServerCall to StreamCreated, but we'll have to pass it after the runnable is constructed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In our discussion I thought we decided this approach was broken because this Runnable might be scheduled behind other runnables that are going to use the listener. For example, if the client cancels the call before this line runs.

It is broken if we execute Runnables that uses the listener immediately. So, we enqueue them all and deliver all at once. But this enqueue needs synchronization that's probably a problem.

Although, I realize now a way to resolve that. We can unconditionally enqueue this Runnable at the very beginning.

wrappedExecutor.execute(new LookupMethod()); // creates Call and calls setExecutor() if necessary
wrappedExecutor.execute(new StreamCreated()); // calls jumpListener.setListener()

If we want to optimize that, we avoid LookupMethod when unnecessary. Before we were trying to avoid enqueuing the later Runnables, but it seems much easier to avoid enqueuing the earlier runnables (if we care).

Oh this approach sounds nice. i'll try it.
One optimization this approach can't do is to put the second runnable inline of the first one, unless we can cancel the second one before the first one finishes.

The main annoyance is we'll want to pass the created ServerCall to StreamCreated, but we'll have to pass it after the runnable is constructed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is broken if we execute Runnables that uses the listener immediately. So, we enqueue them all and deliver all at once.

Oh, I see it now. I should have looked at that more closely.

One optimization this approach can't do is to put the second runnable inline of the first one, unless we can cancel the second one before the first one finishes.

SerializingExecutor will do that naturally if the executor doesn't change. Yes, it will be a different Runnable instance, but they are guaranteed to run back-to-back on the same thread (assuming the executor doesn't change).

return switchingExecutor;
}
}

wrappedExecutor.execute(new StreamCreated());
defaultExecutor.execute(new StreamCreated());
}

private Context.CancellableContext createContext(
Expand All @@ -592,10 +665,28 @@ private Context.CancellableContext createContext(
return context;
}

private ServerMethodDefinition<?,?> lookupMethod(ServerStream stream,
String methodName, StatsTraceContext statsTraceCtx) {
ServerMethodDefinition<?, ?> method = registry.lookupMethod(methodName);
if (method == null) {
method = fallbackRegistry.lookupMethod(methodName, stream.getAuthority());
}
if (method == null) {
Status status = Status.UNIMPLEMENTED.withDescription(
"Method not found: " + methodName);
// TODO(zhangkun83): this error may be recorded by the tracer, and if it's kept in
// memory as a map whose key is the method name, this would allow a misbehaving
// client to blow up the server in-memory stats storage by sending large number of
// distinct unimplemented method
// names. (https://github.com/grpc/grpc-java/issues/2285)
throw new StatusRuntimeException(status);
}
return wrapMethod(stream, method, statsTraceCtx);
}

/** Never returns {@code null}. */
private <ReqT, RespT> ServerStreamListener startCall(ServerStream stream, String fullMethodName,
ServerMethodDefinition<ReqT, RespT> methodDef, Metadata headers,
Context.CancellableContext context, StatsTraceContext statsTraceCtx, Tag tag) {
private <ReqT, RespT> ServerMethodDefinition<?,?> wrapMethod(ServerStream stream,
ServerMethodDefinition<ReqT, RespT> methodDef, StatsTraceContext statsTraceCtx) {
// TODO(ejona86): should we update fullMethodName to have the canonical path of the method?
statsTraceCtx.serverCallStarted(
new ServerCallInfoImpl<>(
Expand All @@ -609,34 +700,21 @@ private <ReqT, RespT> ServerStreamListener startCall(ServerStream stream, String
ServerMethodDefinition<ReqT, RespT> interceptedDef = methodDef.withServerCallHandler(handler);
ServerMethodDefinition<?, ?> wMethodDef = binlog == null
? interceptedDef : binlog.wrapMethodDefinition(interceptedDef);
return startWrappedCall(fullMethodName, wMethodDef, stream, headers, context, tag);
return wMethodDef;
}

private <WReqT, WRespT> ServerStreamListener startWrappedCall(
String fullMethodName,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you are interested: Looks like we can remove this argument by using params.call.getMethodDescriptor().getFullMethodName(). Looks like ServerCallHandled then wouldn't need to look at the fullMethodName at all (only MethodLookup would).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh looks this is only for logging the original fullMethodName. And this is an existing code path.

ServerMethodDefinition<WReqT, WRespT> methodDef,
ServerStream stream,
Metadata headers,
Context.CancellableContext context,
Tag tag) {

ServerCallImpl<WReqT, WRespT> call = new ServerCallImpl<>(
stream,
methodDef.getMethodDescriptor(),
headers,
context,
decompressorRegistry,
compressorRegistry,
serverCallTracer,
tag);

ServerCall.Listener<WReqT> listener =
methodDef.getServerCallHandler().startCall(call, headers);
if (listener == null) {
ServerCallImpl<WReqT, WRespT> call,
Metadata headers) {
ServerCall.Listener<WReqT> callListener =
methodDef.getServerCallHandler().startCall(call, headers);
if (callListener == null) {
throw new NullPointerException(
"startCall() returned a null listener for method " + fullMethodName);
"startCall() returned a null listener for method " + fullMethodName);
}
return call.newServerStreamListener(listener);
return call.newServerStreamListener(callListener);
}
}

Expand Down Expand Up @@ -704,7 +782,7 @@ public void onReady() {}
*/
@VisibleForTesting
static final class JumpToApplicationThreadServerStreamListener implements ServerStreamListener {
private final Executor callExecutor;
private final DelayedExecutor callExecutor;
private final Executor cancelExecutor;
private final Context.CancellableContext context;
private final ServerStream stream;
Expand All @@ -714,7 +792,10 @@ static final class JumpToApplicationThreadServerStreamListener implements Server

public JumpToApplicationThreadServerStreamListener(Executor executor,
Executor cancelExecutor, ServerStream stream, Context.CancellableContext context, Tag tag) {
this.callExecutor = executor;
this.callExecutor = new DelayedExecutor();
if (executor != null) {
callExecutor.setExecutor(executor);
}
this.cancelExecutor = cancelExecutor;
this.stream = stream;
this.context = context;
Expand All @@ -731,6 +812,48 @@ private ServerStreamListener getListener() {
return listener;
}

public void setCallExecutor(Executor executor) {
Preconditions.checkNotNull(executor, "executor must not be null");
Preconditions.checkState(this.callExecutor.realExecutor == null,
"executor already set");
callExecutor.setExecutor(executor);
}

private static class DelayedExecutor implements Executor {
private List<Runnable> runQueue = new ArrayList<>();
private Executor realExecutor;

@Override
public void execute(Runnable command) {
synchronized (DelayedExecutor.this) {
if (realExecutor == null) {
runQueue.add(command);
return;
}
}
realExecutor.execute(command);
}

private void setExecutor(Executor executor) {
List<Runnable> toRun = new ArrayList<>();
while (true) {
synchronized (DelayedExecutor.this) {
if (runQueue.isEmpty()) {
realExecutor = executor;
break;
}
List<Runnable> tmp = runQueue;
runQueue = toRun;
toRun = tmp;
}
for (Runnable r: toRun) {
executor.execute(r);
}
toRun.clear();
}
}
}

@VisibleForTesting
void setListener(ServerStreamListener listener) {
Preconditions.checkNotNull(listener, "listener must not be null");
Expand Down