Skip to content

Commit

Permalink
Fix handling of very long IO vectors
Browse files Browse the repository at this point in the history
`Flow.write` doesn't place any limit on how long the list of vectors can
be, but real operating systems do have limits and will fail if given too
many.

Also, Eio_posix was allocating the array on the stack, which could fail
if it was very large.
  • Loading branch information
talex5 committed Dec 4, 2023
1 parent 211279e commit b388d67
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 32 deletions.
19 changes: 14 additions & 5 deletions lib_eio_linux/eio_linux.ml
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,23 @@ let _fast_copy_try_splice src dst =

(* XXX workaround for issue #319, PR #327 *)
let fast_copy_try_splice src dst = fast_copy src dst

let[@tail_mod_cons] rec list_take n = function
| [] -> []
| x :: xs ->
if n = 0 then []
else x :: list_take (n - 1) xs

let truncate_to_iomax xs =
if List.compare_length_with xs Uring.iov_max <= 0 then xs
else list_take Uring.iov_max xs

(* Copy using the [Read_source_buffer] optimisation.
Avoids a copy if the source already has the data. *)
let copy_with_rsb rsb dst =
let write xs = Low_level.writev_single dst (truncate_to_iomax xs) in
try
while true do
rsb (Low_level.writev_single dst)
done
while true do rsb write done
with End_of_file -> ()

(* Copy by allocating a chunk from the pre-shared buffer and asking
Expand Down Expand Up @@ -161,11 +170,11 @@ module Flow = struct
Low_level.readv ~file_offset t bufs

let pwrite t ~file_offset bufs =
Low_level.writev_single ~file_offset t bufs
Low_level.writev_single ~file_offset t (truncate_to_iomax bufs)

let read_methods = []

let single_write t bufs = Low_level.writev_single t bufs
let single_write t bufs = Low_level.writev_single t (truncate_to_iomax bufs)

let copy t ~src =
match Eio_unix.Resource.fd_opt src with
Expand Down
51 changes: 31 additions & 20 deletions lib_eio_posix/eio_posix_stubs.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <caml/bigarray.h>
#include <caml/socketaddr.h>
#include <caml/custom.h>
#include <caml/fail.h>

#include "fork_action.h"

Expand Down Expand Up @@ -66,9 +67,16 @@ CAMLprim value caml_eio_posix_getrandom(value v_ba, value v_off, value v_len) {
CAMLreturn(Val_long(ret));
}

/* Fill [iov] with pointers to the cstructs in the array [v_bufs]. */
static void fill_iov(struct iovec *iov, value v_bufs) {
/* Allocates an array of C iovecs using the cstructs in the array [v_bufs]. */
static struct iovec *alloc_iov(value v_bufs) {
struct iovec *iov;
int n_bufs = Wosize_val(v_bufs);

if (n_bufs == 0) return NULL;
iov = caml_stat_calloc_noexc(n_bufs, sizeof(struct iovec));
if (iov == NULL)
caml_raise_out_of_memory();

for (int i = 0; i < n_bufs; i++) {
value v_cs = Field(v_bufs, i);
value v_ba = Field(v_cs, 0);
Expand All @@ -77,17 +85,18 @@ static void fill_iov(struct iovec *iov, value v_bufs) {
iov[i].iov_base = (uint8_t *)Caml_ba_data_val(v_ba) + Long_val(v_off);
iov[i].iov_len = Long_val(v_len);
}
return iov;
}

CAMLprim value caml_eio_posix_readv(value v_fd, value v_bufs) {
CAMLparam1(v_bufs);
ssize_t r;
int n_bufs = Wosize_val(v_bufs);
struct iovec iov[n_bufs];

fill_iov(iov, v_bufs);
struct iovec *iov;

iov = alloc_iov(v_bufs);
r = readv(Int_val(v_fd), iov, n_bufs);
caml_stat_free_preserving_errno(iov);
if (r < 0) uerror("readv", Nothing);

CAMLreturn(Val_long(r));
Expand All @@ -97,11 +106,11 @@ CAMLprim value caml_eio_posix_writev(value v_fd, value v_bufs) {
CAMLparam1(v_bufs);
ssize_t r;
int n_bufs = Wosize_val(v_bufs);
struct iovec iov[n_bufs];

fill_iov(iov, v_bufs);
struct iovec *iov;

iov = alloc_iov(v_bufs);
r = writev(Int_val(v_fd), iov, n_bufs);
caml_stat_free_preserving_errno(iov);
if (r < 0) uerror("writev", Nothing);

CAMLreturn(Val_long(r));
Expand All @@ -111,11 +120,11 @@ CAMLprim value caml_eio_posix_preadv(value v_fd, value v_bufs, value v_offset) {
CAMLparam2(v_bufs, v_offset);
ssize_t r;
int n_bufs = Wosize_val(v_bufs);
struct iovec iov[n_bufs];

fill_iov(iov, v_bufs);
struct iovec *iov;

iov = alloc_iov(v_bufs);
r = preadv(Int_val(v_fd), iov, n_bufs, Int63_val(v_offset));
caml_stat_free_preserving_errno(iov);
if (r < 0) uerror("preadv", Nothing);

CAMLreturn(Val_long(r));
Expand All @@ -125,11 +134,11 @@ CAMLprim value caml_eio_posix_pwritev(value v_fd, value v_bufs, value v_offset)
CAMLparam2(v_bufs, v_offset);
ssize_t r;
int n_bufs = Wosize_val(v_bufs);
struct iovec iov[n_bufs];

fill_iov(iov, v_bufs);
struct iovec *iov;

iov = alloc_iov(v_bufs);
r = pwritev(Int_val(v_fd), iov, n_bufs, Int63_val(v_offset));
caml_stat_free_preserving_errno(iov);
if (r < 0) uerror("pwritev", Nothing);

CAMLreturn(Val_long(r));
Expand Down Expand Up @@ -402,12 +411,11 @@ CAMLprim value caml_eio_posix_send_msg(value v_fd, value v_n_fds, value v_fds, v
CAMLparam3(v_fds, v_dst_opt, v_bufs);
int n_bufs = Wosize_val(v_bufs);
int n_fds = Int_val(v_n_fds);
struct iovec iov[n_bufs];
union sock_addr_union dst_addr;
struct iovec *iov;
int controllen = n_fds > 0 ? CMSG_SPACE(sizeof(int) * n_fds) : 0;
char cmsg[controllen];
struct msghdr msg = {
.msg_iov = iov,
.msg_iovlen = n_bufs,
.msg_control = n_fds > 0 ? cmsg : NULL,
.msg_controllen = controllen,
Expand All @@ -421,12 +429,14 @@ CAMLprim value caml_eio_posix_send_msg(value v_fd, value v_n_fds, value v_fds, v
msg.msg_name = &dst_addr;
}

fill_iov(iov, v_bufs);
iov = alloc_iov(v_bufs);
msg.msg_iov = iov;
fill_fds(&msg, n_fds, v_fds);

caml_enter_blocking_section();
r = sendmsg(Int_val(v_fd), &msg, 0);
caml_leave_blocking_section();
caml_stat_free_preserving_errno(iov);
if (r < 0) uerror("send_msg", Nothing);

CAMLreturn(Val_long(r));
Expand Down Expand Up @@ -474,14 +484,13 @@ CAMLprim value caml_eio_posix_recv_msg(value v_fd, value v_max_fds, value v_bufs
CAMLlocal2(v_result, v_addr);
int max_fds = Int_val(v_max_fds);
int n_bufs = Wosize_val(v_bufs);
struct iovec iov[n_bufs];
struct iovec *iov;
union sock_addr_union source_addr;
int controllen = max_fds > 0 ? CMSG_SPACE(sizeof(int) * max_fds) : 0;
char cmsg[controllen];
struct msghdr msg = {
.msg_name = &source_addr,
.msg_namelen = sizeof(source_addr),
.msg_iov = iov,
.msg_iovlen = n_bufs,
.msg_control = max_fds > 0 ? cmsg : NULL,
.msg_controllen = controllen,
Expand All @@ -490,11 +499,13 @@ CAMLprim value caml_eio_posix_recv_msg(value v_fd, value v_max_fds, value v_bufs

memset(cmsg, 0, controllen);

fill_iov(iov, v_bufs);
iov = alloc_iov(v_bufs);
msg.msg_iov = iov;

caml_enter_blocking_section();
r = recvmsg(Int_val(v_fd), &msg, 0);
caml_leave_blocking_section();
caml_stat_free_preserving_errno(iov);
if (r < 0) uerror("recv_msg", Nothing);

v_addr = safe_caml_unix_alloc_sockaddr(&source_addr, msg.msg_namelen, -1);
Expand Down
29 changes: 24 additions & 5 deletions lib_eio_posix/flow.ml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,25 @@ let eio_of_stat x =
ctime = float_of_time (Low_level.ctime_sec x) (Low_level.ctime_nsec x);
}

let truncate_to_iomax xs =
let rec count i = function
| [] -> i
| _ when i = Config.iov_max -> Config.iov_max
| _ :: xs -> count (i + 1) xs
in
let len = count 0 xs in
let arr = Array.make len Cstruct.empty in
let rec fill i xs =
if i = len then arr
else (
match xs with
| x :: xs ->
Array.set arr i x;
fill (i + 1) xs
| [] -> assert false
) in
fill 0 xs

module Impl = struct
type tag = [`Generic | `Unix]

Expand All @@ -41,7 +60,7 @@ module Impl = struct

let single_write t bufs =
try
Low_level.writev t (Array.of_list bufs)
Low_level.writev t (truncate_to_iomax bufs)
with Unix.Unix_error (code, name, arg) ->
raise (Err.wrap code name arg)

Expand All @@ -66,17 +85,17 @@ module Impl = struct
let read_methods = []

let pread t ~file_offset bufs =
let got = Low_level.preadv ~file_offset t (Array.of_list bufs) in
let got = Low_level.preadv ~file_offset t (truncate_to_iomax bufs) in
if got = 0 then raise End_of_file
else got

let pwrite t ~file_offset bufs = Low_level.pwritev ~file_offset t (Array.of_list bufs)
let pwrite t ~file_offset bufs = Low_level.pwritev ~file_offset t (truncate_to_iomax bufs)

let send_msg t ~fds data =
Low_level.send_msg ~fds t (Array.of_list data)
Low_level.send_msg ~fds t (truncate_to_iomax data)

let recv_msg_with_fds t ~sw ~max_fds data =
let _addr, n, fds = Low_level.recv_msg_with_fds t ~sw ~max_fds (Array.of_list data) in
let _addr, n, fds = Low_level.recv_msg_with_fds t ~sw ~max_fds (truncate_to_iomax data) in
n, fds

let seek = Low_level.lseek
Expand Down
6 changes: 4 additions & 2 deletions lib_eio_posix/include/discover.ml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ let optional_flags = [

let () =
C.main ~name:"discover" (fun c ->
let c_flags = ["-D_LARGEFILE64_SOURCE"] in
let c_flags = ["-D_LARGEFILE64_SOURCE"; "-D_XOPEN_SOURCE=700"; "-D_DARWIN_C_SOURCE"] in
let includes = ["sys/types.h"; "sys/stat.h"; "fcntl.h"] in
let extra_flags, missing_defs =
C.C_define.import c ~c_flags ~includes
Expand All @@ -22,7 +22,7 @@ let () =
in
let present_defs =
C.C_define.import c ~c_flags
~includes:["sys/types.h"; "sys/stat.h"; "fcntl.h"]
~includes:["sys/types.h"; "sys/stat.h"; "fcntl.h"; "limits.h"]
C.C_define.Type.(extra_flags @ [
"O_RDONLY", Int;
"O_RDWR", Int;
Expand All @@ -41,6 +41,8 @@ let () =

"AT_FDCWD", Int;
"AT_SYMLINK_NOFOLLOW", Int;

"IOV_MAX", Int;
])
|> List.map (function
| name, C.C_define.Value.Int v when List.mem name optional_flags ->
Expand Down
24 changes: 24 additions & 0 deletions tests/flow.md
Original file line number Diff line number Diff line change
Expand Up @@ -173,3 +173,27 @@ Make sure we don't crash on SIGPIPE:
+Connection_reset (good)
- : unit = ()
```

## IO_MAX

Sending a very long vector over a flow should just send it in chunks, not fail:

```ocaml
# Eio_main.run @@ fun env ->
Switch.run @@ fun sw ->
let r, w = Eio_unix.pipe sw in
let a = Cstruct.of_string "abc" in
let vecs = List.init 10_000 (Fun.const a) in
Fiber.both
(fun () ->
Eio.Flow.write w vecs;
Eio.Flow.close w
)
(fun () ->
let got = Eio.Flow.read_all r in
traceln "Read %d bytes" (String.length got);
assert (got = Cstruct.to_string (Cstruct.concat vecs))
)
+Read 30000 bytes
- : unit = ()
```

0 comments on commit b388d67

Please sign in to comment.