RFC: A new way of "piping" #7203

Open
benlesh opened this issue Mar 1, 2023 · 36 comments
Labels
8.x Issues and PRs for version 8.x 9.x AGENDA ITEM Flagged for discussion at core team meetings

Comments

@benlesh
Member

benlesh commented Mar 1, 2023

Given the move away from lift: #7201 #7202

We can make our observable even more compatible with other observables by migrating us away from the pipe method. This is not something that would happen immediately, rather over a very, very long period. The idea is to increase compatibility and improve ergonomics.

Problem: It's annoying to have to wrap types in from(x) in order to use our operators with them

There's a common issue where people want to convert other things to observables in order to use our operators. Whether it's an array or a promise, there are times when you need to call from, and the only reason you're doing it is that you want that .pipe method and an observable to operate on.

Problem: pipe is a method

  1. It's not tree shakable
  2. It's unlikely to exist on "other observables".

The second one is more of a problem, honestly. If people implement code with the expectation that pipe exists as a method on whatever observable they get, then even while we've made sure our operators will work with any "same-shaped" observable, that custom code will break, because it's expecting pipe to be a method.

Proposal

A new rx function that accepts an ObservableInput as the first argument, followed by a rest parameter of operators:

export function rx<A, B>(source: ObservableInput<A>, a: OperatorFunction<A, B>): Observable<B>;
export function rx<A, B, C>(source: ObservableInput<A>, a: OperatorFunction<A, B>, b: OperatorFunction<B, C>): Observable<C>;
export function rx<A, B, C, D>(source: ObservableInput<A>, a: OperatorFunction<A, B>, b: OperatorFunction<B, C>, c: OperatorFunction<C, D>): Observable<D>;
// ... and so on
export function rx<T>(source: ObservableInput<T>, ...operators: OperatorFunction<T, any>[]): Observable<unknown> {
  return pipeArray(operators)(from(source));
}

(Technically, rx is just a standard functional pipe with some specific types, it could be the same pipe method we expose under the hood)
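
To make the shape of the proposal concrete, here is a minimal, self-contained sketch of the same idea. It does not use RxJS: `MiniObservable`, `miniFrom`, `miniMap`, and `miniRx` are hypothetical stand-ins invented for this illustration, and only arrays are accepted as a non-observable input. It is meant to show "convert the first argument, then reduce over the operators", not the actual implementation.

```typescript
// Hypothetical stand-ins for illustration only; none of these names are RxJS APIs.
interface MiniObservable<T> {
  subscribe(next: (value: T) => void): void;
}
type MiniInput<T> = MiniObservable<T> | T[];
type MiniOperator<A, B> = (source: MiniObservable<A>) => MiniObservable<B>;

// Plays the role of `from`: observables pass through, arrays are wrapped.
function miniFrom<T>(input: MiniInput<T>): MiniObservable<T> {
  if (!Array.isArray(input)) {
    return input;
  }
  const values: T[] = input;
  return {
    subscribe(next) {
      values.forEach((value) => next(value));
    },
  };
}

// A tiny `map` operator for the stand-in observable.
function miniMap<A, B>(project: (value: A) => B): MiniOperator<A, B> {
  return (source) => ({
    subscribe(next) {
      source.subscribe((value) => next(project(value)));
    },
  });
}

// Plays the role of `rx`: convert the source, then apply operators left to right.
function miniRx<T>(source: MiniInput<T>, ...operators: MiniOperator<any, any>[]): MiniObservable<unknown> {
  return operators.reduce<MiniObservable<any>>((acc, op) => op(acc), miniFrom(source));
}

// Usage: an array is accepted directly, with no explicit conversion step.
const doubled: unknown[] = [];
miniRx([1, 2, 3], miniMap((n: number) => n + n)).subscribe((value) => {
  doubled.push(value);
});
// doubled now holds 2, 4, 6
```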

The usage would be as such:

import { rx, map, delay } from 'rxjs'


rx(source$, map(n => n + n)).subscribe(console.log)

// or

rx(somePromise, delay(1000)).subscribe(console.log)

// or

rx(someAsyncIterable, map(n => n + n)).subscribe(console.log)

// ...and so on.

Bonus? from could migrate to just be rx as well

For a long time people have complained about from being a horrible name for an exported function. (Although I do like import { from } from './from' in our codebase 😄). This would mean that people could technically just do rx(somePromise), and it's the same as from(somePromise).

@benlesh benlesh added AGENDA ITEM Flagged for discussion at core team meetings 8.x Issues and PRs for version 8.x 9.x labels Mar 1, 2023
@eduardoRoth

I like the approach!

@TheLarkInn

It's not tree shakable

💯

@UserGalileo

Seems great!

Is "rx" the final proposed name? I know it'd be a breaking change, but what if...

import { pipe, flow } from "rxjs";

// Pipes a value into operators
const derived$ = pipe(source$, ...operators)

// Or any other name really. This is what the standalone "pipe" is at the moment.
const customOperator = flow(...operators)

Would it be too bad? 😬 Mentally, .pipe() works on an Observable, I've always found it strange that the standalone version does not!

@timdp
Contributor

timdp commented Mar 1, 2023

  • Why not call it pipe?
  • if this is long-term, won't the pipeline operator materialize first?

@demensky
Contributor

demensky commented Mar 1, 2023

pipe already exists.

@timdp
Contributor

timdp commented Mar 1, 2023

pipeline a la Node.js then?

@jeremymwells
Contributor

Not sure how you'd use rx without a pipe concept to be able to tree shake it, but I think this is a great way to solve interop; I like the ergonomics. rx() also has the benefit of wrapping the behavior in something unambiguous.

@BaronVonPerko

I like it. Ship it :shipit:

@BigAB

BigAB commented Mar 1, 2023

I know people would probably hate it, but have you considered a function that returns a function that would pipe the operator arguments and call the result with the observable passed.

rx(source$)(...operators)

@demensky
Contributor

demensky commented Mar 1, 2023

One option is to simply make a function with 2 arguments (the second is an array):

rx(fromEvent(document.body, "mousemove"), [
  debounceTime(2),
  map(({ clientX }) => clientX),
  take(2),
  takeUntil(destroy$),
]);

Formatted by Prettier.

@benlesh
Member Author

benlesh commented Mar 2, 2023

I know people would probably hate it, but have you considered a function that returns a function that would pipe the operator arguments and call the result with the observable passed.

@BigAB why do you like this shape? Is there a use case?

@benlesh
Member Author

benlesh commented Mar 2, 2023

Why not call it pipe?

@timdp Because it's not really just a pipe. It's applying from to the first argument.

Technically it's pipe(source, from, map(fn), ...)

@timdp
Contributor

timdp commented Mar 2, 2023

@timdp Because it's not really just a pipe. It's applying from to the first argument.

Technically it's pipe(source, from, map(fn), ...)

Personally, I don't think that makes a huge difference, but I do see your point.

But if pipe is too ambiguous a name, then the same definitely applies to rx.

How about pipeFrom?

@ducin

ducin commented Mar 2, 2023

@benlesh

  1. how about composition?
const step2$ = rx(step1$, map(n => n + n))
const step3$ = rx(step2$, filter(n => n > 0))
step3$.subscribe(console.log)

would the above be the way to "wrap" an observable from the outside?

  2. how about creating custom operators? We can create them using new Observable(subscriber => { ... }) and (I think) nothing changes here. However, it's also possible to use pipe directly, e.g.:
import { pipe } from "rxjs";
import { tap } from "rxjs/operators";

function logWithTag(tag: string) {
  return pipe(
    tap(v => console.log(`logWithTag(${tag}): ${v}`)),
    // put whatever operators you need here
  );
}

// and then

source$.pipe(logWithTag('my-tag')).subscribe(whatever)

with pipe getting deprecated and eventually replaced by rx(source$, ...operators).subscribe(subscriber), wouldn't pipe-based composition become out of reach? We'd have to move it to:

function withLog<A>(source: ObservableInput<A>, tag: string) {
  return rx(source,
    tap(v => console.log(`logWithTag(${tag}): ${v}`)),
    // put whatever operators you need here
  );
}

which is doable, but moves away from classical functional composition, I'm afraid. What's your take on this?

@LcsGa

LcsGa commented Mar 2, 2023

I don't really like the syntax since it wouldn't be compatible with the pipeline operator (which would be a big benefit for RxJS) right?

Here is an example (in which you are even mentioned haha)

@voliva
Contributor

voliva commented Mar 2, 2023

@LcsGa currently the TC39 pipeline operator is not the one as in the example.

The operators in RxJS would be usable in the F# pipe operator proposal, where every operator is a function that takes one argument and returns one value, which is passed to the next one.

TC39 went with the Hack syntax, where you need to explicitly pass in a "placeholder" that says where the value is passed. So with TC39 pipes and current operators, it would look like:

interval(100)
  |> take(5)(%)
  |> map(v => v + 3)(%)

More about this discussion here #7194

@LcsGa

LcsGa commented Mar 2, 2023

@voliva Thanks for the information, I wasn't aware of this!

@BigAB

BigAB commented Mar 2, 2023

@BigAB why do you like this shape? Is there a use case?

I like a clear separation between the subject of the call and the composition of functions.

Using an array as the second arg is pretty good and achieves the separation I want, I just sort of "didn't like it" 🤷, but it's not bad :

const obs$ = rx(source$, [
  debounceTime(2),
  map(({ clientX }) => clientX),
  take(2),
  takeUntil(destroy$),
])

pipe(...operators)(source$) is super great, because it doesn't redefine a well known pipe definition, but it sort of "distracts" having the source last, and the whole composing a new observable from source and pipe is so common in RxJS code, it feels like it should go first.

With it first it sort of looks like the F# pipe operator shape...

const obs$ = rx(source$)(
  debounceTime(2),
  map(({ clientX }) => clientX),
  take(2),
  takeUntil(destroy$),
)

I actually really like the idea of calling it pipeFrom or pipeThrough too:

const obs$ = pipeFrom(source$)(
  debounceTime(2),
  map(({ clientX }) => clientX),
  take(2),
  takeUntil(destroy$),
)

EDIT:

If it was pipeThrough and it was implemented something like

export const pipe = (...fns) => (input) => fns.reduce((r, fn) => fn(r), input);
export const pipeThrough = (source) => (...operators) => pipe(...operators)(source);

...you could use it on non-observables too 🤷 (and just check for Symbol.observable before doing anything Rx specific like lift stuff)

 const result = pipeThrough(3)(
  (n) => n*2,
  (n) => n + 1,
  (x) => `WHOA ${x}!`
)

// result === "WHOA 7!"

@BigAB

BigAB commented Mar 2, 2023

I don't know if I missed the "Bonus" section the first time I read it but, after reading it, yeah getting all the from() functionality in rx() makes me think that

rx(source$, [
  debounceTime(2),
  map(({ clientX }) => clientX),
  take(2),
  takeUntil(destroy$),
])

...is the better choice, because you get the great "turn anything into an Rx observable" from rx(whatever), but then you also separate the function composition effectively

@benlesh
Member Author

benlesh commented Mar 8, 2023

CORE TEAM:

  • The name needs to be bikeshedded
  • why not just direct people to use pipe(from(x), ...operators)?
  • maybe have pipe(first, ...operators) check the type of first and use from if it's not a function?

There's value here, we want to do this.
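
The third bullet above (checking the type of the first argument) could look roughly like the sketch below. This is only an illustration under stated assumptions: `flexiblePipe` and `toList` are hypothetical names, plain arrays stand in for observables, and `toList` stands in for `from`.

```typescript
// Hypothetical sketch; `flexiblePipe` and `toList` are illustration-only names, not RxJS APIs.
type UnaryFn = (input: any) => any;

// Stand-in for `from`: here it just wraps a lone value in an array.
function toList(input: unknown): unknown[] {
  return Array.isArray(input) ? input : [input];
}

function flexiblePipe(first: unknown, ...rest: UnaryFn[]): unknown {
  if (typeof first === 'function') {
    // Every argument is a function: act like a standalone pipe and return a composed function.
    const fns: UnaryFn[] = [first as UnaryFn, ...rest];
    return (input: unknown) => fns.reduce<unknown>((acc, fn) => fn(acc), input);
  }
  // Otherwise treat `first` as the source: convert it, then run it through the functions.
  return rest.reduce<unknown>((acc, fn) => fn(acc), toList(first));
}

const doubleAll = (xs: number[]) => xs.map((x) => x * 2);
const sum = (xs: number[]) => xs.reduce((a, b) => a + b, 0);

// Function first: returns a reusable composed function.
const composed = flexiblePipe(doubleAll, sum) as (xs: number[]) => number;
const viaFunction = composed([1, 2, 3]);

// Non-function first: converted and piped immediately.
const viaSource = flexiblePipe(5, doubleAll, sum);
```

One consequence of this design is that a source which is itself callable would be mistaken for an operator, so the overload only works while sources are never functions.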

@demensky
Contributor

demensky commented Mar 9, 2023

@benlesh Can we use from for this?

from(source$, [
  debounceTime(2),
  map(({ clientX }) => clientX),
  take(2),
  takeUntil(destroy$),
])

@jeremymwells
Contributor

@demensky - do you think that's stretching the intention of from a little too far, or that it could create cognitive dissonance in users? It's def tidy, which is nice, but pipe(first, ...operators) is tidy too.

@kwonoj
Member

kwonoj commented Mar 9, 2023

Extending from was also briefly discussed, but the consensus was that extending an existing operator would be somewhat confusing, and that TypeScript support could be tricky.

@benlesh
Member Author

benlesh commented Mar 9, 2023

Personally, I still like rx(source, ...operators).

  1. It's terse.
  2. It can replace from(source) in the long term with just rx(source). (and from is sort of a bad name we stuck with for historical reasons)
  3. It's fairly descriptive, even though it seems like it's not. rx, R.X., Reactive Extensions. In rx(source, ...operators), you're taking some source and extending it with reactivity through those composed operators. I think it fits.

I'm still open to other names though. I'd just prefer that it's short and to the point.

Also something to think about, operators[] vs ...operators... This is formatted with Prettier:

Using operators[]

rx(mouseDowns$, [
  exhaustMap((e) => {
    const startX = e.clientX;
    return rx(mouseMoves$, [
      takeUntil(mouseups$),
      map((e) => e.clientX - startX),
    ]);
  }),
]).subscribe(handleDrag);

Using ...operators

rx(
  mouseDowns$,
  exhaustMap((e) => {
    const startX = e.clientX;
    return rx(
      mouseMoves$,
      takeUntil(mouseups$),
      map((e) => e.clientX - startX)
    );
  })
).subscribe(handleDrag);

Honestly, and I think this is weird, the operators[] version is a little more readable when formatted due to indentation. It's also a bit cheaper, implementation wise.

@timdp
Contributor

timdp commented Mar 9, 2023

I would also expect operators[]. Formatting is one motivation, but with a general-purpose function like this, it's also not unlikely that it'll take an options object or another argument at some point.

(Personally, I'm still not sold on the rx name though. Why is it so first-class that it should occupy the name of the library itself? I'm not claiming pipeFrom would be any better but like you said, it needs bikeshedding.)

@demensky
Contributor

Let's call it lift?

Kidding… :trollface:

Do I understand correctly that this rx function will be required until there are versions of operators that are convenient to use with the pipeline operator? #7194

For example map will have a signature like this.

function map<T, R>(source$: ObservableInput<T>, project: (value: T, index: number) => R): Observable<R>;

It turns out that rx will only be for operators of external libraries that have not had time to update, right?

@benlesh
Member Author

benlesh commented Mar 22, 2023

CORE TEAM: How about r for the name?

The concern with rx as a name revolves around communication. It might be confusing for new users to discuss "Use the rx function" versus literally any other function in our library, which people often refer to as "rx".

benlesh added a commit to benlesh/rxjs that referenced this issue Mar 23, 2023
+ Adds `r`, a new method of piping observables. This allows any valid observable input to be passed as the first argument, and it will implicitly convert the argument to an Observable and pass it to the function that may be present in the second argument; the return value of THAT function is then passed as an argument to the next function, and so on.
+ Corrects the types of `Observable#pipe` such that anything can be returned at any step of the functional chain, however the first pipeable function must accept an `Observable<T>`.
+ Adds dtslint and runtime tests for `r` and `pipe`.

NOTE: This does NOT deprecate `Observable#pipe`. That will be done in a follow up, and we need to define what timeline we'll take to remove that, as it's an API that is broadly used - could be v9, could be v10, could be never. At this point, this is alpha/beta functionality.

BREAKING CHANGE: `Observable#pipe` now allows any pipeable unary function as an argument, just so long as the types properly compose, this means in some cases it will now return `unknown` instead of `Observable<unknown>` the workaround is just to cast the result for those cases. (or you can break your operator piping into smaller pipe sets)

Related ReactiveX#7203
@jannispl

jannispl commented Apr 2, 2023

Personally I would be in favor of rx rather than r in fear of certain IDEs not allowing me to auto-complete/auto-import efficiently as I type if it's just a single character. What might happen is that other things starting with r are considered more relevant.

@infodusha

Here is the thing: what about making every observable both an object and a function at the same time?
Sample:
source$(tap(console.log)).subscribe()
and
source$.subscribe()
would both work.
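
A rough sketch of how such a "callable observable" might be built is shown below. Everything here is hypothetical: `MiniObservable`, `CallableObservable`, and `makeCallable` are names made up for this illustration and are not part of RxJS. The idea is simply a function that applies operators when called and that also carries a `subscribe` property.

```typescript
// Hypothetical sketch; none of these names are RxJS APIs.
interface MiniObservable<T> {
  subscribe(next: (value: T) => void): void;
}
type MiniOperator<A, B> = (source: MiniObservable<A>) => MiniObservable<B>;

// An observable that can also be called with operators.
interface CallableObservable<T> extends MiniObservable<T> {
  (...operators: MiniOperator<any, any>[]): CallableObservable<any>;
}

function makeCallable<T>(inner: MiniObservable<T>): CallableObservable<T> {
  // Calling the observable pipes it through the operators and wraps the result again.
  const callable = ((...operators: MiniOperator<any, any>[]) =>
    makeCallable(
      operators.reduce<MiniObservable<any>>((acc, op) => op(acc), inner)
    )) as CallableObservable<T>;
  // Attaching `subscribe` lets the same value be used as a plain observable.
  callable.subscribe = (next) => inner.subscribe(next);
  return callable;
}

const numbers$ = makeCallable<number>({
  subscribe(next) {
    [1, 2, 3].forEach((n) => next(n));
  },
});

const addOne: MiniOperator<number, number> = (source) => ({
  subscribe(next) {
    source.subscribe((n) => next(n + 1));
  },
});

const plain: number[] = [];
numbers$.subscribe((n) => {
  plain.push(n);
});

const piped: number[] = [];
numbers$(addOne).subscribe((n: number) => {
  piped.push(n);
});
```

Note that with this shape `typeof numbers$` is `'function'`, which matters for any API that tells sources and operators apart by checking for functions.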

@NsdHSO

NsdHSO commented Apr 11, 2023

@BigAB why do you like this shape? Is there a use case?

I like a clear separation between the subject of the call and the composition of functions.

Using an array as the second arg is pretty good and achieves the separation I want, I just sort of "didn't like it" 🤷, but it's not bad :

const obs$ = rx(source$, [
  debounceTime(2),
  map(({ clientX }) => clientX),
  take(2),
  takeUntil(destroy$),
])

I like this approach, but what would you say about this one?

const obs$ = rx( source$, compose([
   debounceTime(2),
   map(({ clientX }) => clientX),
   take(2),
   takeUntil(destroy$)
   ])
 )

@staltz
Member

staltz commented Apr 11, 2023

I came here to say that rx is vastly better than r as a name.

@BigAB

BigAB commented Apr 11, 2023

I like this approach, but what would you say about this one?

const obs$ = rx( source$, compose([
   debounceTime(2),
   map(({ clientX }) => clientX),
   take(2),
   takeUntil(destroy$)
   ])
 )

It's okay, but I find the array passed to compose a little unnecessary, and compose() usually means a right-to-left execution order, so would it be more like

const obs$ = rx( source$, compose(
  takeUntil(destroy$),
  take(2),
  map(({ clientX }) => clientX),
  debounceTime(2),
))

...which is fine, I guess. But I still like this one better:

rx(source$, [
  debounceTime(2),
  map(({ clientX }) => clientX),
  take(2),
  takeUntil(destroy$),
])
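
For readers unfamiliar with the ordering convention mentioned above, here is a small sketch using plain functions rather than operators. `pipeSteps` and `composeSteps` are hypothetical helper names for this illustration; the point is only that the same argument list runs in opposite orders.

```typescript
// Illustration-only helpers; plain number functions stand in for operators.
type Step<T> = (value: T) => T;

// Left-to-right: the first function listed runs first.
function pipeSteps<T>(...steps: Step<T>[]): Step<T> {
  return (input) => steps.reduce<T>((acc, step) => step(acc), input);
}

// Right-to-left: the last function listed runs first.
function composeSteps<T>(...steps: Step<T>[]): Step<T> {
  return (input) => steps.reduceRight<T>((acc, step) => step(acc), input);
}

const double = (n: number) => n * 2;
const addOne = (n: number) => n + 1;

const piped = pipeSteps(double, addOne)(3); // addOne(double(3)) = 7
const composed = composeSteps(double, addOne)(3); // double(addOne(3)) = 8
```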

@NsdHSO

NsdHSO commented Apr 11, 2023

I came here to say that rx is vastly better than r as a name.

https://twitter.com/BenLesh/status/1645463811149660170?s=20

@krilllind

I personally didn't really like the idea of rx to begin with, but I must say it's growing on me.

First, I would argue that rx over just r is way better.
As others have already pointed out, searching for rx( instead of r( is much clearer, and it communicates intent to the reader. Not to mention IDE support for imports.
Because most web applications use some sort of minification today, is it really important to keep the function name as short as possible? I'm thinking rx is fine, but I don't see the need to limit ourselves just because we want to keep it short.

Secondly, I really prefer the array signature over the "rest" operator. I think code becomes more readable. Something to think about is whether we want to allow developers to skip the array signature if it's only one operator?

const obs$ = rx(source$, [take(1)]);

// could have signature
function rx<A, B>(source: ObservableInput<A>, a: OperatorFunction<A, B>): Observable<B>;
function rx<T>(source: ObservableInput<T>, operators: Array<OperatorFunction<T, any>>): Observable<unknown>;

// and allow for both
const obs1$ = rx(source$, take(1));
const obs2$ = rx(source$, [take(1), map(x => x * 2)]);
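
At runtime, accepting both forms mostly comes down to normalizing the second argument. The sketch below assumes plain unary functions in place of operators and plain values in place of observables; `normalizeOperators` and `applyOperators` are hypothetical names for illustration only.

```typescript
// Hypothetical sketch; plain values and functions stand in for observables and operators.
type Operator = (input: any) => any;

// Accept either one operator or an array of operators.
function normalizeOperators(operators: Operator | Operator[]): Operator[] {
  return Array.isArray(operators) ? operators : [operators];
}

function applyOperators(source: unknown, operators: Operator | Operator[]): unknown {
  return normalizeOperators(operators).reduce<unknown>((acc, op) => op(acc), source);
}

const addOne = (n: number) => n + 1;
const double = (n: number) => n * 2;

const single = applyOperators(1, addOne); // addOne(1) = 2
const many = applyOperators(1, [addOne, double]); // double(addOne(1)) = 4
```

Because an operator is a function and never an array, the two call shapes cannot be confused at runtime; the harder part is expressing the array form precisely in the type overloads.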

Thirdly, I'm curious about the conversion to promises. Consider this existing code (using DI from Angular):

async function() {
  await doSomething();

  const fullname = await firstValueFrom(this.auth.user$.pipe(
    take(1),
    map(({ firstname, lastname }) => `${firstname} ${lastname}`)
  ));
}

With the new approach, it would be nice if this wasn't so clunky. Perhaps a new operator to convert to a Promise, or would a config option on rx be better?

This would end up looking like this with rx and firstValueFrom

async function() {
  await doSomething();  

  const fullname = await firstValueFrom(rx(this.auth.user$, [
    take(1),
    map(({ firstname, lastname }) => `${firstname} ${lastname}`)
  ]));
}

But perhaps we could have something like

async function() {
  await doSomething();  

  const fullname = await rx(this.auth.user$, [
    take(1),
    map(({ firstname, lastname }) => `${firstname} ${lastname}`),
    firstValueToPromise() // or lastValueToPromise()
  ]);
}

I see this pattern a lot where you want to ensure execution order, and using a Promise just makes it super easy when also dealing with external libraries which are already returning Promises.

@morlay

morlay commented Apr 14, 2023

Cool.
Once rx is no longer limited to Observable operators, as #7229 did, we could easily use it in Vue / React like:

rx(
    input$,
    debounceTime(500),
    subscribeUntilUnmount((v) => emit("value-change", v))
);

@benlesh
Member Author

benlesh commented Apr 19, 2023

CORE TEAM: rx is preferable to r. (sorry, @kwonoj!)

benlesh added a commit that referenced this issue May 16, 2023
* feat(r): A new method of piping has been added

+ Adds `r`, a new method of piping observables. This allows any valid observable input to be passed as the first argument, and it will implicitly convert the argument to an Observable and pass it to the function that may be present in the second argument; the return value of THAT function is then passed as an argument to the next function, and so on.
+ Corrects the types of `Observable#pipe` such that anything can be returned at any step of the functional chain, however the first pipeable function must accept an `Observable<T>`.
+ Adds dtslint and runtime tests for `r` and `pipe`.

NOTE: This does NOT deprecate `Observable#pipe`. That will be done in a follow up, and we need to define what timeline we'll take to remove that, as it's an API that is broadly used - could be v9, could be v10, could be never. At this point, this is alpha/beta functionality.

BREAKING CHANGE: `Observable#pipe` now allows any pipeable unary function as an argument, just so long as the types properly compose, this means in some cases it will now return `unknown` instead of `Observable<unknown>` the workaround is just to cast the result for those cases. (or you can break your operator piping into smaller pipe sets)

Related #7203

* refactor: rename to `rx`

* docs: fix `r` to `rx` reference.

Co-authored-by: Mladen Jakovljević <jakovljevic.mladen@gmail.com>

* chore: fix description

Co-authored-by: Nicholas Jamieson <nicholas@cartant.com>

* chore: fix return type of part of signature.

Co-authored-by: Mladen Jakovljević <jakovljevic.mladen@gmail.com>

* chore: correct docs to say `rx` and not `r`.

Co-authored-by: Mladen Jakovljević <jakovljevic.mladen@gmail.com>

* chore: address comments

+ Adds some more tests around `Observable#pipe`, ensuring arbitrary unary functions work
+ Adds more dtslint tests for `rx`.
+ Fixes some comments
+ Improves types and runtime for args in `rx` function

---------

Co-authored-by: Mladen Jakovljević <jakovljevic.mladen@gmail.com>
Co-authored-by: Nicholas Jamieson <nicholas@cartant.com>