Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subscription results keep in upstream order #3574

Open
wants to merge 10 commits into
base: master
Choose a base branch
from

Conversation

bbakerman
Copy link
Member

@bbakerman bbakerman commented Apr 21, 2024

See spring-projects/spring-graphql#949

This PR has a reproduction of the problem reported and also a series fo fixes

However after discussions with the Spring team, its not apparent that the old "stream what completes first" approach is wrong.

So I have added two modes - the older CompletionStageMappingPublisher which will emit objects as soon as they complete and a newer CompletionStageMappingOrderedPublisher that will emit elements in the original upstream Publisher order.

Both have bufferring (for elements that have not completed when they are mapped into graphql objects) but once keeps them in original presentation order while the old one does what it has always done.

I have split the inner Subscriber classes out into their own classes and hence can unit test them better

I have changed the tests considerable. I have tested more edge cases.

I have also made them finish up when onError is called and they cancel any futures in flight.

@bbakerman
Copy link
Member Author

Updated it to have a Mono property thats async

onSubscribe
	onNext : ExecutionResultImpl{errors=[], data={searchVideo={id=0, name=name0, lastEpisode=episode-boundedElastic-1for0, isFavorite=true}}, dataPresent=true, extensions=null}
	onNext : ExecutionResultImpl{errors=[], data={searchVideo={id=1, name=name1, lastEpisode=episode-boundedElastic-1for1, isFavorite=false}}, dataPresent=true, extensions=null}
	onNext : ExecutionResultImpl{errors=[], data={searchVideo={id=2, name=name2, lastEpisode=episode-boundedElastic-1for2, isFavorite=true}}, dataPresent=true, extensions=null}
	onNext : ExecutionResultImpl{errors=[], data={searchVideo={id=3, name=name3, lastEpisode=episode-boundedElastic-1for3, isFavorite=false}}, dataPresent=true, extensions=null}
	onNext : ExecutionResultImpl{errors=[], data={searchVideo={id=4, name=name4, lastEpisode=episode-boundedElastic-1for4, isFavorite=true}}, dataPresent=true, extensions=null}
	onNext : ExecutionResultImpl{errors=[], data={searchVideo={id=5, name=name5, lastEpisode=episode-boundedElastic-1for5, isFavorite=false}}, dataPresent=true, extensions=null}
	onNext : ExecutionResultImpl{errors=[], data={searchVideo={id=6, name=name6, lastEpisode=episode-boundedElastic-2for6, isFavorite=true}}, dataPresent=true, extensions=null}
	onNext : ExecutionResultImpl{errors=[], data={searchVideo={id=7, name=name7, lastEpisode=episode-boundedElastic-2for7, isFavorite=false}}, dataPresent=true, extensions=null}
	onNext : ExecutionResultImpl{errors=[], data={searchVideo={id=8, name=name8, lastEpisode=episode-boundedElastic-2for8, isFavorite=true}}, dataPresent=true, extensions=null}
	onNext : ExecutionResultImpl{errors=[], data={searchVideo={id=9, name=name9, lastEpisode=episode-boundedElastic-1for9, isFavorite=false}}, dataPresent=true, extensions=null}
	onNext : ExecutionResultImpl{errors=[], data={searchVideo={id=10, name=name10, lastEpisode=episode-boundedElastic-1for10, isFavorite=true}}, dataPresent=true, extensions=null}
complete

@madwind
Copy link

madwind commented Apr 21, 2024

when request one by one

    private CompletableFuture<Flux<Object>> mkFluxDF(DataFetchingEnvironment env) {
        // async deliver of the publisher with random snoozing between values
        return CompletableFuture.supplyAsync(() -> Flux.generate(() -> 0, (counter, sink) -> {
            sink.next(mkValue(counter));
            snooze(rand(10, 100));
            if (counter == 10) {
                sink.complete();
            }
            return counter + 1;
        }).doOnNext(System.out::println));
    }
onSubscribe
{id=0, counter=0}
	onNext : ExecutionResultImpl{errors=[], data={searchVideo={id=0, name=name0, lastEpisode=episode-boundedElastic-1for0, isFavorite=true}}, dataPresent=true, extensions=null}
{id=1, counter=1}
	onNext : ExecutionResultImpl{errors=[], data={searchVideo={id=1, name=name1, lastEpisode=episode-boundedElastic-2for1, isFavorite=false}}, dataPresent=true, extensions=null}
{id=2, counter=2}
	onNext : ExecutionResultImpl{errors=[], data={searchVideo={id=2, name=name2, lastEpisode=episode-boundedElastic-2for2, isFavorite=true}}, dataPresent=true, extensions=null}
{id=3, counter=3}
	onNext : ExecutionResultImpl{errors=[], data={searchVideo={id=3, name=name3, lastEpisode=episode-boundedElastic-2for3, isFavorite=false}}, dataPresent=true, extensions=null}
{id=4, counter=4}
	onNext : ExecutionResultImpl{errors=[], data={searchVideo={id=4, name=name4, lastEpisode=episode-boundedElastic-2for4, isFavorite=true}}, dataPresent=true, extensions=null}
{id=5, counter=5}
	onNext : ExecutionResultImpl{errors=[], data={searchVideo={id=5, name=name5, lastEpisode=episode-boundedElastic-1for5, isFavorite=false}}, dataPresent=true, extensions=null}
{id=6, counter=6}
	onNext : ExecutionResultImpl{errors=[], data={searchVideo={id=6, name=name6, lastEpisode=episode-boundedElastic-1for6, isFavorite=true}}, dataPresent=true, extensions=null}
{id=7, counter=7}
	onNext : ExecutionResultImpl{errors=[], data={searchVideo={id=7, name=name7, lastEpisode=episode-boundedElastic-1for7, isFavorite=false}}, dataPresent=true, extensions=null}
{id=8, counter=8}
	onNext : ExecutionResultImpl{errors=[], data={searchVideo={id=8, name=name8, lastEpisode=episode-boundedElastic-2for8, isFavorite=true}}, dataPresent=true, extensions=null}
{id=9, counter=9}
	onNext : ExecutionResultImpl{errors=[], data={searchVideo={id=9, name=name9, lastEpisode=episode-boundedElastic-2for9, isFavorite=false}}, dataPresent=true, extensions=null}
{id=10, counter=10}
	onNext : ExecutionResultImpl{errors=[], data={searchVideo={id=10, name=name10, lastEpisode=episode-boundedElastic-2for10, isFavorite=true}}, dataPresent=true, extensions=null}
complete

when request 10

        @Override
        public void onSubscribe(Subscription subscription) {
            this.subscription = subscription;
            System.out.println("onSubscribe");
            subscription.request(10);
        }

        @Override
        public void onNext(Object o) {
            System.out.println("\tonNext : " + o);
//            subscription.request(1);
        }

{id=0, counter=0}
{id=1, counter=1}
{id=2, counter=2}
{id=3, counter=3}
{id=4, counter=4}
{id=5, counter=5}
{id=6, counter=6}
onNext : ExecutionResultImpl{errors=[], data={searchVideo={id=0, name=name0, lastEpisode=episode-boundedElastic-1for0, isFavorite=true}}, dataPresent=true, extensions=null}
onNext : ExecutionResultImpl{errors=[], data={searchVideo={id=2, name=name2, lastEpisode=episode-boundedElastic-3for2, isFavorite=true}}, dataPresent=true, extensions=null}
onNext : ExecutionResultImpl{errors=[], data={searchVideo={id=3, name=name3, lastEpisode=episode-boundedElastic-4for3, isFavorite=false}}, dataPresent=true, extensions=null}
{id=7, counter=7}
{id=8, counter=8}
{id=9, counter=9}
onNext : ExecutionResultImpl{errors=[], data={searchVideo={id=1, name=name1, lastEpisode=episode-boundedElastic-2for1, isFavorite=false}}, dataPresent=true, extensions=null}
onNext : ExecutionResultImpl{errors=[], data={searchVideo={id=4, name=name4, lastEpisode=episode-boundedElastic-5for4, isFavorite=true}}, dataPresent=true, extensions=null}
onNext : ExecutionResultImpl{errors=[], data={searchVideo={id=5, name=name5, lastEpisode=episode-boundedElastic-1for5, isFavorite=false}}, dataPresent=true, extensions=null}
onNext : ExecutionResultImpl{errors=[], data={searchVideo={id=6, name=name6, lastEpisode=episode-boundedElastic-2for6, isFavorite=true}}, dataPresent=true, extensions=null}
onNext : ExecutionResultImpl{errors=[], data={searchVideo={id=8, name=name8, lastEpisode=episode-boundedElastic-4for8, isFavorite=true}}, dataPresent=true, extensions=null}
onNext : ExecutionResultImpl{errors=[], data={searchVideo={id=7, name=name7, lastEpisode=episode-boundedElastic-3for7, isFavorite=false}}, dataPresent=true, extensions=null}
onNext : ExecutionResultImpl{errors=[], data={searchVideo={id=9, name=name9, lastEpisode=episode-boundedElastic-5for9, isFavorite=false}}, dataPresent=true, extensions=null}


