Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

replace SML-level implementation of ABP deque with implementation in C #214

Merged
merged 1 commit into from
Jan 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions basis-library/schedulers/par-pcall/MkScheduler.sml
Original file line number Diff line number Diff line change
Expand Up @@ -410,10 +410,7 @@ struct
let
val {queue, ...} = vectorSub (workerLocalData, p)
in
if not (Queue.pollHasWork queue) then
NONE
else
Queue.tryPopTop queue
Queue.tryPopTop queue
end

fun communicate () = ()
Expand All @@ -434,13 +431,15 @@ struct
Queue.pushBot queue x
end

(*
fun clear () =
let
val myId = myWorkerId ()
val {queue, ...} = vectorSub (workerLocalData, myId)
in
Queue.clear queue
end
*)

fun pop () =
let
Expand Down Expand Up @@ -1143,7 +1142,7 @@ struct
in
case trySteal friend of
NONE => loop (tries+1)
| SOME (task, depth) => (task, depth)
| SOME task => task
end

val result = loop 0
Expand Down Expand Up @@ -1192,7 +1191,7 @@ struct

fun acquireWork () : unit =
let
val (task, depth) = stealLoop ()
val task = stealLoop ()
val _ = incrementNumSteals ()
in
case task of
Expand Down
7 changes: 6 additions & 1 deletion basis-library/schedulers/par-pcall/sources.mlb
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,12 @@ local
../shh/CumulativePerProcTimer.sml
../shh/FORK_JOIN.sig
../shh/SimpleRandom.sml
../shh/queue/DequeABP.sml
(* DequeABP.sml uses `_import` and `_prim` declarations, so it is compiled
 * under annotations that permit FFI and primitive access. The annotations
 * apply only to the file listed between `in` and `end`. *)
ann
"allowFFI true"
"allowPrim true"
in
../shh/queue/DequeABP.sml
end
(*Stack.sml*)
../shh/Result.sml
ann
Expand Down
175 changes: 21 additions & 154 deletions basis-library/schedulers/shh/queue/DequeABP.sml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ sig
val capacity : int

val new : unit -> 'a t
val pollHasWork : 'a t -> bool
val clear : 'a t -> unit

(* register this deque with the specified worker id *)
Expand All @@ -24,9 +23,7 @@ sig

(* returns NONE if deque is empty *)
val popBot : 'a t -> 'a option

(* returns an item and the depth of that item *)
val tryPopTop : 'a t -> ('a * int) option
val tryPopTop : 'a t -> 'a option

val size : 'a t -> int
val numResets : 'a t -> int
Expand Down Expand Up @@ -89,10 +86,18 @@ struct
end
end


(* Handle type passed as the first argument to every ABP_deque_* runtime
 * call declared below. *)
type gcstate = MLton.Pointer.t
(* Primitive that yields the gcstate handle. *)
val gcstate = _prim "GC_state": unit -> gcstate;
(* Runtime-implemented deque operations. Each one takes the gcstate handle,
 * then the deque's components in the order (top, bot, data), then one
 * operation-specific final argument. *)
(* Push: the final argument is the element to store. pushBot treats a false
 * result as "capacity exceeded". *)
val ABP_deque_push_bot = _import "ABP_deque_push_bot" runtime private: gcstate * TagIdx.t ref * Word32.word ref * 'a option array * 'a option -> bool;
(* Pop from the bottom. Callers in this file pass NONE as the final argument;
 * presumably it is the value handed back when no element is obtained --
 * TODO confirm against the C implementation. *)
val ABP_deque_try_pop_bot = _import "ABP_deque_try_pop_bot" runtime private: gcstate * TagIdx.t ref * Word32.word ref * 'a option array * 'a option -> 'a option;
(* Pop from the top. Same argument shape as ABP_deque_try_pop_bot; callers
 * in this file pass NONE as the final argument. *)
val ABP_deque_try_pop_top = _import "ABP_deque_try_pop_top" runtime private: gcstate * TagIdx.t ref * Word32.word ref * 'a option array * 'a option -> 'a option;
(* Set depth: the final argument is the new depth as a 32-bit word. *)
val ABP_deque_set_depth = _import "ABP_deque_set_depth" runtime private: gcstate * TagIdx.t ref * Word32.word ref * 'a option array * Word32.word -> unit;


