-
Notifications
You must be signed in to change notification settings - Fork 21
/
Copy pathDequeABP.sml
285 lines (251 loc) · 9.16 KB
/
DequeABP.sml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
(* Non-resizing concurrent deque for work-stealing.
 * The capacity is hard-coded; see capacityPow in the implementation. *)
structure DequeABP :
sig
type 'a t
(* NOTE(review): declared, but not raised anywhere in the implementation
 * visible in this file. *)
exception Full
(* maximum number of items the deque can hold *)
val capacity : int
(* create an empty deque: every slot NONE, top index 0, bot 0, depth 0 *)
val new : unit -> 'a t
(* true if, at the moment of the two reads, the top index is below bot *)
val pollHasWork : 'a t -> bool
(* overwrite every slot with NONE; top, bot and depth are left untouched *)
val clear : 'a t -> unit
(* register this deque with the specified worker id *)
val register : 'a t -> int -> unit
(* set the minimum depth of this deque, i.e. the fork depth of the
 * MLton thread that is currently using this deque. This is used to
 * interface with the runtime, to coordinate local garbage collections. *)
val setDepth : 'a t -> int -> unit
(* if the deque is at capacity, prints a scheduler error and terminates the
 * process (it does NOT raise Full) *)
val pushBot : 'a t -> 'a -> unit
(* returns NONE if bot is not above the current depth, or if a concurrent
 * steal took the last item *)
val popBot : 'a t -> 'a option
(* returns an item and the depth of that item; returns NONE if the deque
 * looks empty or the compare-and-swap on top fails *)
val tryPopTop : 'a t -> ('a * int) option
(* bot minus the top index, from two separate (non-atomic) reads *)
val size : 'a t -> int
(* current value of the tag packed into top, converted to int *)
val numResets : 'a t -> int
end =
struct
(* The capacity is configurable but must stay small: deque indices are tagged
 * and packed together with the tag into a single 64-bit word.
 * The capacity is one less than a power of two, so every index value in
 * 0 .. capacity (inclusive) fits in capacityPow bits. Index ranges have the
 * form [lo, hi) with 0 <= lo, hi <= capacity.
 *)
val capacityPow = 6 (* DO NOT CHANGE THIS WITHOUT ALSO CHANGING runtime/gc/... *)
val capacity = Word.toInt (Word.<< (0w1, Word.fromInt capacityPow) - 0w1)
(* Processor number of the caller, as reported by
 * MLton.Parallel.processorNumber. Used here only to label diagnostics. *)
fun myWorkerId () =
  let
    val me = MLton.Parallel.processorNumber ()
  in
    me
  end
(* Print the message produced by strfn, followed by a newline, and terminate
 * the process with a failure status. The message is passed as a thunk, so it
 * is only built once we are committed to exiting. Never returns. *)
fun die strfn =
  let
    val msg = strfn ()
  in
    print (msg ^ "\n");
    OS.Process.exit OS.Process.failure
  end
(* Decimal rendering of the capacity, computed once for the error message. *)
val capacityStr = Int.toString capacity

(* Fatal error used when an operation would take the deque past its capacity.
 * Never returns: it goes through die, which exits the process. *)
fun exceededCapacityError () =
  die (fn _ =>
    String.concat
      ["Scheduler error: exceeded max fork depth (", capacityStr, ")"])
(* Deque indices are tagged, and the (tag, index) pair is packed into a single
 * 64-bit word so that both can be compare-and-swapped as one unit. *)
structure TagIdx :
sig
  type t = Word64.word
  val maxTag : Word64.word
  val maxIdx : int
  val pack : {tag : Word64.word, idx : int} -> t
  val unpack : t -> {tag : Word64.word, idx : int}
