Skip to content

Commit

Permalink
Apply suggestions from code review
Browse files Browse the repository at this point in the history
Co-authored-by: Nick Robinson <[email protected]>

Another refactor

fix
  • Loading branch information
quinnj committed Apr 21, 2023
1 parent 6e3b705 commit fb01d42
Show file tree
Hide file tree
Showing 7 changed files with 209 additions and 283 deletions.
5 changes: 2 additions & 3 deletions src/ConcurrentUtilities.jl
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
module ConcurrentUtilities

export Lockable, OrderedSynchronizer, reset!, ReadWriteLock, readlock, readunlock, @wkspawn, ConcurrentStack,
export Lockable, OrderedSynchronizer, reset!, ReadWriteLock, readlock, readunlock, @wkspawn,
Workers, remote_eval, remote_fetch, Worker, terminate!, WorkerTerminatedException,
Pool, acquire, release
Pool, acquire, release, drain!

include("workers.jl")
using .Workers
include("concurrentstack.jl")
include("lockable.jl")
include("spawn.jl")
include("synchronizer.jl")
Expand Down
76 changes: 0 additions & 76 deletions src/concurrentstack.jl

This file was deleted.

147 changes: 95 additions & 52 deletions src/pools.jl
Original file line number Diff line number Diff line change
@@ -1,74 +1,122 @@
module Pools

import ..ConcurrentStack, ..Lockable

export Pool, acquire, release

import Base: Semaphore, acquire, release
export Pool, acquire, release, drain!
import Base: acquire, release

"""
Pool(T; max::Int=typemax(Int))
Pool(K, T; max::Int=typemax(Int))
Pool{T}(max::Int=4096)
Pool{K, T}(max::Int=4096)
A threadsafe object for managing a pool of objects of type `T`, optionally keyed by objects
of type `K`. Objects can be requested by calling `acquire(f, pool, [key])`, where `f` is a
function that returns a new object of type `T`.
The `key` argument is optional and can be used to lookup objects that match a certain criteria.
If no pool exists for the given key, one will be created.
The `key` argument is optional and can be used to lookup objects that match a certain criteria
(a Dict is used internally, so matching is `isequal`).
If `max` is specified, the pool will limit the number of objects
that can be checked out at any given time. If the limit has been reached, `acquire` will
The `max` argument will limit the number of objects
that can be acquired at any given time. If the limit has been reached, `acquire` will
block until an object is returned to the pool via `release`.
By default, `release(pool, obj)` will return the object to the pool for reuse.
`release(pool)` will return the "permit" to the pool while not returning
any object for reuse.
`drain!` can be used to remove any cached objects for reuse, but it does *not* release
any active acquires.
"""
struct Pool{K, T}
sem::Semaphore
values::Lockable{Dict{K, ConcurrentStack{T}}}
mutable struct Pool{K, T}
lock::Threads.Condition
max::Int
cur::Int
keyedvalues::Dict{K, Vector{T}}
values::Vector{T}

function Pool{K, T}(max::Int=4096) where {K, T}
T === Nothing && throw(ArgumentError("Pool type can not be `Nothing`"))
x = new(Threads.Condition(), max, 0)
if K === Nothing
x.values = T[]
safesizehint!(x.values, max)
else
x.keyedvalues = Dict{K, Vector{T}}()
end
return x
end
end

Pool(T; max::Int=typemax(Int)) = Pool{Nothing, T}(Semaphore(max), Lockable(Dict{Nothing, ConcurrentStack{T}}()))
Pool(K, T; max::Int=typemax(Int)) = Pool{K, T}(Semaphore(max), Lockable(Dict{K, ConcurrentStack{T}}()))
Pool{T}(max::Int=4096) where {T} = Pool{Nothing, T}(max)

safesizehint!(x, n) = sizehint!(x, min(4096, n))

Base.empty!(pool::Pool) = Base.@lock pool.values empty!(pool.values[])
# determines whether we'll look up object caches in .keyedvalues or .values
iskeyed(::Pool{K}) where {K} = K !== Nothing

"""
drain!(pool)
Remove all objects from the pool for reuse, but do not release any active acquires.
"""
function drain!(pool::Pool{K}) where {K}
Base.@lock pool.lock begin
if iskeyed(pool)
for objs in values(pool.keyedvalues)
empty!(objs)
end
else
empty!(pool.values)
end
end
end

# in VERSION >= v"1.7", we can replace `TRUE` with `Returns(true)`
TRUE(x) = true

@noinline keyerror(key, K) = throw(ArgumentError("invalid key `$key` provided for pool key type $K"))
@noinline releaseerror() = throw(ArgumentError("cannot release permit when pool is empty"))

# NOTE: assumes you have the lock!
function releasepermit(pool::Pool)
pool.cur > 0 || releaseerror()
pool.cur -= 1
notify(pool.lock; all=false)
return
end

