-
Notifications
You must be signed in to change notification settings - Fork 35
/
Copy pathaio.ml
206 lines (183 loc) · 6.53 KB
/
aio.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
(* Asynchronous IO scheduler.
*
* For each blocking action, if the action can be performed immediately, then it
* is. Otherwise, the thread performing the blocking task is suspended and
* automatically wakes up when the action completes. The suspend/resume is
* transparent to the programmer.
*)
open Effect
open Effect.Deep
(* Local aliases for the [Unix] types that appear in the effect signatures and
 * in the scheduler state below. *)
type file_descr = Unix.file_descr
type sockaddr = Unix.sockaddr
type msg_flag = Unix.msg_flag
(* Effects performed by client code. They are given their meaning by the
 * handler installed in [run]. *)
(* [Fork f]: run [f] as a new thread; the handler queues the performer so it
 * is resumed later with [()]. *)
type _ Effect.t += Fork : (unit -> unit) -> unit Effect.t
(* [Accept fd]: accept a connection on [fd]; the result is whatever
 * [Unix.accept fd] returns. *)
type _ Effect.t += Accept : file_descr -> (file_descr * sockaddr) Effect.t
(* [Recv (fd, buf, pos, len, flags)]: the arguments are passed in this order to
 * [Unix.recv]; the result is its integer return value. *)
type _ Effect.t +=
| Recv : file_descr * bytes * int * int * msg_flag list -> int Effect.t
(* [Send (fd, buf, pos, len, flags)]: the arguments are passed in this order to
 * [Unix.send]; the result is its integer return value. *)
type _ Effect.t +=
| Send : file_descr * bytes * int * int * msg_flag list -> int Effect.t
(* [Sleep span]: suspend the performer for [span] seconds (the handler adds
 * [span] to [Unix.gettimeofday ()]); a span [<= 0.] resumes immediately. *)
type _ Effect.t += Sleep : float -> unit Effect.t
(* Client-facing operations. Each one only performs the corresponding effect;
 * the behaviour is supplied by the handler installed in [run], so these must
 * be called from code running under [run]. *)

(* [fork f] asks the scheduler to run [f] as a new thread. *)
let fork f = perform @@ Fork f

(* [accept fd] yields the same pair as [Unix.accept fd]. *)
let accept fd = perform @@ Accept fd

(* [recv fd buf pos len flags] yields the result of the matching [Unix.recv]
 * call. *)
let recv fd buf pos len flags = perform @@ Recv (fd, buf, pos, len, flags)

(* [send fd buf pos len flags] yields the result of the matching [Unix.send]
 * call. *)
let send fd buf pos len flags = perform @@ Send (fd, buf, pos, len, flags)

(* [sleep span] suspends the calling thread for [span] seconds. *)
let sleep span = perform @@ Sleep span
(** [poll_rd fd] is [true] when a zero-timeout [Unix.select] reports [fd] as
    ready for reading, and [false] otherwise. It never waits. *)
let poll_rd fd =
  match Unix.select [ fd ] [] [] 0. with
  | [], _, _ -> false
  | _ :: _, _, _ -> true
(** [poll_wr fd] is [true] when a zero-timeout [Unix.select] reports [fd] as
    ready for writing, and [false] otherwise. It never waits. *)
let poll_wr fd =
  match Unix.select [] [ fd ] [] 0. with
  | _, [], _ -> false
  | _, _ :: _, _ -> true
(* A read-side operation stored in [read_ht] while its thread is blocked. Each
 * constructor carries the continuation of the waiting thread. *)
type read =
(* Pending [Unix.accept]; the continuation receives the accepted pair. *)
| Accept of (file_descr * sockaddr, unit) continuation
(* Pending [Unix.recv]: buffer, position, length and flags, followed by the
 * continuation that receives the integer result. *)
| Recv of bytes * int * int * msg_flag list * (int, unit) continuation
(* A write-side operation stored in [write_ht] while its thread is blocked. *)
type write =
(* Pending [Unix.send]: buffer, position, length and flags, followed by the
 * continuation that receives the integer result. *)
| Send of bytes * int * int * msg_flag list * (int, unit) continuation
(* A sleeping thread stored in [sleep_ht]; [wakeup] queues its continuation to
 * be resumed with [()]. *)
