CombineLatest on observables created with Defer #327

Open
philippseith opened this issue Sep 3, 2021 · 1 comment
Labels: bug report

Describe the bug
When an Observable is created with Defer, every call to Observe restarts the producer, so every observer gets the same values.
This does not hold when the Observable is transformed by an operator that itself calls Observe on that Observable, such as CombineLatest. In contrast, the Map operator respects the Defer origin of the Observable.

To Reproduce
Steps to reproduce the behavior:

func TestCombineLatest(t *testing.T) {
	o := rxgo.Defer([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item) {
		next <- rxgo.Of(1)
		next <- rxgo.Of(2)
		next <- rxgo.Of(3)
	}})
	co := rxgo.CombineLatest(func(i ...interface{}) interface{} {
		return i[0]
	}, []rxgo.Observable{o})
	ch1 := co.Observe()
	ch2 := co.Observe()
	rr1 := make([]interface{}, 0)
	rr2 := make([]interface{}, 0)
	done1 := make(chan struct{}, 1)
	go func() {
		for i := 0; i < 3; i++ {
			r1 := <-ch1
			rr1 = append(rr1, r1.V)
		}
		close(done1)
	}()
	done2 := make(chan struct{}, 1)
	go func() {
		for i := 0; i < 3; i++ {
			r2 := <-ch2
			rr2 = append(rr2, r2.V)
		}
		close(done2)
	}()
	<-done1
	<-done2
	Expect(rr1).To(Equal(rr2))
}

The Producer is called only once. Depending on scheduling, the values 1, 2, 3 are split between rr1 and rr2; once no more values are available, rr1 and rr2 receive nils from the closed channels.
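The splitting described above is ordinary Go channel behavior and can be reproduced without rxgo. The following is a minimal sketch, assuming both observers end up reading from one shared channel; `splitReceive` is a hypothetical helper written for illustration, not part of rxgo.

```go
package main

import (
	"fmt"
	"sync"
)

// splitReceive models two observers that read from ONE shared channel.
// Each observer performs exactly n receives, like the loops in the test above.
// (Hypothetical helper for illustration; not part of rxgo.)
func splitReceive(values []int, n int) (a, b []interface{}) {
	shared := make(chan interface{})

	// The producer runs once, then closes the channel.
	go func() {
		for _, v := range values {
			shared <- v
		}
		close(shared) // later receives yield the zero value, nil
	}()

	var wg sync.WaitGroup
	read := func(dst *[]interface{}) {
		defer wg.Done()
		for i := 0; i < n; i++ {
			*dst = append(*dst, <-shared)
		}
	}
	wg.Add(2)
	go read(&a)
	go read(&b)
	wg.Wait()
	return a, b
}

func main() {
	a, b := splitReceive([]int{1, 2, 3}, 3)
	// The exact split depends on scheduling, but together the two slices
	// always hold the three values plus three nils.
	fmt.Println(a, b)
}
```

Each value is delivered to exactly one of the two readers, which is why rr1 and rr2 in the test generally differ.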

Expected behavior
Both observers should receive 1, 2, 3.

Additional context
The same pattern works with System.Reactive.Linq in .NET.

philippseith commented Sep 3, 2021

Some additional thoughts:

  • I fear CombineLatest is not the only operator with this behavior.
  • As a workaround, one can wrap calls to CombineLatest (and perhaps other operators or operator chains) with a helper like this:
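// Defer builds the operator chain anew for every observer by calling
// produceObservable inside a deferred producer.
func Defer(produceObservable func() rxgo.Observable, opts ...rxgo.Option) rxgo.Observable {
	return rxgo.Defer([]rxgo.Producer{
		func(ctx context.Context, next chan<- rxgo.Item) {
			// Each call to Observe on the result runs this producer again,
			// so each observer gets its own freshly built chain.
			inCh := produceObservable().Observe(opts...)
			for {
				select {
				case item, ok := <-inCh:
					if !ok {
						// The inner observable has completed.
						return
					}
					// Forward the item unless the context is cancelled first.
					select {
					case next <- item:
					case <-ctx.Done():
						return
					}
				case <-ctx.Done():
					return
				}
			}
		},
	},
	)
}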

However, you have to know in advance whether the wrapper is necessary for a given operator.

  • When the workaround is used, the whole operator chain called in the producer, with all its goroutines, is duplicated for every observer. It would be better if the multiplexing of the transformed values took place after the operator chain.
