Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Thread parking for Kotlin/Common #498

Open
wants to merge 47 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
6cd373c
Thread parking for kotlin/common
bbrockbernd Dec 18, 2024
ab7fcba
Comments
bbrockbernd Dec 19, 2024
7574618
Bugfix for jvm: use ThreadLocal.withInitial, unpark passed kThread no…
bbrockbernd Jan 17, 2025
d7318f2
Added Barriertests, and ported native parking tests to jvm sourceset.…
bbrockbernd Jan 17, 2025
352678a
Bugfix parkNanos on JVM
bbrockbernd Jan 17, 2025
c69cf64
Make barriertest not time dependent
bbrockbernd Jan 20, 2025
3bb31ad
Change modifier order back to test failing build on K1
bbrockbernd Jan 20, 2025
fc003a1
Explicitly define return types
bbrockbernd Jan 20, 2025
03f2acc
K1 compatibility
bbrockbernd Jan 20, 2025
9011bd6
Solve flaky test by giving timed parking tests more room for deviation.
bbrockbernd Jan 20, 2025
2bd1f73
Added CountDownLatch tests
bbrockbernd Jan 20, 2025
d28045d
Added CyclicBarrier synchronization primitive to further stresstest p…
bbrockbernd Jan 24, 2025
70994b6
Retry flaky timing tests
bbrockbernd Jan 24, 2025
cc2c8c9
Catch and rethrow assertion errros in multithreaded tests on jvm
bbrockbernd Jan 28, 2025
87cf4a2
Prevent potential deadlock in tests by waiting non-blocking and rethr…
bbrockbernd Jan 28, 2025
3c25e16
Bugfixes in cyclicBarrierTest
bbrockbernd Jan 28, 2025
8500459
Remove excessive printing in tests
bbrockbernd Jan 29, 2025
35101b7
Fix 32bit timedWait
bbrockbernd Jan 29, 2025
b6ad38c
Check return status of native calls
bbrockbernd Jan 30, 2025
959d5df
PR styling + memScoped
bbrockbernd Jan 31, 2025
5094853
Refactor: remove interface, make jvmparking delegator stateless. Type…
bbrockbernd Jan 31, 2025
22d9ca4
Introduce concurrentMain sourceSet to exclude js and wasm from parking.
bbrockbernd Jan 31, 2025
9adaf40
Moved tests to concurrentTest sourceSet
bbrockbernd Feb 3, 2025
cf4c2a2
Move spurious wakeup protection to parker logic
bbrockbernd Feb 4, 2025
626abd7
bugfix
bbrockbernd Feb 4, 2025
8dbe18e
Double check state before waiting
bbrockbernd Feb 5, 2025
67f3632
Comments and cleanup
bbrockbernd Feb 5, 2025
b44ca05
Fix problem with concurrent unpark calls.
bbrockbernd Feb 6, 2025
98a5033
Expanded test cases with exchanger primitive
bbrockbernd Feb 6, 2025
c187229
PR Review comments: internal what needs to be internal, refactor sour…
bbrockbernd Feb 10, 2025
d6b3bdc
apiDump..
bbrockbernd Feb 10, 2025
ab70bc9
Retrigger teamcity
bbrockbernd Feb 11, 2025
c317413
Add unsafeNumber annotations for androidNative.
bbrockbernd Feb 11, 2025
932bffa
Possible fix for failing linux build after commonizing 32 and 64 bit …
bbrockbernd Feb 11, 2025
f9cca39
Possible fix for failing linux build after commonizing 32 and 64 bit …
bbrockbernd Feb 11, 2025
9e9aa07
Add unsafe number classwide to linux sourceSet
bbrockbernd Feb 11, 2025
79761be
Remove concurrent unpark protection
bbrockbernd Feb 12, 2025
eec07bc
Test CI fix
bbrockbernd Feb 13, 2025
7ce62bd
Merge linux and apple sourceSets. androidNative and windows don't com…
bbrockbernd Feb 13, 2025
29add7b
Fix overflow issues. Max waiting time is capped at int.max (for apple…
bbrockbernd Feb 13, 2025
8a2335c
Gradle build file formatting
bbrockbernd Feb 13, 2025
f1a2317
Additional timed tests for overflowing deadlines
bbrockbernd Feb 19, 2025
9eaa280
Improved time arithmetic by using overloading for ints and longs.
bbrockbernd Feb 21, 2025
60e56a6
PR review and styling suggestions
bbrockbernd Feb 24, 2025
7935e64
Report errno on native call error.
bbrockbernd Feb 24, 2025
35793c5
Move spurious wakeup protection to tests.
bbrockbernd Mar 24, 2025
a9927b9
Cleanup
bbrockbernd Mar 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions atomicfu/api/atomicfu.api
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,25 @@ public final class kotlinx/atomicfu/TraceKt {
public static final fun named (Lkotlinx/atomicfu/TraceBase;Ljava/lang/String;)Lkotlinx/atomicfu/TraceBase;
}

public final class kotlinx/atomicfu/parking/KThread {
public static final field Companion Lkotlinx/atomicfu/parking/KThread$Companion;
}

public final class kotlinx/atomicfu/parking/KThread$Companion {
public final fun currentThread ()Lkotlinx/atomicfu/parking/KThread;
}

public final class kotlinx/atomicfu/parking/KThreadKt {
public static final fun currentThreadId ()J
}

public final class kotlinx/atomicfu/parking/Parker {
public static final field Companion Lkotlinx/atomicfu/parking/Parker$Companion;
}

public final class kotlinx/atomicfu/parking/Parker$Companion {
public final fun park ()V
public final fun parkNanos (J)V
public final fun unpark (Lkotlinx/atomicfu/parking/KThread;)V
}

85 changes: 51 additions & 34 deletions atomicfu/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import org.gradle.internal.impldep.org.junit.experimental.categories.Categories.CategoryFilter.exclude
import org.jetbrains.kotlin.gradle.ExperimentalKotlinGradlePluginApi
import org.jetbrains.kotlin.gradle.dsl.JvmTarget
import org.jetbrains.kotlin.gradle.plugin.mpp.KotlinNativeTarget
Expand Down Expand Up @@ -93,6 +94,12 @@ kotlin {
implementation(libs.junit.junit)
}
}

