Skip to content

Publisher.asFlow() sometimes fails to propagate thread context elements #4422

Open
@dkhalanskyjb

Description

@dkhalanskyjb

This issue was discovered analytically, by reading the source code. No one reported it yet.

val threadLocal = ThreadLocal<String>.withInitial { "default" }

@Test
fun testFlowOnThreadContext() = runBlocking {
    val publisher = Publisher<Int> { it ->
        it.onSubscribe(object : Subscription {
            override fun request(n: Long) {
                assertEquals("value", threadLocal.get())
                it.onNext(1)
                it.onComplete()
            }

            override fun cancel() {}
        })
    }

    val context1 = Dispatchers.IO + threadLocal.asContextElement("value")
    val context2 = threadLocal.asContextElement("value")

    // succeeds
    publisher.asFlow().flowOn(context1).collect { }
    // fails with org.junit.ComparisonFailure: expected:<[value]> but was:<[default]>
    publisher.asFlow().flowOn(context2).collect { }
}

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions