Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Suggestion] Create an operator that merges multiple ordered flux's into a single flow with optional fields for flux's with gaps in there keys #3645

Open
vibbix opened this issue Nov 21, 2023 · 6 comments
Labels
for/team-attention This issue needs team attention or action status/need-design This needs more in depth design work type/enhancement A general enhancement

Comments

@vibbix
Copy link

vibbix commented Nov 21, 2023

Combine a Flux.zip-akin operator with a key-selecting variant of Flux.mergeComparing for publishers that should be merged based on keys, for both finite and unbounded sources, of any combination in length.

Motivation

I use Reactor everyday in my data pipeline work, to pretty great success. The lazy operators are amazing at handling complex merge operations across many distinct sources. One of the things I run into however is the case when I am trying to fan-in multiple sources of data that have different lengths. and mismatched (but ordered) keys.

Example use-case

An example of this would be merging in 4 different JSON arrays, where a "match-key" would be missing from some of the sets, or that some of the sets have totally different lengths, and would short circuit early.

I have used Flux.groupBy in the past, but that doesn't work in a unbounded Flux case
I tend to create a custom interleave for these situations, but a generic solution would be incredibly helpful.

Desired solution

An example signature for this kind of operator that I have experimented with:

    /**
     * This operator merges 4 different flux's together into a single flux based on matching keys.
     * In the case of a source either not having a matched key, or ending early, an empty optional is returned.
     * The Flux's do not have to be the same length, and may have different(but ordered) keys
     * <br>
     * Each source is read until their end.
     * It's assumed that all the sources are already ordered, and that K is comparable
     * @param <K> the key type; Required to be comparable. The smallest value is picked to combine
     * @param <T1> type of the value from source1
     * @param <T2> type of the value from source2
     * @param <T3> type of the value from source3
     * @param <T4> type of the value from source4
     * @param source1 The first Publisher source to combine values from
     * @param source2 The second Publisher source to combine values from
     * @param source3 The third Publisher source to combine values from
     * @param source4 The forth Publisher source to combine values from
     * @param prefetch the minimum size of the internal queue per flux
     * @return a flux based on the produced combinations
     */
    public static <K extends Comparable<? super K>, T1, T2, T3, T4>
    Flux<Tuple5<K, Optional<T1>, Optional<T2>, Optional<T3>, Optional<T4>>>
    zipOnKeyOptional(Flux<? extends Map.Entry<K,T1>> source1,
                     Flux<? extends Map.Entry<K,T2>> source2,
                     Flux<? extends Map.Entry<K,T3>> source3,
                     Flux<? extends Map.Entry<K,T4>> source4, int prefetch);

Desired output

---
title: s
---
stateDiagram-v2    
    sourceOne --> Combiner
    sourceTwo --> Combiner
    sourceThree --> Combiner
    sourceFour --> Combiner


    state sourceOne {
        s11: (1,1)
        s12: (2,2)
        s13: (3,3)
        s14: (4,4)
        s15: (5,5)

        [*] --> s11
        s11 --> s12
        s12 --> s13
        s13 --> s14
        s14 --> s15
        s15 --> [*]
    }

    state sourceTwo {
        [*] --> [*]
    }

    state sourceThree {
        s31: (1,1)
        s32: (2,2)
        s33: (4,4)

        [*] --> s31
        s31 --> s32
        s32 --> s33 
        s33 --> [*]
    }

    state sourceFour {
        s41: (1,1)
        s42: (3,3)
        s43: (4,4)

        [*] --> s41
        s41 --> s42
        s42 --> s43 
        s43 --> [*]
    }


    state Combiner {
        sc1: 1 [1, null, 1,    1]
        sc2: 2 [2, null, 2,    null]  
        sc3: 3 [3, null, null, 3]
        sc4: 4 [4, null, 4,    4]
        sc5: 5 [5, null, null, null]
        [*] --> sc1
        sc1 --> sc2
        sc2 --> sc3
        sc3 --> sc4
        sc4 --> sc5
        sc5 --> [*]
    }

Test Case

import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import reactor.util.function.Tuple5;
import reactor.util.function.Tuples;

import java.util.Map;
import java.util.Optional;

import static java.util.Map.entry;
import static java.util.Optional.of;
import static java.util.Optional.empty;

public class ReactiveUtilsTest {
    @Test
    void testZipOnKeyOptional() {
        Flux<Map.Entry<Integer, Integer>> fluxOne = Flux.range(1,5).map(i -> entry(i,i));
        Flux<Map.Entry<Integer, Integer>> fluxTwo = Flux.empty();
        Flux<Map.Entry<Integer, Integer>> fluxThree = Flux.just(entry(1,1), entry(2,2), entry(4,4));
        Flux<Map.Entry<Integer, Integer>> fluxFour = Flux.just(entry(1,1), entry(3,3), entry(4,4));

        Flux<Tuple5<Integer, Optional<Integer>, Optional<Integer>, Optional<Integer>, Optional<Integer>>>
                actual = Flux.zipOnKeyOptional(fluxOne, fluxTwo, fluxThree, fluxFour, 4);
        StepVerifier.create(actual)
                .expectNext(Tuples.of(1, of(1), empty(), of(1), of(1)))
                .expectNext(Tuples.of(2, of(2), empty(), of(2), empty()))
                .expectNext(Tuples.of(3, of(3), empty(), empty(), of(3)))
                .expectNext(Tuples.of(4, of(4), empty(), of(4), of(4)))
                .expectNext(Tuples.of(5, of(5), empty(), empty(), empty()))
                .verifyComplete();
    }
}

