Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow converting async block to Traits #2411

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 24 additions & 0 deletions Rx.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@
4C8DE0E220D54545003E2D8A /* DisposeBagTest.swift in Sources */ = {isa = PBXBuildFile; fileRef = 4C8DE0E120D54545003E2D8A /* DisposeBagTest.swift */; };
4C8DE0E320D54545003E2D8A /* DisposeBagTest.swift in Sources */ = {isa = PBXBuildFile; fileRef = 4C8DE0E120D54545003E2D8A /* DisposeBagTest.swift */; };
4C8DE0E420D54545003E2D8A /* DisposeBagTest.swift in Sources */ = {isa = PBXBuildFile; fileRef = 4C8DE0E120D54545003E2D8A /* DisposeBagTest.swift */; };
4F4124C227F4A36B00ADF55A /* Driver+Concurrency.swift in Sources */ = {isa = PBXBuildFile; fileRef = 4F4124C127F4A36B00ADF55A /* Driver+Concurrency.swift */; };
4F4124C427F4A54200ADF55A /* Signal+Concurrency.swift in Sources */ = {isa = PBXBuildFile; fileRef = 4F4124C327F4A54200ADF55A /* Signal+Concurrency.swift */; };
4F4124C727F4B85500ADF55A /* Driver+ConcurrencyTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 4F4124C527F4B83500ADF55A /* Driver+ConcurrencyTests.swift */; };
4F4124C827F4B85500ADF55A /* Driver+ConcurrencyTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 4F4124C527F4B83500ADF55A /* Driver+ConcurrencyTests.swift */; };
4F4124C927F4B85600ADF55A /* Driver+ConcurrencyTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 4F4124C527F4B83500ADF55A /* Driver+ConcurrencyTests.swift */; };
4F4124CC27F4BA2E00ADF55A /* Signal+ConcurrencyTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 4F4124CA27F4BA1B00ADF55A /* Signal+ConcurrencyTests.swift */; };
4F4124CD27F4BA2F00ADF55A /* Signal+ConcurrencyTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 4F4124CA27F4BA1B00ADF55A /* Signal+ConcurrencyTests.swift */; };
4F4124CE27F4BA2F00ADF55A /* Signal+ConcurrencyTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 4F4124CA27F4BA1B00ADF55A /* Signal+ConcurrencyTests.swift */; };
504540C924196D960098665F /* WKWebView+Rx.swift in Sources */ = {isa = PBXBuildFile; fileRef = 504540C824196D960098665F /* WKWebView+Rx.swift */; };
504540CB24196EB10098665F /* WKWebView+RxTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 504540CA24196EB10098665F /* WKWebView+RxTests.swift */; };
504540CC24196EB10098665F /* WKWebView+RxTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 504540CA24196EB10098665F /* WKWebView+RxTests.swift */; };
Expand Down Expand Up @@ -968,6 +976,10 @@
4C5213A9225D41E60079FC77 /* CompactMap.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = CompactMap.swift; sourceTree = "<group>"; };
4C5213AB225E20350079FC77 /* Observable+CompactMapTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "Observable+CompactMapTests.swift"; sourceTree = "<group>"; };
4C8DE0E120D54545003E2D8A /* DisposeBagTest.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = DisposeBagTest.swift; sourceTree = "<group>"; };
4F4124C127F4A36B00ADF55A /* Driver+Concurrency.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "Driver+Concurrency.swift"; sourceTree = "<group>"; };
4F4124C327F4A54200ADF55A /* Signal+Concurrency.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "Signal+Concurrency.swift"; sourceTree = "<group>"; };
4F4124C527F4B83500ADF55A /* Driver+ConcurrencyTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "Driver+ConcurrencyTests.swift"; sourceTree = "<group>"; };
4F4124CA27F4BA1B00ADF55A /* Signal+ConcurrencyTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "Signal+ConcurrencyTests.swift"; sourceTree = "<group>"; };
504540C824196D960098665F /* WKWebView+Rx.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "WKWebView+Rx.swift"; sourceTree = "<group>"; };
504540CA24196EB10098665F /* WKWebView+RxTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "WKWebView+RxTests.swift"; sourceTree = "<group>"; };
504540CD2419701D0098665F /* RxWKNavigationDelegateProxy.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RxWKNavigationDelegateProxy.swift; sourceTree = "<group>"; };
Expand Down Expand Up @@ -1920,9 +1932,11 @@
C8D970DF1F532FD20058F2FE /* TestImplementations */,
C8561B651DFE1169005E97F1 /* ExampleTests.swift */,
C8D970DC1F532FD10058F2FE /* Signal+Test.swift */,
4F4124CA27F4BA1B00ADF55A /* Signal+ConcurrencyTests.swift */,
C8D970DD1F532FD10058F2FE /* SharedSequence+Test.swift */,
C8D970E11F532FD20058F2FE /* SharedSequence+Extensions.swift */,
C8D970DE1F532FD20058F2FE /* Driver+Test.swift */,
4F4124C527F4B83500ADF55A /* Driver+ConcurrencyTests.swift */,
C8D970E21F532FD30058F2FE /* SharedSequence+OperatorTest.swift */,
DB08833826FB07CB005805BE /* SharedSequence+ConcurrencyTests.swift */,
C8091C521FAA3588001DB32A /* ObservableConvertibleType+SharedSequence.swift */,
Expand Down Expand Up @@ -2172,6 +2186,7 @@
C8091C561FAA39C1001DB32A /* ControlEvent+Signal.swift */,
C8B0F7211F53135100548EBE /* ObservableConvertibleType+Signal.swift */,
C8D970CD1F5324D90058F2FE /* Signal+Subscription.swift */,
4F4124C327F4A54200ADF55A /* Signal+Concurrency.swift */,
A2897D65225D0182004EA481 /* PublishRelay+Signal.swift */,
);
path = Signal;
Expand Down Expand Up @@ -2335,6 +2350,7 @@
C89AB1AE1DAAC3350065FBE6 /* ControlEvent+Driver.swift */,
C89AB1AF1DAAC3350065FBE6 /* ControlProperty+Driver.swift */,
C89AB1B01DAAC3350065FBE6 /* Driver+Subscription.swift */,
4F4124C127F4A36B00ADF55A /* Driver+Concurrency.swift */,
C89AB1B11DAAC3350065FBE6 /* Driver.swift */,
CD8F7AC427BA9187001574EB /* Infallible+Driver.swift */,
C89AB1B21DAAC3350065FBE6 /* ObservableConvertibleType+Driver.swift */,
Expand Down Expand Up @@ -3023,6 +3039,7 @@
C88254171B8A752B00B02D69 /* RxTableViewReactiveArrayDataSource.swift in Sources */,
C8C8BCD41F89459300501D4D /* BehaviorRelay+Driver.swift in Sources */,
C882541E1B8A752B00B02D69 /* RxCollectionViewDataSourceProxy.swift in Sources */,
4F4124C427F4A54200ADF55A /* Signal+Concurrency.swift in Sources */,
C85E6FBE1F53025700C5681E /* SchedulerType+SharedSequence.swift in Sources */,
84C225A31C33F00B008724EC /* RxTextStorageDelegateProxy.swift in Sources */,
C89AB1DA1DAAC3350065FBE6 /* Driver.swift in Sources */,
Expand Down Expand Up @@ -3086,6 +3103,7 @@
D9080ACF1EA05AE0002B433B /* RxNavigationControllerDelegateProxy.swift in Sources */,
C88254271B8A752B00B02D69 /* UIBarButtonItem+Rx.swift in Sources */,
C89AB2161DAAC3350065FBE6 /* NSObject+Rx+KVORepresentable.swift in Sources */,
4F4124C227F4A36B00ADF55A /* Driver+Concurrency.swift in Sources */,
C882542B1B8A752B00B02D69 /* UIDatePicker+Rx.swift in Sources */,
C88254221B8A752B00B02D69 /* RxTableViewDataSourceProxy.swift in Sources */,
C882542C1B8A752B00B02D69 /* UIGestureRecognizer+Rx.swift in Sources */,
Expand Down Expand Up @@ -3214,8 +3232,10 @@
C820A97E1EB4FA5A00D431BC /* Observable+RepeatTests.swift in Sources */,
C820A94A1EB4E75E00D431BC /* Observable+AmbTests.swift in Sources */,
1AF67DA21CED420A00C310FA /* PublishSubjectTest.swift in Sources */,
4F4124C727F4B85500ADF55A /* Driver+ConcurrencyTests.swift in Sources */,
C820A9C61EB50A4200D431BC /* Observable+SkipWhileTests.swift in Sources */,
C835093E1C38706E0027C24C /* UIView+RxTests.swift in Sources */,
4F4124CC27F4BA2E00ADF55A /* Signal+ConcurrencyTests.swift in Sources */,
7EDBAEB41C89B1A6006CBE67 /* UITabBarItem+RxTests.swift in Sources */,
C83509411C38706E0027C24C /* BackgroundThreadPrimitiveHotObservable.swift in Sources */,
C8379EF41D1DD326003EF8FC /* UIButton+RxTests.swift in Sources */,
Expand Down Expand Up @@ -3330,6 +3350,7 @@
C83509EE1C3875580027C24C /* Observable.Extensions.swift in Sources */,
C83509BD1C38750D0027C24C /* ControlPropertyTests.swift in Sources */,
4C5213AF225E22500079FC77 /* Observable+CompactMapTests.swift in Sources */,
4F4124CD27F4BA2F00ADF55A /* Signal+ConcurrencyTests.swift in Sources */,
C83509E11C3875500027C24C /* TestVirtualScheduler.swift in Sources */,
C820A94F1EB4EC3C00D431BC /* Observable+ReduceTests.swift in Sources */,
C8B2908A1C94D64700E923D0 /* RxTest+Controls.swift in Sources */,
Expand All @@ -3355,6 +3376,7 @@
C820A94B1EB4E75E00D431BC /* Observable+AmbTests.swift in Sources */,
C834F6C31DB394E100C29244 /* Observable+BlockingTest.swift in Sources */,
C83509D41C38753C0027C24C /* RxObjCRuntimeState.swift in Sources */,
4F4124C827F4B85500ADF55A /* Driver+ConcurrencyTests.swift in Sources */,
C822BACF1DB424EC00F98810 /* Reactive+Tests.swift in Sources */,
C8C4F17E1DE9DF0200003FA7 /* UILabel+RxTests.swift in Sources */,
C83509C01C3875220027C24C /* DelegateProxyTest.swift in Sources */,
Expand Down Expand Up @@ -3545,6 +3567,7 @@
C8845ADC1EDB607800B36836 /* Observable+ShareReplayScopeTests.swift in Sources */,
C834F6C61DB3950600C29244 /* NSControl+RxTests.swift in Sources */,
C83509D61C3875420027C24C /* SentMessageTest.swift in Sources */,
4F4124C927F4B85600ADF55A /* Driver+ConcurrencyTests.swift in Sources */,
C81A097F1E6C27A100900B3B /* Observable+ZipTests.swift in Sources */,
C820AA0C1EB513C800D431BC /* Observable+WindowTests.swift in Sources */,
C8350A021C38755E0027C24C /* BagTest.swift in Sources */,
Expand All @@ -3553,6 +3576,7 @@
C820A9FC1EB510D500D431BC /* Observable+MaterializeTests.swift in Sources */,
C83509E81C3875580027C24C /* PrimitiveMockObserver.swift in Sources */,
C83509BE1C3875100027C24C /* DelegateProxyTest+Cocoa.swift in Sources */,
4F4124CE27F4BA2F00ADF55A /* Signal+ConcurrencyTests.swift in Sources */,
C820A9F41EB5109300D431BC /* Observable+DefaultIfEmpty.swift in Sources */,
C820AA041EB5134000D431BC /* Observable+DelaySubscriptionTests.swift in Sources */,
C8E3906A1F379386004FC993 /* Observable+EnumeratedTests.swift in Sources */,
Expand Down
63 changes: 63 additions & 0 deletions RxCocoa/Traits/Driver/Driver+Concurrency.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
//
// Driver+Concurrency.swift
// RxCocoa
//
// Created by Jinwoo Kim on 3/30/22.
// Copyright © 2022 Krunoslav Zaher. All rights reserved.
//

