Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow clients to request different flush behaviour #52

Merged
merged 12 commits into from
Sep 21, 2016
2 changes: 1 addition & 1 deletion _oasis
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ Library mirage_block_unix
Path: lib
Findlibname: mirage-block-unix
Modules: Block
BuildDepends: cstruct (>= 1.3.0), cstruct.lwt, lwt, lwt.unix, mirage-types, logs
BuildDepends: cstruct (>= 1.3.0), cstruct.lwt, lwt, lwt.unix, mirage-types, logs, uri
CSources: odirect_stubs.c, blkgetsize_stubs.c, lseekhole_stubs.c, flush_stubs.c
CCOpt: -I $(pkg_lwt)

Expand Down
9 changes: 8 additions & 1 deletion _tags
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# OASIS_START
# DO NOT EDIT (digest: 2523786f1804452927ad7ed0a182fe0f)
# DO NOT EDIT (digest: b4dad6087354892e863ef19ab7d35982)
# Ignore VCS directories, you can use the same kind of rule outside
# OASIS_START/STOP if you want to exclude directories that contains
# useless stuff for the build process
Expand Down Expand Up @@ -28,30 +28,35 @@ true: annot, bin_annot
<lib/*.ml{,i,y}>: pkg_lwt
<lib/*.ml{,i,y}>: pkg_lwt.unix
<lib/*.ml{,i,y}>: pkg_mirage-types
<lib/*.ml{,i,y}>: pkg_uri
"lib/odirect_stubs.c": pkg_cstruct
"lib/odirect_stubs.c": pkg_cstruct.lwt
"lib/odirect_stubs.c": pkg_logs
"lib/odirect_stubs.c": pkg_lwt
"lib/odirect_stubs.c": pkg_lwt.unix
"lib/odirect_stubs.c": pkg_mirage-types
"lib/odirect_stubs.c": pkg_uri
"lib/blkgetsize_stubs.c": pkg_cstruct
"lib/blkgetsize_stubs.c": pkg_cstruct.lwt
"lib/blkgetsize_stubs.c": pkg_logs
"lib/blkgetsize_stubs.c": pkg_lwt
"lib/blkgetsize_stubs.c": pkg_lwt.unix
"lib/blkgetsize_stubs.c": pkg_mirage-types
"lib/blkgetsize_stubs.c": pkg_uri
"lib/lseekhole_stubs.c": pkg_cstruct
"lib/lseekhole_stubs.c": pkg_cstruct.lwt
"lib/lseekhole_stubs.c": pkg_logs
"lib/lseekhole_stubs.c": pkg_lwt
"lib/lseekhole_stubs.c": pkg_lwt.unix
"lib/lseekhole_stubs.c": pkg_mirage-types
"lib/lseekhole_stubs.c": pkg_uri
"lib/flush_stubs.c": pkg_cstruct
"lib/flush_stubs.c": pkg_cstruct.lwt
"lib/flush_stubs.c": pkg_logs
"lib/flush_stubs.c": pkg_lwt
"lib/flush_stubs.c": pkg_lwt.unix
"lib/flush_stubs.c": pkg_mirage-types
"lib/flush_stubs.c": pkg_uri
# Executable test
<lib_test/test.{native,byte}>: pkg_cstruct
<lib_test/test.{native,byte}>: pkg_cstruct.lwt
Expand All @@ -62,6 +67,7 @@ true: annot, bin_annot
<lib_test/test.{native,byte}>: pkg_lwt.unix
<lib_test/test.{native,byte}>: pkg_mirage-types
<lib_test/test.{native,byte}>: pkg_oUnit
<lib_test/test.{native,byte}>: pkg_uri
<lib_test/test.{native,byte}>: use_mirage_block_unix
<lib_test/*.ml{,i,y}>: pkg_cstruct
<lib_test/*.ml{,i,y}>: pkg_cstruct.lwt
Expand All @@ -72,6 +78,7 @@ true: annot, bin_annot
<lib_test/*.ml{,i,y}>: pkg_lwt.unix
<lib_test/*.ml{,i,y}>: pkg_mirage-types
<lib_test/*.ml{,i,y}>: pkg_oUnit
<lib_test/*.ml{,i,y}>: pkg_uri
<lib_test/*.ml{,i,y}>: use_mirage_block_unix
<lib_test/test.{native,byte}>: custom
# OASIS_STOP
2 changes: 1 addition & 1 deletion appveyor.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ environment:

install:
- appveyor DownloadFile https://raw.githubusercontent.com/ocaml/ocaml-ci-scripts/master/appveyor-opam.sh
- "%CYG_ROOT%\\setup-x86.exe -qnNdO -R %CYG_ROOT% -s http://cygwin.mirror.constant.com -l C:/cygwin/var/cache/setup -P rsync -P patch -P diffutils -P curl -P make -P unzip -P git -P m4 -P perl -P mingw64-x86_64-gcc-core"
- "%CYG_ROOT%\\setup-x86.exe -qnNdO -R %CYG_ROOT% -s http://cygwin.mirror.constant.com -l C:/cygwin/var/cache/setup -P rsync -P patch -P diffutils -P make -P unzip -P git -P m4 -P perl -P mingw64-x86_64-gcc-core"

build_script:
- "%CYG_BASH% '${APPVEYOR_BUILD_FOLDER}/appveyor-opam.sh'"
4 changes: 2 additions & 2 deletions lib/META
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# OASIS_START
# DO NOT EDIT (digest: 312a616af69e79921b12c9b4fe4c9c17)
# DO NOT EDIT (digest: ef967ecf0e377d8e40ec630b22bf2faf)
version = "2.3.0"
description = "Mirage block driver for Unix"
requires = "cstruct cstruct.lwt lwt lwt.unix mirage-types logs"
requires = "cstruct cstruct.lwt lwt lwt.unix mirage-types logs uri"
archive(byte) = "mirage_block_unix.cma"
archive(byte, plugin) = "mirage_block_unix.cma"
archive(native) = "mirage_block_unix.cmxa"
Expand Down
96 changes: 69 additions & 27 deletions lib/block.ml
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,47 @@ type info = {
size_sectors: int64;
}

module Config = struct
type t = {
buffered: bool;
sync: bool;
path: string;
}

let create ?(buffered = false) ?(sync = true) path =
{ buffered; sync; path }

let to_string t =
let query = [
"buffered", [ if t.buffered then "1" else "0" ];
"sync", [ if t.sync then "1" else "0" ];
] in
let u = Uri.make ~scheme:"file" ~path:t.path ~query () in
Uri.to_string u

let of_string x =
let u = Uri.of_string x in
match Uri.scheme u with
| Some "file" ->
let query = Uri.query u in
let buffered = try List.assoc "buffered" query = [ "1" ] with Not_found -> false in
let sync = try List.assoc "sync" query = [ "1" ] with Not_found -> false in
let path = Uri.(pct_decode @@ path u) in
`Ok { buffered; sync; path }
| _ ->
`Error (`Msg "Config.to_string expected a string of the form file://<path>?sync=(0|1)&buffered=(0|1)")
end

type t = {
mutable fd: Lwt_unix.file_descr option;
m: Lwt_mutex.t;
name: string;
mutable info: info;
use_fsync: bool;
config: Config.t;
use_fsync_after_write: bool;
use_fsync_on_flush: bool;
}

let id { name } = name
let to_config { config } = config

module Result = struct
type ('a, 'b) result = [
Expand Down Expand Up @@ -106,18 +138,8 @@ let get_file_size filename fd =
(`Unknown
(Printf.sprintf "get_file_size %s: neither a file nor a block device" filename))

(* prefix which signals we want to use buffered I/O *)
let buffered_prefix = "buffered:"

let remove_prefix prefix x =
let prefix' = String.length prefix and x' = String.length x in
if x' >= prefix' && (String.sub x 0 prefix' = prefix)
then true, String.sub x prefix' (x' - prefix')
else false, x

let connect name =
let buffered, name = remove_prefix buffered_prefix name in
let openfile, use_fsync = match buffered, is_win32 with
let of_config ({ Config.buffered; sync; path } as config) =
let openfile, use_fsync_after_write = match buffered, is_win32 with
| true, _ -> Raw.openfile_buffered, false
| false, false -> Raw.openfile_unbuffered, false
| false, true ->
Expand All @@ -128,10 +150,10 @@ let connect name =
try
let fd, read_write =
try
openfile name true 0o0, true
openfile path true 0o0, true
with _ ->
openfile name false 0o0, false in
match get_file_size name fd with
openfile path false 0o0, false in
match get_file_size path fd with
| `Error e ->
Unix.close fd;
return (`Error e)
Expand All @@ -140,10 +162,28 @@ let connect name =
let size_sectors = Int64.(div x (of_int sector_size)) in
let fd = Lwt_unix.of_unix_file_descr fd in
let m = Lwt_mutex.create () in
return (`Ok { fd = Some fd; m; name; info = { sector_size; size_sectors; read_write }; use_fsync })
let use_fsync_on_flush = sync in
return (`Ok { fd = Some fd; m; info = { sector_size; size_sectors; read_write };
config; use_fsync_after_write; use_fsync_on_flush })
with e ->
Log.err (fun f -> f "connect %s: failed to open file" name);
return (`Error (`Unknown (Printf.sprintf "connect %s: failed to open file" name)))
Log.err (fun f -> f "connect %s: failed to open file" path);
return (`Error (`Unknown (Printf.sprintf "connect %s: failed to open file" path)))

(* prefix which signals we want to use buffered I/O *)
let buffered_prefix = "buffered:"

let remove_prefix prefix x =
let prefix' = String.length prefix and x' = String.length x in
if x' >= prefix' && (String.sub x 0 prefix' = prefix)
then true, String.sub x prefix' (x' - prefix')
else false, x

let connect ?buffered ?sync name =
let legacy_buffered, path = remove_prefix buffered_prefix name in
(* Keep support for the legacy buffered: prefix until version 3.x.y *)
let buffered = if legacy_buffered then Some true else buffered in
let config = Config.create ?buffered ?sync name in
of_config config

let disconnect t = match t.fd with
| Some fd ->
Expand Down Expand Up @@ -172,17 +212,17 @@ let lwt_wrap_exn t op offset ?buffer f =
| Some b ->
let len = Cstruct.len b in
if len mod t.info.sector_size <> 0
then fatalf "%s: buffer length (%d) is not a multiple of sector_size (%d) for file %s" op len t.info.sector_size t.name
then fatalf "%s: buffer length (%d) is not a multiple of sector_size (%d) for file %s" op len t.info.sector_size t.config.Config.path
else Lwt.return (`Ok ())
) >>*= fun () ->
Lwt.catch f
(function
| End_of_file ->
fatalf "%s: End_of_file at file %s offset %Ld %s" op t.name offset (describe_buffer buffer)
fatalf "%s: End_of_file at file %s offset %Ld %s" op t.config.Config.path offset (describe_buffer buffer)
| Unix.Unix_error(code, fn, arg) ->
fatalf "%s: %s in %s '%s' at file %s offset %Ld %s" op (Unix.error_message code) fn arg t.name offset (describe_buffer buffer)
fatalf "%s: %s in %s '%s' at file %s offset %Ld %s" op (Unix.error_message code) fn arg t.config.Config.path offset (describe_buffer buffer)
| e ->
fatalf "%s: %s at file %s offset %Ld %s" op (Printexc.to_string e) t.name offset (describe_buffer buffer)
fatalf "%s: %s at file %s offset %Ld %s" op (Printexc.to_string e) t.config.Config.path offset (describe_buffer buffer)
)

let rec read x sector_start buffers = match buffers with
Expand Down Expand Up @@ -236,7 +276,7 @@ let rec write x sector_start buffers = match buffers with
really_write fd b
end
) >>= fun () ->
( if x.use_fsync then Lwt_unix.fsync fd else Lwt.return () )
( if x.use_fsync_after_write then Lwt_unix.fsync fd else Lwt.return () )
>>= fun () ->
return (`Ok ())
) >>= function
Expand Down Expand Up @@ -272,7 +312,9 @@ let flush t =
| Some fd ->
lwt_wrap_exn t "fsync" 0L
(fun () ->
Lwt_unix.run_job (flush_job (Lwt_unix.unix_file_descr fd))
( if t.use_fsync_on_flush
then Lwt_unix.run_job (flush_job (Lwt_unix.unix_file_descr fd))
else Lwt.return_unit )
>>= fun () ->
return (`Ok ())
)
Expand Down
32 changes: 31 additions & 1 deletion lib/block.mli
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,31 @@ val blkgetsize: string -> Unix.file_descr -> [ `Ok of int64 | `Error of error ]
given by [fd]. [path] is only used to construct a human-readable error
message. *)

val connect : string -> [`Ok of t | `Error of error] io
module Config: sig
type t = {
buffered: bool; (** true if I/O hits the OS disk caches, false if "direct" *)
sync: bool; (** true if [flush] flushes all caches, including disk drive caches *)
path: string; (** path to the underlying file *)
}
(** Configuration of a device *)

val create: ?buffered:bool -> ?sync:bool -> string -> t
(** [create ?buffered ?sync path] constructs a configuration referencing the
file stored at [path]/ *)

val to_string: t -> string
(** Marshal a config into a string of the form
file://<path>?sync=(0|1)&buffered=(0|1) *)

val of_string: string -> [ `Ok of t | `Error of [ `Msg of string ] ]
(** Parse the result of a previous [to_string] invocation *)
end

val connect : ?buffered:bool -> ?sync:bool -> string -> [`Ok of t | `Error of error] io
(** [connect ?buffered ?sync path] connects to a block device on the filesystem
at [path]. By default I/O is unbuffered and fully synchronous. These defaults
can be changed by supplying the optional arguments [~buffered:true] and
[~sync:false] *)

val resize : t -> int64 -> [ `Ok of unit | `Error of error ] io
(** [resize t new_size_sectors] attempts to resize the connected device
Expand All @@ -51,3 +75,9 @@ val seek_mapped: t -> int64 -> [ `Ok of int64 | `Error of error ] io
(** [seek_mapped t start] returns the sector offset of the next regoin of the
device which may have data in it (typically this is the next mapped
region) *)

val to_config: t -> Config.t
(** [to_config t] returns the configuration of a device *)

val of_config: Config.t -> [ `Ok of t | `Error of error ] io
(** [of_config config] creates a fresh device from [config] *)
15 changes: 15 additions & 0 deletions lib_test/test.ml
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,19 @@ let test_flush () =
) in
Lwt_main.run t

let test_parse_print_config config =
let open Block.Config in
let s = to_string config in
Printf.sprintf "test parse(print(x)) == x for %s" s
>:: (fun () ->
match of_string s with
| `Error (`Msg m) -> failwith m
| `Ok config' ->
assert_equal ~printer:string_of_bool config.buffered config'.buffered;
assert_equal ~printer:string_of_bool config.sync config'.sync;
assert_equal ~printer:(fun x -> x) config.path config'.path;
)

let not_implemented_on_windows = [
"test resize" >:: test_resize;
]
Expand All @@ -214,6 +227,8 @@ let tests = [
*)
"test read/write after last sector" >:: test_eof;
"test flush" >:: test_flush;
test_parse_print_config { Block.Config.buffered = true; sync = false; path = "C:\\cygwin" };
test_parse_print_config { Block.Config.buffered = false; sync = true; path = "/var/tmp/foo.qcow2" };
"test write then read" >:: test_write_read;
"test that writes fail if the buffer has a bad length" >:: test_buffer_wrong_length;
] @ (if Sys.os_type <> "Win32" then not_implemented_on_windows else [])
Expand Down
Loading