New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Bidi Blocking Stub #10318
base: master
Are you sure you want to change the base?
Bidi Blocking Stub #10318
Conversation
…ork. Multiple threads blocking on a single ClientCall was hanging.
…es. Fix multithreading logic in ThreadExecutor.
…alue of stub on blocking server streaming calls Also in generator put a space between request and response types in the generic definition and use java.lang.String everywhere as has already been done in master
… and move the logic out of the generated file into the server streaming method
…ThreadlessExecutor class for multithreading.
Co-authored-by: Eric Anderson <ejona@google.com>
…upport to Lock & Condition
private static final Logger log = | ||
Logger.getLogger(ThreadSafeThreadlessExecutor.class.getName()); | ||
|
||
private Lock waiterLock = new java.util.concurrent.locks.ReentrantLock(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
final
here and the condition. Since it is multi-threading, the final is particularly helpful.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This still needs final
if (client_streaming && server_streaming) { | ||
p->Print( | ||
*vars, | ||
"$BlockingClientCall$<$input_type$, $output_type$>\n" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's go ahead and mark these @ExperimentalApi
for at least one release? The code here is pretty tight. The main thing I could see that could change is BlockingClientCall<?, RespT>
becoming BlockingClientCall<ReqT, RespT>
, because that seems the only choice we have.
Although maybe we could also remove the throws InterruptedException
from server streaming, since we "know" it won't block. Dunno if that is worth it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. Created #10918
…atusException for server streaming and regenerate stubs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need a decision on whether should pass ClientCall instead of channel, method and call options. Then need to update generated code.
if (client_streaming && server_streaming) { | ||
p->Print( | ||
*vars, | ||
"$BlockingClientCall$<$input_type$, $output_type$>\n" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. Created #10918
private static final Logger log = | ||
Logger.getLogger(ThreadSafeThreadlessExecutor.class.getName()); | ||
|
||
private Lock waiterLock = new java.util.concurrent.locks.ReentrantLock(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -869,8 +857,8 @@ static final class ThreadSafeThreadlessExecutor extends ConcurrentLinkedQueue<Ru | |||
private static final Logger log = | |||
Logger.getLogger(ThreadSafeThreadlessExecutor.class.getName()); | |||
|
|||
private Lock waiterLock = new java.util.concurrent.locks.ReentrantLock(); | |||
private Condition waiterCondition = waiterLock.newCondition(); | |||
private Lock waiterLock = new ReentrantLock(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This still needs final
private static final Logger log = | ||
Logger.getLogger(ThreadSafeThreadlessExecutor.class.getName()); | ||
|
||
private Lock waiterLock = new java.util.concurrent.locks.ReentrantLock(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This still needs final
@@ -340,7 +413,7 @@ private abstract static class StartableListener<T> extends ClientCall.Listener<T | |||
abstract void onStart(); | |||
} | |||
|
|||
private static final class CallToStreamObserverAdapter<ReqT> | |||
private static class CallToStreamObserverAdapter<ReqT> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As discussed offline, the static and final are separate. This should still be final.
p->Print( | ||
*vars, | ||
"$BlockingClientCall$<?, $output_type$>\n" | ||
" $lower_method_name$($input_type$ request) throws java.lang.InterruptedException,\n" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should discuss whether we remove InterruptedException here. (We know it won't throw.)
* @throws StatusException if the write to the server failed | ||
*/ | ||
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/10918") | ||
public static <ReqT, RespT> BlockingClientCall<?, RespT> blockingV2ServerStreamingCall( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To allow us to change the ?
to ReqT
in the generated code if we find it works better, we may want the ReqT
present here but use ?
in the generated code. We can talk about that.
Created a BlockingClientCall class that does blocking streams for all 3 streaming types.