import RxSwift

#if swift(>=5.5.2) && canImport(_Concurrency) && !os(Linux)
// MARK: - Driver
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
public extension Driver {
/**
Allows converting asynchronous block to `Driver` trait.

- Parameters:
- priority: The priority of the task.
- detached: Detach when creating the task.
- onErrorJustReturn: Element to return in case of error and after that complete the sequence.
- block: An asynchronous block.
- Returns: An Driver emits value from `block` parameter.
*/
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
func from(priority: TaskPriority? = nil, detached: Bool = false, _ block: @escaping () async throws -> Element, onErrorJustReturn: Element) -> Driver<Element> {
return Single.from(priority: priority, detached: detached, block)
.asDriver(onErrorJustReturn: onErrorJustReturn)
}

/**
Allows converting asynchronous block to `Driver` trait.

- Parameters:
- priority: The priority of the task.
- detached: Detach when creating the task.
- onErrorDriveWith: Driver that continues to drive the sequence in case of error.
- block: An asynchronous block.
- Returns: An Driver emits value from `block` parameter.
*/
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
func from(priority: TaskPriority? = nil, detached: Bool = false, _ block: @escaping () async throws -> Element, onErrorDriveWith: Driver<Element>) -> Driver<Element> {
return Single.from(priority: priority, detached: detached, block)
.asDriver(onErrorDriveWith: onErrorDriveWith)
}

/**
Allows converting asynchronous block to `Driver` trait.

- Parameters:
- priority: The priority of the task.
- detached: Detach when creating the task.
- onErrorRecover: Calculates driver that continues to drive the sequence in case of error.
- block: An asynchronous block.
- Returns: An Driver emits value from `block` parameter.
*/
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
func from(priority: TaskPriority? = nil, detached: Bool = false, _ block: @escaping () async throws -> Element, onErrorRecover: @escaping (Error) -> Driver<Element>) ->Driver<Element> {
return Single.from(priority: priority, detached: detached, block)
.asDriver(onErrorRecover: onErrorRecover)
}
}
#endif
62 changes: 62 additions & 0 deletions RxCocoa/Traits/Signal/Signal+Concurrency.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
//
// Signal+Concurrency.swift
// RxCocoa
//
// Created by Jinwoo Kim on 3/30/22.
// Copyright © 2022 Krunoslav Zaher. All rights reserved.
//

