Skip to content

Commit

Permalink
Switch out std mutexes to parking lot and spin mutexes (#2512)
Browse files Browse the repository at this point in the history
* Switch out std mutexes with parking_lot

* Add spin locks
  • Loading branch information
prasannavl authored Oct 3, 2023
1 parent 4001020 commit 9793443
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 41 deletions.
15 changes: 13 additions & 2 deletions lib/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ rocksdb = { version = "0.21", default-features = false }
statrs = "0.16"
rustc-hex = "2.1"
rustc_version_runtime = "0.2.1"
parking_lot = "0.12.1"
spin = "0.9.8"

### eth

Expand Down
2 changes: 2 additions & 0 deletions lib/ain-evm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ serde_json.workspace = true
statrs.workspace = true
rustc-hex.workspace = true
ethabi.workspace = true
parking_lot.workspace = true
spin.workspace = true

# Trie dependencies
hash-db.workspace = true
Expand Down
36 changes: 19 additions & 17 deletions lib/ain-evm/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ use std::{
collections::{BTreeSet, HashMap},
num::NonZeroUsize,
path::PathBuf,
sync::{Arc, Mutex},
sync::Arc,
};

use parking_lot::Mutex;

use ain_contracts::{
dst20_address_from_token_id, get_transfer_domain_contract,
get_transferdomain_dst20_transfer_function, get_transferdomain_native_transfer_function,
Expand Down Expand Up @@ -36,7 +38,7 @@ use crate::{
pub type XHash = String;

pub struct SignedTxCache {
inner: Mutex<LruCache<String, SignedTx>>,
inner: spin::Mutex<LruCache<String, SignedTx>>,
}

const DEFAULT_CACHE_SIZE: usize = 10000;
Expand All @@ -50,12 +52,12 @@ impl Default for SignedTxCache {
impl SignedTxCache {
pub fn new(capacity: usize) -> Self {
Self {
inner: Mutex::new(LruCache::new(NonZeroUsize::new(capacity).unwrap())),
inner: spin::Mutex::new(LruCache::new(NonZeroUsize::new(capacity).unwrap())),
}
}

pub fn try_get_or_create(&self, key: &str) -> Result<SignedTx> {
let mut guard = self.inner.lock().unwrap();
let mut guard = self.inner.lock();
debug!("[signed-tx-cache]::get: {}", key);
let res = guard.try_get_or_insert(key.to_string(), || {
debug!("[signed-tx-cache]::create {}", key);
Expand All @@ -67,7 +69,7 @@ impl SignedTxCache {
pub fn try_get_or_create_from_tx(&self, tx: &TransactionV2) -> Result<SignedTx> {
let data = EnvelopedEncodable::encode(tx);
let key = hex::encode(&data);
let mut guard = self.inner.lock().unwrap();
let mut guard = self.inner.lock();
debug!("[signed-tx-cache]::get from tx: {}", &key);
let res = guard.try_get_or_insert(key.clone(), || {
debug!("[signed-tx-cache]::create from tx {}", &key);
Expand All @@ -78,8 +80,8 @@ impl SignedTxCache {
}

struct TxValidationCache {
validated: Mutex<LruCache<(U256, H256, String, bool), ValidateTxInfo>>,
stateless: Mutex<LruCache<String, ValidateTxInfo>>,
validated: spin::Mutex<LruCache<(U256, H256, String, bool), ValidateTxInfo>>,
stateless: spin::Mutex<LruCache<String, ValidateTxInfo>>,
}

impl Default for TxValidationCache {
Expand All @@ -91,35 +93,35 @@ impl Default for TxValidationCache {
impl TxValidationCache {
pub fn new(capacity: usize) -> Self {
Self {
validated: Mutex::new(LruCache::new(NonZeroUsize::new(capacity).unwrap())),
stateless: Mutex::new(LruCache::new(NonZeroUsize::new(capacity).unwrap())),
validated: spin::Mutex::new(LruCache::new(NonZeroUsize::new(capacity).unwrap())),
stateless: spin::Mutex::new(LruCache::new(NonZeroUsize::new(capacity).unwrap())),
}
}

pub fn get(&self, key: &(U256, H256, String, bool)) -> Option<ValidateTxInfo> {
self.validated.lock().unwrap().get(key).cloned()
self.validated.lock().get(key).cloned()
}

pub fn get_stateless(&self, key: &str) -> Option<ValidateTxInfo> {
self.stateless.lock().unwrap().get(key).cloned()
self.stateless.lock().get(key).cloned()
}

pub fn set(&self, key: (U256, H256, String, bool), value: ValidateTxInfo) -> ValidateTxInfo {
let mut cache = self.validated.lock().unwrap();
let mut cache = self.validated.lock();
cache.put(key, value.clone());
value
}

pub fn set_stateless(&self, key: String, value: ValidateTxInfo) -> ValidateTxInfo {
let mut cache = self.stateless.lock().unwrap();
let mut cache = self.stateless.lock();
cache.put(key, value.clone());
value
}

// To be used on new block or any known state changes. Only clears fully validated TX cache.
// Stateless cache can be kept across blocks and is handled by LRU itself
pub fn clear(&self) {
let mut cache = self.validated.lock().unwrap();
let mut cache = self.validated.lock();
cache.clear()
}
}
Expand Down Expand Up @@ -1011,7 +1013,7 @@ impl EVMCoreService {

pub fn get_next_account_nonce(&self, address: H160, state_root: H256) -> Result<U256> {
let state_root_nonce = self.get_nonce_from_state_root(address, state_root)?;
let mut nonce_store = self.nonce_store.lock().unwrap();
let mut nonce_store = self.nonce_store.lock();
match nonce_store.entry(address) {
std::collections::hash_map::Entry::Vacant(_) => Ok(state_root_nonce),
std::collections::hash_map::Entry::Occupied(e) => {
Expand All @@ -1035,7 +1037,7 @@ impl EVMCoreService {
}

pub fn store_account_nonce(&self, address: H160, nonce: U256) -> bool {
let mut nonce_store = self.nonce_store.lock().unwrap();
let mut nonce_store = self.nonce_store.lock();
nonce_store.entry(address).or_default();

match nonce_store.entry(address) {
Expand All @@ -1048,7 +1050,7 @@ impl EVMCoreService {
}

pub fn clear_account_nonce(&self) {
let mut nonce_store = self.nonce_store.lock().unwrap();
let mut nonce_store = self.nonce_store.lock();
nonce_store.clear()
}
}
4 changes: 2 additions & 2 deletions lib/ain-evm/src/evm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ impl EVMServices {
mnview_ptr: usize,
) -> Result<FinalizedBlockInfo> {
let tx_queue = self.core.tx_queues.get(queue_id)?;
let mut queue = tx_queue.data.lock().unwrap();
let mut queue = tx_queue.data.lock();

let queue_txs_len = queue.transactions.len();
let mut all_transactions = Vec::with_capacity(queue_txs_len);
Expand Down Expand Up @@ -376,7 +376,7 @@ impl EVMServices {
pub unsafe fn commit_queue(&self, queue_id: u64) -> Result<()> {
{
let tx_queue = self.core.tx_queues.get(queue_id)?;
let queue = tx_queue.data.lock().unwrap();
let queue = tx_queue.data.lock();
let Some(BlockData { block, receipts }) = queue.block_data.clone() else {
return Err(format_err!("no constructed EVM block exist in queue id").into());
};
Expand Down
7 changes: 4 additions & 3 deletions lib/ain-evm/src/services.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
Arc,
},
thread::{self, JoinHandle},
};

use parking_lot::Mutex;

use anyhow::Result;
use jsonrpsee_server::ServerHandle as HttpServerHandle;
use tokio::{
Expand Down Expand Up @@ -68,7 +70,7 @@ impl Services {
}

pub fn stop_network(&self) -> Result<()> {
let mut json_rpc_handle = self.json_rpc.lock().unwrap();
let mut json_rpc_handle = self.json_rpc.lock();
if (json_rpc_handle).is_none() {
// Server was never started
return Ok(());
Expand All @@ -88,7 +90,6 @@ impl Services {

self.tokio_worker
.lock()
.unwrap()
.take()
.expect("runtime terminated?")
.join()
Expand Down
28 changes: 12 additions & 16 deletions lib/ain-evm/src/txqueue.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::{
collections::HashMap,
sync::{Arc, Mutex, RwLock},
};
use std::{collections::HashMap, sync::Arc};

use parking_lot::{Mutex, RwLock};

use ethereum::{Block, TransactionV2};
use ethereum_types::{H256, U256};
Expand Down Expand Up @@ -52,7 +51,7 @@ impl TransactionQueueMap {
if queue_id == 0 {
continue;
};
let mut write_guard = self.queues.write().unwrap();
let mut write_guard = self.queues.write();

if let std::collections::hash_map::Entry::Vacant(e) = write_guard.entry(queue_id) {
e.insert(Arc::new(TransactionQueue::new(target_block, state_root)));
Expand All @@ -70,7 +69,7 @@ impl TransactionQueueMap {
/// across all usages. Note: To be replaced with a proper lock flow later.
///
pub unsafe fn remove(&self, queue_id: u64) -> Option<Arc<TransactionQueue>> {
self.queues.write().unwrap().remove(&queue_id)
self.queues.write().remove(&queue_id)
}

/// Returns an atomic reference counting pointer of the `TransactionQueue` associated with the provided queue ID.
Expand All @@ -91,7 +90,6 @@ impl TransactionQueueMap {
Ok(Arc::clone(
self.queues
.read()
.unwrap()
.get(&queue_id)
.ok_or(QueueError::NoSuchQueue)?,
))
Expand Down Expand Up @@ -193,7 +191,7 @@ impl TransactionQueueMap {
where
F: FnOnce(&TransactionQueue) -> T,
{
match self.queues.read().unwrap().get(&queue_id) {
match self.queues.read().get(&queue_id) {
Some(queue) => Ok(f(queue)),
None => Err(QueueError::NoSuchQueue),
}
Expand Down Expand Up @@ -268,7 +266,7 @@ impl TransactionQueue {
gas_used: U256,
state_root: H256,
) -> Result<()> {
let mut data = self.data.lock().unwrap();
let mut data = self.data.lock();

data.total_gas_used += gas_used;

Expand All @@ -282,7 +280,7 @@ impl TransactionQueue {
}

pub fn remove_txs_above_hash(&self, target_hash: XHash) -> Result<Vec<XHash>> {
let mut data = self.data.lock().unwrap();
let mut data = self.data.lock();
let mut removed_txs = Vec::new();

if let Some(index) = data
Expand All @@ -306,29 +304,28 @@ impl TransactionQueue {
}

pub fn get_queue_txs_cloned(&self) -> Vec<QueueTxItem> {
self.data.lock().unwrap().transactions.clone()
self.data.lock().transactions.clone()
}

pub fn get_total_gas_used(&self) -> U256 {
self.data.lock().unwrap().total_gas_used
self.data.lock().total_gas_used
}

pub fn get_target_block(&self) -> U256 {
self.data.lock().unwrap().target_block
self.data.lock().target_block
}

pub fn get_state_root_from_native_hash(&self, hash: XHash) -> Option<H256> {
self.data
.lock()
.unwrap()
.transactions
.iter()
.find(|tx_item| tx_item.tx_hash == hash)
.map(|tx_item| tx_item.state_root)
}

pub fn get_latest_state_root(&self) -> H256 {
let data = self.data.lock().unwrap();
let data = self.data.lock();
data.transactions
.last()
.map_or(data.initial_state_root, |tx_item| tx_item.state_root)
Expand All @@ -337,7 +334,6 @@ impl TransactionQueue {
pub fn is_queued(&self, tx: &QueueTx) -> bool {
self.data
.lock()
.unwrap()
.transactions
.iter()
.any(|queued| &queued.tx == tx)
Expand Down
2 changes: 1 addition & 1 deletion lib/ain-grpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ pub fn init_network_json_rpc_service(runtime: &Services, addr: &str) -> Result<(
methods.merge(MetachainNetRPCModule::new(Arc::clone(&runtime.evm)).into_rpc())?;
methods.merge(MetachainWeb3RPCModule::new(Arc::clone(&runtime.evm)).into_rpc())?;

*runtime.json_rpc.lock().unwrap() = Some(server.start(methods)?);
*runtime.json_rpc.lock() = Some(server.start(methods)?);
Ok(())
}

Expand Down

0 comments on commit 9793443

Please sign in to comment.