Skip to content

Commit

Permalink
Promote @FlowPreview API to stable/experimental (#3548)
Browse files Browse the repository at this point in the history
* Promote @FlowPreview-marked API to stable
* Promote @FlowPreview to @experimental where appropriate
* Delay-related operators are not promoted as they are clearly underdesigned
* Promote AbstractFlow
* Promote flat* related operators to gather more feedback and to work on them incrementally later in 1.7+

Fixes #3542
Fixes #3097
  • Loading branch information
qwwdfsad committed Jan 13, 2023
1 parent 67e21b2 commit c3b7b20
Show file tree
Hide file tree
Showing 5 changed files with 9 additions and 10 deletions.
2 changes: 0 additions & 2 deletions kotlinx-coroutines-core/common/src/flow/Builders.kt
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit
/**
* Creates a _cold_ flow that produces a single value from the given functional type.
*/
@FlowPreview
public fun <T> (() -> T).asFlow(): Flow<T> = flow {
emit(invoke())
}
Expand All @@ -80,7 +79,6 @@ public fun <T> (() -> T).asFlow(): Flow<T> = flow {
* fun remoteCallFlow(): Flow<R> = ::remoteCall.asFlow()
* ```
*/
@FlowPreview
public fun <T> (suspend () -> T).asFlow(): Flow<T> = flow {
emit(invoke())
}
Expand Down
1 change: 0 additions & 1 deletion kotlinx-coroutines-core/common/src/flow/Channels.kt
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,6 @@ public fun <T> BroadcastChannel<T>.asFlow(): Flow<T> = flow {
* default and to control what happens when data is produced faster than it is consumed,
* that is to control backpressure behavior.
*/
@FlowPreview
public fun <T> Flow<T>.produceIn(
scope: CoroutineScope
): ReceiveChannel<T> =
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/flow/Flow.kt
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ public interface Flow<out T> {
* }
* ```
*/
@FlowPreview
@ExperimentalCoroutinesApi
public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {

public final override suspend fun collect(collector: FlowCollector<T>) {
Expand Down
13 changes: 8 additions & 5 deletions kotlinx-coroutines-core/common/src/flow/operators/Merge.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,19 @@ import kotlinx.coroutines.flow.internal.unsafeFlow as flow

/**
* Name of the property that defines the value of [DEFAULT_CONCURRENCY].
* This is a preview API and can be changed in a backwards-incompatible manner within a single release.
*/
@FlowPreview
public const val DEFAULT_CONCURRENCY_PROPERTY_NAME: String = "kotlinx.coroutines.flow.defaultConcurrency"

/**
* Default concurrency limit that is used by [flattenMerge] and [flatMapMerge] operators.
* It is 16 by default and can be changed on JVM using [DEFAULT_CONCURRENCY_PROPERTY_NAME] property.
* This is a preview API and can be changed in a backwards-incompatible manner within a single release.
*/
@FlowPreview
public val DEFAULT_CONCURRENCY: Int = systemProp(DEFAULT_CONCURRENCY_PROPERTY_NAME,
public val DEFAULT_CONCURRENCY: Int = systemProp(
DEFAULT_CONCURRENCY_PROPERTY_NAME,
16, 1, Int.MAX_VALUE
)

Expand All @@ -39,7 +42,7 @@ public val DEFAULT_CONCURRENCY: Int = systemProp(DEFAULT_CONCURRENCY_PROPERTY_NA
* Note that even though this operator looks very familiar, we discourage its usage in a regular application-specific flows.
* Most likely, suspending operation in [map] operator will be sufficient and linear transformations are much easier to reason about.
*/
@FlowPreview
@ExperimentalCoroutinesApi
public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R> =
map(transform).flattenConcat()

Expand All @@ -63,7 +66,7 @@ public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>
* @param concurrency controls the number of in-flight flows, at most [concurrency] flows are collected
* at the same time. By default, it is equal to [DEFAULT_CONCURRENCY].
*/
@FlowPreview
@ExperimentalCoroutinesApi
public fun <T, R> Flow<T>.flatMapMerge(
concurrency: Int = DEFAULT_CONCURRENCY,
transform: suspend (value: T) -> Flow<R>
Expand All @@ -75,7 +78,7 @@ public fun <T, R> Flow<T>.flatMapMerge(
*
* Inner flows are collected by this operator *sequentially*.
*/
@FlowPreview
@ExperimentalCoroutinesApi
public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T> = flow {
collect { value -> emitAll(value) }
}
Expand Down Expand Up @@ -132,7 +135,7 @@ public fun <T> merge(vararg flows: Flow<T>): Flow<T> = flows.asIterable().merge(
* @param concurrency controls the number of in-flight flows, at most [concurrency] flows are collected
* at the same time. By default, it is equal to [DEFAULT_CONCURRENCY].
*/
@FlowPreview
@ExperimentalCoroutinesApi
public fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = DEFAULT_CONCURRENCY): Flow<T> {
require(concurrency > 0) { "Expected positive concurrency level, but had $concurrency" }
return if (concurrency == 1) flattenConcat() else ChannelFlowMerge(this, concurrency)
Expand Down
1 change: 0 additions & 1 deletion reactive/kotlinx-coroutines-reactive/src/Migration.kt
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ public fun <T : Any> Publisher<T>.asFlowDeprecated(): Flow<T> = asFlow()
public fun <T : Any> Flow<T>.asPublisherDeprecated(): Publisher<T> = asPublisher()

/** @suppress */
@FlowPreview
@Deprecated(
message = "batchSize parameter is deprecated, use .buffer() instead to control the backpressure",
level = DeprecationLevel.HIDDEN,
Expand Down

0 comments on commit c3b7b20

Please sign in to comment.