Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reuse Nodes in Pools #19

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
391 changes: 9 additions & 382 deletions src/ConcurrentUtilities.jl

Large diffs are not rendered by default.

28 changes: 20 additions & 8 deletions src/concurrentstack.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
@static if VERSION < v"1.7"

mutable struct Node{T}
value::T
next::Union{Node{T},Nothing}
value::T
Node{T}(value::T) where {T} = new{T}(nothing, value)
Node{T}() where {T} = new{T}(nothing)
end

Node{T}(value::T) where {T} = Node{T}(value, nothing)
Expand All @@ -18,15 +20,14 @@ ConcurrentStack{T}() where {T} = ConcurrentStack{T}(ReentrantLock(), nothing)
function Base.push!(stack::ConcurrentStack{T}, v) where {T}
Drvi marked this conversation as resolved.
Show resolved Hide resolved
v === nothing && throw(ArgumentError("cannot push nothing onto a ConcurrentStack"))
v = convert(T, v)
Drvi marked this conversation as resolved.
Show resolved Hide resolved
node = Node{T}(v)
lock(stack.lock) do
node.next = stack.next
stack.next = node
end
return stack
end

function Base.pop!(stack::ConcurrentStack)
function _popnode!(stack::ConcurrentStack{T}) where {T}
lock(stack.lock) do
node = stack.next
node === nothing && return nothing
Expand All @@ -35,25 +36,31 @@ function Base.pop!(stack::ConcurrentStack)
end
end

function Base.pop!(stack::ConcurrentStack)
node = _popnode!(stack)
return node === nothing ? node : node.value
end

else

mutable struct Node{T}
value::T
@atomic next::Union{Node{T},Nothing}
value::T
Node{T}(value::T) where {T} = new{T}(nothing, value)
Node{T}() where {T} = new{T}(nothing)
end

Node{T}(value::T) where {T} = Node{T}(value, nothing)

mutable struct ConcurrentStack{T}
@atomic next::Union{Node{T},Nothing}
end

ConcurrentStack{T}() where {T} = ConcurrentStack{T}(nothing)

function Base.push!(stack::ConcurrentStack{T}, v) where {T}
function Base.push!(stack::ConcurrentStack{T}, v, node::Node{T}=Node{T}()) where {T}
v === nothing && throw(ArgumentError("cannot push nothing onto a ConcurrentStack"))
v = convert(T, v)
node = Node{T}(v)
node.value = v
next = @atomic stack.next
while true
@atomic node.next = next
Expand All @@ -64,12 +71,17 @@ function Base.push!(stack::ConcurrentStack{T}, v) where {T}
end

function Base.pop!(stack::ConcurrentStack)
node = _popnode!(stack)
return node === nothing ? node : node.value
end

function _popnode!(stack::ConcurrentStack{T}) where {T}
while true
node = @atomic stack.next
node === nothing && return nothing
next = @atomic node.next
next, ok = @atomicreplace(stack.next, node => next)
ok && return node.value
ok && return node
end
end

Expand Down
36 changes: 36 additions & 0 deletions src/lockable.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
"""
Lockable(value, lock = ReentrantLock())

Creates a `Lockable` object that wraps `value` and
associates it with the provided `lock`.
"""
struct Lockable{T, L <: Base.AbstractLock}
value::T
lock::L
end

Lockable(value) = Lockable(value, ReentrantLock())

Base.getindex(l::Lockable) = l.value

"""
lock(f::Function, l::Lockable)

Acquire the lock associated with `l`, execute `f` with the lock held,
and release the lock when `f` returns. `f` will receive one positional
argument: the value wrapped by `l`. If the lock is already locked by a
different task/thread, wait for it to become available.
When this function returns, the `lock` has been released, so the caller should
not attempt to `unlock` it.
"""
function Base.lock(f, l::Lockable)
lock(l.lock) do
f(l.value)
end
end

# implement the rest of the Lock interface on Lockable
Base.islocked(l::Lockable) = islocked(l.lock)
Base.lock(l::Lockable) = lock(l.lock)
Base.trylock(l::Lockable) = trylock(l.lock)
Base.unlock(l::Lockable) = unlock(l.lock)
135 changes: 135 additions & 0 deletions src/pools.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
module Pools

import ..ConcurrentStack, ..Node, ..Lockable, .._popnode!

export Pool, acquire, release

import Base: Semaphore, acquire, release

