Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

One sequential processor - multiple streams scenario #553

Open
sergrt opened this issue Feb 8, 2021 · 0 comments
Open

One sequential processor - multiple streams scenario #553

sergrt opened this issue Feb 8, 2021 · 0 comments

Comments

@sergrt
Copy link

sergrt commented Feb 8, 2021

Hello.

Is there any way to design following pattern wit RxCpp without using locks? The scenario is:

  1. Component A creates rxcpp subject, starts new OS thread and produce values stream, by calling
    sentences_stream_.get_subscriber().on_next(data);
  2. Component B (created on the same thread, as component A) subscribes on the subject of component A (sentences_stream_). As soon as data is being produced by another thread, this data is being processed on that thread
  3. Component C wants to supply component B with additional data of the same type as component A do. The trick is, that
    values from component A and component C should be processed subsequently, without overlapping in time. Imagine, for example, that component B writes data char by char to the console. So to prevent mess, data from A and C should be processed either in one thread, or have some sort of scheduler.
    I found solution like this - store observables in the container (vector) and iterate with merge. So values from A and C are processed in one new thread:
rxcpp::observable<>::iterate(observables_).merge(rxcpp::observe_on_new_thread()).subscribe(
// ...
);

But in this case I am missing another important feature - component B cannot unsubscribe from component C. And by some observations - it is not the way it should be done, because storing observables in a vector cannot solve problem of unsubscribing - it seems that it is not possible to erase particular observable based on comparison (at least in my case calling "find" with the observable, supplied earlier, returns nothing).

So is it possible to make all this "one sequential processor - multiple streams" pattern without using locks?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant