From a8779b55345bd52f41de32ecebf82e0321b44423 Mon Sep 17 00:00:00 2001 From: Moshe Shababo <17073733+moshababo@users.noreply.github.com> Date: Sat, 20 Jul 2024 15:05:07 -0700 Subject: [PATCH] Use a single connection per read batch --- core/node/consensus/src/storage/tests.rs | 4 +- core/node/consensus/src/storage/vm_reader.rs | 123 +++++++------------ 2 files changed, 42 insertions(+), 85 deletions(-) diff --git a/core/node/consensus/src/storage/tests.rs b/core/node/consensus/src/storage/tests.rs index 5fc7b29da97e..3eb040f21320 100644 --- a/core/node/consensus/src/storage/tests.rs +++ b/core/node/consensus/src/storage/tests.rs @@ -51,11 +51,9 @@ async fn test_vm_reader() { let mut reader = super::vm_reader::VMReader::new(pool.clone(), tx_sender.clone(), registry_address); - let validators = reader.read_validator_committee(ctx, block_id).await; + let (validators, attesters) = reader.read_committees(ctx, block_id).await; assert_eq!(validators.len(), num_nodes); - let attesters = reader.read_attester_committee(ctx, block_id).await; assert_eq!(attesters.len(), num_nodes); - for i in 0..nodes.len() { assert_eq!( nodes[i][0].clone().into_address().unwrap(), diff --git a/core/node/consensus/src/storage/vm_reader.rs b/core/node/consensus/src/storage/vm_reader.rs index de53b354cebf..6fd59bc3ca24 100644 --- a/core/node/consensus/src/storage/vm_reader.rs +++ b/core/node/consensus/src/storage/vm_reader.rs @@ -2,7 +2,10 @@ use std::time::Duration; use zksync_concurrency::ctx::Ctx; use zksync_contracts::load_contract; -use zksync_node_api_server::{execution_sandbox::BlockStartInfo, tx_sender::TxSender}; +use zksync_node_api_server::{ + execution_sandbox::{BlockArgs, BlockStartInfo}, + tx_sender::TxSender, +}; use zksync_system_constants::DEFAULT_L2_TX_GAS_PER_PUBDATA_BYTE; use zksync_types::{ api::BlockId, @@ -33,35 +36,46 @@ impl VMReader { } } - pub async fn read_validator_committee( - &mut self, + pub async fn read_committees( + &self, ctx: &Ctx, block_id: BlockId, - ) -> Vec { + ) -> (Vec, Vec) { + let mut conn = self.pool.connection(ctx).await.unwrap().0; + let start_info = BlockStartInfo::new(&mut conn, Duration::from_secs(10)) + .await + .unwrap(); + let block_args = BlockArgs::new(&mut conn, block_id, &start_info) + .await + .unwrap(); + + let validator_committee = self.read_validator_committee(block_args).await; + let attester_committee = self.read_attester_committee(block_args).await; + + (validator_committee, attester_committee) + } + + pub async fn read_validator_committee(&self, block_args: BlockArgs) -> Vec { let mut committee = vec![]; - let validator_committee_size = self.read_validator_committee_size(ctx, block_id).await; + let validator_committee_size = self.read_validator_committee_size(block_args).await; for i in 0..validator_committee_size { - let committee_validator = self.read_committee_validator(ctx, block_id, i).await; + let committee_validator = self.read_committee_validator(block_args, i).await; committee.push(committee_validator) } committee } - pub async fn read_attester_committee( - &mut self, - ctx: &Ctx, - block_id: BlockId, - ) -> Vec { + pub async fn read_attester_committee(&self, block_args: BlockArgs) -> Vec { let mut committee = vec![]; - let attester_committee_size = self.read_attester_committee_size(ctx, block_id).await; + let attester_committee_size = self.read_attester_committee_size(block_args).await; for i in 0..attester_committee_size { - let committee_validator = self.read_committee_attester(ctx, block_id, i).await; + let committee_validator = self.read_committee_attester(block_args, i).await; committee.push(committee_validator) } committee } - async fn read_validator_committee_size(&mut self, ctx: &Ctx, block_id: BlockId) -> usize { + async fn read_validator_committee_size(&self, block_args: BlockArgs) -> usize { let func = self .registry_contract .function("validatorCommitteeSize") @@ -70,7 +84,7 @@ impl VMReader { let tx = self.gen_l2_call_tx(self.registry_address, func.short_signature().to_vec()); - let res = self.eth_call(ctx, block_id, tx).await; + let res = self.eth_call(block_args, tx).await; func.decode_output(&res).unwrap()[0] .clone() @@ -79,7 +93,7 @@ impl VMReader { .as_usize() } - async fn read_attester_committee_size(&mut self, ctx: &Ctx, block_id: BlockId) -> usize { + async fn read_attester_committee_size(&self, block_args: BlockArgs) -> usize { let func = self .registry_contract .function("attesterCommitteeSize") @@ -87,7 +101,7 @@ impl VMReader { .clone(); let tx = self.gen_l2_call_tx(self.registry_address, func.short_signature().to_vec()); - let res = self.eth_call(ctx, block_id, tx).await; + let res = self.eth_call(block_args, tx).await; func.decode_output(&res).unwrap()[0] .clone() .into_uint() @@ -95,34 +109,9 @@ impl VMReader { .as_usize() } - async fn read_attester( - &mut self, - ctx: &Ctx, - block_id: BlockId, - node_owner: Address, - ) -> (usize, Vec, bool) { - let func = self - .registry_contract - .function("attesters") - .unwrap() - .clone(); - let tx = self.gen_l2_call_tx( - self.registry_address, - func.encode_input(&[Token::Address(node_owner)]).unwrap(), - ); - let res = self.eth_call(ctx, block_id, tx).await; - let tokens = func.decode_output(&res).unwrap(); - ( - tokens[0].clone().into_uint().unwrap().as_usize(), - tokens[1].clone().into_bytes().unwrap(), - tokens[2].clone().into_bool().unwrap(), - ) - } - async fn read_committee_validator( - &mut self, - ctx: &Ctx, - block_id: BlockId, + &self, + block_args: BlockArgs, idx: usize, ) -> CommitteeValidator { let func = self @@ -136,7 +125,7 @@ impl VMReader { .unwrap(), ); - let res = self.eth_call(ctx, block_id, tx).await; + let res = self.eth_call(block_args, tx).await; let tokens = func.decode_output(&res).unwrap(); CommitteeValidator { node_owner: tokens[0].clone().into_address().unwrap(), @@ -147,9 +136,8 @@ impl VMReader { } async fn read_committee_attester( - &mut self, - ctx: &Ctx, - block_id: BlockId, + &self, + block_args: BlockArgs, idx: usize, ) -> CommitteeAttester { let func = self @@ -163,7 +151,7 @@ impl VMReader { .unwrap(), ); - let res = self.eth_call(ctx, block_id, tx).await; + let res = self.eth_call(block_args, tx).await; let tokens = func.decode_output(&res).unwrap(); CommitteeAttester { weight: tokens[0].clone().into_uint().unwrap().as_usize(), @@ -172,46 +160,17 @@ impl VMReader { } } - async fn read_address( - &mut self, - ctx: &Ctx, - block_id: BlockId, - contract_address: Address, - func: Function, - ) -> Address { - let tx = self.gen_l2_call_tx(contract_address, func.encode_input(&vec![]).unwrap()); - - let res = self.eth_call(ctx, block_id, tx).await; - let tokens = func.decode_output(&res).unwrap(); - tokens[0].clone().into_address().unwrap() - } - - async fn eth_call(&mut self, ctx: &Ctx, block_id: BlockId, tx: L2Tx) -> Vec { - let mut conn = self.pool.connection(ctx).await.unwrap().0; - let start_info = BlockStartInfo::new(&mut conn, Duration::from_secs(10)) - .await - .unwrap(); - let block_args = zksync_node_api_server::execution_sandbox::BlockArgs::new( - &mut conn, - block_id, - &start_info, - ) - .await - .unwrap(); + async fn eth_call(&self, block_args: BlockArgs, tx: L2Tx) -> Vec { let call_overrides = CallOverrides { enforced_base_fee: None, }; - - let res = self - .tx_sender + self.tx_sender .eth_call(block_args, call_overrides, tx) .await - .unwrap(); - - res + .unwrap() } - fn gen_l2_call_tx(&mut self, contract_address: Address, calldata: Vec) -> L2Tx { + fn gen_l2_call_tx(&self, contract_address: Address, calldata: Vec) -> L2Tx { L2Tx::new( contract_address, calldata,