Add threadsafe Pool data structure #18

Merged
merged 1 commit into from
Apr 24, 2023
Conversation


@quinnj quinnj commented Apr 20, 2023

Also split the now-large ConcurrentUtilities.jl file into more manageable individual files. For reviewing, focus mainly on the src/pools.jl and test/pools.jl files, since they're the only net-new code here.

quinnj added a commit to JuliaWeb/HTTP.jl that referenced this pull request Apr 20, 2023
  * Finally achieving my dream of moving the connection pool out of HTTP; it's going to live in the [ConcurrentUtilities.jl](JuliaServices/ConcurrentUtilities.jl#18)
    package instead. In short, it had no unit tests, scant documentation, and was generally a pain to deal with in HTTP. We also discovered at least 2 major issues with the current
    implementation during a deep dive into performance and issue diagnosis, including:
      * If a ConnectError was thrown while creating a connection, a "permit" to the pool was permanently lost; get enough of them and the entire connection pool grinds to a halt :grimace:
      * Being a threadsafe structure, the pool had the poor choice of holding the pool lock for the duration of trying to get a permit _and making new connections_. This meant that in the
        case of a connection taking a long time to be made, we were essentially holding the rest of the pool hostage. This is totally unnecessary and can cause big performance issues in
        really heavy workloads where there's lots of contention on the pool for managing requests.
    The new implementation in ConcurrentUtilities.jl solves both these problems while being about 1/4th the LOC of the previous implementation. And it has unit tests! yay!
    All in all, I think #1033 and #1032 should both be mostly resolved by these changes/updates.
  * Relatedly, we're adjusting the API for connection pools to allow the user to pass in their _own_ connection pool to be used for that request (to check for a connection to reuse and to
    return the connection to after completion). A pool can be constructed like `HTTP.Pool(; max::Int)` and passed to any of the `HTTP.request` methods like `HTTP.get(...; pool=pool)`.
    HTTP has its own global default pool `HTTP.Connections.POOL` that it uses by default to manage connection reuse. The `HTTP.set_default_connection_limit!` will still work as long as it
    is called before any requests are made. Calling it _after_ requests have been made will be a no-op. The `connection_limit` keyword arg is now formally deprecated and will issue a warning
    if passed. I'm comfortable with a full deprecation here because it turns out it wasn't even really working before anyway (unless it was passed/used on _every_ request and never changed).
    So instead of "changing" things, we're really just doing a proper implementation that now actually works, has better behavior, and is actually controllable by the user.
  * Add a try-finally in keepalive! around our global IO lock usage just for good house-keeping
  * Refactored `try_with_timeout` to use a `Channel` instead of the non-threaded `@async`; it's much simpler and seems cleaner
  * I refactored a few of the stream IO functions so that we always know the number of bytes downloaded, whether in memory or written to
    an IO, so we can log them and use them in verbose logging to give bit-rate calculations
  * Added a new `logerrors::Bool=false` keyword arg that allows doing `@error` logs on errors that may otherwise be "swallowed" when doing retries;
    it can be helpful to sometimes be able to at least see what kinds of errors are happening; also cleaned up our error handling in general so we don't lose backtraces which fixes #1003.
  * Added lots of metrics around various time spent in various layers, read vs. write durations, etc. These can be enabled, and stored in the request context, by passing `observelayers=true`
    This mostly resolves #1025 and #1019.
  * Fixed some missing keyword args that either weren't correct in the inline docs or weren't included in the client.md docs
  * Removed a few client-side layers (BasicAuth, DebugRequest, Canonicalize, etc.) since their implementations were _really_ trivial and moved their functionality into a single HeadersRequest
    layer where we now do all the header-related work during the request. This has the effect of _drastically_ reducing the exploding backtraces we now have when doing requests because of
    our "functional style" client-side layering.
  * Fixed #612 by ensuring `keepalive` is included in our `connectionkey` computation so only connections that specified `keepalive` will be reused if it's passed the same value on subsequent requests
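Per the bullet points above, the per-request pool API would look roughly like this (a sketch based on the description in this commit message, not verified against the final HTTP.jl release):

```julia
using HTTP

# A dedicated pool capped at 10 connections for this client's requests;
# the constructor/keyword shapes follow the description above.
pool = HTTP.Pool(; max=10)

resp = HTTP.get("https://example.com"; pool=pool)

# Omitting `pool` falls back to the global default, HTTP.Connections.POOL.
```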
@Drvi Drvi (Member) left a comment

Left a couple of comments about optimization opportunities. Would it make sense to specialize the Pool on being keyed/non-keyed and Semaphore-bounded vs not?

function Base.lock(f, l::Lockable)
    lock(l.lock) do
        f(l.value)
    end
end
Member

Wouldn't the macro version of locking be preferred to the anonymous function one?
I've made a little benchmark:

julia> function foo()
           pool = Pool(Int; max=10000)
           for i in 1:9999
               x = acquire(() -> i, pool)
           end

           for i in 10000:15000
               x = acquire(() -> i, pool)
               release(pool, i)
               release(pool, i-9999)
           end
           return pool
       end

And here are the results:

julia> @btime foo(); # uses Base.@lock
  1.673 ms (10016 allocations: 313.28 KiB)

julia> @btime foo(); # uses do syntax
  1.743 ms (10016 allocations: 313.28 KiB)

julia> @btime foo(); # uses do syntax
  1.735 ms (10016 allocations: 313.28 KiB)

julia> @btime foo(); # uses Base.@lock
  1.675 ms (10016 allocations: 313.28 KiB)

Member Author

Yeah, let's do it!

Member Author

On my updated PR (refactored pool) using the above benchmark, I'm seeing:

julia> @btime foo()
  673.667 μs (8 allocations: 132.39 KiB)

Member

Very cool :) Still, I think the macro is preferable.
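For reference, the two locking styles under discussion look like this (a minimal sketch on a plain `ReentrantLock`, not the package's `Lockable` wrapper):

```julia
lk = ReentrantLock()
value = Dict{Int, Int}()

# do-block form: passes an anonymous function to `lock`
lock(lk) do
    value[1] = 1
end

# macro form: expands to lock/try/finally/unlock inline, avoiding the
# closure -- the variant that benchmarked slightly faster above
Base.@lock lk begin
    value[2] = 2
end
```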

src/pools.jl Outdated
values::Lockable{Dict{K, ConcurrentStack{T}}}
end

Pool(T; max::Int=typemax(Int)) = Pool{Nothing, T}(Semaphore(max), Lockable(Dict{Nothing, ConcurrentStack{T}}()))
Member

So in this dummy benchmark:

function foo()
    pool = Pool(Int; max=10000)
    for i in 1:10000
        x = acquire(() -> i, pool)
    end

    for i in 10001:20000
        release(pool, i-10000)
        x = acquire(() -> i, pool)
        release(pool, i)
    end

    for i in 1:10000
        x = acquire(() -> i, pool)
    end

    for i in 10001:20000
        release(pool, i-10000)
        x = acquire(() -> i, pool)
        release(pool, i)
    end

    for i in 1:10000
        x = acquire(() -> i, pool)
    end

    for i in 10001:20000
        release(pool, i-10000)
        x = acquire(() -> i, pool)
        release(pool, i)
    end
    return pool
end

we seem to pay a ~20% penalty for using a Dict{Nothing,Stack} instead of a Stack directly:
[screenshot: profiling comparison, 2023-04-21]

Here is a quick comparison where I hacked a version that used the Stack directly iff K === Nothing:

julia> @btime foo();
  5.792 ms (60010 allocations: 1.83 MiB)

This is the original:

julia> @btime foo();
  8.205 ms (60014 allocations: 1.83 MiB)

@Drvi Drvi Apr 21, 2023

This makes me wonder if the use case in HTTP would benefit from ditching the Dict and using e.g. a NamedTuple of Stacks or something like that? EDIT: Just read the HTTP PR and noticed that we don't key based on OpenSSL/MbedTLS/TCP, so the NamedTuple idea is probably not applicable.

Member Author

We might be able to do something faster in the non-key case, which I think is worth exploring (like having an extra field for a single stack that we use if the key type is Nothing)
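One hypothetical shape for that idea (names and fields here are illustrative, not the actual src/pools.jl code): keep a dedicated stack for the `K === Nothing` case and only consult the `Dict` when keys are in play, letting dispatch pick the fast path:

```julia
# Illustrative sketch only -- not the PR's implementation.
struct KeyablePool{K, T}
    sem::Base.Semaphore
    keyedvalues::Dict{K, Vector{T}}  # used when keys are in play
    values::Vector{T}                # fast path when K === Nothing
end

# Dispatch resolves at compile time, skipping the Dict lookup entirely
# for non-keyed pools.
stackfor(pool::KeyablePool{Nothing}, ::Nothing) = pool.values
stackfor(pool::KeyablePool{K, T}, key::K) where {K, T} =
    get!(() -> T[], pool.keyedvalues, key)
```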

Member Author

Updated timing post-refactor of the above benchmark:

julia> @btime foo()
  3.740 ms (8 allocations: 132.39 KiB)
Pool{Nothing, Int64}(Base.GenericCondition{ReentrantLock}(Base.IntrusiveLinkedList{Task}(nothing, nothing), ReentrantLock(nothing, 0x00000000, 0x00, Base.GenericCondition{Base.Threads.SpinLock}(Base.IntrusiveLinkedList{Task}(nothing, nothing), Base.Threads.SpinLock(0)), (12800, 0, 141733920768))), 10000, 0, #undef, [10001, 10002, 10003, 10004, 10005, 10006, 10007, 10008, 10009, 10010  …  19991, 19992, 19993, 19994, 19995, 19996, 19997, 19998, 19999, 20000])

Member

Love it! This is much leaner :)
I hope it's just as performant in your HTTP.jl benchmarks -- the dummy benchmarks I used weren't stressing any concurrent accesses. I think we're now spending more time behind the lock, which is what originally caused you to look into lock-free data structures.

Member Author

That was mainly because we were calling connect while holding the lock, which depends on network conditions and can easily take milliseconds. I think we should be ok here. The only slight worry is that we do call isvalid while holding the lock, so users have to be careful that their isvalid implementation isn't expensive to call (it should be fine for how we use it in HTTP).
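To illustrate the caution about `isvalid` running under the pool lock (a hedged sketch; `make_conn` and `host` are hypothetical names, and the exact `acquire` keyword signature is assumed from this discussion):

```julia
# Cheap predicate: fine to run while the pool lock is held.
conn = acquire(() -> make_conn(host), pool; isvalid = c -> isopen(c))

# By contrast, an `isvalid` that performs network I/O (a ping round-trip,
# a TLS renegotiation check) would stall every other task contending
# for the pool while it runs.
```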

src/pools.jl Outdated
Pool(T; max::Int=typemax(Int)) = Pool{Nothing, T}(Semaphore(max), Lockable(Dict{Nothing, ConcurrentStack{T}}()))
Pool(K, T; max::Int=typemax(Int)) = Pool{K, T}(Semaphore(max), Lockable(Dict{K, ConcurrentStack{T}}()))

Base.empty!(pool::Pool) = Base.@lock pool.values empty!(pool.values[])
@nickrobinson251 nickrobinson251 Apr 21, 2023

Should this release all the permits too?

Also i think it should return the pool, rather than the pool.values[]

Suggested change
Base.empty!(pool::Pool) = Base.@lock pool.values empty!(pool.values[])
function Base.empty!(pool::Pool)
    Base.@lock pool.values begin
        foreach(_ -> release(pool.sem), pool.values[])
        empty!(pool.values[])
    end
    return pool
end

without this, a testset like this would hang:

@testset "empty!" begin
    pool = Pool(Int; max=2)
    # initially populate the pool with 1 object
    x0 = acquire(() -> 0, pool)
    @test x0 == 0
    empty!(pool)
    # we should now be able to refill pool with 2 objects (up to capacity)
    x1 = acquire(() -> 1, pool)
    x2 = acquire(() -> 2, pool)
    @test x1 == 1
    @test x2 == 2
end

Member Author

As you noted elsewhere, I think this is just not named properly; we can't release the permits, because then if the tasks that actually did the acquires tried to release, things get all out of sorts. This is just a means to ensure that all existing objects in the pool are evicted so nothing is reused (like forcenew, but more of a global eviction).

test/pools.jl Outdated
x1 = acquire(() -> 8, pool)
@test x1 == 7
# calling empty! removes all objects for reuse
empty!(pool)
@nickrobinson251 nickrobinson251 Apr 21, 2023

why do we support empty! ?

At first I thought Pool was supporting the exact same interface as Semaphore, which is just acquire/release... but then there's empty!

i think i'm confused by what the contract of empty! is supposed to be here (see my comment above).
what does "removes all objects for reuse" mean? I wonder if empty! is even the right name.

It also makes me wonder if there should also be a release_all(pool) that just releases all the permits? probably not

And if there should be other Dict-like interface functions (e.g. keys/haskey or length)?

Member Author

Yeah, let's come up with a better name; HTTP has this function closeall, but that's not quite the right thing I think for a generic pool. Maybe evictall? empty! felt the most right to me, but as you pointed out there's this tension between whether a Pool is just a semaphore or a Dict storing non-checked-out objects.

Member Author

Changed the function name to drain! in the PR
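To make the contract concrete: `drain!` evicts idle objects so they won't be reused, but deliberately leaves the permit accounting alone (a sketch of the intended behavior as described in this thread, assuming the final acquire/release/drain! API):

```julia
using ConcurrentUtilities: Pool, acquire, release, drain!

pool = Pool(Int; max=2)
x = acquire(() -> 1, pool)   # creates and checks out the object 1
release(pool, x)             # returns it to the pool for reuse

drain!(pool)                 # evicts idle objects; permits untouched
y = acquire(() -> 2, pool)   # cache miss, so the creator runs again
```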

Also split the now-large ConcurrentUtilities.jl file into more
manageable individual files.
@quinnj quinnj mentioned this pull request Apr 21, 2023
@quinnj quinnj merged commit a76d54d into main Apr 24, 2023
@quinnj quinnj deleted the jq-pools branch April 24, 2023 21:00
key isa K || keyerror(key, K)
Base.@lock pool.lock begin
# first get a permit
while pool.cur >= pool.max
Member

Let's assume no one holds the lock, but task (1) is waiting on the pool.lock condition. Is it racy when task (2) releases from the pool just as task (3) also tries to acquire? Task 1 would wake up, task 3 would hold the lock, and both of them might now pass the pool.cur >= pool.max check?

Member Author

Task 1 would wake up, task 3 would hold the lock

This isn't possible. The design of Threads.Condition means that when you "wake up", you are guaranteed exclusive access to the lock.
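The guarantee being relied on is the standard condition-variable contract: `wait` atomically releases the lock and reacquires it before returning, so the predicate is always re-checked while holding the lock. A minimal sketch of that acquire-side loop (illustrative only, not the PR's exact code):

```julia
# Illustrative permit-bounded acquire using Threads.Condition.
mutable struct TinyPool
    cond::Threads.Condition  # owns its internal ReentrantLock
    cur::Int
    max::Int
end
TinyPool(max::Int) = TinyPool(Threads.Condition(), 0, max)

function acquire_permit!(p::TinyPool)
    Base.@lock p.cond begin
        while p.cur >= p.max      # re-check the predicate on every wakeup
            wait(p.cond)          # releases the lock; holds it again on return
        end
        p.cur += 1
    end
end

function release_permit!(p::TinyPool)
    Base.@lock p.cond begin
        p.cur -= 1
        notify(p.cond; all=false) # wake a single waiter
    end
end
```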

quinnj added a commit to JuliaWeb/HTTP.jl that referenced this pull request Apr 27, 2023
* Summary of changes in this PR:
  * (Commit message identical to the Apr 20 commit quoted above.)

* Update based on new Pool changes

* Updates

* cleanup

* Put back in exception unwrapping

* Address PR review