"""
acquire(f, pool::Pool{K, T}, [key::K]; forcenew::Bool=false, isvalid::Function) -> T
Get an object from a `pool`, optionally keyed by the provided `key`. If no pool exists for the given key, one will be created.
Get an object from a `pool`, optionally keyed by the provided `key`.
The provided function `f` must create a new object instance of type `T`.
The acquired object MUST be returned to the pool by calling `release(pool, key, obj)` exactly once.
Each `acquire` call MUST be matched by exactly one `release` call.
The `forcenew` keyword argument can be used to force the creation of a new object, ignoring any existing objects in the pool.
The `isvalid` keyword argument can be used to specify a function that will be called to determine if an object is still valid
for reuse. By default, all objects are considered valid.
If there are no objects available for reuse, `f` will be called to create a new object.
If the pool is already at its maximum capacity, `acquire` will block until an object is returned to the pool via `release`.
"""
function Base.acquire(f, pool::Pool{K, T}, key=nothing; forcenew::Bool=false, isvalid::Function=TRUE) where {K, T}
key isa K || throw(ArgumentError("invalid key `$key` provided for pool key type $K"))
acquire(pool.sem)
try
# once we've received our permit, we can figure out where the object should come from
# if we're forcing a new object, we can just call f() and return it
forcenew && return f()
# otherwise, check if there's an existing object in the pool to reuse
objs = Base.@lock pool.values get!(() -> ConcurrentStack{T}(), pool.values[], key)
obj = pop!(objs)
while obj !== nothing
# if the object is valid, return it
isvalid(obj) && return obj
# otherwise, try the next object
obj = pop!(objs)
key isa K || keyerror(key, K)
Base.@lock pool.lock begin
# first get a permit
while pool.cur >= pool.max
wait(pool.lock)
end
pool.cur += 1
# now see if we can get an object from the pool for reuse
if !forcenew
objs = iskeyed(pool) ? get!(() -> safesizehint!(T[], pool.max), pool.keyedvalues, key) : pool.values
while !isempty(objs)
obj = pop!(objs)
isvalid(obj) && return obj
end
end
# if we get here, we didn't find any valid objects, so we'll just create a new one
end
try
# if there weren't any objects to reuse or we were forcenew, we'll create a new one
return f()
catch
# an error occurred while acquiring, so make sure we return the semaphore permit
release(pool.sem)
# if we error creating a new object, it's critical we return the permit to the pool
Base.@lock pool.lock releasepermit(pool)
rethrow()
end
end
Expand All @@ -79,27 +127,22 @@ end
release(pool::Pool{K, T})
Return an object to a `pool`, optionally keyed by the provided `key`.
If `obj` is provided, it will be returned to the pool for reuse. Otherwise, if `nothing` is returned,
If `obj` is provided, it will be returned to the pool for reuse.
Otherwise, if `nothing` is returned, or `release(pool)` is called,
just the "permit" will be returned to the pool.
"""
function Base.release(pool::Pool{K, T}, key, obj::Union{T, Nothing}=nothing) where {K, T}
key isa K || throw(ArgumentError("invalid key `$key` provided for pool key type $K"))
# if we're given an object, we'll put it back in the pool
# otherwise, we'll just return the permit
if obj !== nothing
# first though, we repeat Base.Semaphore's error check in the case of an invalid release
# where we don't want to push an object for reuse for an invalid release
Base.@lock pool.sem.cond_wait begin
if pool.sem.curr_cnt > 0
# if the key is invalid, we'll just let the KeyError propagate
objs = Base.@lock pool.values pool.values[][key]
push!(objs, obj)
end
key isa K || keyerror(key, K)
Base.@lock pool.lock begin
# return the permit
releasepermit(pool)
# if we're given an object, we'll put it back in the pool
if obj !== nothing
# if an invalid key is provided, we let the KeyError propagate
objs = iskeyed(pool) ? pool.keyedvalues[key] : pool.values
push!(objs, obj)
end
# we don't throw an error or unlock in the invalid case, because we'll let
# the release call below do all that for us
end
release(pool.sem)
return
end

Expand Down
13 changes: 13 additions & 0 deletions src/spawn.jl
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ function init(nworkers=Threads.nthreads()-1)
maxthreadid = nworkers + 1
tids = Threads.nthreads() == 1 ? (1:1) : 2:maxthreadid
resize!(WORKER_TASKS, max(nworkers, 1))
@static if VERSION < v"1.8.0"
Threads.@threads for tid in 1:maxthreadid
if tid in tids
WORKER_TASKS[tid == 1 ? 1 : (tid - 1)] = Base.@async begin
Expand All @@ -64,5 +65,17 @@ function init(nworkers=Threads.nthreads()-1)
end
end
end
else
Threads.@threads :static for tid in 1:maxthreadid
if tid in tids
WORKER_TASKS[tid == 1 ? 1 : (tid - 1)] = Base.@async begin
for task in WORK_QUEUE
schedule(task)
wait(task)
end
end
end
end
end
return
end
56 changes: 0 additions & 56 deletions test/concurrentstack.jl

This file was deleted.

Loading

0 comments on commit fb01d42

Please sign in to comment.