type 'a t = {data : 'a option array,
top : TagIdx.t ref,
bot : Word32.word ref,
depth : int ref}
bot : Word32.word ref}

exception Full

Expand All @@ -105,167 +110,29 @@ struct
fun new () =
{data = Array.array (capacity, NONE),
top = ref (TagIdx.pack {tag=0w0, idx=0}),
bot = ref (0w0 : Word32.word),
depth = ref 0}
bot = ref (0w0 : Word32.word)}

(* Register this deque with worker id p: hands the data array, the top
 * reference, and the bottom reference to MLton.HM, each keyed by p converted
 * to a 32-bit word. *)
fun register ({top, bot, data, ...} : 'a t) p =
( MLton.HM.registerQueue (Word32.fromInt p, data)
; MLton.HM.registerQueueTop (Word32.fromInt p, top)
; MLton.HM.registerQueueBot (Word32.fromInt p, bot)
)

fun setDepth (q as {depth, top, bot, data} : 'a t) d =
let
fun forceSetTop oldTop =
let
val newTop =
TagIdx.pack {idx = d, tag = 0w1 + #tag (TagIdx.unpack oldTop)}
val oldTop' = cas top (oldTop, newTop)
in
if oldTop = oldTop' then
()
else
(* The GC must have interfered, so just do it again. GC shouldn't
* be able to interfere a second time... *)
forceSetTop oldTop'
end

val oldTop = !top
val oldBot = Word32.toInt (!bot)
val {idx, ...} = TagIdx.unpack oldTop
in
if idx < oldBot then
die (fn _ => "scheduler bug: setDepth must be on empty deque " ^
"(top=" ^ Int.toString idx ^ " bot=" ^ Int.toString oldBot ^ ")")
else
( depth := d
; if d < idx then
(bot := Word32.fromInt d; forceSetTop oldTop)
else
(forceSetTop oldTop; bot := Word32.fromInt d)
)
end
(* Set the deque's depth to d by delegating to the runtime routine
 * ABP_deque_set_depth; d is converted to a 32-bit word before the call.
 * NOTE(review): any precondition on the deque's state (e.g. that it must be
 * empty) would be enforced inside the C routine, which is not visible here
 * -- confirm. *)
fun setDepth (q as {top, bot, data}) d =
ABP_deque_set_depth (gcstate (), top, bot, data, Word32.fromInt d)

(* Overwrite every slot of the backing array with NONE. Only the data field
 * is touched; top and bot are left as they are. *)
fun clear ({data, ...} : 'a t) =
for (0, Array.length data) (fn i => arrayUpdate (data, i, NONE))

fun pollHasWork ({top, bot, ...} : 'a t) =
let
val b = Word32.toInt (!bot)
val {idx, ...} = TagIdx.unpack (!top)
in
idx < b
end
(* Push x onto the bottom of the deque via the runtime routine
 * ABP_deque_push_bot, passing the element wrapped as SOME x. If the routine
 * reports false, exceededCapacityError () is invoked. *)
fun pushBot (q as {data, top, bot}) x =
if ABP_deque_push_bot (gcstate (), top, bot, data, SOME x) then ()
else exceededCapacityError ()

fun pushBot (q as {data, top, bot, depth}) x =
let
val oldBot = Word32.toInt (!bot)
in
if oldBot >= capacity then exceededCapacityError () else
(* Normally, an ABP deque would do this:
* 1. update array
* 2. increment bot
* 3. issue a memory fence
* However we don't have memory fence primitives in Parallel ML yet.
* So, let's hack it and do a compare-and-swap. Note that the CAS is
* guaranteed to succeed, because multiple pushBot operations are never
* executed concurrently. *)
( arrayUpdate (data, oldBot, SOME x)
; cas32 bot (oldBot, oldBot+1)
; ()
)
end

fun tryPopTop (q as {data, top, bot, depth}) =
let
val oldTop = !top
val {tag, idx} = TagIdx.unpack oldTop
val oldBot = Word32.toInt (!bot)
in
if oldBot <= idx then
NONE
else
let
val x = MLton.HM.arraySubNoBarrier (data, idx)
val newTop = TagIdx.pack {tag=tag, idx=idx+1}
in
if oldTop = cas top (oldTop, newTop) then
SOME (Option.valOf x, idx)
handle Option =>
( print ("[" ^ Int.toString (myWorkerId()) ^ "] Queue.tryPopTop error at idx " ^ Int.toString idx ^ "\n")
; raise Option
)
else
NONE
end
end
(* Attempt to take an element from the top of the deque via the runtime
 * routine ABP_deque_try_pop_top, returning whatever option it yields.
 * NONE is passed as the final argument; presumably that is the value
 * returned when nothing could be taken -- TODO confirm against the C
 * implementation. *)
fun tryPopTop (q as {data, top, bot}) =
ABP_deque_try_pop_top (gcstate (), top, bot, data, NONE)

fun popBot (q as {data, top, bot, depth}) =
let
val oldBot = Word32.toInt (!bot)
val d = !depth
in
if oldBot <= d then
NONE
else
let
val newBot = oldBot-1
(* once again, we need a fence here, but the best we can do is a
* compare-and-swap. *)
(* val _ = bot := newBot *)
val _ = cas32 bot (oldBot, newBot)
val x = MLton.HM.arraySubNoBarrier (data, newBot)
val oldTop = !top
val {tag, idx} = TagIdx.unpack oldTop
in
if newBot > idx then
(arrayUpdate (data, newBot, NONE); x)
else if newBot < idx then
(* We are racing with a concurrent steal to take this single
* element x, but we already lost the race. So we only need to set
* the bottom to match the idx, i.e. set the deque to a proper
* empty state. *)
(* TODO: can we relax the cas to a normal write?
* Note that this cas is guaranteed to succeed... *)
(cas32 bot (newBot, idx); NONE)
else
(* We are racing with a concurrent steal to take this single
* element x, but we haven't lost the race yet. *)
let
val newTop = TagIdx.pack {tag=tag+0w1, idx=idx}
val oldTop' = cas top (oldTop, newTop)
in
if oldTop' = oldTop then
(* success; we get to keep x *)
(arrayUpdate (data, newBot, NONE); x)
else
(* two possibilities: either the steal succeeded (in which case
* the idx will have moved) or the GC will have interfered (in
* which case the tag will have advanced). *)
let
val {idx=idx', ...} = TagIdx.unpack oldTop'
in
if idx' <> idx then
(* The steal succeeded, so we don't get to keep this
* element. It's possible that the GC interfered also,
* but that doesn't change the fact that we don't get to
* keep this element! *)
(* TODO: can we relax the cas to a normal write?
* Note that this cas is guaranteed to succeed... *)
(cas32 bot (newBot, idx'); NONE)
else
(* the GC must have interfered, so try again. It _should_ be
* the case that the GC can't interfere on the second try,
* because we haven't done any significant allocation
* in-between tries. *)
(* SAM_NOTE: with new local scope handling in the GC, this
* case should be impossible. *)
die (fn _ => "scheduler bug: unexpected GC interference")
(* popBot q *)
end
end
end
end
(* Take an element from the bottom of the deque via the runtime routine
 * ABP_deque_try_pop_bot, returning whatever option it yields. The signature
 * documents the result as NONE when the deque is empty. NONE is also what is
 * passed as the final argument; presumably the routine hands it back in the
 * empty case -- TODO confirm against the C implementation. *)
fun popBot (q as {data, top, bot}) =
ABP_deque_try_pop_bot (gcstate (), top, bot, data, NONE)

fun size ({top, bot, ...} : 'a t) =
let
Expand Down
1 change: 1 addition & 0 deletions runtime/gc.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ extern C_Pthread_Key_t gcstate_key;

#include "gc/gdtoa-multiple-threads-defs.c"

#include "gc/abp-deque.c"
#include "gc/assign.c"
#include "gc/atomic.c"
#include "gc/block-allocator.c"
Expand Down
1 change: 1 addition & 0 deletions runtime/gc.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ typedef GC_state GCState_t;
#include "gc/entanglement-suspects.h"
#include "gc/local-scope.h"
#include "gc/local-heap.h"
#include "gc/abp-deque.h"
#include "gc/assign.h"
#include "gc/concurrent-list.h"
#include "gc/remembered-set.h"
Expand Down
Loading