Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: allow per-service/method executor #8266

Merged
merged 4 commits into from Jun 21, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions api/src/main/java/io/grpc/ForwardingServerBuilder.java
Expand Up @@ -61,6 +61,12 @@ public T executor(@Nullable Executor executor) {
return thisT();
}

@Override
public T callExecutor(ServerCallExecutorSupplier executorSupplier) {
delegate().callExecutor(executorSupplier);
return thisT();
}

@Override
public T addService(ServerServiceDefinition service) {
delegate().addService(service);
Expand Down
24 changes: 24 additions & 0 deletions api/src/main/java/io/grpc/ServerBuilder.java
Expand Up @@ -74,6 +74,30 @@ public static ServerBuilder<?> forPort(int port) {
*/
public abstract T executor(@Nullable Executor executor);


/**
* Allows for defining a way to provide a custom executor to handle the server call.
* This executor is the result of calling
* {@link ServerCallExecutorSupplier#getExecutor(ServerCall, Metadata)} per RPC.
*
* <p>It's an optional parameter. If it is provided, the {@link #executor(Executor)} would still
* run necessary tasks before the {@link ServerCallExecutorSupplier} is ready to be called, then
* it switches over.
*
* <p>If it is provided, {@link #directExecutor()} optimization is disabled. But if calling
* {@link ServerCallExecutorSupplier} returns null, the server call is still handled by the
* default {@link #executor(Executor)} as a fallback.
*
* @param executorSupplier the server call executor provider
* @return this
* @since 1.39.0
*
* */
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/8274")
public T callExecutor(ServerCallExecutorSupplier executorSupplier) {
return thisT();
}

/**
* Adds a service implementation to the handler registry.
*
Expand Down
34 changes: 34 additions & 0 deletions api/src/main/java/io/grpc/ServerCallExecutorSupplier.java
@@ -0,0 +1,34 @@
/*
* Copyright 2021 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc;

import java.util.concurrent.Executor;
import javax.annotation.Nullable;

/**
* Defines what executor the server call is handled, based on each RPC call information at runtime.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"handles the server call"

* */
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/8274")
public interface ServerCallExecutorSupplier {

/**
* Returns an executor to handle the server call.
* It should never throw. It should return null to fallback to the default executor.
* */
@Nullable
<ReqT, RespT> Executor getExecutor(ServerCall<ReqT, RespT> call, Metadata metadata);
}
Expand Up @@ -24,6 +24,7 @@
import io.grpc.HandlerRegistry;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerCallExecutorSupplier;
import io.grpc.ServerInterceptor;
import io.grpc.ServerServiceDefinition;
import io.grpc.ServerStreamTracer;
Expand Down Expand Up @@ -67,6 +68,12 @@ public T directExecutor() {
return thisT();
}

@Override
public T callExecutor(ServerCallExecutorSupplier executorSupplier) {
delegate().callExecutor(executorSupplier);
return thisT();
}

@Override
public T executor(@Nullable Executor executor) {
delegate().executor(executor);
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/io/grpc/internal/SerializingExecutor.java
Expand Up @@ -59,7 +59,7 @@ private static AtomicHelper getAtomicHelper() {
private static final int RUNNING = -1;

/** Underlying executor that all submitted Runnable objects are run on. */
private volatile Executor executor;
private Executor executor;

/** A list of Runnables to be run in order. */
private final Queue<Runnable> runQueue = new ConcurrentLinkedQueue<>();
Expand Down Expand Up @@ -124,7 +124,7 @@ public void run() {
Runnable r;
try {
Executor oldExecutor = executor;
while ((r = runQueue.poll()) != null && oldExecutor == executor ) {
while (oldExecutor == executor && (r = runQueue.poll()) != null ) {
try {
r.run();
} catch (RuntimeException e) {
Expand Down
102 changes: 51 additions & 51 deletions core/src/main/java/io/grpc/internal/ServerImpl.java
Expand Up @@ -46,6 +46,7 @@
import io.grpc.InternalServerInterceptors;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallExecutorSupplier;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.ServerMethodDefinition;
Expand Down Expand Up @@ -125,7 +126,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
private final InternalChannelz channelz;
private final CallTracer serverCallTracer;
private final Deadline.Ticker ticker;
private final ExecutorSupplier executorSupplier;
private final ServerCallExecutorSupplier executorSupplier;

/**
* Construct a server.
Expand Down Expand Up @@ -160,18 +161,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
this.serverCallTracer = builder.callTracerFactory.create();
this.ticker = checkNotNull(builder.ticker, "ticker");
channelz.addServer(this);
this.executorSupplier = new ExecutorSupplier() {
@Override
public Executor getExecutor(ServerCall<?, ?> call,
Metadata header) {
return executor;
}
};
}


public interface ExecutorSupplier {
Executor getExecutor(ServerCall<?,?> call, Metadata metadata);
this.executorSupplier = builder.executorSupplier;
}

/**
Expand Down Expand Up @@ -479,14 +469,14 @@ public void streamCreated(ServerStream stream, String methodName, Metadata heade

private void streamCreatedInternal(
final ServerStream stream, final String methodName, final Metadata headers, final Tag tag) {
final Executor defaultExecutor;
final Executor wrapExecutor;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/wrapExecutor/wrappedExecutor/ to keep the name?

// This is a performance optimization that avoids the synchronization and queuing overhead
// that comes with SerializingExecutor.
if (executor == directExecutor()) {
defaultExecutor = new SerializeReentrantCallsDirectExecutor();
stream.optimizeForDirectExecutor();
if (executorSupplier != null || executor != directExecutor()) {
wrapExecutor = new SerializingExecutor(executor);
} else {
defaultExecutor = new SerializingExecutor(executor);
wrapExecutor = new SerializeReentrantCallsDirectExecutor();
stream.optimizeForDirectExecutor();
}

if (headers.containsKey(MESSAGE_ENCODING_KEY)) {
Expand All @@ -512,7 +502,7 @@ private void streamCreatedInternal(

final JumpToApplicationThreadServerStreamListener jumpListener
= new JumpToApplicationThreadServerStreamListener(
defaultExecutor, executor, stream, context, tag);
wrapExecutor, executor, stream, context, tag);
stream.setListener(jumpListener);
final SettableFuture<ServerCallParameters<?,?>> future = SettableFuture.create();
// Run in serializing executor so jumpListener.setListener() is called before any callbacks
Expand All @@ -528,12 +518,12 @@ final class MethodLookup extends ContextRunnable {

@Override
public void runInContext() {
PerfMark.startTask("ServerTransportListener$StreamCreated.startCall", tag);
PerfMark.startTask("ServerTransportListener$MethodLookup.startCall", tag);
PerfMark.linkIn(link);
try {
runInternal();
} finally {
PerfMark.stopTask("ServerTransportListener$StreamCreated.startCall", tag);
PerfMark.stopTask("ServerTransportListener$MethodLookup.startCall", tag);
}
}

Expand All @@ -555,7 +545,6 @@ private void runInternal() {
// names. (https://github.com/grpc/grpc-java/issues/2285)
stream.close(status, new Metadata());
context.cancel(null);
jumpListener.setListener(NOOP_LISTENER);
future.cancel(false);
return;
}
Expand All @@ -566,26 +555,8 @@ private void runInternal() {
stream.close(Status.fromThrowable(t), new Metadata());
context.cancel(null);
future.cancel(false);
jumpListener.setListener(NOOP_LISTENER);
throw t;
}

// An extremely short deadline may expire before stream.setListener(jumpListener).
// This causes NPE as in issue: https://github.com/grpc/grpc-java/issues/6300
// Delay of setting cancellationListener to context will fix the issue.
final class ServerStreamCancellationListener implements Context.CancellationListener {
@Override
public void cancelled(Context context) {
Status status = statusFromCancelled(context);
if (DEADLINE_EXCEEDED.getCode().equals(status.getCode())) {
// This should rarely get run, since the client will likely cancel the stream
// before the timeout is reached.
stream.cancel(status);
}
}
}

context.addListener(new ServerStreamCancellationListener(), directExecutor());
}

private <ReqT, RespT> ServerCallParameters<ReqT, RespT> maySwitchExecutor(
Expand All @@ -603,30 +574,42 @@ private <ReqT, RespT> ServerCallParameters<ReqT, RespT> maySwitchExecutor(
compressorRegistry,
serverCallTracer,
tag);
final Executor switchingExecutor = executorSupplier.getExecutor(call, headers);
if (switchingExecutor != null && defaultExecutor instanceof SerializingExecutor) {
((SerializingExecutor)defaultExecutor).setExecutor(switchingExecutor);
if (executorSupplier != null) {
Executor switchingExecutor = executorSupplier.getExecutor(call, headers);
if (switchingExecutor != null) {
((SerializingExecutor)wrapExecutor).setExecutor(switchingExecutor);
}
}
return new ServerCallParameters<>(call, methodDef);
}
}

final class CallHandled extends ContextRunnable {
CallHandled() {
final class ServerCallHandled extends ContextRunnable {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why "handled" in the name, since this handles the RPC, it isn't run after the RPC is handled. Maybe HandleServerCall?

ServerCallHandled() {
super(context);
}

@Override
public void runInContext() {
PerfMark.startTask("ServerTransportListener$ServerCallHandled.startCall", tag);
PerfMark.linkIn(link);
try {
runInternal();
} finally {
PerfMark.stopTask("ServerTransportListener$ServerCallHandled.startCall", tag);
}
}

private void runInternal() {
ServerStreamListener listener = NOOP_LISTENER;
ServerCallParameters<?,?> callParameters;
if (future.isCancelled()) {
return;
}
try {
if (future.isCancelled()) {
return;
}
if (!future.isDone() || (callParameters = future.get()) == null) {
Status status = Status.INTERNAL.withDescription(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this should throw this as an exception? If it's an exception then it will be logged (and the catch will handle it). Seems that would make it more likely we'd learn about the bug.

"Fail to start server call");
"Fail to start server call.");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make the message a bit more clear there's a bug in grpc.

stream.close(status, new Metadata());
context.cancel(null);
return;
Expand All @@ -639,11 +622,28 @@ public void runInContext() {
} finally {
jumpListener.setListener(listener);
}

// An extremely short deadline may expire before stream.setListener(jumpListener).
// This causes NPE as in issue: https://github.com/grpc/grpc-java/issues/6300
// Delay of setting cancellationListener to context will fix the issue.
final class ServerStreamCancellationListener implements Context.CancellationListener {
@Override
public void cancelled(Context context) {
Status status = statusFromCancelled(context);
if (DEADLINE_EXCEEDED.getCode().equals(status.getCode())) {
// This should rarely get run, since the client will likely cancel the stream
// before the timeout is reached.
stream.cancel(status);
}
}
}

context.addListener(new ServerStreamCancellationListener(), directExecutor());
}
}

defaultExecutor.execute(new MethodLookup());
defaultExecutor.execute(new CallHandled());
wrapExecutor.execute(new MethodLookup());
wrapExecutor.execute(new ServerCallHandled());
}

private Context.CancellableContext createContext(
Expand Down
9 changes: 9 additions & 0 deletions core/src/main/java/io/grpc/internal/ServerImplBuilder.java
Expand Up @@ -32,6 +32,7 @@
import io.grpc.InternalChannelz;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerCallExecutorSupplier;
import io.grpc.ServerInterceptor;
import io.grpc.ServerMethodDefinition;
import io.grpc.ServerServiceDefinition;
Expand Down Expand Up @@ -93,6 +94,8 @@ public static ServerBuilder<?> forPort(int port) {
@Nullable BinaryLog binlog;
InternalChannelz channelz = InternalChannelz.instance();
CallTracer.Factory callTracerFactory = CallTracer.getDefaultFactory();
@Nullable
ServerCallExecutorSupplier executorSupplier;

/**
* An interface to provide to provide transport specific information for the server. This method
Expand Down Expand Up @@ -122,6 +125,12 @@ public ServerImplBuilder executor(@Nullable Executor executor) {
return this;
}

@Override
public ServerImplBuilder callExecutor(ServerCallExecutorSupplier executorSupplier) {
this.executorSupplier = checkNotNull(executorSupplier);
return this;
}

@Override
public ServerImplBuilder addService(ServerServiceDefinition service) {
registryBuilder.addService(checkNotNull(service, "service"));
Expand Down
32 changes: 32 additions & 0 deletions core/src/test/java/io/grpc/internal/SerializingExecutorTest.java
Expand Up @@ -209,6 +209,38 @@ public void run() {
assertEquals(Arrays.asList(1, 2, 3), runs);
}

@Test
public void switchable() {
final SerializingExecutor testExecutor =
new SerializingExecutor(MoreExecutors.directExecutor());
testExecutor.execute(new Runnable() {
@Override
public void run() {
runs.add(1);
testExecutor.setExecutor(singleExecutor);
}
});
testExecutor.execute(new AddToRuns(-2));
assertThat(runs).isEqualTo(Arrays.asList(1));
singleExecutor.drain();
assertThat(runs).isEqualTo(Arrays.asList(1, -2));
}

@Test
public void notSwitch() {
executor.execute(new Runnable() {
@Override
public void run() {
runs.add(1);
executor.setExecutor(singleExecutor);
}
});
executor.execute(new AddToRuns(-2));
assertThat(runs).isEqualTo(Collections.emptyList());
singleExecutor.drain();
assertThat(runs).isEqualTo(Arrays.asList(1, -2));
}

private static class SingleExecutor implements Executor {
private Runnable runnable;

Expand Down