Skip to content

Commit

Permalink
Fix tspawnat on julia 1.7+ by making the task sticky (#33)
Browse files Browse the repository at this point in the history
  • Loading branch information
IanButterworth authored Feb 1, 2022
1 parent 0711daa commit 4473037
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 32 deletions.
38 changes: 38 additions & 0 deletions .github/workflows/RunTests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
name: Run tests

on:
pull_request:
push:
branches:
- master
tags: '*'

jobs:
test:
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
julia-version: ['1.6', '1', 'nightly']
julia-arch: [x64]
os: [ubuntu-latest] # macos & windows don't support xvfb

steps:
- uses: actions/checkout@v2
- uses: julia-actions/setup-julia@latest
with:
version: ${{ matrix.julia-version }}
- uses: julia-actions/julia-runtest@master
env:
JULIA_NUM_THREADS: 2
with:
coverage: false
prefix: xvfb-run
# - uses: julia-actions/julia-processcoverage@v1
# - uses: codecov/codecov-action@v1
# with:
# file: ./lcov.info
# flags: unittests
# name: codecov-umbrella
# fail_ci_if_error: false
# token: ${{ secrets.CODECOV_TOKEN }}
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name = "ThreadPools"
uuid = "b189fb0b-2eb5-4ed4-bc0c-d34c51242431"
authors = ["Trey Roessig <[email protected]"]
version = "2.1.0"
version = "2.1.1"

[deps]
Printf = "de0858da-6303-5e67-8744-51eddeeeb8d7"
Expand Down
56 changes: 28 additions & 28 deletions src/macros.jl
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ end
"""
@bthreads
Mimics `Base.Threads.@threads, but keeps the iterated tasks off if the primary
Mimics `Base.Threads.@threads, but keeps the iterated tasks off if the primary
thread.`
# Example
Expand All @@ -54,18 +54,18 @@ julia> @bthreads for x in 1:8
Note that execution order is not guaranteed, but the primary thread does not
show up on any of the jobs.
"""
macro bthreads(args...)
macro bthreads(args...)
return _pthread_macro(:(StaticPool(2)), false, args...)
nothing
end

"""
@qthreads
Mimics `Base.Threads.@threads`, but uses a task queueing strategy, only starting
Mimics `Base.Threads.@threads`, but uses a task queueing strategy, only starting
a new task when an previous one (on any thread) has completed. This can provide
performance advantages when the iterated tasks are very nonuniform in length.
The primary thread is used. To prevent usage of the primary thread, see
performance advantages when the iterated tasks are very nonuniform in length.
The primary thread is used. To prevent usage of the primary thread, see
[`@qbthreads`](@ref).
# Example
Expand All @@ -84,17 +84,17 @@ julia> @qthreads for x in 1:8
```
Note that execution order is not guaranteed and the primary thread is used.
"""
macro qthreads(args...)
macro qthreads(args...)
return _pthread_macro(:(QueuePool(1)), false, args...)
end

"""
@qbthreads
Mimics `Base.Threads.@threads`, but uses a task queueing strategy, only starting
Mimics `Base.Threads.@threads`, but uses a task queueing strategy, only starting
a new task when an previous one (on any thread) has completed. This can provide
performance advantages when the iterated tasks are very nonuniform in length.
The primary thread is not used. To allow usage of the primary thread, see
performance advantages when the iterated tasks are very nonuniform in length.
The primary thread is not used. To allow usage of the primary thread, see
[`@qthreads`](@ref).
# Example
Expand All @@ -114,14 +114,14 @@ julia> @qbthreads for x in 1:8
Note that execution order is not guaranteed, but the primary thread does not
show up on any of the jobs.
"""
macro qbthreads(args...)
macro qbthreads(args...)
return _pthread_macro(:(QueuePool(2)), false, args...)
end

"""
@logthreads -> pool
Mimics `Base.Threads.@threads`. Returns a logged pool that can be analyzed with
Mimics `Base.Threads.@threads`. Returns a logged pool that can be analyzed with
the logging functions and `plot`ted.
# Example
Expand All @@ -142,15 +142,15 @@ julia> plot(pool)
```
Note that execution order is not guaranteed and the primary thread is used.
"""
macro logthreads(args...)
macro logthreads(args...)
return _pthread_macro(:(LoggedStaticPool(1)), true, args...)
end

"""
@logbthreads -> pool
Mimics `Base.Threads.@threads, but keeps the iterated tasks off if the primary
thread.` Returns a logged pool that can be analyzed with the logging functions
Mimics `Base.Threads.@threads, but keeps the iterated tasks off if the primary
thread.` Returns a logged pool that can be analyzed with the logging functions
and `plot`ted.
# Example
Expand All @@ -172,17 +172,17 @@ julia> plot(pool)
Note that execution order is not guaranteed, but the primary thread does not
show up on any of the jobs.
"""
macro logbthreads(args...)
macro logbthreads(args...)
return _pthread_macro(:(LoggedStaticPool(2)), true, args...)
end

"""
@logqthreads -> pool
Mimics `Base.Threads.@threads`, but uses a task queueing strategy, only starting
a new task when an previous one (on any thread) has completed. Returns a logged
pool that can be analyzed with the logging functions and `plot`ted. The primary
thread is used. To prevent usage of the primary thread, see
Mimics `Base.Threads.@threads`, but uses a task queueing strategy, only starting
a new task when an previous one (on any thread) has completed. Returns a logged
pool that can be analyzed with the logging functions and `plot`ted. The primary
thread is used. To prevent usage of the primary thread, see
[`@logqbthreads`](@ref).
# Example
Expand All @@ -203,17 +203,17 @@ julia> plot(pool)
```
Note that execution order is not guaranteed and the primary thread is used.
"""
macro logqthreads(args...)
macro logqthreads(args...)
return _pthread_macro(:(LoggedQueuePool(1)), true, args...)
end

"""
@logqbthreads -> pool
Mimics `Base.Threads.@threads`, but uses a task queueing strategy, only starting
a new task when an previous one (on any thread) has completed. Returns a logged
pool that can be analyzed with the logging functions and `plot`ted. The primary
thread is not used. To allow usage of the primary thread, see
Mimics `Base.Threads.@threads`, but uses a task queueing strategy, only starting
a new task when an previous one (on any thread) has completed. Returns a logged
pool that can be analyzed with the logging functions and `plot`ted. The primary
thread is not used. To allow usage of the primary thread, see
[`@logqthreads`](@ref).
# Example
Expand All @@ -235,7 +235,7 @@ julia> plot(pool)
Note that execution order is not guaranteed, but the primary thread does not
show up on any of the jobs.
"""
macro logqbthreads(args...)
macro logqbthreads(args...)
return _pthread_macro(:(LoggedQueuePool(2)), true, args...)
end

Expand All @@ -255,9 +255,9 @@ julia> fetch(t)
```
"""
macro tspawnat(thrdid, expr)
if VERSION >= v"1.4"
@static if VERSION >= v"1.4"
letargs = Base._lift_one_interp!(expr)

thunk = esc(:(()->($expr)))
var = esc(Base.sync_varname)
tid = esc(thrdid)
Expand All @@ -267,7 +267,7 @@ macro tspawnat(thrdid, expr)
end
let $(letargs...)
local task = Task($thunk)
task.sticky = false
task.sticky = VERSION >= v"1.7" # disallow task migration which was introduced in 1.7
ccall(:jl_set_task_tid, Cvoid, (Any, Cint), task, $tid-1)
if $(Expr(:islocal, var))
put!($var, task)
Expand Down
2 changes: 1 addition & 1 deletion test/runtests.jl
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@

let cmd = `$(Base.julia_cmd()) --depwarn=error --startup-file=no runtests_exec.jl`
for test_nthreads in (1, 2, 4) # run once to try single-threaded mode, then try a couple times to trigger bad races
for test_nthreads in sort(collect(Set((1, 2, Threads.nthreads())))) # run once to try single-threaded mode, then try a couple times to trigger bad races
new_env = copy(ENV)
new_env["JULIA_NUM_THREADS"] = string(test_nthreads)
println("\n# Threads = $test_nthreads")
Expand Down
25 changes: 23 additions & 2 deletions test/testspawnat.jl
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ end

@testset "@tspawnat" begin

@testset "@normal operation" begin
@testset "normal operation" begin
obj = TestObj(0)
function fn!(obj)
sleep(0.1)
Expand All @@ -30,6 +30,27 @@ end
@test obj.data == Threads.nthreads()
end

function busywait(secs)
tstart = time()
while time() < tstart + secs
end
end

@testset "sticky tasks" begin
tasks = Task[]
@sync for tid in rand(1:Threads.nthreads(), 1000)
task = @tspawnat tid begin
yield()
busywait(rand() * 0.01)
yield()
(tid, Threads.threadid())
end
push!(tasks, task)
end
results = fetch.(tasks)
@test all(t->first(t) == last(t), results)
end

@ifv1p4 begin
@testset "interpolation" begin
function foo(x)
Expand All @@ -42,7 +63,7 @@ end
t1 = @tspawnat max(1, Threads.nthreads()) foo($x)
x += 1
t2 = @tspawnat max(1, Threads.nthreads()-1) foo($x)

test_sum = fetch(t1) + fetch(t2)
@test expect_sum == test_sum
end
Expand Down

2 comments on commit 4473037

@IanButterworth
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JuliaRegistrator
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Registration pull request created: JuliaRegistries/General/53646

After the above pull request is merged, it is recommended that a tag is created on this repository for the registered package version.

This will be done automatically if the Julia TagBot GitHub Action is installed, or can be done manually through the github interface, or via:

git tag -a v2.1.1 -m "<description of version>" 44730372b9b00ba38192e5d2850dfe24c20cdbeb
git push origin v2.1.1

Please sign in to comment.