Flow size- and time-based chunked #2378

Open
wants to merge 28 commits into
base: develop
74d3d60
Merge pull request #1 from Kotlin/master
circusmagnus Mar 18, 2020
c0bf01b
Merge pull request #2 from Kotlin/develop
circusmagnus Mar 18, 2020
ccb76c8
Merge pull request #3 from Kotlin/master
circusmagnus Nov 3, 2020
d52fd69
Merge pull request #4 from Kotlin/master
circusmagnus Nov 6, 2020
6fb01b9
Add time- and size-based chunking operators
circusmagnus Nov 10, 2020
43bfcfb
Remove unused operators
circusmagnus Nov 10, 2020
c378678
Add visibility modifiers and clarify tests
circusmagnus Nov 10, 2020
cfbd8ea
Merge pull request #5 from Kotlin/master
circusmagnus Dec 23, 2020
1c98a45
Merge remote-tracking branch 'origin/master' into flow-time-based-chu…
circusmagnus Dec 23, 2020
5237f92
Chunk with interval and size only
circusmagnus Dec 23, 2020
8b8b28e
Chunk with interval and size only - part 2
circusmagnus Jan 8, 2021
c2a4eac
Merge pull request #6 from Kotlin/master
circusmagnus Mar 29, 2021
5c5c088
Add time- and size-based chunking operators
circusmagnus Nov 10, 2020
e04a106
Remove unused operators
circusmagnus Nov 10, 2020
942b163
Add visibility modifiers and clarify tests
circusmagnus Nov 10, 2020
a12429e
Chunk with interval and size only
circusmagnus Dec 23, 2020
da1a57c
Chunk with interval and size only - part 2
circusmagnus Jan 8, 2021
632d540
Prepare Chunking Methods
circusmagnus Mar 29, 2021
c3244ff
Add a bunch of tests
circusmagnus Mar 31, 2021
5b5c3bd
Test Time based chunking
circusmagnus Apr 2, 2021
2b9e5d1
Add docs and last tests
circusmagnus Apr 12, 2021
9cb86f9
Add test for error propagation in Natural Chunking
circusmagnus Apr 14, 2021
d996a9b
Enable for JDK 1.6
circusmagnus Apr 14, 2021
b16e9b0
Merge remote-tracking branch 'origin/flow-time-based-chunked' into fl…
circusmagnus Apr 14, 2021
3aaf7bd
Merge pull request #7 from Kotlin/develop
circusmagnus Apr 14, 2021
e795cc2
Merge remote-tracking branch 'origin/develop' into flow-time-based-ch…
circusmagnus Apr 14, 2021
3fb6939
Adjust for changes in Channel API
circusmagnus Apr 15, 2021
7431426
New Api dump
circusmagnus Apr 15, 2021
24 changes: 12 additions & 12 deletions README.md
@@ -7,7 +7,7 @@
[![Slack channel](https://img.shields.io/badge/chat-slack-green.svg?logo=slack)](https://kotlinlang.slack.com/messages/coroutines/)
Author comment: This file was apparently changed on master but not on develop, by the PR "Improve readability" (#2563).


Library support for Kotlin coroutines with [multiplatform](#multiplatform) support.
-This is a companion version for Kotlin `1.4.30` release.
+This is a companion version for the Kotlin `1.4.30` release.

```kotlin
suspend fun main() = coroutineScope {
@@ -75,7 +75,7 @@ suspend fun main() = coroutineScope {

## Using in your projects

-The libraries are published to [kotlinx](https://bintray.com/kotlin/kotlinx/kotlinx.coroutines) bintray repository,
+The libraries are published to [kotlinx](https://bintray.com/kotlin/kotlinx/kotlinx.coroutines) Bintray repository,
linked to [JCenter](https://bintray.com/bintray/jcenter?filterByPkgName=kotlinx.coroutines) and
pushed to [Maven Central](https://search.maven.org/#search%7Cga%7C1%7Cg%3Aorg.jetbrains.kotlinx%20a%3Akotlinx-coroutines*).

@@ -148,16 +148,16 @@ Make sure that you have `mavenCentral()` in the list of repositories.
### Android

Add [`kotlinx-coroutines-android`](ui/kotlinx-coroutines-android)
-module as dependency when using `kotlinx.coroutines` on Android:
+module as a dependency when using `kotlinx.coroutines` on Android:

```groovy
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.4.3'
```

-This gives you access to Android [Dispatchers.Main]
-coroutine dispatcher and also makes sure that in case of crashed coroutine with unhandled exception this
-exception is logged before crashing Android application, similarly to the way uncaught exceptions in
-threads are handled by Android runtime.
+This gives you access to the Android [Dispatchers.Main]
+coroutine dispatcher and also makes sure that in case of a crashed coroutine with an unhandled exception that
+this exception is logged before crashing the Android application, similarly to the way uncaught exceptions in
+threads are handled by the Android runtime.

#### R8 and ProGuard

@@ -168,7 +168,7 @@ For more details see ["Optimization" section for Android](ui/kotlinx-coroutines-

The `kotlinx-coroutines-core` artifact contains a resource file that is not required for the coroutines to operate
normally and is only used by the debugger. To exclude it at no loss of functionality, add the following snippet to the
-`android` block in your gradle file for the application subproject:
+`android` block in your Gradle file for the application subproject:
```groovy
packagingOptions {
exclude "DebugProbesKt.bin"
@@ -180,7 +180,7 @@ packagingOptions {
Core modules of `kotlinx.coroutines` are also available for
[Kotlin/JS](https://kotlinlang.org/docs/reference/js-overview.html) and [Kotlin/Native](https://kotlinlang.org/docs/reference/native-overview.html).

-In common code that should get compiled for different platforms, you can add dependency to `kotlinx-coroutines-core` right to the `commonMain` source set:
+In common code that should get compiled for different platforms, you can add a dependency to `kotlinx-coroutines-core` right to the `commonMain` source set:
```groovy
commonMain {
dependencies {
@@ -189,7 +189,7 @@ commonMain {
}
```

-No more additional dependencies is needed, platform-specific artifacts will be resolved automatically via Gradle metadata available since Gradle 5.3.
+No more additional dependencies are needed, platform-specific artifacts will be resolved automatically via Gradle metadata available since Gradle 5.3.

Platform-specific dependencies are recommended to be used only for non-multiplatform projects that are compiled only for target platform.

@@ -207,11 +207,11 @@ the target Kotlin/Native platform. [List of currently supported targets](https:/


Only single-threaded code (JS-style) on Kotlin/Native is supported in stable versions.
-Additionally, special `-native-mt` version is released on a regular basis, for the state of multi-threaded coroutines support
+Additionally, a special `-native-mt` version is released on a regular basis, for the state of multi-threaded coroutines support
please follow the [corresponding issue](https://github.com/Kotlin/kotlinx.coroutines/issues/462) for the additional details.

Since Kotlin/Native does not generally provide binary compatibility between versions,
-you should use the same version of Kotlin/Native compiler as was used to build `kotlinx.coroutines`.
+you should use the same version of the Kotlin/Native compiler as was used to build `kotlinx.coroutines`.

## Building and Contributing

15 changes: 15 additions & 0 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
@@ -860,6 +860,20 @@ public abstract class kotlinx/coroutines/flow/AbstractFlow : kotlinx/coroutines/
public abstract fun collectSafely (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public abstract interface class kotlinx/coroutines/flow/ChunkingMethod {
public static final field Companion Lkotlinx/coroutines/flow/ChunkingMethod$Companion;
public abstract fun chunk (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
}

public final class kotlinx/coroutines/flow/ChunkingMethod$Companion {
public final fun BySize (I)Lkotlinx/coroutines/flow/ChunkingMethod;
public final fun ByTime (JI)Lkotlinx/coroutines/flow/ChunkingMethod;
public static synthetic fun ByTime$default (Lkotlinx/coroutines/flow/ChunkingMethod$Companion;JIILjava/lang/Object;)Lkotlinx/coroutines/flow/ChunkingMethod;
public final fun ByTimeOrSize (JI)Lkotlinx/coroutines/flow/ChunkingMethod;
public final fun Natural (I)Lkotlinx/coroutines/flow/ChunkingMethod;
public static synthetic fun Natural$default (Lkotlinx/coroutines/flow/ChunkingMethod$Companion;IILjava/lang/Object;)Lkotlinx/coroutines/flow/ChunkingMethod;
}

public abstract interface class kotlinx/coroutines/flow/Flow {
public abstract fun collect (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}
@@ -894,6 +908,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun cancellable (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun catch (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun channelFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun chunked (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/ChunkingMethod;)Lkotlinx/coroutines/flow/Flow;
public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun collectIndexed (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/flow/StateFlow.kt
@@ -21,7 +21,7 @@ import kotlin.native.concurrent.*
* neither does a coroutine started by the [Flow.launchIn] function. An active collector of a state flow is called a _subscriber_.
*
* A [mutable state flow][MutableStateFlow] is created using `MutableStateFlow(value)` constructor function with
* the initial value. The value of mutable state flow can be updated by setting its [value] property.
* the initial value. The value of mutable state flow can be updated by setting its [value] property.
* Updates to the [value] are always [conflated][Flow.conflate]. So a slow collector skips fast updates,
* but always collects the most recently emitted value.
*
211 changes: 211 additions & 0 deletions kotlinx-coroutines-core/common/src/flow/operators/Chunk.kt
@@ -0,0 +1,211 @@
/*
 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

@file:JvmMultifileClass
@file:JvmName("FlowKt")

package kotlinx.coroutines.flow

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.internal.*
import kotlinx.coroutines.selects.*
import kotlin.jvm.*

/**
 * Groups emissions from this Flow into lists, according to the chosen ChunkingMethod. Time-based implementations
 * collect upstream and emit to downstream in separate coroutines, concurrently, like the Flow.buffer() operator.
 * The exact timing of emissions is not guaranteed, as it depends on collector coroutine availability.
 *
 * Size-based chunking happens in a single coroutine and is purely sequential.
 *
 * Emissions always preserve order.
 *
 * It is possible to pass a custom implementation of ChunkingMethod to the chunked() operator.
 *
 * @param method Defines constraints on chunk size and the time of its emission.
 */
@ExperimentalCoroutinesApi
public fun <T> Flow<T>.chunked(method: ChunkingMethod): Flow<List<T>> = with(method) { chunk() }

@ExperimentalCoroutinesApi
public interface ChunkingMethod {
    public fun <T> Flow<T>.chunk(): Flow<List<T>>

    public companion object {

        /**
         * Collects upstream and emits to downstream in separate coroutines, as soon as possible. If the consumer
         * keeps up with the producer, it emits lists with a single element.
         *
         * In case of a slow consumer, it groups emissions into bigger lists. When the consumer "speeds up", chunks
         * will get smaller.
         *
         * @param maxSize Maximum size of a single chunk. If reached, the producer gets suspended until the consumer
         * "consumes" a chunk. If maxSize is not specified, then a chunk may grow indefinitely until the JVM runs
         * out of memory.
         */
        @Suppress("FunctionName")
        public fun Natural(maxSize: Int = Int.MAX_VALUE): ChunkingMethod = NaturalChunking(maxSize)

        /**
         * Collects upstream into a buffer and emits its content as a list at every interval. When upstream completes
         * (or is empty), it will try to emit what is left of a chunk immediately, without waiting for the interval.
         *
         * @param intervalMs Interval between emissions in milliseconds. Every emission happens only after
         * the interval passes, unless the upstream Flow completes sooner.
         *
         * @param maxSize Maximum size of a single chunk. If reached, the producer gets suspended until the consumer
         * "consumes" a chunk. If maxSize is not specified, then a chunk may grow indefinitely until the JVM runs
         * out of memory.
         */
        @Suppress("FunctionName")
        public fun ByTime(intervalMs: Long, maxSize: Int = Int.MAX_VALUE): ChunkingMethod =
            TimeBased(intervalMs, maxSize)

        /**
         * Collects upstream into a buffer and emits its content as a list at every interval or when its buffer
         * reaches the maximum size. When upstream completes (or is empty), it will try to emit what is left of
         * a chunk immediately, ignoring the interval and maxSize constraints.
         *
         * @param intervalMs Interval between emissions in milliseconds. Every emission happens only after
         * the interval passes, unless the upstream Flow completes sooner or the maximum size of a chunk is reached.
         *
         * @param maxSize Maximum size of a single chunk. If reached, it will try to emit a chunk, ignoring the
         * interval constraint. If that happens, the time to the next chunk is reset to the interval value.
         */
        @Suppress("FunctionName")
        public fun ByTimeOrSize(intervalMs: Long, maxSize: Int): ChunkingMethod = TimeOrSizeBased(intervalMs, maxSize)

        /**
         * Collects upstream into a buffer and emits its content as a list when the specified size is reached.
         * This implementation is purely sequential. If concurrent upstream collection and downstream emissions are
         * desired, one can use a buffer() operator after chunking.
         *
         * @param size Exact size of emitted chunks. Only the last emission may be smaller.
         */
        @Suppress("FunctionName")
        public fun BySize(size: Int): ChunkingMethod = SizeBased(size)
    }
}
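
The KDoc above says a custom `ChunkingMethod` can be passed to `chunked()`. A minimal sketch of what that could look like, assuming local stand-ins for the interface and the operator (they mirror the declarations in this diff but are not a published API); `Singletons` is a hypothetical method invented for illustration:

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Local stand-in mirroring the interface proposed in this PR (assumption: not a released API).
interface ChunkingMethod {
    fun <T> Flow<T>.chunk(): Flow<List<T>>
}

// Local stand-in for the proposed operator: brings the method into scope and calls its member extension.
fun <T> Flow<T>.chunked(method: ChunkingMethod): Flow<List<T>> = with(method) { chunk() }

// Hypothetical custom method: every element becomes its own single-element chunk.
object Singletons : ChunkingMethod {
    override fun <T> Flow<T>.chunk(): Flow<List<T>> = map { listOf(it) }
}

fun main() = runBlocking {
    val chunks = flowOf("a", "b", "c").chunked(Singletons).toList()
    println(chunks) // [[a], [b], [c]]
}
```

Because `chunk()` is a member extension, it is only callable with a `ChunkingMethod` in scope, which is why the operator wraps the call in `with(method)`.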

private class NaturalChunking(private val maxSize: Int) : ChunkingMethod {

    init {
        requirePositive(maxSize)
    }

    override fun <T> Flow<T>.chunk(): Flow<List<T>> = scopedFlow { downstream ->
        val upstream = buffer(maxSize).produceIn(this)

        while (!upstream.isClosedForReceive) {
            val chunk = upstream.awaitFirstAndDrain(maxSize)
            if (chunk.isNotEmpty()) downstream.emit(chunk)
        }
    }
}
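
`scopedFlow` is internal to the library, so `NaturalChunking` cannot be copied into user code as written. A rough sketch of the same idea using only public API, under these assumptions: `naturallyChunked` is a hypothetical name, `flow { coroutineScope { ... } }` stands in for `scopedFlow`, and `receiveCatching`/`tryReceive` replace the PR's private helpers:

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.produceIn
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical standalone variant of natural chunking; the name is illustrative.
fun <T> Flow<T>.naturallyChunked(maxSize: Int): Flow<List<T>> {
    require(maxSize > 0) { "maxSize must be positive" }
    return flow {
        coroutineScope {
            // Collect upstream concurrently; the buffer bounds how far the producer can run ahead.
            val upstream = buffer(maxSize).produceIn(this)
            while (true) {
                // Suspend until at least one element is available or the channel is closed.
                val first = upstream.receiveCatching()
                if (first.isClosed) {
                    first.exceptionOrNull()?.let { throw it } // rethrow an upstream failure
                    break
                }
                val chunk = mutableListOf(first.getOrThrow())
                // Take whatever else is already buffered, without suspending.
                while (chunk.size < maxSize) {
                    val next = upstream.tryReceive()
                    if (!next.isSuccess) break
                    chunk.add(next.getOrThrow())
                }
                emit(chunk)
            }
        }
    }
}

fun main() = runBlocking {
    val chunks = (1..10).asFlow()
        .naturallyChunked(maxSize = 4)
        .onEach { delay(10) } // simulate a slow consumer
        .toList()
    println(chunks) // chunk boundaries depend on timing
    check(chunks.flatten() == (1..10).toList()) // order and content are preserved
    check(chunks.all { it.size in 1..4 })       // no chunk exceeds maxSize
}
```

Only the invariants are checked here (order, content, upper bound on chunk size), since the exact chunk boundaries depend on scheduling.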

private class TimeBased(private val intervalMs: Long, private val maxSize: Int) : ChunkingMethod {

    init {
        requirePositive(intervalMs)
        requirePositive(maxSize)
    }

    override fun <T> Flow<T>.chunk(): Flow<List<T>> = scopedFlow { downstream ->
        val upstreamCollection = Job()
        val upstream = produce<T>(capacity = maxSize) {
            collect { element -> channel.send(element) }
            upstreamCollection.complete()
        }

        whileSelect {
            upstreamCollection.onJoin {
                val chunk = upstream.drain(maxElements = maxSize)
                if (chunk.isNotEmpty()) downstream.emit(chunk)
                false
            }

            onTimeout(intervalMs) {
                val chunk = upstream.drain(maxElements = maxSize)
                if (chunk.isNotEmpty()) downstream.emit(chunk)
                true
            }
        }
    }
}

private class SizeBased(private val size: Int) : ChunkingMethod {

    init {
        requirePositive(size)
    }

    override fun <T> Flow<T>.chunk(): Flow<List<T>> = flow {
        val accumulator = ArrayList<T>(size)
        collect { element ->
            accumulator.add(element)
            if (accumulator.size == size) emit(accumulator.drain())
        }
        if (accumulator.isNotEmpty()) emit(accumulator)
    }
}
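
The sequential behaviour of `SizeBased` can be tried outside the library. A minimal standalone sketch, assuming a recent kotlinx.coroutines where `collect` accepts a lambda without an extra import; `chunkedBySize` is a hypothetical name, not the operator proposed in this PR:

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical standalone variant of size-based chunking; the name is illustrative.
fun <T> Flow<T>.chunkedBySize(size: Int): Flow<List<T>> {
    require(size > 0) { "size must be positive" }
    return flow {
        var accumulator = ArrayList<T>()
        collect { element ->
            accumulator.add(element)
            if (accumulator.size == size) {
                emit(accumulator)         // hand the full chunk to downstream
                accumulator = ArrayList() // start a fresh list so emitted chunks are never mutated
            }
        }
        // Only the last chunk may be smaller than `size`.
        if (accumulator.isNotEmpty()) emit(accumulator)
    }
}

fun main() = runBlocking {
    val chunks = (1..7).asFlow().chunkedBySize(3).toList()
    println(chunks) // [[1, 2, 3], [4, 5, 6], [7]]
}
```

The PR copies the accumulator with `toList()` and then clears it; the sketch allocates a new list per chunk instead. Either way, downstream never sees a list that is still being filled.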

private class TimeOrSizeBased(private val intervalMs: Long, private val maxSize: Int) : ChunkingMethod {

    init {
        requirePositive(intervalMs)
        requirePositive(maxSize)
    }

    override fun <T> Flow<T>.chunk(): Flow<List<T>> = scopedFlow { downstream ->
        val emitNowAndMaybeContinue = Channel<Boolean>(capacity = Channel.RENDEZVOUS)
        val elements = produce<T>(capacity = maxSize) {
            collect { element ->
                val hasCapacity = channel.trySend(element).isSuccess
                if (!hasCapacity) {

Review comment: What do you think about naming this `size` instead of `maxSize`, as in `SizeBased`? When the channel reaches that size, the chunk should be emitted.

                    emitNowAndMaybeContinue.send(true)
                    channel.send(element)
                }
            }
            emitNowAndMaybeContinue.send(false)
        }

        whileSelect {
            emitNowAndMaybeContinue.onReceive { shouldContinue ->
                val chunk = elements.drain(maxElements = maxSize)
                if (chunk.isNotEmpty()) downstream.emit(chunk)
                shouldContinue
            }

            onTimeout(intervalMs) {
                val chunk = elements.drain(maxElements = maxSize)
                if (chunk.isNotEmpty()) downstream.emit(chunk)
                true
            }
        }
    }

}

private suspend fun <T> ReceiveChannel<T>.awaitFirstAndDrain(maxElements: Int): List<T> = try {
    val first = receive()
    drain(mutableListOf(first), maxElements)
} catch (e: ClosedReceiveChannelException) {
    emptyList()
}

private tailrec fun <T> ReceiveChannel<T>.drain(acc: MutableList<T> = mutableListOf(), maxElements: Int): List<T> =
    if (acc.size == maxElements) acc
    else {
        val nextValue = tryReceive().getOrElse { error: Throwable? -> error?.let { throw it } ?: return acc }
        acc.add(nextValue)
        drain(acc, maxElements)
    }

private fun <T> MutableList<T>.drain() = toList().also { this.clear() }

private fun requirePositive(size: Int) = require(size > 0)

private fun requirePositive(intervalMs: Long) = require(intervalMs > 0)