Skip to content

Commit

Permalink
Fix some logic in our ReadWriteLock (#20)
Browse files Browse the repository at this point in the history
* Remove support for pre-1.8; it doesn't seem likely that we can correctly implement this given pre-1.8 atomics/primitives
* Avoid some race conditions between acquiring the write lock and the last reader unlocking
  * We do this by introducing a `writeready` `Event`, instead of the `waitingwriter` `Task` field
  * This avoids the awkwardness of that field not being atomic, yet being possibly simultaneously observed by
    2 different tasks
  * The Event helps by allowing the reader to notify and the writer to observe the notification, without worrying
    about which task gets to the notify/wait first
  • Loading branch information
quinnj authored Apr 25, 2023
1 parent 9f1174a commit a9428c8
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 83 deletions.
128 changes: 45 additions & 83 deletions src/rwlock.jl
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
@static if VERSION >= v"1.8"

"""
ReadWriteLock()
Expand All @@ -7,142 +9,102 @@ The read side is acquired/released via `readlock(rw)` and `readunlock(rw)`,
while the write side is acquired/released via `lock(rw)` and `unlock(rw)`.
While a writer is active, all readers will block. Once the writer is finished,
all pending readers will be allowed to acquire the lock before the next writer.
all pending readers will be allowed to acquire/release before the next writer.
"""
mutable struct ReadWriteLock
writelock::ReentrantLock
waitingwriter::Union{Nothing, Task}
readwait::Threads.Condition
@static if VERSION < v"1.7"
readercount::Threads.Atomic{Int}
readerwait::Threads.Atomic{Int}
else
writeready::Threads.Event
# `readercount` keeps track of how many readers are active or pending
# if > 0, then that's the number of active readers
# if < 0, then there is a writer active or pending, and
# `readercount + MaxReaders` is the number of active or pending readers
@atomic readercount::Int
@atomic readerwait::Int
end
end

@static if VERSION < v"1.7"
ReadWriteLock() = ReadWriteLock(ReentrantLock(), nothing, Threads.Condition(), Threads.Atomic{Int}(0), Threads.Atomic{Int}(0))
function ReadWriteLock()
@static if VERSION < v"1.8"
throw(ArgumentError("ReadWriteLock requires Julia v1.8 or greater"))
else
ReadWriteLock() = ReadWriteLock(ReentrantLock(), nothing, Threads.Condition(), 0, 0)
return ReadWriteLock(ReentrantLock(), Threads.Condition(), Threads.Event(true), 0, 0)
end
end

const MaxReaders = 1 << 30

function readlock(rw::ReadWriteLock)
@static if VERSION < v"1.7"
Threads.atomic_add!(rw.readercount, 1)
if rw.readercount[] < 0
# A writer is active or pending, so we need to wait
Base.@lock rw.readwait begin
# check our condition again
if rw.readercount[] < 0
# writer still active
wait(rw.readwait)
end
end
end
else
# first step is to increment readercount atomically
if (@atomic :acquire_release rw.readercount += 1) < 0
# A writer is active or pending, so we need to wait
# if we observe from our atomic operation that readercount is < 0,
# a writer was active or pending, so we need initiate the "slowpath"
# by acquiring the readwait lock
Base.@lock rw.readwait begin
# check our condition again
# check our condition again, if it holds, then we get in line
# to be notified once the writer is done (the writer also acquires
# the readwait lock, so we'll be waiting until it releases it)
if rw.readercount < 0
# writer still active
wait(rw.readwait)
end
end
end
end
return
end

function readunlock(rw::ReadWriteLock)
@static if VERSION < v"1.7"
Threads.atomic_sub!(rw.readercount, 1)
if rw.readercount[] < 0
# there's a pending write, check if we're the last reader
Threads.atomic_sub!(rw.readerwait, 1)
if rw.readerwait[] == 0
# Last reader, wake up the writer.
@assert rw.waitingwriter !== nothing
schedule(rw.waitingwriter)
rw.waitingwriter = nothing
end
end
else
# similar to `readlock`, the first step is to decrement `readercount` atomically
if (@atomic :acquire_release rw.readercount -= 1) < 0
# there's a pending write, check if we're the last reader
# if we observe from our atomic operation that readercount is < 0,
# there's a pending write, so check if we're the last reader
if (@atomic :acquire_release rw.readerwait -= 1) == 0
# Last reader, wake up the writer.
@assert rw.waitingwriter !== nothing
schedule(rw.waitingwriter)
rw.waitingwriter = nothing
# we observed that there was a pending write AND we just
# observed that we were the last reader, so it's our
# responsibility to notify the writer that it can proceed
notify(rw.writeready)
end
end
end
return
end

function Base.lock(rw::ReadWriteLock)
lock(rw.writelock) # only a single writer allowed at a time
# ok, here's how we do this: we subtract MaxReaders from readercount
# first, we subtract MaxReaders from readercount
# to make readercount negative; this will prevent any further readers
# from locking, while maintaining our actual reader count so we
# can track when we're able to write
@static if VERSION < v"1.7"
Threads.atomic_sub!(rw.readercount, MaxReaders)
r = rw.readercount[] + MaxReaders
else
r = (@atomic :acquire_release rw.readercount -= MaxReaders) + MaxReaders
end
# we also _add_ MaxReaders so that `r` is the number of active readers
# if r == 0, that means there were no readers,
# so we can proceed directly with the write lock
# if r == 1, this is an interesting case because there's only 1 reader
# and we might be racing to acquire the write lock and the reader
# unlocking; so we _also_ atomically set and check readerwait;
# if readerwait == 0, then the reader won the race and decremented readerwait
# to -1, and we increment by 1 to 0, so we know the reader is done and can proceed
# without waiting. If _we_ win the race, then we'll continue to waiting
# and the reader will decrement and then schedule us
@static if VERSION < v"1.7"
if r != 0
Threads.atomic_add!(rw.readerwait, r)
if rw.readerwait[] != 0
# otherwise, there are readers, so we need to wait for them to finish
# we do this by setting ourselves as the waiting writer
# and wait for the last reader to re-schedule us
rw.waitingwriter = current_task()
wait()
end
end
else
# so we can proceed directly with the write lock (the 1st check below in the if)
# if there _are_ active readers, then we need to wait until they all unlock
# by incrementing `readerwait` by `r`, we're atomically setting the # of read
# unlocks we expect to happen before we can proceed with the write lock
# (readers decrement `readerwait` if they observe `readercount` is negative)
# if, by chance, the last reader manages to unlock before we increment `readerwait`,
# then `readerwait` will actually be negative and we'll increment it back to 0
# in that case, we can proceed directly with the write lock (the 2nd check below)
if r != 0 && (@atomic :acquire_release rw.readerwait += r) != 0
# otherwise, there are readers, so we need to wait for them to finish
# we do this by setting ourselves as the waiting writer
# and wait for the last reader to re-schedule us
@assert rw.waitingwriter === nothing
rw.waitingwriter = current_task()
wait()
# otherwise, there are still pending readers, so we need to wait for them to finish
# we do this by waiting on the `writeready` event, which will be
# notified when the last reader unlocks; if the last reader happens
# to be racing with us, then `writeready` will already be set and
# we'll proceed immediately
wait(rw.writeready)
end
end
return
end

Base.islocked(rw::ReadWriteLock) = islocked(rw.writelock)

function Base.unlock(rw::ReadWriteLock)
@static if VERSION < v"1.7"
Threads.atomic_add!(rw.readercount, MaxReaders)
r = rw.readercount[]
else
r = (@atomic :acquire_release rw.readercount += MaxReaders)
end
if r > 0
# wake up waiting readers
Base.@lock rw.readwait notify(rw.readwait)
end
unlock(rw.writelock)
return
end

end
4 changes: 4 additions & 0 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ using Test, ConcurrentUtilities
end

@testset "ReadWriteLock" begin
@static if VERSION < v"1.8"
@warn "skipping ReadWriteLock tests since VERSION ($VERSION) < v\"1.8\""
else
rw = ReadWriteLock()
println("test read is blocked while writing")
lock(rw)
Expand Down Expand Up @@ -207,6 +210,7 @@ using Test, ConcurrentUtilities
@test thirdReaderLocked[]
@test fetch(r3)
@test !islocked(rw)
end # @static if VERSION < v"1.8"
end

# track all workers every created
Expand Down

0 comments on commit a9428c8

Please sign in to comment.