end =
struct
  type t = Word64.word

  (* Number of bits needed to write n in binary: 1 + floor(log_2(n)) for
   * n >= 1, and 0 for n < 1. (Not a plain base-2 logarithm.) *)
  fun bitWidth n =
    if n >= 1 then 1 + bitWidth (n div 2) else 0

  val maxIdx = capacity
  val idxBits = Word.fromInt (bitWidth maxIdx)
  (* NOTE: maxIdx doubles as the bit mask for the index field; this works
   * because capacity has the form 2^k - 1. *)
  val idxMask = Word64.fromInt maxIdx
  val tagBits = 0w64 - idxBits
  val maxTag = Word64.- (Word64.<< (0w1, tagBits), 0w1)

  (* Layout: tag in the high tagBits bits, idx in the low idxBits bits.
   * Neither field is range-checked: an idx outside 0 .. maxIdx spills into
   * the tag bits. *)
  fun pack {tag, idx} =
    let
      val hi = Word64.<< (tag, idxBits)
      val lo = Word64.fromInt idx
    in
      Word64.orb (hi, lo)
    end

  (* Inverse of pack for in-range fields. *)
  fun unpack ti =
    { tag = Word64.>> (ti, idxBits)
    , idx = Word64.toInt (Word64.andb (ti, idxMask))
    }
end
(* Deque state.
 *   data  - item slots; new allocates `capacity` of them, all NONE, and an
 *           occupied slot holds SOME x.
 *   top   - packed (tag, idx) word, updated with compare-and-swap; idx is
 *           the slot tryPopTop reads from.
 *   bot   - index of the slot the next pushBot writes to.
 *   depth - value last passed to setDepth; popBot returns NONE while
 *           bot <= depth.
 *)
type 'a t = {data : 'a option array,
top : TagIdx.t ref,
bot : Word32.word ref,
depth : int ref}
(* NOTE(review): Full is part of the signature but nothing visible in this
 * structure raises it; pushBot exits the process when at capacity. *)
exception Full
(* Apply f to each integer in the half-open range [i, j), in increasing order.
 * An empty or inverted range (i >= j) does nothing. The previous `i = j` test
 * never terminated normally when called with i > j. *)
fun for (i, j) f =
  if i >= j then () else (f i; for (i + 1, j) f)
(* Store v at position i of arr through MLton.HM.arrayUpdateNoBarrier.
 * NOTE(review): going by the primitive's name this skips a write barrier;
 * confirm against the runtime before relying on that. *)
fun arrayUpdate (arr, i, v) =
  MLton.HM.arrayUpdateNoBarrier (arr, i, v)
(* Compare-and-swap on a ref cell via MLton.Parallel.compareAndSwap.
 * Callers in this file treat the operation as successful when the returned
 * value equals `expected`. *)
fun cas cell (expected, desired) =
  MLton.Parallel.compareAndSwap cell (expected, desired)
(* Compare-and-swap on a Word32 ref cell, with both operands given as ints.
 * Returns whatever cas returns (a Word32 word). *)
fun cas32 b (x, y) =
  let
    val expected = Word32.fromInt x
    val desired = Word32.fromInt y
  in
    cas b (expected, desired)
  end
(* Create an empty deque: `capacity` slots all set to NONE, top packed as
 * (tag 0, idx 0), bot 0 and depth 0. *)
fun new () =
  let
    val slots = Array.array (capacity, NONE)
    val initialTop = TagIdx.pack {tag = 0w0, idx = 0}
    val initialBot : Word32.word = 0w0
  in
    { data = slots
    , top = ref initialTop
    , bot = ref initialBot
    , depth = ref 0
    }
  end
(* Hand this deque's data array, top cell and bot cell to the runtime, keyed
 * by worker id p (converted to a Word32), in that order. *)
