Neither OnCancelHandler nor onComplete gets invoked after half-close #5882

Closed
TMilasius opened this issue Jun 14, 2019 · 14 comments · Fixed by #8408
@TMilasius

What version of gRPC are you using?

1.21.0 (same applies to older versions as well)

Issue

1. After the server sends a completion event to the client, i.e., half-closes the call, the client doesn't dispatch any further events (even though the request stream hasn't been closed/completed).

2. In addition to that, the server's OnCancelHandler never gets invoked, although the client shuts down and closes the connection.

Server:

public class TestServer {

    public static void main(String[] args) throws IOException, InterruptedException {
        ServerBuilder.forPort(12345).addService(new TestServiceGrpc.TestServiceImplBase() {

            @Override
            public StreamObserver<Message> bidiOperation(StreamObserver<Message> responseObserver) {
                ServerCallStreamObserver<Message> serverResponseObserver = (ServerCallStreamObserver<Message>) responseObserver;
                serverResponseObserver.setOnCancelHandler(() -> {
                    log.info("Stream cancelled."); // never invoked
                });
                return new StreamObserver<Message>() {

                    @Override
                    public void onNext(Message value) {
                        log.info("Server: received message.");
                        responseObserver.onNext(value);
                        responseObserver.onCompleted();
                    }

                    @Override
                    public void onError(Throwable t) {
                        log.info("Server: received an error.", t);
                    }

                    @Override
                    public void onCompleted() {
                        log.info("Server: received on completed."); // never invoked
                    }
                };
            }
        }).build().start().awaitTermination();
    }
}

Client:

public class TestClient {

    public static void main(String[] args) throws InterruptedException {
        Channel channel = ManagedChannelBuilder.forAddress("localhost", 12345).usePlaintext().build();
        CountDownLatch receivedOnCompleted = new CountDownLatch(1);
        TestServiceStub asyncStub = TestServiceGrpc.newStub(channel);
        StreamObserver<Message> requestStream = asyncStub.bidiOperation(new ClientResponseObserver<Message, Message>() {

            private ClientCallStreamObserver<Message> requestStream;

            @Override
            public void beforeStart(ClientCallStreamObserver<Message> requestStream) {
                this.requestStream = requestStream;
            }

            @Override
            public void onNext(Message value) {
                log.info("Client received message.");
            }

            @Override
            public void onError(Throwable t) {
                log.info("Client received error.", t);
                receivedOnCompleted.countDown();
            }

            @Override
            public void onCompleted() {
                log.info("Client received on completed.");
                requestStream.onNext(Message.getDefaultInstance()); // This message doesn't get processed by the server
                requestStream.onCompleted(); // Completion event also gets lost somewhere in the middle
                receivedOnCompleted.countDown();
            }
        });
        requestStream.onNext(Message.getDefaultInstance());
        receivedOnCompleted.await();
        Thread.sleep(10000); // give some time for client to complete sending events
        log.info("Client shutdown.");
    }
}

Client logs:

1298 DEBUG io.grpc.netty.NettyClientHandler - [id: 0x21d7b029, L:/127.0.0.1:61183 - R:localhost/127.0.0.1:12345] OUTBOUND SETTINGS: ack=false settings={ENABLE_PUSH=0, MAX_CONCURRENT_STREAMS=0, INITIAL_WINDOW_SIZE=1048576, MAX_HEADER_LIST_SIZE=8192}
1323 DEBUG io.grpc.netty.NettyClientHandler - [id: 0x21d7b029, L:/127.0.0.1:61183 - R:localhost/127.0.0.1:12345] OUTBOUND WINDOW_UPDATE: streamId=0 windowSizeIncrement=983041
1539 DEBUG io.grpc.netty.NettyClientHandler - [id: 0x21d7b029, L:/127.0.0.1:61183 - R:localhost/127.0.0.1:12345] INBOUND SETTINGS: ack=false settings={MAX_CONCURRENT_STREAMS=2147483647, INITIAL_WINDOW_SIZE=1048576, MAX_HEADER_LIST_SIZE=8192}
1542 DEBUG io.grpc.netty.NettyClientHandler - [id: 0x21d7b029, L:/127.0.0.1:61183 - R:localhost/127.0.0.1:12345] OUTBOUND SETTINGS: ack=true
1545 DEBUG io.grpc.netty.NettyClientHandler - [id: 0x21d7b029, L:/127.0.0.1:61183 - R:localhost/127.0.0.1:12345] INBOUND WINDOW_UPDATE: streamId=0 windowSizeIncrement=983041
1552 DEBUG io.grpc.netty.NettyClientHandler - [id: 0x21d7b029, L:/127.0.0.1:61183 - R:localhost/127.0.0.1:12345] INBOUND SETTINGS: ack=true
1581 DEBUG io.grpc.netty.NettyClientHandler - [id: 0x21d7b029, L:/127.0.0.1:61183 - R:localhost/127.0.0.1:12345] OUTBOUND HEADERS: streamId=3 headers=GrpcHttp2OutboundHeaders[:authority: localhost:12345, :path: /TestService/BidiOperation, :method: POST, :scheme: http, content-type: application/grpc, te: trailers, user-agent: grpc-java-netty/1.21.0, grpc-accept-encoding: gzip] streamDependency=0 weight=16 exclusive=false padding=0 endStream=false
1641 DEBUG io.grpc.netty.NettyClientHandler - [id: 0x21d7b029, L:/127.0.0.1:61183 - R:localhost/127.0.0.1:12345] OUTBOUND DATA: streamId=3 padding=0 endStream=false length=5 bytes=0000000000
1737 DEBUG io.grpc.netty.NettyClientHandler - [id: 0x21d7b029, L:/127.0.0.1:61183 - R:localhost/127.0.0.1:12345] INBOUND HEADERS: streamId=3 headers=GrpcHttp2ResponseHeaders[:status: 200, content-type: application/grpc, grpc-encoding: identity, grpc-accept-encoding: gzip] streamDependency=0 weight=16 exclusive=false padding=0 endStream=false
1763 DEBUG io.grpc.netty.NettyClientHandler - [id: 0x21d7b029, L:/127.0.0.1:61183 - R:localhost/127.0.0.1:12345] INBOUND DATA: streamId=3 padding=0 endStream=false length=5 bytes=0000000000
1771 DEBUG io.grpc.netty.NettyClientHandler - [id: 0x21d7b029, L:/127.0.0.1:61183 - R:localhost/127.0.0.1:12345] INBOUND HEADERS: streamId=3 headers=GrpcHttp2ResponseHeaders[grpc-status: 0] streamDependency=0 weight=16 exclusive=false padding=0 endStream=true
1772 INFO  TestClient - Client received message.
1777 DEBUG io.grpc.netty.NettyClientHandler - [id: 0x21d7b029, L:/127.0.0.1:61183 - R:localhost/127.0.0.1:12345] OUTBOUND RST_STREAM: streamId=3 errorCode=8
1781 INFO  TestClient - Client received on completed.
11783 INFO TestClient - Client shutdown.

Server logs:

9704 DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkAccessible: true
9704 DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkBounds: true
9706 DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@21c719f5
9826 DEBUG io.grpc.netty.NettyServerHandler - [id: 0xd1f35e47, L:/127.0.0.1:12345 - R:/127.0.0.1:61183] OUTBOUND SETTINGS: ack=false settings={MAX_CONCURRENT_STREAMS=2147483647, INITIAL_WINDOW_SIZE=1048576, MAX_HEADER_LIST_SIZE=8192}
9834 DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxCapacityPerThread: 4096
9834 DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxSharedCapacityFactor: 2
9835 DEBUG io.netty.util.Recycler - -Dio.netty.recycler.linkCapacity: 16
9835 DEBUG io.netty.util.Recycler - -Dio.netty.recycler.ratio: 8
9849 DEBUG io.grpc.netty.NettyServerHandler - [id: 0xd1f35e47, L:/127.0.0.1:12345 - R:/127.0.0.1:61183] OUTBOUND WINDOW_UPDATE: streamId=0 windowSizeIncrement=983041
9865 DEBUG io.grpc.netty.NettyServerHandler - [id: 0xd1f35e47, L:/127.0.0.1:12345 - R:/127.0.0.1:61183] INBOUND SETTINGS: ack=false settings={ENABLE_PUSH=0, MAX_CONCURRENT_STREAMS=0, INITIAL_WINDOW_SIZE=1048576, MAX_HEADER_LIST_SIZE=8192}
9869 DEBUG io.grpc.netty.NettyServerHandler - [id: 0xd1f35e47, L:/127.0.0.1:12345 - R:/127.0.0.1:61183] OUTBOUND SETTINGS: ack=true
9872 DEBUG io.grpc.netty.NettyServerHandler - [id: 0xd1f35e47, L:/127.0.0.1:12345 - R:/127.0.0.1:61183] INBOUND WINDOW_UPDATE: streamId=0 windowSizeIncrement=983041
9875 DEBUG io.grpc.netty.NettyServerHandler - [id: 0xd1f35e47, L:/127.0.0.1:12345 - R:/127.0.0.1:61183] INBOUND SETTINGS: ack=true
9930 DEBUG io.grpc.netty.NettyServerHandler - [id: 0xd1f35e47, L:/127.0.0.1:12345 - R:/127.0.0.1:61183] INBOUND HEADERS: streamId=3 headers=GrpcHttp2RequestHeaders[:path: /TestService/BidiOperation, :authority: localhost:12345, :method: POST, :scheme: http, te: trailers, content-type: application/grpc, user-agent: grpc-java-netty/1.21.0, grpc-accept-encoding: gzip] streamDependency=0 weight=16 exclusive=false padding=0 endStream=false
10018 DEBUG io.grpc.netty.NettyServerHandler - [id: 0xd1f35e47, L:/127.0.0.1:12345 - R:/127.0.0.1:61183] INBOUND DATA: streamId=3 padding=0 endStream=false length=5 bytes=0000000000
10036 INFO  TestServer - Server: received message.
10046 DEBUG io.grpc.netty.NettyServerHandler - [id: 0xd1f35e47, L:/127.0.0.1:12345 - R:/127.0.0.1:61183] OUTBOUND HEADERS: streamId=3 headers=GrpcHttp2OutboundHeaders[:status: 200, content-type: application/grpc, grpc-encoding: identity, grpc-accept-encoding: gzip] streamDependency=0 weight=16 exclusive=false padding=0 endStream=false
10081 DEBUG io.grpc.netty.NettyServerHandler - [id: 0xd1f35e47, L:/127.0.0.1:12345 - R:/127.0.0.1:61183] OUTBOUND DATA: streamId=3 padding=0 endStream=false length=5 bytes=0000000000
10082 DEBUG io.grpc.netty.NettyServerHandler - [id: 0xd1f35e47, L:/127.0.0.1:12345 - R:/127.0.0.1:61183] OUTBOUND HEADERS: streamId=3 headers=GrpcHttp2OutboundHeaders[grpc-status: 0] streamDependency=0 weight=16 exclusive=false padding=0 endStream=true
10099 DEBUG io.grpc.netty.NettyServerHandler - [id: 0xd1f35e47, L:/127.0.0.1:12345 - R:/127.0.0.1:61183] INBOUND RST_STREAM: streamId=3 errorCode=8
20528 DEBUG io.grpc.netty.NettyServerHandler - [id: 0xd1f35e47, L:/127.0.0.1:12345 - R:/127.0.0.1:61183] OUTBOUND GO_AWAY: lastStreamId=3 errorCode=2 length=61 bytes=416e206578697374696e6720636f6e6e656374696f6e2077617320666f726369626c7920636c6f736564206279207468652072656d6f746520686f7374
20533 DEBUG io.netty.handler.codec.http2.Http2ConnectionHandler - [id: 0xd1f35e47, L:/127.0.0.1:12345 ! R:/127.0.0.1:61183] Sending GOAWAY failed: lastStreamId '3', errorCode '2', debugData 'An existing connection was forcibly closed by the remote host'. Forcing shutdown of the connection.
java.io.IOException: An existing connection was forcibly closed by the remote host
	at sun.nio.ch.SocketDispatcher.writev0(Native Method)
	at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:55)
	at sun.nio.ch.IOUtil.write(IOUtil.java:148)
	at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:504)
	at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:420)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:939)
	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:360)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:906)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1370)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:739)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:731)
	at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:717)
	at io.netty.handler.codec.http2.Http2ConnectionHandler.onError(Http2ConnectionHandler.java:629)
	at io.grpc.netty.AbstractNettyHandler.exceptionCaught(AbstractNettyHandler.java:81)
	at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:282)
	at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:261)
	at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:253)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.exceptionCaught(DefaultChannelPipeline.java:1375)
	at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:282)
	at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:261)
	at io.netty.channel.DefaultChannelPipeline.fireExceptionCaught(DefaultChannelPipeline.java:918)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.handleReadException(AbstractNioByteChannel.java:125)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:174)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:677)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:612)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:529)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:491)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:745)

What did you expect to see?

1. The client should keep sending events to the server until it closes/completes the request stream, regardless of whether the server has closed the response stream (half-closed the call). As of now, the client is not able to reply to the server once the response stream completes.

2. The server's OnCancelHandler must be invoked in either case, regardless of whether the call has already been half-closed and whether the connection has been forcibly shut down. It should be possible to free up resources like database connections in OnCancelHandler, but as of now there doesn't seem to be a single place where we could do this.

@ejona86
Member

ejona86 commented Jun 18, 2019

The server calling onCompleted()/onError() signals the end of the RPC. The client can't send any more after that point (if it does due to races, the data will be thrown away).

OnCancelHandler isn't called because there wasn't a cancellation; the RPC completed successfully. onComplete isn't called because the client didn't halfClose() (the client didn't call onComplete() before the server finished the RPC).

Client-side looks to be operating correctly. On server-side I expect the ServerCall is working correctly. This seems like a problem in the stub. I think we could fabricate an onError when the RPC terminates if we haven't yet called onError/onCompleted. That'd happen in ServerCalls. I think this would be easy to fix.

It should be possible to free up resources like database connections in OnCancelHandler, but as of now there doesn't seem to be a single place where we could do this.

No, OnCancelHandler isn't appropriate for that. At the lower level, one of onCancel (same as the handler) or onComplete (different from StreamObserver.onCompleted) is guaranteed to be called. But we don't expose onComplete today. We could add that though, I think.
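
For illustration, a minimal sketch (not part of this issue's repro) of one way to observe that lower-level onCancel/onComplete pair today, using a ServerInterceptor that wraps the ServerCall.Listener; the class name and log output are placeholders:

import io.grpc.ForwardingServerCallListener.SimpleForwardingServerCallListener;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;

public class CompletionLoggingInterceptor implements ServerInterceptor {

    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
            ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
        return new SimpleForwardingServerCallListener<ReqT>(next.startCall(call, headers)) {

            @Override
            public void onCancel() {
                // The RPC ended abnormally (client cancel, transport failure, deadline, ...).
                System.out.println("RPC cancelled: " + call.getMethodDescriptor().getFullMethodName());
                super.onCancel();
            }

            @Override
            public void onComplete() {
                // The RPC ended normally; this is the notification not exposed through StreamObserver today.
                System.out.println("RPC complete: " + call.getMethodDescriptor().getFullMethodName());
                super.onComplete();
            }
        };
    }
}

The interceptor would be attached with something like ServerInterceptors.intercept(service, new CompletionLoggingInterceptor()) when registering the service; exactly one of the two callbacks fires per RPC, regardless of half-close ordering.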

@ejona86 ejona86 assigned ejona86 and unassigned zhangkun83 Jun 18, 2019
@TMilasius
Author

TMilasius commented Jun 18, 2019

Re half-close - is this the same in other gRPC implementations? I was trying to find documentation describing half-closed connections and how they should/do work, but haven't found anything. The initial expectation was that in the bidirectional streaming case the streams are completely independent, either party can and should signal stream end (completion), and there should be no distinction between client and server. Is there some specific reason why an onCompleted event issued by the server results in call termination rather than a half-closed stream?

Re OnCancelHandler, my expectation was actually based on the ServerCallStreamObserver.setOnCancelHandler(..) Javadoc, which says: "Set a Runnable that will be called if the calls isCancelled() state changes from false to true." The actual implementation seems to contradict that: if we launch a background thread which periodically checks streamObserver.isCancelled(), it eventually does return true, but the provided OnCancelHandler doesn't get called.

@ejona86
Member

ejona86 commented Jun 18, 2019

The best documentation for the behavior is probably in ServerCall/ClientCall (either one; they should mirror each other) or in grpc/grpc#15460. The ServerCall/ClientCall APIs were designed to be close to 1:1 with the underlying protocol, so they match the cross-language restrictions pretty well. Yes, the linked PR isn't merged, although it is mainly about the word choice for various concepts, not about the details of the behavior.

It is unfortunate that the cross-language documentation isn't clearly specified, but that only really becomes a problem for bidi streaming. And it shouldn't hurt Java users much since ServerCall/ClientCall defines quite a bit.

Granted, you should have been able to just read the various StreamObserver documentation. You shouldn't need to read ServerCall/ClientCall, but that would probably have gone better if we had exposed ServerCall.Listener.onComplete() in the StreamObserver API. Although the conflicting names add confusion.

https://grpc.github.io/grpc-java/javadoc/io/grpc/ClientCall.Listener.html#onClose-io.grpc.Status-io.grpc.Metadata-
https://grpc.github.io/grpc-java/javadoc/io/grpc/ServerCall.html#close-io.grpc.Status-io.grpc.Metadata-

@ejona86
Member

ejona86 commented Jun 18, 2019

This issue will be for the bug of not calling onError (since the StreamObserver API expects to always end in onError/onComplete). I've created #5895 for the RPC's completion notification feature.

@ejona86 ejona86 added this to the Next milestone Jun 18, 2019
@ejona86 ejona86 added the bug label Jun 18, 2019
@TMilasius
Author

Eric, if StreamObserver will always end with an onError/onComplete invocation, I think that will be sufficient, and another onComplete handler (to deallocate server resources) is just a nice-to-have feature. In addition, as you mentioned, there might be a naming clash, which will most likely be hard to understand for the average user.

In addition to that, I still think there is a gap between ServerCallStreamObserver.isCancelled() and ServerCallStreamObserver.setOnCancelHandler(...) - at least in the Javadocs, which state that the handler will be invoked whenever isCancelled() switches from false to true. If you start a background thread on the server which periodically checks the isCancelled() value, you'll notice that it eventually switches from false to true even if the call has been completed gracefully, but it looks like the handler only gets invoked if the call has been terminated without a half-close. Note that I am referring to the ServerCallStreamObserver.isCancelled() Javadocs, not ServerCall.isCancelled(), which is a bit different.

Example
Server:

public class TestServer {

    public static void main(String[] args) throws IOException, InterruptedException {
        ServerBuilder.forPort(12345).addService(new TestServiceGrpc.TestServiceImplBase() {

            @Override
            public StreamObserver<Message> bidiOperation(StreamObserver<Message> responseObserver) {
                ServerCallStreamObserver<Message> serverResponseObserver = (ServerCallStreamObserver<Message>) responseObserver;
                serverResponseObserver.setOnCancelHandler(() -> {
                    log.info("Stream cancelled."); // never invoked
                });
                new Thread(() -> {
                    while (!serverResponseObserver.isCancelled()) {
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            return;
                        }
                    }
                    log.info("isCancelled switched to true"); // this message eventually gets logged
                }).start();
                return new StreamObserver<Message>() {

                    @Override
                    public void onNext(Message value) {
                        log.info("Server: received message.");
                        responseObserver.onNext(value);
                    }

                    @Override
                    public void onError(Throwable t) {
                        log.info("Server: received an error.", t);
                    }

                    @Override
                    public void onCompleted() {
                        log.info("Server: received on completed.");
                        responseObserver.onCompleted();
                    }
                };
            }
        }).build().start().awaitTermination();
    }
}

Client:

public class TestClient {

    public static void main(String[] args) throws InterruptedException {
        Channel channel = ManagedChannelBuilder.forAddress("localhost", 12345).usePlaintext().build();
        CountDownLatch receivedOnCompleted = new CountDownLatch(1);
        TestServiceStub asyncStub = TestServiceGrpc.newStub(channel);
        StreamObserver<Message> requestStream = asyncStub.bidiOperation(new StreamObserver<Message>() {

            @Override
            public void onNext(Message value) {
                log.info("Client received message.");
            }

            @Override
            public void onError(Throwable t) {
                log.info("Client received an error.", t);
                receivedOnCompleted.countDown();
            }

            @Override
            public void onCompleted() {
                log.info("Client received on completed.");
                receivedOnCompleted.countDown();
            }
        });
        requestStream.onNext(Message.getDefaultInstance());
        requestStream.onCompleted();
        receivedOnCompleted.await();
        log.info("Client shutdown.");
    }
}

@ejona86
Member

ejona86 commented Jun 19, 2019

there is a gap between ServerCallStreamObserver.isCancelled() and ServerCallStreamObserver.setOnCancelHandler(...) ... which state that handler will be invoked whenever isCancelled() switches from false to true

What... isCancelled() should not be true at the end of a graceful RPC closure.

if (status.isOk()) {
  listener.onComplete();
} else {
  call.cancelled = true;
  listener.onCancel();
}

But I tried your program and confirmed the behavior. I tracked the problem to the Context (correctly) being cancelled, which then sets cancelled = true.

this.context.addListener(
    new Context.CancellationListener() {
      @Override
      public void cancelled(Context context) {
        ServerStreamListenerImpl.this.call.cancelled = true;
      }
    },
    MoreExecutors.directExecutor());

That behavior's totally a bug. It appears introduced in #2963 which was version 1.5.0. I tested 1.4.0 and isCancelled never swapped to true.

Fixing the bug seems likely to break users (unknowingly) depending on the current behavior. I may have to redefine isCancelled to say it will return true if the call is cancelled or when it is complete. Alternatively I could try fixing the bug internally and seeing how much breaks; if not very much, then maybe there aren't many users impacted.

@ejona86
Member

ejona86 commented Jun 19, 2019

If you start a background thread on the server which periodically checks the isCancelled() value, you'll notice that it eventually switches from false to true even if the call has been completed gracefully, but it looks like the handler only gets invoked if the call has been terminated without a half-close

I didn't receive a notification in either case; half-close didn't seem to change whether the cancellation handler was called. That's the behavior I would expect.

@xpunch

xpunch commented Jul 26, 2019

Do you think this is the same reason that's causing my issue? #6011
Could a request from another, non-Java client cause the cancellation handler to be called?

@ejona86 ejona86 assigned voidzcy and unassigned ejona86 Sep 5, 2019
@sschepens

Is there any update on this? I have an application which depends on onComplete or onError being called, and we've found some cases where they are not called, which effectively causes a leak.

emilkhshiboun pushed a commit to emilkhshiboun/ds-hw2 that referenced this issue Jan 19, 2021
The bug is actually in grpc and not in our code, described here:
grpc/grpc-java#5882

Using bidirectional streaming we cannot invoke "onCompleted" twice (needed by both client & server).
Note: an exception to this is calling onCompleted from inside onCompleted (this works and is the temporary fix for "one ride reserver"; however, in our use case this is not relevant, as we need to open async requests).

Our bug:
Once the server finds a ride and triggers the client's onComplete, the client cannot trigger onComplete back for the server, which leads to the semaphore in the server not being released; we should think of another way of implementing this.

One idea is to not rely on "onCompleted" and instead use the actual streaming to send messages between the client and server, so that the end of the stream can be recognized when we get a specific message (via onNext()).

If you want to do this, change the grpc method signature to accept "RequestHeader" (implement it in protobuf), which contains a string message and data, and the same for ResponseHeader,
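
A rough sketch of that idea on the server side; since the actual proto isn't shown in this thread, the end_of_stream field (and the EndAwareService class) below are purely hypothetical placeholders for whatever application-level marker both sides agree on:

import io.grpc.stub.StreamObserver;

public class EndAwareService extends TestServiceGrpc.TestServiceImplBase {

    @Override
    public StreamObserver<Message> bidiOperation(StreamObserver<Message> responseObserver) {
        return new StreamObserver<Message>() {

            @Override
            public void onNext(Message value) {
                // Hypothetical field: the client sets end_of_stream = true on its last
                // message instead of relying on its onCompleted() reaching the server.
                if (value.getEndOfStream()) {
                    responseObserver.onCompleted();
                    return;
                }
                responseObserver.onNext(value);
            }

            @Override
            public void onError(Throwable t) {
            }

            @Override
            public void onCompleted() {
                // May never arrive once the other side already considers the call finished.
            }
        };
    }
}
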
@mohsenrezaeithe

mohsenrezaeithe commented Aug 5, 2021

That behavior's totally a bug. It appears introduced in #2963 which was version 1.5.0. I tested 1.4.0 and isCancelled never swapped to true.
Fixing the bug seems likely to break users (unknowingly) depending on the current behavior. I may have to redefine isCancelled to say it will return true if the call is cancelled or when it is complete. Alternatively I could try fixing the bug internally and seeing how much breaks; if not very much, then maybe there aren't many users impacted.

I still seem to be observing this on 1.39.0: an INBOUND RST_STREAM: ... errorCode=8 without a notification to StreamObserver.onCompleted().

Have any decisions been made on whether this is going to be fixed or whether it will stay the way it is (requiring workarounds)?

If this is going to be fixed but needs time, I may be able to contribute given some pointers. Otherwise, what's the recommended workaround that would allow resource cleanup on the server side, much like the transition to StreamObserver.onCompleted() on a bidi stream?

@ejona86
Member

ejona86 commented Aug 5, 2021

This issue is just for the problem on server-side where isCancelled() == true at the end of the RPC even without a cancel, so onCancelHandler wasn't called. That is why one of the solutions to this was:

I may have to redefine isCancelled to say it will return true if the call is cancelled or when it is complete.

If you want to be notified on server-side when the RPC is complete and not have to rely on cleaning up yourself when your server calls onError/onComplete, then that would be #5895, which I split out of this issue. An approach available today would be to add an io.grpc.Context listener, as the Context is always closed at the end of the RPC. However, that notification is separate from the StreamObserver API, so your cleanup code would need to be thread-safe. The notification mentioned in #5895 would run using the same serialization as the rest of StreamObserver and so would not require your cleanup code to be thread-safe.
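
A minimal sketch of that Context-listener approach, reusing the TestService from the repro above; releaseResources() is a hypothetical cleanup hook:

import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.Context;
import io.grpc.stub.StreamObserver;

public class CleanupAwareService extends TestServiceGrpc.TestServiceImplBase {

    @Override
    public StreamObserver<Message> bidiOperation(StreamObserver<Message> responseObserver) {
        // The RPC's Context is always cancelled when the RPC ends, whether it finished
        // cleanly or was cancelled, so this listener fires exactly once per call.
        Context.current().addListener(new Context.CancellationListener() {
            @Override
            public void cancelled(Context context) {
                releaseResources();
            }
        }, MoreExecutors.directExecutor());

        return new StreamObserver<Message>() {
            @Override
            public void onNext(Message value) {
                responseObserver.onNext(value);
            }

            @Override
            public void onError(Throwable t) {
            }

            @Override
            public void onCompleted() {
                responseObserver.onCompleted();
            }
        };
    }

    // Hypothetical cleanup hook. It runs outside the StreamObserver serialization
    // (on the directExecutor above), so whatever it touches must be thread-safe.
    private void releaseResources() {
        // e.g. return a pooled database connection
    }
}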

@mohsenrezaeithe

@ejona86 thanks for the pointers. How likely is it that the StreamObserver solution will be implemented, given that #5895 points back to this issue and there hasn't been activity on either for some time?

@ejona86
Member

ejona86 commented Aug 5, 2021

@mohsenrezaeithe, I'd suggest we discuss that on #5895, if that is the piece you are wanting. That feature is very easy to implement. I think we'd mainly argue over what to call the method name.

@ejona86 ejona86 assigned temawi and unassigned voidzcy Aug 11, 2021
temawi added a commit that referenced this issue Aug 20, 2021
…d. (#8408)

The semantics around cancel vary slightly between ServerCall and CancellableContext - the context should always be cancelled regardless of the outcome of the call while the ServerCall should only be cancelled on a non-OK status.

This fixes a bug where the ServerCall was always marked cancelled regardless of call status.

Fixes #5882
@ejona86
Member

ejona86 commented Aug 25, 2021

API Review meeting notes (from Friday, 8/20/2021; I'm just slow in copying the notes here):

  • Seems safe, because any code checking isCancelled() is likely to be sending messages, and after the app calls onError()/onComplete() it has to stop sending messages, otherwise it will get an IllegalStateException.
  • +5 Make the change

@ejona86 ejona86 modified the milestones: Next, 1.41 Aug 30, 2021
@github-actions github-actions bot locked as resolved and limited conversation to collaborators Nov 29, 2021