Skip to content

Commit

Permalink
Introduce non-nullable types in reactive integrations where appropria…
Browse files Browse the repository at this point in the history
…te (#3393)

* Introduce non-nullable types in reactive integrations where appropriate

* For RxJava2, use them in internal implementations where appropriate
* For RxJava3, introduce & Any bound to a generic argument in our extensions to avoid errors in Kotlin 1.8.0 due to non-nullability rx3 annotations being part of generics upper bound. This change went through committee, and all the "unsound" declarations such as "RxSignature<Foo?>" were properly highlighted as a warning that would become an error.
  • Loading branch information
qwwdfsad committed Sep 20, 2022
1 parent 7f557e9 commit 61ba10d
Show file tree
Hide file tree
Showing 11 changed files with 83 additions and 57 deletions.
2 changes: 0 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,6 @@ configure(subprojects.findAll { !sourceless.contains(it.name) }) {
tasks.withType(AbstractKotlinCompile).all {
kotlinOptions.freeCompilerArgs += OptInPreset.optInAnnotations.collect { "-Xopt-in=" + it }
kotlinOptions.freeCompilerArgs += "-progressive"
// Disable KT-36770 for RxJava2 integration
kotlinOptions.freeCompilerArgs += "-XXLanguage:-ProhibitUsingNullableTypeParameterAgainstNotNullAnnotated"
// Remove null assertions to get smaller bytecode on Android
kotlinOptions.freeCompilerArgs += ["-Xno-param-assertions", "-Xno-receiver-assertions", "-Xno-call-assertions"]
}
Expand Down
50 changes: 38 additions & 12 deletions reactive/kotlinx-coroutines-rx2/src/RxAwait.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,17 @@ import kotlin.coroutines.*
*/
public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine { cont ->
subscribe(object : CompletableObserver {
override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
override fun onComplete() { cont.resume(Unit) }
override fun onError(e: Throwable) { cont.resumeWithException(e) }
override fun onSubscribe(d: Disposable) {
cont.disposeOnCancellation(d)
}

override fun onComplete() {
cont.resume(Unit)
}

override fun onError(e: Throwable) {
cont.resumeWithException(e)
}
})
}

Expand All @@ -41,13 +49,23 @@ public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this
* function immediately resumes with [CancellationException] and disposes of its subscription.
*/
@Suppress("UNCHECKED_CAST")
public suspend fun <T> MaybeSource<T>.awaitSingleOrNull(): T? = suspendCancellableCoroutine { cont ->
subscribe(object : MaybeObserver<T> {
override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
override fun onComplete() { cont.resume(null) }
override fun onSuccess(t: T) { cont.resume(t) }
override fun onError(error: Throwable) { cont.resumeWithException(error) }
override fun onSubscribe(d: Disposable) {
cont.disposeOnCancellation(d)
}

override fun onComplete() {
cont.resume(null)
}

override fun onSuccess(t: T & Any) {
cont.resume(t)
}

override fun onError(error: Throwable) {
cont.resumeWithException(error)
}
})
}

Expand Down Expand Up @@ -119,9 +137,17 @@ public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = awaitSingl
*/
public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine { cont ->
subscribe(object : SingleObserver<T> {
override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
override fun onSuccess(t: T) { cont.resume(t) }
override fun onError(error: Throwable) { cont.resumeWithException(error) }
override fun onSubscribe(d: Disposable) {
cont.disposeOnCancellation(d)
}

override fun onSuccess(t: T & Any) {
cont.resume(t)
}

override fun onError(error: Throwable) {
cont.resumeWithException(error)
}
})
}

Expand Down Expand Up @@ -225,7 +251,7 @@ private suspend fun <T> ObservableSource<T>.awaitOne(
cont.invokeOnCancellation { sub.dispose() }
}

override fun onNext(t: T) {
override fun onNext(t: T & Any) {
when (mode) {
Mode.FIRST, Mode.FIRST_OR_DEFAULT -> {
if (!seenValue) {
Expand Down
8 changes: 4 additions & 4 deletions reactive/kotlinx-coroutines-rx2/src/RxChannel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,12 @@ private class SubscriptionChannel<T> :
_subscription.value = sub
}

override fun onSuccess(t: T) {
override fun onSuccess(t: T & Any) {
trySend(t)
close(cause = null)
}

override fun onNext(t: T) {
override fun onNext(t: T & Any) {
trySend(t) // Safe to ignore return value here, expectedly racing with cancellation
}

Expand All @@ -80,15 +80,15 @@ private class SubscriptionChannel<T> :

/** @suppress */
@Deprecated(message = "Deprecated in the favour of Flow", level = DeprecationLevel.HIDDEN) // ERROR in 1.4.0, HIDDEN in 1.6.0
public fun <T> ObservableSource<T>.openSubscription(): ReceiveChannel<T> {
public fun <T> ObservableSource<T & Any>.openSubscription(): ReceiveChannel<T> {
val channel = SubscriptionChannel<T>()
subscribe(channel)
return channel
}

/** @suppress */
@Deprecated(message = "Deprecated in the favour of Flow", level = DeprecationLevel.HIDDEN) // ERROR in 1.4.0, HIDDEN in 1.6.0
public fun <T> MaybeSource<T>.openSubscription(): ReceiveChannel<T> {
public fun <T> MaybeSource<T & Any>.openSubscription(): ReceiveChannel<T> {
val channel = SubscriptionChannel<T>()
subscribe(channel)
return channel
Expand Down
4 changes: 2 additions & 2 deletions reactive/kotlinx-coroutines-rx2/src/RxObservable.kt
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ private class RxObservableCoroutine<T : Any>(
processResFunc = RxObservableCoroutine<*>::processResultSelectSend as ProcessResultFunction
)

@Suppress("UNCHECKED_CAST", "UNUSED_PARAMETER")
@Suppress("UNUSED_PARAMETER")
private fun registerSelectForSend(select: SelectInstance<*>, element: Any?) {
// Try to acquire the mutex and complete in the registration phase.
if (mutex.tryLock()) {
Expand Down Expand Up @@ -113,7 +113,7 @@ private class RxObservableCoroutine<T : Any>(
}
}

public override suspend fun send(element: T) {
override suspend fun send(element: T) {
mutex.lock()
doLockedNext(element)?.let { throw it }
}
Expand Down
2 changes: 1 addition & 1 deletion reactive/kotlinx-coroutines-rx3/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ compileKotlin {
tasks.withType(DokkaTaskPartial.class) {
dokkaSourceSets.configureEach {
externalDocumentationLink {
url.set(new URL('http://reactivex.io/RxJava/3.x/javadoc/'))
url.set(new URL('https://reactivex.io/RxJava/3.x/javadoc/'))
packageListUrl.set(projectDir.toPath().resolve("package.list").toUri().toURL())
}
}
Expand Down
46 changes: 25 additions & 21 deletions reactive/kotlinx-coroutines-rx3/src/RxAwait.kt
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,11 @@ public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this
* function immediately resumes with [CancellationException] and disposes of its subscription.
*/
@Suppress("UNCHECKED_CAST")
public suspend fun <T> MaybeSource<T>.awaitSingleOrNull(): T? = suspendCancellableCoroutine { cont ->
subscribe(object : MaybeObserver<T> {
public suspend fun <T> MaybeSource<T & Any>.awaitSingleOrNull(): T? = suspendCancellableCoroutine { cont ->
subscribe(object : MaybeObserver<T & Any> {
override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
override fun onComplete() { cont.resume(null) }
override fun onSuccess(t: T) { cont.resume(t) }
override fun onSuccess(t: T & Any) { cont.resume(t) }
override fun onError(error: Throwable) { cont.resumeWithException(error) }
})
}
Expand All @@ -61,7 +60,7 @@ public suspend fun <T> MaybeSource<T>.awaitSingleOrNull(): T? = suspendCancellab
*
* @throws NoSuchElementException if no elements were produced by this [MaybeSource].
*/
public suspend fun <T> MaybeSource<T>.awaitSingle(): T = awaitSingleOrNull() ?: throw NoSuchElementException()
public suspend fun <T> MaybeSource<T & Any>.awaitSingle(): T = awaitSingleOrNull() ?: throw NoSuchElementException()

/**
* Awaits for completion of the maybe without blocking a thread.
Expand All @@ -84,7 +83,7 @@ public suspend fun <T> MaybeSource<T>.awaitSingle(): T = awaitSingleOrNull() ?:
level = DeprecationLevel.ERROR,
replaceWith = ReplaceWith("this.awaitSingleOrNull()")
) // Warning since 1.5, error in 1.6, hidden in 1.7
public suspend fun <T> MaybeSource<T>.await(): T? = awaitSingleOrNull()
public suspend fun <T> MaybeSource<T & Any>.await(): T? = awaitSingleOrNull()

/**
* Awaits for completion of the maybe without blocking a thread.
Expand All @@ -107,7 +106,7 @@ public suspend fun <T> MaybeSource<T>.await(): T? = awaitSingleOrNull()
level = DeprecationLevel.ERROR,
replaceWith = ReplaceWith("this.awaitSingleOrNull() ?: default")
) // Warning since 1.5, error in 1.6, hidden in 1.7
public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = awaitSingleOrNull() ?: default
public suspend fun <T> MaybeSource<T & Any>.awaitOrDefault(default: T): T = awaitSingleOrNull() ?: default

// ------------------------ SingleSource ------------------------

Expand All @@ -119,10 +118,10 @@ public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = awaitSingl
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
* function immediately disposes of its subscription and resumes with [CancellationException].
*/
public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine { cont ->
subscribe(object : SingleObserver<T> {
public suspend fun <T> SingleSource<T & Any>.await(): T = suspendCancellableCoroutine { cont ->
subscribe(object : SingleObserver<T & Any> {
override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
override fun onSuccess(t: T) { cont.resume(t) }
override fun onSuccess(t: T & Any) { cont.resume(t) }
override fun onError(error: Throwable) { cont.resumeWithException(error) }
})
}
Expand All @@ -139,7 +138,8 @@ public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine
*
* @throws NoSuchElementException if the observable does not emit any value
*/
public suspend fun <T> ObservableSource<T>.awaitFirst(): T = awaitOne(Mode.FIRST)
@Suppress("UNCHECKED_CAST")
public suspend fun <T> ObservableSource<T & Any>.awaitFirst(): T = awaitOne(Mode.FIRST) as T

/**
* Awaits the first value from the given [Observable], or returns the [default] value if none is emitted, without
Expand All @@ -150,7 +150,9 @@ public suspend fun <T> ObservableSource<T>.awaitFirst(): T = awaitOne(Mode.FIRST
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
* function immediately disposes of its subscription and resumes with [CancellationException].
*/
public suspend fun <T> ObservableSource<T>.awaitFirstOrDefault(default: T): T = awaitOne(Mode.FIRST_OR_DEFAULT, default)
@Suppress("UNCHECKED_CAST")
public suspend fun <T> ObservableSource<T & Any>.awaitFirstOrDefault(default: T): T =
awaitOne(Mode.FIRST_OR_DEFAULT, default) as T

/**
* Awaits the first value from the given [Observable], or returns `null` if none is emitted, without blocking the
Expand All @@ -161,7 +163,7 @@ public suspend fun <T> ObservableSource<T>.awaitFirstOrDefault(default: T): T =
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
* function immediately disposes of its subscription and resumes with [CancellationException].
*/
public suspend fun <T> ObservableSource<T>.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT)
public suspend fun <T> ObservableSource<T & Any>.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT)

/**
* Awaits the first value from the given [Observable], or calls [defaultValue] to get a value if none is emitted,
Expand All @@ -172,7 +174,7 @@ public suspend fun <T> ObservableSource<T>.awaitFirstOrNull(): T? = awaitOne(Mod
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
* function immediately disposes of its subscription and resumes with [CancellationException].
*/
public suspend fun <T> ObservableSource<T>.awaitFirstOrElse(defaultValue: () -> T): T =
public suspend fun <T> ObservableSource<T & Any>.awaitFirstOrElse(defaultValue: () -> T): T =
awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue()

/**
Expand All @@ -185,7 +187,8 @@ public suspend fun <T> ObservableSource<T>.awaitFirstOrElse(defaultValue: () ->
*
* @throws NoSuchElementException if the observable does not emit any value
*/
public suspend fun <T> ObservableSource<T>.awaitLast(): T = awaitOne(Mode.LAST)
@Suppress("UNCHECKED_CAST")
public suspend fun <T> ObservableSource<T & Any>.awaitLast(): T = awaitOne(Mode.LAST) as T

/**
* Awaits the single value from the given observable without blocking the thread and returns the resulting value, or,
Expand All @@ -198,26 +201,27 @@ public suspend fun <T> ObservableSource<T>.awaitLast(): T = awaitOne(Mode.LAST)
* @throws NoSuchElementException if the observable does not emit any value
* @throws IllegalArgumentException if the observable emits more than one value
*/
public suspend fun <T> ObservableSource<T>.awaitSingle(): T = awaitOne(Mode.SINGLE)
@Suppress("UNCHECKED_CAST")
public suspend fun <T> ObservableSource<T & Any>.awaitSingle(): T = awaitOne(Mode.SINGLE) as T

// ------------------------ private ------------------------

internal fun CancellableContinuation<*>.disposeOnCancellation(d: Disposable) =
invokeOnCancellation { d.dispose() }

private enum class Mode(val s: String) {
private enum class Mode(@JvmField val s: String) {
FIRST("awaitFirst"),
FIRST_OR_DEFAULT("awaitFirstOrDefault"),
LAST("awaitLast"),
SINGLE("awaitSingle");
override fun toString(): String = s
}

private suspend fun <T> ObservableSource<T>.awaitOne(
private suspend fun <T> ObservableSource<T & Any>.awaitOne(
mode: Mode,
default: T? = null
): T = suspendCancellableCoroutine { cont ->
subscribe(object : Observer<T> {
): T? = suspendCancellableCoroutine { cont ->
subscribe(object : Observer<T & Any> {
private lateinit var subscription: Disposable
private var value: T? = null
private var seenValue = false
Expand All @@ -227,7 +231,7 @@ private suspend fun <T> ObservableSource<T>.awaitOne(
cont.invokeOnCancellation { sub.dispose() }
}

override fun onNext(t: T) {
override fun onNext(t: T & Any) {
when (mode) {
Mode.FIRST, Mode.FIRST_OR_DEFAULT -> {
if (!seenValue) {
Expand Down
15 changes: 7 additions & 8 deletions reactive/kotlinx-coroutines-rx3/src/RxChannel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import kotlinx.coroutines.flow.*
* [MaybeSource] doesn't have a corresponding [Flow] adapter, so it should be transformed to [Observable] first.
*/
@PublishedApi
internal fun <T> MaybeSource<T>.openSubscription(): ReceiveChannel<T> {
internal fun <T> MaybeSource<T & Any>.openSubscription(): ReceiveChannel<T> {
val channel = SubscriptionChannel<T>()
subscribe(channel)
return channel
Expand All @@ -33,7 +33,7 @@ internal fun <T> MaybeSource<T>.openSubscription(): ReceiveChannel<T> {
* [ObservableSource] doesn't have a corresponding [Flow] adapter, so it should be transformed to [Observable] first.
*/
@PublishedApi
internal fun <T> ObservableSource<T>.openSubscription(): ReceiveChannel<T> {
internal fun <T> ObservableSource<T & Any>.openSubscription(): ReceiveChannel<T> {
val channel = SubscriptionChannel<T>()
subscribe(channel)
return channel
Expand All @@ -45,7 +45,7 @@ internal fun <T> ObservableSource<T>.openSubscription(): ReceiveChannel<T> {
* If [action] throws an exception at some point or if the [MaybeSource] raises an error, the exception is rethrown from
* [collect].
*/
public suspend inline fun <T> MaybeSource<T>.collect(action: (T) -> Unit): Unit =
public suspend inline fun <T> MaybeSource<T & Any>.collect(action: (T) -> Unit): Unit =
openSubscription().consumeEach(action)

/**
Expand All @@ -54,12 +54,11 @@ public suspend inline fun <T> MaybeSource<T>.collect(action: (T) -> Unit): Unit
* If [action] throws an exception at some point, the subscription is cancelled, and the exception is rethrown from
* [collect]. Also, if the [ObservableSource] signals an error, that error is rethrown from [collect].
*/
public suspend inline fun <T> ObservableSource<T>.collect(action: (T) -> Unit): Unit =
openSubscription().consumeEach(action)
public suspend inline fun <T> ObservableSource<T & Any>.collect(action: (T) -> Unit): Unit = openSubscription().consumeEach(action)

@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
private class SubscriptionChannel<T> :
LinkedListChannel<T>(null), Observer<T>, MaybeObserver<T>
LinkedListChannel<T>(null), Observer<T & Any>, MaybeObserver<T & Any>
{
private val _subscription = atomic<Disposable?>(null)

Expand All @@ -73,12 +72,12 @@ private class SubscriptionChannel<T> :
_subscription.value = sub
}

override fun onSuccess(t: T) {
override fun onSuccess(t: T & Any) {
trySend(t)
close(cause = null)
}

override fun onNext(t: T) {
override fun onNext(t: T & Any) {
trySend(t) // Safe to ignore return value here, expectedly racing with cancellation
}

Expand Down
2 changes: 1 addition & 1 deletion reactive/kotlinx-coroutines-rx3/src/RxConvert.kt
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public fun Job.asCompletable(context: CoroutineContext): Completable = rxComplet
*
* @param context -- the coroutine context from which the resulting maybe is going to be signalled
*/
public fun <T> Deferred<T?>.asMaybe(context: CoroutineContext): Maybe<T> = rxMaybe(context) {
public fun <T> Deferred<T?>.asMaybe(context: CoroutineContext): Maybe<T & Any> = rxMaybe(context) {
this@asMaybe.await()
}

Expand Down
6 changes: 3 additions & 3 deletions reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import kotlin.coroutines.*
public fun <T> rxMaybe(
context: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.() -> T?
): Maybe<T> {
): Maybe<T & Any> {
require(context[Job] === null) { "Maybe context cannot contain job in it." +
"Its lifecycle should be managed via Disposable handle. Had $context" }
return rxMaybeInternal(GlobalScope, context, block)
Expand All @@ -30,7 +30,7 @@ private fun <T> rxMaybeInternal(
scope: CoroutineScope, // support for legacy rxMaybe in scope
context: CoroutineContext,
block: suspend CoroutineScope.() -> T?
): Maybe<T> = Maybe.create { subscriber ->
): Maybe<T & Any> = Maybe.create { subscriber ->
val newContext = scope.newCoroutineContext(context)
val coroutine = RxMaybeCoroutine(newContext, subscriber)
subscriber.setCancellable(RxCancellable(coroutine))
Expand All @@ -39,7 +39,7 @@ private fun <T> rxMaybeInternal(

private class RxMaybeCoroutine<T>(
parentContext: CoroutineContext,
private val subscriber: MaybeEmitter<T>
private val subscriber: MaybeEmitter<T & Any>
) : AbstractCoroutine<T>(parentContext, false, true) {
override fun onCompleted(value: T) {
try {
Expand Down

0 comments on commit 61ba10d

Please sign in to comment.