diff --git a/Project.toml b/Project.toml index 2467bec..c170586 100644 --- a/Project.toml +++ b/Project.toml @@ -1,7 +1,7 @@ name = "ThreadPools" uuid = "b189fb0b-2eb5-4ed4-bc0c-d34c51242431" authors = ["Trey Roessig pool = pwith(ThreadPools.LoggedQueuePool(1,2)) do pool julia> plot(pool) ``` """ -function tforeach(pool, fn::Function, itr) +function tforeach(pool::AbstractThreadPool, fn::Function, itr) tmap(pool, fn, itr) nothing end -tforeach(fn::Function, pool, itr) = tforeach(pool, fn, itr) -#tforeach(pool, fn::Function, itrs...) = tforeach(pool, (x) -> fn(x...), zip(itrs...)) -#tforeach(fn::Function, pool, itrs...) = tforeach(pool, (x) -> fn(x...), zip(itrs...)) +tforeach(fn::Function, pool::AbstractThreadPool, itr) = tforeach(pool, fn, itr) +tforeach(pool::AbstractThreadPool, fn::Function, itr1, itrs...) = tforeach(pool, x -> fn(x...), zip(itr1, itrs...)) +tforeach(fn::Function, pool::AbstractThreadPool, itr1, itrs...) = tforeach(pool, x -> fn(x...), zip(itr1, itrs...)) """ @@ -74,9 +74,9 @@ julia> pool = pwith(ThreadPools.LoggedQueuePool(1,2)) do pool julia> plot(pool) ``` """ -tmap(fn::Function, pool, itr) = tmap(pool, fn, itr) -# tmap(pool, fn::Function, itrs...) = tmap(pool, (x) -> fn(x...), zip(itrs...)) -# tmap(fn::Function, pool, itrs...) = tmap(pool, (x) -> fn(x...), zip(itrs...)) +tmap(fn::Function, pool::AbstractThreadPool, itr) = tmap(pool, fn, itr) +tmap(pool::AbstractThreadPool, fn::Function, itr1, itrs...) = tmap(pool, x -> fn(x...), zip(itr1, itrs...)) +tmap(fn::Function, pool::AbstractThreadPool, itr1, itrs...) = tmap(pool, x -> fn(x...), zip(itr1, itrs...)) """ diff --git a/src/logqpool.jl b/src/logqpool.jl index 8a904cb..bd9cd0a 100644 --- a/src/logqpool.jl +++ b/src/logqpool.jl @@ -168,7 +168,7 @@ results(pool::LoggedQueuePool) = ResultIterator(pool) function tmap(pool::LoggedQueuePool, fn::Function, itr) data = collect(itr) - applicable(fn, data[1]) || error("function can't be applied to iterator contents") + applicable(fn, first(data)) || error("function can't be applied to iterator contents") N = length(data) sizehint!(pool.recs, N) result = Array{_detect_type(fn, data), ndims(data)}(undef, size(data)) diff --git a/src/logstaticpool.jl b/src/logstaticpool.jl index 4c410ee..0bf69aa 100644 --- a/src/logstaticpool.jl +++ b/src/logstaticpool.jl @@ -39,7 +39,7 @@ end function tmap(pool::LoggedStaticPool, fn::Function, itr) data = collect(itr) - applicable(fn, data[1]) || error("function can't be applied to iterator contents") + applicable(fn, first(data)) || error("function can't be applied to iterator contents") N = length(data) sizehint!(pool.recs, N) result = Array{_detect_type(fn, data), ndims(data)}(undef, size(data)) diff --git a/src/qpool.jl b/src/qpool.jl index dc6e7c7..8ef689b 100644 --- a/src/qpool.jl +++ b/src/qpool.jl @@ -186,7 +186,7 @@ Base.IteratorEltype(::ResultIterator) = Base.EltypeUnknown() function tmap(pool::QueuePool, fn::Function, itr) data = collect(itr) - applicable(fn, data[1]) || error("function can't be applied to iterator contents") + applicable(fn, first(data)) || error("function can't be applied to iterator contents") N = length(data) result = Array{_detect_type(fn, data), ndims(data)}(undef, size(data)) _fn = (ind, x) -> (ind, fn(x)) diff --git a/src/simplefuncs.jl b/src/simplefuncs.jl index fce4ffb..0eec4f3 100644 --- a/src/simplefuncs.jl +++ b/src/simplefuncs.jl @@ -1,12 +1,12 @@ -@deprecate pmap(fn::Function, itr) tmap(fn::Function, itr) -@deprecate pforeach(fn::Function, itr) tforeach(fn::Function, itr) -@deprecate logpmap(fn::Function, itr) logtmap(fn::Function, itr) -@deprecate logpforeach(fn::Function, itr) logtforeach(fn::Function, itr) +@deprecate pmap(fn::Function, itrs...) tmap(fn::Function, itrs...) +@deprecate pforeach(fn::Function, itrs...) tforeach(fn::Function, itrs...) +@deprecate logpmap(fn::Function, itrs...) logtmap(fn::Function, itrs...) +@deprecate logpforeach(fn::Function, itrs...) logtforeach(fn::Function, itrs...) """ - tmap(fn::Function, itr) -> collection + tmap(fn::Function, itrs...) -> collection Mimics `Base.map`, but launches the function evaluations onto all available threads, using a pre-assigned scheduling strategy appropriate for uniform @@ -29,15 +29,15 @@ julia> tmap(x -> begin; println((x,Threads.threadid())); x^2; end, 1:8)' Note that while the execution order is not guaranteed, the result order is. Also note that the primary thread is used. """ -function tmap(fn::Function, itr) +function tmap(fn::Function, itrs...) pool = StaticPool() - result::Array{_detect_type(fn, itr), ndims(itr)} = tmap(pool, fn, itr) + result = tmap(pool, x->fn(x...), zip(itrs...)) close(pool) return result end """ - bmap(fn::Function, itr) -> collection + bmap(fn::Function, itrs...) -> collection Mimics `Base.map`, but launches the function evaluations onto all available threads except the primary, using a pre-assigned scheduling strategy @@ -60,15 +60,15 @@ julia> bmap(x -> begin; println((x,Threads.threadid())); x^2; end, 1:8)' Note that while the execution order is not guaranteed, the result order is, Also note that the primary thread is not used. """ -function bmap(fn, itr) +function bmap(fn::Function, itrs...) pool = StaticPool(2) - result::Array{_detect_type(fn, itr), ndims(itr)} = tmap(pool, fn, itr) + result = tmap(pool, x->fn(x...), zip(itrs...)) close(pool) return result end """ - qmap(fn::Function, itr) -> collection + qmap(fn::Function, itrs...) -> collection Mimics `Base.map`, but launches the function evaluations onto all available threads, using a queued scheduling strategy appropriate for nonuniform @@ -91,15 +91,15 @@ julia> qmap(x -> begin; println((x,Threads.threadid())); x^2; end, 1:8)' Note that while the execution order is not guaranteed, the result order is. Also note that the primary thread is used. """ -function qmap(fn, itr) +function qmap(fn::Function, itrs...) pool = QueuePool() - result::Array{_detect_type(fn, itr), ndims(itr)} = tmap(pool, fn, itr) + result = tmap(pool, x->fn(x...), zip(itrs...)) close(pool) return result end """ - qbmap(fn::Function, itr) -> collection + qbmap(fn::Function, itrs...) -> collection Mimics `Base.map`, but launches the function evaluations onto all available threads except the primary, using a queued scheduling strategy appropriate @@ -122,15 +122,15 @@ julia> qbmap(x -> begin; println((x,Threads.threadid())); x^2; end, 1:8)' Note that while the execution order is not guaranteed, the result order is, Also note that the primary thread is not used. """ -function qbmap(fn, itr) +function qbmap(fn::Function, itrs...) pool = QueuePool(2) - result::Array{_detect_type(fn, itr), ndims(itr)} = tmap(pool, fn, itr) + result = tmap(pool, x->fn(x...), zip(itrs...)) close(pool) return result end """ - logtmap(fn::Function, itr) -> (pool, collection) + logtmap(fn::Function, itrs...) -> (pool, collection) Mimics `Base.map`, but launches the function evaluations onto all available threads, using a pre-assigned scheduling strategy appropriate for uniform @@ -161,15 +161,15 @@ julia> plot(pool) Note that while the execution order is not guaranteed, the result order is. Also note that the primary thread is used. """ -function logtmap(fn::Function, itr) +function logtmap(fn::Function, itrs...) pool = LoggedStaticPool() - result::Array{_detect_type(fn, itr), ndims(itr)} = tmap(pool, fn, itr) + result = tmap(pool, x->fn(x...), zip(itrs...)) close(pool) return pool, result end """ - logbmap(fn::Function, itr) -> (pool, collection) + logbmap(fn::Function, itrs...) -> (pool, collection) Mimics `Base.map`, but launches the function evaluations onto all available threads except the primary, using a pre-assigned scheduling strategy @@ -200,15 +200,15 @@ julia> plot(pool) Note that while the execution order is not guaranteed, the result order is, Also note that the primary thread is not used. """ -function logbmap(fn, itr) +function logbmap(fn::Function, itrs...) pool = LoggedStaticPool(2) - result::Array{_detect_type(fn, itr), ndims(itr)} = tmap(pool, fn, itr) + result = tmap(pool, x->fn(x...), zip(itrs...)) close(pool) return pool, result end """ - logqmap(fn::Function, itr) -> (pool, collection) + logqmap(fn::Function, itrs...) -> (pool, collection) Mimics `Base.map`, but launches the function evaluations onto all available threads, using a queued scheduling strategy appropriate for nonuniform @@ -239,15 +239,15 @@ julia> plot(pool) Note that while the execution order is not guaranteed, the result order is. Also note that the primary thread is used. """ -function logqmap(fn, itr) +function logqmap(fn::Function, itrs...) pool = LoggedQueuePool() - result::Array{_detect_type(fn, itr), ndims(itr)} = tmap(pool, fn, itr) + result = tmap(pool, x->fn(x...), zip(itrs...)) close(pool) return pool, result end """ - logqbmap(fn::Function, itr) -> (pool, collection) + logqbmap(fn::Function, itrs...) -> (pool, collection) Mimics `Base.map`, but launches the function evaluations onto all available threads except the primary, using a queued scheduling strategy appropriate @@ -278,16 +278,16 @@ julia> plot(pool) Note that while the execution order is not guaranteed, the result order is, Also note that the primary thread is not used. """ -function logqbmap(fn, itr) +function logqbmap(fn::Function, itrs...) pool = LoggedQueuePool(2) - result::Array{_detect_type(fn, itr), ndims(itr)} = tmap(pool, fn, itr) + result = tmap(pool, x->fn(x...), zip(itrs...)) close(pool) return pool, result end """ - tforeach(fn::Function, itr) + tforeach(fn::Function, itrs...) Mimics `Base.foreach`, but launches the function evaluations onto all available threads, using a pre-assigned scheduling strategy appropriate for uniform @@ -308,16 +308,16 @@ julia> tforeach(x -> println((x,Threads.threadid())), 1:8) Note that the execution order is not guaranteed, and that the primary thread is used. """ -function tforeach(fn::Function, itr) +function tforeach(fn::Function, itrs...) pool = StaticPool() - tforeach(pool, fn, itr) + tforeach(pool, x->fn(x...), zip(itrs...)) close(pool) nothing end """ - bforeach(fn::Function, itr) + bforeach(fn::Function, itrs...) Mimics `Base.foreach`, but launches the function evaluations onto all available threads except the primary, using a pre-assigned scheduling strategy appropriate @@ -338,15 +338,15 @@ julia> bforeach(x -> println((x,Threads.threadid())), 1:8) Note that the execution order is not guaranteed, and that the primary thread is not used. """ -function bforeach(fn, itr) +function bforeach(fn::Function, itrs...) pool = StaticPool(2) - tforeach(pool, fn, itr) + tforeach(pool, x->fn(x...), zip(itrs...)) close(pool) nothing end """ - qforeach(fn::Function, itr) + qforeach(fn::Function, itrs...) Mimics `Base.foreach`, but launches the function evaluations onto all available threads, using a queued scheduling strategy appropriate for nonuniform @@ -367,16 +367,16 @@ julia> qforeach(x -> println((x,Threads.threadid())), 1:8) Note that the execution order is not guaranteed, and that the primary thread is used. """ -function qforeach(fn, itr) +function qforeach(fn::Function, itrs...) pool = QueuePool() - tforeach(pool, fn, itr) + tforeach(pool, x->fn(x...), zip(itrs...)) close(pool) nothing end """ - qbforeach(fn::Function, itr) + qbforeach(fn::Function, itrs...) Mimics `Base.foreach`, but launches the function evaluations onto all available threads except the primary, using a queued scheduling strategy appropriate for @@ -397,9 +397,9 @@ julia> qbforeach(x -> println((x,Threads.threadid())), 1:8) Note that the execution order is not guaranteed, and that the primary thread is not used. """ -function qbforeach(fn, itr) +function qbforeach(fn::Function, itrs...) pool = QueuePool(2) - tforeach(pool, fn, itr) + tforeach(pool, x->fn(x...), zip(itrs...)) close(pool) nothing end @@ -407,7 +407,7 @@ end """ - logtforeach(fn::Function, itr) -> pool + logtforeach(fn::Function, itrs...) -> pool Mimics `Base.foreach`, but launches the function evaluations onto all available threads, using a pre-assigned scheduling strategy appropriate for uniform @@ -431,16 +431,16 @@ julia> plot(pool) Note that the execution order is not guaranteed, and that the primary thread is used. """ -function logtforeach(fn::Function, itr) +function logtforeach(fn::Function, itrs...) pool = LoggedStaticPool() - tforeach(pool, fn, itr) + tforeach(pool, x->fn(x...), zip(itrs...)) close(pool) return pool end """ - logbforeach(fn::Function, itr) + logbforeach(fn::Function, itrs...) Mimics `Base.foreach`, but launches the function evaluations onto all available threads except the primary, using a pre-assigned scheduling strategy appropriate @@ -464,16 +464,16 @@ julia> plot(pool) Note that the execution order is not guaranteed, and that the primary thread is not used. """ -function logbforeach(fn, itr) +function logbforeach(fn::Function, itrs...) pool = LoggedStaticPool(2) - tforeach(pool, fn, itr) + tforeach(pool, x->fn(x...), zip(itrs...)) close(pool) return pool end """ - logqforeach(fn::Function, itr) + logqforeach(fn::Function, itrs...) Mimics `Base.foreach`, but launches the function evaluations onto all available threads, using a queued scheduling strategy appropriate for nonuniform @@ -497,16 +497,16 @@ Note that the execution order is not guaranteed, and that the primary thread is used. Returns a logged pool that can be analyzed with the logging functions and `plot`ted. """ -function logqforeach(fn, itr) +function logqforeach(fn::Function, itrs...) pool = LoggedQueuePool() - tforeach(pool, fn, itr) + tforeach(pool, x->fn(x...), zip(itrs...)) close(pool) return pool end """ - logqbforeach(fn::Function, itr) + logqbforeach(fn::Function, itrs...) Mimics `Base.foreach`, but launches the function evaluations onto all available threads except the primary, using a queued scheduling strategy appropriate for @@ -530,9 +530,9 @@ julia> plot(pool) Note that the execution order is not guaranteed, and that the primary thread is not used. """ -function logqbforeach(fn, itr) +function logqbforeach(fn::Function, itrs...) pool = LoggedQueuePool(2) - tforeach(pool, fn, itr) + tforeach(pool, x->fn(x...), zip(itrs...)) close(pool) return pool end diff --git a/src/staticpool.jl b/src/staticpool.jl index 58c6daf..7382333 100644 --- a/src/staticpool.jl +++ b/src/staticpool.jl @@ -32,7 +32,7 @@ end function tmap(pool::StaticPool, fn::Function, itr) data = collect(itr) - applicable(fn, data[1]) || error("function can't be applied to iterator contents") + applicable(fn, first(data)) || error("function can't be applied to iterator contents") N = length(data) result = Array{_detect_type(fn, data), ndims(data)}(undef, size(data)) nthrds = length(pool.tids) diff --git a/test/runtests.jl b/test/runtests.jl index e274bc8..03ea821 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -11,3 +11,4 @@ include("stacktests.jl") include("testspawnat.jl") include("testplots.jl") include("testmultidim.jl") +include("testmultiarg.jl") diff --git a/test/testmultiarg.jl b/test/testmultiarg.jl new file mode 100644 index 0000000..657e652 --- /dev/null +++ b/test/testmultiarg.jl @@ -0,0 +1,228 @@ +module TestMulitArg + +using Test +using ThreadPools + +include("util.jl") + + +@testset "multiarg" begin + + @testset "tmap" begin + N = 2 * Threads.nthreads() + adders = [1 for _ in 1:N] + primary = Threads.nthreads() == 1 + fn! = (x, y) -> begin + Threads.threadid() == 1 && (primary = true) + x.data + y + end + @test tmap(fn!, (TestObj(x) for x in 1:N), adders) == collect(1+1:N+1) + @test primary + end + + @testset "bmap" begin + N = 2 * Threads.nthreads() + adders = [1 for _ in 1:N] + fn! = (x, y) -> begin + Threads.nthreads() == 1 || Threads.threadid() == 1 && error("Task on primary") + x.data + y + end + @test bmap(fn!, (TestObj(x) for x in 1:N), adders) == collect(1+1:N+1) + @inferred bmap(fn!, (TestObj(x) for x in 1:N), adders) + end + + @testset "qmap" begin + N = 2 * Threads.nthreads() + adders = [1 for _ in 1:N] + primary = Threads.nthreads() == 1 + fn! = (x, y) -> begin + Threads.threadid() == 1 && (primary = true) + x.data + y + end + @test qmap(fn!, (TestObj(x) for x in 1:N), adders) == collect(1+1:N+1) + @test primary + @inferred qmap(fn!, (TestObj(x) for x in 1:N), adders) + end + + @testset "qbmap" begin + N = 2 * Threads.nthreads() + adders = [1 for _ in 1:N] + fn! = (x, y) -> begin + Threads.nthreads() == 1 || Threads.threadid() == 1 && error("Task on primary") + x.data + y + end + @test qbmap(fn!, (TestObj(x) for x in 1:N), adders) == collect(1+1:N+1) + @inferred qbmap(fn!, (TestObj(x) for x in 1:N), adders) + end + + @testset "logtmap" begin + N = 2 * Threads.nthreads() + adders = [1 for _ in 1:N] + primary = Threads.nthreads() == 1 + fn! = (x, y) -> begin + Threads.threadid() == 1 && (primary = true) + x.data + y + end + p, r = logtmap(fn!, (TestObj(x) for x in 1:N), adders) + @test r == collect(1+1:N+1) + @test length(p.recs) == N*2 + @test primary + @inferred logtmap(fn!, (TestObj(x) for x in 1:N), adders) + end + + @testset "logbmap" begin + N = 2 * Threads.nthreads() + adders = [1 for _ in 1:N] + fn! = (x, y) -> begin + Threads.nthreads() == 1 || Threads.threadid() == 1 && error("Task on primary") + x.data + y + end + p, r = logbmap(fn!, (TestObj(x) for x in 1:N), adders) + @test r == collect(1+1:N+1) + @test length(p.recs) == N*2 + @inferred logbmap(fn!, (TestObj(x) for x in 1:N), adders) + end + + @testset "logqmap" begin + N = 2 * Threads.nthreads() + adders = [1 for _ in 1:N] + primary = Threads.nthreads() == 1 + fn! = (x, y) -> begin + Threads.threadid() == 1 && (primary = true) + x.data + y + end + p, r = logqmap(fn!, (TestObj(x) for x in 1:N), adders) + @test r == collect(1+1:N+1) + @test length(p.recs) == N*2 + @test primary + @inferred logqmap(fn!, (TestObj(x) for x in 1:N), adders) + end + + @testset "logqbmap" begin + N = 2 * Threads.nthreads() + adders = [1 for _ in 1:N] + fn! = (x, y) -> begin + Threads.nthreads() == 1 || Threads.threadid() == 1 && error("Task on primary") + x.data + y + end + p, r = logqbmap(fn!, (TestObj(x) for x in 1:N), adders) + @test r == collect(1+1:N+1) + @test length(p.recs) == N*2 + @inferred logqbmap(fn!, (TestObj(x) for x in 1:N), adders) + end + + + @testset "tforeach" begin + N = 2 * Threads.nthreads() + objs = [TestObj(x) for x in 1:N] + primary = Threads.nthreads() == 1 + fn! = (x, y) -> begin + Threads.threadid() == 1 && (primary = true) + x.data += y + end + tforeach(fn!, objs, 1:N) + @test [x.data for x in objs] == collect(1:N)*2 + @test primary + @inferred tforeach(fn!, objs, 1:N) + end + + @testset "bforeach" begin + N = 2 * Threads.nthreads() + objs = [TestObj(x) for x in 1:N] + fn! = (x, y) -> begin + Threads.nthreads() == 1 || Threads.threadid() == 1 && error("Task on primary") + x.data += y + end + bforeach(fn!, objs, 1:N) + @test [x.data for x in objs] == collect(1:N)*2 + @inferred bforeach(fn!, objs, 1:N) + end + + @testset "qforeach" begin + N = 2 * Threads.nthreads() + objs = [TestObj(x) for x in 1:N] + primary = Threads.nthreads() == 1 + fn! = (x, y) -> begin + Threads.threadid() == 1 && (primary = true) + x.data += y + end + qforeach(fn!, objs, 1:N) + @test [x.data for x in objs] == collect(1:N)*2 + @test primary + @inferred qforeach(fn!, objs, 1:N) + end + + @testset "qbforeach" begin + N = 2 * Threads.nthreads() + objs = [TestObj(x) for x in 1:N] + fn! = (x, y) -> begin + Threads.nthreads() == 1 || Threads.threadid() == 1 && error("Task on primary") + x.data += y + end + qbforeach(fn!, objs, 1:N) + @test [x.data for x in objs] == collect(1:N)*2 + @inferred qbforeach(fn!, objs, 1:N) + end + + @testset "logtforeach" begin + N = 2 * Threads.nthreads() + objs = [TestObj(x) for x in 1:N] + primary = Threads.nthreads() == 1 + fn! = (x, y) -> begin + Threads.threadid() == 1 && (primary = true) + x.data += y + end + p = logtforeach(fn!, objs, 1:N) + @test [x.data for x in objs] == collect(1:N)*2 + @test length(p.recs) == N*2 + @test primary + @inferred logtforeach(fn!, objs, 1:N) + end + + @testset "logbforeach" begin + N = 2 * Threads.nthreads() + objs = [TestObj(x) for x in 1:N] + fn! = (x, y) -> begin + Threads.nthreads() == 1 || Threads.threadid() == 1 && error("Task on primary") + x.data += y + end + p = logbforeach(fn!, objs, 1:N) + @test [x.data for x in objs] == collect(1:N)*2 + @test length(p.recs) == N*2 + @inferred logbforeach(fn!, objs, 1:N) + end + + @testset "logqforeach" begin + N = 2 * Threads.nthreads() + objs = [TestObj(x) for x in 1:N] + primary = Threads.nthreads() == 1 + fn! = (x, y) -> begin + Threads.threadid() == 1 && (primary = true) + x.data += y + end + p = logqforeach(fn!, objs, 1:N) + @test [x.data for x in objs] == collect(1:N)*2 + @test length(p.recs) == N*2 + @test primary + @inferred logqforeach(fn!, objs, 1:N) + end + + @testset "logqbforeach" begin + N = 2 * Threads.nthreads() + objs = [TestObj(x) for x in 1:N] + fn! = (x, y) -> begin + Threads.nthreads() == 1 || Threads.threadid() == 1 && error("Task on primary") + x.data += y + end + p = logqbforeach(fn!, objs, 1:N) + @test [x.data for x in objs] == collect(1:N)*2 + @test length(p.recs) == N*2 + @inferred logqbforeach(fn!, objs, 1:N) + end + +end + + + + +end # module \ No newline at end of file