Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a threadpool parameter to Channel constructor #50858

Merged
merged 2 commits into from
Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 21 additions & 7 deletions base/channels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ Channel(sz=0) = Channel{Any}(sz)

# special constructors
"""
Channel{T=Any}(func::Function, size=0; taskref=nothing, spawn=false)
Channel{T=Any}(func::Function, size=0; taskref=nothing, spawn=false, threadpool=nothing)

Create a new task from `func`, bind it to a new channel of type
`T` and size `size`, and schedule the task, all in a single call.
Expand All @@ -70,9 +70,14 @@ The channel is automatically closed when the task terminates.
If you need a reference to the created task, pass a `Ref{Task}` object via
the keyword argument `taskref`.

If `spawn = true`, the Task created for `func` may be scheduled on another thread
If `spawn=true`, the `Task` created for `func` may be scheduled on another thread
in parallel, equivalent to creating a task via [`Threads.@spawn`](@ref).

If `spawn=true` and the `threadpool` argument is not set, it defaults to `:default`.

If the `threadpool` argument is set (to `:default` or `:interactive`), this implies
that `spawn=true` and the new Task is spawned to the specified threadpool.

Return a `Channel`.

# Examples
Expand Down Expand Up @@ -117,6 +122,9 @@ true
In earlier versions of Julia, Channel used keyword arguments to set `size` and `T`, but
those constructors are deprecated.

!!! compat "Julia 1.9"
The `threadpool=` argument was added in Julia 1.9.

```jldoctest
julia> chnl = Channel{Char}(1, spawn=true) do ch
for c in "hello world"
Expand All @@ -129,12 +137,18 @@ julia> String(collect(chnl))
"hello world"
```
"""
function Channel{T}(func::Function, size=0; taskref=nothing, spawn=false) where T
function Channel{T}(func::Function, size=0; taskref=nothing, spawn=false, threadpool=nothing) where T
chnl = Channel{T}(size)
task = Task(() -> func(chnl))
if threadpool === nothing
threadpool = :default
else
spawn = true
end
task.sticky = !spawn
bind(chnl, task)
if spawn
Threads._spawn_set_thrpool(task, threadpool)
schedule(task) # start it on (potentially) another thread
else
yield(task) # immediately start it, yielding the current thread
Expand All @@ -149,17 +163,17 @@ Channel(func::Function, args...; kwargs...) = Channel{Any}(func, args...; kwargs
# of course not deprecated.)
# We use `nothing` default values to check which arguments were set in order to throw the
# deprecation warning if users try to use `spawn=` with `ctype=` or `csize=`.
function Channel(func::Function; ctype=nothing, csize=nothing, taskref=nothing, spawn=nothing)
function Channel(func::Function; ctype=nothing, csize=nothing, taskref=nothing, spawn=nothing, threadpool=nothing)
NHDaly marked this conversation as resolved.
Show resolved Hide resolved
# The spawn= keyword argument was added in Julia v1.3, and cannot be used with the
# deprecated keyword arguments `ctype=` or `csize=`.
if (ctype !== nothing || csize !== nothing) && spawn !== nothing
throw(ArgumentError("Cannot set `spawn=` in the deprecated constructor `Channel(f; ctype=Any, csize=0)`. Please use `Channel{T=Any}(f, size=0; taskref=nothing, spawn=false)` instead!"))
if (ctype !== nothing || csize !== nothing) && (spawn !== nothing || threadpool !== nothing)
throw(ArgumentError("Cannot set `spawn=` or `threadpool=` in the deprecated constructor `Channel(f; ctype=Any, csize=0)`. Please use `Channel{T=Any}(f, size=0; taskref=nothing, spawn=false, threadpool=nothing)` instead!"))
end
# Set the actual default values for the arguments.
ctype === nothing && (ctype = Any)
csize === nothing && (csize = 0)
spawn === nothing && (spawn = false)
return Channel{ctype}(func, csize; taskref=taskref, spawn=spawn)
return Channel{ctype}(func, csize; taskref=taskref, spawn=spawn, threadpool=threadpool)
end

closed_exception() = InvalidStateException("Channel is closed.", :closed)
Expand Down
14 changes: 14 additions & 0 deletions test/channel_threadpool.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# This file is a part of Julia. License is MIT: https://julialang.org/license

using Test
using Base.Threads

@testset "Task threadpools" begin
c = Channel{Symbol}() do c; put!(c, threadpool(current_task())); end
@test take!(c) === threadpool(current_task())
c = Channel{Symbol}(spawn = true) do c; put!(c, threadpool(current_task())); end
@test take!(c) === :default
c = Channel{Symbol}(threadpool = :interactive) do c; put!(c, threadpool(current_task())); end
@test take!(c) === :interactive
@test_throws ArgumentError Channel{Symbol}(threadpool = :foo) do c; put!(c, :foo); end
end
5 changes: 5 additions & 0 deletions test/channels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ end
@test taskref[].sticky == false
@test collect(c) == [0]
end
let cmd = `$(Base.julia_cmd()) --depwarn=error --rr-detach --startup-file=no channel_threadpool.jl`
new_env = copy(ENV)
new_env["JULIA_NUM_THREADS"] = "1,1"
run(pipeline(setenv(cmd, new_env), stdout = stdout, stderr = stderr))
end

@testset "multiple concurrent put!/take! on a channel for different sizes" begin
function testcpt(sz)
Expand Down