import RxSwift

#if swift(>=5.5.2) && canImport(_Concurrency) && !os(Linux)
// MARK: - Signal
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
public extension Signal {
/**
Allows converting asynchronous block to `Signal` trait.

- Parameters:
- priority: The priority of the task.
- detached: Detach when creating the task.
- onErrorJustReturn: Element to return in case of error and after that complete the sequence.
- block: An asynchronous block.
- Returns: An Signal emits value from `block` parameter.
*/
static func from(priority: TaskPriority? = nil, detached: Bool = false, _ block: @escaping () async throws -> Element, onErrorJustReturn: Element) -> Signal<Element> {
return Single.from(priority: priority, detached: detached, block)
.asSignal(onErrorJustReturn: onErrorJustReturn)
}

/**
Allows converting asynchronous block to `Signal` trait.

- Parameters:
- priority: The priority of the task.
- detached: Detach when creating the task.
- onErrorSignalWith: Signal that continues to emit the sequence in case of error.
- block: An asynchronous block.
- Returns: An Signal emits value from `block` parameter.
*/
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
static func from(priority: TaskPriority? = nil, detached: Bool = false, _ block: @escaping () async throws -> Element, onErrorSignalWith: Signal<Element>) -> Signal<Element> {
return Single.from(priority: priority, detached: detached, block)
.asSignal(onErrorSignalWith: onErrorSignalWith)
}

/**
Allows converting asynchronous block to `Signal` trait.

- Parameters:
- priority: The priority of the task.
- detached: Detach when creating the task.
- onErrorRecover: Calculates signal that continues to emit the sequence in case of error.
- block: An asynchronous block.
- Returns: An Signal emits value from `block` parameter.
*/
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
static func from(priority: TaskPriority? = nil, detached: Bool = false, _ block: @escaping () async throws -> Element, onErrorRecover: @escaping (_ error: Swift.Error) -> Signal<Element>) -> Signal<Element> {
return Single.from(priority: priority, detached: detached, block)
.asSignal(onErrorRecover: onErrorRecover)
}
}
#endif
32 changes: 31 additions & 1 deletion RxSwift/Traits/Infallible/Infallible+Concurrency.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,41 @@ public extension InfallibleType {
onCompleted: { continuation.finish() },
onDisposed: { continuation.onTermination?(.cancelled) }
)

