UndeliverableException for StreamNotFoundException #244

Open
dosomder opened this issue Oct 25, 2023 · 5 comments
Labels
bug Something isn't working

Comments

@dosomder

On rare occasions we observe the exception below on our servers, and it crashes the application:

From the stack trace it looks like the StreamNotFoundException (or at least onError) is emitted twice, which is not legal for a Publisher.
The first time may be here: https://github.com/EventStore/EventStoreDB-Client-Java/blob/trunk/db-client-java/src/main/java/com/eventstore/dbclient/AbstractRead.java#L108
and the second time here: https://github.com/EventStore/EventStoreDB-Client-Java/blob/trunk/db-client-java/src/main/java/com/eventstore/dbclient/AbstractRead.java#L113 (this line is in the stacktrace).

However, ReadSubscription tries to protect against this with an AtomicBoolean, so I don't know how this exception can still happen.
There were some exception-handling changes in 3fb4701; I'm not sure whether they are related.
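
For readers unfamiliar with the pattern, the kind of AtomicBoolean guard mentioned above can be sketched roughly as follows. This is a minimal illustration, not the client's actual ReadSubscription: `TerminalGuard` and `RecordingSubscriber` are hypothetical names, and the sketch uses the JDK's `java.util.concurrent.Flow` interfaces instead of `org.reactivestreams` so that it needs no dependencies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical wrapper: forwards at most one terminal signal downstream. */
final class TerminalGuard<T> implements Flow.Subscriber<T> {
    private final Flow.Subscriber<? super T> downstream;
    private final AtomicBoolean terminated = new AtomicBoolean(false);

    TerminalGuard(Flow.Subscriber<? super T> downstream) {
        this.downstream = downstream;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        downstream.onSubscribe(subscription);
    }

    @Override
    public void onNext(T item) {
        // Items arriving after a terminal signal are dropped.
        if (!terminated.get()) {
            downstream.onNext(item);
        }
    }

    @Override
    public void onError(Throwable error) {
        // compareAndSet lets exactly one caller win, even across threads.
        if (terminated.compareAndSet(false, true)) {
            downstream.onError(error);
        }
    }

    @Override
    public void onComplete() {
        if (terminated.compareAndSet(false, true)) {
            downstream.onComplete();
        }
    }
}

/** Hypothetical test double that records every signal it receives. */
final class RecordingSubscriber implements Flow.Subscriber<String> {
    final List<String> signals = new ArrayList<>();

    @Override public void onSubscribe(Flow.Subscription subscription) { }
    @Override public void onNext(String item) { signals.add("next:" + item); }
    @Override public void onError(Throwable error) { signals.add("error:" + error.getMessage()); }
    @Override public void onComplete() { signals.add("complete"); }
}

final class TerminalGuardDemo {
    public static void main(String[] args) {
        RecordingSubscriber recorder = new RecordingSubscriber();
        TerminalGuard<String> guard = new TerminalGuard<>(recorder);
        guard.onError(new IllegalStateException("first"));
        guard.onError(new IllegalStateException("second")); // dropped by the guard
        guard.onComplete();                                 // dropped by the guard
        System.out.println(recorder.signals);
    }
}
```

A guard like this only deduplicates signals that pass through the same wrapper instance, which is why a second onError reaching the consumer is surprising here.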

Forcing JVM exit because Thread[xxxx-xxxx-xxxxx-16,5,main] threw an uncaught exception
io.reactivex.rxjava3.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling | com.eventstore.dbclient.StreamNotFoundException
	at io.reactivex.rxjava3.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:372)
	at io.reactivex.rxjava3.internal.observers.BasicFuseableObserver.onError(BasicFuseableObserver.java:96)
	at io.reactivex.rxjava3.internal.operators.observable.ObservableFromPublisher$PublisherSubscriber.onError(ObservableFromPublisher.java:52)
	at com.eventstore.dbclient.ReadSubscription.onError(ReadSubscription.java:39)
	at com.eventstore.dbclient.AbstractRead.lambda$subscribe$1(AbstractRead.java:113)
	at java.base/java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:990)
	at java.base/java.util.concurrent.CompletableFuture.uniExceptionallyStage(CompletableFuture.java:1008)
	at java.base/java.util.concurrent.CompletableFuture.exceptionally(CompletableFuture.java:2364)
	at com.eventstore.dbclient.AbstractRead.subscribe(AbstractRead.java:112)
	at io.reactivex.rxjava3.internal.operators.observable.ObservableFromPublisher.subscribeActual(ObservableFromPublisher.java:32)
	at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13262)
	at io.reactivex.rxjava3.internal.operators.observable.ObservableFilter.subscribeActual(ObservableFilter.java:30)
	at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13262)
	at io.reactivex.rxjava3.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:33)
	at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13262)
	at io.reactivex.rxjava3.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
	at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13262)
	at io.reactivex.rxjava3.internal.jdk8.ObservableMapOptional.subscribeActual(ObservableMapOptional.java:42)
	at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13262)
	at io.reactivex.rxjava3.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
	at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13262)
	at io.reactivex.rxjava3.internal.operators.observable.ObservableOnErrorComplete.subscribeActual(ObservableOnErrorComplete.java:41)
	at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13262)
	at io.reactivex.rxjava3.internal.operators.observable.ObservableElementAtMaybe.subscribeActual(ObservableElementAtMaybe.java:32)
	at io.reactivex.rxjava3.core.Maybe.subscribe(Maybe.java:5377)
	at io.reactivex.rxjava3.internal.operators.maybe.MaybeMap.subscribeActual(MaybeMap.java:41)
	at io.reactivex.rxjava3.core.Maybe.subscribe(Maybe.java:5377)
	at io.reactivex.rxjava3.internal.operators.maybe.MaybeToSingle.subscribeActual(MaybeToSingle.java:46)
	at io.reactivex.rxjava3.core.Single.subscribe(Single.java:4855)
	at io.reactivex.rxjava3.internal.operators.single.SingleFlatMap.subscribeActual(SingleFlatMap.java:37)
	at io.reactivex.rxjava3.core.Single.subscribe(Single.java:4855)
	at io.reactivex.rxjava3.internal.operators.single.SingleFlatMapMaybe.subscribeActual(SingleFlatMapMaybe.java:38)
	at io.reactivex.rxjava3.core.Maybe.subscribe(Maybe.java:5377)
	at io.reactivex.rxjava3.internal.operators.maybe.MaybeToSingle.subscribeActual(MaybeToSingle.java:46)
	at io.reactivex.rxjava3.core.Single.subscribe(Single.java:4855)
	at io.reactivex.rxjava3.internal.operators.single.SingleResumeNext.subscribeActual(SingleResumeNext.java:39)
	at io.reactivex.rxjava3.core.Single.subscribe(Single.java:4855)
	at io.reactivex.rxjava3.core.Single.blockingGet(Single.java:3644)
	at com.xxx.xxx.XXXXXX.lambda$xxxxxxxxxx$17(XXXXXXXXX.java:260)
	at io.reactivex.rxjava3.internal.operators.single.SingleFromCallable.subscribeActual(SingleFromCallable.java:43)
	at io.reactivex.rxjava3.core.Single.subscribe(Single.java:4855)
	at io.reactivex.rxjava3.internal.operators.single.SingleSubscribeOn$SubscribeOnObserver.run(SingleSubscribeOn.java:89)
	at io.reactivex.rxjava3.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:38)
	at io.reactivex.rxjava3.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:25)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: com.eventstore.dbclient.StreamNotFoundException: null
	at com.eventstore.dbclient.AbstractRead$1.onNext(AbstractRead.java:62)
	at com.eventstore.dbclient.AbstractRead$1.onNext(AbstractRead.java:48)
	at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onMessage(ClientCalls.java:478)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInternal(ClientCallImpl.java:660)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInContext(ClientCallImpl.java:647)
	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
	... 3 common frames omitted

Client: 4.3.0

@YoEight
Member

YoEight commented Oct 25, 2023

Hey @dosomder,

Can you share some info on your runtime workload when it happens? Is there a situation where you delete a stream while another part of your code is reading from it?

@dosomder
Author

We don't explicitly delete streams, but we do set maxAge and maxCount.

When the exception happens, we are trying to read the last event from a stream and expect it to be empty. This is how we read the stream; we handle the StreamNotFoundException in the RxJava chain.

Observable.fromPublisher(
        this.client.readStreamReactive(
            this.getStreamName(),
            ReadStreamOptions.get().fromEnd().backwards().maxCount(1)))
        // Only interested in the actual event
        .filter(ReadMessage::hasEvent)
        .map(ReadMessage::getEvent)
        // Decode event
        .mapOptional(this.translator::decodeEvent)
        // A non-existent stream is treated as if it contains no events
        .onErrorComplete(StreamNotFoundException.class::isInstance);

@YoEight
Member

YoEight commented Nov 9, 2023

@dosomder Sorry for taking so long to respond. I think you have diagnosed the issue properly. I need to run some tests to find out whether, by the time the exception is raised in the future, it has already been processed by the subscription's observer, or whether it happens the other way around.
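
One JDK behaviour that bears on this ordering question can be checked in isolation: a callback registered with `CompletableFuture.exceptionally` runs immediately on the calling thread if the future has already failed, and otherwise runs later on whichever thread completes the future. The first case matches the frames in the stack trace above, where `AbstractRead.subscribe` calls `exceptionally` and the lambda runs synchronously. The sketch below uses only the JDK; `ExceptionallyTiming` and its method names are made up for illustration and are not part of the client.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

final class ExceptionallyTiming {

    /** Future has already failed: the callback runs inside exceptionally(), on the caller. */
    static Thread callbackThreadForFailedFuture() {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(new IllegalStateException("stream not found"));

        AtomicReference<Thread> ranOn = new AtomicReference<>();
        future.exceptionally(error -> {
            ranOn.set(Thread.currentThread());
            return null;
        });
        return ranOn.get();
    }

    /** Future fails after registration: the callback runs on the completing thread. */
    static Thread callbackThreadForLateFailure() {
        CompletableFuture<String> future = new CompletableFuture<>();
        AtomicReference<Thread> ranOn = new AtomicReference<>();
        future.exceptionally(error -> {
            ranOn.set(Thread.currentThread());
            return null;
        });

        Thread completer = new Thread(
                () -> future.completeExceptionally(new IllegalStateException("stream not found")),
                "completer");
        completer.start();
        try {
            completer.join(); // join makes the callback's write visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println(callbackThreadForFailedFuture() == Thread.currentThread());
        System.out.println(callbackThreadForLateFailure().getName());
    }
}
```

Which of the two cases occurs in the client depends on whether the gRPC thread fails the future before or after `subscribe` registers the callback, which is exactly the timing the tests mentioned above would need to pin down.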

YoEight added the bug label on Nov 9, 2023
@dosomder
Author

dosomder commented Dec 13, 2023

@YoEight In the meantime we have moved to client 5.x, and since then (6 weeks now) we have not seen this exception in our environments.

@YoEight
Member

YoEight commented Dec 13, 2023

Unfortunately, I can assure you that the issue is still there, even if it's rare. I never managed to reproduce it locally, but per the Rx documentation, the way we use the library in this context is wrong.
