From fa1c32cac443c87e7312d1ee91718d5786134ab7 Mon Sep 17 00:00:00 2001 From: georgeee Date: Thu, 5 Dec 2024 11:30:20 +0000 Subject: [PATCH 1/7] Remove constraint_constants from currency_consumed This parameter is not used. --- src/lib/network_pool/indexed_pool.ml | 67 +++++++------------ src/lib/network_pool/indexed_pool.mli | 3 +- .../network_pool/test/indexed_pool_tests.ml | 16 ++--- 3 files changed, 29 insertions(+), 57 deletions(-) diff --git a/src/lib/network_pool/indexed_pool.ml b/src/lib/network_pool/indexed_pool.ml index 733e53de6d6..1dbc0c6ec6e 100644 --- a/src/lib/network_pool/indexed_pool.ml +++ b/src/lib/network_pool/indexed_pool.ml @@ -66,11 +66,8 @@ let config t = t.config (* Compute the total currency required from the sender to execute a command. Returns None in case of overflow. *) -let currency_consumed_unchecked : - constraint_constants:Genesis_constants.Constraint_constants.t - -> User_command.t - -> Currency.Amount.t option = - fun ~constraint_constants:_ cmd -> +let currency_consumed_unchecked : User_command.t -> Currency.Amount.t option = + fun cmd -> let fee_amt = Currency.Amount.of_fee @@ User_command.fee cmd in let open Currency.Amount in let amt = @@ -88,17 +85,14 @@ let currency_consumed_unchecked : in fee_amt + amt -let currency_consumed ~constraint_constants cmd = - currency_consumed_unchecked ~constraint_constants +let currency_consumed cmd = + currency_consumed_unchecked (Transaction_hash.User_command_with_valid_signature.command cmd) let currency_consumed' : - constraint_constants:Genesis_constants.Constraint_constants.t - -> User_command.t - -> (Currency.Amount.t, Command_error.t) Result.t = - fun ~constraint_constants cmd -> - cmd - |> currency_consumed_unchecked ~constraint_constants + User_command.t -> (Currency.Amount.t, Command_error.t) Result.t = + fun cmd -> + cmd |> currency_consumed_unchecked |> Result.of_option ~error:Command_error.Overflow module For_tests = struct @@ -180,7 +174,6 @@ module For_tests = struct nonce ; let consumed = currency_consumed_unchecked - ~constraint_constants:pool.config.constraint_constants (Transaction_hash.User_command_with_valid_signature.command cmd ) |> Option.value_exn @@ -568,14 +561,13 @@ let run : it. Called from revalidate and remove_lowest_fee, and when replacing transactions. *) let remove_with_dependents_exn : - constraint_constants:_ - -> Transaction_hash.User_command_with_valid_signature.t + Transaction_hash.User_command_with_valid_signature.t -> Sender_local_state.t ref -> ( Transaction_hash.User_command_with_valid_signature.t Sequence.t , Update.single , _ ) Writer_result.t = - fun ~constraint_constants (* ({ constraint_constants; _ } as t) *) cmd state -> + fun cmd state -> let unchecked = Transaction_hash.User_command_with_valid_signature.command cmd in @@ -604,7 +596,7 @@ let remove_with_dependents_exn : Option.value_exn (* safe because we check for overflow when we add commands. *) (let open Option.Let_syntax in - let%bind consumed = currency_consumed ~constraint_constants cmd' in + let%bind consumed = currency_consumed cmd' in Currency.Amount.(consumed + acc)) ) Currency.Amount.zero drop_queue in @@ -642,11 +634,7 @@ let run' t cmd x = x let remove_with_dependents_exn' t cmd = - match - run' t cmd - (remove_with_dependents_exn - ~constraint_constants:t.config.constraint_constants cmd ) - with + match run' t cmd (remove_with_dependents_exn cmd) with | Ok x -> x | Error _ -> @@ -655,14 +643,13 @@ let remove_with_dependents_exn' t cmd = (** Drop commands from the end of the queue until the total currency consumed is <= the current balance. *) let drop_until_sufficient_balance : - constraint_constants:Genesis_constants.Constraint_constants.t - -> Transaction_hash.User_command_with_valid_signature.t F_sequence.t + Transaction_hash.User_command_with_valid_signature.t F_sequence.t * Currency.Amount.t -> Currency.Amount.t -> Transaction_hash.User_command_with_valid_signature.t F_sequence.t * Currency.Amount.t * Transaction_hash.User_command_with_valid_signature.t Sequence.t = - fun ~constraint_constants (queue, currency_reserved) current_balance -> + fun (queue, currency_reserved) current_balance -> let rec go queue' currency_reserved' dropped_so_far = if Currency.Amount.(currency_reserved' <= current_balance) then (queue', currency_reserved', dropped_so_far) @@ -674,9 +661,7 @@ let drop_until_sufficient_balance : sufficient balance" (F_sequence.unsnoc queue') in - let consumed = - Option.value_exn (currency_consumed ~constraint_constants liat) - in + let consumed = Option.value_exn (currency_consumed liat) in go daeh (Option.value_exn Currency.Amount.(currency_reserved' - consumed)) (Sequence.append dropped_so_far @@ Sequence.singleton liat) @@ -693,7 +678,7 @@ let revalidate : -> [ `Entire_pool | `Subset of Account_id.Set.t ] -> (Account_id.t -> Account.t) -> t * Transaction_hash.User_command_with_valid_signature.t Sequence.t = - fun ({ config = { constraint_constants; _ }; _ } as t) ~logger scope f -> + fun t ~logger scope f -> let requires_revalidation = match scope with | `Entire_pool -> @@ -770,14 +755,12 @@ let revalidate : F_sequence.foldl (fun c cmd -> Option.value_exn - Currency.Amount.( - c - - Option.value_exn - (currency_consumed ~constraint_constants cmd)) ) + Currency.Amount.(c - Option.value_exn (currency_consumed cmd)) + ) currency_reserved drop_queue in let keep_queue', currency_reserved'', dropped_for_balance = - drop_until_sufficient_balance ~constraint_constants + drop_until_sufficient_balance (keep_queue, currency_reserved') current_balance in @@ -898,7 +881,7 @@ module Add_from_gossip_exn (M : Writer_result.S) = struct , Command_error.t ) M.t = fun ~config: - ( { constraint_constants + ( { constraint_constants = _ ; consensus_constants ; time_controller ; slot_tx_end @@ -929,9 +912,7 @@ module Add_from_gossip_exn (M : Writer_result.S) = struct Result.Let_syntax.( (* C5 *) let%bind () = check_expiry config unchecked in - let%bind consumed = - currency_consumed' ~constraint_constants unchecked - in + let%bind consumed = currency_consumed' unchecked in let%map () = (* TODO: Proper exchange rate mechanism. *) let fee_token = User_command.fee_token unchecked in @@ -1058,7 +1039,7 @@ module Add_from_gossip_exn (M : Writer_result.S) = struct (* C3 *) in let%bind dropped = - remove_with_dependents_exn ~constraint_constants + remove_with_dependents_exn (F_sequence.head_exn drop_queue) by_sender |> M.lift @@ -1159,7 +1140,7 @@ let add_from_backtrack : -> Transaction_hash.User_command_with_valid_signature.t -> (t, Command_error.t) Result.t = fun ( { config = - { constraint_constants + { constraint_constants = _ ; consensus_constants ; time_controller ; slot_tx_end @@ -1186,9 +1167,7 @@ let add_from_backtrack : let fee_payer = User_command.fee_payer unchecked in let fee_per_wu = User_command.fee_per_wu unchecked in let cmd_hash = Transaction_hash.User_command_with_valid_signature.hash cmd in - let consumed = - Option.value_exn (currency_consumed ~constraint_constants cmd) - in + let consumed = Option.value_exn (currency_consumed cmd) in match Map.find t.all_by_sender fee_payer with | None -> { all_by_sender = diff --git a/src/lib/network_pool/indexed_pool.mli b/src/lib/network_pool/indexed_pool.mli index d2a25ebb0eb..172bf1ed6c6 100644 --- a/src/lib/network_pool/indexed_pool.mli +++ b/src/lib/network_pool/indexed_pool.mli @@ -151,7 +151,6 @@ module For_tests : sig Account_id.Map.t val currency_consumed : - constraint_constants:Genesis_constants.Constraint_constants.t - -> Transaction_hash.User_command_with_valid_signature.t + Transaction_hash.User_command_with_valid_signature.t -> Currency.Amount.t option end diff --git a/src/lib/network_pool/test/indexed_pool_tests.ml b/src/lib/network_pool/test/indexed_pool_tests.ml index 042175ada96..c3c33586d45 100644 --- a/src/lib/network_pool/test/indexed_pool_tests.ml +++ b/src/lib/network_pool/test/indexed_pool_tests.ml @@ -35,8 +35,7 @@ let singleton_properties () = (Amount.of_nanomina_int_exn 500) in if - Option.value_exn (currency_consumed ~constraint_constants cmd) - |> Amount.to_nanomina_int > 500 + Option.value_exn (currency_consumed cmd) |> Amount.to_nanomina_int > 500 then match add_res with | Error (Insufficient_funds _) -> @@ -214,9 +213,7 @@ let replacement () = ~common:(fun c -> { c with fee = Amount.to_fee fee }) ~body:(fun b -> { b with amount }) in - let consumed = - Option.value_exn (currency_consumed ~constraint_constants cmd') - in + let consumed = Option.value_exn (currency_consumed cmd') in let%map rest = go (Account_nonce.succ current_nonce) @@ -293,7 +290,7 @@ let replacement () = ~f:(fun consumed_so_far cmd -> Option.value_exn Option.( - currency_consumed ~constraint_constants cmd + currency_consumed cmd >>= fun consumed -> Amount.(consumed + consumed_so_far)) ) in assert (Amount.(currency_consumed_pre_replace <= init_balance)) ; @@ -301,12 +298,9 @@ let replacement () = Option.value_exn (let open Option.Let_syntax in let%bind replaced_currency_consumed = - currency_consumed ~constraint_constants - @@ List.nth_exn setup_cmds replaced_idx - in - let%bind replacer_currency_consumed = - currency_consumed ~constraint_constants replace_cmd + currency_consumed @@ List.nth_exn setup_cmds replaced_idx in + let%bind replacer_currency_consumed = currency_consumed replace_cmd in let%bind a = Amount.(currency_consumed_pre_replace - replaced_currency_consumed) in From 3e405df9a102f6c2b33e8a129313cfb468d2cc6e Mon Sep 17 00:00:00 2001 From: georgeee Date: Thu, 5 Dec 2024 12:14:43 +0000 Subject: [PATCH 2/7] Refactor revalidate: compute key intersection This makes code a bit more readable by decreasing the nestedness. --- src/lib/network_pool/indexed_pool.ml | 218 +++++++++++++-------------- 1 file changed, 107 insertions(+), 111 deletions(-) diff --git a/src/lib/network_pool/indexed_pool.ml b/src/lib/network_pool/indexed_pool.ml index 1dbc0c6ec6e..3ea4914122c 100644 --- a/src/lib/network_pool/indexed_pool.ml +++ b/src/lib/network_pool/indexed_pool.ml @@ -678,132 +678,128 @@ let revalidate : -> [ `Entire_pool | `Subset of Account_id.Set.t ] -> (Account_id.t -> Account.t) -> t * Transaction_hash.User_command_with_valid_signature.t Sequence.t = - fun t ~logger scope f -> + fun t ~logger scope get_account_by_id -> let requires_revalidation = match scope with | `Entire_pool -> - Fn.const true + t.all_by_sender | `Subset subset -> - Set.mem subset + (* intersection of scope and all_by_sender *) + Account_id.Map.merge t.all_by_sender + (Account_id.Set.to_map subset ~f:(const ())) + ~f:(fun ~key:_ -> function `Both (v, ()) -> Some v | _ -> None) in - Map.fold t.all_by_sender ~init:(t, Sequence.empty) + Map.fold requires_revalidation ~init:(t, Sequence.empty) ~f:(fun ~key:sender ~data:(queue, currency_reserved) ((t', dropped_acc) as acc) -> - if not (requires_revalidation sender) then acc + let account : Account.t = get_account_by_id sender in + let current_balance = + Currency.Balance.to_amount + (Account.liquid_balance_at_slot + ~global_slot:(global_slot_since_genesis t.config) + account ) + in + [%log debug] + "Revalidating account $account in transaction pool ($account_nonce, \ + $account_balance)" + ~metadata: + [ ("account", `String (Sexp.to_string @@ Account_id.sexp_of_t sender)) + ; ("account_nonce", `Int (Account_nonce.to_int account.nonce)) + ; ( "account_balance" + , `String (Currency.Amount.to_mina_string current_balance) ) + ] ; + let first_cmd = F_sequence.head_exn queue in + let first_nonce = + first_cmd |> Transaction_hash.User_command_with_valid_signature.command + |> User_command.applicable_at_nonce + in + if + not + ( Account.has_permission_to_send account + && Account.has_permission_to_increment_nonce account ) + then ( + [%log debug] "Account no longer has permission to send; dropping queue" ; + let dropped, t'' = remove_with_dependents_exn' t first_cmd in + (t'', Sequence.append dropped_acc dropped) ) + else if Account_nonce.(account.nonce < first_nonce) then ( + [%log debug] + "Current account nonce precedes first nonce in queue; dropping queue" ; + let dropped, t'' = remove_with_dependents_exn' t first_cmd in + (t'', Sequence.append dropped_acc dropped) ) else - let account : Account.t = f sender in - let current_balance = - Currency.Balance.to_amount - (Account.liquid_balance_at_slot - ~global_slot:(global_slot_since_genesis t.config) - account ) + (* current_nonce >= first_nonce *) + let first_applicable_nonce_index = + F_sequence.findi queue ~f:(fun cmd' -> + let nonce = + Transaction_hash.User_command_with_valid_signature.command cmd' + |> User_command.applicable_at_nonce + in + Account_nonce.equal nonce account.nonce ) + |> Option.value ~default:(F_sequence.length queue) in [%log debug] - "Revalidating account $account in transaction pool ($account_nonce, \ - $account_balance)" - ~metadata: - [ ( "account" - , `String (Sexp.to_string @@ Account_id.sexp_of_t sender) ) - ; ("account_nonce", `Int (Account_nonce.to_int account.nonce)) - ; ( "account_balance" - , `String (Currency.Amount.to_mina_string current_balance) ) - ] ; - let first_cmd = F_sequence.head_exn queue in - let first_nonce = - first_cmd - |> Transaction_hash.User_command_with_valid_signature.command - |> User_command.applicable_at_nonce + "Current account nonce succeeds first nonce in queue; splitting \ + queue at $index" + ~metadata:[ ("index", `Int first_applicable_nonce_index) ] ; + let drop_queue, keep_queue = + F_sequence.split_at queue first_applicable_nonce_index in - if - not - ( Account.has_permission_to_send account - && Account.has_permission_to_increment_nonce account ) - then ( - [%log debug] - "Account no longer has permission to send; dropping queue" ; - let dropped, t'' = remove_with_dependents_exn' t first_cmd in - (t'', Sequence.append dropped_acc dropped) ) - else if Account_nonce.(account.nonce < first_nonce) then ( - [%log debug] - "Current account nonce precedes first nonce in queue; dropping \ - queue" ; - let dropped, t'' = remove_with_dependents_exn' t first_cmd in - (t'', Sequence.append dropped_acc dropped) ) - else - (* current_nonce >= first_nonce *) - let first_applicable_nonce_index = - F_sequence.findi queue ~f:(fun cmd' -> - let nonce = - Transaction_hash.User_command_with_valid_signature.command - cmd' - |> User_command.applicable_at_nonce - in - Account_nonce.equal nonce account.nonce ) - |> Option.value ~default:(F_sequence.length queue) - in - [%log debug] - "Current account nonce succeeds first nonce in queue; splitting \ - queue at $index" - ~metadata:[ ("index", `Int first_applicable_nonce_index) ] ; - let drop_queue, keep_queue = - F_sequence.split_at queue first_applicable_nonce_index - in - let currency_reserved' = - F_sequence.foldl - (fun c cmd -> - Option.value_exn - Currency.Amount.(c - Option.value_exn (currency_consumed cmd)) - ) - currency_reserved drop_queue - in - let keep_queue', currency_reserved'', dropped_for_balance = - drop_until_sufficient_balance - (keep_queue, currency_reserved') - current_balance - in - let to_drop = - Sequence.append (F_sequence.to_seq drop_queue) dropped_for_balance - in - match Sequence.next to_drop with - | None -> - acc - | Some (head, tail) -> - let t'' = - Sequence.fold tail - ~init: - (remove_all_by_fee_and_hash_and_expiration_exn - (remove_applicable_exn t' head) - head ) - ~f:remove_all_by_fee_and_hash_and_expiration_exn - in - let t''' = - match F_sequence.uncons keep_queue' with - | None -> - { t'' with - all_by_sender = Map.remove t''.all_by_sender sender - } - | Some (first_kept, _) -> - let first_kept_unchecked = - Transaction_hash.User_command_with_valid_signature.command + let currency_reserved' = + F_sequence.foldl + (fun c cmd -> + Option.value_exn + Currency.Amount.(c - Option.value_exn (currency_consumed cmd)) + ) + currency_reserved drop_queue + in + let keep_queue', currency_reserved'', dropped_for_balance = + drop_until_sufficient_balance + (keep_queue, currency_reserved') + current_balance + in + let to_drop = + Sequence.append (F_sequence.to_seq drop_queue) dropped_for_balance + in + match Sequence.next to_drop with + | None -> + acc + | Some (head, tail) -> + let t'' = + Sequence.fold tail + ~init: + (remove_all_by_fee_and_hash_and_expiration_exn + (remove_applicable_exn t' head) + head ) + ~f:remove_all_by_fee_and_hash_and_expiration_exn + in + let t''' = + match F_sequence.uncons keep_queue' with + | None -> + { t'' with + all_by_sender = Map.remove t''.all_by_sender sender + } + | Some (first_kept, _) -> + let first_kept_unchecked = + Transaction_hash.User_command_with_valid_signature.command + first_kept + in + { t'' with + all_by_sender = + Map.set t''.all_by_sender ~key:sender + ~data:(keep_queue', currency_reserved'') + ; applicable_by_fee = + Map_set.insert + ( module Transaction_hash + .User_command_with_valid_signature ) + t''.applicable_by_fee + (User_command.fee_per_wu first_kept_unchecked) first_kept - in - { t'' with - all_by_sender = - Map.set t''.all_by_sender ~key:sender - ~data:(keep_queue', currency_reserved'') - ; applicable_by_fee = - Map_set.insert - ( module Transaction_hash - .User_command_with_valid_signature ) - t''.applicable_by_fee - (User_command.fee_per_wu first_kept_unchecked) - first_kept - } - in - (t''', Sequence.append dropped_acc to_drop) ) + } + in + (t''', Sequence.append dropped_acc to_drop) ) let expired_by_global_slot (t : t) : Transaction_hash.User_command_with_valid_signature.t Sequence.t = From d82727bc5026225ea9ae7b3aa74e3b816a44f274 Mon Sep 17 00:00:00 2001 From: georgeee Date: Thu, 5 Dec 2024 13:14:26 +0000 Subject: [PATCH 3/7] Change the order in drop_until_sufficient_balance Dropped sequence was returned in reverse order, then concatenated to a sequence in straight order. This is not causing any immediate issues, but is better for clarity. --- src/lib/network_pool/indexed_pool.ml | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/src/lib/network_pool/indexed_pool.ml b/src/lib/network_pool/indexed_pool.ml index 3ea4914122c..29bb2fa9f32 100644 --- a/src/lib/network_pool/indexed_pool.ml +++ b/src/lib/network_pool/indexed_pool.ml @@ -641,7 +641,11 @@ let remove_with_dependents_exn' t cmd = failwith "remove_with_dependents_exn" (** Drop commands from the end of the queue until the total currency consumed is - <= the current balance. *) + <= the current balance. + + Returns the prefix of a queue, updated currency reserved and sequence of + dropped transactions in the same order they appear in queue. + *) let drop_until_sufficient_balance : Transaction_hash.User_command_with_valid_signature.t F_sequence.t * Currency.Amount.t @@ -654,17 +658,17 @@ let drop_until_sufficient_balance : if Currency.Amount.(currency_reserved' <= current_balance) then (queue', currency_reserved', dropped_so_far) else - let daeh, liat = + let init, last = Option.value_exn ~message: "couldn't drop any more transactions when trying to preserve \ sufficient balance" (F_sequence.unsnoc queue') in - let consumed = Option.value_exn (currency_consumed liat) in - go daeh + let consumed = Option.value_exn (currency_consumed last) in + go init (Option.value_exn Currency.Amount.(currency_reserved' - consumed)) - (Sequence.append dropped_so_far @@ Sequence.singleton liat) + (Sequence.shift_right dropped_so_far last) in go queue currency_reserved Sequence.empty @@ -760,6 +764,7 @@ let revalidate : (keep_queue, currency_reserved') current_balance in + (* NB: to_drop is ordered by nonce *) let to_drop = Sequence.append (F_sequence.to_seq drop_queue) dropped_for_balance in From e1b6a06acbb4f3bda0e47c2ad1f1b461abaa985e Mon Sep 17 00:00:00 2001 From: georgeee Date: Thu, 5 Dec 2024 14:16:28 +0000 Subject: [PATCH 4/7] Fix two bugs in revalidate 1. Rewrite revalidate to enhance readability 2. Fix two similar issues originating from confusion between previous variable names `t` and `t'` ("Account no longer has permission to send" and "Current account nonce precedes first nonce in queue") 3. Fix the issue #16397 by ensuring removal from `applicable_by_fee` is done only for the previous head of queue. --- src/lib/network_pool/indexed_pool.ml | 114 ++++++++++++++------------- 1 file changed, 60 insertions(+), 54 deletions(-) diff --git a/src/lib/network_pool/indexed_pool.ml b/src/lib/network_pool/indexed_pool.ml index 29bb2fa9f32..7385880babd 100644 --- a/src/lib/network_pool/indexed_pool.ml +++ b/src/lib/network_pool/indexed_pool.ml @@ -682,29 +682,24 @@ let revalidate : -> [ `Entire_pool | `Subset of Account_id.Set.t ] -> (Account_id.t -> Account.t) -> t * Transaction_hash.User_command_with_valid_signature.t Sequence.t = - fun t ~logger scope get_account_by_id -> + fun t_initial ~logger scope get_account_by_id -> let requires_revalidation = match scope with | `Entire_pool -> - t.all_by_sender + t_initial.all_by_sender | `Subset subset -> (* intersection of scope and all_by_sender *) - Account_id.Map.merge t.all_by_sender + Account_id.Map.merge t_initial.all_by_sender (Account_id.Set.to_map subset ~f:(const ())) ~f:(fun ~key:_ -> function `Both (v, ()) -> Some v | _ -> None) in - Map.fold requires_revalidation ~init:(t, Sequence.empty) - ~f:(fun - ~key:sender - ~data:(queue, currency_reserved) - ((t', dropped_acc) as acc) - -> + let global_slot = global_slot_since_genesis t_initial.config in + Map.fold requires_revalidation ~init:(t_initial, Sequence.empty) + ~f:(fun ~key:sender ~data:(queue, currency_reserved) (t, dropped_acc) -> let account : Account.t = get_account_by_id sender in let current_balance = Currency.Balance.to_amount - (Account.liquid_balance_at_slot - ~global_slot:(global_slot_since_genesis t.config) - account ) + (Account.liquid_balance_at_slot ~global_slot account) in [%log debug] "Revalidating account $account in transaction pool ($account_nonce, \ @@ -726,13 +721,13 @@ let revalidate : && Account.has_permission_to_increment_nonce account ) then ( [%log debug] "Account no longer has permission to send; dropping queue" ; - let dropped, t'' = remove_with_dependents_exn' t first_cmd in - (t'', Sequence.append dropped_acc dropped) ) + let dropped, t_updated = remove_with_dependents_exn' t first_cmd in + (t_updated, Sequence.append dropped_acc dropped) ) else if Account_nonce.(account.nonce < first_nonce) then ( [%log debug] "Current account nonce precedes first nonce in queue; dropping queue" ; - let dropped, t'' = remove_with_dependents_exn' t first_cmd in - (t'', Sequence.append dropped_acc dropped) ) + let dropped, t_updated = remove_with_dependents_exn' t first_cmd in + (t_updated, Sequence.append dropped_acc dropped) ) else (* current_nonce >= first_nonce *) let first_applicable_nonce_index = @@ -759,52 +754,63 @@ let revalidate : ) currency_reserved drop_queue in + (* NB: dropped_for_balance is ordered by nonce *) let keep_queue', currency_reserved'', dropped_for_balance = drop_until_sufficient_balance (keep_queue, currency_reserved') current_balance in - (* NB: to_drop is ordered by nonce *) let to_drop = Sequence.append (F_sequence.to_seq drop_queue) dropped_for_balance in - match Sequence.next to_drop with - | None -> - acc - | Some (head, tail) -> - let t'' = - Sequence.fold tail - ~init: - (remove_all_by_fee_and_hash_and_expiration_exn - (remove_applicable_exn t' head) - head ) - ~f:remove_all_by_fee_and_hash_and_expiration_exn - in - let t''' = - match F_sequence.uncons keep_queue' with - | None -> - { t'' with - all_by_sender = Map.remove t''.all_by_sender sender - } - | Some (first_kept, _) -> - let first_kept_unchecked = - Transaction_hash.User_command_with_valid_signature.command - first_kept - in - { t'' with - all_by_sender = - Map.set t''.all_by_sender ~key:sender - ~data:(keep_queue', currency_reserved'') - ; applicable_by_fee = - Map_set.insert - ( module Transaction_hash - .User_command_with_valid_signature ) - t''.applicable_by_fee - (User_command.fee_per_wu first_kept_unchecked) - first_kept - } - in - (t''', Sequence.append dropped_acc to_drop) ) + (* t with all_by_sender and applicable_by_fee fields updated *) + let t_partially_updated = + match + ( F_sequence.uncons drop_queue + , Sequence.hd dropped_for_balance + , F_sequence.uncons keep_queue' ) + with + | None, None, _ -> + (* Nothing dropped, nothing needs to be updated *) + t + | Some (first_dropped, _), _, None | None, Some first_dropped, None -> + (* We drop the entire queue, first element needs to be removed from + applicable_by_fee *) + let t' = remove_applicable_exn t first_dropped in + { t' with all_by_sender = Map.remove t'.all_by_sender sender } + | None, _, Some _ -> + (* We drop only some transactions from the end of queue, keeping + the head untouched, no need to update applicable_by_fee *) + { t with + all_by_sender = + Map.set t.all_by_sender ~key:sender + ~data:(keep_queue', currency_reserved'') + } + | Some (first_dropped, _), _, Some (first_kept, _) -> + (* We need to replace old queue head with the new queue head + in applicable_by_fee *) + let first_kept_unchecked = + Transaction_hash.User_command_with_valid_signature.command + first_kept + in + let t' = remove_applicable_exn t first_dropped in + { t' with + all_by_sender = + Map.set t'.all_by_sender ~key:sender + ~data:(keep_queue', currency_reserved'') + ; applicable_by_fee = + Map_set.insert + (module Transaction_hash.User_command_with_valid_signature) + t'.applicable_by_fee + (User_command.fee_per_wu first_kept_unchecked) + first_kept + } + in + let t_updated = + Sequence.fold ~init:t_partially_updated + ~f:remove_all_by_fee_and_hash_and_expiration_exn to_drop + in + (t_updated, Sequence.append dropped_acc to_drop) ) let expired_by_global_slot (t : t) : Transaction_hash.User_command_with_valid_signature.t Sequence.t = From ed2b304e0d1413ee7d4a1d391d1170dea0147790 Mon Sep 17 00:00:00 2001 From: georgeee Date: Thu, 5 Dec 2024 15:01:23 +0000 Subject: [PATCH 5/7] Simplify the fix for #16397 --- src/lib/network_pool/indexed_pool.ml | 44 ++++++++++++++-------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/src/lib/network_pool/indexed_pool.ml b/src/lib/network_pool/indexed_pool.ml index 7385880babd..7474094ce36 100644 --- a/src/lib/network_pool/indexed_pool.ml +++ b/src/lib/network_pool/indexed_pool.ml @@ -743,61 +743,56 @@ let revalidate : "Current account nonce succeeds first nonce in queue; splitting \ queue at $index" ~metadata:[ ("index", `Int first_applicable_nonce_index) ] ; - let drop_queue, keep_queue = + let dropped_from_nonce, retained_by_nonce = F_sequence.split_at queue first_applicable_nonce_index in - let currency_reserved' = + let currency_reserved_partially_updated = F_sequence.foldl (fun c cmd -> Option.value_exn Currency.Amount.(c - Option.value_exn (currency_consumed cmd)) ) - currency_reserved drop_queue + currency_reserved dropped_from_nonce in (* NB: dropped_for_balance is ordered by nonce *) - let keep_queue', currency_reserved'', dropped_for_balance = + let keep_queue, currency_reserved_updated, dropped_for_balance = drop_until_sufficient_balance - (keep_queue, currency_reserved') + (retained_by_nonce, currency_reserved_partially_updated) current_balance in - let to_drop = - Sequence.append (F_sequence.to_seq drop_queue) dropped_for_balance - in + let keeping_prefix = F_sequence.is_empty dropped_from_nonce in + let keeping_suffix = Sequence.is_empty dropped_for_balance in (* t with all_by_sender and applicable_by_fee fields updated *) let t_partially_updated = - match - ( F_sequence.uncons drop_queue - , Sequence.hd dropped_for_balance - , F_sequence.uncons keep_queue' ) - with - | None, None, _ -> + match F_sequence.uncons keep_queue with + | _ when keeping_prefix && keeping_suffix -> (* Nothing dropped, nothing needs to be updated *) t - | Some (first_dropped, _), _, None | None, Some first_dropped, None -> + | None -> (* We drop the entire queue, first element needs to be removed from applicable_by_fee *) - let t' = remove_applicable_exn t first_dropped in + let t' = remove_applicable_exn t first_cmd in { t' with all_by_sender = Map.remove t'.all_by_sender sender } - | None, _, Some _ -> - (* We drop only some transactions from the end of queue, keeping + | Some _ when keeping_prefix -> + (* We drop only transactions from the end of queue, keeping the head untouched, no need to update applicable_by_fee *) { t with all_by_sender = Map.set t.all_by_sender ~key:sender - ~data:(keep_queue', currency_reserved'') + ~data:(keep_queue, currency_reserved_updated) } - | Some (first_dropped, _), _, Some (first_kept, _) -> + | Some (first_kept, _) -> (* We need to replace old queue head with the new queue head in applicable_by_fee *) let first_kept_unchecked = Transaction_hash.User_command_with_valid_signature.command first_kept in - let t' = remove_applicable_exn t first_dropped in + let t' = remove_applicable_exn t first_cmd in { t' with all_by_sender = Map.set t'.all_by_sender ~key:sender - ~data:(keep_queue', currency_reserved'') + ~data:(keep_queue, currency_reserved_updated) ; applicable_by_fee = Map_set.insert (module Transaction_hash.User_command_with_valid_signature) @@ -806,6 +801,11 @@ let revalidate : first_kept } in + let to_drop = + Sequence.append + (F_sequence.to_seq dropped_from_nonce) + dropped_for_balance + in let t_updated = Sequence.fold ~init:t_partially_updated ~f:remove_all_by_fee_and_hash_and_expiration_exn to_drop From 487fe95cadf38a8ba1a72db4a239407db58c7021 Mon Sep 17 00:00:00 2001 From: georgeee Date: Thu, 5 Dec 2024 15:05:22 +0000 Subject: [PATCH 6/7] Trivial: rename a local variable --- src/lib/network_pool/indexed_pool.ml | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/lib/network_pool/indexed_pool.ml b/src/lib/network_pool/indexed_pool.ml index 7474094ce36..35bb1996616 100644 --- a/src/lib/network_pool/indexed_pool.ml +++ b/src/lib/network_pool/indexed_pool.ml @@ -696,17 +696,17 @@ let revalidate : let global_slot = global_slot_since_genesis t_initial.config in Map.fold requires_revalidation ~init:(t_initial, Sequence.empty) ~f:(fun ~key:sender ~data:(queue, currency_reserved) (t, dropped_acc) -> - let account : Account.t = get_account_by_id sender in + let sender_account : Account.t = get_account_by_id sender in let current_balance = Currency.Balance.to_amount - (Account.liquid_balance_at_slot ~global_slot account) + (Account.liquid_balance_at_slot ~global_slot sender_account) in [%log debug] "Revalidating account $account in transaction pool ($account_nonce, \ $account_balance)" ~metadata: [ ("account", `String (Sexp.to_string @@ Account_id.sexp_of_t sender)) - ; ("account_nonce", `Int (Account_nonce.to_int account.nonce)) + ; ("account_nonce", `Int (Account_nonce.to_int sender_account.nonce)) ; ( "account_balance" , `String (Currency.Amount.to_mina_string current_balance) ) ] ; @@ -717,13 +717,13 @@ let revalidate : in if not - ( Account.has_permission_to_send account - && Account.has_permission_to_increment_nonce account ) + ( Account.has_permission_to_send sender_account + && Account.has_permission_to_increment_nonce sender_account ) then ( [%log debug] "Account no longer has permission to send; dropping queue" ; let dropped, t_updated = remove_with_dependents_exn' t first_cmd in (t_updated, Sequence.append dropped_acc dropped) ) - else if Account_nonce.(account.nonce < first_nonce) then ( + else if Account_nonce.(sender_account.nonce < first_nonce) then ( [%log debug] "Current account nonce precedes first nonce in queue; dropping queue" ; let dropped, t_updated = remove_with_dependents_exn' t first_cmd in @@ -736,7 +736,7 @@ let revalidate : Transaction_hash.User_command_with_valid_signature.command cmd' |> User_command.applicable_at_nonce in - Account_nonce.equal nonce account.nonce ) + Account_nonce.equal nonce sender_account.nonce ) |> Option.value ~default:(F_sequence.length queue) in [%log debug] From 31e881bd38260df4bd45df4c440fef409e1f30b3 Mon Sep 17 00:00:00 2001 From: georgeee Date: Thu, 5 Dec 2024 15:11:42 +0000 Subject: [PATCH 7/7] Extract revalidate_by_sender for readability This commit only moves part of revalidate function utside of its body. --- src/lib/network_pool/indexed_pool.ml | 232 ++++++++++++++------------- 1 file changed, 117 insertions(+), 115 deletions(-) diff --git a/src/lib/network_pool/indexed_pool.ml b/src/lib/network_pool/indexed_pool.ml index 35bb1996616..22aa9e37bad 100644 --- a/src/lib/network_pool/indexed_pool.ml +++ b/src/lib/network_pool/indexed_pool.ml @@ -672,6 +672,120 @@ let drop_until_sufficient_balance : in go queue currency_reserved Sequence.empty +let revalidate_by_sender ~logger ~global_slot ~sender ~sender_account t queue + currency_reserved = + let current_balance = + Currency.Balance.to_amount + (Account.liquid_balance_at_slot ~global_slot sender_account) + in + [%log debug] + "Revalidating account $account in transaction pool ($account_nonce, \ + $account_balance)" + ~metadata: + [ ("account", `String (Sexp.to_string @@ Account_id.sexp_of_t sender)) + ; ("account_nonce", `Int (Account_nonce.to_int sender_account.nonce)) + ; ( "account_balance" + , `String (Currency.Amount.to_mina_string current_balance) ) + ] ; + let first_cmd = F_sequence.head_exn queue in + let first_nonce = + first_cmd |> Transaction_hash.User_command_with_valid_signature.command + |> User_command.applicable_at_nonce + in + if + not + ( Account.has_permission_to_send sender_account + && Account.has_permission_to_increment_nonce sender_account ) + then ( + [%log debug] "Account no longer has permission to send; dropping queue" ; + let dropped, t_updated = remove_with_dependents_exn' t first_cmd in + (t_updated, dropped) ) + else if Account_nonce.(sender_account.nonce < first_nonce) then ( + [%log debug] + "Current account nonce precedes first nonce in queue; dropping queue" ; + let dropped, t_updated = remove_with_dependents_exn' t first_cmd in + (t_updated, dropped) ) + else + (* current_nonce >= first_nonce *) + let first_applicable_nonce_index = + F_sequence.findi queue ~f:(fun cmd' -> + let nonce = + Transaction_hash.User_command_with_valid_signature.command cmd' + |> User_command.applicable_at_nonce + in + Account_nonce.equal nonce sender_account.nonce ) + |> Option.value ~default:(F_sequence.length queue) + in + [%log debug] + "Current account nonce succeeds first nonce in queue; splitting queue at \ + $index" + ~metadata:[ ("index", `Int first_applicable_nonce_index) ] ; + let dropped_from_nonce, retained_by_nonce = + F_sequence.split_at queue first_applicable_nonce_index + in + let currency_reserved_partially_updated = + F_sequence.foldl + (fun c cmd -> + Option.value_exn + Currency.Amount.(c - Option.value_exn (currency_consumed cmd)) ) + currency_reserved dropped_from_nonce + in + (* NB: dropped_for_balance is ordered by nonce *) + let keep_queue, currency_reserved_updated, dropped_for_balance = + drop_until_sufficient_balance + (retained_by_nonce, currency_reserved_partially_updated) + current_balance + in + let keeping_prefix = F_sequence.is_empty dropped_from_nonce in + let keeping_suffix = Sequence.is_empty dropped_for_balance in + (* t with all_by_sender and applicable_by_fee fields updated *) + let t_partially_updated = + match F_sequence.uncons keep_queue with + | _ when keeping_prefix && keeping_suffix -> + (* Nothing dropped, nothing needs to be updated *) + t + | None -> + (* We drop the entire queue, first element needs to be removed from + applicable_by_fee *) + let t' = remove_applicable_exn t first_cmd in + { t' with all_by_sender = Map.remove t'.all_by_sender sender } + | Some _ when keeping_prefix -> + (* We drop only transactions from the end of queue, keeping + the head untouched, no need to update applicable_by_fee *) + { t with + all_by_sender = + Map.set t.all_by_sender ~key:sender + ~data:(keep_queue, currency_reserved_updated) + } + | Some (first_kept, _) -> + (* We need to replace old queue head with the new queue head + in applicable_by_fee *) + let first_kept_unchecked = + Transaction_hash.User_command_with_valid_signature.command + first_kept + in + let t' = remove_applicable_exn t first_cmd in + { t' with + all_by_sender = + Map.set t'.all_by_sender ~key:sender + ~data:(keep_queue, currency_reserved_updated) + ; applicable_by_fee = + Map_set.insert + (module Transaction_hash.User_command_with_valid_signature) + t'.applicable_by_fee + (User_command.fee_per_wu first_kept_unchecked) + first_kept + } + in + let to_drop = + Sequence.append (F_sequence.to_seq dropped_from_nonce) dropped_for_balance + in + let t_updated = + Sequence.fold ~init:t_partially_updated + ~f:remove_all_by_fee_and_hash_and_expiration_exn to_drop + in + (t_updated, to_drop) + (* Iterate over commands in the pool, removing them if they require too much currency or have too low of a nonce. An argument is provided to instruct which commands require revalidation. @@ -696,121 +810,9 @@ let revalidate : let global_slot = global_slot_since_genesis t_initial.config in Map.fold requires_revalidation ~init:(t_initial, Sequence.empty) ~f:(fun ~key:sender ~data:(queue, currency_reserved) (t, dropped_acc) -> - let sender_account : Account.t = get_account_by_id sender in - let current_balance = - Currency.Balance.to_amount - (Account.liquid_balance_at_slot ~global_slot sender_account) - in - [%log debug] - "Revalidating account $account in transaction pool ($account_nonce, \ - $account_balance)" - ~metadata: - [ ("account", `String (Sexp.to_string @@ Account_id.sexp_of_t sender)) - ; ("account_nonce", `Int (Account_nonce.to_int sender_account.nonce)) - ; ( "account_balance" - , `String (Currency.Amount.to_mina_string current_balance) ) - ] ; - let first_cmd = F_sequence.head_exn queue in - let first_nonce = - first_cmd |> Transaction_hash.User_command_with_valid_signature.command - |> User_command.applicable_at_nonce - in - if - not - ( Account.has_permission_to_send sender_account - && Account.has_permission_to_increment_nonce sender_account ) - then ( - [%log debug] "Account no longer has permission to send; dropping queue" ; - let dropped, t_updated = remove_with_dependents_exn' t first_cmd in - (t_updated, Sequence.append dropped_acc dropped) ) - else if Account_nonce.(sender_account.nonce < first_nonce) then ( - [%log debug] - "Current account nonce precedes first nonce in queue; dropping queue" ; - let dropped, t_updated = remove_with_dependents_exn' t first_cmd in - (t_updated, Sequence.append dropped_acc dropped) ) - else - (* current_nonce >= first_nonce *) - let first_applicable_nonce_index = - F_sequence.findi queue ~f:(fun cmd' -> - let nonce = - Transaction_hash.User_command_with_valid_signature.command cmd' - |> User_command.applicable_at_nonce - in - Account_nonce.equal nonce sender_account.nonce ) - |> Option.value ~default:(F_sequence.length queue) - in - [%log debug] - "Current account nonce succeeds first nonce in queue; splitting \ - queue at $index" - ~metadata:[ ("index", `Int first_applicable_nonce_index) ] ; - let dropped_from_nonce, retained_by_nonce = - F_sequence.split_at queue first_applicable_nonce_index - in - let currency_reserved_partially_updated = - F_sequence.foldl - (fun c cmd -> - Option.value_exn - Currency.Amount.(c - Option.value_exn (currency_consumed cmd)) - ) - currency_reserved dropped_from_nonce - in - (* NB: dropped_for_balance is ordered by nonce *) - let keep_queue, currency_reserved_updated, dropped_for_balance = - drop_until_sufficient_balance - (retained_by_nonce, currency_reserved_partially_updated) - current_balance - in - let keeping_prefix = F_sequence.is_empty dropped_from_nonce in - let keeping_suffix = Sequence.is_empty dropped_for_balance in - (* t with all_by_sender and applicable_by_fee fields updated *) - let t_partially_updated = - match F_sequence.uncons keep_queue with - | _ when keeping_prefix && keeping_suffix -> - (* Nothing dropped, nothing needs to be updated *) - t - | None -> - (* We drop the entire queue, first element needs to be removed from - applicable_by_fee *) - let t' = remove_applicable_exn t first_cmd in - { t' with all_by_sender = Map.remove t'.all_by_sender sender } - | Some _ when keeping_prefix -> - (* We drop only transactions from the end of queue, keeping - the head untouched, no need to update applicable_by_fee *) - { t with - all_by_sender = - Map.set t.all_by_sender ~key:sender - ~data:(keep_queue, currency_reserved_updated) - } - | Some (first_kept, _) -> - (* We need to replace old queue head with the new queue head - in applicable_by_fee *) - let first_kept_unchecked = - Transaction_hash.User_command_with_valid_signature.command - first_kept - in - let t' = remove_applicable_exn t first_cmd in - { t' with - all_by_sender = - Map.set t'.all_by_sender ~key:sender - ~data:(keep_queue, currency_reserved_updated) - ; applicable_by_fee = - Map_set.insert - (module Transaction_hash.User_command_with_valid_signature) - t'.applicable_by_fee - (User_command.fee_per_wu first_kept_unchecked) - first_kept - } - in - let to_drop = - Sequence.append - (F_sequence.to_seq dropped_from_nonce) - dropped_for_balance - in - let t_updated = - Sequence.fold ~init:t_partially_updated - ~f:remove_all_by_fee_and_hash_and_expiration_exn to_drop - in - (t_updated, Sequence.append dropped_acc to_drop) ) + Tuple2.map_snd ~f:(Sequence.append dropped_acc) + @@ revalidate_by_sender ~logger ~global_slot ~sender + ~sender_account:(get_account_by_id sender) t queue currency_reserved ) let expired_by_global_slot (t : t) : Transaction_hash.User_command_with_valid_signature.t Sequence.t =