From e78b0e3e2d065cacfe2fd1668d1a05fd128dbbda Mon Sep 17 00:00:00 2001 From: Jacob Quinn Date: Wed, 19 Apr 2023 23:36:13 -0600 Subject: [PATCH] Add threadsafe Pool data structure Also split the now-large ConcurrentUtilities.jl file into more manageable individual files. --- src/ConcurrentUtilities.jl | 367 +------------------------------------ src/lockable.jl | 36 ++++ src/pools.jl | 109 +++++++++++ src/rwlock.jl | 124 +++++++++++++ src/spawn.jl | 68 +++++++ src/synchronizer.jl | 123 +++++++++++++ test/pools.jl | 99 ++++++++++ 7 files changed, 568 insertions(+), 358 deletions(-) create mode 100644 src/lockable.jl create mode 100644 src/pools.jl create mode 100644 src/rwlock.jl create mode 100644 src/spawn.jl create mode 100644 src/synchronizer.jl create mode 100644 test/pools.jl diff --git a/src/ConcurrentUtilities.jl b/src/ConcurrentUtilities.jl index dd8f4f8..60e3ca7 100644 --- a/src/ConcurrentUtilities.jl +++ b/src/ConcurrentUtilities.jl @@ -1,361 +1,15 @@ module ConcurrentUtilities -export Lockable, OrderedSynchronizer, reset!, ReadWriteLock, readlock, readunlock, @wkspawn, ConcurrentStack +export Lockable, OrderedSynchronizer, reset!, ReadWriteLock, readlock, readunlock, @wkspawn, ConcurrentStack, + Pool, acquire, release include("concurrentstack.jl") - -const WORK_QUEUE = Channel{Task}(0) -const WORKER_TASKS = Task[] - -""" - ConcurrentUtilities.@spawn expr - ConcurrentUtilities.@spawn passthroughstorage::Bool expr - -Similar to `Threads.@spawn`, schedule and execute a task (given by `expr`) -that will be run on a "background worker" (see [`ConcurrentUtilities.init`]((@ref))). - -In the 2-argument invocation, `passthroughstorage` controls whether the task-local storage of the -`current_task()` should be "passed through" to the spawned task. -""" -macro spawn(thunk) - esc(quote - tsk = @task $thunk - tsk.storage = current_task().storage - put!(ConcurrentUtilities.WORK_QUEUE, tsk) - tsk - end) -end - -""" - ConcurrentUtilities.@spawn expr - ConcurrentUtilities.@spawn passthroughstorage::Bool expr - -Similar to `Threads.@spawn`, schedule and execute a task (given by `expr`) -that will be run on a "background worker" (see [`ConcurrentUtilities.init`]((@ref))). - -In the 2-argument invocation, `passthroughstorage` controls whether the task-local storage of the -`current_task()` should be "passed through" to the spawned task. -""" -macro spawn(passthroughstorage, thunk) - esc(quote - tsk = @task $thunk - if $passthroughstorage - tsk.storage = current_task().storage - end - put!(ConcurrentUtilities.WORK_QUEUE, tsk) - tsk - end) -end - -""" - ConcurrentUtilities.init(nworkers=Threads.nthreads() - 1) - -Initialize background workers that will execute tasks spawned via -[`ConcurrentUtilities.@spawn`](@ref). If `nworkers == 1`, a single worker -will be started on thread 1 where tasks will be executed in contention -with other thread 1 work. Background worker tasks can be inspected by -looking at `ConcurrentUtilities.WORKER_TASKS`. -""" -function init(nworkers=Threads.nthreads()-1) - maxthreadid = nworkers + 1 - tids = Threads.nthreads() == 1 ? (1:1) : 2:maxthreadid - resize!(WORKER_TASKS, max(nworkers, 1)) - Threads.@threads for tid in 1:maxthreadid - if tid in tids - WORKER_TASKS[tid == 1 ? 1 : (tid - 1)] = Base.@async begin - for task in WORK_QUEUE - schedule(task) - wait(task) - end - end - end - end - return -end - -""" - Lockable(value, lock = ReentrantLock()) - -Creates a `Lockable` object that wraps `value` and -associates it with the provided `lock`. -""" -struct Lockable{T, L <: Base.AbstractLock} - value::T - lock::L -end - -Lockable(value) = Lockable(value, ReentrantLock()) - -""" - lock(f::Function, l::Lockable) - -Acquire the lock associated with `l`, execute `f` with the lock held, -and release the lock when `f` returns. `f` will receive one positional -argument: the value wrapped by `l`. If the lock is already locked by a -different task/thread, wait for it to become available. -When this function returns, the `lock` has been released, so the caller should -not attempt to `unlock` it. -""" -function Base.lock(f, l::Lockable) - lock(l.lock) do - f(l.value) - end -end - -# implement the rest of the Lock interface on Lockable -Base.islocked(l::Lockable) = islocked(l.lock) -Base.lock(l::Lockable) = lock(l.lock) -Base.trylock(l::Lockable) = trylock(l.lock) -Base.unlock(l::Lockable) = unlock(l.lock) - -""" - OrderedSynchronizer(i=1) - -A threadsafe synchronizer that allows ensuring concurrent work is done -in a specific order. The `OrderedSynchronizer` is initialized with an -integer `i` that represents the current "order" of the synchronizer. - -Work is "scheduled" by calling `put!(f, x, i)`, where `f` is a function -that will be called like `f()` when the synchronizer is at order `i`, -and will otherwise wait until other calls to `put!` have finished -to bring the synchronizer's state to `i`. Once `f()` is called, the -synchronizer's state is incremented by 1 and any waiting `put!` calls -check to see if it's their turn to execute. - -A synchronizer's state can be reset to a specific value (1 by default) -by calling `reset!(x, i)`. -""" -mutable struct OrderedSynchronizer - coordinating_task::Task - cond::Threads.Condition - i::Int -@static if VERSION < v"1.7" - closed::Threads.Atomic{Bool} -else - @atomic closed::Bool -end -end - -@static if VERSION < v"1.7" -OrderedSynchronizer(i=1) = OrderedSynchronizer(current_task(), Threads.Condition(), i, Threads.Atomic{Bool}(false)) -else -OrderedSynchronizer(i=1) = OrderedSynchronizer(current_task(), Threads.Condition(), i, false) -end - -""" - reset!(x::OrderedSynchronizer, i=1) - -Reset the state of `x` to `i`. -""" -function reset!(x::OrderedSynchronizer, i=1) - Base.@lock x.cond begin - x.i = i -@static if VERSION < v"1.7" - x.closed[] = false -else - @atomic :monotonic x.closed = false -end - end -end - -function Base.close(x::OrderedSynchronizer, excp::Exception=closed_exception()) - Base.@lock x.cond begin -@static if VERSION < v"1.7" - x.closed[] = true -else - @atomic :monotonic x.closed = true -end - Base.notify_error(x.cond, excp) - end - return -end - -@static if VERSION < v"1.7" - Base.isopen(x::OrderedSynchronizer) = !x.closed[] -else -Base.isopen(x::OrderedSynchronizer) = !(@atomic :monotonic x.closed) -end -closed_exception() = InvalidStateException("OrderedSynchronizer is closed.", :closed) - -function check_closed(x::OrderedSynchronizer) - if !isopen(x) - # if the monotonic load succeed, now do an acquire fence -@static if VERSION < v"1.7" - !x.closed[] && Base.concurrency_violation() -else - !(@atomic :acquire x.closed) && Base.concurrency_violation() -end - throw(closed_exception()) - end -end - -""" - put!(f::Function, x::OrderedSynchronizer, i::Int, incr::Int=1) - -Schedule `f` to be called when `x` is at order `i`. Note that `put!` -will block until `f` is executed. The typical usage involves something -like: - -```julia -x = OrderedSynchronizer() -@sync for i = 1:N - Threads.@spawn begin - # do some concurrent work - # once work is done, schedule synchronization - put!(x, \$i) do - # report back result of concurrent work - # won't be executed until all `i-1` calls to `put!` have already finished - end - end -end -``` - -The `incr` argument controls how much the synchronizer's state is -incremented after `f` is called. By default, `incr` is 1. -""" -function Base.put!(f, x::OrderedSynchronizer, i, incr=1) - check_closed(x) - Base.@lock x.cond begin - # wait until we're ready to execute f - while x.i != i - check_closed(x) - wait(x.cond) - end - check_closed(x) - try - f() - catch e - Base.throwto(x.coordinating_task, e) - end - x.i += incr - notify(x.cond) - end -end - -mutable struct ReadWriteLock - writelock::ReentrantLock -@static if VERSION < v"1.7" - waitingwriter::Union{Nothing, Task} -else - @atomic waitingwriter::Union{Nothing, Task} -end - readwait::Base.ThreadSynchronizer -@static if VERSION < v"1.7" - readercount::Threads.Atomic{Int} - readerwait::Threads.Atomic{Int} -else - @atomic readercount::Int - @atomic readerwait::Int -end -end - -@static if VERSION < v"1.7" -ReadWriteLock() = ReadWriteLock(ReentrantLock(), nothing, Base.ThreadSynchronizer(), Threads.Atomic{Int}(0), Threads.Atomic{Int}(0)) -else -ReadWriteLock() = ReadWriteLock(ReentrantLock(), nothing, Base.ThreadSynchronizer(), 0, 0) -end - -const MaxReaders = 1 << 30 - -function readlock(rw::ReadWriteLock) -@static if VERSION < v"1.7" - Threads.atomic_add!(rw.readercount, 1) - if rw.readercount[] < 0 - # A writer is active or pending, so we need to wait - Base.@lock rw.readwait wait(rw.readwait) - end -else - if (@atomic :acquire_release rw.readercount += 1) < 0 - # A writer is active or pending, so we need to wait - Base.@lock rw.readwait wait(rw.readwait) - end -end - return -end - -function readunlock(rw::ReadWriteLock) -@static if VERSION < v"1.7" - Threads.atomic_sub!(rw.readercount, 1) - if rw.readercount[] < 0 - # there's a pending write, check if we're the last reader - Threads.atomic_sub!(rw.readerwait, 1) - if rw.readerwait[] == 0 - # Last reader, wake up the writer. - schedule(rw.waitingwriter) - end - end -else - if (@atomic :acquire_release rw.readercount -= 1) < 0 - # there's a pending write, check if we're the last reader - if (@atomic :acquire_release rw.readerwait -= 1) == 0 - # Last reader, wake up the writer. - schedule(rw.waitingwriter) - end - end -end - return -end - -function Base.lock(rw::ReadWriteLock) - lock(rw.writelock) # only a single writer allowed at a time - # ok, here's how we do this: we subtract MaxReaders from readercount - # to make readercount negative; this will prevent any further readers - # from locking, while maintaining our actual reader count so we - # can track when we're able to write -@static if VERSION < v"1.7" - Threads.atomic_sub!(rw.readercount, MaxReaders) - r = rw.readercount[] + MaxReaders -else - r = (@atomic :acquire_release rw.readercount -= MaxReaders) + MaxReaders -end - # if r == 0, that means there were no readers, - # so we can proceed directly with the write lock - # if r == 1, this is an interesting case because there's only 1 reader - # and we might be racing to acquire the write lock and the reader - # unlocking; so we _also_ atomically set and check readerwait; - # if readerwait == 0, then the reader won the race and decremented readerwait - # to -1, and we increment by 1 to 0, so we know the reader is done and can proceed - # without waiting. If _we_ win the race, then we'll continue to waiting - # and the reader will decrement and then schedule us -@static if VERSION < v"1.7" - if r != 0 - Threads.atomic_add!(rw.readerwait, r) - if rw.readerwait[] != 0 - # otherwise, there are readers, so we need to wait for them to finish - # we do this by setting ourselves as the waiting writer - # and wait for the last reader to re-schedule us - rw.waitingwriter = current_task() - wait() - end - end -else - if r != 0 && (@atomic :acquire_release rw.readerwait += r) != 0 - # otherwise, there are readers, so we need to wait for them to finish - # we do this by setting ourselves as the waiting writer - # and wait for the last reader to re-schedule us - @atomic rw.waitingwriter = current_task() - wait() - end -end - return -end - -Base.islocked(rw::ReadWriteLock) = islocked(rw.writelock) - -function Base.unlock(rw::ReadWriteLock) -@static if VERSION < v"1.7" - Threads.atomic_add!(rw.readercount, MaxReaders) - r = rw.readercount[] -else - r = (@atomic :acquire_release rw.readercount += MaxReaders) -end - if r > 0 - # wake up waiting readers - Base.@lock rw.readwait notify(rw.readwait) - end - unlock(rw.writelock) - return -end +include("lockable.jl") +include("spawn.jl") +include("synchronizer.jl") +include("rwlock.jl") +include("pools.jl") +using .Pools function clear_current_task() current_task().storage = nothing @@ -375,10 +29,7 @@ finish, call `wait` on the result of this macro, or call Values can be interpolated into `@wkspawn` via `\$`, which copies the value directly into the constructed underlying closure. This allows you to insert the _value_ of a variable, isolating the asynchronous code from changes to -the variable's value in the current task. Interpolating a _mutable_ variable -will also cause it to be wrapped in a `WeakRef`, so that Julia's internal -references to these arguments won't prevent them from being garbage collected -once the `Task` has finished running. +the variable's value in the current task. """ macro wkspawn(args...) e = args[end] diff --git a/src/lockable.jl b/src/lockable.jl new file mode 100644 index 0000000..0abaa1f --- /dev/null +++ b/src/lockable.jl @@ -0,0 +1,36 @@ +""" + Lockable(value, lock = ReentrantLock()) + +Creates a `Lockable` object that wraps `value` and +associates it with the provided `lock`. +""" +struct Lockable{T, L <: Base.AbstractLock} + value::T + lock::L +end + +Lockable(value) = Lockable(value, ReentrantLock()) + +Base.getindex(l::Lockable) = l.value + +""" + lock(f::Function, l::Lockable) + +Acquire the lock associated with `l`, execute `f` with the lock held, +and release the lock when `f` returns. `f` will receive one positional +argument: the value wrapped by `l`. If the lock is already locked by a +different task/thread, wait for it to become available. +When this function returns, the `lock` has been released, so the caller should +not attempt to `unlock` it. +""" +function Base.lock(f, l::Lockable) + lock(l.lock) do + f(l.value) + end +end + +# implement the rest of the Lock interface on Lockable +Base.islocked(l::Lockable) = islocked(l.lock) +Base.lock(l::Lockable) = lock(l.lock) +Base.trylock(l::Lockable) = trylock(l.lock) +Base.unlock(l::Lockable) = unlock(l.lock) \ No newline at end of file diff --git a/src/pools.jl b/src/pools.jl new file mode 100644 index 0000000..f0a7447 --- /dev/null +++ b/src/pools.jl @@ -0,0 +1,109 @@ +module Pools + +import ..ConcurrentStack, ..Lockable + +export Pool, acquire, release + +import Base: Semaphore, acquire, release + +""" + Pool(T; max::Int=typemax(Int)) + Pool(K, T; max::Int=typemax(Int)) + +A threadsafe object for managing a pool of objects of type `T`, optionally keyed by objects +of type `K`. Objects can be requested by calling `acquire(f, pool, [key])`, where `f` is a +function that returns a new object of type `T`. +The `key` argument is optional and can be used to lookup objects that match a certain criteria. +If no pool exists for the given key, one will be created. + +If `max` is specified, the pool will limit the number of objects +that can be checked out at any given time. If the limit has been reached, `acquire` will +block until an object is returned to the pool via `release`. + +By default, `release(pool, obj)` will return the object to the pool for reuse. +`release(pool)` will return the "permit" to the pool while not returning +any object for reuse. +""" +struct Pool{K, T} + sem::Semaphore + values::Lockable{Dict{K, ConcurrentStack{T}}} +end + +Pool(T; max::Int=typemax(Int)) = Pool{Nothing, T}(Semaphore(max), Lockable(Dict{Nothing, ConcurrentStack{T}}())) +Pool(K, T; max::Int=typemax(Int)) = Pool{K, T}(Semaphore(max), Lockable(Dict{K, ConcurrentStack{T}}())) + +Base.empty!(pool::Pool) = Base.@lock pool.values empty!(pool.values[]) + +TRUE(x) = true + +""" + acquire(f, pool::Pool{K, T}, [key::K]; forcenew::Bool=false, isvalid::Function) -> T + +Get an object from a `pool`, optionally keyed by the provided `key`. If no pool exists for the given key, one will be created. +The provided function `f` must create a new object instance of type `T`. +The acquired object MUST be returned to the pool by calling `release(pool, key, obj)` exactly once. +The `forcenew` keyword argument can be used to force the creation of a new object, ignoring any existing objects in the pool. +The `isvalid` keyword argument can be used to specify a function that will be called to determine if an object is still valid +for reuse. By default, all objects are considered valid. +If there are no objects available for reuse, `f` will be called to create a new object. +If the pool is already at its maximum capacity, `acquire` will block until an object is returned to the pool via `release`. +""" +function Base.acquire(f, pool::Pool{K, T}, key=nothing; forcenew::Bool=false, isvalid::Function=TRUE) where {K, T} + key isa K || throw(ArgumentError("invalid key `$key` provided for pool key type $K")) + acquire(pool.sem) + try + # once we've received our permit, we can figure out where the object should come from + # if we're forcing a new object, we can just call f() and return it + forcenew && return f() + # otherwise, check if there's an existing object in the pool to reuse + objs = Base.@lock pool.values get!(() -> ConcurrentStack{T}(), pool.values[], key) + obj = pop!(objs) + while obj !== nothing + # if the object is valid, return it + isvalid(obj) && return obj + # otherwise, try the next object + obj = pop!(objs) + end + # if we get here, we didn't find any valid objects, so we'll just create a new one + return f() + catch + # an error occurred while acquiring, so make sure we return the semaphore permit + release(pool.sem) + rethrow() + end +end + +""" + release(pool::Pool{K, T}, key::K, obj::Union{T, Nothing}=nothing) + release(pool::Pool{K, T}, obj::T) + release(pool::Pool{K, T}) + +Return an object to a `pool`, optionally keyed by the provided `key`. +If `obj` is provided, it will be returned to the pool for reuse. Otherwise, if `nothing` is returned, +just the "permit" will be returned to the pool. +""" +function Base.release(pool::Pool{K, T}, key, obj::Union{T, Nothing}=nothing) where {K, T} + key isa K || throw(ArgumentError("invalid key `$key` provided for pool key type $K")) + # if we're given an object, we'll put it back in the pool + # otherwise, we'll just return the permit + if obj !== nothing + # first though, we repeat Base.Semaphore's error check in the case of an invalid release + # where we don't want to push an object for reuse for an invalid release + Base.@lock pool.sem.cond_wait begin + if pool.sem.curr_cnt > 0 + # if the key is invalid, we'll just let the KeyError propagate + objs = Base.@lock pool.values pool.values[][key] + push!(objs, obj) + end + end + # we don't throw an error or unlock in the invalid case, because we'll let + # the release call below do all that for us + end + release(pool.sem) + return +end + +Base.release(pool::Pool{K, T}, obj::T) where {K, T} = release(pool, nothing, obj) +Base.release(pool::Pool{K, T}) where {K, T} = release(pool, nothing, nothing) + +end # module diff --git a/src/rwlock.jl b/src/rwlock.jl new file mode 100644 index 0000000..f86197d --- /dev/null +++ b/src/rwlock.jl @@ -0,0 +1,124 @@ +mutable struct ReadWriteLock + writelock::ReentrantLock +@static if VERSION < v"1.7" + waitingwriter::Union{Nothing, Task} +else + @atomic waitingwriter::Union{Nothing, Task} +end + readwait::Base.ThreadSynchronizer +@static if VERSION < v"1.7" + readercount::Threads.Atomic{Int} + readerwait::Threads.Atomic{Int} +else + @atomic readercount::Int + @atomic readerwait::Int +end +end + +@static if VERSION < v"1.7" +ReadWriteLock() = ReadWriteLock(ReentrantLock(), nothing, Base.ThreadSynchronizer(), Threads.Atomic{Int}(0), Threads.Atomic{Int}(0)) +else +ReadWriteLock() = ReadWriteLock(ReentrantLock(), nothing, Base.ThreadSynchronizer(), 0, 0) +end + +const MaxReaders = 1 << 30 + +function readlock(rw::ReadWriteLock) +@static if VERSION < v"1.7" + Threads.atomic_add!(rw.readercount, 1) + if rw.readercount[] < 0 + # A writer is active or pending, so we need to wait + Base.@lock rw.readwait wait(rw.readwait) + end +else + if (@atomic :acquire_release rw.readercount += 1) < 0 + # A writer is active or pending, so we need to wait + Base.@lock rw.readwait wait(rw.readwait) + end +end + return +end + +function readunlock(rw::ReadWriteLock) +@static if VERSION < v"1.7" + Threads.atomic_sub!(rw.readercount, 1) + if rw.readercount[] < 0 + # there's a pending write, check if we're the last reader + Threads.atomic_sub!(rw.readerwait, 1) + if rw.readerwait[] == 0 + # Last reader, wake up the writer. + schedule(rw.waitingwriter) + end + end +else + if (@atomic :acquire_release rw.readercount -= 1) < 0 + # there's a pending write, check if we're the last reader + if (@atomic :acquire_release rw.readerwait -= 1) == 0 + # Last reader, wake up the writer. + schedule(rw.waitingwriter) + end + end +end + return +end + +function Base.lock(rw::ReadWriteLock) + lock(rw.writelock) # only a single writer allowed at a time + # ok, here's how we do this: we subtract MaxReaders from readercount + # to make readercount negative; this will prevent any further readers + # from locking, while maintaining our actual reader count so we + # can track when we're able to write +@static if VERSION < v"1.7" + Threads.atomic_sub!(rw.readercount, MaxReaders) + r = rw.readercount[] + MaxReaders +else + r = (@atomic :acquire_release rw.readercount -= MaxReaders) + MaxReaders +end + # if r == 0, that means there were no readers, + # so we can proceed directly with the write lock + # if r == 1, this is an interesting case because there's only 1 reader + # and we might be racing to acquire the write lock and the reader + # unlocking; so we _also_ atomically set and check readerwait; + # if readerwait == 0, then the reader won the race and decremented readerwait + # to -1, and we increment by 1 to 0, so we know the reader is done and can proceed + # without waiting. If _we_ win the race, then we'll continue to waiting + # and the reader will decrement and then schedule us +@static if VERSION < v"1.7" + if r != 0 + Threads.atomic_add!(rw.readerwait, r) + if rw.readerwait[] != 0 + # otherwise, there are readers, so we need to wait for them to finish + # we do this by setting ourselves as the waiting writer + # and wait for the last reader to re-schedule us + rw.waitingwriter = current_task() + wait() + end + end +else + if r != 0 && (@atomic :acquire_release rw.readerwait += r) != 0 + # otherwise, there are readers, so we need to wait for them to finish + # we do this by setting ourselves as the waiting writer + # and wait for the last reader to re-schedule us + @atomic rw.waitingwriter = current_task() + wait() + end +end + return +end + +Base.islocked(rw::ReadWriteLock) = islocked(rw.writelock) + +function Base.unlock(rw::ReadWriteLock) +@static if VERSION < v"1.7" + Threads.atomic_add!(rw.readercount, MaxReaders) + r = rw.readercount[] +else + r = (@atomic :acquire_release rw.readercount += MaxReaders) +end + if r > 0 + # wake up waiting readers + Base.@lock rw.readwait notify(rw.readwait) + end + unlock(rw.writelock) + return +end diff --git a/src/spawn.jl b/src/spawn.jl new file mode 100644 index 0000000..78d7050 --- /dev/null +++ b/src/spawn.jl @@ -0,0 +1,68 @@ +const WORK_QUEUE = Channel{Task}(0) +const WORKER_TASKS = Task[] + +""" + ConcurrentUtilities.@spawn expr + ConcurrentUtilities.@spawn passthroughstorage::Bool expr + +Similar to `Threads.@spawn`, schedule and execute a task (given by `expr`) +that will be run on a "background worker" (see [`ConcurrentUtilities.init`]((@ref))). + +In the 2-argument invocation, `passthroughstorage` controls whether the task-local storage of the +`current_task()` should be "passed through" to the spawned task. +""" +macro spawn(thunk) + esc(quote + tsk = @task $thunk + tsk.storage = current_task().storage + put!(ConcurrentUtilities.WORK_QUEUE, tsk) + tsk + end) +end + +""" + ConcurrentUtilities.@spawn expr + ConcurrentUtilities.@spawn passthroughstorage::Bool expr + +Similar to `Threads.@spawn`, schedule and execute a task (given by `expr`) +that will be run on a "background worker" (see [`ConcurrentUtilities.init`]((@ref))). + +In the 2-argument invocation, `passthroughstorage` controls whether the task-local storage of the +`current_task()` should be "passed through" to the spawned task. +""" +macro spawn(passthroughstorage, thunk) + esc(quote + tsk = @task $thunk + if $passthroughstorage + tsk.storage = current_task().storage + end + put!(ConcurrentUtilities.WORK_QUEUE, tsk) + tsk + end) +end + +""" + ConcurrentUtilities.init(nworkers=Threads.nthreads() - 1) + +Initialize background workers that will execute tasks spawned via +[`ConcurrentUtilities.@spawn`](@ref). If `nworkers == 1`, a single worker +will be started on thread 1 where tasks will be executed in contention +with other thread 1 work. Background worker tasks can be inspected by +looking at `ConcurrentUtilities.WORKER_TASKS`. +""" +function init(nworkers=Threads.nthreads()-1) + maxthreadid = nworkers + 1 + tids = Threads.nthreads() == 1 ? (1:1) : 2:maxthreadid + resize!(WORKER_TASKS, max(nworkers, 1)) + Threads.@threads for tid in 1:maxthreadid + if tid in tids + WORKER_TASKS[tid == 1 ? 1 : (tid - 1)] = Base.@async begin + for task in WORK_QUEUE + schedule(task) + wait(task) + end + end + end + end + return +end \ No newline at end of file diff --git a/src/synchronizer.jl b/src/synchronizer.jl new file mode 100644 index 0000000..41ab47b --- /dev/null +++ b/src/synchronizer.jl @@ -0,0 +1,123 @@ +""" + OrderedSynchronizer(i=1) + +A threadsafe synchronizer that allows ensuring concurrent work is done +in a specific order. The `OrderedSynchronizer` is initialized with an +integer `i` that represents the current "order" of the synchronizer. + +Work is "scheduled" by calling `put!(f, x, i)`, where `f` is a function +that will be called like `f()` when the synchronizer is at order `i`, +and will otherwise wait until other calls to `put!` have finished +to bring the synchronizer's state to `i`. Once `f()` is called, the +synchronizer's state is incremented by 1 and any waiting `put!` calls +check to see if it's their turn to execute. + +A synchronizer's state can be reset to a specific value (1 by default) +by calling `reset!(x, i)`. +""" +mutable struct OrderedSynchronizer + coordinating_task::Task + cond::Threads.Condition + i::Int +@static if VERSION < v"1.7" + closed::Threads.Atomic{Bool} +else + @atomic closed::Bool +end +end + +@static if VERSION < v"1.7" +OrderedSynchronizer(i=1) = OrderedSynchronizer(current_task(), Threads.Condition(), i, Threads.Atomic{Bool}(false)) +else +OrderedSynchronizer(i=1) = OrderedSynchronizer(current_task(), Threads.Condition(), i, false) +end + +""" + reset!(x::OrderedSynchronizer, i=1) + +Reset the state of `x` to `i`. +""" +function reset!(x::OrderedSynchronizer, i=1) + Base.@lock x.cond begin + x.i = i +@static if VERSION < v"1.7" + x.closed[] = false +else + @atomic :monotonic x.closed = false +end + end +end + +function Base.close(x::OrderedSynchronizer, excp::Exception=closed_exception()) + Base.@lock x.cond begin +@static if VERSION < v"1.7" + x.closed[] = true +else + @atomic :monotonic x.closed = true +end + Base.notify_error(x.cond, excp) + end + return +end + +@static if VERSION < v"1.7" + Base.isopen(x::OrderedSynchronizer) = !x.closed[] +else +Base.isopen(x::OrderedSynchronizer) = !(@atomic :monotonic x.closed) +end +closed_exception() = InvalidStateException("OrderedSynchronizer is closed.", :closed) + +function check_closed(x::OrderedSynchronizer) + if !isopen(x) + # if the monotonic load succeed, now do an acquire fence +@static if VERSION < v"1.7" + !x.closed[] && Base.concurrency_violation() +else + !(@atomic :acquire x.closed) && Base.concurrency_violation() +end + throw(closed_exception()) + end +end + +""" + put!(f::Function, x::OrderedSynchronizer, i::Int, incr::Int=1) + +Schedule `f` to be called when `x` is at order `i`. Note that `put!` +will block until `f` is executed. The typical usage involves something +like: + +```julia +x = OrderedSynchronizer() +@sync for i = 1:N + Threads.@spawn begin + # do some concurrent work + # once work is done, schedule synchronization + put!(x, \$i) do + # report back result of concurrent work + # won't be executed until all `i-1` calls to `put!` have already finished + end + end +end +``` + +The `incr` argument controls how much the synchronizer's state is +incremented after `f` is called. By default, `incr` is 1. +""" +function Base.put!(f, x::OrderedSynchronizer, i, incr=1) + check_closed(x) + Base.@lock x.cond begin + # wait until we're ready to execute f + while x.i != i + check_closed(x) + wait(x.cond) + end + check_closed(x) + try + f() + catch e + Base.throwto(x.coordinating_task, e) + end + x.i += incr + notify(x.cond) + end +end diff --git a/test/pools.jl b/test/pools.jl new file mode 100644 index 0000000..93b4117 --- /dev/null +++ b/test/pools.jl @@ -0,0 +1,99 @@ +using ConcurrentUtilities, Test + +@testset "Pools" begin + pool = Pool(Int; max=3) + # acquire an object from the pool + x1 = acquire(() -> 1, pool) + # no existing objects in the pool, so our function was called to create a new one + @test x1 == 1 + # release back to the pool for reuse + release(pool, x1) + # acquire another object from the pool + x1 = acquire(() -> 2, pool) + # this time, the pool had an existing object, so our function was not called + @test x1 == 1 + # but now there are no objects to reuse again, so the next acquire will call our function + x2 = acquire(() -> 2, pool) + @test x2 == 2 + x3 = acquire(() -> 3, pool) + @test x3 == 3 + # the pool is now at capacity, so the next acquire will block until an object is released + tsk = @async acquire(() -> 4, pool; forcenew=true) + yield() + @test !istaskdone(tsk) + # release an object back to the pool + release(pool, x1) + # now the acquire can complete + x1 = fetch(tsk) + # even though we released 1 for reuse, we passed forcenew, so our function was called to create new + @test x1 == 4 + # error to try and provide a key to a non-keyed pool + @test_throws ArgumentError acquire(() -> 1, pool, 1) + # release objects back to the pool + release(pool, x1) + release(pool, x2) + release(pool, x3) + # acquire an object, but checking isvalid + x1 = acquire(() -> 5, pool; isvalid=x -> x == 1) + @test x1 == 1 + # no valid objects, so our function was called to create a new one + x2 = acquire(() -> 6, pool; isvalid=x -> x == 1) + @test x2 == 6 + # we have one slot left in the pool, we now throw while creating new + # and we want to test that the permit isn't permanently lost for the pool + @test_throws ErrorException acquire(() -> error("oops"), pool; forcenew=true) + # we can still acquire a new object + x3 = acquire(() -> 7, pool; forcenew=true) + @test x3 == 7 + # release objects back to the pool + release(pool, x1) + release(pool, x2) + release(pool, x3) + # try to do an invalid release + @test_throws ErrorException release(pool, 10) + # test that the invalid release didn't push the object to our pool for reuse + x1 = acquire(() -> 8, pool) + @test x1 == 7 + # calling empty! removes all objects for reuse + empty!(pool) + x2 = acquire(() -> 9, pool) + @test x2 == 9 + + # now test a keyed pool + pool = Pool(String, Int; max=3) + # acquire an object from the pool + x1 = acquire(() -> 1, pool, "a") + # no existing objects in the pool, so our function was called to create a new one + @test x1 == 1 + # release back to the pool for reuse + release(pool, "a", x1) + # test for a different key + x2 = acquire(() -> 2, pool, "b") + # there's an existing object, but for a different key, so we don't reuse + @test x2 == 2 + # acquire another object from the pool + x1 = acquire(() -> 2, pool, "a") + # this time, the pool had an existing object, so our function was not called + @test x1 == 1 + x3 = acquire(() -> 3, pool, "a") + @test x3 == 3 + # the pool is now at capacity, so the next acquire will block until an object is released + # even though we've acquired using different keys, the capacity is shared across the pool + tsk = @async acquire(() -> 4, pool, "c"; forcenew=true) + yield() + @test !istaskdone(tsk) + # release an object back to the pool + release(pool, "a", x1) + # now the acquire can complete + x1 = fetch(tsk) + # even though we released 1 for reuse, we passed forcenew, so our function was called to create new + @test x1 == 4 + # error to try and provide an invalid key to a keyed pool + @test_throws ArgumentError acquire(() -> 1, pool, 1) + # error to release an invalid key back to the pool + @test_throws KeyError release(pool, "z", 1) + # error to *not* provide a key to a keyed pool + @test_throws ArgumentError acquire(() -> 1, pool) + # error to *not* provide a key when releasing to a keyed pool + @test_throws ArgumentError release(pool) +end