Skip to content

High CPU usate in SharedFlowImpl #4030

@fvasco

Description

@fvasco

We detected high CPU usage on kotlinx.coroutines.flow.SharedFlowImpl using Java Flight Recorder on a 2 CPU machine.
The consumed CPU was two order of magnitude than others, neither other code looks causing this CPU usage.

image

JFR's thread dump:

"DefaultDispatcher-worker-1" #46 [66] daemon prio=5 os_prio=0 cpu=1192711.97ms elapsed=723096.39s tid=0x00007fe79081eb00 nid=66 waiting for monitor entry  [0x00007fe7a9bfe000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at kotlinx.coroutines.flow.SharedFlowImpl.tryTakeValue(SharedFlow.kt:783)
	- waiting to lock <0x00000007f20bf098> (a kotlinx.coroutines.flow.SharedFlowImpl)
	at kotlinx.coroutines.flow.SharedFlowImpl.collect$suspendImpl(SharedFlow.kt:377)
	at kotlinx.coroutines.flow.SharedFlowImpl$collect$1.invokeSuspend(SharedFlow.kt)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:108)
	at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:584)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:793)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:697)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:684)

"DefaultDispatcher-worker-6" #326 [300] daemon prio=5 os_prio=0 cpu=1192775.65ms elapsed=723053.16s tid=0x00007fe79c32e4d0 nid=300 runnable  [0x00007fe7652e3000]
   java.lang.Thread.State: RUNNABLE
	at kotlinx.coroutines.flow.SharedFlowImpl.updateCollectorIndexLocked$kotlinx_coroutines_core(SharedFlow.kt:774)
	at kotlinx.coroutines.flow.SharedFlowImpl.tryTakeValue(SharedFlow.kt:634)
	- locked <0x00000007f20bf098> (a kotlinx.coroutines.flow.SharedFlowImpl)
	at kotlinx.coroutines.flow.SharedFlowImpl.collect$suspendImpl(SharedFlow.kt:377)
	at kotlinx.coroutines.flow.SharedFlowImpl$collect$1.invokeSuspend(SharedFlow.kt)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:108)
	at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:584)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:793)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:697)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:684)

a bit later:

"DefaultDispatcher-worker-1" #46 [66] daemon prio=5 os_prio=0 cpu=1192715.16ms elapsed=723096.42s tid=0x00007fe79081eb00 nid=66 runnable  [0x00007fe7a9bfd000]
   java.lang.Thread.State: RUNNABLE
	at kotlinx.coroutines.scheduling.CoroutineScheduler.signalCpuWork(CoroutineScheduler.kt:439)
	at kotlinx.coroutines.scheduling.CoroutineScheduler.dispatch(CoroutineScheduler.kt:415)
	at kotlinx.coroutines.scheduling.CoroutineScheduler.dispatch$default(CoroutineScheduler.kt:392)
	at kotlinx.coroutines.scheduling.SchedulerCoroutineDispatcher.dispatch(Dispatcher.kt:112)
	at kotlinx.coroutines.DispatchedTaskKt.dispatch(DispatchedTask.kt:161)
	at kotlinx.coroutines.CancellableContinuationImpl.dispatchResume(CancellableContinuationImpl.kt:474)
	at kotlinx.coroutines.CancellableContinuationImpl.completeResume(CancellableContinuationImpl.kt:590)
	at kotlinx.coroutines.selects.SelectKt.tryResume(Select.kt:842)
	at kotlinx.coroutines.selects.SelectKt.access$tryResume(Select.kt:1)
	at kotlinx.coroutines.selects.SelectImplementation.trySelectInternal(Select.kt:623)
	at kotlinx.coroutines.selects.SelectImplementation.trySelect(Select.kt:600)
	at kotlinx.coroutines.channels.BufferedChannel.tryResumeReceiver(BufferedChannel.kt:634)
	at kotlinx.coroutines.channels.BufferedChannel.updateCellSend(BufferedChannel.kt:458)
	at kotlinx.coroutines.channels.BufferedChannel.access$updateCellSend(BufferedChannel.kt:36)
	at kotlinx.coroutines.channels.BufferedChannel.send$suspendImpl(BufferedChannel.kt:3089)
	at kotlinx.coroutines.channels.BufferedChannel.send(BufferedChannel.kt)
	at kotlinx.coroutines.channels.ChannelCoroutine.send(ChannelCoroutine.kt)
	at kotlinx.coroutines.flow.internal.SendingCollector.emit(SendingCollector.kt:19)
	at com.now4real.server.backend.LsMessageUserListItemBeAdapter$consumeChannel$$inlined$filterIsInstance$1$2.emit(Emitters.kt:223)
	at kotlinx.coroutines.flow.SharedFlowImpl.collect$suspendImpl(SharedFlow.kt:382)
	at kotlinx.coroutines.flow.SharedFlowImpl$collect$1.invokeSuspend(SharedFlow.kt)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:108)
	at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:584)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:793)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:697)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:684)

"DefaultDispatcher-worker-6" #326 [300] daemon prio=5 os_prio=0 cpu=1192787.63ms elapsed=723053.19s tid=0x00007fe79c32e4d0 nid=300 runnable  [0x00007fe7652e3000]
   java.lang.Thread.State: RUNNABLE
	at kotlinx.coroutines.flow.SharedFlowImpl.updateCollectorIndexLocked$kotlinx_coroutines_core(SharedFlow.kt:774)
	at kotlinx.coroutines.flow.SharedFlowImpl.tryTakeValue(SharedFlow.kt:634)
	- locked <0x00000007f20bf098> (a kotlinx.coroutines.flow.SharedFlowImpl)
	at kotlinx.coroutines.flow.SharedFlowImpl.collect$suspendImpl(SharedFlow.kt:377)
	at kotlinx.coroutines.flow.SharedFlowImpl$collect$1.invokeSuspend(SharedFlow.kt)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:108)
	at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:584)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:793)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:697)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:684)

Unfornutately I am not able to provide a reproducer, we don't have idea how can cause this issue in our code. Secondary, this version of our server is running from Jan 8, 2024 without issue.

Our code use lesser 1% of CPU (SharedFlowImpl.emit), should we check some API usage that can cause this issue?

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions