-
Notifications
You must be signed in to change notification settings - Fork 177
/
Copy pathlwt.ml
2809 lines (2176 loc) · 96.7 KB
/
lwt.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
(* OCaml promise library
* http://www.ocsigen.org/lwt
* Copyright (C) 2005-2008 Jérôme Vouillon
* Laboratoire PPS - CNRS Université Paris Diderot
* 2009-2012 Jérémie Dimino
* 2017 Anton Bachin
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation, with linking exceptions;
* either version 2.1 of the License, or (at your option) any later
* version. See COPYING file for details.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
* 02111-1307, USA.
*)
(* Reading guide
Welcome to the implementation of the Lwt core! This is a big file, but we
hope that reading it (parts at a time!) will not be scary :) Here is why:
* Sectioning
The code is broken up into sections, each one of which is an internal module.
Most of the modules have a signature, which serves as a neat table of
contents.
It is recommended that you read this file with code folding enabled. If you
fold all the modules, you can visualize the logical structure of Lwt quite
easily. You can then expand modules as needed, depending on what part of the
implementation you are interested in. Without code folding, you face an
intimidating wall of code :( You can still visually parse the file, however,
because there are plenty of blank lines to help section things off. You can
also view this file folded online:
https://gist.github.com/aantron/9fab0bdead98a60fccf06e0189186863
https://gist.github.com/aantron/97b58520d5bb4858ccac6f54700a24d7
The signatures are unusual: big comments are absent. They are moved into the
modules, so that they are hidden by code folding when you (the reader!) are
not interested in those modules.
* Documentation
The documentation begins with an overview of major concepts and components.
This overview puts everything into context. You don't have to read the whole
thing. The overview begins with basic concepts, moves on to advanced ones,
and then gets into the truly esoteric. You can read about each concept on an
as-needed basis. However, once you have read the whole overview, you will be
aware of *everything* that is needed to understand, and work with, the core
of Lwt.
Littered in the code are additional comments, that go in-depth on various
local implementation details, opportunities, regrets, and the like.
The sections (modules) of the code correspond closely to sections of the
overview.
* Whitespace
The total line count of this file may seem frightening, but one third of it
is whitespace and comments, both there to help you read the remaining two
thirds!
Also, within those two thirds, there are large groups of functions that are
repetitive and formulaic, so there is much less conceptually-unique code in
Lwt than you might think at first.
* Please edit the code and the docs!
This code is meant to be readable, and to be edited. If you are reading
something, and think there is a better way to express it, please go ahead and
open a pull request to the Lwt repository at
https://github.com/ocsigen/lwt
Even if your pull request somehow doesn't get merged, you will have educated
the maintainers, not to mention other contributors, and users. This is true
even if the change is trivial -- sometimes, maintainers just need to be
educated multiple times before they see the wisdom of it :/
Likewise, if you would like to make a code contribution to the Lwt core, it
is quite welcome, and we hope that this code is readable enough for you to be
able to make it!
Enjoy! *)
(* Overview
In this file, there is a "model" function -- [Lwt.bind] -- which pulls
together many (though not all) of the concepts and helpers discussed in this
overview. To find it, search for "let bind," and you can examine it while
reading the overview. The authors of this file intend to put extra effort
into writing nice comments inside [Lwt.bind] :)
0. Main mechanism and two aspects
The Lwt interface ([lwt.mli]) provides one main mechanism, promises, and two
"aspects," which are *not* necessary for understanding promises, but which
are still there:
- promise cancelation
- sequence-associated storage
If you are not interested in cancelation or storage, you can ignore these two
complications, and still get a pretty good understanding of the code. To
help, all identifiers related to cancelation contain the string "cancel," and
all identifiers related to storage contain "storage."
1. Promises
A promise is a cell that can be in one of two states: "completed" or
"pending."
- Completed promises
A completed promise is either "resolved" with a value, or "failed" with an
exception. The state of a completed promise will never change again: a
completed promise is immutable. A completed promise is basically equivalent
to an [('a, exn) Pervasives.result]. Completed promises are produced in two
ways:
- [Lwt.return], [Lwt.fail], and related functions, produce "trivial"
promises that are completed from the start.
- The other way is to complete a promise that started out pending.
Note that failed promises have nothing to do with unhandled exceptions.
- Pending promises
...are those that may become completed in the future. Each pending promise
carries a list of callbacks. These callbacks are added by functions like
[Lwt.bind], and called by Lwt if/when the promise completes. These
callbacks typically end up completing additional promises; see section
"Completion loop" below.
Pending promises are produced in three ways, according to how they can be
completed:
- Initial promises
...are created by [Lwt.wait] and [Lwt.task]. The user of Lwt completes
these promises manually, through the resolvers returned by those
functions.
- Sequential composition
For example, [Lwt.bind]. These promises only complete when the preceding
sequence of promises completes. The user cannot complete these promises
directly (but see the section on cancelation below).
- Concurrent composition
For example, [Lwt.join] or [Lwt.choose]. These promises only complete
when all or one of a set of "preceding" promises complete. The user
cannot complete these promises directly (but see the section on
cancelation below).
2. Resolvers
Resolvers are given to the user by [Lwt.wait] and [Lwt.task], and can be used
by the user to complete the corresponding promises. Note that this means the
user only ever gets resolvers for initial promises.
Internally, resolvers are the exact same objects as the promises they
complete, even though the resolver is exposed as a reference of a different
type by [lwt.mli]. For details on why, see section "Type system abuse" below.
3. Callbacks
...are attached by Lwt to pending promises, and are run by Lwt if/when those
promises complete. These callbacks are not directly exposed through
[lwt.mli] -- they are a low-level mechanism. For example, to implement
[Lwt.bind p f], Lwt attaches a callback to [p] that does some internal Lwt
book-keeping, and then calls [f] if [p] resolved, and does something else if
[p] failed.
Callbacks come in two flavors: regular callbacks and cancel callbacks. The
only material differences between them are that:
- regular callbacks are always called when a promise is completed, but cancel
callbacks are called, in addition, only if the promise is canceled, and
- all cancel callbacks of a promise are called before any regular callback
is called.
Cancelation is a special case of completion, but see the section on
cancelation later below.
4. Completion loop
Completing a pending promise triggers its callbacks, and those might complete
more pending promises, triggering more callbacks, etc. This behavior is the
*completion loop*. Lwt has some machinery to avoid stack overflow and other
unfortunate situations during this loop.
This chaining of promise completions through callbacks can be seen as a kind
of promise dependency graph, in which the nodes are pending promises, and the
edges are callbacks. During the completion loop, Lwt starts at some initial
promise that is getting completed by the user, and recursively completes all
dependent promises. The graph is modified: completed promises are removed
from it.
Some of these dependencies are explicit to Lwt, e.g. the callbacks registered
by [Lwt.bind]. Others are not visible to Lwt, because the user can always
register a callback using a function like [Lwt.on_success], and use that
callback to complete another initial promise. All the explicit dependencies
are created by Lwt's own sequential and concurrent composition functions
(so, [Lwt.bind], [Lwt.join], etc). Whether dependencies are explicit or not
is relevant only to cancelation.
5. Cancelation
As described above, ordinary promise completion proceeds from an initial
promise, forward along callbacks through the dependency graph. Since it
starts from an initial promise, it can only be triggered using a resolver.
Cancelation is a sort of dual to ordinary completion. Instead of starting at
an initial promise/resolver, cancelation starts at *any* promise. It then
goes *backwards* through the explicit dependency graph, looking for
cancelable initial promises to cancel -- those that were created by
[Lwt.task]. After finding them, cancelation completes them normally with
[Failed Lwt.Canceled], causing an ordinary promise completion process.
To summarize, cancelation is a way to trigger an *ordinary* completion of
promises created with [Lwt.task], by first searching for them in the promise
dependency graph (which is assembled by [Lwt.bind], [Lwt.join], etc).
This backwards search is triggered only by [Lwt.cancel]. It is also possible
for the user to cancel a promise directly by failing it with [Lwt.Canceled],
but in all cases where the user can do so, the search would be redundant
anyway -- the user has only two ways of directly failing a promise with
[Lwt.Canceled] (or any exception, for that matter):
- The user can create an initial promise, then fail it through its resolver.
The search is redundant because it would find only the same initial promise
to cancel.
- The user can create a trivial promise by calling [Lwt.fail Lwt.Canceled].
The search is again redundant; in this case it would find nothing to
cancel.
Note that there is a quirk: only promises created by [Lwt.task] are
susceptible to being canceled by [Lwt.cancel], but the user can manually
cancel initial promises created by both [Lwt.task] and [Lwt.wait].
Due to [Lwt.cancel], promise cancelation, and therefore completion, can be
initiated by the user without access to a resolver. This is important for
reasoning about state changes in the implementation of Lwt, and is referenced
in some implementation detail comments.
6. No I/O
The Lwt core deliberately doesn't do I/O. The completion loop stops running
once no promises can be completed immediately. It has to be restarted later
by some surrounding I/O loop. This I/O loop typically keeps track of pending
promises that represent blocked or in-progress I/O; other pending promises
that indirectly depend on I/O are not explicitly tracked. They are retained
in memory by references captured inside callbacks.
On Unix and Windows, a separate top-level loop, typically [Lwt_main.run], is
necessary to repeatedly call [select], [epoll], or [kevent], and complete
blocked I/O promises.
In JavaScript, references to promises are retained by JavaScript code, which
is, in turn, triggered by the JS engine. In other words, the top-level loop
is buried inside the JS engine.
This separation of the Lwt core from the top-level I/O loop keeps the core
portable.
7. Promise "proxying"
In [Lwt.bind : 'a t -> ('a -> 'b t) -> 'b t], the outer ['b t] is created by
[bind] first, and returned to the user. The inner ['b t] is created by the
user later, and then returned to [bind]. At that point, [bind] needs to make
the inner and outer ['b t]s behave identically.
This is accomplished by making one of the promises point to the other. The
first of the promises thus becomes a "proxy," and the other is its
"underlying" promise.
After that, all operations that would be performed by Lwt on the proxy are
instead performed on the underlying promise. This is ensured by the numerous
calls to the internal function [underlying] in this file.
Because of the pervasive use of [underlying], proxies can be more or less
ignored on a first reading of the code. However, becoming a proxy is a kind of
state change, and any promise that is returned by a callback to [bind], or to
a similar Lwt function, might become a proxy. That means: just about any
promise that is handed to the user, might become a proxy promise by the next
time Lwt sees it. This is important for reasoning about possible state
changes in the implementation of Lwt, and is referenced in some implementation
detail comments.
8. Sequence-associated storage
Lwt has a global key-value map. The map can be preserved across sequential
composition functions, so that it has the same state in the user's callback
[f] as it did at the time the user called [Lwt.bind p f].
The details are pretty straightforward, and discussed in module
[Sequence_associated_storage]. The main thing to be aware of is the many
references to [current_storage] throughout Lwt, which are needed to properly
save and restore the mapping.
9. Type system abuse
The implementation uses the type system somewhat extensively. For example,
the promise state is a GADT which encodes the state in its type parameters.
Thus, if you do [let p = underlying p], the shadowing reference [p] is
statically known *not* to be a proxy, and the compiler knows that the
corresponding match case [Proxy _] is impossible.
The external promise type, ['a t], and the external resolver type, ['a u],
are not GADTs. Furthermore, they are, respectively, covariant and
contravariant in ['a], while the internal promise type is invariant in
['a]. For these reasons, there are nasty casts between ['a t], ['a u], and
the internal promise type. The implementation is, of course, written in
terms of the internal type.
Casting from an ['a t] to an internal promise produces a reference for
which the state is "unknown": this is simulated with a helper GADT, which
encodes existential types. There are several similar casts, which are used
to document possible state changes between the time a promise is created,
and the later time it is used in a callback. You can see these casts in
action in [Lwt.bind]. The cast syntax is pretty light, and, besides being
commented in [bind], all such casts are documented in modules [Public_types]
and [Basic_helpers].
There is an abstract type [in_completion_loop], which is actually just
[unit], that is passed around to help ensure that certain functions can only
be called during the completion loop. Those functions can't be called without
a value of type [in_completion_loop], and the only way to obtain such a value
is to enter the loop ([run_in_completion_loop]). This mechanism is described
at [Completion_loop.complete].
If you've made it this far, you are an Lwt expert! Rejoice! *)
(* Warning 4, "fragile pattern matching," is turned off for this whole file
   because of the compiler issue tracked at
     https://caml.inria.fr/mantis/view.php?id=7451
   The attribute can be dropped once Lwt's minimum OCaml version is 4.05. *)
[@@@ocaml.warning "-4"]

(* Some sequence-associated storage types

   The storage mechanism itself is implemented further down, in module
   [Sequence_associated_storage]. The two definitions below have to come
   first, because the definition of [promise] refers to [storage]. *)

(* Finite maps keyed by integer storage-key identifiers. *)
module Storage_map =
  Map.Make (struct
    type t = int
    let compare (x : int) (y : int) = compare x y
  end)

(* A storage snapshot: for each key identifier, a closure that restores the
   value associated with that key. *)
type storage = (unit -> unit) Storage_map.t
(* Core internal types of Lwt: the promise record with its GADT state, and the
callback lists attached to pending promises. *)
module Main_internal_types =
struct
(* Phantom types for use with types [promise] and [state]. These are never
constructed; the purpose of the constructors is to prove to the type
checker that these types are distinct from each other. Warning 37, "unused
constructor," therefore has to be temporarily suppressed. *)
[@@@ocaml.warning "-37"]
type underlying = private Underlying_and_this_constructor_is_not_used
type proxy = private Proxy_and_this_constructor_is_not_used
type completed = private Completed_and_this_constructor_is_not_used
type pending = private Pending_and_this_constructor_is_not_used
[@@@ocaml.warning "+37"]
(* Promises proper.
In [('a, 'u, 'c) promise], ['a] is the type of the value the promise
resolves with. ['u] and ['c] are phantom invariants fixed by the
constructors of [state]:
- [Resolved _], [Failed _] and [Pending _] have ['u = underlying];
[Proxy _] has ['u = proxy].
- [Resolved _] and [Failed _] have ['c = completed]; [Pending _] has
['c = pending]; [Proxy p] has the same ['c] as [p].
Note: [Resolved] and [Failed] are declared first, in the same order as
[Ok] and [Error] of [result], so they get the same constructor tags (0 and
1). The memory-representation remark about ['a completed_state] in the
notes below depends on this, so reorder these constructors with care. *)
type ('a, 'u, 'c) promise = {
mutable state : ('a, 'u, 'c) state;
}
and (_, _, _) state =
| Resolved : 'a -> ('a, underlying, completed) state
| Failed : exn -> ( _, underlying, completed) state
| Pending : 'a callbacks -> ('a, underlying, pending) state
| Proxy : ('a, _, 'c) promise -> ('a, proxy, 'c) state
(* Note:
A promise whose state is [Proxy _] is a "proxy" promise. A promise whose
state is *not* [Proxy _] is an "underlying" promise.
The "underlying promise of [p]" is:
- [p], if [p] is itself underlying.
- Otherwise, [p] is a proxy and has state [Proxy p']. The underlying
promise of [p] is the underlying promise of [p'].
In other words, to find the underlying promise of a proxy, Lwt follows the
[Proxy _] links to the end. *)
(* Note:
When a promise is completed, or becomes a proxy, its state field is
mutated. This invalidates the type invariants on the promise. See internal
function [set_promise_state] for details about that.
When an Lwt function has a reference to a promise, and also registers a
callback that has a reference to the same promise, the invariants on the
reference may become invalid by the time the callback is called. All such
callbacks have comments explaining what the valid invariants are at that
point, and/or casts to (1) get the correct typing and (2) document the
potential state change for readers of the code. *)
(* Callback information for pending promises. *)
and 'a callbacks = {
(* Run when the promise completes; see the notes below. *)
mutable regular_callbacks : 'a regular_callback_list;
(* Run only on cancelation, before any regular callback. *)
mutable cancel_callbacks : 'a cancel_callback_list;
(* How [Lwt.cancel] treats this promise; see the notes below. *)
mutable how_to_cancel : how_to_cancel;
(* Explained in module [Callbacks]. *)
mutable cleanups_deferred : int;
}
and 'a regular_callback = in_completion_loop -> 'a completed_state -> unit
and cancel_callback = in_completion_loop -> unit
and 'a completed_state = ('a, underlying, completed) state
and in_completion_loop (* = unit, see [Completion_loop.complete]. *)
and how_to_cancel =
| Not_cancelable : how_to_cancel
| Cancel_this_promise : how_to_cancel
| Propagate_cancel_to_one : (_, _, _) promise -> how_to_cancel
| Propagate_cancel_to_several : (_, _, _) promise list -> how_to_cancel
and 'a regular_callback_list =
| Regular_callback_list_empty
| Regular_callback_list_concat of
'a regular_callback_list * 'a regular_callback_list
| Regular_callback_list_implicitly_removed_callback of
'a regular_callback
| Regular_callback_list_explicitly_removable_callback of
'a regular_callback option ref
and _ cancel_callback_list =
| Cancel_callback_list_empty :
_ cancel_callback_list
| Cancel_callback_list_concat :
'a cancel_callback_list * 'a cancel_callback_list ->
'a cancel_callback_list
| Cancel_callback_list_callback :
storage * cancel_callback ->
_ cancel_callback_list
| Cancel_callback_list_remove_sequence_node :
('a, _, _) promise Lwt_sequence.node ->
'a cancel_callback_list
(* Notes:
These type definitions are guilty of performing several optimizations,
without which they would be much easier to understand.
- The type parameters of ['a completed_state] guarantee that it is either
[Resolved _] or [Failed _]. So, it is equivalent to
[('a, exn) Pervasives.result], and, indeed, should have an identical
memory representation.
- As per the Overview, there are regular callbacks and cancel callbacks.
Cancel callbacks are called only on cancelation, and, then, before any
regular callbacks are called.
Despite the different types for the two kinds of callbacks, they are
otherwise the same. Cancel callbacks just don't need a result state
argument, because it is known to be [Failed Canceled].
- Regular callbacks are not allowed to raise exceptions. All regular
callbacks are created in this file, so this can be checked.
Cancel callbacks can raise exceptions, but if they do so, the exceptions
are passed to [async_exception_hook].
- [how_to_cancel] implements the dependency graph mentioned in the
Overview. It is traversed backwards during [Lwt.cancel]. It is a GADT
because we don't care about the actual types of the promise references
stored, or their invariants. The constructors correspond to pending
promise kinds as follows:
- [Not_cancelable]: initial, [Lwt.wait].
- [Cancel_this_promise]: initial, [Lwt.task].
- [Propagate_cancel_to_one]: sequential composition, e.g. [Lwt.bind].
- [Propagate_cancel_to_several]: concurrent composition, e.g.
[Lwt.join].
- The two callback list types are ordinary append-friendly lists, with two
optimizations inlined:
- ['a regular_callback_list] apparently has two "kinds" of regular
callbacks, implicitly removed and explicitly removable. All callbacks
are removable. It's just that, for some callbacks, they will only be
removed at the same time that the promise they are attached to becomes
completed. When that happens, the entire state of that promise changes
to [Resolved _] or [Failed _], and the reference to the whole callback
list is simply lost. This "removes" the callback. For these callbacks,
['a regular_callback_list] attempts to trim an option and a reference
cell with the [Regular_callback_list_implicitly_removed_callback]
constructor.
- ['a cancel_callback_list] has
[Cancel_callback_list_remove_sequence_node node], which is the same as
[Cancel_callback_list_callback (_, (fun _ ->
Lwt_sequence.remove node))].
This was probably done to avoid a closure allocation.
- The [cleanups_deferred] field is explained in module [Callbacks]. *)
end
open Main_internal_types
module Public_types =
struct
  (* The public promise and resolver types. Both are abstract; the variance
     annotations make ['a t] covariant and ['a u] contravariant. At runtime,
     values of both types are internal [promise] records, and the conversions
     below are unchecked, no-op casts. *)
  type +'a t
  type -'a u

  let to_public_promise (p : ('a, _, _) promise) : 'a t =
    Obj.magic p

  let to_public_resolver (p : ('a, _, _) promise) : 'a u =
    Obj.magic p

  (* Existential wrapper returned when converting back to the internal type:
     the ['u] and ['c] invariants of the wrapped promise are hidden. *)
  type _ packed_promise =
    | Internal : ('a, _, _) promise -> 'a packed_promise
  [@@ocaml.unboxed]

  let to_internal_promise : 'a t -> 'a packed_promise =
    fun p -> Internal (Obj.magic p)

  let to_internal_resolver : 'a u -> 'a packed_promise =
    fun r -> Internal (Obj.magic r)

  (* Functions that receive a public ['a t] typically begin with

       let Internal p = to_internal_promise p in

     after which [p] has type [('a, u, c) promise] for fresh types [u] and
     [c]; that is, nothing is known about its invariants. The cast only
     changes the static type of the reference. Wrapping in [Internal _] and
     immediately unwrapping appears to be optimized away, even by older
     compilers without Flambda or [[@@ocaml.unboxed]] support. *)

  (* Internal name for the public [+'a Lwt.result], which is defined later in
     the module. Inside this file the public name would not carry the [Lwt.]
     prefix, so a distinct name avoids confusion with
     [Pervasives.result]/[Result.result]. *)
  type +'a lwt_result = ('a, exn) Result.result

  (* Converts a result into the corresponding completed state. An [Obj.magic]
     cast could probably avoid the allocation done here. *)
  let state_of_result result =
    match result with
    | Result.Ok value -> Resolved value
    | Result.Error exn -> Failed exn
end

include Public_types
module Basic_helpers :
sig
val identical : ('a, _, _) promise -> ('a, _, _) promise -> bool
val underlying : ('a, 'u, 'c) promise -> ('a, underlying, 'c) promise
type ('a, 'u, 'c) state_changed =
| State_may_have_changed of ('a, 'u, 'c) promise
[@@ocaml.unboxed]
val set_promise_state :
('a, _, _) promise -> ('a, 'u, 'c) state -> ('a, 'u, 'c) state_changed
type 'a may_now_be_proxy =
| State_may_now_be_pending_proxy :
('a, _, pending) promise -> 'a may_now_be_proxy
[@@ocaml.unboxed]
val may_now_be_proxy :
('a, underlying, pending) promise -> 'a may_now_be_proxy
end =
struct
(* Checks physical equality ([==]) of two internal promises. Unlike [==], does
not force unification of their invariants. Both arguments are first cast to
the public type ['a t], which has no ['u]/['c] parameters, so the two
promises may carry different invariants. *)
let identical p1 p2 =
(to_public_promise p1) == (to_public_promise p2)
(* [underlying p] evaluates to the underlying promise of [p].
If multiple [Proxy _] links are traversed, [underlying] updates all the
proxies to point immediately to their final underlying promise. *)
(* The explicit polymorphic annotation is required: the recursive call below
is made on [p'], whose ['u] is existential (see the [Proxy] constructor), so
[underlying] is used at a different type than the one it is defined at. *)
let rec underlying
: 'u 'c. ('a, 'u, 'c) promise -> ('a, underlying, 'c) promise =
fun
(type u)
(type c)
(p : ('a, u, c) promise) ->
match p.state with
(* In each of these three cases the GADT match refines [u] to
[underlying], which is what allows [p] itself to be returned. *)
| Resolved _ -> (p : (_, underlying, _) promise)
| Failed _ -> p
| Pending _ -> p
| Proxy p' ->
let p'' = underlying p' in
(* If [p'] was already underlying, [p''] is [p'] and [p] already points
at it; only rewrite the link when the chain was longer than one. *)
if not (identical p'' p') then
p.state <- Proxy p'';
p''
type ('a, 'u, 'c) state_changed =
| State_may_have_changed of ('a, 'u, 'c) promise
[@@ocaml.unboxed]
let set_promise_state p state =
(* Forget the static invariants of [p], so that a [state] with different
invariants can be stored into it. *)
let p : (_, _, _) promise = Obj.magic p in
p.state <- state;
State_may_have_changed p
(* [set_promise_state p state] mutates the state of [p], and evaluates to a
(wrapped) reference to [p] with the same invariants as on [state]. The
original reference [p] should be shadowed when calling this function:
let State_may_have_changed p = set_promise_state p (Resolved 42) in ...
This is a kind of cheap imitation of linear typing, which is good enough
for the needs of [lwt.ml].
Internal functions that transitively call [set_promise_state] likewise
return the new reference. This ends at some top-level function, typically
either a callback or a function in the public API. There, the new reference
is still bound, but is then explicitly ignored.
The state of a promise is never updated directly outside this module
[Basic_helpers]. All updates elsewhere are done through
[set_promise_state].
To avoid problems with type-level invariants not matching reality, data
structures do not store promises with concrete invariants -- except
completed promises, which are immutable. Indeed, if one looks at
definitions of data structures that can store pending promises, e.g. the
[how_to_cancel] graph, the invariants are existentially quantified.
Note: it's possible to statically disallow the setting of the [state] field
by making type [promise] private. However, that seems to require writing a
signature that is a near-duplicate of [Main_internal_types], or some abuse
of functors. *)
type 'a may_now_be_proxy =
| State_may_now_be_pending_proxy :
('a, _, pending) promise -> 'a may_now_be_proxy
[@@ocaml.unboxed]
let may_now_be_proxy p = State_may_now_be_pending_proxy p
(* Many functions, for example [Lwt.bind] and [Lwt.join], create a fresh
pending promise [p] and return it to the user.
They do not return a corresponding resolver. That means that only the
function itself (typically, a callback registered by it) can complete [p].
The only thing the user can do directly is try to cancel [p], but, since
[p] is not an initial promise, the cancelation attempt simply propagates
past [p] to [p]'s predecessors. If that eventually results in canceling
[p], it will be through the normal mechanisms of the function (e.g.
[Lwt.bind]'s callback).
As a result, the only possible state change, before the callback, is that
[p] may have become a proxy. Now,
- If [p] does not undergo this state change and become a proxy, it remains
an underlying, pending promise.
- If [p] does become a proxy, it will be a proxy for another promise [p']
created fresh by [Lwt.bind], to which this same argument applies. See
[make_into_proxy].
So, by induction on the length of the proxy ([Proxy _]) chain, at the time
the callback is called, [p] is either an underlying, pending promise, or a
proxy for a pending promise.
The cast
let State_may_now_be_pending_proxy p = may_now_be_proxy p in ...
encodes the possibility of this state change. It replaces a reference
p : ('a, underlying, pending) promise
with
p : ('a, $Unknown, pending) promise
and is typically seen at the beginning of callbacks registered by
[Lwt.bind] and similar functions.
The cast is a no-op cast. The introduction and immediate elimination of
[State_may_now_be_pending_proxy _] seems to be optimized away even on old
versions of OCaml. *)
end
open Basic_helpers
module Sequence_associated_storage :
sig
  (* Public interface *)
  type 'v key
  val new_key : unit -> _ key
  val get : 'v key -> 'v option
  val with_value : 'v key -> 'v option -> (unit -> 'b) -> 'b

  (* Internal interface *)
  val current_storage : storage ref
end =
struct
  (* Sequence-associated storage carries values across a call to [bind] or
     another sequential composition operation. Suppose a value is bound with
     [with_value] at the moment the operation is called. It is then visible
     again, through [get], inside the operation's callback, even if that
     callback runs much later:

       Lwt.with_value my_key (Some "foo") (fun () ->
         p >|= fun () ->
         assert (Lwt.get my_key = Some "foo"))

     Only the top-most sequencing call (here, the map) has to be made under
     [with_value]. Where the callback is defined does not matter, so this is
     equivalent:

       let f = fun () -> assert (Lwt.get my_key = Some "foo") in
       Lwt.with_value my_key (Some "foo") (fun () -> p >|= f)

     Representation: [current_storage] is a single global map from key ids to
     "refresh" closures of type [unit -> unit]. Running a key's closure writes
     its value into that key's typed record. This lets one map serve keys of
     different value types. Sequential composition functions snapshot this map
     when they are called, and restore the snapshot right before calling the
     user's callback. The same happens for cancel triggers added by
     [on_cancel].

     Maintainer's note: I think using this mechanism should be discouraged in
     new code. *)

  type 'v key = {
    id : int;
    mutable value : 'v option;
  }

  (* Source of key ids; each [new_key] takes the current value and bumps it. *)
  let next_key_id = ref 0

  (* Allocates a key with a fresh id and an empty transfer slot. *)
  let new_key () =
    let id = !next_key_id in
    incr next_key_id;
    {id; value = None}

  let current_storage = ref Storage_map.empty

  (* Looks up [key] in the current storage. If it is bound, its refresh closure
     deposits the value into [key.value]. The value is read and the slot is
     cleared again. [None] is returned when the key is unbound (a [Not_found]
     from the lookup or the refresh call). *)
  let get key =
    match Storage_map.find key.id !current_storage () with
    | exception Not_found -> None
    | () ->
      let deposited = key.value in
      key.value <- None;
      deposited

  (* Runs [f] with [key] bound to [value], or unbound when [value] is [None].
     The previous storage is put back afterwards, whether [f] returns normally
     or raises. *)
  let with_value key value f =
    let saved_storage = !current_storage in
    let scoped_storage =
      match value with
      | None ->
        Storage_map.remove key.id saved_storage
      | Some _ ->
        Storage_map.add key.id (fun () -> key.value <- value) saved_storage
    in
    current_storage := scoped_storage;
    match f () with
    | result ->
      current_storage := saved_storage;
      result
    | exception exn ->
      current_storage := saved_storage;
      raise exn
end
include Sequence_associated_storage
module Callbacks :
sig
(* Mutating callback lists attached to pending promises *)
val add_implicitly_removed_callback :
'a callbacks -> 'a regular_callback -> unit
val add_explicitly_removable_callback_to_each_of :
'a t list -> 'a regular_callback -> unit
val add_explicitly_removable_callback_and_give_remove_function :
'a t list -> 'a regular_callback -> (unit -> unit)
val add_cancel_callback : 'a callbacks -> (unit -> unit) -> unit
val merge_callbacks : from:'a callbacks -> into:'a callbacks -> unit
end =
struct
(* Joins two regular callback lists, [first] followed by [second]. When
   either side is empty, the other is returned as-is instead of building a
   concatenation node. *)
let concat_regular_callbacks first second =
  begin match first with
  | Regular_callback_list_empty -> second
  | _ ->
    begin match second with
    | Regular_callback_list_empty -> first
    | _ -> Regular_callback_list_concat (first, second)
    end
  end [@ocaml.warning "-4"]
(* Joins two cancel callback lists, [first] followed by [second]. When either
   side is empty, the other is returned as-is instead of building a
   concatenation node. *)
let concat_cancel_callbacks first second =
  begin match first with
  | Cancel_callback_list_empty -> second
  | _ ->
    begin match second with
    | Cancel_callback_list_empty -> first
    | _ -> Cancel_callback_list_concat (first, second)
    end
  end [@ocaml.warning "-4"]
(* Rebuilds a regular callback list without the explicitly removable callback
   cells that have been cleared (cells now holding [None]). Concatenations
   that lose one side are collapsed by [concat_regular_callbacks]. Nodes that
   survive unchanged are returned physically as they were. *)
let rec clean_up_callback_cells callbacks =
  match callbacks with
  | Regular_callback_list_concat (left, right) ->
    let left = clean_up_callback_cells left in
    let right = clean_up_callback_cells right in
    concat_regular_callbacks left right
  | Regular_callback_list_explicitly_removable_callback cell ->
    begin match !cell with
    | None -> Regular_callback_list_empty
    | Some _ -> callbacks
    end
  | Regular_callback_list_implicitly_removed_callback _
  | Regular_callback_list_empty ->
    callbacks
(* Limit on deferred cleanups of cleared explicitly removable callback cells.
   A pending promise's [cleanups_deferred] count is compared against this
   value with [>]. Once the count exceeds it, the regular callback list is
   scanned by [clean_up_callback_cells] and the count is reset to 0.
   See [clear_explicitly_removable_callback_cell] and [merge_callbacks]. *)
let cleanup_throttle = 42
(* Clears [cell], which removes the explicitly removable callback it holds
   from every promise listed in [~originally_added_to]. Such callbacks are
   added (mainly) by [Lwt.choose] and similar functions. In
   [Lwt.choose [p; p']], if [p'] completes first, the callback that
   [Lwt.choose] attached to [p] is removed this way.

   Clearing the cell leaves a dead node in each callback list it was added
   to. A long-pending promise that repeatedly takes part in [Lwt.choose]
   (perhaps in a loop) would pile up such dead nodes, so they have to be
   compacted away to avoid a memory leak. Rescanning and rebuilding the list
   on every removal could get expensive, so the work is throttled. Each
   pending promise keeps a [cleanups_deferred] count. A removal that does not
   simply empty the list bumps that count. Once the bumped count exceeds
   [cleanup_throttle], the list is scanned, cleared cells are dropped, and
   the count goes back to 0. *)
let clear_explicitly_removable_callback_cell cell ~originally_added_to:ps =
  cell := None;

  (* Puts off one more cleanup of [callbacks], or performs the cleanup now if
     the number put off would exceed [cleanup_throttle]. *)
  let defer_or_clean_up callbacks =
    let deferred = callbacks.cleanups_deferred + 1 in
    if deferred <= cleanup_throttle then
      callbacks.cleanups_deferred <- deferred
    else begin
      callbacks.cleanups_deferred <- 0;
      callbacks.regular_callbacks <-
        clean_up_callback_cells callbacks.regular_callbacks
    end
  in

  List.iter
    (fun p ->
      let Internal p = to_internal_promise p in
      match (underlying p).state with
      (* Some of the promises may already be completed by the time this runs;
         they have no callback list left to clean. *)
      | Resolved _ -> ()
      | Failed _ -> ()
      | Pending callbacks ->
        begin match callbacks.regular_callbacks with
        (* If the only regular callback is a removable one, it must be the
           cell cleared above, so the list can simply be emptied. *)
        | Regular_callback_list_explicitly_removable_callback _ ->
          callbacks.regular_callbacks <- Regular_callback_list_empty
        (* Maintainer's note: arguably the first two cases should not count
           toward a cleanup, but this matches how the code has always
           behaved. *)
        | Regular_callback_list_empty
        | Regular_callback_list_implicitly_removed_callback _
        | Regular_callback_list_concat _ ->
          defer_or_clean_up callbacks
        end)
    ps
(* Moves the callbacks of [~from] onto [~into]. Each of [~from]'s regular and
   cancel callback lists is appended after the corresponding list of
   [~into]. [~from]'s own fields are not reset. This function is called only
   by [Sequential_composition.make_into_proxy], which immediately changes the
   state of [~from] and drops its original callback lists.

   The two promises' [cleanups_deferred] counts are added. If the total
   exceeds [cleanup_throttle], cleared callback cells are compacted out of
   the merged regular list right away and the count restarts at 0. This is
   to prevent memory leaks; see [clear_explicitly_removable_callback_cell].
   Every read happens before the first write to [~into]. *)
let merge_callbacks ~from ~into =
  let total_deferred = into.cleanups_deferred + from.cleanups_deferred in
  let merged_regular =
    concat_regular_callbacks into.regular_callbacks from.regular_callbacks in
  let merged_cancel =
    concat_cancel_callbacks into.cancel_callbacks from.cancel_callbacks in
  if total_deferred > cleanup_throttle then begin
    into.regular_callbacks <- clean_up_callback_cells merged_regular;
    into.cleanups_deferred <- 0
  end else begin
    into.regular_callbacks <- merged_regular;
    into.cleanups_deferred <- total_deferred
  end;
  into.cancel_callbacks <- merged_cancel
(* Internal primitive behind every way of adding a regular callback. It puts
   [node] in front of the existing regular callback list of [callbacks]. An
   empty list is replaced by [node] itself instead of being wrapped in a
   concatenation. *)
let add_regular_callback_list_node callbacks node =
  let updated =
    match callbacks.regular_callbacks with
    | Regular_callback_list_empty -> node
    | Regular_callback_list_implicitly_removed_callback _
    | Regular_callback_list_explicitly_removable_callback _
    | Regular_callback_list_concat _ as existing ->
      Regular_callback_list_concat (node, existing)
  in
  callbacks.regular_callbacks <- updated
(* Adds [f] to [callbacks] as an implicitly removed regular callback. Such a
   node holds [f] directly, with no removal cell, unlike explicitly removable
   callbacks. *)
let add_implicitly_removed_callback callbacks f =
  let node = Regular_callback_list_implicitly_removed_callback f in
  add_regular_callback_list_node callbacks node
(* Adds [callback] as removable to each promise in [ps]. The first promise in
[ps] to trigger [callback] removes [callback] from the other promises; this
guarantees that [callback] is called at most once. All the promises in [ps]
must be pending.
This is an internal function, indirectly used by the implementations of
[Lwt.choose] and related functions. *)
let add_explicitly_removable_callback_and_give_cell ps f =
let rec cell = ref (Some self_removing_callback_wrapper)
and self_removing_callback_wrapper result =
clear_explicitly_removable_callback_cell cell ~originally_added_to:ps;
f result
in
let node = Regular_callback_list_explicitly_removable_callback cell in
ps |> List.iter (fun p ->
let Internal p = to_internal_promise p in
match (underlying p).state with
| Pending callbacks -> add_regular_callback_list_node callbacks node
| Resolved _ -> assert false
| Failed _ -> assert false);