concat_map implementation or equivalent? #581

Open
iyz91 opened this issue Aug 30, 2021 · 2 comments

Comments

iyz91 commented Aug 30, 2021

Coming from RxJS, I found the concatMap operator really convenient for building a pipeline of dynamically created observables in which each one waits for the previous one to complete. RxPY does not seem to have an obvious equivalent. The closest is flat_map, but I can't use it because it allows the inner observables to interleave.

From the relevant docs and issues I could find, it seems the following would achieve the same behavior, but it feels dirty. The of(...) calls are stand-ins for code that would dynamically generate a new observable from the previous result.

from rx import of
from rx.operators import map, merge

of(1).pipe(
  map(lambda x: of(x + 1)),    # Dynamically generate observable
  merge(max_concurrent=1),     # Subscribe to one inner observable at a time
  map(lambda x: of(x + 1)),    # Dynamically generate observable
  merge(max_concurrent=1)      # Subscribe to one inner observable at a time
)

Is there a better way to achieve this currently (or in general a more "reactive" way)?
Is there any plan to implement a concat_map operator/alias for this functionality?
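
For readers unfamiliar with the distinction, here is a library-free sketch of the ordering difference being asked about. It does not use RxPY at all: plain lists stand in for inner observables, and the names concat_map_model, interleaved_model, and steps are hypothetical. The round-robin order is only one interleaving that a flat_map-style merge could produce, since the real order depends on timing.

```python
from itertools import chain, zip_longest

_SKIP = object()  # sentinel used to pad inner sequences of unequal length


def concat_map_model(mapper, items):
    """Drain each inner sequence completely before starting the next one."""
    return list(chain.from_iterable(mapper(x) for x in items))


def interleaved_model(mapper, items):
    """Take one value from each inner sequence in turn (round-robin).

    This is just one possible interleaving; a real merge depends on timing.
    """
    inners = [mapper(x) for x in items]
    return [
        value
        for group in zip_longest(*inners, fillvalue=_SKIP)
        for value in group
        if value is not _SKIP
    ]


def steps(x):
    # Hypothetical "inner sequence": two events per input item.
    return [f"{x}-start", f"{x}-end"]


sequential = concat_map_model(steps, ["a", "b"])
interleaved = interleaved_model(steps, ["a", "b"])
print(sequential)
print(interleaved)
```

In the sequential result every "a" event precedes every "b" event, which is the guarantee concatMap gives in RxJS. In the interleaved result the two inner sequences overlap.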

iyz91 commented Aug 31, 2021

Just as a reference for anyone needing the same, I implemented it myself as follows:

# Standard Imports
from typing import Callable

# Third Party Imports
from rx.core import Observable
import rx.operators as ops
from rx.core.typing import Mapper


def concat_map(mapper: Mapper) -> Callable[[Observable], Observable]:
  def _concat_map(source: Observable) -> Observable:
    return source.pipe(
      ops.map(mapper),
      ops.merge(max_concurrent=1)
    )
  return _concat_map

Now you can use it easily, e.g.:

source.pipe(
  concat_map(observable_factory),
  concat_map(another_factory),
  # ...
)

If anybody sees any issues with the above please kindly comment and let me know.

skewty commented Nov 16, 2023

This shows as merged. Is there a reason this issue was left open?