@bbakerman bbakerman changed the title Reproduction showing async DFs keeping field order Subscription results keep in upstream order Apr 24, 2024
…it out the different publishers and also took out the subscribers
… subscribers now inhrent from each other and outstanding futures are now cancelled if the Publisher fails
* source publisher. But this can be changed to {@link Boolean#TRUE} to keep them in order.
*/
public static final String KEEP_SUBSCRIPTION_EVENTS_ORDERED = "KEEP_SUBSCRIPTION_EVENTS_ORDERED";

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By default this is off (old behavior) but it can be opted into

* @param <U> the upstream type to be mapped to
*/
@Internal
public class CompletionStageMappingOrderedPublisher<D, U> extends CompletionStageMappingPublisher<D, U> {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it extends CompletionStageMappingPublisher because they have a LOT in common - just the ordering on inflight objects is different

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note also this has been split out of the publisher class

private final Publisher<U> upstreamPublisher;
private final Function<U, CompletionStage<D>> mapper;
protected final Publisher<U> upstreamPublisher;
protected final Function<U, CompletionStage<D>> mapper;
Copy link
Member Author

@bbakerman bbakerman Apr 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

now designed to be inherited from its ordered peer

@@ -52,126 +54,4 @@ public Publisher<U> getUpstreamPublisher() {
return upstreamPublisher;
}

@SuppressWarnings("ReactiveStreamsSubscriberImplementation")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was moved to its own class up in the same package

* @param <D> mapped downstream values
*/
@Internal
public class CompletionStageOrderedSubscriber<U, D> extends CompletionStageSubscriber<U, D> implements Subscriber<U> {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It inherents from CompletionStageSubscriber because again they are very much the same

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

except for ordering of source events

if (throwable instanceof CompletionException & throwable.getCause() != null) {
return throwable.getCause();
}
return throwable;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the onError will be the underling exception not the wrapper CompletionException that comes from CF usage

* @param <D> mapped downstream values
*/
@Internal
public class CompletionStageSubscriber<U, D> implements Subscriber<U> {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was split out from the peer Publisher code as its own file. Its complicated enough to deserve that.

protected final Queue<CompletionStage<?>> inFlightDataQ;
protected final LockKit.ReentrantLock lock = new LockKit.ReentrantLock();
protected final AtomicReference<Runnable> onCompleteRun;
protected final AtomicBoolean isTerminal;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Designed to be inheret from by CompletionStageOrderedSubscriber

text
}
}
""").graphQLContext([(SubscriptionExecutionStrategy.KEEP_SUBSCRIPTION_EVENTS_ORDERED): true]).build()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

our hint to make them ordered. Here is an example

import java.util.concurrent.CompletionStage
import java.util.function.Function

class CompletionStageMappingOrderedPublisherTest extends Specification {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tests for the two classes kinda repeat themselves - but I figuered that even though they inherent from each other - the tests should stress them the same and make no assumptions on how they are implemented

With even more work maybe the tests become common ones use where: but I didnt go that far - copy pasta rather

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And - no I ended up refactoring so that the same tests are run for both implementations - one in order and one not in order


import static graphql.execution.reactive.CompletionStageSubscriberTest.mapperThatDoesNotComplete

class CompletionStageOrderedSubscriberTest extends Specification {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

by breaking out the Subscriber classes into their own class - we can unit test them better

@@ -0,0 +1,71 @@
package graphql.execution.reactive.tck;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have made a series of new TestNG compliance tests that use the TCK to ensure they are valid publishers

* This reproduction is to see what's happening with Subscriptions and whether they keep their
* order when values are async.
*/
public class SubscriptionReproduction {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a main() reproduction class - not a test - I wanted to find out what was happening and I thought this was a good way to find out. Other code now tests that the fix is in place

…bined the tests so there is less repeated test code
@@ -117,6 +117,7 @@ dependencies {

testImplementation 'org.reactivestreams:reactive-streams-tck:' + reactiveStreamsVersion
testImplementation "io.reactivex.rxjava2:rxjava:2.2.21"
testImplementation "io.projectreactor:reactor-core:3.6.5"
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added so we can do Reactor specific tests if we choose

@@ -40,9 +35,16 @@ public CompletionStageMappingPublisher(Publisher<U> upstreamPublisher, Function<

@Override
public void subscribe(Subscriber<? super D> downstreamSubscriber) {
upstreamPublisher.subscribe(new CompletionStageSubscriber(downstreamSubscriber));
assertNotNullWithNPE(downstreamSubscriber, () -> "Subscriber passed to subscribe must not be null");
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

spec says it MUST be non null


@NotNull
protected Subscriber<? super U> createSubscriber(Subscriber<? super D> downstreamSubscriber) {
return new CompletionStageSubscriber<>(mapper, downstreamSubscriber);
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ordering comes from the Subscriber impl

onCompleteOrError(() -> {
onCompleteOrErrorRunCalled.set(true);
downstreamSubscriber.onError(t);
});
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See in the old impl we waited until ALL inflights finished before we called onError - well now we dont.

We fail fast after discussing it with the Spring folks

CompletableFuture<?> cf = cs.toCompletableFuture();
if (cf.isDone()) {
// take it off the queue
inFlightDataQ.poll();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in a lock so we can rely on it not changing here on the second mutative read

downstreamSubscriber.onError(t);
cancelInFlightFutures();
}
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

now fails fast

if (isTerminal.compareAndSet(false, true)) {
downstreamSubscriber.onComplete();
}
});
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

waits for all inflight CFs to complete

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants