Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add experimental completebasecase protocol #509

Merged
merged 4 commits into from
Jan 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 70 additions & 8 deletions src/combinators.jl
Original file line number Diff line number Diff line change
Expand Up @@ -217,34 +217,60 @@ julia> foldxt(rf, Map(identity), 1:4; basesize = 1, init = OnInit(() -> []))
4
```
"""
struct AdHocRF{OnInit,Start,Next,Complete,Combine} <: _Function
struct AdHocRF{OnInit,Start,Next,CompleteBasecase,Complete,Combine} <: _Function
oninit::OnInit
start::Start
next::Next
completebasecase::CompleteBasecase
complete::Complete
combine::Combine

AdHocRF{OnInit,Start,Next,Complete,Combine}(
function AdHocRF{OnInit,Start,Next,CompleteBasecase,Complete,Combine}(
oninit,
start,
next,
completebasecase,
complete,
combine,
) where {OnInit,Start,Next,Complete,Combine} =
new{OnInit,Start,Next,Complete,Combine}(oninit, start, next, complete, combine)
) where {OnInit,Start,Next,CompleteBasecase,Complete,Combine}
return new{OnInit,Start,Next,CompleteBasecase,Complete,Combine}(
oninit,
start,
next,
completebasecase,
complete,
combine,
)
end
end

AdHocRF(oninit, start, op, complete, combine) =
AdHocRF{_typeof(oninit),_typeof(start),_typeof(op),_typeof(complete),_typeof(combine)}(
# Capture T::Type as Type{T}
function AdHocRF(oninit, start, op, completebasecase, complete, combine)
return AdHocRF{
_typeof(oninit),
_typeof(start),
_typeof(op),
_typeof(completebasecase),
_typeof(complete),
_typeof(combine),
}(
oninit,
start,
op,
completebasecase,
complete,
combine,
)
end

AdHocRF(op; oninit = nothing, start = identity, complete = identity, combine = nothing) =
AdHocRF(oninit, start, op, complete, combine)
AdHocRF(
op;
oninit = nothing,
start = identity,
completebasecase = identity,
complete = identity,
combine = nothing,
) = AdHocRF(oninit, start, op, completebasecase, complete, combine)

AdHocRF(op::AdHocRF; kwargs...) = setproperties(op, values(kwargs))

Expand All @@ -260,18 +286,22 @@ AdHocRF(op::AdHocRF; kwargs...) = setproperties(op, values(kwargs))
@inline start(rf::AdHocRF, init) = rf.start(initialize(init, rf.next))
@inline next(rf::AdHocRF, acc, x) = rf.next(acc, x)
@inline complete(rf::AdHocRF, acc) = rf.complete(acc)
@inline completebasecase(rf::AdHocRF, acc) = rf.completebasecase(acc)
@inline combine(rf::AdHocRF, a, b) = something(rf.combine, rf.next)(a, b)

_asmonoid(rf::AdHocRF) = @set rf.next = _asmonoid(rf.next)
Completing(rf::AdHocRF) = rf

wheninit(oninit, op) = AdHocRF(op; oninit = oninit)
whenstart(start, op) = AdHocRF(op; start = start)
whencompletebasecase(completebasecase, op) =
AdHocRF(op; completebasecase = completebasecase)
whencomplete(complete, op) = AdHocRF(op; complete = complete)
whencombine(combine, op) = AdHocRF(op; combine = combine)

wheninit(oninit) = op -> wheninit(oninit, op)
whenstart(start) = op -> whenstart(start, op)
whencompletebasecase(completebasecase) = op -> whencompletebasecase(completebasecase, op)
whencomplete(complete) = op -> whencomplete(complete, op)
whencombine(combine) = op -> whencombine(combine, op)

Expand Down Expand Up @@ -387,3 +417,35 @@ julia> foldxt(averaging2, Filter(isodd), 1:50; basesize = 1)
```
"""
(wheninit, whenstart, whencomplete, whencombine)

"""
whencompletebasecase(completebasecase, rf) -> rf′
whencompletebasecase(completebasecase) -> rf -> rf′

