CompletionStageMappingPublisher is swallowing Jackson serialization errors #3326

Open
grodriguezatlassian opened this issue Sep 7, 2023 · 3 comments
Labels
keep-open Tells Stale Bot to keep PRs and issues open

grodriguezatlassian commented Sep 7, 2023

Describe the bug
We encountered a bug where subscription data is never delivered if it cannot be serialized, and no error is reported. The error only appeared in the logs after we raised the log level in another package. Stepping through the code, we found that CompletionStageMappingPublisher catches only RuntimeException; other exception types should be caught and handled as well.

To Reproduce
The attached package is a test application containing a demo subscription whose data triggers exceptions on the websocket transport.

subscription-test (1).zip

Here's the TestDataFetcher from that package. It triggers the exception by putting a date value of an unsupported type (OffsetDateTime) into the payload. The demo is built against an older version of graphql-java, but the issue is present in the latest version as well.

package subscriptiontest.datafetcher;

import com.netflix.graphql.dgs.DgsComponent;
import com.netflix.graphql.dgs.DgsQuery;
import com.netflix.graphql.dgs.DgsSubscription;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.observables.ConnectableObservable;
import java.time.OffsetDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Flow;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.reactivestreams.Publisher;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@DgsComponent
@Slf4j
@Component
public class TestDataFetcher implements ObservableOnSubscribe<GQLData> {

    private final Flowable<GQLData> publisher;
    private ObservableEmitter<GQLData> emitter;

    public TestDataFetcher() {
        final Observable<GQLData> gqlDataObservable = Observable.create(this);
        final ConnectableObservable<GQLData> connectableObservable = gqlDataObservable.share().publish();
        connectableObservable.connect();
        publisher = connectableObservable.toFlowable(BackpressureStrategy.BUFFER);
    }

    @Override
    public void subscribe(@NotNull ObservableEmitter<GQLData> observableEmitter) {
        emitter = observableEmitter;
    }

    @DgsQuery
    public GQLData gqldata() {
        GQLData gqlData = new GQLData("Here's data", new HashMap<>());
        Map fields = gqlData.getFields();
        fields.put("date", OffsetDateTime.now());
        return gqlData;
    }

    @DgsSubscription
    public Publisher<GQLData> gqlDataSub() {
        return publisher
                .map(filter -> {
                    log.info("Publishing new data");
                    GQLData gqlData = new GQLData("Here's subscription data", new HashMap<>());
                    Map fields = gqlData.getFields();
                    fields.put("date", OffsetDateTime.now());
                    return gqlData;
                })
                .doOnTerminate(() -> log.info("doOnTerminate"))
                .doOnComplete(() -> log.info("doOnComplete"))
                .doAfterTerminate(() -> log.info("doAfterTerminate"))
                .doOnError(e -> log.error("doOnError {}", e.getMessage(), e))
                .doFinally(() -> log.info("doFinally"))
                .doOnCancel(() -> log.info("doOnCancel"));
    }

    @Scheduled(fixedRate = 15 * 1000)
    public void emitSubData() {
        log.info("Emitting data");
        if (emitter != null) {
            emitter.onNext(GQLData.builder().build());
        }
    }

}

Here's the code in CompletionStageMappingPublisher that we think is problematic:

        public void onNext(U u) {
            if (!this.onCompleteOrErrorRunCalled.get()) {
                try {
                    CompletionStage<D> completionStage = (CompletionStage)CompletionStageMappingPublisher.this.mapper.apply(u);
                    this.offerToInFlightQ(completionStage);
                    completionStage.whenComplete(this.whenNextFinished(completionStage));
                // Jackson exceptions can be IOException or RuntimeException, and these do not overlap
                } catch (RuntimeException var3) {
                    this.handleThrowable(var3);
                }

            }
        }

The exception we encountered is InvalidDefinitionException, which has IOException as a parent type.
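To illustrate why the catch clause quoted above would miss an exception of this kind, here is a minimal standalone sketch. It does not use graphql-java or Jackson; `CatchScopeSketch`, `sneakyThrow` and `FAILING_MAPPER` are hypothetical names made up for the example, and the undeclared `IOException` stands in for a checked exception coming from code that is compiled without checked-exception enforcement (Kotlin, for instance).

```java
import java.io.IOException;
import java.util.function.Function;

// Hypothetical names throughout; this is not graphql-java or Jackson code.
final class CatchScopeSketch {

    // Throws any Throwable without declaring it. Stands in for code compiled
    // without checked-exception enforcement (for example Kotlin).
    @SuppressWarnings("unchecked")
    static <E extends Throwable> void sneakyThrow(Throwable t) throws E {
        throw (E) t;
    }

    // A mapper that fails with a checked IOException, like a serialization error.
    static final Function<String, String> FAILING_MAPPER = input -> {
        CatchScopeSketch.<RuntimeException>sneakyThrow(
                new IOException("cannot serialize " + input));
        return input; // never reached at runtime
    };

    // Same shape as the quoted onNext: only RuntimeException is handled,
    // so the IOException propagates out of this method.
    static String narrowCatch(String input) {
        try {
            return FAILING_MAPPER.apply(input);
        } catch (RuntimeException e) {
            return "handled: " + e.getMessage();
        }
    }

    // Widened clause: Exception also covers IOException and its subtypes.
    static String broadCatch(String input) {
        try {
            return FAILING_MAPPER.apply(input);
        } catch (Exception e) {
            return "handled: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(broadCatch("date"));
        try {
            narrowCatch("date");
        } catch (Throwable t) {
            System.out.println("escaped narrowCatch: " + t.getClass().getName());
        }
    }
}
```

Whether `Exception` or `Throwable` is the right width for the real publisher is a design decision for the maintainers; the sketch only shows that `RuntimeException` is too narrow for an `IOException` subtype.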

Here is an example stack trace showing how graphql-java and dgs-graphql-websockets interact.

_writeValueAndClose:4721, ObjectMapper (com.fasterxml.jackson.databind)
writeValueAsBytes:3987, ObjectMapper (com.fasterxml.jackson.databind)
onNext:180, WebsocketGraphQLTransportWSProtocolHandler$handleSubscription$1 (com.netflix.graphql.dgs.subscriptions.websockets)
onNext:167, WebsocketGraphQLTransportWSProtocolHandler$handleSubscription$1 (com.netflix.graphql.dgs.subscriptions.websockets)
lambda$whenNextFinished$0:97, CompletionStageMappingPublisher$CompletionStageSubscriber (graphql.execution.reactive)
accept:-1, CompletionStageMappingPublisher$CompletionStageSubscriber$$Lambda$6240/0x0000000801d0d608 (graphql.execution.reactive)
uniWhenComplete:863, CompletableFuture (java.util.concurrent)
uniWhenCompleteStage:887, CompletableFuture (java.util.concurrent)
whenComplete:2325, CompletableFuture (java.util.concurrent)
whenComplete:144, CompletableFuture (java.util.concurrent)
onNext:85, CompletionStageMappingPublisher$CompletionStageSubscriber (graphql.execution.reactive)
onNext:46, HalfSerializer (io.reactivex.rxjava3.internal.util)
onNext:97, StrictSubscriber (io.reactivex.rxjava3.internal.subscribers)
onNext:69, FlowableMap$MapSubscriber (io.reactivex.rxjava3.internal.operators.flowable)
onNext:80, FlowableDoOnLifecycle$SubscriptionLambdaSubscriber (io.reactivex.rxjava3.internal.operators.flowable)
onNext:80, FlowableDoOnLifecycle$SubscriptionLambdaSubscriber (io.reactivex.rxjava3.internal.operators.flowable)
tryOnNext:75, FlowableFilter$FilterSubscriber (io.reactivex.rxjava3.internal.operators.flowable)
onNext:53, FlowableFilter$FilterSubscriber (io.reactivex.rxjava3.internal.operators.flowable)
drain:186, FlowableOnBackpressureBuffer$BackpressureBufferSubscriber (io.reactivex.rxjava3.internal.operators.flowable)
onNext:111, FlowableOnBackpressureBuffer$BackpressureBufferSubscriber (io.reactivex.rxjava3.internal.operators.flowable)
onNext:56, FlowableFromObservable$SubscriberObserver (io.reactivex.rxjava3.internal.operators.flowable)
onNext:180, ObservablePublish$PublishConnection (io.reactivex.rxjava3.internal.operators.observable)
onNext:200, ObservableRefCount$RefCountObserver (io.reactivex.rxjava3.internal.operators.observable)
onNext:180, ObservablePublish$PublishConnection (io.reactivex.rxjava3.internal.operators.observable)
onNext:101, ObservableDoOnEach$DoOnEachObserver (io.reactivex.rxjava3.internal.operators.observable)
onNext:67, ObservableCreate$CreateEmitter (io.reactivex.rxjava3.internal.operators.observable)
onNext:171, ObservableCreate$SerializedEmitter (io.reactivex.rxjava3.internal.operators.observable)
notifySubscribers:220, QueueSubscriptionService (com.service.messaging)
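One detail in the trace may be worth checking during the investigation: the handler's onNext runs inside a `CompletableFuture.whenComplete` callback (the `uniWhenComplete` frame). The sketch below uses only the JDK to show how `whenComplete` treats an exception thrown by its callback. `WhenCompleteSketch` and `observe` are hypothetical names, and an `IllegalStateException` stands in for the serialization failure; this is an illustration of the JDK behavior, not a diagnosis of graphql-java.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical names; JDK-only illustration of whenComplete semantics.
final class WhenCompleteSketch {

    // Reports where an exception thrown inside a whenComplete callback ends up.
    static String observe() {
        CompletableFuture<String> source = CompletableFuture.completedFuture("payload");
        CompletableFuture<String> dependent;
        try {
            // The source is already complete, so the callback runs right here,
            // on the calling thread, as in the trace.
            dependent = source.whenComplete((value, error) -> {
                throw new IllegalStateException("serialization failed for " + value);
            });
        } catch (RuntimeException e) {
            return "caught by caller: " + e.getMessage();
        }
        try {
            dependent.join();
            return "no failure observed";
        } catch (CompletionException e) {
            // The callback's exception was stored in the stage returned by
            // whenComplete instead of being thrown to the caller.
            return "captured by returned stage: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(observe());
    }
}
```

If the real code discards the stage returned by `whenComplete`, as the quoted onNext appears to, an exception thrown by the downstream subscriber could be lost there regardless of how wide the surrounding catch clause is.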

Hello, this issue has been inactive for 60 days, so we're marking it as stale. If you would like to continue this discussion, please comment within the next 30 days or we'll close the issue.

@github-actions github-actions bot added the Stale label Nov 22, 2023

Hello, as this issue has been inactive for 90 days, we're closing the issue. If you would like to resume the discussion, please create a new issue.

@github-actions github-actions bot closed this as not planned Dec 23, 2023
dondonz (Member) commented Dec 31, 2023

Reopening because it's worth investigating.

@dondonz dondonz reopened this Dec 31, 2023
@dondonz dondonz added keep-open Tells Stale Bot to keep PRs and issues open and removed Stale labels Dec 31, 2023