Skip to content

Commit

Permalink
replace SML-level implementation of ABP deque with implementation in C
Browse files Browse the repository at this point in the history
This makes it possible to use the appropriate memory order annotations,
to ensure correct compilation on weakly ordered architectures such as ARM.

I used memory ordering annotations inspired by the parlaylib implementation:
  https://github.com/cmuparlay/parlaylib/blob/36459f42a84207330eae706c47e6fab712e6a149/include/parlay/internal/work_stealing_deque.h

I believe this patch may also fix another subtle bug related to the deque,
specifically the tag of the packed tag+idx not being advanced here:
  https://github.com/MPLLang/mpl/blob/70bfe0fc4eef5c5ead66eb2af8698b42185585aa/basis-library/schedulers/shh/queue/DequeABP.sml#L223-L230
I don't have any evidence of a failing test or otherwise, but it seems
like it can cause an ABA problem, if another concurrent steal were
in-flight at the same index but then managed to succeed later with the
same top tag.
  • Loading branch information
shwestrick committed Jan 4, 2025
1 parent 70bfe0f commit eef6f00
Show file tree
Hide file tree
Showing 10 changed files with 285 additions and 198 deletions.
11 changes: 5 additions & 6 deletions basis-library/schedulers/par-pcall/MkScheduler.sml
Original file line number Diff line number Diff line change
Expand Up @@ -410,10 +410,7 @@ struct
let
val {queue, ...} = vectorSub (workerLocalData, p)
in
if not (Queue.pollHasWork queue) then
NONE
else
Queue.tryPopTop queue
Queue.tryPopTop queue
end

fun communicate () = ()
Expand All @@ -434,13 +431,15 @@ struct
Queue.pushBot queue x
end

(*
fun clear () =
let
val myId = myWorkerId ()
val {queue, ...} = vectorSub (workerLocalData, myId)
in
Queue.clear queue
end
*)

fun pop () =
let
Expand Down Expand Up @@ -1143,7 +1142,7 @@ struct
in
case trySteal friend of
NONE => loop (tries+1)
| SOME (task, depth) => (task, depth)
| SOME task => task
end

val result = loop 0
Expand Down Expand Up @@ -1192,7 +1191,7 @@ struct

fun acquireWork () : unit =
let
val (task, depth) = stealLoop ()
val task = stealLoop ()
val _ = incrementNumSteals ()
in
case task of
Expand Down
7 changes: 6 additions & 1 deletion basis-library/schedulers/par-pcall/sources.mlb
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,12 @@ local
../shh/CumulativePerProcTimer.sml
../shh/FORK_JOIN.sig
../shh/SimpleRandom.sml
../shh/queue/DequeABP.sml
ann
"allowFFI true"
"allowPrim true"
in
../shh/queue/DequeABP.sml
end
(*Stack.sml*)
../shh/Result.sml
ann
Expand Down
175 changes: 21 additions & 154 deletions basis-library/schedulers/shh/queue/DequeABP.sml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ sig
val capacity : int

val new : unit -> 'a t
val pollHasWork : 'a t -> bool
val clear : 'a t -> unit

(* register this deque with the specified worker id *)
Expand All @@ -24,9 +23,7 @@ sig

(* returns NONE if deque is empty *)
val popBot : 'a t -> 'a option

(* returns an item and the depth of that item *)
val tryPopTop : 'a t -> ('a * int) option
val tryPopTop : 'a t -> 'a option

val size : 'a t -> int
val numResets : 'a t -> int
Expand Down Expand Up @@ -89,10 +86,18 @@ struct
end
end


type gcstate = MLton.Pointer.t
val gcstate = _prim "GC_state": unit -> gcstate;
val ABP_deque_push_bot = _import "ABP_deque_push_bot" runtime private: gcstate * TagIdx.t ref * Word32.word ref * 'a option array * 'a option -> bool;
val ABP_deque_try_pop_bot = _import "ABP_deque_try_pop_bot" runtime private: gcstate * TagIdx.t ref * Word32.word ref * 'a option array * 'a option -> 'a option;
val ABP_deque_try_pop_top = _import "ABP_deque_try_pop_top" runtime private: gcstate * TagIdx.t ref * Word32.word ref * 'a option array * 'a option -> 'a option;
val ABP_deque_set_depth = _import "ABP_deque_set_depth" runtime private: gcstate * TagIdx.t ref * Word32.word ref * 'a option array * Word32.word -> unit;


type 'a t = {data : 'a option array,
top : TagIdx.t ref,
bot : Word32.word ref,
depth : int ref}
bot : Word32.word ref}

exception Full

Expand All @@ -105,167 +110,29 @@ struct
fun new () =
{data = Array.array (capacity, NONE),
top = ref (TagIdx.pack {tag=0w0, idx=0}),
bot = ref (0w0 : Word32.word),
depth = ref 0}
bot = ref (0w0 : Word32.word)}

