(* eio.mli - forked from ocaml-multicore/eio *)
(** Effects-based parallel IO for OCaml.
Eio provides support for concurrency (juggling many tasks) and
parallelism (using multiple CPU cores for performance).
It provides facilities for creating and coordinating fibers (light-weight
threads) and domains (for parallel processing), as well as interfaces for
interacting with resources provided by the operating system.
These features must be used within an {e event loop},
provided by an Eio {e backend}.
Applications can use {!Eio_main.run} to run a suitable loop.
See {{:https://github.com/ocaml-multicore/eio}} for a tutorial. *)
(** {1 Concurrency primitives} *)
(** Grouping fibers and other resources so they can be turned off together. *)
module Switch : sig
(** Many resources in Eio (such as fibers and file handles) require a switch to
be provided when they are created. The resource cannot outlive its switch.
If a function wants to create such resources, and was not passed a switch
as an argument, it will need to create a switch using {!run}.
This doesn't return until all resources attached to it have been freed,
preventing the function from leaking resources.
Any function creating resources that outlive it needs to be given a
switch by its caller.
Each switch includes its own {!Cancel.t} context.
Calling {!fail} cancels all fibers attached to the switch and, once they
have exited, reports the error.
Note: this concept is known as a "nursery" or "bundle" in some other systems.
Example:
{[
Switch.run (fun sw ->
let flow = Dir.open_in ~sw dir "myfile.txt" in
...
);
(* [flow] will have been closed by this point *)
]}
*)
type t
(** A switch contains a group of fibers and other resources (such as open file handles). *)
(** {2 Switch creation} *)
val run : (t -> 'a) -> 'a
(** [run fn] runs [fn] with a fresh switch (initially on).
When [fn] finishes, [run] waits for all fibers registered with the switch to finish,
and then releases all attached resources.
If {!fail} is called, [run] will re-raise the exception (after everything is cleaned up).
If [fn] raises an exception, it is passed to {!fail}. *)
val run_protected : (t -> 'a) -> 'a
(** [run_protected fn] is like [run] but ignores cancellation requests from the parent context. *)
(** {2 Cancellation and failure} *)
val check : t -> unit
(** [check t] checks that [t] is still on.
@raise Cancel.Cancelled If the switch has been cancelled. *)
val get_error : t -> exn option
(** [get_error t] is like [check t] except that it returns the exception instead of raising it.
If [t] is finished, this returns (rather than raising) the [Invalid_argument] exception too. *)
val fail : ?bt:Printexc.raw_backtrace -> t -> exn -> unit
(** [fail t ex] adds [ex] to [t]'s set of failures and
ensures that the switch's cancellation context is cancelled,
to encourage all fibers to exit as soon as possible.
[fail] returns immediately, without waiting for the shutdown actions to complete.
The exception will be raised later by {!run}, and [run]'s caller is responsible for handling it.
{!Exn.combine} is used to avoid duplicate or unnecessary exceptions.
@param bt A backtrace to attach to [ex] *)
(** {2 Cleaning up resources}
It is possible to attach clean-up hooks to a switch.
Once all fibers within the switch have finished, these hooks are called.
For example, when a file is opened it will register a release hook to close it.
Functions that create such resources will take a switch argument
and call these functions for you.
You usually don't need to call these directly. *)
val on_release : t -> (unit -> unit) -> unit
(** [on_release t fn] registers [fn] to be called once [t]'s main function has returned
and all fibers have finished.
If [fn] raises an exception, it is passed to {!fail}.
Release handlers are run in LIFO order, in series.
Note that [fn] is called within a {!Cancel.protect}, since aborting clean-up actions is usually a bad idea
and the switch may have been cancelled by the time it runs. *)
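(** For example, a minimal sketch of registering a clean-up action by hand
    (assuming this runs inside an Eio event loop; the temporary file is purely for illustration):
    {[
      Switch.run (fun sw ->
        let tmp = Filename.temp_file "example" ".tmp" in
        Switch.on_release sw (fun () -> Sys.remove tmp);
        traceln "Using %s" tmp)
      (* [tmp] has been deleted by this point *)
    ]} *)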
type hook
(** A handle for removing a clean-up callback. *)
val null_hook : hook
(** A dummy hook. Removing it does nothing. *)
val on_release_cancellable : t -> (unit -> unit) -> hook
(** Like [on_release], but the handler can be removed later.
For example, opening a file will call [on_release_cancellable] to ensure the file is closed later.
However, if the file is manually closed before that, it will use {!remove_hook} to remove the hook,
which is no longer needed. *)
val remove_hook : hook -> unit
(** [remove_hook h] removes a previously-added hook.
If the hook has already been removed, this does nothing. *)
(** {2 Debugging} *)
val dump : t Fmt.t
(** Dump out details of the switch's state for debugging. *)
end
(** A promise is a placeholder for a result that will arrive in the future. *)
module Promise : sig
(** Unlike lazy values, you cannot "force" promises;
a promise is resolved when the maker of the promise is ready.
Promises are thread-safe and so can be shared between domains and used
to communicate between them.
Example:
{[
let promise, resolver = Promise.create () in
Fiber.both
(fun () -> traceln "Got %d" (Promise.await promise))
(fun () -> Promise.resolve resolver 42)
]} *)
type !'a t
(** An ['a t] is a promise for a value of type ['a]. *)
type 'a u
(** An ['a u] is a resolver for a promise of type ['a]. *)
val create : ?label:string -> unit -> 'a t * 'a u
(** [create ()] is a fresh promise/resolver pair.
The promise is initially unresolved. *)
val create_resolved : 'a -> 'a t
(** [create_resolved x] is a promise that is already resolved with result [x]. *)
val await : 'a t -> 'a
(** [await t] blocks until [t] is resolved.
If [t] is already resolved then this returns immediately. *)
val resolve : 'a u -> 'a -> unit
(** [resolve u v] resolves [u]'s promise with the value [v].
Any threads waiting for the result will be added to the run queue.
@raise Invalid_argument if [u] is already resolved. *)
val peek : 'a t -> 'a option
(** [peek t] is [Some v] if the promise has been resolved to [v], or [None] otherwise.
If the result is [None] then it may change in future, otherwise it won't.
If another domain has access to the resolver then the state may have already
changed by the time this call returns. *)
val is_resolved : 'a t -> bool
(** [is_resolved t] is [Option.is_some (peek t)]. *)
(** {1 Result promises} *)
type 'a or_exn = ('a, exn) result t
val resolve_ok : ('a, 'b) result u -> 'a -> unit
(** [resolve_ok u x] is [resolve u (Ok x)]. *)
val resolve_error : ('a, 'b) result u -> 'b -> unit
(** [resolve_error u x] is [resolve u (Error x)]. *)
val await_exn : 'a or_exn -> 'a
(** [await_exn t] is like [await t], but if the result is [Error ex] then it raises [ex]. *)
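(** For example, a minimal sketch of the result-promise helpers
    (assuming this runs inside an Eio event loop so that {!Fiber.both} is available):
    {[
      let promise, resolver = Promise.create () in
      Fiber.both
        (fun () -> traceln "Result: %d" (Promise.await_exn promise))
        (fun () -> Promise.resolve_ok resolver 42)
    ]} *)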
end
(** A fiber is a light-weight thread. *)
module Fiber : sig
(** Within a domain, only one fiber can be running at a time.
A fiber runs until it performs an IO operation (directly or indirectly).
At that point, it may be suspended and the next fiber on the run queue runs. *)
val both : (unit -> unit) -> (unit -> unit) -> unit
(** [both f g] runs [f ()] and [g ()] concurrently.
They run in a new cancellation sub-context, and
if either raises an exception, the other is cancelled.
[both] waits for both functions to finish even if one raises
(it will then re-raise the original exception).
[f] runs immediately, without switching to any other thread.
[g] is inserted at the head of the run-queue, so it runs next even if other threads are already enqueued.
You can get other scheduling orders by adding calls to {!yield} in various places.
e.g. to append both fibers to the end of the run-queue, yield immediately before calling [both].
If both fibers fail, {!Exn.combine} is used to combine the exceptions. *)
val pair : (unit -> 'a) -> (unit -> 'b) -> 'a * 'b
(** [pair f g] is like [both], but returns the two results. *)
val all : (unit -> unit) list -> unit
(** [all fs] is like [both], but for any number of fibers.
[all []] returns immediately. *)
val first : (unit -> 'a) -> (unit -> 'a) -> 'a
(** [first f g] runs [f ()] and [g ()] concurrently.
They run in a new cancellation sub-context, and when one finishes the other is cancelled.
If one raises, the other is cancelled and the exception is reported.
As with [both], [f] runs immediately and [g] is scheduled next, ahead of any other queued work.
If both fibers fail, {!Exn.combine} is used to combine the exceptions. *)
val any : (unit -> 'a) list -> 'a
(** [any fs] is like [first], but for any number of fibers.
[any []] just waits forever (or until cancelled). *)
val await_cancel : unit -> 'a
(** [await_cancel ()] waits until cancelled.
@raise Cancel.Cancelled *)
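(** For example, a minimal sketch of racing two fibers
    (the first fiber here just waits, and is cancelled once the second one finishes):
    {[
      let result =
        Fiber.first
          (fun () -> Fiber.await_cancel ())
          (fun () -> "done")
      in
      assert (result = "done")
    ]} *)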
val fork : sw:Switch.t -> (unit -> unit) -> unit
(** [fork ~sw fn] runs [fn ()] in a new fiber, but does not wait for it to complete.
The new fiber is attached to [sw] (which can't finish until the fiber ends).
The new fiber inherits [sw]'s cancellation context.
If the fiber raises an exception, [Switch.fail sw] is called.
If [sw] is already off then [fn] fails immediately, but the calling thread continues.
[fn] runs immediately, without switching to any other fiber first.
The calling fiber is placed at the head of the run queue, ahead of any previous items. *)
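(** For example, a minimal sketch (assuming this runs inside an Eio event loop such as {!Eio_main.run}):
    {[
      Switch.run (fun sw ->
        Fiber.fork ~sw (fun () -> traceln "Running in a background fiber");
        traceln "Running in the main fiber")
      (* Switch.run only returns once the forked fiber has also finished *)
    ]} *)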
val fork_sub : sw:Switch.t -> on_error:(exn -> unit) -> (Switch.t -> unit) -> unit
(** [fork_sub ~sw ~on_error fn] is like [fork], but it creates a new sub-switch for the fiber.
This means that you can cancel the child switch without cancelling the parent.
This is a convenience function for running {!Switch.run} inside a {!fork}.
@param on_error This is called if the fiber raises an exception.
If it raises in turn, the parent switch is failed.
It is not called if the parent [sw] itself is cancelled. *)
val fork_on_accept :
on_handler_error:(exn -> unit) ->
sw:Switch.t ->
(Switch.t -> 'a) ->
(Switch.t -> 'a -> unit) ->
unit
(** [fork_on_accept ~sw accept handle ~on_handler_error] creates a new sub-switch [t].
It runs [accept t] in the current fiber and, on success, runs [handle t result] in a new fiber.
It is useful for e.g. accepting network connections,
where we need to provide a switch for the new client socket before we have forked,
but then move it to a child fiber later.
If [accept] raises an exception then the effect is the same as [Switch.run accept].
If [handle] raises an exception, it is passed to [on_handler_error].
If that raises in turn, the parent switch is failed.
[on_handler_error] is not called if the parent [sw] is itself cancelled. *)
val fork_promise : sw:Switch.t -> (unit -> 'a) -> 'a Promise.or_exn
(** [fork_promise ~sw fn] schedules [fn ()] to run in a new fiber and returns a promise for its result.
This is just a convenience wrapper around {!fork}.
If [fn] raises an exception then the promise is resolved to the error, but [sw] is not failed. *)
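(** A minimal sketch (the forked computation here is trivial, purely for illustration):
    {[
      Switch.run (fun sw ->
        let p = Fiber.fork_promise ~sw (fun () -> 1 + 1) in
        traceln "Result: %d" (Promise.await_exn p))
    ]} *)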
val check : unit -> unit
(** [check ()] checks that the fiber's context hasn't been cancelled.
Many operations automatically check this before starting.
@raise Cancel.Cancelled if the fiber's context has been cancelled. *)
val yield : unit -> unit
(** [yield ()] asks the scheduler to switch to the next runnable task.
The current task remains runnable, but goes to the back of the queue.
Automatically calls {!check} just before resuming. *)
end
(**/**)
module Fibre = Fiber [@@deprecated "Now spelt Fiber"]
(**/**)
(** A counting semaphore. *)
module Semaphore : sig
(** The API is based on OCaml's [Semaphore.Counting].
The difference is that when waiting for the semaphore this will switch to the next runnable fiber,
whereas the stdlib one will block the whole domain.
Semaphores are thread-safe and so can be shared between domains and used
to synchronise between them. *)
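(** For example, a minimal sketch that allows at most two of the workers to run at once
    (assuming this runs inside an Eio event loop; the workers here only print a message):
    {[
      let sem = Semaphore.make 2 in
      let worker id =
        Semaphore.acquire sem;
        Fun.protect ~finally:(fun () -> Semaphore.release sem)
          (fun () -> traceln "Worker %d holds the semaphore" id)
      in
      Fiber.all (List.map (fun id () -> worker id) [1; 2; 3; 4])
    ]} *)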
type t
(** The type of counting semaphores. *)
val make : int -> t
(** [make n] returns a new counting semaphore, with initial value [n].
The initial value [n] must be nonnegative.
@raise Invalid_argument if [n < 0] *)
val release : t -> unit
(** [release t] increments the value of semaphore [t].
If other fibers are waiting on [t], the one that has been waiting the longest is resumed.
@raise Sys_error if the value of the semaphore would overflow [max_int] *)
val acquire : t -> unit
(** [acquire t] blocks the calling fiber until the value of semaphore [t]
is not zero, then atomically decrements the value of [t] and returns. *)
val get_value : t -> int
(** [get_value t] returns the current value of semaphore [t]. *)
end
(** A stream/queue. *)
module Stream : sig
(** Reading from an empty queue will wait until an item is available.
Writing to a full queue will wait until there is space.
Example:
{[
let t = Stream.create 100 in
Stream.add t 1;
Stream.add t 2;
assert (Stream.take t = 1);
assert (Stream.take t = 2)
]}
Streams are thread-safe and so can be shared between domains and used
to communicate between them. *)
type 'a t
(** A queue of items of type ['a]. *)
val create : int -> 'a t
(** [create capacity] is a new stream which can hold up to [capacity] items without blocking writers.
- If [capacity = 0] then writes block until a reader is ready.
- If [capacity = 1] then this acts as a "mailbox".
- If [capacity = max_int] then the stream is effectively unbounded. *)
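(** For example, a zero-capacity stream acts as a rendezvous point: [add] blocks until
    another fiber is ready to [take] (a minimal sketch, assuming an Eio event loop):
    {[
      let msgs = Stream.create 0 in
      Fiber.both
        (fun () -> Stream.add msgs "hello")
        (fun () -> traceln "Got %s" (Stream.take msgs))
    ]} *)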
val add : 'a t -> 'a -> unit
(** [add t item] adds [item] to [t].
If this would take [t] over capacity, it blocks until there is space. *)
val take : 'a t -> 'a
(** [take t] takes the next item from the head of [t].
If no items are available, it waits until one becomes available. *)
val take_nonblocking : 'a t -> 'a option
(** [take_nonblocking t] is like [Some (take t)] except that
it returns [None] if the stream is empty rather than waiting.
Note that if another domain may add to the stream then a [None]
result may already be out-of-date by the time this returns. *)
val length : 'a t -> int
(** [length t] returns the number of items currently in [t]. *)
val is_empty : 'a t -> bool
(** [is_empty t] is [length t = 0]. *)
end
(** Cancelling fibers. *)
module Cancel : sig
(** This is the low-level interface to cancellation.
Every {!Switch} includes a cancellation context and most users will just use that API instead.
Each domain has a tree of cancellation contexts, and every fiber is registered with one context.
A fiber can switch to a different context (e.g. by calling {!sub}).
When a context is cancelled, all registered fibers have their current cancellation function (if any)
called and removed. Child contexts are cancelled too, recursively, unless marked as protected.
Many operations also check that the current context hasn't been cancelled,
so if a fiber is performing a non-cancellable operation it will still get cancelled soon afterwards.
This check is typically done when starting an operation, not at the end.
If an operation is cancelled after succeeding, but while still waiting on the run queue,
it will still return the operation's result.
A notable exception is {!Fiber.yield}, which checks at the end.
You can also use {!Fiber.check} to check manually.
Whether a fiber is cancelled through a cancellation function or by checking its context,
it will receive a {!Cancelled} exception.
It is possible the exception will get lost (if something catches it and forgets to re-raise).
It is also possible to get this exception even when not cancelled, for example by awaiting
a promise which another fiber has resolved to a cancelled exception.
When in doubt, use [Fiber.check ()] to find out if your fiber is really cancelled.
Ideally this should be done any time you have caught an exception and are planning to ignore it,
although if you forget then the next IO operation will typically abort anyway.
Quick clean-up actions (such as releasing a mutex or deleting a temporary file) are OK,
but operations that may block should be avoided.
For example, a network connection should simply be closed,
without attempting to send a goodbye message.
The purpose of the cancellation system is to stop fibers quickly, not to report errors.
Use {!Switch.fail} instead to record an error. *)
type t
(** A cancellation context. *)
exception Cancelled of exn
(** [Cancelled ex] indicates that the context was cancelled with exception [ex].
It is usually not necessary to report a [Cancelled] exception to the user,
as the original problem will be handled elsewhere.
The nested exception is only intended for debug-level logging and should generally be ignored. *)
exception Cancel_hook_failed of exn list
(** Raised by {!cancel} if any of the cancellation hooks themselves fail. *)
val sub : (t -> 'a) -> 'a
(** [sub fn] installs a new cancellation context [t], runs [fn t] inside it, and then restores the old context.
If the old context is cancelled while [fn] is running then [t] is cancelled too.
[t] cannot be used after [sub] returns. *)
val protect : (unit -> 'a) -> 'a
(** [protect fn] runs [fn] in a new cancellation context that isn't cancelled when its parent is.
This can be used to clean up resources on cancellation.
However, it is usually better to use {!Switch.on_release} (which calls this for you).
Note that [protect] does not check its parent context when it finishes. *)
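(** For example, a minimal sketch of shielding a clean-up action from cancellation
    ([start_op] and [cleanup] are hypothetical functions, not part of Eio):
    {[
      Fun.protect (fun () -> start_op ())
        ~finally:(fun () -> Cancel.protect (fun () -> cleanup ()))
    ]} *)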
val check : t -> unit
(** [check t] checks that [t] hasn't been cancelled.
@raise Cancelled If the context has been cancelled. *)
val get_error : t -> exn option
(** [get_error t] is like [check t] except that it returns the exception instead of raising it.
If [t] is finished, this returns (rather than raising) the [Invalid_argument] exception too. *)
val cancel : t -> exn -> unit
(** [cancel t ex] marks [t] and its child contexts as cancelled, recursively,
and calls all registered fibers' cancellation functions, passing [Cancelled ex] as the argument.
All cancellation functions are run, even if some of them raise exceptions.
If [t] is already cancelled then this does nothing.
Note that the caller of this function is still responsible for handling the error somehow
(e.g. reporting it to the user); it does not become the responsibility of the cancelled thread(s).
@raise Cancel_hook_failed if one or more hooks fail. *)
val dump : t Fmt.t
(** Show the cancellation sub-tree rooted at [t], for debugging. *)
end
(** Commonly used standard features. This module is intended to be [open]ed. *)
module Std : sig
module Promise = Promise
module Fiber = Fiber
(**/**)
module Fibre = Fiber [@@deprecated "Now spelt Fiber"]
(**/**)
module Switch = Switch
val traceln :
?__POS__:string * int * int * int ->
('a, Format.formatter, unit, unit) format4 -> 'a
(** Same as {!Eio.traceln}. *)
end
(** {1 Cross-platform OS API}
The general pattern here is that each type of resource has a set of functions for using it,
plus an object type to allow defining your own implementations.
To use the resources, it is recommended that you use the functions rather than calling
methods directly. Using the functions results in better error messages from the compiler,
and may provide extra features or sanity checks.
The system resources are available from the {!Stdenv.t} provided by your event loop
(e.g. {!Eio_main.run}). *)
(** A base class for objects that can be queried at runtime for extra features. *)
module Generic : sig
type 'a ty = ..
(** An ['a ty] is a query for a feature of type ['a]. *)
class type t = object
method probe : 'a. 'a ty -> 'a option
end
val probe : #t -> 'a ty -> 'a option
(** [probe t feature] checks whether [t] supports [feature].
This is mostly for internal use.
For example, {!Eio_unix.FD.peek} uses this to get the underlying Unix file descriptor from a flow. *)
end
(** Byte streams. *)
module Flow : sig
(** Flows are used to represent byte streams, such as open files and network sockets.
A {!source} provides a stream of bytes. A {!sink} consumes a stream.
A {!two_way} can do both.
To read structured data (e.g. a line at a time), wrap a source using {!Buf_read}. *)
(** {2 Reading} *)
type read_method = ..
(** Sources can offer a list of ways to read them, in order of preference. *)
class virtual source : object
inherit Generic.t
method read_methods : read_method list
method virtual read_into : Cstruct.t -> int
end
val read : #source -> Cstruct.t -> int
(** [read src buf] reads one or more bytes into [buf].
It returns the number of bytes read (which may be less than the
buffer size even if there is more data to be read).
[read src] just makes a single call to [src#read_into]
(and asserts that the result is in range).
- Use {!read_exact} instead if you want to fill [buf] completely.
- Use {!Buf_read.line} to read complete lines.
- Use {!copy} to stream data directly from a source to a sink.
[buf] must not be zero-length.
@raise End_of_file if there is no more data to read *)
val read_exact : #source -> Cstruct.t -> unit
(** [read_exact src dst] keeps reading into [dst] until it is full.
@raise End_of_file if the buffer could not be filled. *)
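(** For example, a minimal (if artificial) sketch using an in-memory source:
    {[
      let buf = Cstruct.create 4 in
      Flow.read_exact (Flow.string_source "data") buf;
      assert (Cstruct.to_string buf = "data")
    ]} *)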
val read_methods : #source -> read_method list
(** [read_methods flow] is a list of extra ways of reading from [flow],
with the preferred (most efficient) methods first.
If no method is suitable, {!read} should be used as the fallback. *)
val string_source : string -> source
(** [string_source s] is a source that gives the bytes of [s]. *)
val cstruct_source : Cstruct.t list -> source
(** [cstruct_source cs] is a source that gives the bytes of [cs]. *)
type read_method += Read_source_buffer of ((Cstruct.t list -> unit) -> unit)
(** If a source offers [Read_source_buffer rsb] then the user can call [rsb fn]
to borrow a view of the source's buffers.
[rsb] will raise [End_of_file] if no more data will be produced.
If no data is currently available, [rsb] will wait for some to become available before calling [fn].
[fn] must not continue to use the buffers after it returns. *)
(** {2 Writing} *)
(** Consumer base class. *)
class virtual sink : object
inherit Generic.t
method virtual copy : 'a. (#source as 'a) -> unit
end
val copy : #source -> #sink -> unit
(** [copy src dst] copies data from [src] to [dst] until end-of-file. *)
val copy_string : string -> #sink -> unit
(** [copy_string s = copy (string_source s)] *)
val buffer_sink : Buffer.t -> sink
(** [buffer_sink b] is a sink that adds anything sent to it to [b]. *)
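(** For example, a minimal sketch that copies an in-memory source into a [Buffer.t]:
    {[
      let b = Buffer.create 16 in
      Flow.copy (Flow.string_source "hello") (Flow.buffer_sink b);
      assert (Buffer.contents b = "hello")
    ]} *)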
(** {2 Bidirectional streams} *)
type shutdown_command = [
| `Receive (** Indicate that no more reads will be done *)
| `Send (** Indicate that no more writes will be done *)
| `All (** Indicate that no more reads or writes will be done *)
]
class virtual two_way : object
inherit source
inherit sink
method virtual shutdown : shutdown_command -> unit
end
val shutdown : #two_way -> shutdown_command -> unit
(** [shutdown t cmd] indicates that the caller has finished reading or writing [t]
(depending on [cmd]).
This is useful in some protocols to indicate that you have finished sending the request,
and that the remote peer should now send the response. *)
(** {2 Closing}
Flows are usually attached to switches and closed automatically when the switch
finishes. However, it can sometimes be useful to close them manually before that. *)
class type close = object
method close : unit
end
val close : #close -> unit
(** [close t] marks the flow as closed. It can no longer be used after this. *)
end
(** Buffered input and parsing *)
module Buf_read : sig
(** This module provides fairly efficient non-backtracking parsers.
It is modelled on Angstrom's API, and you should use that if
backtracking is needed.
Example:
{[
let r = Buf_read.of_flow flow ~max_size:1_000_000 in
Buf_read.line r
]}
*)
type t
(** An input buffer. *)
exception Buffer_limit_exceeded
(** Raised if parsing an item would require enlarging the buffer beyond its configured limit. *)
type 'a parser = t -> 'a
(** An ['a parser] is a function that consumes and returns a value of type ['a].
@raise Failure The flow can't be parsed as a value of type ['a].
@raise End_of_file The flow ended without enough data to parse an ['a].
@raise Buffer_limit_exceeded Parsing the value would exceed the configured size limit. *)
val parse : ?initial_size:int -> max_size:int -> 'a parser -> #Flow.source -> ('a, [> `Msg of string]) result
(** [parse p flow ~max_size] uses [p] to parse everything in [flow].
It is a convenience function that does
{[
let buf = of_flow flow ~max_size in
format_errors (p <* eof) buf
]}
@param initial_size see {!of_flow}. *)
val parse_exn : ?initial_size:int -> max_size:int -> 'a parser -> #Flow.source -> 'a
(** [parse_exn] wraps {!parse}, but raises [Failure msg] if that returns [Error (`Msg msg)].
Catching exceptions with [parse] and then raising them might seem pointless,
but this has the effect of turning e.g. an [End_of_file] exception into a [Failure]
with a more user-friendly message. *)
val of_flow : ?initial_size:int -> max_size:int -> #Flow.source -> t
(** [of_flow ~max_size flow] is a buffered reader backed by [flow].
@param initial_size The initial amount of memory to allocate for the buffer.
@param max_size The maximum size to which the buffer may grow.
This must be large enough to hold the largest single item
you want to parse (e.g. the longest line, if using
{!line}), plus any terminator needed to know the value is
complete (e.g. the newline character(s)). This is just to
prevent a run-away input from consuming all memory, and
you can usually just set it much larger than you expect
to need. *)
val as_flow : t -> Flow.source
(** [as_flow t] is a buffered flow.
Reading from it will return data from the buffer,
only reading the underlying flow if the buffer is empty. *)
(** {2 Reading data} *)
val line : string parser
(** [line] parses one line.
Lines can be terminated by either LF or CRLF.
The returned string does not include the terminator.
If [End_of_file] is reached after seeing some data but before seeing a line
terminator, the data seen is returned as the last line. *)
val lines : string Seq.t parser
(** [lines] returns a sequence that lazily reads the next line until the end of the input is reached.
[lines = seq line ~stop:at_end_of_input] *)
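(** For example, a minimal sketch using an in-memory source:
    {[
      let r = Buf_read.of_flow (Flow.string_source "one\ntwo\nthree") ~max_size:100 in
      Seq.iter (traceln "Line: %s") (Buf_read.lines r)
    ]} *)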
val char : char -> unit parser
(** [char c] checks that the next byte is [c] and consumes it.
@raise Failure if the next byte is not [c] *)
val any_char : char parser
(** [any_char] parses one character. *)
val peek_char : char option parser
(** [peek_char] returns [Some c] where [c] is the next character, but does not consume it.
Returns [None] at the end of the input stream rather than raising [End_of_file]. *)
val string : string -> unit parser
(** [string s] checks that [s] is the next string in the stream and consumes it.
@raise Failure if [s] is not a prefix of the stream. *)
val take : int -> string parser
(** [take n] takes exactly [n] bytes from the input. *)
val take_all : string parser
(** [take_all] takes all remaining data until end-of-file.
Returns [""] if already at end-of-file.
@raise Buffer_limit_exceeded if the remaining data exceeds or equals the buffer limit
(it needs one extra byte to confirm it has reached end-of-file). *)
val take_while : (char -> bool) -> string parser
(** [take_while p] finds the first byte for which [p] is false
and consumes and returns all bytes before that.
If [p] is true for all remaining bytes, it returns everything until end-of-file.
It will return the empty string if there are no matching characters
(and therefore never raises [End_of_file]). *)
val skip_while : (char -> bool) -> unit parser
(** [skip_while p] skips zero or more bytes for which [p] is [true].
[skip_while p t] does the same thing as [ignore (take_while p t)],
except that it is not limited by the buffer size. *)
val skip : int -> unit parser
(** [skip n] discards the next [n] bytes.
[skip n] = [map ignore (take n)],
except that the number of skipped bytes may be larger than the buffer (it will not grow).
Note: if [End_of_file] is raised, all bytes in the stream will have been consumed. *)
val at_end_of_input : bool parser
(** [at_end_of_input] returns [true] when at the end of the stream, or
[false] if there is at least one more byte to be read. *)
val end_of_input : unit parser
(** [end_of_input] checks that there are no further bytes in the stream.
@raise Failure if there are further bytes *)
(** {2 Combinators} *)
val seq : ?stop:bool parser -> 'a parser -> 'a Seq.t parser
(** [seq p] is a sequence that uses [p] to get the next item.
A sequence node can only be used while the stream is at
the expected position, and will raise [Invalid_argument]
if any bytes have been consumed in the meantime. This
also means that each node can only be used once; use
{!Seq.memoize} to make the sequence persistent.
It is not necessary to consume all the elements of the
sequence.
@param stop This is used before parsing each item.
The sequence ends if this returns [true].
The default is {!at_end_of_input}. *)
val pair : 'a parser -> 'b parser -> ('a * 'b) parser
(** [pair a b] is a parser that first uses [a] to parse a value [x],
then uses [b] to parse a value [y], then returns [(x, y)].
Note that this module does not support backtracking, so if [b] fails
then the bytes consumed by [a] are lost. *)
val map : ('a -> 'b) -> ('a parser -> 'b parser)
(** [map f a] is a parser that parses the stream with [a] to get [v],
and then returns [f v]. *)
val bind : 'a parser -> ('a -> 'b parser) -> 'b parser
(** [bind a f] is a parser that first uses [a] to parse a value [v],
then uses [f v] to select the next parser, and then uses that. *)
val format_errors : 'a parser -> ('a, [> `Msg of string]) result parser
(** [format_errors p] catches [Failure], [End_of_file] and
[Buffer_limit_exceeded] exceptions and returns them as a formatted error message. *)
(** Convenient syntax for some of the combinators. *)
module Syntax : sig
val ( let+ ) : 'a parser -> ('a -> 'b) -> 'b parser
(** Syntax for {!map}. *)
val ( let* ) : 'a parser -> ('a -> 'b parser) -> 'b parser
(** Syntax for {!bind} *)
val ( and+ ) : 'a parser -> 'b parser -> ('a * 'b) parser
(** Syntax for {!pair} *)
val ( and* ) : 'a parser -> 'b parser -> ('a * 'b) parser
(** Syntax for {!pair} (same as [and+]). *)
val ( <* ) : 'a parser -> 'b parser -> 'a parser
(** [a <* b] is [map fst (pair a b)].
It parses two things and keeps only the first. *)
val ( *> ) : 'a parser -> 'b parser -> 'b parser
(** [a *> b] is [map snd (pair a b)].
It parses two things and keeps only the second. *)
end
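(** For example, a minimal sketch of a [key=value] parser built from these combinators
    (the input format is made up for illustration):
    {[
      let kv =
        let open Buf_read.Syntax in
        let* key = Buf_read.take_while (fun c -> c <> '=') in
        let+ value = Buf_read.char '=' *> Buf_read.line in
        (key, value)

      let () =
        match Buf_read.parse kv ~max_size:4096 (Flow.string_source "name=Alice\n") with
        | Ok (key, value) -> traceln "%s is %s" key value
        | Error (`Msg msg) -> traceln "Parse error: %s" msg
    ]} *)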
(** {2 Low-level API} *)
val buffered_bytes : t -> int
(** [buffered_bytes t] is the number of bytes that can be read without
reading from the underlying flow. *)
val peek : t -> Cstruct.t
(** [peek t] returns a view onto the active part of [t]'s internal buffer.
Performing any operation that might add to the buffer may invalidate this,
so it should be used immediately and then forgotten.
[Cstruct.length (peek t) = buffered_bytes t]. *)
val ensure : t -> int -> unit
(** [ensure t n] ensures that the buffer contains at least [n] bytes of data.
If not, it reads from the flow until there is.
After this returns, [buffered_bytes t >= n].
@raise End_of_file if the flow ended before [n] bytes were available
@raise Buffer_limit_exceeded if [n] exceeds the buffer's maximum size *)
val consume : t -> int -> unit
(** [consume t n] discards the first [n] bytes from [t]'s buffer.
Use this after {!peek} to mark some bytes as consumed.
Afterwards, [buffered_bytes t] is reduced by [n].
Note: unlike {!skip}, this will not read data from the underlying flow. *)
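(** For example, a minimal sketch of the low-level peek/consume workflow, using an in-memory source:
    {[
      let r = Buf_read.of_flow (Flow.string_source "abcdef") ~max_size:100 in
      Buf_read.ensure r 3;
      traceln "Next bytes: %s" (Cstruct.to_string ~len:3 (Buf_read.peek r));
      Buf_read.consume r 3
      (* any remaining bytes stay buffered for later parsing *)
    ]} *)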
val consumed_bytes : t -> int
(** [consumed_bytes t] is the total number of bytes consumed.
i.e. it is the offset into the stream of the next byte to be parsed. *)
val eof_seen : t -> bool
(** [eof_seen t] indicates whether we've received [End_of_file] from the underlying flow.
If so, there will never be any further data beyond what [peek] already returns.
Note that this returns [false] if we're at the end of the stream but don't know it yet.
Use {!at_end_of_input} to be sure. *)
end
(** Networking. *)
module Net : sig
(** Example:
{[
let addr = `Tcp (Ipaddr.V4.loopback, 8080)
let http_get ~net ~stdout addr =
Switch.run @@ fun sw ->
let flow = Net.connect ~sw net addr in
Flow.copy_string "GET / HTTP/1.0\r\n\r\n" flow;
Flow.shutdown flow `Send;
Flow.copy flow stdout
]}
*)
exception Connection_reset of exn
(** IP addresses. *)
module Ipaddr : sig
type 'a t = private string
(** The raw bytes of the IP address.
It is either 4 bytes long (for an IPv4 address) or
16 bytes long (for IPv6). *)
(** IPv4 addresses. *)
module V4 : sig
val any : [> `V4] t
(** A special IPv4 address, for use only with [listen], representing
all the Internet addresses that the host machine possesses. *)
val loopback : [> `V4] t
(** A special IPv4 address representing the host machine ([127.0.0.1]). *)
end
(** IPv6 addresses. *)
module V6 : sig
val any : [> `V6] t
(** A special IPv6 address, for use only with [listen], representing
all the Internet addresses that the host machine possesses. *)
val loopback : [> `V6] t
(** A special IPv6 address representing the host machine ([::1]). *)
end
val pp : [< `V4 | `V6] t Fmt.t
(** [pp] formats IP addresses.
For IPv6 addresses, it follows {{:http://tools.ietf.org/html/rfc5952}}. *)
type v4v6 = [`V4 | `V6] t
val fold :
v4:([> `V4] t -> 'a) ->
v6:([> `V6] t -> 'a) ->
[< `V4 | `V6] t ->
'a
(** [fold ~v4 ~v6 t] is [v4 t] if [t] is an IPv4 address, or [v6 t] if it's an IPv6 address. *)
val of_raw : string -> v4v6
(** [of_raw addr] casts [addr] to an IP address.
@raise Invalid_argument if it is not 4 or 16 bytes long. *)
end
(** Network addresses. *)
module Sockaddr : sig
type stream = [
| `Unix of string
| `Tcp of Ipaddr.v4v6 * int
]
(** Socket addresses for which we can build a {!Flow.two_way},
i.e. stream-oriented protocols. *)
type datagram = [
| `Udp of Ipaddr.v4v6 * int
]
(** Socket addresses that are message-oriented. *)
type t = [ stream | datagram ]
val pp : Format.formatter -> [< t] -> unit
end
(** {2 Provider Interfaces} *)
class virtual listening_socket : object
inherit Generic.t
method virtual close : unit
method virtual accept : sw:Switch.t -> <Flow.two_way; Flow.close> * Sockaddr.stream
end
class virtual datagram_socket : object
method virtual send : Sockaddr.datagram -> Cstruct.t -> unit
method virtual recv : Cstruct.t -> Sockaddr.datagram * int
end
class virtual t : object
method virtual listen : reuse_addr:bool -> reuse_port:bool -> backlog:int -> sw:Switch.t -> Sockaddr.stream -> listening_socket
method virtual connect : sw:Switch.t -> Sockaddr.stream -> <Flow.two_way; Flow.close>
method virtual datagram_socket : sw:Switch.t -> Sockaddr.datagram -> datagram_socket
end
(** {2 Out-bound Connections} *)
val connect : sw:Switch.t -> #t -> Sockaddr.stream -> <Flow.two_way; Flow.close>
(** [connect ~sw t addr] is a new socket connected to remote address [addr].
The new socket will be closed when [sw] finishes, unless closed manually first. *)
(** {2 Incoming Connections} *)
val listen : ?reuse_addr:bool -> ?reuse_port:bool -> backlog:int -> sw:Switch.t -> #t -> Sockaddr.stream -> listening_socket
(** [listen ~sw ~backlog t addr] is a new listening socket bound to local address [addr].
The new socket will be closed when [sw] finishes, unless closed manually first.
For (non-abstract) Unix domain sockets, the path will be removed afterwards.
@param backlog The number of pending connections that can be queued up (see listen(2)).
@param reuse_addr Set the {!Unix.SO_REUSEADDR} socket option.
For Unix paths, also remove any stale left-over socket.
@param reuse_port Set the {!Unix.SO_REUSEPORT} socket option. *)
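(** For example, a minimal server sketch (assuming [net] comes from the environment
    provided by the event loop; the port number is arbitrary):
    {[
      Switch.run @@ fun sw ->
      let addr = `Tcp (Net.Ipaddr.V4.loopback, 8080) in
      let socket = Net.listen ~sw ~reuse_addr:true ~backlog:5 net addr in
      Net.accept_sub ~sw socket
        ~on_error:(traceln "Error handling connection: %a" Fmt.exn)
        (fun ~sw:_ flow _addr -> Flow.copy_string "Hello!\r\n" flow)
    ]} *)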
val accept :
sw:Switch.t ->
#listening_socket ->
<Flow.two_way; Flow.close> * Sockaddr.stream
(** [accept ~sw socket] waits until a new connection is ready on [socket] and returns it.
The new socket will be closed automatically when [sw] finishes, if not closed earlier.
If you want to handle multiple connections, consider using {!accept_sub} instead. *)
val accept_sub :
sw:Switch.t ->
#listening_socket ->
on_error:(exn -> unit) ->
(sw:Switch.t -> <Flow.two_way; Flow.close> -> Sockaddr.stream -> unit) ->
unit
(** [accept_sub ~sw socket ~on_error fn] accepts a connection and handles it in a new fiber.