Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Promote @FlowPreview API to stable/experimental #3548

Merged
merged 2 commits into from
Jan 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 0 additions & 2 deletions kotlinx-coroutines-core/common/src/flow/Builders.kt
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit
/**
* Creates a _cold_ flow that produces a single value from the given functional type.
*/
@FlowPreview
public fun <T> (() -> T).asFlow(): Flow<T> = flow {
emit(invoke())
}
Expand All @@ -80,7 +79,6 @@ public fun <T> (() -> T).asFlow(): Flow<T> = flow {
* fun remoteCallFlow(): Flow<R> = ::remoteCall.asFlow()
* ```
*/
@FlowPreview
public fun <T> (suspend () -> T).asFlow(): Flow<T> = flow {
emit(invoke())
}
Expand Down
1 change: 0 additions & 1 deletion kotlinx-coroutines-core/common/src/flow/Channels.kt
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,6 @@ public fun <T> BroadcastChannel<T>.asFlow(): Flow<T> = flow {
* default and to control what happens when data is produced faster than it is consumed,
* that is to control backpressure behavior.
*/
@FlowPreview
public fun <T> Flow<T>.produceIn(
scope: CoroutineScope
): ReceiveChannel<T> =
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/flow/Flow.kt
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ public interface Flow<out T> {
* }
* ```
*/
@FlowPreview
@ExperimentalCoroutinesApi
public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {

public final override suspend fun collect(collector: FlowCollector<T>) {
Expand Down
13 changes: 8 additions & 5 deletions kotlinx-coroutines-core/common/src/flow/operators/Merge.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,19 @@ import kotlinx.coroutines.flow.internal.unsafeFlow as flow

/**
* Name of the property that defines the value of [DEFAULT_CONCURRENCY].
* This is a preview API and can be changed in a backwards-incompatible manner within a single release.
*/
@FlowPreview
public const val DEFAULT_CONCURRENCY_PROPERTY_NAME: String = "kotlinx.coroutines.flow.defaultConcurrency"

/**
* Default concurrency limit that is used by [flattenMerge] and [flatMapMerge] operators.
* It is 16 by default and can be changed on JVM using [DEFAULT_CONCURRENCY_PROPERTY_NAME] property.
* This is a preview API and can be changed in a backwards-incompatible manner within a single release.
*/
@FlowPreview
public val DEFAULT_CONCURRENCY: Int = systemProp(DEFAULT_CONCURRENCY_PROPERTY_NAME,
public val DEFAULT_CONCURRENCY: Int = systemProp(
DEFAULT_CONCURRENCY_PROPERTY_NAME,
16, 1, Int.MAX_VALUE
)

Expand All @@ -39,7 +42,7 @@ public val DEFAULT_CONCURRENCY: Int = systemProp(DEFAULT_CONCURRENCY_PROPERTY_NA
* Note that even though this operator looks very familiar, we discourage its usage in a regular application-specific flows.
* Most likely, suspending operation in [map] operator will be sufficient and linear transformations are much easier to reason about.
*/
@FlowPreview
@ExperimentalCoroutinesApi
public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R> =
map(transform).flattenConcat()

Expand All @@ -63,7 +66,7 @@ public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>
* @param concurrency controls the number of in-flight flows, at most [concurrency] flows are collected
* at the same time. By default, it is equal to [DEFAULT_CONCURRENCY].
*/
@FlowPreview
@ExperimentalCoroutinesApi
public fun <T, R> Flow<T>.flatMapMerge(
concurrency: Int = DEFAULT_CONCURRENCY,
transform: suspend (value: T) -> Flow<R>
Expand All @@ -75,7 +78,7 @@ public fun <T, R> Flow<T>.flatMapMerge(
*
* Inner flows are collected by this operator *sequentially*.
*/
@FlowPreview
@ExperimentalCoroutinesApi
public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T> = flow {
collect { value -> emitAll(value) }
}
Expand Down Expand Up @@ -132,7 +135,7 @@ public fun <T> merge(vararg flows: Flow<T>): Flow<T> = flows.asIterable().merge(
* @param concurrency controls the number of in-flight flows, at most [concurrency] flows are collected
* at the same time. By default, it is equal to [DEFAULT_CONCURRENCY].
*/
@FlowPreview
@ExperimentalCoroutinesApi
public fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = DEFAULT_CONCURRENCY): Flow<T> {
require(concurrency > 0) { "Expected positive concurrency level, but had $concurrency" }
return if (concurrency == 1) flattenConcat() else ChannelFlowMerge(this, concurrency)
Expand Down
1 change: 0 additions & 1 deletion reactive/kotlinx-coroutines-reactive/src/Migration.kt
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ public fun <T : Any> Publisher<T>.asFlowDeprecated(): Flow<T> = asFlow()
public fun <T : Any> Flow<T>.asPublisherDeprecated(): Publisher<T> = asPublisher()

/** @suppress */
@FlowPreview
@Deprecated(
message = "batchSize parameter is deprecated, use .buffer() instead to control the backpressure",
level = DeprecationLevel.HIDDEN,
Expand Down