
High CPU usage in SharedFlowImpl #4030

Open
fvasco opened this issue Feb 2, 2024 · 6 comments

fvasco (Contributor) commented Feb 2, 2024

We detected high CPU usage in kotlinx.coroutines.flow.SharedFlowImpl using Java Flight Recorder on a 2-CPU machine.
The CPU consumed there was two orders of magnitude higher than anywhere else, and no other code appears to be responsible for it.

[attached image: JFR CPU profile]

JFR's thread dump:

"DefaultDispatcher-worker-1" #46 [66] daemon prio=5 os_prio=0 cpu=1192711.97ms elapsed=723096.39s tid=0x00007fe79081eb00 nid=66 waiting for monitor entry  [0x00007fe7a9bfe000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at kotlinx.coroutines.flow.SharedFlowImpl.tryTakeValue(SharedFlow.kt:783)
	- waiting to lock <0x00000007f20bf098> (a kotlinx.coroutines.flow.SharedFlowImpl)
	at kotlinx.coroutines.flow.SharedFlowImpl.collect$suspendImpl(SharedFlow.kt:377)
	at kotlinx.coroutines.flow.SharedFlowImpl$collect$1.invokeSuspend(SharedFlow.kt)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:108)
	at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:584)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:793)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:697)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:684)

"DefaultDispatcher-worker-6" #326 [300] daemon prio=5 os_prio=0 cpu=1192775.65ms elapsed=723053.16s tid=0x00007fe79c32e4d0 nid=300 runnable  [0x00007fe7652e3000]
   java.lang.Thread.State: RUNNABLE
	at kotlinx.coroutines.flow.SharedFlowImpl.updateCollectorIndexLocked$kotlinx_coroutines_core(SharedFlow.kt:774)
	at kotlinx.coroutines.flow.SharedFlowImpl.tryTakeValue(SharedFlow.kt:634)
	- locked <0x00000007f20bf098> (a kotlinx.coroutines.flow.SharedFlowImpl)
	at kotlinx.coroutines.flow.SharedFlowImpl.collect$suspendImpl(SharedFlow.kt:377)
	at kotlinx.coroutines.flow.SharedFlowImpl$collect$1.invokeSuspend(SharedFlow.kt)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:108)
	at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:584)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:793)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:697)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:684)

a bit later:

"DefaultDispatcher-worker-1" #46 [66] daemon prio=5 os_prio=0 cpu=1192715.16ms elapsed=723096.42s tid=0x00007fe79081eb00 nid=66 runnable  [0x00007fe7a9bfd000]
   java.lang.Thread.State: RUNNABLE
	at kotlinx.coroutines.scheduling.CoroutineScheduler.signalCpuWork(CoroutineScheduler.kt:439)
	at kotlinx.coroutines.scheduling.CoroutineScheduler.dispatch(CoroutineScheduler.kt:415)
	at kotlinx.coroutines.scheduling.CoroutineScheduler.dispatch$default(CoroutineScheduler.kt:392)
	at kotlinx.coroutines.scheduling.SchedulerCoroutineDispatcher.dispatch(Dispatcher.kt:112)
	at kotlinx.coroutines.DispatchedTaskKt.dispatch(DispatchedTask.kt:161)
	at kotlinx.coroutines.CancellableContinuationImpl.dispatchResume(CancellableContinuationImpl.kt:474)
	at kotlinx.coroutines.CancellableContinuationImpl.completeResume(CancellableContinuationImpl.kt:590)
	at kotlinx.coroutines.selects.SelectKt.tryResume(Select.kt:842)
	at kotlinx.coroutines.selects.SelectKt.access$tryResume(Select.kt:1)
	at kotlinx.coroutines.selects.SelectImplementation.trySelectInternal(Select.kt:623)
	at kotlinx.coroutines.selects.SelectImplementation.trySelect(Select.kt:600)
	at kotlinx.coroutines.channels.BufferedChannel.tryResumeReceiver(BufferedChannel.kt:634)
	at kotlinx.coroutines.channels.BufferedChannel.updateCellSend(BufferedChannel.kt:458)
	at kotlinx.coroutines.channels.BufferedChannel.access$updateCellSend(BufferedChannel.kt:36)
	at kotlinx.coroutines.channels.BufferedChannel.send$suspendImpl(BufferedChannel.kt:3089)
	at kotlinx.coroutines.channels.BufferedChannel.send(BufferedChannel.kt)
	at kotlinx.coroutines.channels.ChannelCoroutine.send(ChannelCoroutine.kt)
	at kotlinx.coroutines.flow.internal.SendingCollector.emit(SendingCollector.kt:19)
	at com.now4real.server.backend.LsMessageUserListItemBeAdapter$consumeChannel$$inlined$filterIsInstance$1$2.emit(Emitters.kt:223)
	at kotlinx.coroutines.flow.SharedFlowImpl.collect$suspendImpl(SharedFlow.kt:382)
	at kotlinx.coroutines.flow.SharedFlowImpl$collect$1.invokeSuspend(SharedFlow.kt)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:108)
	at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:584)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:793)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:697)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:684)

"DefaultDispatcher-worker-6" #326 [300] daemon prio=5 os_prio=0 cpu=1192787.63ms elapsed=723053.19s tid=0x00007fe79c32e4d0 nid=300 runnable  [0x00007fe7652e3000]
   java.lang.Thread.State: RUNNABLE
	at kotlinx.coroutines.flow.SharedFlowImpl.updateCollectorIndexLocked$kotlinx_coroutines_core(SharedFlow.kt:774)
	at kotlinx.coroutines.flow.SharedFlowImpl.tryTakeValue(SharedFlow.kt:634)
	- locked <0x00000007f20bf098> (a kotlinx.coroutines.flow.SharedFlowImpl)
	at kotlinx.coroutines.flow.SharedFlowImpl.collect$suspendImpl(SharedFlow.kt:377)
	at kotlinx.coroutines.flow.SharedFlowImpl$collect$1.invokeSuspend(SharedFlow.kt)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:108)
	at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:584)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:793)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:697)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:684)

Unfortunately, I am not able to provide a reproducer; we have no idea what in our code could cause this issue. Also, this version of our server has been running since Jan 8, 2024 without issues.

Our own code uses less than 1% of the CPU (SharedFlowImpl.emit). Should we check for some API usage that could cause this issue?

fvasco (Contributor, Author) commented Feb 5, 2024

Hi @qwwdfsad,
I sent you the JFR recording privately via email for more details:

$ sha256sum recording.zip 
c2224744906842a45f71dc588481100f74b1bf014fad02bd93d6af2b288a3c69  recording.zip

qwwdfsad (Member) commented Feb 5, 2024

Thanks!

I'm on vacation right now, so expect a bit of radio silence from me; I've got what you sent and will return to it later

fvasco (Contributor, Author) commented Feb 6, 2024

Thank you, @qwwdfsad.
I found some more details in another JFR recording, which I sent you for further reference:

$ sha256sum recording-0e09ce990f4cb86c2-regular.zip
e13570bcc543eb8803279089b330120c8b607211dea2dc6c164aeb3f84e9f338  recording-0e09ce990f4cb86c2-regular.zip

This time, the profiler points to the updateCollectorIndexLocked method.

Happy holidays!

fvasco (Contributor, Author) commented Feb 7, 2024

I tried to create a reproducer.

The code is:

import java.text.NumberFormat
import kotlin.time.Duration.Companion.seconds
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.produceIn

fun main() {
    runBlocking {
        val consumerCount = 1_000
        val messageCount = 1000

        repeat(50) {
            val mutableStateFlow = MutableSharedFlow<Int>()
            val sharedFlow = mutableStateFlow.asSharedFlow()
            val nanos: Long
            coroutineScope {
                // Each consumer subscribes via produceIn and receives messageCount values.
                repeat(consumerCount) {
                    launch(start = CoroutineStart.UNDISPATCHED) {
                        val channel = sharedFlow.produceIn(this)
                        repeat(messageCount) { channel.receive() }
                        cancel()
                    }
                }

                // Let all consumers subscribe before starting the measurement.
                delay(1.seconds)
                nanos = System.nanoTime()
                launch(Dispatchers.Default) {
                    repeat(messageCount) { mutableStateFlow.emit(it) }
                }
            }

            // Average time per message per consumer, in nanoseconds.
            val delta = System.nanoTime() - nanos
            println(NumberFormat.getIntegerInstance().format(delta / consumerCount / messageCount))
        }
    }
}

I ran some benchmarks with different counts. The printed value is the average time per message per consumer in nanoseconds (the dot in values such as 11.109 is the locale's thousands separator):

val consumerCount = 1_000
val messageCount = 100
824
794
819
823

val consumerCount = 10_000
val messageCount = 100
11.109
11.145
11.019

val consumerCount = 1_000
val messageCount = 1_000
773
784
758

val consumerCount = 10_000
val messageCount = 1_000
6.010
10.349
10.427
10.556

val consumerCount = 10_000
val messageCount = 10_000
10.623

I attach the JFR recording:
reproducer.zip

Maybe a large number of subscribers can cause this behavior, so these measurements may be normal.
At the same time, performance varies greatly with the number of subscribers.

fvasco (Contributor, Author) commented Feb 9, 2024

I confirm that our issue was caused by code similar to my reproducer; we have updated our code to reduce the subscriber count.

If the above benchmarks look OK to you, feel free to close this issue.
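For illustration only, a minimal sketch of one way to reduce the subscriber count (not our actual server code; the source and outputs names are placeholders): collect the SharedFlow once and fan the values out to per-consumer channels, so the flow only ever has a single subscriber.

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.take

fun main() {
    runBlocking {
        val consumerCount = 1_000
        val messageCount = 1_000
        val source = MutableSharedFlow<Int>()

        coroutineScope {
            // One buffered channel per consumer; consumers read from their own
            // channel and never subscribe to the SharedFlow directly.
            val outputs = List(consumerCount) { Channel<Int>(Channel.BUFFERED) }
            outputs.forEach { channel ->
                launch { for (value in channel) { /* handle value */ } }
            }

            // The single subscriber: forwards every emitted value to all consumer channels,
            // then closes them so the consumers terminate.
            launch(start = CoroutineStart.UNDISPATCHED) {
                source.take(messageCount).collect { value ->
                    outputs.forEach { it.send(value) }
                }
                outputs.forEach { it.close() }
            }

            repeat(messageCount) { source.emit(it) }
        }
    }
}

Each emitted value still costs O(consumerCount) sends in the fan-out coroutine, but this shape should avoid the per-subscriber bookkeeping inside SharedFlowImpl that grows with the subscriber count.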

qwwdfsad added the performance label and removed the bug label on Feb 15, 2024
qwwdfsad (Member) commented Feb 15, 2024

Thanks for the self-contained reproducer and all the profiles, it made my investigation so much easier 🙇

You hit the weakest spot of the SharedFlow collector algorithm -- unfortunately, a single collect scales linearly with the number of existing collectors, which makes it quadratic for any reasonable use case (each collector scales linearly -> the total CPU burnt is quadratic).
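Roughly, in symbols (an estimate of the asymptotics, not a measured figure): with S subscribers and M emitted values,

$$\text{work per emitted value} \approx S \cdot O(S) = O(S^2), \qquad \text{total work for } M \text{ values} \approx O(M \cdot S^2)$$

This seems consistent with the reproducer's numbers above: reading the dot as a thousands separator, going from 1,000 to 10,000 consumers raises the printed per-consumer, per-message time from roughly 800 ns to roughly 11,000 ns, i.e. close to the tenfold increase in the subscriber count.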

I have a draft idea of how to fix it -- for each unique update (value/index/version) we can fall back to concurrent helping for the linear part (which might still be quadratic if you are unlucky enough and all collectors get OS-scheduled at the same time), but it should be much better and should eliminate the issue for single-threaded usages.
Yet it requires a proper investigation and thoughtful testing. I'll keep the issue open, as it's clearly a performance bottleneck.
