Add threadsafe Pool data structure #18
Conversation
* Finally achieving my dream of moving the connection pool out of HTTP; it's going to live in the [ConcurrentUtilities.jl](JuliaServices/ConcurrentUtilities.jl#18) package instead. In short, it had no unit tests, scant documentation, and was generally a pain to deal with in HTTP. We also discovered at least two major issues with the current implementation during a deep dive into performance and issue diagnosis:
  * If a ConnectError was thrown while creating a connection, a "permit" to the pool was permanently lost; accumulate enough of these and the entire connection pool grinds to a halt. :grimace:
  * The pool held its lock for the duration of trying to get a permit _and making new connections_. This meant that when a connection took a long time to establish, we were essentially holding the rest of the pool hostage. This is totally unnecessary and can cause big performance issues in heavy workloads where there's lots of contention on the pool for managing requests.

  The new implementation in ConcurrentUtilities.jl solves both of these problems while being about 1/4 the LOC of the previous implementation. And it has unit tests! All in all, I think #1033 and #1032 should both be mostly resolved by these changes.
* Relatedly, we're adjusting the API for connection pools to allow the user to pass in their _own_ connection pool to be used for a request (checked for a connection to reuse, and the connection is returned to it after completion). A pool can be constructed like `HTTP.Pool(; max::Int)` and passed to any of the `HTTP.request` methods like `HTTP.get(...; pool=pool)`. HTTP has its own global default pool, `HTTP.Connections.POOL`, that it uses by default to manage connection reuse. `HTTP.set_default_connection_limit!` will still work as long as it is called before any requests are made; calling it _after_ requests have been made is a no-op. The `connection_limit` keyword arg is now formally deprecated and will issue a warning if passed. I'm comfortable with a full deprecation here because it turns out it wasn't really working before anyway (unless it was passed on _every_ request and never changed). So instead of "changing" things, we're really just doing a proper implementation that now actually works, has better behavior, and is actually controllable by the user.
* Added a try-finally in `keepalive!` around our global IO lock usage, just for good housekeeping.
* Refactored `try_with_timeout` to use a `Channel` instead of the non-threaded `@async`; it's much simpler and seems cleaner.
* Refactored a few of the stream IO functions so that we always know the number of bytes downloaded, whether in memory or written to an IO, so we can log them and use them in verbose logging for bit-rate calculations.
* Added a new `logerrors::Bool=false` keyword arg that allows doing `@error` logs on errors that may otherwise be "swallowed" when doing retries; it can be helpful to at least see what kinds of errors are happening. Also cleaned up our error handling in general so we don't lose backtraces, which fixes #1003.
* Added lots of metrics around time spent in various layers, read vs. write durations, etc. These can be enabled, and stored in the request context, by passing `observelayers=true`. This mostly resolves #1025 and #1019.
* Fixed some missing keyword args that either weren't correct in the inline docs or weren't included in the client.md docs.
* Removed a few client-side layers (BasicAuth, DebugRequest, Canonicalize, etc.) since their implementations were _really_ trivial, and moved their functionality into a single HeadersRequest layer where we now do all the header-related work during the request. This has the effect of _drastically_ reducing the exploding backtraces we see when doing requests because of our "functional style" client-side layering.
* Fixed #612 by ensuring `keepalive` is included in our `connectionkey` computation, so a connection that specified `keepalive` will only be reused if it's passed the same value on subsequent requests.
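As a hedged sketch of the adjusted pool API described above (the `HTTP.Pool` constructor, `pool` keyword, and `HTTP.Connections.POOL` default come from this description; the URL is illustrative):

```julia
using HTTP

# Construct a dedicated pool with at most 10 permits,
# per the new `HTTP.Pool(; max::Int)` constructor.
pool = HTTP.Pool(; max=10)

# Pass it to any request method: the pool is checked for a reusable
# connection, and the connection is returned to it on completion.
resp = HTTP.get("https://example.com"; pool=pool)

# Omitting `pool` falls back to the global default, HTTP.Connections.POOL.
resp = HTTP.get("https://example.com")
```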
Left a couple of comments about optimization opportunities. Would it make sense to specialize the Pool on being keyed/non-keyed and Semaphore-bounded vs not?
```julia
function Base.lock(f, l::Lockable)
    lock(l.lock) do
        f(l.value)
    end
end
```
Wouldn't the macro version of locking be preferred to the anonymous function one?
I've made a little benchmark:
```julia
julia> function foo()
           pool = Pool(Int; max=10000)
           for i in 1:9999
               x = acquire(() -> i, pool)
           end
           for i in 10000:15000
               x = acquire(() -> i, pool)
               release(pool, i)
               release(pool, i-9999)
           end
           return pool
       end
```
And here are the results:

```
julia> @btime foo(); # uses Base.@lock
  1.673 ms (10016 allocations: 313.28 KiB)

julia> @btime foo(); # uses do syntax
  1.743 ms (10016 allocations: 313.28 KiB)

julia> @btime foo(); # uses do syntax
  1.735 ms (10016 allocations: 313.28 KiB)

julia> @btime foo(); # uses Base.@lock
  1.675 ms (10016 allocations: 313.28 KiB)
```
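For reference, a minimal sketch of the two locking styles being compared (the `Lockable` stand-in here is an assumed shape, pairing a value with its lock as in this PR):

```julia
# Minimal stand-in for the Lockable type discussed here (assumed shape).
struct Lockable{T}
    value::T
    lock::ReentrantLock
end
Lockable(v) = Lockable(v, ReentrantLock())

l = Lockable(Dict{Int, Int}())

# do-syntax version: passes an anonymous function to lock(f, lock),
# which creates a closure per call site.
lock(l.lock) do
    l.value[1] = 2
end

# macro version: Base.@lock expands to lock/try/finally-unlock inline,
# avoiding the closure.
Base.@lock l.lock begin
    l.value[3] = 4
end
```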
Yeah, let's do it!
On my updated PR (refactored pool), using the above benchmark, I'm seeing:

```
julia> @btime foo()
  673.667 μs (8 allocations: 132.39 KiB)
```
Very cool :) Still, I think the macro is preferable.
src/pools.jl
Outdated
```julia
    values::Lockable{Dict{K, ConcurrentStack{T}}}
end

Pool(T; max::Int=typemax(Int)) = Pool{Nothing, T}(Semaphore(max), Lockable(Dict{Nothing, ConcurrentStack{T}}()))
```
So in this dummy benchmark:
```julia
function foo()
    pool = Pool(Int; max=10000)
    for i in 1:10000
        x = acquire(() -> i, pool)
    end
    for i in 10001:20000
        release(pool, i-10000)
        x = acquire(() -> i, pool)
        release(pool, i)
    end
    for i in 1:10000
        x = acquire(() -> i, pool)
    end
    for i in 10001:20000
        release(pool, i-10000)
        x = acquire(() -> i, pool)
        release(pool, i)
    end
    for i in 1:10000
        x = acquire(() -> i, pool)
    end
    for i in 10001:20000
        release(pool, i-10000)
        x = acquire(() -> i, pool)
        release(pool, i)
    end
    return pool
end
```
we seem to pay a 20% penalty for using a `Dict{Nothing, Stack}` instead of a `Stack` directly. Here is a quick comparison where I hacked a version that used the `Stack` directly iff `K === Nothing`:

```
julia> @btime foo();
  5.792 ms (60010 allocations: 1.83 MiB)
```

This is the original:

```
julia> @btime foo();
  8.205 ms (60014 allocations: 1.83 MiB)
```
This makes me wonder if the use case in HTTP would benefit from ditching the Dict and using e.g. a `NamedTuple` of Stacks or something like that? EDIT: Just read the HTTP PR and noticed that we don't key based on OpenSSL/MbedTLS/TCP, so the `NamedTuple` idea is probably not applicable.
We might be able to do something faster in the non-keyed case, which I think is worth exploring (like having an extra field for a single stack that we use if the key type is `Nothing`).
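A hedged sketch of that idea (the field names and the `stackfor` helper are assumptions for illustration, not code from this PR):

```julia
# Hypothetical sketch: specialize storage on keyed vs. non-keyed pools.
# Semaphore, ConcurrentStack, and Lockable stand in for this package's types.
struct Pool{K, T}
    sem::Semaphore
    plain::ConcurrentStack{T}                      # fast path when K === Nothing
    keyed::Lockable{Dict{K, ConcurrentStack{T}}}   # general keyed storage
end

# Non-keyed pools go straight to the single stack: no Dict lookup, no lock.
stackfor(pool::Pool{Nothing, T}, ::Nothing) where {T} = pool.plain

# Keyed pools look up (or create) the per-key stack under the lock.
stackfor(pool::Pool{K, T}, key::K) where {K, T} =
    lock(pool.keyed.lock) do
        get!(() -> ConcurrentStack{T}(), pool.keyed.value, key)
    end
```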
Updated timing post-refactor of the above benchmark:

```
julia> @btime foo()
  3.740 ms (8 allocations: 132.39 KiB)
Pool{Nothing, Int64}(Base.GenericCondition{ReentrantLock}(Base.IntrusiveLinkedList{Task}(nothing, nothing), ReentrantLock(nothing, 0x00000000, 0x00, Base.GenericCondition{Base.Threads.SpinLock}(Base.IntrusiveLinkedList{Task}(nothing, nothing), Base.Threads.SpinLock(0)), (12800, 0, 141733920768))), 10000, 0, #undef, [10001, 10002, 10003, 10004, 10005, 10006, 10007, 10008, 10009, 10010 … 19991, 19992, 19993, 19994, 19995, 19996, 19997, 19998, 19999, 20000])
```
Love it! This is much leaner :)

I hope this is as performant in your HTTP.jl benchmarks -- the dummy benchmarks I used weren't stressing any concurrent accesses. I think we're now spending more time behind the lock, which is what originally caused you to look into lock-free data structures.
That was mainly because we were calling `connect` while holding the lock, which depends on network conditions and can easily take milliseconds. I think we should be ok here. The only slight worry is that we do call `isvalid` while holding the lock, so users have to be careful that it isn't expensive to call (it should be fine for how we use it in HTTP).
src/pools.jl
Outdated
```julia
Pool(T; max::Int=typemax(Int)) = Pool{Nothing, T}(Semaphore(max), Lockable(Dict{Nothing, ConcurrentStack{T}}()))
Pool(K, T; max::Int=typemax(Int)) = Pool{K, T}(Semaphore(max), Lockable(Dict{K, ConcurrentStack{T}}()))

Base.empty!(pool::Pool) = Base.@lock pool.values empty!(pool.values[])
```
Should this release all the permits too? Also, I think it should return the `pool` rather than `pool.values[]`:

```julia
function Base.empty!(pool::Pool)
    Base.@lock pool.values begin
        foreach(_ -> release(pool.sem), pool.values[])
        empty!(pool.values[])
    end
    return pool
end
```
Without this, a testset like this would hang:

```julia
@testset "empty!" begin
    pool = Pool(Int; max=2)
    # initially populate the pool with 1 object
    x0 = acquire(() -> 0, pool)
    @test x0 == 0
    empty!(pool)
    # we should now be able to refill pool with 2 objects (up to capacity)
    x1 = acquire(() -> 1, pool)
    x2 = acquire(() -> 2, pool)
    @test x1 == 1
    @test x2 == 2
end
```
As you noted elsewhere, I think this is just not named properly; we can't release the permits, because then if the tasks that actually did the `acquire`s tried to `release`, things would get all out of sorts. This is just a means to ensure that all existing objects in the pool are evicted so nothing is reused (like `forcenew`, but more of a global eviction).
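A hedged sketch of that eviction-only semantics (the `drain!` name comes up later in this thread; the `pool.values` field follows the `empty!` definition quoted above):

```julia
# Hypothetical sketch: evict every stored object so nothing can be
# reused, while leaving the semaphore permits untouched -- tasks that
# acquired permits still release them normally.
function drain!(pool)
    Base.@lock pool.values empty!(pool.values[])
    return pool
end
```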
test/pools.jl
Outdated
```julia
x1 = acquire(() -> 8, pool)
@test x1 == 7
# calling empty! removes all objects for reuse
empty!(pool)
```
Why do we support `empty!`? At first I thought `Pool` was supporting the exact same interface as `Semaphore`, which is just `acquire`/`release`... but then there's `empty!`. I think I'm confused by what the contract of `empty!` is supposed to be here (see my comment above). What does "removes all objects for reuse" mean? I wonder if `empty!` is even the right name.

It also makes me wonder if there should also be a ~~probably not~~ `release_all(pool)` that just releases all the permits? And if there should be other Dict-like interface functions (e.g. `keys`/`haskey` or `length`)?
Yeah, let's come up with a better name; HTTP has this function `closeall`, but that's not quite the right thing, I think, for a generic pool. Maybe `evictall`? `empty!` felt the most right to me, but as you pointed out, there's this tension between whether a `Pool` is just a semaphore or a Dict storing non-checked-out objects.
Changed the function name to `drain!` in the PR. Also split the now-large ConcurrentUtilities.jl file into more manageable individual files.
```julia
    key isa K || keyerror(key, K)
    Base.@lock pool.lock begin
        # first get a permit
        while pool.cur >= pool.max
```
Let's assume no one holds the lock, but task (1) is waiting on the `pool.lock` condition. Is it racy when task (2) releases from the pool just as task (3) also tries to acquire? Task (1) would wake up, task (3) would hold the lock, and both of them might now pass the `pool.cur >= pool.max` check?
> Task 1 would wake up, task 3 would hold the lock

This isn't possible. The design of `Threads.Condition` means that when you "wake up", you are guaranteed exclusive access to the lock.
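A hedged sketch of why the `while` loop plus `Threads.Condition` is safe here (field names follow the snippet above; the acquire/release bodies are an illustrative reduction, not this PR's full implementation):

```julia
# wait(c::Threads.Condition) atomically releases the condition's lock and
# re-acquires it before returning, so a woken task holds the lock
# exclusively while it re-checks the predicate in the while loop.
mutable struct PermitPool
    lock::Threads.Condition   # condition bundled with its ReentrantLock
    cur::Int
    max::Int
end

function acquire_permit!(p::PermitPool)
    Base.@lock p.lock begin
        while p.cur >= p.max   # re-check after every wake-up
            wait(p.lock)
        end
        p.cur += 1
    end
end

function release_permit!(p::PermitPool)
    Base.@lock p.lock begin
        p.cur -= 1
        notify(p.lock; all=false)   # wake a single waiter
    end
end
```

Even if another task sneaks in and takes the lock first, the woken task re-evaluates `p.cur >= p.max` under the lock and simply goes back to waiting if the permit is gone.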
For reviewing, mainly worry about the `src/pools.jl` and `test/pools.jl` files, since they're the only net-new code here.