"""
Pool(T; max::Int=typemax(Int))
Pool(K, T; max::Int=typemax(Int))

A threadsafe object for managing a pool of objects of type `T`, optionally keyed by objects
of type `K`. Objects can be requested by calling `acquire(f, pool, [key])`, where `f` is a
function that returns a new object of type `T`.
The `key` argument is optional and can be used to lookup objects that match a certain criteria.
If no pool exists for the given key, one will be created.

If `max` is specified, the pool will limit the number of objects
that can be checked out at any given time. If the limit has been reached, `acquire` will
block until an object is returned to the pool via `release`.

By default, `release(pool, obj)` will return the object to the pool for reuse.
`release(pool)` will return the "permit" to the pool while not returning
any object for reuse.
"""
struct Pool{K, T}
sem::Semaphore
values::Lockable{Dict{K, ConcurrentStack{T}}}
nodepool::Vector{Node{T}}
maxnodes::Int
end

function Pool(T; max::Union{Nothing,Int}=nothing)
maxnodes = something(max, 128)
return Pool{Nothing, T}(
Semaphore(something(max, typemax(Int))),
Lockable(Dict{Nothing, ConcurrentStack{T}}()),
sizehint!([], min(maxnodes, 128)),
maxnodes,
)
end
function Pool(K, T; max::Union{Nothing,Int}=nothing)
maxnodes = something(max, 128)
return Pool{K, T}(
Semaphore(something(max, typemax(Int))),
Lockable(Dict{K, ConcurrentStack{T}}()),
sizehint!([], min(maxnodes, 128)),
maxnodes,
)
end
Base.empty!(pool::Pool) = Base.@lock pool.values empty!(pool.values[])
_popnode!(pool::Pool{K,T}) where {K,T} = isempty(pool.nodepool) ? Node{T}() : pop!(pool.nodepool)

TRUE(x) = true

"""
acquire(f, pool::Pool{K, T}, [key::K]; forcenew::Bool=false, isvalid::Function) -> T

Get an object from a `pool`, optionally keyed by the provided `key`. If no pool exists for the given key, one will be created.
The provided function `f` must create a new object instance of type `T`.
The acquired object MUST be returned to the pool by calling `release(pool, key, obj)` exactly once.
The `forcenew` keyword argument can be used to force the creation of a new object, ignoring any existing objects in the pool.
The `isvalid` keyword argument can be used to specify a function that will be called to determine if an object is still valid
for reuse. By default, all objects are considered valid.
If there are no objects available for reuse, `f` will be called to create a new object.
If the pool is already at its maximum capacity, `acquire` will block until an object is returned to the pool via `release`.
"""
function Base.acquire(f, pool::Pool{K, T}, key=nothing; forcenew::Bool=false, isvalid::Function=TRUE) where {K, T}
key isa K || throw(ArgumentError("invalid key `$key` provided for pool key type $K"))
acquire(pool.sem)
try
# once we've received our permit, we can figure out where the object should come from
# if we're forcing a new object, we can just call f() and return it
forcenew && return f()
# otherwise, check if there's an existing object in the pool to reuse
objs = Base.@lock pool.values get!(() -> ConcurrentStack{T}(), pool.values[], key)
prev_node = node = _popnode!(objs)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is this prev_node for?

while node !== nothing
obj = node.value
# if the object is valid, return it
if isvalid(obj)
length(pool.nodepool) < pool.maxnodes && Base.@lock pool.values push!(pool.nodepool, node)
return obj
end
# otherwise, try the next object
prev_node = node
node = _popnode!(objs)
end
isnothing(prev_node) || push!(pool.nodepool, prev_node)
# if we get here, we didn't find any valid objects, so we'll just create a new one
return f()
catch
# an error occurred while acquiring, so make sure we return the semaphore permit
release(pool.sem)
rethrow()
end
end

"""
release(pool::Pool{K, T}, key::K, obj::Union{T, Nothing}=nothing)
release(pool::Pool{K, T}, obj::T)
release(pool::Pool{K, T})

Return an object to a `pool`, optionally keyed by the provided `key`.
If `obj` is provided, it will be returned to the pool for reuse. Otherwise, if `nothing` is returned,
just the "permit" will be returned to the pool.
"""
function Base.release(pool::Pool{K, T}, key, obj::Union{T, Nothing}=nothing) where {K, T}
key isa K || throw(ArgumentError("invalid key `$key` provided for pool key type $K"))
# if we're given an object, we'll put it back in the pool
# otherwise, we'll just return the permit
if obj !== nothing
# first though, we repeat Base.Semaphore's error check in the case of an invalid release
# where we don't want to push an object for reuse for an invalid release
Base.@lock pool.sem.cond_wait begin
if pool.sem.curr_cnt > 0
# if the key is invalid, we'll just let the KeyError propagate
Base.@lock pool.values begin
objs = pool.values[][key]
push!(objs, obj, _popnode!(pool))
end
end
end
# we don't throw an error or unlock in the invalid case, because we'll let
# the release call below do all that for us
end
release(pool.sem)
return
end

Base.release(pool::Pool{K, T}, obj::T) where {K, T} = release(pool, nothing, obj)
Base.release(pool::Pool{K, T}) where {K, T} = release(pool, nothing, nothing)

end # module
Loading