fun register ({top, bot, data, ...} : 'a t) p =
( MLton.HM.registerQueue (Word32.fromInt p, data)
; MLton.HM.registerQueueTop (Word32.fromInt p, top)
; MLton.HM.registerQueueBot (Word32.fromInt p, bot)
)

fun setDepth (q as {depth, top, bot, data} : 'a t) d =
let
fun forceSetTop oldTop =
let
val newTop =
TagIdx.pack {idx = d, tag = 0w1 + #tag (TagIdx.unpack oldTop)}
val oldTop' = cas top (oldTop, newTop)
in
if oldTop = oldTop' then
()
else
(* The GC must have interfered, so just do it again. GC shouldn't
* be able to interfere a second time... *)
forceSetTop oldTop'
end

val oldTop = !top
val oldBot = Word32.toInt (!bot)
val {idx, ...} = TagIdx.unpack oldTop
in
if idx < oldBot then
die (fn _ => "scheduler bug: setDepth must be on empty deque " ^
"(top=" ^ Int.toString idx ^ " bot=" ^ Int.toString oldBot ^ ")")
else
( depth := d
; if d < idx then
(bot := Word32.fromInt d; forceSetTop oldTop)
else
(forceSetTop oldTop; bot := Word32.fromInt d)
)
end
fun setDepth (q as {top, bot, data}) d =
ABP_deque_set_depth (gcstate (), top, bot, data, Word32.fromInt d)

fun clear ({data, ...} : 'a t) =
for (0, Array.length data) (fn i => arrayUpdate (data, i, NONE))

fun pollHasWork ({top, bot, ...} : 'a t) =
let
val b = Word32.toInt (!bot)
val {idx, ...} = TagIdx.unpack (!top)
in
idx < b
end
fun pushBot (q as {data, top, bot}) x =
if ABP_deque_push_bot (gcstate (), top, bot, data, SOME x) then ()
else exceededCapacityError ()

fun pushBot (q as {data, top, bot, depth}) x =
let
val oldBot = Word32.toInt (!bot)
in
if oldBot >= capacity then exceededCapacityError () else
(* Normally, an ABP deque would do this:
* 1. update array
* 2. increment bot
* 3. issue a memory fence
* However we don't have memory fence primitives in Parallel ML yet.
* So, let's hack it and do a compare-and-swap. Note that the CAS is
* guaranteed to succeed, because multiple pushBot operations are never
* executed concurrently. *)
( arrayUpdate (data, oldBot, SOME x)
; cas32 bot (oldBot, oldBot+1)
; ()
)
end

fun tryPopTop (q as {data, top, bot, depth}) =
let
val oldTop = !top
val {tag, idx} = TagIdx.unpack oldTop
val oldBot = Word32.toInt (!bot)
in
if oldBot <= idx then
NONE
else
let
val x = MLton.HM.arraySubNoBarrier (data, idx)
val newTop = TagIdx.pack {tag=tag, idx=idx+1}
in
if oldTop = cas top (oldTop, newTop) then
SOME (Option.valOf x, idx)
handle Option =>
( print ("[" ^ Int.toString (myWorkerId()) ^ "] Queue.tryPopTop error at idx " ^ Int.toString idx ^ "\n")
; raise Option
)
else
NONE
end
end
fun tryPopTop (q as {data, top, bot}) =
ABP_deque_try_pop_top (gcstate (), top, bot, data, NONE)

fun popBot (q as {data, top, bot, depth}) =
let
val oldBot = Word32.toInt (!bot)
val d = !depth
in
if oldBot <= d then
NONE
else
let
val newBot = oldBot-1
(* once again, we need a fence here, but the best we can do is a
* compare-and-swap. *)
(* val _ = bot := newBot *)
val _ = cas32 bot (oldBot, newBot)
val x = MLton.HM.arraySubNoBarrier (data, newBot)
val oldTop = !top
val {tag, idx} = TagIdx.unpack oldTop
in
if newBot > idx then
(arrayUpdate (data, newBot, NONE); x)
else if newBot < idx then
(* We are racing with a concurrent steal to take this single
* element x, but we already lost the race. So we only need to set
* the bottom to match the idx, i.e. set the deque to a proper
* empty state. *)
(* TODO: can we relax the cas to a normal write?
* Note that this cas is guaranteed to succeed... *)
(cas32 bot (newBot, idx); NONE)
else
(* We are racing with a concurrent steal to take this single
* element x, but we haven't lost the race yet. *)
let
val newTop = TagIdx.pack {tag=tag+0w1, idx=idx}
val oldTop' = cas top (oldTop, newTop)
in
if oldTop' = oldTop then
(* success; we get to keep x *)
(arrayUpdate (data, newBot, NONE); x)
else
(* two possibilities: either the steal succeeded (in which case
* the idx will have moved) or the GC will have interfered (in
* which case the tag will have advanced). *)
let
val {idx=idx', ...} = TagIdx.unpack oldTop'
in
if idx' <> idx then
(* The steal succeeded, so we don't get to keep this
* element. It's possible that the GC interfered also,
* but that doesn't change the fact that we don't get to
* keep this element! *)
(* TODO: can we relax the cas to a normal write?
* Note that this cas is guaranteed to succeed... *)
(cas32 bot (newBot, idx'); NONE)
else
(* the GC must have interfered, so try again. It _should_ be
* the case that the GC can't interfere on the second try,
* because we haven't done any significant allocation
* in-between tries. *)
(* SAM_NOTE: with new local scope handling in the GC, this
* case should be impossible. *)
die (fn _ => "scheduler bug: unexpected GC interference")
(* popBot q *)
end
end
end
end
fun popBot (q as {data, top, bot}) =
ABP_deque_try_pop_bot (gcstate (), top, bot, data, NONE)

fun size ({top, bot, ...} : 'a t) =
let
Expand Down
1 change: 1 addition & 0 deletions runtime/gc.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ extern C_Pthread_Key_t gcstate_key;

#include "gc/gdtoa-multiple-threads-defs.c"

#include "gc/abp-deque.c"
#include "gc/assign.c"
#include "gc/atomic.c"
#include "gc/block-allocator.c"
Expand Down
1 change: 1 addition & 0 deletions runtime/gc.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ typedef GC_state GCState_t;
#include "gc/entanglement-suspects.h"
#include "gc/local-scope.h"
#include "gc/local-heap.h"
#include "gc/abp-deque.h"
#include "gc/assign.h"
#include "gc/concurrent-list.h"
#include "gc/remembered-set.h"
Expand Down
Loading

0 comments on commit eef6f00

Please sign in to comment.