diff --git a/src/ConcurrentUtilities.jl b/src/ConcurrentUtilities.jl index 46825dd..d805ca6 100644 --- a/src/ConcurrentUtilities.jl +++ b/src/ConcurrentUtilities.jl @@ -1,388 +1,17 @@ module ConcurrentUtilities -export Lockable, OrderedSynchronizer, reset!, ReadWriteLock, readlock, readunlock, @wkspawn, ConcurrentStack, - Workers, remote_eval, remote_fetch, Worker, terminate!, WorkerTerminatedException +export Lockable, OrderedSynchronizer, reset!, ReadWriteLock, readlock, readunlock, @wkspawn, + Workers, remote_eval, remote_fetch, Worker, terminate!, WorkerTerminatedException, + Pool, acquire, release, drain! -include("concurrentstack.jl") include("workers.jl") using .Workers - -const WORK_QUEUE = Channel{Task}(0) -const WORKER_TASKS = Task[] - -""" - ConcurrentUtilities.@spawn expr - ConcurrentUtilities.@spawn passthroughstorage::Bool expr - -Similar to `Threads.@spawn`, schedule and execute a task (given by `expr`) -that will be run on a "background worker" (see [`ConcurrentUtilities.init`]((@ref))). - -In the 2-argument invocation, `passthroughstorage` controls whether the task-local storage of the -`current_task()` should be "passed through" to the spawned task. -""" -macro spawn(thunk) - esc(quote - tsk = @task $thunk - tsk.storage = current_task().storage - put!(ConcurrentUtilities.WORK_QUEUE, tsk) - tsk - end) -end - -""" - ConcurrentUtilities.@spawn expr - ConcurrentUtilities.@spawn passthroughstorage::Bool expr - -Similar to `Threads.@spawn`, schedule and execute a task (given by `expr`) -that will be run on a "background worker" (see [`ConcurrentUtilities.init`]((@ref))). - -In the 2-argument invocation, `passthroughstorage` controls whether the task-local storage of the -`current_task()` should be "passed through" to the spawned task. -""" -macro spawn(passthroughstorage, thunk) - esc(quote - tsk = @task $thunk - if $passthroughstorage - tsk.storage = current_task().storage - end - put!(ConcurrentUtilities.WORK_QUEUE, tsk) - tsk - end) -end - -""" - ConcurrentUtilities.init(nworkers=Threads.nthreads() - 1) - -Initialize background workers that will execute tasks spawned via -[`ConcurrentUtilities.@spawn`](@ref). If `nworkers == 1`, a single worker -will be started on thread 1 where tasks will be executed in contention -with other thread 1 work. Background worker tasks can be inspected by -looking at `ConcurrentUtilities.WORKER_TASKS`. -""" -function init(nworkers=Threads.nthreads()-1) - maxthreadid = nworkers + 1 - tids = Threads.nthreads() == 1 ? (1:1) : 2:maxthreadid - resize!(WORKER_TASKS, max(nworkers, 1)) - Threads.@threads for tid in 1:maxthreadid - if tid in tids - WORKER_TASKS[tid == 1 ? 1 : (tid - 1)] = Base.@async begin - for task in WORK_QUEUE - schedule(task) - wait(task) - end - end - end - end - return -end - -""" - Lockable(value, lock = ReentrantLock()) - -Creates a `Lockable` object that wraps `value` and -associates it with the provided `lock`. -""" -struct Lockable{T, L <: Base.AbstractLock} - value::T - lock::L -end - -Lockable(value) = Lockable(value, ReentrantLock()) - -""" - lock(f::Function, l::Lockable) - -Acquire the lock associated with `l`, execute `f` with the lock held, -and release the lock when `f` returns. `f` will receive one positional -argument: the value wrapped by `l`. If the lock is already locked by a -different task/thread, wait for it to become available. -When this function returns, the `lock` has been released, so the caller should -not attempt to `unlock` it. -""" -function Base.lock(f, l::Lockable) - lock(l.lock) do - f(l.value) - end -end - -# implement the rest of the Lock interface on Lockable -Base.islocked(l::Lockable) = islocked(l.lock) -Base.lock(l::Lockable) = lock(l.lock) -Base.trylock(l::Lockable) = trylock(l.lock) -Base.unlock(l::Lockable) = unlock(l.lock) - -""" - OrderedSynchronizer(i=1) - -A threadsafe synchronizer that allows ensuring concurrent work is done -in a specific order. The `OrderedSynchronizer` is initialized with an -integer `i` that represents the current "order" of the synchronizer. - -Work is "scheduled" by calling `put!(f, x, i)`, where `f` is a function -that will be called like `f()` when the synchronizer is at order `i`, -and will otherwise wait until other calls to `put!` have finished -to bring the synchronizer's state to `i`. Once `f()` is called, the -synchronizer's state is incremented by 1 and any waiting `put!` calls -check to see if it's their turn to execute. - -A synchronizer's state can be reset to a specific value (1 by default) -by calling `reset!(x, i)`. -""" -mutable struct OrderedSynchronizer - coordinating_task::Task - cond::Threads.Condition - i::Int -@static if VERSION < v"1.7" - closed::Threads.Atomic{Bool} -else - @atomic closed::Bool -end -end - -@static if VERSION < v"1.7" -OrderedSynchronizer(i=1) = OrderedSynchronizer(current_task(), Threads.Condition(), i, Threads.Atomic{Bool}(false)) -else -OrderedSynchronizer(i=1) = OrderedSynchronizer(current_task(), Threads.Condition(), i, false) -end - -""" - reset!(x::OrderedSynchronizer, i=1) - -Reset the state of `x` to `i`. -""" -function reset!(x::OrderedSynchronizer, i=1) - Base.@lock x.cond begin - x.i = i -@static if VERSION < v"1.7" - x.closed[] = false -else - @atomic :monotonic x.closed = false -end - end -end - -function Base.close(x::OrderedSynchronizer, excp::Exception=closed_exception()) - Base.@lock x.cond begin -@static if VERSION < v"1.7" - x.closed[] = true -else - @atomic :monotonic x.closed = true -end - Base.notify_error(x.cond, excp) - end - return -end - -@static if VERSION < v"1.7" - Base.isopen(x::OrderedSynchronizer) = !x.closed[] -else -Base.isopen(x::OrderedSynchronizer) = !(@atomic :monotonic x.closed) -end -closed_exception() = InvalidStateException("OrderedSynchronizer is closed.", :closed) - -function check_closed(x::OrderedSynchronizer) - if !isopen(x) - # if the monotonic load succeed, now do an acquire fence -@static if VERSION < v"1.7" - !x.closed[] && Base.concurrency_violation() -else - !(@atomic :acquire x.closed) && Base.concurrency_violation() -end - throw(closed_exception()) - end -end - -""" - put!(f::Function, x::OrderedSynchronizer, i::Int, incr::Int=1) - -Schedule `f` to be called when `x` is at order `i`. Note that `put!` -will block until `f` is executed. The typical usage involves something -like: - -```julia -x = OrderedSynchronizer() -@sync for i = 1:N - Threads.@spawn begin - # do some concurrent work - # once work is done, schedule synchronization - put!(x, \$i) do - # report back result of concurrent work - # won't be executed until all `i-1` calls to `put!` have already finished - end - end -end -``` - -The `incr` argument controls how much the synchronizer's state is -incremented after `f` is called. By default, `incr` is 1. -""" -function Base.put!(f, x::OrderedSynchronizer, i, incr=1) - check_closed(x) - Base.@lock x.cond begin - # wait until we're ready to execute f - while x.i != i - check_closed(x) - wait(x.cond) - end - check_closed(x) - try - f() - catch e - Base.throwto(x.coordinating_task, e) - end - x.i += incr - notify(x.cond) - end -end - -""" - ReadWriteLock() - -A threadsafe lock that allows multiple readers or a single writer. - -The read side is acquired/released via `readlock(rw)` and `readunlock(rw)`, -while the write side is acquired/released via `lock(rw)` and `unlock(rw)`. - -While a writer is active, all readers will block. Once the writer is finished, -all pending readers will be allowed to acquire the lock before the next writer. -""" -mutable struct ReadWriteLock - writelock::ReentrantLock - waitingwriter::Union{Nothing, Task} - readwait::Threads.Condition -@static if VERSION < v"1.7" - readercount::Threads.Atomic{Int} - readerwait::Threads.Atomic{Int} -else - @atomic readercount::Int - @atomic readerwait::Int -end -end - -@static if VERSION < v"1.7" -ReadWriteLock() = ReadWriteLock(ReentrantLock(), nothing, Threads.Condition(), Threads.Atomic{Int}(0), Threads.Atomic{Int}(0)) -else -ReadWriteLock() = ReadWriteLock(ReentrantLock(), nothing, Threads.Condition(), 0, 0) -end - -const MaxReaders = 1 << 30 - -function readlock(rw::ReadWriteLock) -@static if VERSION < v"1.7" - Threads.atomic_add!(rw.readercount, 1) - if rw.readercount[] < 0 - # A writer is active or pending, so we need to wait - Base.@lock rw.readwait begin - # check our condition again - if rw.readercount[] < 0 - # writer still active - wait(rw.readwait) - end - end - end -else - if (@atomic :acquire_release rw.readercount += 1) < 0 - # A writer is active or pending, so we need to wait - Base.@lock rw.readwait begin - # check our condition again - if rw.readercount < 0 - # writer still active - wait(rw.readwait) - end - end - end -end - return -end - -function readunlock(rw::ReadWriteLock) -@static if VERSION < v"1.7" - Threads.atomic_sub!(rw.readercount, 1) - if rw.readercount[] < 0 - # there's a pending write, check if we're the last reader - Threads.atomic_sub!(rw.readerwait, 1) - if rw.readerwait[] == 0 - # Last reader, wake up the writer. - @assert rw.waitingwriter !== nothing - schedule(rw.waitingwriter) - rw.waitingwriter = nothing - end - end -else - if (@atomic :acquire_release rw.readercount -= 1) < 0 - # there's a pending write, check if we're the last reader - if (@atomic :acquire_release rw.readerwait -= 1) == 0 - # Last reader, wake up the writer. - @assert rw.waitingwriter !== nothing - schedule(rw.waitingwriter) - rw.waitingwriter = nothing - end - end -end - return -end - -function Base.lock(rw::ReadWriteLock) - lock(rw.writelock) # only a single writer allowed at a time - # ok, here's how we do this: we subtract MaxReaders from readercount - # to make readercount negative; this will prevent any further readers - # from locking, while maintaining our actual reader count so we - # can track when we're able to write -@static if VERSION < v"1.7" - Threads.atomic_sub!(rw.readercount, MaxReaders) - r = rw.readercount[] + MaxReaders -else - r = (@atomic :acquire_release rw.readercount -= MaxReaders) + MaxReaders -end - # if r == 0, that means there were no readers, - # so we can proceed directly with the write lock - # if r == 1, this is an interesting case because there's only 1 reader - # and we might be racing to acquire the write lock and the reader - # unlocking; so we _also_ atomically set and check readerwait; - # if readerwait == 0, then the reader won the race and decremented readerwait - # to -1, and we increment by 1 to 0, so we know the reader is done and can proceed - # without waiting. If _we_ win the race, then we'll continue to waiting - # and the reader will decrement and then schedule us -@static if VERSION < v"1.7" - if r != 0 - Threads.atomic_add!(rw.readerwait, r) - if rw.readerwait[] != 0 - # otherwise, there are readers, so we need to wait for them to finish - # we do this by setting ourselves as the waiting writer - # and wait for the last reader to re-schedule us - rw.waitingwriter = current_task() - wait() - end - end -else - if r != 0 && (@atomic :acquire_release rw.readerwait += r) != 0 - # otherwise, there are readers, so we need to wait for them to finish - # we do this by setting ourselves as the waiting writer - # and wait for the last reader to re-schedule us - @assert rw.waitingwriter === nothing - rw.waitingwriter = current_task() - wait() - end -end - return -end - -Base.islocked(rw::ReadWriteLock) = islocked(rw.writelock) - -function Base.unlock(rw::ReadWriteLock) -@static if VERSION < v"1.7" - Threads.atomic_add!(rw.readercount, MaxReaders) - r = rw.readercount[] -else - r = (@atomic :acquire_release rw.readercount += MaxReaders) -end - if r > 0 - # wake up waiting readers - Base.@lock rw.readwait notify(rw.readwait) - end - unlock(rw.writelock) - return -end +include("lockable.jl") +include("spawn.jl") +include("synchronizer.jl") +include("rwlock.jl") +include("pools.jl") +using .Pools function clear_current_task() current_task().storage = nothing @@ -402,10 +31,7 @@ finish, call `wait` on the result of this macro, or call Values can be interpolated into `@wkspawn` via `\$`, which copies the value directly into the constructed underlying closure. This allows you to insert the _value_ of a variable, isolating the asynchronous code from changes to -the variable's value in the current task. Interpolating a _mutable_ variable -will also cause it to be wrapped in a `WeakRef`, so that Julia's internal -references to these arguments won't prevent them from being garbage collected -once the `Task` has finished running. +the variable's value in the current task. """ macro wkspawn(args...) e = args[end] diff --git a/src/concurrentstack.jl b/src/concurrentstack.jl deleted file mode 100644 index f4201a2..0000000 --- a/src/concurrentstack.jl +++ /dev/null @@ -1,76 +0,0 @@ -# copied from https://github.com/JuliaConcurrent/ConcurrentCollections.jl/blob/09a8cbe25a1a0d3cb9d0fb0d03cad60a7d5ccebd/src/stack.jl -@static if VERSION < v"1.7" - -mutable struct Node{T} - value::T - next::Union{Node{T},Nothing} -end - -Node{T}(value::T) where {T} = Node{T}(value, nothing) - -mutable struct ConcurrentStack{T} - lock::ReentrantLock - next::Union{Node{T},Nothing} -end - -ConcurrentStack{T}() where {T} = ConcurrentStack{T}(ReentrantLock(), nothing) - -function Base.push!(stack::ConcurrentStack{T}, v) where {T} - v === nothing && throw(ArgumentError("cannot push nothing onto a ConcurrentStack")) - v = convert(T, v) - node = Node{T}(v) - lock(stack.lock) do - node.next = stack.next - stack.next = node - end - return stack -end - -function Base.pop!(stack::ConcurrentStack) - lock(stack.lock) do - node = stack.next - node === nothing && return nothing - stack.next = node.next - return node.value - end -end - -else - -mutable struct Node{T} - value::T - @atomic next::Union{Node{T},Nothing} -end - -Node{T}(value::T) where {T} = Node{T}(value, nothing) - -mutable struct ConcurrentStack{T} - @atomic next::Union{Node{T},Nothing} -end - -ConcurrentStack{T}() where {T} = ConcurrentStack{T}(nothing) - -function Base.push!(stack::ConcurrentStack{T}, v) where {T} - v === nothing && throw(ArgumentError("cannot push nothing onto a ConcurrentStack")) - v = convert(T, v) - node = Node{T}(v) - next = @atomic stack.next - while true - @atomic node.next = next - next, ok = @atomicreplace(stack.next, next => node) - ok && break - end - return stack -end - -function Base.pop!(stack::ConcurrentStack) - while true - node = @atomic stack.next - node === nothing && return nothing - next = @atomic node.next - next, ok = @atomicreplace(stack.next, node => next) - ok && return node.value - end -end - -end # @static if VERSION < v"1.7" \ No newline at end of file diff --git a/src/lockable.jl b/src/lockable.jl new file mode 100644 index 0000000..0abaa1f --- /dev/null +++ b/src/lockable.jl @@ -0,0 +1,36 @@ +""" + Lockable(value, lock = ReentrantLock()) + +Creates a `Lockable` object that wraps `value` and +associates it with the provided `lock`. +""" +struct Lockable{T, L <: Base.AbstractLock} + value::T + lock::L +end + +Lockable(value) = Lockable(value, ReentrantLock()) + +Base.getindex(l::Lockable) = l.value + +""" + lock(f::Function, l::Lockable) + +Acquire the lock associated with `l`, execute `f` with the lock held, +and release the lock when `f` returns. `f` will receive one positional +argument: the value wrapped by `l`. If the lock is already locked by a +different task/thread, wait for it to become available. +When this function returns, the `lock` has been released, so the caller should +not attempt to `unlock` it. +""" +function Base.lock(f, l::Lockable) + lock(l.lock) do + f(l.value) + end +end + +# implement the rest of the Lock interface on Lockable +Base.islocked(l::Lockable) = islocked(l.lock) +Base.lock(l::Lockable) = lock(l.lock) +Base.trylock(l::Lockable) = trylock(l.lock) +Base.unlock(l::Lockable) = unlock(l.lock) \ No newline at end of file diff --git a/src/pools.jl b/src/pools.jl new file mode 100644 index 0000000..ee95c39 --- /dev/null +++ b/src/pools.jl @@ -0,0 +1,152 @@ +module Pools + +export Pool, acquire, release, drain! +import Base: acquire, release + +""" + Pool{T}(max::Int=4096) + Pool{K, T}(max::Int=4096) + +A threadsafe object for managing a pool of objects of type `T`, optionally keyed by objects +of type `K`. Objects can be requested by calling `acquire(f, pool, [key])`, where `f` is a +function that returns a new object of type `T`. +The `key` argument is optional and can be used to lookup objects that match a certain criteria +(a Dict is used internally, so matching is `isequal`). + +The `max` argument will limit the number of objects +that can be acquired at any given time. If the limit has been reached, `acquire` will +block until an object is returned to the pool via `release`. + +By default, `release(pool, obj)` will return the object to the pool for reuse. +`release(pool)` will return the "permit" to the pool while not returning +any object for reuse. + +`drain!` can be used to remove any cached objects for reuse, but it does *not* release +any active acquires. +""" +mutable struct Pool{K, T} + lock::Threads.Condition + max::Int + cur::Int + keyedvalues::Dict{K, Vector{T}} + values::Vector{T} + + function Pool{K, T}(max::Int=4096) where {K, T} + T === Nothing && throw(ArgumentError("Pool type can not be `Nothing`")) + x = new(Threads.Condition(), max, 0) + if K === Nothing + x.values = T[] + safesizehint!(x.values, max) + else + x.keyedvalues = Dict{K, Vector{T}}() + end + return x + end +end + +Pool{T}(max::Int=4096) where {T} = Pool{Nothing, T}(max) + +safesizehint!(x, n) = sizehint!(x, min(4096, n)) + +# determines whether we'll look up object caches in .keyedvalues or .values +iskeyed(::Pool{K}) where {K} = K !== Nothing + +""" + drain!(pool) + +Remove all objects from the pool for reuse, but do not release any active acquires. +""" +function drain!(pool::Pool{K}) where {K} + Base.@lock pool.lock begin + if iskeyed(pool) + for objs in values(pool.keyedvalues) + empty!(objs) + end + else + empty!(pool.values) + end + end +end + +# in VERSION >= v"1.7", we can replace `TRUE` with `Returns(true)` +TRUE(x) = true + +@noinline keyerror(key, K) = throw(ArgumentError("invalid key `$key` provided for pool key type $K")) +@noinline releaseerror() = throw(ArgumentError("cannot release permit when pool is empty")) + +# NOTE: assumes you have the lock! +function releasepermit(pool::Pool) + pool.cur > 0 || releaseerror() + pool.cur -= 1 + notify(pool.lock; all=false) + return +end + +""" + acquire(f, pool::Pool{K, T}, [key::K]; forcenew::Bool=false, isvalid::Function) -> T + +Get an object from a `pool`, optionally keyed by the provided `key`. +The provided function `f` must create a new object instance of type `T`. +Each `acquire` call MUST be matched by exactly one `release` call. +The `forcenew` keyword argument can be used to force the creation of a new object, ignoring any existing objects in the pool. +The `isvalid` keyword argument can be used to specify a function that will be called to determine if an object is still valid +for reuse. By default, all objects are considered valid. +If there are no objects available for reuse, `f` will be called to create a new object. +If the pool is already at its maximum capacity, `acquire` will block until an object is returned to the pool via `release`. +""" +function Base.acquire(f, pool::Pool{K, T}, key=nothing; forcenew::Bool=false, isvalid::Function=TRUE) where {K, T} + key isa K || keyerror(key, K) + Base.@lock pool.lock begin + # first get a permit + while pool.cur >= pool.max + wait(pool.lock) + end + pool.cur += 1 + # now see if we can get an object from the pool for reuse + if !forcenew + objs = iskeyed(pool) ? get!(() -> safesizehint!(T[], pool.max), pool.keyedvalues, key) : pool.values + while !isempty(objs) + obj = pop!(objs) + isvalid(obj) && return obj + end + end + end + try + # if there weren't any objects to reuse or we were forcenew, we'll create a new one + return f() + catch + # if we error creating a new object, it's critical we return the permit to the pool + Base.@lock pool.lock releasepermit(pool) + rethrow() + end +end + +""" + release(pool::Pool{K, T}, key::K, obj::Union{T, Nothing}=nothing) + release(pool::Pool{K, T}, obj::T) + release(pool::Pool{K, T}) + +Return an object to a `pool`, optionally keyed by the provided `key`. +If `obj` is provided, it will be returned to the pool for reuse. +Otherwise, if `nothing` is returned, or `release(pool)` is called, +just the "permit" will be returned to the pool. +""" +function Base.release(pool::Pool{K, T}, key, obj::Union{T, Nothing}=nothing) where {K, T} + key isa K || keyerror(key, K) + Base.@lock pool.lock begin + # return the permit + releasepermit(pool) + # if we're given an object, we'll put it back in the pool + if obj !== nothing + # if an invalid key is provided, we let the KeyError propagate + objs = iskeyed(pool) ? pool.keyedvalues[key] : pool.values + push!(objs, obj) + end + end + return +end + +Base.release(pool::Pool{K, T}, obj::T) where {K, T} = release(pool, nothing, obj) +Base.release(pool::Pool{K, T}) where {K, T} = release(pool, nothing, nothing) + +end # module diff --git a/src/rwlock.jl b/src/rwlock.jl new file mode 100644 index 0000000..46e7c45 --- /dev/null +++ b/src/rwlock.jl @@ -0,0 +1,148 @@ +""" + ReadWriteLock() + +A threadsafe lock that allows multiple readers or a single writer. + +The read side is acquired/released via `readlock(rw)` and `readunlock(rw)`, +while the write side is acquired/released via `lock(rw)` and `unlock(rw)`. + +While a writer is active, all readers will block. Once the writer is finished, +all pending readers will be allowed to acquire the lock before the next writer. +""" +mutable struct ReadWriteLock + writelock::ReentrantLock + waitingwriter::Union{Nothing, Task} + readwait::Threads.Condition +@static if VERSION < v"1.7" + readercount::Threads.Atomic{Int} + readerwait::Threads.Atomic{Int} +else + @atomic readercount::Int + @atomic readerwait::Int +end +end + +@static if VERSION < v"1.7" +ReadWriteLock() = ReadWriteLock(ReentrantLock(), nothing, Threads.Condition(), Threads.Atomic{Int}(0), Threads.Atomic{Int}(0)) +else +ReadWriteLock() = ReadWriteLock(ReentrantLock(), nothing, Threads.Condition(), 0, 0) +end + +const MaxReaders = 1 << 30 + +function readlock(rw::ReadWriteLock) +@static if VERSION < v"1.7" + Threads.atomic_add!(rw.readercount, 1) + if rw.readercount[] < 0 + # A writer is active or pending, so we need to wait + Base.@lock rw.readwait begin + # check our condition again + if rw.readercount[] < 0 + # writer still active + wait(rw.readwait) + end + end + end +else + if (@atomic :acquire_release rw.readercount += 1) < 0 + # A writer is active or pending, so we need to wait + Base.@lock rw.readwait begin + # check our condition again + if rw.readercount < 0 + # writer still active + wait(rw.readwait) + end + end + end +end + return +end + +function readunlock(rw::ReadWriteLock) +@static if VERSION < v"1.7" + Threads.atomic_sub!(rw.readercount, 1) + if rw.readercount[] < 0 + # there's a pending write, check if we're the last reader + Threads.atomic_sub!(rw.readerwait, 1) + if rw.readerwait[] == 0 + # Last reader, wake up the writer. + @assert rw.waitingwriter !== nothing + schedule(rw.waitingwriter) + rw.waitingwriter = nothing + end + end +else + if (@atomic :acquire_release rw.readercount -= 1) < 0 + # there's a pending write, check if we're the last reader + if (@atomic :acquire_release rw.readerwait -= 1) == 0 + # Last reader, wake up the writer. + @assert rw.waitingwriter !== nothing + schedule(rw.waitingwriter) + rw.waitingwriter = nothing + end + end +end + return +end + +function Base.lock(rw::ReadWriteLock) + lock(rw.writelock) # only a single writer allowed at a time + # ok, here's how we do this: we subtract MaxReaders from readercount + # to make readercount negative; this will prevent any further readers + # from locking, while maintaining our actual reader count so we + # can track when we're able to write +@static if VERSION < v"1.7" + Threads.atomic_sub!(rw.readercount, MaxReaders) + r = rw.readercount[] + MaxReaders +else + r = (@atomic :acquire_release rw.readercount -= MaxReaders) + MaxReaders +end + # if r == 0, that means there were no readers, + # so we can proceed directly with the write lock + # if r == 1, this is an interesting case because there's only 1 reader + # and we might be racing to acquire the write lock and the reader + # unlocking; so we _also_ atomically set and check readerwait; + # if readerwait == 0, then the reader won the race and decremented readerwait + # to -1, and we increment by 1 to 0, so we know the reader is done and can proceed + # without waiting. If _we_ win the race, then we'll continue to waiting + # and the reader will decrement and then schedule us +@static if VERSION < v"1.7" + if r != 0 + Threads.atomic_add!(rw.readerwait, r) + if rw.readerwait[] != 0 + # otherwise, there are readers, so we need to wait for them to finish + # we do this by setting ourselves as the waiting writer + # and wait for the last reader to re-schedule us + rw.waitingwriter = current_task() + wait() + end + end +else + if r != 0 && (@atomic :acquire_release rw.readerwait += r) != 0 + # otherwise, there are readers, so we need to wait for them to finish + # we do this by setting ourselves as the waiting writer + # and wait for the last reader to re-schedule us + @assert rw.waitingwriter === nothing + rw.waitingwriter = current_task() + wait() + end +end + return +end + +Base.islocked(rw::ReadWriteLock) = islocked(rw.writelock) + +function Base.unlock(rw::ReadWriteLock) +@static if VERSION < v"1.7" + Threads.atomic_add!(rw.readercount, MaxReaders) + r = rw.readercount[] +else + r = (@atomic :acquire_release rw.readercount += MaxReaders) +end + if r > 0 + # wake up waiting readers + Base.@lock rw.readwait notify(rw.readwait) + end + unlock(rw.writelock) + return +end \ No newline at end of file diff --git a/src/spawn.jl b/src/spawn.jl new file mode 100644 index 0000000..44318c6 --- /dev/null +++ b/src/spawn.jl @@ -0,0 +1,81 @@ +const WORK_QUEUE = Channel{Task}(0) +const WORKER_TASKS = Task[] + +""" + ConcurrentUtilities.@spawn expr + ConcurrentUtilities.@spawn passthroughstorage::Bool expr + +Similar to `Threads.@spawn`, schedule and execute a task (given by `expr`) +that will be run on a "background worker" (see [`ConcurrentUtilities.init`]((@ref))). + +In the 2-argument invocation, `passthroughstorage` controls whether the task-local storage of the +`current_task()` should be "passed through" to the spawned task. +""" +macro spawn(thunk) + esc(quote + tsk = @task $thunk + tsk.storage = current_task().storage + put!(ConcurrentUtilities.WORK_QUEUE, tsk) + tsk + end) +end + +""" + ConcurrentUtilities.@spawn expr + ConcurrentUtilities.@spawn passthroughstorage::Bool expr + +Similar to `Threads.@spawn`, schedule and execute a task (given by `expr`) +that will be run on a "background worker" (see [`ConcurrentUtilities.init`]((@ref))). + +In the 2-argument invocation, `passthroughstorage` controls whether the task-local storage of the +`current_task()` should be "passed through" to the spawned task. +""" +macro spawn(passthroughstorage, thunk) + esc(quote + tsk = @task $thunk + if $passthroughstorage + tsk.storage = current_task().storage + end + put!(ConcurrentUtilities.WORK_QUEUE, tsk) + tsk + end) +end + +""" + ConcurrentUtilities.init(nworkers=Threads.nthreads() - 1) + +Initialize background workers that will execute tasks spawned via +[`ConcurrentUtilities.@spawn`](@ref). If `nworkers == 1`, a single worker +will be started on thread 1 where tasks will be executed in contention +with other thread 1 work. Background worker tasks can be inspected by +looking at `ConcurrentUtilities.WORKER_TASKS`. +""" +function init(nworkers=Threads.nthreads()-1) + maxthreadid = nworkers + 1 + tids = Threads.nthreads() == 1 ? (1:1) : 2:maxthreadid + resize!(WORKER_TASKS, max(nworkers, 1)) +@static if VERSION < v"1.8.0" + Threads.@threads for tid in 1:maxthreadid + if tid in tids + WORKER_TASKS[tid == 1 ? 1 : (tid - 1)] = Base.@async begin + for task in WORK_QUEUE + schedule(task) + wait(task) + end + end + end + end +else + Threads.@threads :static for tid in 1:maxthreadid + if tid in tids + WORKER_TASKS[tid == 1 ? 1 : (tid - 1)] = Base.@async begin + for task in WORK_QUEUE + schedule(task) + wait(task) + end + end + end + end +end + return +end \ No newline at end of file diff --git a/src/synchronizer.jl b/src/synchronizer.jl new file mode 100644 index 0000000..41ab47b --- /dev/null +++ b/src/synchronizer.jl @@ -0,0 +1,123 @@ +""" + OrderedSynchronizer(i=1) + +A threadsafe synchronizer that allows ensuring concurrent work is done +in a specific order. The `OrderedSynchronizer` is initialized with an +integer `i` that represents the current "order" of the synchronizer. + +Work is "scheduled" by calling `put!(f, x, i)`, where `f` is a function +that will be called like `f()` when the synchronizer is at order `i`, +and will otherwise wait until other calls to `put!` have finished +to bring the synchronizer's state to `i`. Once `f()` is called, the +synchronizer's state is incremented by 1 and any waiting `put!` calls +check to see if it's their turn to execute. + +A synchronizer's state can be reset to a specific value (1 by default) +by calling `reset!(x, i)`. +""" +mutable struct OrderedSynchronizer + coordinating_task::Task + cond::Threads.Condition + i::Int +@static if VERSION < v"1.7" + closed::Threads.Atomic{Bool} +else + @atomic closed::Bool +end +end + +@static if VERSION < v"1.7" +OrderedSynchronizer(i=1) = OrderedSynchronizer(current_task(), Threads.Condition(), i, Threads.Atomic{Bool}(false)) +else +OrderedSynchronizer(i=1) = OrderedSynchronizer(current_task(), Threads.Condition(), i, false) +end + +""" + reset!(x::OrderedSynchronizer, i=1) + +Reset the state of `x` to `i`. +""" +function reset!(x::OrderedSynchronizer, i=1) + Base.@lock x.cond begin + x.i = i +@static if VERSION < v"1.7" + x.closed[] = false +else + @atomic :monotonic x.closed = false +end + end +end + +function Base.close(x::OrderedSynchronizer, excp::Exception=closed_exception()) + Base.@lock x.cond begin +@static if VERSION < v"1.7" + x.closed[] = true +else + @atomic :monotonic x.closed = true +end + Base.notify_error(x.cond, excp) + end + return +end + +@static if VERSION < v"1.7" + Base.isopen(x::OrderedSynchronizer) = !x.closed[] +else +Base.isopen(x::OrderedSynchronizer) = !(@atomic :monotonic x.closed) +end +closed_exception() = InvalidStateException("OrderedSynchronizer is closed.", :closed) + +function check_closed(x::OrderedSynchronizer) + if !isopen(x) + # if the monotonic load succeed, now do an acquire fence +@static if VERSION < v"1.7" + !x.closed[] && Base.concurrency_violation() +else + !(@atomic :acquire x.closed) && Base.concurrency_violation() +end + throw(closed_exception()) + end +end + +""" + put!(f::Function, x::OrderedSynchronizer, i::Int, incr::Int=1) + +Schedule `f` to be called when `x` is at order `i`. Note that `put!` +will block until `f` is executed. The typical usage involves something +like: + +```julia +x = OrderedSynchronizer() +@sync for i = 1:N + Threads.@spawn begin + # do some concurrent work + # once work is done, schedule synchronization + put!(x, \$i) do + # report back result of concurrent work + # won't be executed until all `i-1` calls to `put!` have already finished + end + end +end +``` + +The `incr` argument controls how much the synchronizer's state is +incremented after `f` is called. By default, `incr` is 1. +""" +function Base.put!(f, x::OrderedSynchronizer, i, incr=1) + check_closed(x) + Base.@lock x.cond begin + # wait until we're ready to execute f + while x.i != i + check_closed(x) + wait(x.cond) + end + check_closed(x) + try + f() + catch e + Base.throwto(x.coordinating_task, e) + end + x.i += incr + notify(x.cond) + end +end diff --git a/test/concurrentstack.jl b/test/concurrentstack.jl deleted file mode 100644 index 547ce59..0000000 --- a/test/concurrentstack.jl +++ /dev/null @@ -1,56 +0,0 @@ -using ConcurrentUtilities -using Test - -@testset "ConcurrentStack: simple" begin - stack = ConcurrentStack{Int}() - xs = 1:10 - foldl(push!, xs; init = stack) - @test [pop!(stack) for _ in xs] == reverse(xs) - @test pop!(stack) === nothing -end - -function pushpop(xs, ntasks = Threads.nthreads()) - stack = ConcurrentStack{eltype(xs)}() - local tasks - done = Threads.Atomic{Bool}(false) - try - tasks = map(1:ntasks) do _ - Threads.@spawn begin - local ys = eltype(xs)[] - while true - r = pop!(stack) - if r === nothing - done[] && break - continue - end - push!(ys, r) - end - ys - end - end - for x in xs - push!(stack, x) - end - finally - done[] = true - end - return fetch.(tasks), stack -end - -function test_random_push_pop(T::Type, xs = 1:2^10) - if T !== eltype(xs) - xs = collect(T, xs) - end - yss, _ = pushpop(xs) - @test all(allunique, yss) - @debug "pushpop(xs)" length.(yss) - ys = sort!(foldl(append!, yss; init = T[])) - @debug "pushpop(xs)" setdiff(ys, xs) setdiff(xs, ys) length(xs) length(ys) - @test ys == xs -end - -@testset "ConcurrentStack: random push/pop" begin - @testset for T in [Int, Any, Int, Any] - test_random_push_pop(T) - end -end diff --git a/test/pools.jl b/test/pools.jl new file mode 100644 index 0000000..d2ff678 --- /dev/null +++ b/test/pools.jl @@ -0,0 +1,103 @@ +using ConcurrentUtilities, Test + +@testset "Pools" begin + @testset "nonkeyed and pool basics" begin + pool = Pool{Int}(3) + # acquire an object from the pool + x1 = acquire(() -> 1, pool) + # no existing objects in the pool, so our function was called to create a new one + @test x1 == 1 + # release back to the pool for reuse + release(pool, x1) + # acquire another object from the pool + x1 = acquire(() -> 2, pool) + # this time, the pool had an existing object, so our function was not called + @test x1 == 1 + # but now there are no objects to reuse again, so the next acquire will call our function + x2 = acquire(() -> 2, pool) + @test x2 == 2 + x3 = acquire(() -> 3, pool) + @test x3 == 3 + # the pool is now at capacity, so the next acquire will block until an object is released + tsk = @async acquire(() -> 4, pool; forcenew=true) + yield() + @test !istaskdone(tsk) + # release an object back to the pool + release(pool, x1) + # now the acquire can complete + x1 = fetch(tsk) + # even though we released 1 for reuse, we passed forcenew, so our function was called to create new + @test x1 == 4 + # error to try and provide a key to a non-keyed pool + @test_throws ArgumentError acquire(() -> 1, pool, 1) + # release objects back to the pool + release(pool, x1) + release(pool, x2) + release(pool, x3) + # acquire an object, but checking isvalid + x1 = acquire(() -> 5, pool; isvalid=x -> x == 1) + @test x1 == 1 + # no valid objects, so our function was called to create a new one + x2 = acquire(() -> 6, pool; isvalid=x -> x == 1) + @test x2 == 6 + # we have one slot left in the pool, we now throw while creating new + # and we want to test that the permit isn't permanently lost for the pool + @test_throws ErrorException acquire(() -> error("oops"), pool; forcenew=true) + # we can still acquire a new object + x3 = acquire(() -> 7, pool; forcenew=true) + @test x3 == 7 + # release objects back to the pool + release(pool, x1) + release(pool, x2) + release(pool, x3) + # try to do an invalid release + @test_throws ArgumentError release(pool, 10) + # test that the invalid release didn't push the object to our pool for reuse + x1 = acquire(() -> 8, pool) + @test x1 == 7 + # calling drain! removes all objects for reuse + drain!(pool) + x2 = acquire(() -> 9, pool) + @test x2 == 9 + end + + @testset "keyed pool" begin + # now test a keyed pool + pool = Pool{String, Int}(3) + # acquire an object from the pool + x1 = acquire(() -> 1, pool, "a") + # no existing objects in the pool, so our function was called to create a new one + @test x1 == 1 + # release back to the pool for reuse + release(pool, "a", x1) + # test for a different key + x2 = acquire(() -> 2, pool, "b") + # there's an existing object, but for a different key, so we don't reuse + @test x2 == 2 + # acquire another object from the pool + x1 = acquire(() -> 2, pool, "a") + # this time, the pool had an existing object, so our function was not called + @test x1 == 1 + x3 = acquire(() -> 3, pool, "a") + @test x3 == 3 + # the pool is now at capacity, so the next acquire will block until an object is released + # even though we've acquired using different keys, the capacity is shared across the pool + tsk = @async acquire(() -> 4, pool, "c"; forcenew=true) + yield() + @test !istaskdone(tsk) + # release an object back to the pool + release(pool, "a", x1) + # now the acquire can complete + x1 = fetch(tsk) + # even though we released 1 for reuse, we passed forcenew, so our function was called to create new + @test x1 == 4 + # error to try and provide an invalid key to a keyed pool + @test_throws ArgumentError acquire(() -> 1, pool, 1) + # error to release an invalid key back to the pool + @test_throws KeyError release(pool, "z", 1) + # error to *not* provide a key to a keyed pool + @test_throws ArgumentError acquire(() -> 1, pool) + # error to *not* provide a key when releasing to a keyed pool + @test_throws ArgumentError release(pool) + end +end diff --git a/test/runtests.jl b/test/runtests.jl index c63e7f4..893b414 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -6,6 +6,7 @@ using Test, ConcurrentUtilities ConcurrentUtilities.init() threadid = fetch(ConcurrentUtilities.@spawn(Threads.threadid())) + @show Threads.nthreads(), threadid @test Threads.nthreads() == 1 ? (threadid == 1) : (threadid != 1) @test ConcurrentUtilities.@spawn(false, 1 + 1).storage === nothing @@ -208,8 +209,6 @@ using Test, ConcurrentUtilities @test !islocked(rw) end - include("concurrentstack.jl") - # track all workers every created ALL_WORKERS = [] ConcurrentUtilities.Workers.GLOBAL_CALLBACK_PER_WORKER[] = w -> push!(ALL_WORKERS, w) @@ -226,6 +225,7 @@ using Test, ConcurrentUtilities @test istaskstarted(w.output) && istaskdone(w.output) @test isempty(w.futures) end + include("pools.jl") end # @testset "@wkspawn" begin