Considered alternatives

  • Flux.groupBy doesn't work in unbounded / infinite publisher situations.
  • groupedFlux's also can't be joined in a structured-concurrency kind of way, like Mono.zip
  • I typically implement these functions by having:
    1. Having each flux be mapped to a marker interface that allows me to apply them to a POJO builder
    2. Merging the Flux's that now are cast to the marker interface with an operator like Flux.mergeComparingDelayError(...)
    3. Use Flux.windowUntilChanged to group the entities
    4. flatMap with reduceWith accumulator to build the tuple out
    5. the mapped object is aligned key wise, and has sane default Optional.empty() for unmatched fields
@reactorbot reactorbot added the ❓need-triage This issue needs triage, hasn't been looked at by a team member yet label Nov 21, 2023
@vibbix vibbix changed the title [Suggestion] Create an operator that merges multiple ordered flux's into a TupleN<>with optional fields for mismatched tokens [Suggestion] Create an operator that merges multiple ordered flux's into a single flow with optional fields for flux's with gaps in there keys Nov 21, 2023
@OlegDokuka OlegDokuka added type/enhancement A general enhancement and removed ❓need-triage This issue needs triage, hasn't been looked at by a team member yet labels Nov 22, 2023
@OlegDokuka
Copy link
Contributor

Hi, @vibbix!

Thanks for sharing your interesting use case!
Am I understanding correctly that you need a zip version which will be zipping until the longest source is done, while the other sources which has ended should return fallback or null value?

Cheers,
Oleh

@OlegDokuka OlegDokuka added this to the 3.6.1 milestone Nov 22, 2023
@chemicL
Copy link
Member

chemicL commented Nov 22, 2023

Wondering whether the existing API ideas can be used to achieve this result with the combinator variants, e.g.

zip(Function<? super Object[],? extends O> combinator, Publisher<? extends I>... sources)

when made configurable to wait for the last active source instead of terminating upon the first finishing one.
The combinator would need to probably return a structure that contains both the output tuple and a decision object that says for particular source whether the value was consumed or should be reused for another zipping round.

The notion of zipping on key is quite limiting potentially.

@vibbix
Copy link
Author

vibbix commented Nov 22, 2023

@OlegDokuka Having a Flux.zipDelayComplete Being able to zip data sources with default fallback values for mismatched lengths would be very useful too. I tend to hack this together by using along the line of Flux.concat(source1, Flux.create(...)) that produces Map.Entry<Integer,Optional<T>>(Integer.MAX_VALUE,Optional.empty()) until the parent subscription cancels. It involves a lot of unboxing, but I built some static helpers to make it easier.

@chemicL

The combinator would need to probably return a structure that contains both the output tuple and a decision object that says for particular source whether the value was consumed or should be reused for another zipping round.

This would be great, and if there was something like this where I can request that the output tuple "replenish" the producer slot that I consumed, I would build these sort of functions on top of that.

in Considered Alternatives I built out this functionality today using Flux.mergeOrderComparing + flux.WindowUntilChanged with generic Map.Entry's & Marker interfaces, but we lose some type-safety and the concept of which "slot"/source publisher the result comes from.

In my case it's more about keeping a row-level/horizontal data structure in-tact. Making this a built-in Reactor operator would ensure the entire workflow is type-safe, that each source publisher is having it's backpressure dealt with correctly, and that buffer bloat is minimized.

The notion of zipping on key is quite limiting potentially.

It could be a great shortcut for this common use case. I tend to have different types in each zip'd source publisher, and even cases where I read different keys from the same object(although this is certainly a more unusual case). I have been workshopping different method signatures for a couple months, and this is the closest I got to a clean signature for a external implementation. Otherwise, each source flux would need a corresponding Function<? extends T1, ? extends K> key combinator.

@chemicL chemicL added the status/need-design This needs more in depth design work label Nov 23, 2023
@chemicL chemicL removed this from the 3.6.1 milestone Nov 23, 2023
@chemicL
Copy link
Member

chemicL commented Nov 23, 2023

@vibbix would you be so kind to provide some more test cases with some corner cases to help us better understand and consider a possible design? For one, I'm wondering if the keys can appear more than once.

If at all you'd be willing to provide the code for the implementation that works so far that would also be beneficial.

@chemicL chemicL added the status/need-user-input This needs user input to proceed label Nov 24, 2023
@vibbix
Copy link
Author

vibbix commented Nov 28, 2023

@chemicL I created a example of what I tend to use now here: vibbix/rx-experiments. The attached README.md has a description of my thought process in the design as well. I have been working on some examples on what a coordinator structure could look like to handle the incoming values as well.

For one, I'm wondering if the keys can appear more than once.

In my design, I assume that any publisher that has multiple of the same keys incoming have to be grouped prior.

@chemicL chemicL added for/team-attention This issue needs team attention or action and removed status/need-user-input This needs user input to proceed labels Dec 7, 2023
@chemicL
Copy link
Member

chemicL commented Dec 7, 2023

@vibbix thank you. We appreciate your input. We are in the planning process currently and will get back when we have some priorities. Just to get a sense of work involved - are you interested in contributing something once we settle on design or would you expect the team or community to provide an implementation?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
for/team-attention This issue needs team attention or action status/need-design This needs more in depth design work type/enhancement A general enhancement
Projects
None yet
Development

No branches or pull requests

4 participants