Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bidi Blocking Stub #10318

Open
wants to merge 50 commits into
base: master
Choose a base branch
from
Open

Bidi Blocking Stub #10318

wants to merge 50 commits into from

Conversation

larry-safran
Copy link
Contributor

@larry-safran larry-safran commented Jun 28, 2023

Created a BlockingClientCall class that does blocking streams for all 3 streaming types.

@larry-safran larry-safran marked this pull request as ready for review June 28, 2023 00:33
…alue of stub on blocking server streaming calls

Also in generator put a space between request and response types in the generic definition and use java.lang.String everywhere as has already been done in master
… and move the logic out of the generated file into the server streaming method
@ejona86 ejona86 changed the title Blocking Bidi Blocking Stub Sep 21, 2023
…ThreadlessExecutor class for multithreading.
stub/src/main/java/io/grpc/stub/ClientCalls.java Outdated Show resolved Hide resolved
stub/src/main/java/io/grpc/stub/ClientCalls.java Outdated Show resolved Hide resolved
stub/src/main/java/io/grpc/stub/ClientCalls.java Outdated Show resolved Hide resolved
stub/src/main/java/io/grpc/stub/BlockingClientCall.java Outdated Show resolved Hide resolved
stub/src/main/java/io/grpc/stub/ClientCalls.java Outdated Show resolved Hide resolved
stub/src/main/java/io/grpc/stub/ClientCalls.java Outdated Show resolved Hide resolved
stub/src/main/java/io/grpc/stub/ClientCalls.java Outdated Show resolved Hide resolved
stub/src/main/java/io/grpc/stub/BlockingClientCall.java Outdated Show resolved Hide resolved
stub/src/main/java/io/grpc/stub/BlockingClientCall.java Outdated Show resolved Hide resolved
stub/src/main/java/io/grpc/stub/BlockingClientCall.java Outdated Show resolved Hide resolved
stub/src/main/java/io/grpc/stub/BlockingClientCall.java Outdated Show resolved Hide resolved
private static final Logger log =
Logger.getLogger(ThreadSafeThreadlessExecutor.class.getName());

private Lock waiterLock = new java.util.concurrent.locks.ReentrantLock();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final here and the condition. Since it is multi-threading, the final is particularly helpful.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This still needs final

stub/src/main/java/io/grpc/stub/BlockingClientCall.java Outdated Show resolved Hide resolved
stub/src/main/java/io/grpc/stub/BlockingClientCall.java Outdated Show resolved Hide resolved
if (client_streaming && server_streaming) {
p->Print(
*vars,
"$BlockingClientCall$<$input_type$, $output_type$>\n"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's go ahead and mark these @ExperimentalApi for at least one release? The code here is pretty tight. The main thing I could see that could change is BlockingClientCall<?, RespT> becoming BlockingClientCall<ReqT, RespT>, because that seems the only choice we have.

Although maybe we could also remove the throws InterruptedException from server streaming, since we "know" it won't block. Dunno if that is worth it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Created #10918

…atusException for server streaming and regenerate stubs.
Copy link
Contributor Author

@larry-safran larry-safran left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need a decision on whether should pass ClientCall instead of channel, method and call options. Then need to update generated code.

if (client_streaming && server_streaming) {
p->Print(
*vars,
"$BlockingClientCall$<$input_type$, $output_type$>\n"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Created #10918

stub/src/main/java/io/grpc/stub/BlockingClientCall.java Outdated Show resolved Hide resolved
stub/src/main/java/io/grpc/stub/BlockingClientCall.java Outdated Show resolved Hide resolved
stub/src/main/java/io/grpc/stub/BlockingClientCall.java Outdated Show resolved Hide resolved
stub/src/main/java/io/grpc/stub/ClientCalls.java Outdated Show resolved Hide resolved
private static final Logger log =
Logger.getLogger(ThreadSafeThreadlessExecutor.class.getName());

private Lock waiterLock = new java.util.concurrent.locks.ReentrantLock();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

stub/src/main/java/io/grpc/stub/BlockingClientCall.java Outdated Show resolved Hide resolved
stub/src/main/java/io/grpc/stub/BlockingClientCall.java Outdated Show resolved Hide resolved
@@ -869,8 +857,8 @@ static final class ThreadSafeThreadlessExecutor extends ConcurrentLinkedQueue<Ru
private static final Logger log =
Logger.getLogger(ThreadSafeThreadlessExecutor.class.getName());

private Lock waiterLock = new java.util.concurrent.locks.ReentrantLock();
private Condition waiterCondition = waiterLock.newCondition();
private Lock waiterLock = new ReentrantLock();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This still needs final

private static final Logger log =
Logger.getLogger(ThreadSafeThreadlessExecutor.class.getName());

private Lock waiterLock = new java.util.concurrent.locks.ReentrantLock();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This still needs final

@@ -340,7 +413,7 @@ private abstract static class StartableListener<T> extends ClientCall.Listener<T
abstract void onStart();
}

private static final class CallToStreamObserverAdapter<ReqT>
private static class CallToStreamObserverAdapter<ReqT>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed offline, the static and final are separate. This should still be final.

p->Print(
*vars,
"$BlockingClientCall$<?, $output_type$>\n"
" $lower_method_name$($input_type$ request) throws java.lang.InterruptedException,\n"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should discuss whether we remove InterruptedException here. (We know it won't throw.)

* @throws StatusException if the write to the server failed
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/10918")
public static <ReqT, RespT> BlockingClientCall<?, RespT> blockingV2ServerStreamingCall(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To allow us to change the ? to ReqT in the generated code if we find it works better, we may want the ReqT present here but use ? in the generated code. We can talk about that.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants