Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce non-nullable types in reactive integrations where appropriate #3393

Merged
merged 5 commits into from
Sep 20, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 1 addition & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,7 @@ configure(subprojects.findAll { !sourceless.contains(it.name) }) {
tasks.withType(AbstractKotlinCompile).all {
kotlinOptions.freeCompilerArgs += OptInPreset.optInAnnotations.collect { "-Xopt-in=" + it }
kotlinOptions.freeCompilerArgs += "-progressive"
// Disable KT-36770 for RxJava2 integration
kotlinOptions.freeCompilerArgs += "-XXLanguage:-ProhibitUsingNullableTypeParameterAgainstNotNullAnnotated"
kotlinOptions.freeCompilerArgs += "-XXLanguage:+ProhibitUsingNullableTypeParameterAgainstNotNullAnnotated"
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
// Remove null assertions to get smaller bytecode on Android
kotlinOptions.freeCompilerArgs += ["-Xno-param-assertions", "-Xno-receiver-assertions", "-Xno-call-assertions"]
Copy link
Contributor

@cpovirk cpovirk Sep 20, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure exactly what -progressive enables, but I notice that I can get a few more errors (noted above) if I enable the flags from #3007. (Thanks for pointing me here from there!)

        // Recognize rxjava3 nullness annotations even before that becomes the default (which may happen in 1.8): https://kotlinlang.org/docs/java-interop.html#nullability-annotations
        kotlinOptions.freeCompilerArgs += "-Xnullability-annotations=@io.reactivex.rxjava3.annotations:strict"
        // Recognize nullness annotations on type arguments, etc.: https://kotlinlang.org/docs/java-interop.html#annotating-type-arguments-and-type-parameters
        kotlinOptions.freeCompilerArgs += "-Xtype-enhancement-improvements-strict-mode"

}
Expand Down
50 changes: 38 additions & 12 deletions reactive/kotlinx-coroutines-rx2/src/RxAwait.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,17 @@ import kotlin.coroutines.*
*/
public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine { cont ->
subscribe(object : CompletableObserver {
override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
override fun onComplete() { cont.resume(Unit) }
override fun onError(e: Throwable) { cont.resumeWithException(e) }
override fun onSubscribe(d: Disposable) {
cont.disposeOnCancellation(d)
}

override fun onComplete() {
cont.resume(Unit)
}

override fun onError(e: Throwable) {
cont.resumeWithException(e)
}
})
}

Expand All @@ -41,13 +49,23 @@ public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this
* function immediately resumes with [CancellationException] and disposes of its subscription.
*/
@Suppress("UNCHECKED_CAST")
public suspend fun <T> MaybeSource<T>.awaitSingleOrNull(): T? = suspendCancellableCoroutine { cont ->
subscribe(object : MaybeObserver<T> {
override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
override fun onComplete() { cont.resume(null) }
override fun onSuccess(t: T) { cont.resume(t) }
override fun onError(error: Throwable) { cont.resumeWithException(error) }
override fun onSubscribe(d: Disposable) {
cont.disposeOnCancellation(d)
}

override fun onComplete() {
cont.resume(null)
}

override fun onSuccess(t: T & Any) {
cont.resume(t)
}

override fun onError(error: Throwable) {
cont.resumeWithException(error)
}
})
}

Expand Down Expand Up @@ -119,9 +137,17 @@ public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = awaitSingl
*/
public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine { cont ->
subscribe(object : SingleObserver<T> {
override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
override fun onSuccess(t: T) { cont.resume(t) }
override fun onError(error: Throwable) { cont.resumeWithException(error) }
override fun onSubscribe(d: Disposable) {
cont.disposeOnCancellation(d)
}

override fun onSuccess(t: T & Any) {
cont.resume(t)
}

override fun onError(error: Throwable) {
cont.resumeWithException(error)
}
})
}

Expand Down Expand Up @@ -225,7 +251,7 @@ private suspend fun <T> ObservableSource<T>.awaitOne(
cont.invokeOnCancellation { sub.dispose() }
}

override fun onNext(t: T) {
override fun onNext(t: T & Any) {
when (mode) {
Mode.FIRST, Mode.FIRST_OR_DEFAULT -> {
if (!seenValue) {
Expand Down
14 changes: 8 additions & 6 deletions reactive/kotlinx-coroutines-rx2/src/RxChannel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,18 @@ public suspend inline fun <T> ObservableSource<T>.collect(action: (T) -> Unit):
toChannel().consumeEach(action)

@PublishedApi
@Suppress("UNCHECKED_CAST")
internal fun <T> MaybeSource<T>.toChannel(): ReceiveChannel<T> {
val channel = SubscriptionChannel<T>()
subscribe(channel)
(this as MaybeSource<T & Any>).subscribe(channel)
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
return channel
}

@PublishedApi
@Suppress("UNCHECKED_CAST")
internal fun <T> ObservableSource<T>.toChannel(): ReceiveChannel<T> {
val channel = SubscriptionChannel<T>()
subscribe(channel)
(this as ObservableSource<T & Any>).subscribe(channel)
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
return channel
}

Expand All @@ -60,12 +62,12 @@ private class SubscriptionChannel<T> :
_subscription.value = sub
}

override fun onSuccess(t: T) {
override fun onSuccess(t: T & Any) {
trySend(t)
close(cause = null)
}

override fun onNext(t: T) {
override fun onNext(t: T & Any) {
trySend(t) // Safe to ignore return value here, expectedly racing with cancellation
}

Expand All @@ -80,15 +82,15 @@ private class SubscriptionChannel<T> :

/** @suppress */
@Deprecated(message = "Deprecated in the favour of Flow", level = DeprecationLevel.HIDDEN) // ERROR in 1.4.0, HIDDEN in 1.6.0
public fun <T> ObservableSource<T>.openSubscription(): ReceiveChannel<T> {
public fun <T> ObservableSource<T & Any>.openSubscription(): ReceiveChannel<T> {
val channel = SubscriptionChannel<T>()
subscribe(channel)
return channel
}

/** @suppress */
@Deprecated(message = "Deprecated in the favour of Flow", level = DeprecationLevel.HIDDEN) // ERROR in 1.4.0, HIDDEN in 1.6.0
public fun <T> MaybeSource<T>.openSubscription(): ReceiveChannel<T> {
public fun <T> MaybeSource<T & Any>.openSubscription(): ReceiveChannel<T> {
val channel = SubscriptionChannel<T>()
subscribe(channel)
return channel
Expand Down
4 changes: 2 additions & 2 deletions reactive/kotlinx-coroutines-rx2/src/RxObservable.kt
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ private class RxObservableCoroutine<T : Any>(
processResFunc = RxObservableCoroutine<*>::processResultSelectSend as ProcessResultFunction
)

@Suppress("UNCHECKED_CAST", "UNUSED_PARAMETER")
@Suppress("UNUSED_PARAMETER")
private fun registerSelectForSend(select: SelectInstance<*>, element: Any?) {
// Try to acquire the mutex and complete in the registration phase.
if (mutex.tryLock()) {
Expand Down Expand Up @@ -113,7 +113,7 @@ private class RxObservableCoroutine<T : Any>(
}
}

public override suspend fun send(element: T) {
override suspend fun send(element: T) {
mutex.lock()
doLockedNext(element)?.let { throw it }
}
Expand Down
2 changes: 1 addition & 1 deletion reactive/kotlinx-coroutines-rx3/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ compileKotlin {
tasks.withType(DokkaTaskPartial.class) {
dokkaSourceSets.configureEach {
externalDocumentationLink {
url.set(new URL('http://reactivex.io/RxJava/3.x/javadoc/'))
url.set(new URL('https://reactivex.io/RxJava/3.x/javadoc/'))
packageListUrl.set(projectDir.toPath().resolve("package.list").toUri().toURL())
}
}
Expand Down
44 changes: 24 additions & 20 deletions reactive/kotlinx-coroutines-rx3/src/RxAwait.kt
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,8 @@ public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this
* function immediately resumes with [CancellationException] and disposes of its subscription.
*/
@Suppress("UNCHECKED_CAST")
public suspend fun <T> MaybeSource<T>.awaitSingleOrNull(): T? = suspendCancellableCoroutine { cont ->
subscribe(object : MaybeObserver<T> {
public suspend fun <T> MaybeSource<T & Any>.awaitSingleOrNull(): T? = suspendCancellableCoroutine { cont ->
subscribe(object : MaybeObserver<T & Any> {
override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
override fun onComplete() { cont.resume(null) }
override fun onSuccess(t: T) { cont.resume(t) }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

T & Any

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This bound seems to be properly propagated from the declaration site

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the object is implementing MaybeObserver<T & Any>, I think the method parameter needs to be T & Any to match.

Or you know what, actually, even if we were implementing plain MaybeObserver<T>, we'd probably still need the method parameter to be T & Any, since the interface declares its method as requiring a @NonNull T:

https://github.com/ReactiveX/RxJava/blob/8e53e0ee1637791923a81fdb9d17dfe8c1569250/src/main/java/io/reactivex/rxjava3/core/MaybeObserver.java#L76

I think that's why, with the current parameter type of T, the build fails with the flags I posted in the build script.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing it out. T & Any is required since Kotlin 1.8.0, it was an omission on the language site

Expand All @@ -61,7 +60,7 @@ public suspend fun <T> MaybeSource<T>.awaitSingleOrNull(): T? = suspendCancellab
*
* @throws NoSuchElementException if no elements were produced by this [MaybeSource].
*/
public suspend fun <T> MaybeSource<T>.awaitSingle(): T = awaitSingleOrNull() ?: throw NoSuchElementException()
public suspend fun <T> MaybeSource<T & Any>.awaitSingle(): T = awaitSingleOrNull() ?: throw NoSuchElementException()

/**
* Awaits for completion of the maybe without blocking a thread.
Expand All @@ -84,7 +83,7 @@ public suspend fun <T> MaybeSource<T>.awaitSingle(): T = awaitSingleOrNull() ?:
level = DeprecationLevel.ERROR,
replaceWith = ReplaceWith("this.awaitSingleOrNull()")
) // Warning since 1.5, error in 1.6, hidden in 1.7
public suspend fun <T> MaybeSource<T>.await(): T? = awaitSingleOrNull()
public suspend fun <T> MaybeSource<T & Any>.await(): T? = awaitSingleOrNull()

/**
* Awaits for completion of the maybe without blocking a thread.
Expand All @@ -107,7 +106,7 @@ public suspend fun <T> MaybeSource<T>.await(): T? = awaitSingleOrNull()
level = DeprecationLevel.ERROR,
replaceWith = ReplaceWith("this.awaitSingleOrNull() ?: default")
) // Warning since 1.5, error in 1.6, hidden in 1.7
public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = awaitSingleOrNull() ?: default
public suspend fun <T> MaybeSource<T & Any>.awaitOrDefault(default: T): T = awaitSingleOrNull() ?: default

// ------------------------ SingleSource ------------------------

Expand All @@ -119,10 +118,10 @@ public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = awaitSingl
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
* function immediately disposes of its subscription and resumes with [CancellationException].
*/
public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine { cont ->
subscribe(object : SingleObserver<T> {
public suspend fun <T> SingleSource<T & Any>.await(): T = suspendCancellableCoroutine { cont ->
subscribe(object : SingleObserver<T & Any> {
override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
override fun onSuccess(t: T) { cont.resume(t) }
override fun onSuccess(t: T & Any) { cont.resume(t) }
override fun onError(error: Throwable) { cont.resumeWithException(error) }
})
}
Expand All @@ -139,7 +138,8 @@ public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine
*
* @throws NoSuchElementException if the observable does not emit any value
*/
public suspend fun <T> ObservableSource<T>.awaitFirst(): T = awaitOne(Mode.FIRST)
@Suppress("UNCHECKED_CAST")
public suspend fun <T> ObservableSource<T & Any>.awaitFirst(): T = awaitOne(Mode.FIRST) as T

/**
* Awaits the first value from the given [Observable], or returns the [default] value if none is emitted, without
Expand All @@ -150,7 +150,9 @@ public suspend fun <T> ObservableSource<T>.awaitFirst(): T = awaitOne(Mode.FIRST
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
* function immediately disposes of its subscription and resumes with [CancellationException].
*/
public suspend fun <T> ObservableSource<T>.awaitFirstOrDefault(default: T): T = awaitOne(Mode.FIRST_OR_DEFAULT, default)
@Suppress("UNCHECKED_CAST")
public suspend fun <T> ObservableSource<T & Any>.awaitFirstOrDefault(default: T): T =
awaitOne(Mode.FIRST_OR_DEFAULT, default) as T

/**
* Awaits the first value from the given [Observable], or returns `null` if none is emitted, without blocking the
Expand All @@ -161,7 +163,7 @@ public suspend fun <T> ObservableSource<T>.awaitFirstOrDefault(default: T): T =
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
* function immediately disposes of its subscription and resumes with [CancellationException].
*/
public suspend fun <T> ObservableSource<T>.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT)
public suspend fun <T> ObservableSource<T & Any>.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT)

/**
* Awaits the first value from the given [Observable], or calls [defaultValue] to get a value if none is emitted,
Expand All @@ -172,7 +174,7 @@ public suspend fun <T> ObservableSource<T>.awaitFirstOrNull(): T? = awaitOne(Mod
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
* function immediately disposes of its subscription and resumes with [CancellationException].
*/
public suspend fun <T> ObservableSource<T>.awaitFirstOrElse(defaultValue: () -> T): T =
public suspend fun <T> ObservableSource<T & Any>.awaitFirstOrElse(defaultValue: () -> T): T =
awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue()

/**
Expand All @@ -185,7 +187,8 @@ public suspend fun <T> ObservableSource<T>.awaitFirstOrElse(defaultValue: () ->
*
* @throws NoSuchElementException if the observable does not emit any value
*/
public suspend fun <T> ObservableSource<T>.awaitLast(): T = awaitOne(Mode.LAST)
@Suppress("UNCHECKED_CAST")
public suspend fun <T> ObservableSource<T & Any>.awaitLast(): T = awaitOne(Mode.LAST) as T

/**
* Awaits the single value from the given observable without blocking the thread and returns the resulting value, or,
Expand All @@ -198,26 +201,27 @@ public suspend fun <T> ObservableSource<T>.awaitLast(): T = awaitOne(Mode.LAST)
* @throws NoSuchElementException if the observable does not emit any value
* @throws IllegalArgumentException if the observable emits more than one value
*/
public suspend fun <T> ObservableSource<T>.awaitSingle(): T = awaitOne(Mode.SINGLE)
@Suppress("UNCHECKED_CAST")
public suspend fun <T> ObservableSource<T & Any>.awaitSingle(): T = awaitOne(Mode.SINGLE) as T

// ------------------------ private ------------------------

internal fun CancellableContinuation<*>.disposeOnCancellation(d: Disposable) =
invokeOnCancellation { d.dispose() }

private enum class Mode(val s: String) {
private enum class Mode(@JvmField val s: String) {
FIRST("awaitFirst"),
FIRST_OR_DEFAULT("awaitFirstOrDefault"),
LAST("awaitLast"),
SINGLE("awaitSingle");
override fun toString(): String = s
}

private suspend fun <T> ObservableSource<T>.awaitOne(
private suspend fun <T> ObservableSource<T & Any>.awaitOne(
mode: Mode,
default: T? = null
): T = suspendCancellableCoroutine { cont ->
subscribe(object : Observer<T> {
): T? = suspendCancellableCoroutine { cont ->
subscribe(object : Observer<T & Any> {
private lateinit var subscription: Disposable
private var value: T? = null
private var seenValue = false
Expand All @@ -227,7 +231,7 @@ private suspend fun <T> ObservableSource<T>.awaitOne(
cont.invokeOnCancellation { sub.dispose() }
}

override fun onNext(t: T) {
override fun onNext(t: T & Any) {
when (mode) {
Mode.FIRST, Mode.FIRST_OR_DEFAULT -> {
if (!seenValue) {
Expand Down
11 changes: 5 additions & 6 deletions reactive/kotlinx-coroutines-rx3/src/RxChannel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import kotlinx.coroutines.flow.*
* [MaybeSource] doesn't have a corresponding [Flow] adapter, so it should be transformed to [Observable] first.
*/
@PublishedApi
internal fun <T> MaybeSource<T>.openSubscription(): ReceiveChannel<T> {
internal fun <T> MaybeSource<T & Any>.openSubscription(): ReceiveChannel<T> {
val channel = SubscriptionChannel<T>()
subscribe(channel)
return channel
Expand All @@ -33,7 +33,7 @@ internal fun <T> MaybeSource<T>.openSubscription(): ReceiveChannel<T> {
* [ObservableSource] doesn't have a corresponding [Flow] adapter, so it should be transformed to [Observable] first.
*/
@PublishedApi
internal fun <T> ObservableSource<T>.openSubscription(): ReceiveChannel<T> {
internal fun <T> ObservableSource<T & Any>.openSubscription(): ReceiveChannel<T> {
val channel = SubscriptionChannel<T>()
subscribe(channel)
return channel
Expand All @@ -45,7 +45,7 @@ internal fun <T> ObservableSource<T>.openSubscription(): ReceiveChannel<T> {
* If [action] throws an exception at some point or if the [MaybeSource] raises an error, the exception is rethrown from
* [collect].
*/
public suspend inline fun <T> MaybeSource<T>.collect(action: (T) -> Unit): Unit =
public suspend inline fun <T> MaybeSource<T & Any>.collect(action: (T) -> Unit): Unit =
openSubscription().consumeEach(action)

/**
Expand All @@ -54,12 +54,11 @@ public suspend inline fun <T> MaybeSource<T>.collect(action: (T) -> Unit): Unit
* If [action] throws an exception at some point, the subscription is cancelled, and the exception is rethrown from
* [collect]. Also, if the [ObservableSource] signals an error, that error is rethrown from [collect].
*/
public suspend inline fun <T> ObservableSource<T>.collect(action: (T) -> Unit): Unit =
openSubscription().consumeEach(action)
public suspend inline fun <T> ObservableSource<T & Any>.collect(action: (T) -> Unit): Unit = openSubscription().consumeEach(action)

@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
private class SubscriptionChannel<T> :
LinkedListChannel<T>(null), Observer<T>, MaybeObserver<T>
LinkedListChannel<T>(null), Observer<T & Any>, MaybeObserver<T & Any>
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
{
private val _subscription = atomic<Disposable?>(null)

Expand Down
2 changes: 1 addition & 1 deletion reactive/kotlinx-coroutines-rx3/src/RxConvert.kt
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public fun Job.asCompletable(context: CoroutineContext): Completable = rxComplet
*
* @param context -- the coroutine context from which the resulting maybe is going to be signalled
*/
public fun <T> Deferred<T?>.asMaybe(context: CoroutineContext): Maybe<T> = rxMaybe(context) {
public fun <T> Deferred<T?>.asMaybe(context: CoroutineContext): Maybe<T & Any> = rxMaybe(context) {
this@asMaybe.await()
}

Expand Down
6 changes: 3 additions & 3 deletions reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import kotlin.coroutines.*
public fun <T> rxMaybe(
context: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.() -> T?
): Maybe<T> {
): Maybe<T & Any> {
require(context[Job] === null) { "Maybe context cannot contain job in it." +
"Its lifecycle should be managed via Disposable handle. Had $context" }
return rxMaybeInternal(GlobalScope, context, block)
Expand All @@ -30,7 +30,7 @@ private fun <T> rxMaybeInternal(
scope: CoroutineScope, // support for legacy rxMaybe in scope
context: CoroutineContext,
block: suspend CoroutineScope.() -> T?
): Maybe<T> = Maybe.create { subscriber ->
): Maybe<T & Any> = Maybe.create { subscriber ->
val newContext = scope.newCoroutineContext(context)
val coroutine = RxMaybeCoroutine(newContext, subscriber)
subscriber.setCancellable(RxCancellable(coroutine))
Expand All @@ -39,7 +39,7 @@ private fun <T> rxMaybeInternal(

private class RxMaybeCoroutine<T>(
parentContext: CoroutineContext,
private val subscriber: MaybeEmitter<T>
private val subscriber: MaybeEmitter<T & Any>
) : AbstractCoroutine<T>(parentContext, false, true) {
override fun onCompleted(value: T) {
try {
Expand Down