-
Notifications
You must be signed in to change notification settings - Fork 547
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
LMDB storage #16274
Merged
Merged
LMDB storage #16274
Changes from 5 commits
Commits
Show all changes
14 commits
Select commit
Hold shift + click to select a range
e47152a
Revert "Revert "Auxiliary commit to revert individual files from 9fb9…
georgeee 18843fc
Revert "Auxiliary commit to revert individual files from ba6c6000f3d1…
dkijania 053a80e
reformat and compilation fixes
dkijania 2f41b00
fix write_many_blocks
dkijania bfda6b5
Revert changes to block.ml
georgeee eee5e19
remove non-generic modules
dkijania e48c65d
Update src/lib/lmdb_storage/conv.ml
dkijania d16c53a
Update src/lib/lmdb_storage/conv.ml
dkijania 3fb53b9
Update src/lib/lmdb_storage/conv.ml
dkijania e065473
reformat
dkijania f651f7f
fix is_int_size function
dkijania cf82964
remove unused deps
dkijania a018089
Merge branch 'compatible' into dkijania/cache_keys_with_lmdb
dkijania f17659e
remove special case for os with 4 bits
dkijania File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,215 @@ | ||
(* Only show stdout for failed inline tests. *) | ||
open Inline_test_quiet_logs | ||
open Core_kernel | ||
|
||
module F (Db : Generic.Db) = struct | ||
type holder = | ||
{ statuses : (Consensus.Body_reference.t, int) Db.t | ||
; blocks : (Blake2.t, Bigstring.t) Db.t | ||
} | ||
|
||
let mk_maps { Db.create } = | ||
let open Conv in | ||
let blocks = create blake2 Lmdb.Conv.bigstring in | ||
let statuses = create blake2 uint8 ~name:"status" in | ||
{ statuses; blocks } | ||
|
||
let config = { Generic.default_config with initial_mmap_size = 256 lsl 20 } | ||
end | ||
|
||
module Storage = Generic.Read_only (F) | ||
|
||
type t = Storage.t * Storage.holder | ||
|
||
module Root_block_status = struct | ||
type t = Partial | Full | Deleting [@@deriving enum, equal] | ||
end | ||
|
||
let body_tag = Mina_net2.Bitswap_tag.(to_enum Body) | ||
|
||
let full_status = Root_block_status.to_enum Full | ||
|
||
let create = Storage.create | ||
|
||
let get_status ~logger ((env, { statuses; _ }) : t) body_ref = | ||
let%bind.Option raw_status = Storage.get ~env statuses body_ref in | ||
let res = Root_block_status.of_enum raw_status in | ||
if Option.is_none res then | ||
[%log error] "Unexpected status $status for $body_reference" | ||
~metadata: | ||
[ ("status", `Int raw_status) | ||
; ("body_reference", Consensus.Body_reference.to_yojson body_ref) | ||
] ; | ||
res | ||
|
||
let read_body_impl find_block root_ref = | ||
let%bind.Or_error raw_root_block = | ||
Option.value_map | ||
~f:(fun x -> Ok x) | ||
~default: | ||
(Or_error.error_string | ||
(sprintf "root block %s not found" @@ Blake2.to_hex root_ref) ) | ||
(find_block root_ref) | ||
in | ||
let%bind.Or_error root_links, root_data = | ||
Staged_ledger_diff.Bitswap_block.parse_block ~hash:root_ref raw_root_block | ||
in | ||
let%bind.Or_error () = | ||
if Bigstring.length root_data < 5 then | ||
Or_error.error_string | ||
@@ sprintf "Couldn't read root block for %s: data section is too short" | ||
@@ Consensus.Body_reference.to_hex root_ref | ||
else Ok () | ||
in | ||
let len = Bigstring.get_uint32_le root_data ~pos:0 - 1 in | ||
let%bind.Or_error () = | ||
let raw_tag = Bigstring.get_uint8 root_data ~pos:4 in | ||
if body_tag = raw_tag then Ok () | ||
else | ||
Or_error.error_string | ||
@@ sprintf "Unexpected tag %s for block %s" (Int.to_string raw_tag) | ||
(Consensus.Body_reference.to_hex root_ref) | ||
in | ||
let buf = Bigstring.create len in | ||
let pos = ref (Bigstring.length root_data - 5) in | ||
Bigstring.blit ~src:root_data ~src_pos:5 ~dst:buf ~dst_pos:0 ~len:!pos ; | ||
let q = Queue.create () in | ||
Queue.enqueue_all q root_links ; | ||
let%map.Or_error () = | ||
Staged_ledger_diff.Bitswap_block.iter_links q | ||
~report_chunk:(fun data -> | ||
Bigstring.blit ~src:data ~src_pos:0 ~dst:buf ~dst_pos:!pos | ||
~len:(Bigstring.length data) ; | ||
pos := !pos + Bigstring.length data ) | ||
~find_block | ||
in | ||
Staged_ledger_diff.Body.Stable.Latest.bin_read_t buf ~pos_ref:(ref 0) | ||
|
||
let read_body ((env, { statuses; blocks }) : t) body_ref = | ||
Storage.with_txn env ~f:(fun { get; _ } -> | ||
if Option.equal Int.equal (get statuses body_ref) (Some full_status) then | ||
read_body_impl (get blocks) body_ref | ||
|> Result.map_error ~f:(fun e -> `Invalid_structure e) | ||
else Error `Non_full ) | ||
|> function None -> Error `Tx_failed | Some res -> res | ||
|
||
let%test_module "Block storage tests" = | ||
( module struct | ||
open Full_frontier.For_tests | ||
open Async_kernel | ||
open Frontier_base | ||
|
||
let () = | ||
Backtrace.elide := false ; | ||
Async.Scheduler.set_record_backtraces true | ||
|
||
let logger = Logger.create () | ||
|
||
let verifier = verifier () | ||
|
||
let with_helper ~writer f = | ||
let handle_push_message _ msg = | ||
( match msg with | ||
| Libp2p_ipc.Reader.DaemonInterface.PushMessage.ResourceUpdated m -> ( | ||
let open Libp2p_ipc.Reader.DaemonInterface.ResourceUpdate in | ||
match (type_get m, ids_get_list m) with | ||
| Added, [ id_ ] -> | ||
let id = Libp2p_ipc.Reader.RootBlockId.blake2b_hash_get id_ in | ||
Pipe.write_without_pushback writer id | ||
| _ -> | ||
() ) | ||
| _ -> | ||
() ) ; | ||
Deferred.unit | ||
in | ||
let open Mina_net2.For_tests in | ||
Helper.test_with_libp2p_helper ~logger ~handle_push_message | ||
(fun conf_dir helper -> | ||
let%bind me = generate_random_keypair helper in | ||
let maddr = | ||
multiaddr_to_libp2p_ipc | ||
@@ Mina_net2.Multiaddr.of_string "/ip4/127.0.0.1/tcp/12878" | ||
in | ||
let libp2p_config = | ||
Libp2p_ipc.create_libp2p_config | ||
~private_key:(Mina_net2.Keypair.secret me) | ||
~statedir:conf_dir ~listen_on:[ maddr ] ~external_multiaddr:maddr | ||
~network_id:"s" ~unsafe_no_trust_ip:true ~flood:false | ||
~direct_peers:[] ~seed_peers:[] ~known_private_ip_nets:[] | ||
~peer_exchange:true ~peer_protection_ratio:0.2 ~min_connections:20 | ||
~max_connections:40 ~validation_queue_size:250 | ||
~gating_config:empty_libp2p_ipc_gating_config ?metrics_port:None | ||
~topic_config:[] () | ||
in | ||
let%bind _ = | ||
Helper.do_rpc helper | ||
(module Libp2p_ipc.Rpcs.Configure) | ||
(Libp2p_ipc.Rpcs.Configure.create_request ~libp2p_config) | ||
>>| Or_error.ok_exn | ||
in | ||
f conf_dir helper ) | ||
|
||
let send_and_receive ~helper ~reader ~db breadcrumb = | ||
let body = Breadcrumb.block breadcrumb |> Mina_block.body in | ||
let body_ref = | ||
Staged_ledger_diff.Body.compute_reference | ||
~tag:Mina_net2.Bitswap_tag.(to_enum Body) | ||
body | ||
in | ||
let data = | ||
Staged_ledger_diff.Body.to_binio_bigstring body |> Bigstring.to_string | ||
in | ||
[%log info] "Sending add resource" ; | ||
Mina_net2.For_tests.Helper.send_add_resource | ||
~tag:Mina_net2.Bitswap_tag.Body ~data helper ; | ||
[%log info] "Waiting for push message" ; | ||
let%map id_ = Pipe.read reader in | ||
let id = match id_ with `Ok a -> a | _ -> failwith "unexpected" in | ||
[%log info] "Push message received" ; | ||
[%test_eq: String.t] (Consensus.Body_reference.to_raw_string body_ref) id ; | ||
[%test_eq: | ||
( Mina_block.Body.t | ||
, [ `Invalid_structure of Error.t | `Non_full | `Tx_failed ] ) | ||
Result.t] (Ok body) (read_body db body_ref) | ||
|
||
let%test_unit "Write many blocks" = | ||
let n = 300 in | ||
Quickcheck.test (gen_breadcrumb ~verifier ()) ~trials:1 | ||
~f:(fun make_breadcrumb -> | ||
let frontier = create_frontier () in | ||
let root = Full_frontier.root frontier in | ||
let reader, writer = Pipe.create () in | ||
with_helper ~writer (fun conf_dir helper -> | ||
let db = | ||
create (String.concat ~sep:"/" [ conf_dir; "block-db" ]) | ||
in | ||
let%bind () = | ||
make_breadcrumb root >>= send_and_receive ~db ~helper ~reader | ||
in | ||
Quickcheck.test | ||
(String.gen_with_length 1000 | ||
(* increase to 1000000 to reach past mmap size of 256 MiB*) | ||
Base_quickcheck.quickcheck_generator_char ) ~trials:n | ||
~f:(fun data -> | ||
Mina_net2.For_tests.Helper.send_add_resource | ||
~tag:Mina_net2.Bitswap_tag.Body ~data helper ) ; | ||
match%bind Pipe.read_exactly reader ~num_values:n with | ||
| `Exactly _ -> | ||
make_breadcrumb root >>= send_and_receive ~db ~helper ~reader | ||
| _ -> | ||
failwith "unexpected" ) ; | ||
clean_up_persistent_root ~frontier ) | ||
|
||
let%test_unit "Write a block body to db and read it" = | ||
Quickcheck.test (gen_breadcrumb ~verifier ()) ~trials:4 | ||
~f:(fun make_breadcrumb -> | ||
let frontier = create_frontier () in | ||
let root = Full_frontier.root frontier in | ||
let reader, writer = Pipe.create () in | ||
with_helper ~writer (fun conf_dir helper -> | ||
let db = | ||
create (String.concat ~sep:"/" [ conf_dir; "block-db" ]) | ||
in | ||
make_breadcrumb root >>= send_and_receive ~db ~helper ~reader ) ; | ||
clean_up_persistent_root ~frontier ) | ||
end ) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
open Core_kernel | ||
|
||
type t | ||
|
||
module Root_block_status : sig | ||
type t = Partial | Full | Deleting [@@deriving enum, equal] | ||
end | ||
|
||
val create : string -> t | ||
|
||
val get_status : | ||
logger:Logger.t | ||
-> t | ||
-> Consensus.Body_reference.t | ||
-> Root_block_status.t option | ||
|
||
val read_body : | ||
t | ||
-> Consensus.Body_reference.t | ||
-> ( Mina_block.Body.t | ||
, [> `Invalid_structure of Error.t | `Non_full | `Tx_failed ] ) | ||
Result.t |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
open Core_kernel | ||
|
||
let uint32_be = | ||
Lmdb.Conv.make | ||
~flags:Lmdb.Conv.Flags.(integer_key + integer_dup + dup_fixed) | ||
dkijania marked this conversation as resolved.
Show resolved
Hide resolved
|
||
~serialise:(fun alloc x -> | ||
let a = alloc 4 in | ||
Bigstring.set_uint32_be_exn a ~pos:0 x ; | ||
a ) | ||
~deserialise:(Bigstring.get_uint32_be ~pos:0) | ||
() | ||
|
||
let uint8 = | ||
Lmdb.Conv.make | ||
~flags:Lmdb.Conv.Flags.(integer_key + integer_dup + dup_fixed) | ||
dkijania marked this conversation as resolved.
Show resolved
Hide resolved
|
||
~serialise:(fun alloc x -> | ||
let a = alloc 1 in | ||
Bigstring.set_uint8_exn a ~pos:0 x ; | ||
a ) | ||
~deserialise:(Bigstring.get_uint8 ~pos:0) | ||
() | ||
|
||
let blake2 = | ||
Lmdb.Conv.( | ||
make | ||
~serialise:(fun alloc x -> | ||
let str = Blake2.to_raw_string x in | ||
serialise string alloc str ) | ||
~deserialise:(fun s -> deserialise string s |> Blake2.of_raw_string) | ||
()) | ||
|
||
let bin_prot_conv (t : 'a Bin_prot.Type_class.t) = | ||
Lmdb.Conv.( | ||
make | ||
dkijania marked this conversation as resolved.
Show resolved
Hide resolved
|
||
~serialise:(fun alloc x -> | ||
let sz = t.writer.size x in | ||
let res = alloc sz in | ||
let _pos = t.writer.write ~pos:0 res in | ||
res ) | ||
~deserialise: | ||
(let pos_ref = ref 0 in | ||
t.reader.read ~pos_ref ) | ||
()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
(library | ||
(name lmdb_storage) | ||
(public_name lmdb_storage) | ||
(libraries | ||
;; opam libraries | ||
async | ||
base58 | ||
base64 | ||
capnp | ||
digestif | ||
stdio | ||
core | ||
libp2p_ipc | ||
yojson | ||
async_kernel | ||
core_kernel | ||
bin_prot.shape | ||
ppx_inline_test.config | ||
async_unix | ||
sexplib0 | ||
base.caml | ||
base.base_internalhash_types | ||
splittable_random | ||
lmdb | ||
integers | ||
ppx_version.runtime | ||
;; local libraries | ||
blake2 | ||
error_json | ||
child_processes | ||
file_system | ||
logger | ||
network_peer | ||
pipe_lib | ||
timeout_lib | ||
mina_metrics | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Some of these dependencies are redundant to be sure |
||
o1trace | ||
staged_ledger_diff | ||
consensus | ||
mina_net2 | ||
mina_base | ||
transition_frontier_base | ||
mina_block | ||
transition_frontier_full_frontier | ||
mina_numbers | ||
data_hash_lib | ||
;; test deps | ||
inline_test_quiet_logs | ||
) | ||
(inline_tests (flags -verbose -show-counts)) | ||
(instrumentation (backend bisect_ppx)) | ||
(preprocess (pps ppx_mina ppx_version ppx_jane ppx_deriving.std ppx_let ppx_deriving_yojson))) |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here is how
int32_be
defined inNotice the difference in flags. Specification of LMDB prescribes a certain pattern of usage for the flags: http://www.lmdb.tech/doc/group__mdb.html.
It seems like the check for
Sys.big_endian && is_int_size 4
might actually be needed to ensure the code behaves well under all circumstances.