fun register ({top, bot, data, ...} : 'a t) p =
  let
    val worker = Word32.fromInt p
  in
    MLton.HM.registerQueue (worker, data);
    MLton.HM.registerQueueTop (worker, top);
    MLton.HM.registerQueueBot (worker, bot)
  end
(* Set the minimum depth of this deque to d: records d in `depth` and moves
 * both the top index and bot to d, leaving the deque empty at that position.
 *
 * Must be called on an empty deque (top idx >= bot); otherwise the process is
 * terminated with a "scheduler bug" message.
 *
 * d must lie in 0 .. capacity. TagIdx.pack does not range-check idx, so a
 * value outside that range would silently spill into the tag bits of `top`.
 * Such values are now rejected: a negative d is reported as a scheduler bug,
 * and d > capacity as an exceeded fork depth. Both terminate the process.
 *)
fun setDepth ({depth, top, bot, ...} : 'a t) d =
  let
    (* Install (tag + 1, d) into top, retrying with the freshly observed
     * value whenever the compare-and-swap loses. *)
    fun forceSetTop oldTop =
      let
        val newTop =
          TagIdx.pack {idx = d, tag = 0w1 + #tag (TagIdx.unpack oldTop)}
        val oldTop' = cas top (oldTop, newTop)
      in
        if oldTop = oldTop' then
          ()
        else
          (* The GC must have interfered, so just do it again. GC shouldn't
           * be able to interfere a second time... *)
          forceSetTop oldTop'
      end

    val oldTop = !top
    val oldBot = Word32.toInt (!bot)
    val {idx, ...} = TagIdx.unpack oldTop
  in
    if idx < oldBot then
      die (fn _ => "scheduler bug: setDepth must be on empty deque " ^
        "(top=" ^ Int.toString idx ^ " bot=" ^ Int.toString oldBot ^ ")")
    else if d < 0 then
      die (fn _ => "scheduler bug: setDepth with negative depth (" ^
        Int.toString d ^ ")")
    else if d > capacity then
      exceededCapacityError ()
    else
      ( depth := d
        (* Move whichever end keeps "top idx >= bot" true at every
         * intermediate point: bot first when lowering, top first when
         * raising. This way the deque never looks non-empty in between. *)
      ; if d < idx then
          (bot := Word32.fromInt d; forceSetTop oldTop)
        else
          (forceSetTop oldTop; bot := Word32.fromInt d)
      )
  end
(* Overwrite every slot of the data array with NONE. The top, bot and depth
 * cells are not touched. *)
fun clear ({data, ...} : 'a t) =
  let
    fun wipe i = arrayUpdate (data, i, NONE)
  in
    for (0, Array.length data) wipe
  end
(* True when the deque looks non-empty, i.e. the top index is strictly below
 * bot. bot is read first and top second; the two reads are not atomic with
 * respect to each other. *)
fun pollHasWork ({top, bot, ...} : 'a t) =
  let
    val bottom = Word32.toInt (!bot)
    val topIdx = #idx (TagIdx.unpack (!top))
  in
    bottom > topIdx
  end
(* Push x at the bottom of the deque.
 * If bot has already reached capacity, the process is terminated through
 * exceededCapacityError (no exception is raised). *)
fun pushBot ({data, bot, ...} : 'a t) x =
  let
    val slot = Word32.toInt (!bot)
  in
    if slot < capacity then
      (* A textbook ABP deque would write the slot, increment bot, and then
       * issue a memory fence. Parallel ML has no fence primitive yet, so the
       * increment is done with a compare-and-swap instead. That CAS is
       * guaranteed to succeed, because pushBot operations are never executed
       * concurrently with each other. *)
      ( arrayUpdate (data, slot, SOME x)
      ; ignore (cas32 bot (slot, slot + 1))
      )
    else
      exceededCapacityError ()
  end
(* Attempt to steal the item at the top of the deque.
 * Returns SOME (item, idx), where idx is the slot the item was taken from,
 * or NONE when the deque looks empty or the compare-and-swap on top fails.
 * If the CAS succeeds but the slot held NONE, a diagnostic is printed and
 * Option is raised. *)
fun tryPopTop ({data, top, bot, ...} : 'a t) =
  let
    (* Read order matters: top first, then bot. *)
    val seenTop = !top
    val {tag, idx} = TagIdx.unpack seenTop
    val seenBot = Word32.toInt (!bot)
  in
    if idx >= seenBot then
      NONE
    else
      let
        (* Read the candidate before claiming it; the CAS below decides
         * whether we actually own it. *)
        val candidate = MLton.HM.arraySubNoBarrier (data, idx)
        val advancedTop = TagIdx.pack {tag = tag, idx = idx + 1}
        val won = (cas top (seenTop, advancedTop) = seenTop)
      in
        if not won then
          NONE
        else
          case candidate of
            SOME item => SOME (item, idx)
          | NONE =>
              ( print ("[" ^ Int.toString (myWorkerId()) ^ "] Queue.tryPopTop error at idx " ^ Int.toString idx ^ "\n")
              ; raise Option
              )
      end
  end
(* Remove and return the item at the bottom of the deque.
 *
 * Returns NONE when bot is not above the current depth, or when a concurrent
 * steal took the last remaining item. Otherwise returns the contents of the
 * slot at bot-1 (an option) and resets that slot to NONE.
 *
 * Outline: bot is decremented first (with a CAS standing in for a fence),
 * and only then is top read. The three-way comparison of the new bot against
 * the top index distinguishes "items remain", "already lost the last item to
 * a steal", and "racing for the last item". The race is settled by a CAS on
 * top that bumps the tag and keeps the index.
 *
 * If that CAS fails without the index having moved, the process is terminated
 * through die. *)
fun popBot (q as {data, top, bot, depth}) =
let
val oldBot = Word32.toInt (!bot)
val d = !depth
in
(* nothing at or below the current depth may be popped *)
if oldBot <= d then
NONE
else
let
val newBot = oldBot-1
(* once again, we need a fence here, but the best we can do is a
* compare-and-swap. *)
(* val _ = bot := newBot *)
val _ = cas32 bot (oldBot, newBot)
val x = MLton.HM.arraySubNoBarrier (data, newBot)
val oldTop = !top
val {tag, idx} = TagIdx.unpack oldTop
in
if newBot > idx then
(* at least one other item remains above top, so no steal can contend
* for this slot *)
(arrayUpdate (data, newBot, NONE); x)
else if newBot < idx then
(* We are racing with a concurrent steal to take this single
* element x, but we already lost the race. So we only need to set
* the bottom to match the idx, i.e. set the deque to a proper
* empty state. *)
(* TODO: can we relax the cas to a normal write?
* Note that this cas is guaranteed to succeed... *)
(cas32 bot (newBot, idx); NONE)
else
(* We are racing with a concurrent steal to take this single
* element x, but we haven't lost the race yet. *)
let
(* same index, tag advanced by one: a stealer holding the old top value
* will fail its own compare-and-swap *)
val newTop = TagIdx.pack {tag=tag+0w1, idx=idx}
val oldTop' = cas top (oldTop, newTop)
in
if oldTop' = oldTop then
(* success; we get to keep x *)
(arrayUpdate (data, newBot, NONE); x)
else
(* two possibilities: either the steal succeeded (in which case
* the idx will have moved) or the GC will have interfered (in
* which case the tag will have advanced). *)
let
val {idx=idx', ...} = TagIdx.unpack oldTop'
in
if idx' <> idx then
(* The steal succeeded, so we don't get to keep this
* element. It's possible that the GC interfered also,
* but that doesn't change the fact that we don't get to
* keep this element! *)
(* TODO: can we relax the cas to a normal write?
* Note that this cas is guaranteed to succeed... *)
(cas32 bot (newBot, idx'); NONE)
else
(* the GC must have interfered, so try again. It _should_ be
* the case that the GC can't interfere on the second try,
* because we haven't done any significant allocation
* in-between tries. *)
(* SAM_NOTE: with new local scope handling in the GC, this
* case should be impossible. *)
die (fn _ => "scheduler bug: unexpected GC interference")
(* popBot q *)
end
end
end
end
(* Number of items currently in the deque, computed as bot minus the top
 * index. bot is read first and top second, without synchronisation between
 * the two reads. popBot decrements bot before it inspects top, so a
 * concurrent observer may briefly see a result below zero. *)
fun size ({top, bot, ...} : 'a t) =
  let
    val bottom = Word32.toInt (!bot)
    val topIdx = #idx (TagIdx.unpack (!top))
  in
    bottom - topIdx
  end
(* Current tag stored in top, as an int. The tag is advanced by setDepth and
 * by popBot when it wins the race for the last item. *)
fun numResets ({top, ...} : 'a t) =
  Word64.toInt (#tag (TagIdx.unpack (!top)))
end