Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce non-nullable types in reactive integrations where appropriate #3393

Merged
merged 5 commits into from
Sep 20, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 1 addition & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,7 @@ configure(subprojects.findAll { !sourceless.contains(it.name) }) {
tasks.withType(AbstractKotlinCompile).all {
kotlinOptions.freeCompilerArgs += OptInPreset.optInAnnotations.collect { "-Xopt-in=" + it }
kotlinOptions.freeCompilerArgs += "-progressive"
// Disable KT-36770 for RxJava2 integration
kotlinOptions.freeCompilerArgs += "-XXLanguage:-ProhibitUsingNullableTypeParameterAgainstNotNullAnnotated"
kotlinOptions.freeCompilerArgs += "-XXLanguage:+ProhibitUsingNullableTypeParameterAgainstNotNullAnnotated"
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
// Remove null assertions to get smaller bytecode on Android
kotlinOptions.freeCompilerArgs += ["-Xno-param-assertions", "-Xno-receiver-assertions", "-Xno-call-assertions"]
Copy link
Contributor

@cpovirk cpovirk Sep 20, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure exactly what -progressive enables, but I notice that I can get a few more errors (noted above) if I enable the flags from #3007. (Thanks for pointing me here from there!)

        // Recognize rxjava3 nullness annotations even before that becomes the default (which may happen in 1.8): https://kotlinlang.org/docs/java-interop.html#nullability-annotations
        kotlinOptions.freeCompilerArgs += "-Xnullability-annotations=@io.reactivex.rxjava3.annotations:strict"
        // Recognize nullness annotations on type arguments, etc.: https://kotlinlang.org/docs/java-interop.html#annotating-type-arguments-and-type-parameters
        kotlinOptions.freeCompilerArgs += "-Xtype-enhancement-improvements-strict-mode"

}
Expand Down
58 changes: 43 additions & 15 deletions reactive/kotlinx-coroutines-rx2/src/RxAwait.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,17 @@ import kotlin.coroutines.*
*/
public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine { cont ->
subscribe(object : CompletableObserver {
override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
override fun onComplete() { cont.resume(Unit) }
override fun onError(e: Throwable) { cont.resumeWithException(e) }
override fun onSubscribe(d: Disposable) {
cont.disposeOnCancellation(d)
}

override fun onComplete() {
cont.resume(Unit)
}

override fun onError(e: Throwable) {
cont.resumeWithException(e)
}
})
}

Expand All @@ -43,12 +51,23 @@ public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine
*/
@Suppress("UNCHECKED_CAST")
public suspend fun <T> MaybeSource<T>.awaitSingleOrNull(): T? = suspendCancellableCoroutine { cont ->
subscribe(object : MaybeObserver<T> {
override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
override fun onComplete() { cont.resume(null) }
override fun onSuccess(t: T) { cont.resume(t) }
override fun onError(error: Throwable) { cont.resumeWithException(error) }
})
subscribe(object : MaybeObserver<T & Any> {
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
override fun onSubscribe(d: Disposable) {
cont.disposeOnCancellation(d)
}

override fun onComplete() {
cont.resume(null)
}

override fun onSuccess(t: T & Any) {
cont.resume(t)
}

override fun onError(error: Throwable) {
cont.resumeWithException(error)
}
} as MaybeObserver<T>)
}

/**
Expand Down Expand Up @@ -117,12 +136,21 @@ public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = awaitSingl
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
* function immediately disposes of its subscription and resumes with [CancellationException].
*/
@Suppress("UNCHECKED_CAST")
public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine { cont ->
subscribe(object : SingleObserver<T> {
override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
override fun onSuccess(t: T) { cont.resume(t) }
override fun onError(error: Throwable) { cont.resumeWithException(error) }
})
subscribe(object : SingleObserver<T & Any> {
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
override fun onSubscribe(d: Disposable) {
cont.disposeOnCancellation(d)
}

override fun onSuccess(t: T & Any) {
cont.resume(t)
}

override fun onError(error: Throwable) {
cont.resumeWithException(error)
}
} as SingleObserver<T>)
}

// ------------------------ ObservableSource ------------------------
Expand Down Expand Up @@ -225,7 +253,7 @@ private suspend fun <T> ObservableSource<T>.awaitOne(
cont.invokeOnCancellation { sub.dispose() }
}

override fun onNext(t: T) {
override fun onNext(t: T & Any) {
when (mode) {
Mode.FIRST, Mode.FIRST_OR_DEFAULT -> {
if (!seenValue) {
Expand Down
16 changes: 9 additions & 7 deletions reactive/kotlinx-coroutines-rx2/src/RxChannel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,24 @@ public suspend inline fun <T> ObservableSource<T>.collect(action: (T) -> Unit):
toChannel().consumeEach(action)

@PublishedApi
@Suppress("UNCHECKED_CAST")
internal fun <T> MaybeSource<T>.toChannel(): ReceiveChannel<T> {
val channel = SubscriptionChannel<T>()
subscribe(channel)
(this as MaybeSource<T & Any>).subscribe(channel)
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
return channel
}

@PublishedApi
@Suppress("UNCHECKED_CAST")
internal fun <T> ObservableSource<T>.toChannel(): ReceiveChannel<T> {
val channel = SubscriptionChannel<T>()
subscribe(channel)
(this as ObservableSource<T & Any>).subscribe(channel)
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
return channel
}

@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
private class SubscriptionChannel<T> :
LinkedListChannel<T>(null), Observer<T>, MaybeObserver<T>
LinkedListChannel<T>(null), Observer<T & Any>, MaybeObserver<T & Any>
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
{
private val _subscription = atomic<Disposable?>(null)

Expand All @@ -60,12 +62,12 @@ private class SubscriptionChannel<T> :
_subscription.value = sub
}

override fun onSuccess(t: T) {
override fun onSuccess(t: T & Any) {
trySend(t)
close(cause = null)
}

override fun onNext(t: T) {
override fun onNext(t: T & Any) {
trySend(t) // Safe to ignore return value here, expectedly racing with cancellation
}

Expand All @@ -80,15 +82,15 @@ private class SubscriptionChannel<T> :

/** @suppress */
@Deprecated(message = "Deprecated in the favour of Flow", level = DeprecationLevel.HIDDEN) // ERROR in 1.4.0, HIDDEN in 1.6.0
public fun <T> ObservableSource<T>.openSubscription(): ReceiveChannel<T> {
public fun <T> ObservableSource<T & Any>.openSubscription(): ReceiveChannel<T> {
val channel = SubscriptionChannel<T>()
subscribe(channel)
return channel
}

/** @suppress */
@Deprecated(message = "Deprecated in the favour of Flow", level = DeprecationLevel.HIDDEN) // ERROR in 1.4.0, HIDDEN in 1.6.0
public fun <T> MaybeSource<T>.openSubscription(): ReceiveChannel<T> {
public fun <T> MaybeSource<T & Any>.openSubscription(): ReceiveChannel<T> {
val channel = SubscriptionChannel<T>()
subscribe(channel)
return channel
Expand Down
44 changes: 24 additions & 20 deletions reactive/kotlinx-coroutines-rx3/src/RxAwait.kt
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,8 @@ public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this
* function immediately resumes with [CancellationException] and disposes of its subscription.
*/
@Suppress("UNCHECKED_CAST")
public suspend fun <T> MaybeSource<T>.awaitSingleOrNull(): T? = suspendCancellableCoroutine { cont ->
subscribe(object : MaybeObserver<T> {
public suspend fun <T> MaybeSource<T & Any>.awaitSingleOrNull(): T? = suspendCancellableCoroutine { cont ->
subscribe(object : MaybeObserver<T & Any> {
override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
override fun onComplete() { cont.resume(null) }
override fun onSuccess(t: T) { cont.resume(t) }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

T & Any

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This bound seems to be properly propagated from the declaration site

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the object is implementing MaybeObserver<T & Any>, I think the method parameter needs to be T & Any to match.

Or you know what, actually, even if we were implementing plain MaybeObserver<T>, we'd probably still need the method parameter to be T & Any, since the interface declares its method as requiring a @NonNull T:

https://github.com/ReactiveX/RxJava/blob/8e53e0ee1637791923a81fdb9d17dfe8c1569250/src/main/java/io/reactivex/rxjava3/core/MaybeObserver.java#L76

I think that's why, with the current parameter type of T, the build fails with the flags I posted in the build script.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing it out. T & Any is required since Kotlin 1.8.0, it was an omission on the language site

Expand All @@ -61,7 +60,7 @@ public suspend fun <T> MaybeSource<T>.awaitSingleOrNull(): T? = suspendCancellab
*
* @throws NoSuchElementException if no elements were produced by this [MaybeSource].
*/
public suspend fun <T> MaybeSource<T>.awaitSingle(): T = awaitSingleOrNull() ?: throw NoSuchElementException()
public suspend fun <T> MaybeSource<T & Any>.awaitSingle(): T = awaitSingleOrNull() ?: throw NoSuchElementException()

/**
* Awaits for completion of the maybe without blocking a thread.
Expand All @@ -84,7 +83,7 @@ public suspend fun <T> MaybeSource<T>.awaitSingle(): T = awaitSingleOrNull() ?:
level = DeprecationLevel.ERROR,
replaceWith = ReplaceWith("this.awaitSingleOrNull()")
) // Warning since 1.5, error in 1.6, hidden in 1.7
public suspend fun <T> MaybeSource<T>.await(): T? = awaitSingleOrNull()
public suspend fun <T> MaybeSource<T & Any>.await(): T? = awaitSingleOrNull()

/**
* Awaits for completion of the maybe without blocking a thread.
Expand All @@ -107,7 +106,7 @@ public suspend fun <T> MaybeSource<T>.await(): T? = awaitSingleOrNull()
level = DeprecationLevel.ERROR,
replaceWith = ReplaceWith("this.awaitSingleOrNull() ?: default")
) // Warning since 1.5, error in 1.6, hidden in 1.7
public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = awaitSingleOrNull() ?: default
public suspend fun <T> MaybeSource<T & Any>.awaitOrDefault(default: T): T = awaitSingleOrNull() ?: default

// ------------------------ SingleSource ------------------------

Expand All @@ -119,10 +118,10 @@ public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = awaitSingl
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
* function immediately disposes of its subscription and resumes with [CancellationException].
*/
public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine { cont ->
subscribe(object : SingleObserver<T> {
public suspend fun <T> SingleSource<T & Any>.await(): T = suspendCancellableCoroutine { cont ->
subscribe(object : SingleObserver<T & Any> {
override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
override fun onSuccess(t: T) { cont.resume(t) }
override fun onSuccess(t: T & Any) { cont.resume(t) }
override fun onError(error: Throwable) { cont.resumeWithException(error) }
})
}
Expand All @@ -139,7 +138,8 @@ public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine
*
* @throws NoSuchElementException if the observable does not emit any value
*/
public suspend fun <T> ObservableSource<T>.awaitFirst(): T = awaitOne(Mode.FIRST)
@Suppress("UNCHECKED_CAST")
public suspend fun <T> ObservableSource<T & Any>.awaitFirst(): T = awaitOne(Mode.FIRST) as T

/**
* Awaits the first value from the given [Observable], or returns the [default] value if none is emitted, without
Expand All @@ -150,7 +150,9 @@ public suspend fun <T> ObservableSource<T>.awaitFirst(): T = awaitOne(Mode.FIRST
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
* function immediately disposes of its subscription and resumes with [CancellationException].
*/
public suspend fun <T> ObservableSource<T>.awaitFirstOrDefault(default: T): T = awaitOne(Mode.FIRST_OR_DEFAULT, default)
@Suppress("UNCHECKED_CAST")
public suspend fun <T> ObservableSource<T & Any>.awaitFirstOrDefault(default: T): T =
awaitOne(Mode.FIRST_OR_DEFAULT, default) as T

/**
* Awaits the first value from the given [Observable], or returns `null` if none is emitted, without blocking the
Expand All @@ -161,7 +163,7 @@ public suspend fun <T> ObservableSource<T>.awaitFirstOrDefault(default: T): T =
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
* function immediately disposes of its subscription and resumes with [CancellationException].
*/
public suspend fun <T> ObservableSource<T>.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT)
public suspend fun <T> ObservableSource<T & Any>.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT)

/**
* Awaits the first value from the given [Observable], or calls [defaultValue] to get a value if none is emitted,
Expand All @@ -172,7 +174,7 @@ public suspend fun <T> ObservableSource<T>.awaitFirstOrNull(): T? = awaitOne(Mod
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
* function immediately disposes of its subscription and resumes with [CancellationException].
*/
public suspend fun <T> ObservableSource<T>.awaitFirstOrElse(defaultValue: () -> T): T =
public suspend fun <T> ObservableSource<T & Any>.awaitFirstOrElse(defaultValue: () -> T): T =
awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue()

/**
Expand All @@ -185,7 +187,8 @@ public suspend fun <T> ObservableSource<T>.awaitFirstOrElse(defaultValue: () ->
*
* @throws NoSuchElementException if the observable does not emit any value
*/
public suspend fun <T> ObservableSource<T>.awaitLast(): T = awaitOne(Mode.LAST)
@Suppress("UNCHECKED_CAST")
public suspend fun <T> ObservableSource<T & Any>.awaitLast(): T = awaitOne(Mode.LAST) as T

/**
* Awaits the single value from the given observable without blocking the thread and returns the resulting value, or,
Expand All @@ -198,26 +201,27 @@ public suspend fun <T> ObservableSource<T>.awaitLast(): T = awaitOne(Mode.LAST)
* @throws NoSuchElementException if the observable does not emit any value
* @throws IllegalArgumentException if the observable emits more than one value
*/
public suspend fun <T> ObservableSource<T>.awaitSingle(): T = awaitOne(Mode.SINGLE)
@Suppress("UNCHECKED_CAST")
public suspend fun <T> ObservableSource<T & Any>.awaitSingle(): T = awaitOne(Mode.SINGLE) as T

// ------------------------ private ------------------------

internal fun CancellableContinuation<*>.disposeOnCancellation(d: Disposable) =
invokeOnCancellation { d.dispose() }

private enum class Mode(val s: String) {
private enum class Mode(@JvmField val s: String) {
FIRST("awaitFirst"),
FIRST_OR_DEFAULT("awaitFirstOrDefault"),
LAST("awaitLast"),
SINGLE("awaitSingle");
override fun toString(): String = s
}

private suspend fun <T> ObservableSource<T>.awaitOne(
private suspend fun <T> ObservableSource<T & Any>.awaitOne(
mode: Mode,
default: T? = null
): T = suspendCancellableCoroutine { cont ->
subscribe(object : Observer<T> {
): T? = suspendCancellableCoroutine { cont ->
subscribe(object : Observer<T & Any> {
private lateinit var subscription: Disposable
private var value: T? = null
private var seenValue = false
Expand All @@ -227,7 +231,7 @@ private suspend fun <T> ObservableSource<T>.awaitOne(
cont.invokeOnCancellation { sub.dispose() }
}

override fun onNext(t: T) {
override fun onNext(t: T & Any) {
when (mode) {
Mode.FIRST, Mode.FIRST_OR_DEFAULT -> {
if (!seenValue) {
Expand Down