Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

withLatestFrom doesn't seem to be thread safe #163

Open
fbarbat opened this issue Oct 23, 2023 · 0 comments
Open

withLatestFrom doesn't seem to be thread safe #163

fbarbat opened this issue Oct 23, 2023 · 0 comments

Comments

@fbarbat
Copy link

fbarbat commented Oct 23, 2023

withLatestFrom doesn't seem to be thread safe when subscribing to two different publishers that emit elements from different threads.
This is part of the current implementation of withLatestFrom. Check the comments in uppercase.

private func trackLatestFromSecond(onInitialValue: @escaping () -> Void) {
      var gotInitialValue = false

      let subscriber = AnySubscriber<Other.Output, Other.Failure>(
        receiveSubscription: { [weak self] subscription in
            self?.otherSubscription = subscription
            subscription.request(.unlimited)
        },
        receiveValue: { [weak self] value in
            guard let self = self else { return .none }

            // THIS IS CALLED FROM THREAD B
            self.latestValue = value

            if !gotInitialValue {
                // When getting initial value, start pulling values
                // from upstream in the main sink
                self.sink = Sink(upstream: self.upstream,
                                 downstream: self.downstream,
                                 transformOutput: { [weak self] value in
                                    guard let self = self,
                                          // THIS IS CALLED FROM THREAD A, ACCESSING THE SAME VAR BUT IT IS NOT SYNCHRONIZED
                                          let other = self.latestValue else { return nil }

                                    return self.resultSelector(value, other)
                                 },
                                 transformFailure: { $0 })

                // Signal initial value to start fulfilling downstream demand
                gotInitialValue = true
                onInitialValue()
            }

            return .unlimited
        },
        receiveCompletion: nil)

      self.second.subscribe(subscriber)
    }

Should a lock over latestValue be added to ensure thread safety? Are there reasons not to do it (for example, performance)? Should CombineExt at least update the Swift docs saying it is not supposed to be used from multiple threads?

Thank you.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant