Implementing logical OR between multiple operations #571

Open
JavierBejMen opened this issue Jan 27, 2022 · 2 comments

JavierBejMen commented Jan 27, 2022

Hi all! We are currently developing a stream processing program using RxCpp, but we are unsure how to correctly approach the following situation.

Let's say we have multiple operations, where each operation can succeed or fail. We want to build a pipeline of observables that tries each operation in sequence for as long as they fail, and returns as soon as one operation succeeds.
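To pin down the intended semantics independently of RxCpp, here is a minimal plain C++ sketch of "try each operation in order and stop at the first success". The names `Predicate`, `first_success` and `example_ops` are hypothetical and are not part of RxCpp or of the code in this issue. The sketch handles one event at a time rather than a stream.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <optional>
#include <vector>

// Hypothetical helper, not part of RxCpp: an operation is modeled as a
// predicate that returns true on success and false on failure.
template <class T>
using Predicate = std::function<bool(const T&)>;

// Tries each operation in order and stops at the first one that succeeds.
// Returns the index of that operation, or std::nullopt if all of them failed.
template <class T>
std::optional<std::size_t> first_success(const std::vector<Predicate<T>>& ops,
                                         const T& event) {
    for (std::size_t i = 0; i < ops.size(); ++i) {
        assert(ops[i] && "every operation must be callable");
        if (ops[i](event)) {
            return i;  // short-circuit: later operations are not evaluated
        }
    }
    return std::nullopt;  // the logical OR over all operations is false
}

// The three example checks used later in this issue.
inline std::vector<Predicate<int>> example_ops() {
    return {
        [](const int& v) { return v % 2 == 0; },  // "is pair" (even)
        [](const int& v) { return v > 3; },       // "is greater than 3"
        [](const int& v) { return v == 1; },      // "equals 1"
    };
}
```

With these checks, `first_success(example_ops(), 5)` yields index 1 (the "greater than 3" check), while an input of 3 yields `std::nullopt` because every check fails.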

The main question is:

  • What would be the correct way of implementing a bifurcation, depending on current operation result?

One possible implementation we came up with is the following:

We define the OperationResult class, a thin wrapper around the event being processed plus a boolean indicating whether the operation succeeded or failed:

template <class T> class OperationResult{
private:
    T m_event;
    bool m_success;

public:
    OperationResult(bool succeeded, T event) : m_event{event}, m_success{succeeded}{}

    bool success() const{
        return this->m_success;
    }

    T event() const{
        return this->m_event;
    }
};

And the Operation class, which sets up the RxCpp pipeline. Each operation exposes two observables: one that emits the successfully processed items and one that emits the failed ones.

template <class T> class Operation
{
    using function_t = std::function<OperationResult<T>(OperationResult<T>)>;
    using observable_t = rxcpp::observable<OperationResult<T>>;

private:
    std::string m_name;
    function_t m_fn;
    bool m_inputConnected;

public:
    Operation(const std::string & name, function_t fn) : m_name{name}, m_fn{fn}, m_inputConnected{false}{}
    
    // This function sets up the rxcpp pipeline, implementing the bifurcation depending on OperationResult
    // in a way that no new observables/subscriptions are performed
    std::pair<observable_t, observable_t> connect(const observable_t & input){
        if (this->m_inputConnected){
            throw std::runtime_error("Error, operation " + this->m_name + " is already connected");
        }else{
            auto result = input.map(this->m_fn).publish();
            auto failure = result.filter([](OperationResult<T> result) { return !result.success(); });
            auto success = result.filter([](OperationResult<T> result) { return result.success(); });
            result.connect();

            this->m_inputConnected = true;
            return std::pair(failure, success);
        }
    }
};

This allows for setting up a logical OR between multiple Operations like this:

// Operations
Operation<int> op0("is pair",
                    [](OperationResult<int> res)
                    {
                        cout << "pair got " << res.event() << endl;
                        return OperationResult<int>(res.event() % 2 == 0, res.event());
                    });

Operation<int> op1("is greater than 3",

                    [](OperationResult<int> res)
                    {
                        cout << "greater than 3 got " << res.event() << endl;
                        return OperationResult<int>(res.event() > 3, res.event());
                    });

Operation<int> op2("equals 1",
                    [](OperationResult<int> res)
                    {
                        cout << "equals 1 got " << res.event() << endl;
                        return OperationResult<int>(res.event() == 1, res.event());
                    });

// Input to the observable chain
auto input_sbj = subjects::subject<OperationResult<int>>();
auto input = input_sbj.get_observable();

// Subscriber to be called if any of the 3 operations succeeded
auto success_subscriber = make_subscriber<OperationResult<int>>([](OperationResult<int> res)
                                                                { cout << "[Success] got " << res.event() << endl; });

// Subscriber to be called if all operations failed
auto error_subscriber = make_subscriber<OperationResult<int>>([](OperationResult<int> res)
                                                                { cout << "[Error] got " << res.event() << endl; });

// Logical OR
auto outs = op0.connect(input);
outs.second.subscribe(success_subscriber);

outs = op1.connect(outs.first);
outs.second.subscribe(success_subscriber);

outs = op2.connect(outs.first);
outs.second.subscribe(success_subscriber);

outs.first.subscribe(error_subscriber);

// Emit input
auto s = input_sbj.get_subscriber();
s.on_next(OperationResult(false, 0)); // Success
s.on_next(OperationResult(false, 1)); // Success
s.on_next(OperationResult(false, 3)); // Error
s.on_next(OperationResult(false, 5)); // Success
s.on_completed();

Output:

pair got 0
[Success] got 0
pair got 1
greater than 3 got 1
equals 1 got 1
[Success] got 1
pair got 3
greater than 3 got 3
equals 1 got 3
[Error] got 3
pair got 5
greater than 3 got 5
[Success] got 5

Any help or insight you can give me is much appreciated!

@JavierBejMen JavierBejMen changed the title Implementing logical OR operation between multiple operations Implementing logical OR between multiple operations Jan 27, 2022
@victimsnino
Collaborator

What about the most straightforward way, passing all the comparisons to a single lambda? For example:

observable.filter([](int v){return v % 2 == 0 || v > 3 || v ==1;})
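If the checks are only known at runtime, the same idea can be sketched by folding a list of predicates into one. The helper name `any_of_predicates` is hypothetical and is not an RxCpp API; this is plain C++ with no dependency on the library.

```cpp
#include <algorithm>
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical helper, not an RxCpp API: folds a list of predicates into one
// predicate that is true when at least one of them accepts the value.
template <class T>
std::function<bool(T)> any_of_predicates(std::vector<std::function<bool(T)>> preds) {
    for (const auto& p : preds) {
        assert(p && "every predicate must be callable");
    }
    return [preds = std::move(preds)](T v) {
        // std::any_of stops at the first predicate that returns true,
        // so the remaining predicates are not evaluated.
        return std::any_of(preds.begin(), preds.end(),
                           [&v](const std::function<bool(T)>& p) { return p(v); });
    };
}
```

Assuming the observable emits `int`, the resulting callable could be passed to `filter` in place of the hand-written lambda, for example `any_of_predicates<int>({isEven, greaterThan3, equals1})` with three hypothetical `int` predicates.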

Another way I can imagine is to use the amb operator, but you would need to build it for each incoming value and run it immediately. For example:

observable.flat_map([](int v)
{
return rxcpp::observable<>::just(v)
   .filter([](int v){return v % 2 ==0;})
   .amb(rxcpp::observable<>::just(v).filter([](int v){return v > 3;}), 
        rxcpp::observable<>::just(v).filter([](int v){return v ==  1;}));
});

But I am not sure it is really useful, since for every value you would create a new observable, configure it, and run it immediately.

@kirkshoop
Member

Let's say we have multiple operations, where each operation can succeed or fail. We want to lift a pipeline of observables that sequentially try each operation while they fail, returning as soon as one operation succeed.

That sounds like

Amb(op0.retry(), op1.retry(), ..)