type timeout = Sleep of (unit, unit) continuation
(* An entry of the run queue, consumed by [dequeue]. *)
type runnable =
(* A suspended thread paired with the value it is to be resumed with. *)
| Thread : ('a, unit) continuation * 'a -> runnable
(* A blocked read whose descriptor [Unix.select] reported as ready;
 * [dequeue] performs the Unix call and resumes the waiter with the result. *)
| Read : file_descr * read -> runnable
(* Same as [Read], for a blocked write. *)
| Write : file_descr * write -> runnable
(* Mutable scheduler state shared by all threads of one [run]. *)
type state = {
(* Work that can proceed right now, in FIFO order. *)
run_q : runnable Queue.t;
(* Threads blocked on a read-side operation, keyed by file descriptor. *)
read_ht : (file_descr, read) Hashtbl.t;
(* Threads blocked on a write-side operation, keyed by file descriptor. *)
write_ht : (file_descr, write) Hashtbl.t;
(* Sleeping threads, keyed by absolute wake-up time on the
 * [Unix.gettimeofday] clock (see [block_sleep]). *)
sleep_ht : (float, timeout) Hashtbl.t;
}
(* [init ()] builds a fresh scheduler state: an empty run queue and three
 * empty tables for blocked readers, blocked writers and sleepers. *)
let init () =
  let run_q = Queue.create () in
  let read_ht = Hashtbl.create 13 in
  let write_ht = Hashtbl.create 13 in
  let sleep_ht = Hashtbl.create 13 in
  { run_q; read_ht; write_ht; sleep_ht }
(* Helpers that append work to the back of the run queue. *)

(* [enqueue_thread st k x] schedules continuation [k] to be resumed with [x]. *)
let enqueue_thread st k x =
  let entry = Thread (k, x) in
  Queue.add entry st.run_q

(* [enqueue_read st fd op] schedules the read-side operation [op] on [fd]. *)
let enqueue_read st fd op =
  let entry = Read (fd, op) in
  Queue.add entry st.run_q

(* [enqueue_write st fd op] schedules the write-side operation [op] on [fd]. *)
let enqueue_write st fd op =
  let entry = Write (fd, op) in
  Queue.add entry st.run_q
(* [dequeue st] pops the entry at the front of the run queue and runs it:
 * a [Thread] is resumed with its stored value, while a [Read] or [Write]
 * first performs the pending Unix call and then resumes the waiting thread
 * with the result.
 *
 * If the Unix call raises, the exception is raised inside the waiting thread
 * at the point where it performed the effect. Letting it propagate from here
 * would instead unwind the scheduler and abandon the continuation.
 *
 * Raises [Queue.Empty] if the run queue is empty; [schedule] only calls this
 * function after checking that it is not.
 *)
let dequeue st =
  (* Runs [op] and hands its outcome, value or exception, to [k]. Only
   * exceptions raised by [op] itself are redirected: the [continue] call sits
   * in the value branch, outside the scope of the exception case. *)
  let resume_with k op =
    match op () with
    | res -> continue k res
    | exception exn -> discontinue k exn
  in
  match Queue.pop st.run_q with
  | Thread (k, x) -> continue k x
  | Read (fd, Accept k) -> resume_with k (fun () -> Unix.accept fd)
  | Read (fd, Recv (buf, pos, len, mode, k)) ->
      resume_with k (fun () -> Unix.recv fd buf pos len mode)
  | Write (fd, Send (buf, pos, len, mode, k)) ->
      resume_with k (fun () -> Unix.send fd buf pos len mode)
(* Helpers that park a thread until its operation can proceed. They only
 * record the continuation in the relevant table; the caller is expected to
 * hand control back to the scheduler afterwards. [Hashtbl.add] is used, so an
 * earlier binding for the same key is kept rather than overwritten. *)

(* [block_accept st fd k] parks [k] until [fd] is readable for an accept. *)
let block_accept st fd k =
  let pending : read = Accept k in
  Hashtbl.add st.read_ht fd pending

(* [block_recv st fd buf pos len mode k] parks [k] together with the arguments
 * of the receive it is waiting to perform on [fd]. *)
let block_recv st fd buf pos len mode k =
  let pending : read = Recv (buf, pos, len, mode, k) in
  Hashtbl.add st.read_ht fd pending

(* [block_send st fd buf pos len mode k] parks [k] together with the arguments
 * of the send it is waiting to perform on [fd]. *)
let block_send st fd buf pos len mode k =
  let pending : write = Send (buf, pos, len, mode, k) in
  Hashtbl.add st.write_ht fd pending

(* [block_sleep st span k] parks [k] until [span] seconds from now, keyed by
 * the absolute wake-up time on the [Unix.gettimeofday] clock. *)
let block_sleep st span k =
  let now = Unix.gettimeofday () in
  let sleeper : timeout = Sleep k in
  Hashtbl.add st.sleep_ht (now +. span) sleeper
(* Wakes up sleeping threads whose wake-up time is at or before [now].
 *
 * Every such thread is moved from [sleep_ht] to the run queue, to be resumed
 * with [()].
 *
 * Returns [(b, t)] where [b] is true if at least one thread was woken up, and
 * [t] is the earliest wake-up time among the threads that are still asleep,
 * or [max_float] if none of the remaining sleepers is due in the future.
 *)
let wakeup st now : bool * float =
  (* One fold step: collect due deadlines, track the nearest future one. *)
  let step deadline (Sleep k : timeout) ((due, nearest) as acc) =
    if deadline <= now then (
      enqueue_thread st k ();
      (deadline :: due, nearest))
    else if deadline < nearest then (due, deadline)
    else acc
  in
  let due, nearest = Hashtbl.fold step st.sleep_ht ([], max_float) in
  (* Entries are removed only after the fold, so the table is never modified
   * while it is being traversed. One removal per collected deadline. *)
  List.iter (Hashtbl.remove st.sleep_ht) due;
  (due <> [], nearest)
(* [schedule st] picks the next thing to do:
 *  - if the run queue is not empty, run its first entry;
 *  - otherwise, if no thread is blocked on IO or asleep, return: we are done;
 *  - otherwise wake up the sleepers that are due, or wait in [perform_io]
 *    until a descriptor is ready or the next sleeper is due.
 *)
let rec schedule st =
  if not (Queue.is_empty st.run_q) then
    (* Still have runnable threads. *)
    dequeue st
  else if
    Hashtbl.length st.read_ht = 0
    && Hashtbl.length st.write_ht = 0
    && Hashtbl.length st.sleep_ht = 0
  then () (* Nothing runnable and nothing blocked. *)
  else
    let now = Unix.gettimeofday () in
    let thrd_has_woken_up, next_wakeup_time = wakeup st now in
    if thrd_has_woken_up then schedule st
    else if next_wakeup_time = max_float then
      (* No sleeper left: wait for IO without a timeout (a negative timeout
       * means an unbounded wait for [Unix.select]). *)
      perform_io st (-1.)
    else perform_io st (next_wakeup_time -. now)

(* [perform_io st timeout] waits in [Unix.select] for at most [timeout] seconds
 * on every descriptor that has a blocked thread, moves the operations whose
 * descriptor is ready to the run queue, and goes back to [schedule].
 *)
and perform_io st timeout =
  let keys ht = Hashtbl.fold (fun fd _ acc -> fd :: acc) ht [] in
  let rd_fds = keys st.read_ht in
  let wr_fds = keys st.write_ht in
  let rdy_rd_fds, rdy_wr_fds, _ =
    (* A signal delivered during the wait makes [select] fail with [EINTR].
     * That is not an error for the scheduler: report that nothing is ready and
     * fall through to [schedule], which recomputes the remaining timeout. *)
    try Unix.select rd_fds wr_fds [] timeout
    with Unix.Unix_error (Unix.EINTR, _, _) -> ([], [], [])
  in
  (* Moves the operation bound to each ready descriptor from [ht] to the run
   * queue. *)
  let resume ht enqueue fds =
    List.iter
      (fun fd ->
        enqueue st fd (Hashtbl.find ht fd);
        Hashtbl.remove ht fd)
      fds
  in
  resume st.read_ht enqueue_read rdy_rd_fds;
  resume st.write_ht enqueue_write rdy_wr_fds;
  (* A positive timeout means there are sleepers: queue the ones that are due
   * now so that they are not delayed by the IO work just enqueued. *)
  if timeout > 0. then
    ignore (wakeup st (Unix.gettimeofday ()) : bool * float);
  schedule st
(* [run main] runs [main] as the first thread of a fresh scheduler and handles
 * the [Fork], [Accept], [Recv], [Send] and [Sleep] effects performed by it and
 * by every thread it forks.
 *
 * An IO effect is served immediately when the descriptor is ready; otherwise
 * the thread is parked and the scheduler runs something else. When a thread
 * terminates, normally or with an exception (which is printed on standard
 * output), the scheduler moves on to the next piece of work.
 *
 * If the Unix call serving an IO effect raises, the exception is raised in the
 * thread that performed the effect, exactly as a direct call would have done,
 * instead of escaping through the scheduler and abandoning that thread.
 *)
let run main =
  let st = init () in
  (* Runs [op] and hands its outcome, value or exception, to [k]. Only
   * exceptions raised by [op] itself are redirected: the [continue] call sits
   * in the value branch, outside the scope of the exception case. *)
  let resume_with k op =
    match op () with
    | res -> continue k res
    | exception exn -> discontinue k exn
  in
  let rec fork st f =
    match_with f ()
      {
        retc = (fun () -> schedule st);
        exnc =
          (fun exn ->
            print_string (Printexc.to_string exn);
            schedule st);
        effc =
          (fun (type a) (e : a Effect.t) ->
            match e with
            | Fork f ->
                Some
                  (fun (k : (a, _) continuation) ->
                    (* The parent is queued; the child runs first. *)
                    enqueue_thread st k ();
                    fork st f)
            | Accept fd ->
                Some
                  (fun (k : (a, _) continuation) ->
                    if poll_rd fd then
                      resume_with k (fun () -> Unix.accept fd)
                    else (
                      block_accept st fd k;
                      schedule st))
            | Recv (fd, buf, pos, len, mode) ->
                Some
                  (fun (k : (a, _) continuation) ->
                    if poll_rd fd then
                      resume_with k (fun () -> Unix.recv fd buf pos len mode)
                    else (
                      block_recv st fd buf pos len mode k;
                      schedule st))
            | Send (fd, buf, pos, len, mode) ->
                Some
                  (fun (k : (a, _) continuation) ->
                    if poll_wr fd then
                      resume_with k (fun () -> Unix.send fd buf pos len mode)
                    else (
                      block_send st fd buf pos len mode k;
                      schedule st))
            | Sleep t ->
                Some
                  (fun (k : (a, _) continuation) ->
                    (* A non-positive span does not suspend the thread. *)
                    if t <= 0. then continue k ()
                    else (
                      block_sleep st t k;
                      schedule st))
            | _ -> None);
      }
  in
  fork st main