val concurrentMain by creating { dependsOn(commonMain.get()) }
val concurrentTest by creating { dependsOn(commonTest.get()) }

val jvmMain by getting { dependsOn(concurrentMain) }
val jvmTest by getting { dependsOn(concurrentTest) }
}
}

Expand Down Expand Up @@ -128,50 +135,60 @@ kotlin {
linuxArm32Hfp()

@OptIn(ExperimentalKotlinGradlePluginApi::class)
applyDefaultHierarchyTemplate {
group("native") {
group("nativeUnixLike") {
withLinux()
withApple()
}
}
group("androidNative32Bit") {
withAndroidNativeX86()
withCompilations { compilation ->
(compilation.target as? KotlinNativeTarget)?.konanTarget?.name == "android_arm32"
}
}
group("androidNative64Bit") {
withAndroidNativeArm64()
withAndroidNativeX64()
}

}
applyDefaultHierarchyTemplate()

sourceSets {
val nativeUnixLikeMain by getting {
kotlin.srcDir("src/nativeUnixLikeMain/kotlin")
dependsOn(nativeMain.get())

val concurrentMain by getting {}
val concurrentTest by getting {}
val nativeMain by getting { dependsOn(concurrentMain) }
val nativeTest by getting { dependsOn(concurrentTest) }

val nativeUnixLikeMain by creating { dependsOn(nativeMain) }


val androidNative64BitMain by creating { dependsOn(nativeMain) }
androidNative64BitMain.also {
androidNativeArm64Main.get().dependsOn(it)
androidNativeX64Main.get().dependsOn(it)
}

val androidNative32BitMain by creating { dependsOn(nativeMain) }
androidNative32BitMain.let {
androidNativeArm32Main.get().dependsOn(it)
androidNativeX86Main.get().dependsOn(it)
}

val androidNative32BitMain by getting {
kotlin.srcDir("src/androidNative32BitMain/kotlin")
dependsOn(nativeMain.get())
val linux64Main by creating { dependsOn(nativeUnixLikeMain) }
linux64Main.let {
linuxX64Main.get().dependsOn(it)
linuxArm64Main.get().dependsOn(it)
}

val androidNative64BitMain by getting {
kotlin.srcDir("src/androidNative64BitMain/kotlin")
dependsOn(nativeMain.get())
val linux32Main by creating { dependsOn(nativeUnixLikeMain) }
linux32Main.let {
linuxArm32HfpMain.get().dependsOn(it)
}

val androidNative32BitTest by getting {
kotlin.srcDir("src/androidNative32BitTest/kotlin")
dependsOn(nativeTest.get())
val apple64Main by creating { dependsOn(nativeUnixLikeMain) }
apple64Main.let {
watchosDeviceArm64Main.get().dependsOn(it)
iosArm64Main.get().dependsOn(it)
tvosArm64Main.get().dependsOn(it)
tvosX64Main.get().dependsOn(it)
tvosSimulatorArm64Main.get().dependsOn(it)
watchosX64Main.get().dependsOn(it)
watchosSimulatorArm64Main.get().dependsOn(it)
iosX64Main.get().dependsOn(it)
iosSimulatorArm64Main.get().dependsOn(it)
macosX64Main.get().dependsOn(it)
macosArm64Main.get().dependsOn(it)
}

val androidNative64BitTest by getting {
kotlin.srcDir("src/androidNative64BitTest/kotlin")
dependsOn(nativeTest.get())
val apple32Main by creating { dependsOn(nativeUnixLikeMain) }
apple32Main.let {
watchosArm32Main.get().dependsOn(it)
watchosArm64Main.get().dependsOn(it) // Uses Int for timespec
}

}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package kotlinx.atomicfu.parking

import kotlinx.cinterop.*
import kotlinx.cinterop.alloc
import kotlinx.cinterop.pointed
import kotlinx.cinterop.ptr
import platform.posix.*

@OptIn(ExperimentalForeignApi::class)
internal actual object ParkingDelegator {
actual fun createRef(): ParkingData {
val mut = nativeHeap.alloc<pthread_mutex_t>().ptr
val cond = nativeHeap.alloc<pthread_cond_t>().ptr
callAndVerifyNative(0) { pthread_mutex_init(mut, null) }
callAndVerifyNative(0) { pthread_cond_init(cond, null) }
return ParkingData(mut, cond)
}

actual fun wait(ref: ParkingData, shouldWait: () -> Boolean){
callAndVerifyNative(0) { pthread_mutex_lock(ref.mut) }
if (shouldWait()) callAndVerifyNative(0) { pthread_cond_wait(ref.cond, ref.mut) }
callAndVerifyNative(0) { pthread_mutex_unlock(ref.mut) }
}

actual fun timedWait(ref: ParkingData, nanos: Long, shouldWait: () -> Boolean): Unit = memScoped {
val ts = alloc<timespec>().ptr

// Add nanos to current time
clock_gettime(CLOCK_REALTIME.toInt(), ts)
ts.pointed.tv_sec += nanos.toInt() / 1_000_000_000
ts.pointed.tv_nsec += nanos.toInt() % 1_000_000_000

//Fix overflow
if (ts.pointed.tv_nsec >= 1_000_000_000) {
ts.pointed.tv_sec += 1
ts.pointed.tv_nsec -= 1_000_000_000
}
var rc = 0
callAndVerifyNative(0) { pthread_mutex_lock(ref.mut) }
if (shouldWait()) callAndVerifyNative(0, ETIMEDOUT) { pthread_cond_timedwait(ref.cond, ref.mut, ts) }
callAndVerifyNative(0) { pthread_mutex_unlock(ref.mut) }
}

actual fun wake(ref: ParkingData) {
callAndVerifyNative(0) { pthread_mutex_lock(ref.mut) }
callAndVerifyNative(0) { pthread_cond_signal(ref.cond) }
callAndVerifyNative(0) { pthread_mutex_unlock(ref.mut) }
}


actual fun destroyRef(ref: ParkingData) {
callAndVerifyNative(0) { pthread_mutex_destroy(ref.mut) }
callAndVerifyNative(0) { pthread_cond_destroy(ref.cond) }
nativeHeap.free(ref.mut)
nativeHeap.free(ref.cond)
}

}
internal actual class ParkingData(val mut: CPointer<pthread_mutex_t>, val cond: CPointer<pthread_cond_t>)
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package kotlinx.atomicfu.parking

import kotlinx.cinterop.*
import kotlinx.cinterop.alloc
import kotlinx.cinterop.pointed
import kotlinx.cinterop.ptr
import platform.posix.*

@OptIn(ExperimentalForeignApi::class)
internal actual object ParkingDelegator {
actual fun createRef(): ParkingData {
val mut = nativeHeap.alloc<pthread_mutex_t>().ptr
val cond = nativeHeap.alloc<pthread_cond_t>().ptr
callAndVerifyNative(0) { pthread_mutex_init(mut, null) }
callAndVerifyNative(0) { pthread_cond_init(cond, null) }
return ParkingData(mut, cond)
}

actual fun wait(ref: ParkingData, shouldWait: () -> Boolean) {
callAndVerifyNative(0) { pthread_mutex_lock(ref.mut) }
if (shouldWait()) callAndVerifyNative(0) { pthread_cond_wait(ref.cond, ref.mut) }
callAndVerifyNative(0) { pthread_mutex_unlock(ref.mut) }
}

actual fun timedWait(ref: ParkingData, nanos: Long, shouldWait: () -> Boolean): Unit = memScoped {
val ts = alloc<timespec>().ptr

// Add nanos to current time
clock_gettime(CLOCK_REALTIME.toInt(), ts)
ts.pointed.tv_sec += nanos / 1_000_000_000
ts.pointed.tv_nsec += nanos % 1_000_000_000

//Fix overflow
if (ts.pointed.tv_nsec >= 1_000_000_000) {
ts.pointed.tv_sec += 1
ts.pointed.tv_nsec -= 1_000_000_000
}
callAndVerifyNative(0) { pthread_mutex_lock(ref.mut) }
if (shouldWait()) callAndVerifyNative(0, ETIMEDOUT) { pthread_cond_timedwait(ref.cond, ref.mut, ts) }
callAndVerifyNative(0) { pthread_mutex_unlock(ref.mut) }
}

actual fun wake(ref: ParkingData) {
callAndVerifyNative(0) { pthread_mutex_lock(ref.mut) }
callAndVerifyNative(0) { pthread_cond_signal(ref.cond) }
callAndVerifyNative(0) { pthread_mutex_unlock(ref.mut) }
}

actual fun destroyRef(ref: ParkingData) {
callAndVerifyNative(0) { pthread_mutex_destroy(ref.mut) }
callAndVerifyNative(0) { pthread_cond_destroy(ref.cond) }
nativeHeap.free(ref.mut)
nativeHeap.free(ref.cond)
}

}
internal actual class ParkingData(val mut: CPointer<pthread_mutex_t>, val cond: CPointer<pthread_cond_t>)
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package kotlinx.atomicfu.parking

import kotlinx.cinterop.*
import kotlinx.cinterop.alloc
import kotlinx.cinterop.pointed
import kotlinx.cinterop.ptr
import platform.posix.*

@OptIn(ExperimentalForeignApi::class)
internal actual object ParkingDelegator {
actual fun createRef(): ParkingData {
val mut = nativeHeap.alloc<pthread_mutex_t>().ptr
val cond = nativeHeap.alloc<pthread_cond_t>().ptr
callAndVerifyNative(0) { pthread_mutex_init(mut, null) }
callAndVerifyNative(0) { pthread_cond_init(cond, null) }
return ParkingData(mut, cond)
}

actual fun wait(ref: ParkingData, shouldWait: () -> Boolean){
callAndVerifyNative(0) { pthread_mutex_lock(ref.mut) }
if (shouldWait()) callAndVerifyNative(0) { pthread_cond_wait(ref.cond, ref.mut) }
callAndVerifyNative(0) { pthread_mutex_unlock(ref.mut) }
}

actual fun timedWait(ref: ParkingData, nanos: Long, shouldWait: () -> Boolean): Unit = memScoped {
val ts = alloc<timespec>().ptr

// Add nanos to current time
clock_gettime(CLOCK_REALTIME.toUInt(), ts)
ts.pointed.tv_sec += nanos.toInt() / 1_000_000_000
ts.pointed.tv_nsec += nanos.toInt() % 1_000_000_000

//Fix overflow
if (ts.pointed.tv_nsec >= 1_000_000_000) {
ts.pointed.tv_sec += 1
ts.pointed.tv_nsec -= 1_000_000_000
}
callAndVerifyNative(0) { pthread_mutex_lock(ref.mut) }
if (shouldWait()) callAndVerifyNative(0, ETIMEDOUT) { pthread_cond_timedwait(ref.cond, ref.mut, ts) }
callAndVerifyNative(0) { pthread_mutex_unlock(ref.mut) }
}

actual fun wake(ref: ParkingData) {
callAndVerifyNative(0) { pthread_mutex_lock(ref.mut) }
callAndVerifyNative(0) { pthread_cond_signal(ref.cond) }
callAndVerifyNative(0) { pthread_mutex_unlock(ref.mut) }
}


actual fun destroyRef(ref: ParkingData) {
callAndVerifyNative(0) { pthread_mutex_destroy(ref.mut) }
callAndVerifyNative(0) { pthread_cond_destroy(ref.cond) }
nativeHeap.free(ref.mut)
nativeHeap.free(ref.cond)
}

}
internal actual class ParkingData(val mut: CPointer<pthread_mutex_t>, val cond: CPointer<pthread_cond_t>)
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package kotlinx.atomicfu.parking

import kotlinx.atomicfu.AtomicBoolean
import kotlinx.atomicfu.atomic
import kotlinx.cinterop.*
import kotlinx.cinterop.alloc
import kotlinx.cinterop.pointed
import kotlinx.cinterop.ptr
import platform.posix.*
import kotlin.toUInt

@OptIn(ExperimentalForeignApi::class)
internal actual object ParkingDelegator {
actual fun createRef(): ParkingData {
val mut = nativeHeap.alloc<pthread_mutex_t>().ptr
val cond = nativeHeap.alloc<pthread_cond_t>().ptr
callAndVerifyNative(0) { pthread_mutex_init(mut, null) }
callAndVerifyNative(0) { pthread_cond_init(cond, null) }
return ParkingData(mut, cond)
}

actual fun wait(ref: ParkingData, shouldWait: () -> Boolean) {
callAndVerifyNative(0) { pthread_mutex_lock(ref.mut) }
if (shouldWait()) callAndVerifyNative(0) { pthread_cond_wait(ref.cond, ref.mut) }
callAndVerifyNative(0) { pthread_mutex_unlock(ref.mut) }
}

actual fun timedWait(ref: ParkingData, nanos: Long, shouldWait: () -> Boolean): Unit = memScoped {
val ts = alloc<timespec>().ptr

// Add nanos to current time
clock_gettime(CLOCK_REALTIME.toUInt(), ts)
ts.pointed.tv_sec += nanos / 1_000_000_000
ts.pointed.tv_nsec += nanos % 1_000_000_000

//Fix overflow
if (ts.pointed.tv_nsec >= 1_000_000_000) {
ts.pointed.tv_sec += 1
ts.pointed.tv_nsec -= 1_000_000_000
}
callAndVerifyNative(0) { pthread_mutex_lock(ref.mut) }
if (shouldWait()) callAndVerifyNative(0, ETIMEDOUT) { pthread_cond_timedwait(ref.cond, ref.mut, ts) }
callAndVerifyNative(0) { pthread_mutex_unlock(ref.mut) }
}

actual fun wake(ref: ParkingData) {
callAndVerifyNative(0) { pthread_mutex_lock(ref.mut) }
callAndVerifyNative(0) { pthread_cond_signal(ref.cond) }
callAndVerifyNative(0) { pthread_mutex_unlock(ref.mut) }
}

actual fun destroyRef(ref: ParkingData) {
callAndVerifyNative(0) { pthread_mutex_destroy(ref.mut) }
callAndVerifyNative(0) { pthread_cond_destroy(ref.cond) }
nativeHeap.free(ref.mut)
nativeHeap.free(ref.cond)
}

}
internal actual class ParkingData(val mut: CPointer<pthread_mutex_t>, val cond: CPointer<pthread_cond_t>)
Loading