Add [`completebasecase`](@ref) protocol to arbitrary reducing function.

The function `completebasecase` is used as follows in the basecase
implementation of `reduce` as follows:

```julia
init′ = oninit()
acc = start(init′)
for x in collection
acc += rf(acc, x)
end
result = completebasecase(acc)
return result
```

The `result₁` from basecase 1 and `result₂` from basecase 2 are combined
using [`combine`](@ref) protcol:

```julia
combine(result₁, result₂)
```

!!! note

This function is an internal experimental interface for FoldsCUDA.
"""
whencompletebasecase
18 changes: 18 additions & 0 deletions src/core.jl
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ ensurerf(f) = BottomRF(f)
# `Completing` etc.
start(rf::BottomRF, result) = start(inner(rf), result)
@inline next(rf::BottomRF, result, input) = next(inner(rf), result, input)
@inline completebasecase(rf::BottomRF, result) = completebasecase(inner(rf), result)
complete(rf::BottomRF, result) = complete(inner(rf), result)
combine(rf::BottomRF, a, b) = combine(inner(rf), a, b)

Expand Down Expand Up @@ -495,6 +496,23 @@ real-world examples.

# done(rf, result)

"""
Transducers.completebasecase(rf, state)

Process basecase result `state` before merged by [`combine`](@ref).

For example, on GPU, this function can be used to translate mutable states to
immutable values for exchanging them through (un-GC-managed) memory. See
[`whencompletebasecase`](@ref).

!!! note

This function is an internal experimental interface for FoldsCUDA.
"""
completebasecase(_, result) = result
completebasecase(rf::RF, result) where {RF <: AbstractReduction} =
completebasecase(inner(rf), result)

"""
Transducers.complete(rf::R_{X}, state)

Expand Down
8 changes: 5 additions & 3 deletions src/processes.jl
Original file line number Diff line number Diff line change
Expand Up @@ -349,12 +349,14 @@ function simple_transduce(xform, f, init, coll)
end

"""
foldl_nocomplete(rf, init, coll)
foldl_basecase(rf, init, coll)

Call [`__foldl__`](@ref) without calling [`complete`](@ref).
"""
@inline foldl_nocomplete(rf::RF, init, coll) where {RF} =
__foldl__(skipcomplete(rf), init, coll)
@inline foldl_basecase(rf::RF, init, coll) where {RF} =
completebasecase(rf, __foldl__(skipcomplete(rf), init, coll))

const foldl_nocomplete = foldl_basecase

"""
foldxl(step, xf::Transducer, reducible; init, simd) :: T
Expand Down
32 changes: 31 additions & 1 deletion test/test_adhocrf.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,17 @@ module TestAdHocRF

using Test
using Transducers
using Transducers: wheninit, whencomplete, whencombine, complete, whenstart
using Transducers:
complete,
foldl_basecase,
start,
whencombine,
whencomplete,
whencompletebasecase,
wheninit,
whenstart
using MicroCollections: EmptyVector
using StaticArrays: MVector, SVector

@testset "setters" begin
rf = nothing |> wheninit(1) |> whenstart(2) |> whencomplete(3) |> whencombine(4)
Expand All @@ -27,6 +36,27 @@ end
@test foldxt(collector!!, Filter(isodd), 1:5; basesize = 1) == 1:2:5
end

counter(n::Integer) = counter(Val(Int(n)))
function counter(::Val{n}) where {n}
init() = zero(MVector{n,Int})
function inc!(b, i)
@inbounds b[max(1, min(i, n))] += 1
b
end
completebasecase(b) = SVector(b)
combine(h, b) = h .+ b
return inc! |>
wheninit(init) |>
whencompletebasecase(completebasecase) |>
whencombine(combine)
end

@testset "counter" begin
@test foldxl(counter(10), 1:10)::MVector == ones(10)
rf = counter(10)
@test foldl_basecase(rf, start(rf, Init)::MVector, 1:10)::SVector == ones(10)
end

getoninit(rf) = rf.oninit

@testset "inference" begin
Expand Down