continuation.onTermination = { @Sendable _ in
disposable.dispose()
}
}
}

/**
Allows converting asynchronous block to `Infailable` trait.

- Parameters:
- priority: The priority of the task.
- detached: Detach when creating the task.
- block: An asynchronous block.
- Returns: An Infailable emits value from `block` parameter.
*/
static func from(priority: TaskPriority? = nil, detached: Bool = false, _ block: @escaping () async -> Element) -> Infallible<Element> {
return .create { observer in
let operation: @Sendable () async -> Void = {
let element = await block()
observer(.next(element))
observer(.completed)
}
let task: Task<Void, Swift.Error>

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since operation isn't throwing this can be Task<Void, Never>. (true for all four locations)


if detached {
task = Task.detached(priority: priority, operation: operation)
} else {
task = Task(priority: priority, operation: operation)
}

return Disposables.create {
task.cancel()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call to cancel the tasks, but the operation never checks if the task was cancelled. This will only do anything if the code passed in as block

Would it make sense to add if Task.isCancelled { return } after let element = await block() and not send anything to observer, as if the observable never fired anything?
Or would it make more sense to not send .next(element) but to still send .completed?

I left that note to show my train of thought, but now I am thinking it is best to leave this as is and not check Task.isCancelled since it is up to the user passing in Block to decide what to do when their Task is cancelled. They still may want to return a partial result perhaps. That should be up to them.

I was thinking there should also be a way to indicate they want to return nothing when their Task gets cancelled, but I think the catch block (in the other three, not Infallible) is probably sufficient for that. If they end user cares to know their Single/Maybe/Completable threw an error because the Task was cancelled and not some other error they can check the error type.

From the documentation:
"Tasks include a shared mechanism for indicating cancellation, but not a shared implementation for how to handle cancellation. Depending on the work you’re doing in the task, the correct way to stop that work varies. Likewise, it’s the responsibility of the code running as part of the task to check for cancellation whenever stopping is appropriate. In a long-task that includes multiple pieces, you might need to check for cancellation at several points, and handle cancellation differently at each point. If you only need to throw an error to stop the work, call the Task.checkCancellation() function to check for cancellation. Other responses to cancellation include returning the work completed so far, returning an empty result, or returning nil."

}
}
}
}
#endif