BlockingStub returns iterator that cannot be closed #2409

Open
jonnyzzz opened this issue Nov 8, 2016 · 10 comments

@jonnyzzz

jonnyzzz commented Nov 8, 2016

I use a blocking stub for a method that returns a stream of messages. Its return type is Iterator<T>. It turns out I cannot make the gRPC client end the request early (e.g. because of a client-side failure). As a result, there are a number of cases where an open connection may get stuck in the client.

The implementation uses
io.grpc.stub.ClientCalls#blockingServerStreamingCall(io.grpc.Channel, io.grpc.MethodDescriptor<ReqT,RespT>, io.grpc.CallOptions, ReqT)

call, which in turn uses BlockingResponseStream. There is no way to reach the io.grpc.ClientCall#cancel method from the Iterator. It would be nice to wrap the Iterator, or to make it implement an interface, similar to what is done with ServerCallStreamObserver/ClientCallStreamObserver.
Another approach would be to have this iterator implement Java's AutoCloseable.

There is also a comment on the Iterator implementation class

  // TODO(ejona86): determine how to allow ClientCall.cancel() in case of application error.
@ejona86
Member

ejona86 commented Nov 8, 2016

I'm not wild about how the iterator has turned out either. Streaming+blocking in grpc-java is weak/missing. To do AutoCloseable we'd need to create a new interface that extends both AutoCloseable and Iterator.
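
For illustration, a combined interface of that kind could look roughly like the sketch below. CloseableIterator and CancellingIterator are hypothetical names, not grpc-java types, and the Runnable passed to the adapter only stands in for whatever would eventually call ClientCall#cancel.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical interface: NOT part of grpc-java.
interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
    // Narrowed so that try-with-resources needs no checked-exception handling.
    @Override
    void close();
}

// Hypothetical adapter pairing any Iterator with a cancel/cleanup action.
final class CancellingIterator<T> implements CloseableIterator<T> {
    private final Iterator<T> delegate;
    private final Runnable onClose; // stand-in for a real cancel hook
    private boolean closed;

    CancellingIterator(Iterator<T> delegate, Runnable onClose) {
        this.delegate = delegate;
        this.onClose = onClose;
    }

    @Override
    public boolean hasNext() {
        return !closed && delegate.hasNext();
    }

    @Override
    public T next() {
        if (closed) {
            throw new NoSuchElementException("iterator is closed");
        }
        return delegate.next();
    }

    @Override
    public void close() {
        if (!closed) { // idempotent: the cleanup action runs at most once
            closed = true;
            onClose.run();
        }
    }
}

final class CloseableIteratorDemo {
    public static void main(String[] args) {
        Iterator<String> source = Arrays.asList("a", "b", "c").iterator();
        try (CloseableIterator<String> it =
                 new CancellingIterator<>(source, () -> System.out.println("cancelled"))) {
            System.out.println(it.next()); // prints "a"
        } // leaving the block prints "cancelled", even though "b" and "c" were never read
    }
}
```

With such a type, try-with-resources would run the cleanup on both the success path and the error path.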

The main way to cancel would be to create and attach a CancellableContext, and then cancel the context as you leave the block. You should always cancel the CancellableContext when the unit of work it encompasses is complete, even on success.
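
A minimal sketch of that pattern, assuming the io.grpc.Context API from grpc-java (withCancellation(), attach(), detach(), cancel()). BlockingStreams and withCancellation are hypothetical helper names, and startCall stands in for any blocking-stub method that returns an Iterator.

```java
import io.grpc.Context;
import java.util.Iterator;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical helper: NOT part of grpc-java.
final class BlockingStreams {
    private BlockingStreams() {}

    /**
     * Starts a streaming call while a fresh CancellableContext is attached,
     * hands the iterator to the caller's code, and always cancels the
     * context afterwards, on success as well as on failure.
     */
    static <T, R> R withCancellation(
            Supplier<Iterator<T>> startCall, Function<Iterator<T>, R> body) {
        Context.CancellableContext ctx = Context.current().withCancellation();
        Context previous = ctx.attach();
        try {
            // The call is started with ctx attached, so it is tied to ctx.
            Iterator<T> responses = startCall.get();
            return body.apply(responses);
        } finally {
            ctx.detach(previous); // restore whatever context was current before
            ctx.cancel(null);     // no cause: the unit of work is simply done
        }
    }
}
```

Usage might look like `BlockingStreams.withCancellation(() -> stub.getElements(request), it -> it.hasNext() ? it.next() : null)`, where stub.getElements is the illustrative method name used elsewhere in this thread.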

@jonnyzzz
Author

jonnyzzz commented Nov 8, 2016

I decided to use the blocking API in tests, to simplify calls that I only make there. This was not clear from the code, and there were no Javadoc hints about it. I will try CancellableContext.

@ejona86 ejona86 added the docs label Nov 9, 2016
@ejona86 ejona86 modified the milestones: 1.1, Next Nov 9, 2016
@jhaber

jhaber commented Jun 27, 2018

Another option would be to have a method that returns a java.util.stream.Stream<RespT> since streams are AutoCloseable, but I'm guessing grpc-stub can't use Java 8 features.
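
As a rough sketch of that idea using only the JDK: an Iterator can be wrapped in a Stream whose onClose handler triggers the cancellation. IteratorStreams and toStream are hypothetical names, and the Runnable stands in for a real cancel hook. Note that a Stream only runs its close handlers when close() is called (for example by try-with-resources), not when a terminal operation finishes.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical helper: NOT part of grpc-java.
final class IteratorStreams {
    private IteratorStreams() {}

    /** Wraps an iterator in a sequential Stream that runs `cancel` when closed. */
    static <T> Stream<T> toStream(Iterator<T> responses, Runnable cancel) {
        Spliterator<T> split =
            Spliterators.spliteratorUnknownSize(responses, Spliterator.ORDERED);
        return StreamSupport.stream(split, false).onClose(cancel);
    }

    public static void main(String[] args) {
        Iterator<Integer> responses = Arrays.asList(1, 2, 3, 4).iterator();
        try (Stream<Integer> stream =
                 toStream(responses, () -> System.out.println("cancel requested"))) {
            // Short-circuits on the first match; the close handler still runs.
            System.out.println(stream.anyMatch(x -> x == 2));
        }
    }
}
```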

@systemfreund

I'd like to add that an application error is not the only thing that can prevent the iterator from being closed. I just tracked down a "direct buffer" ("LEAK: ByteBuf.release() was not called before...") memory leak in our system.

It was caused by not iterating through all elements returned by the blocking stub's Iterator (BlockingResponseStream):

stub.getElements(request)
    .asSequence()
    .any { it.someProperty == 1234 }

This is Kotlin code and it looks very innocent. The .asSequence call converts the Iterator returned from the blocking stub to a Sequence, on which we can call .map/.filter/.any etc.

However, because a sequence is evaluated lazily, as soon as any finds a matching object the remaining elements are not going to be processed, leaving the iterator "open" and causing the leak, apparently.
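
The short-circuit effect can be reproduced without gRPC. The sketch below is a plain-Java analogue of the Kotlin snippet: CountingIterator and ShortCircuit.any are hypothetical stand-ins for the stub's iterator and for Kotlin's any, and the counter shows how many elements are left unread once a match is found.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

// Hypothetical stand-in for a response iterator: yields 1..limit and counts reads.
final class CountingIterator implements Iterator<Integer> {
    private final int limit;
    private int produced;

    CountingIterator(int limit) {
        this.limit = limit;
    }

    @Override
    public boolean hasNext() {
        return produced < limit;
    }

    @Override
    public Integer next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return ++produced;
    }

    int produced() {
        return produced;
    }
}

final class ShortCircuit {
    private ShortCircuit() {}

    /** Returns true at the first match, like Kotlin's `any`; later elements stay unread. */
    static <T> boolean any(Iterator<T> it, Predicate<T> predicate) {
        while (it.hasNext()) {
            if (predicate.test(it.next())) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        CountingIterator responses = new CountingIterator(10);
        boolean matched = any(responses, x -> x == 3);
        // prints "matched=true consumed=3 of 10"
        System.out.println("matched=" + matched + " consumed=" + responses.produced() + " of 10");
    }
}
```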

@carl-mastrangelo
Contributor

@systemfreund There isn't anything we can really do about that. The suggestion to use CancellableContext is the way to prevent such leaks. Any other approaches are likely going to have bad side effects, worse than the occasional nag log.

@pskiwi

pskiwi commented Oct 9, 2019

@carl-mastrangelo Even CancellableContext does not work correctly, because the onClose() method of an attached ClientInterceptor is not called when the call is cancelled.
Might this be solvable by draining the ThreadlessExecutor of ClientCalls.blockingServerStreamingCall in a Context.CancellationListener?

@ejona86
Member

ejona86 commented Oct 9, 2019

@pskiwi, I think the behavior you are referring to was fixed in #6255.

@pskiwi

pskiwi commented Oct 10, 2019

@ejona86 I tried it, but it doesn't seem to fix my problem.
I was hoping the following test code would work:
ClientCallsTest.java

Is this expected to work?

@ejona86
Member

ejona86 commented Oct 10, 2019

@pskiwi, no, that wouldn't work. After the cancellation you need to "drain" the Iterator; keep going until you get hasNext() == false. If you have a while (iter.hasNext()) iter.next(); after the finally, then it would work.
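
A small sketch of such a drain step, written against a plain java.util.Iterator. Drain and drainQuietly are hypothetical names. The catch block rests on an assumption: that a cancelled call may report its termination by throwing a runtime exception from hasNext() or next() instead of simply returning false.

```java
import java.util.Iterator;

// Hypothetical helper: NOT part of grpc-java.
final class Drain {
    private Drain() {}

    /**
     * Reads and discards whatever is left in the iterator.
     * Returns the number of elements that were discarded.
     */
    static int drainQuietly(Iterator<?> responses) {
        int discarded = 0;
        try {
            while (responses.hasNext()) {
                responses.next();
                discarded++;
            }
        } catch (RuntimeException e) {
            // Assumption: after cancellation the iterator may end by throwing.
            // The stream is finished either way, so the exception is dropped here.
        }
        return discarded;
    }
}
```

In the scenario above, this would be called after the context has been cancelled, i.e. after the finally block.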

Draining the ThreadlessExecutor from a Context.CancellationListener scares me because of the threading involved.

It would be possible to add a close() method to the Iterator (and require you to cast), but in general we're not wild about the blocking iterator API as it is far too hard to use properly. We think the proper fix here is to have a "real" blocking API.

@mikeholler

Does anyone have any examples for the use of CancellableContext with a blocking stub? I'm having a hard time finding a good official source after an hour of searching.

7 participants