This repository has been archived by the owner on Feb 3, 2025. It is now read-only.

Make node manager locks RwLocks #1010

Merged 1 commit on Feb 6, 2024
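
For context on the change: `async_lock::RwLock` lets any number of tasks hold read guards at the same time, while `.write()` still gives a single task exclusive access. Swapping it in for the previous `Mutex` means the many read-only call sites (`list_nodes`, `list_channels`, invoice lookups, and so on) no longer serialize on one lock. A minimal standalone sketch of that behaviour, assuming the `futures` crate's executor and a hypothetical `Node` stand-in rather than the project's real `Node<S>`:

```rust
use std::collections::HashMap;
use std::sync::Arc;

use async_lock::RwLock;

// Hypothetical stand-in for the real `Node<S>`; the real type is not needed
// to show the locking behaviour.
struct Node {
    alias: String,
}

async fn demo() {
    // Mirrors `Arc<RwLock<HashMap<PublicKey, Arc<Node<S>>>>>`, keyed by String
    // for simplicity.
    let nodes: Arc<RwLock<HashMap<String, Arc<Node>>>> = Arc::new(RwLock::new(HashMap::new()));

    // A writer takes exclusive access, exactly as the old Mutex did.
    {
        let mut guard = nodes.write().await;
        guard.insert(
            "node-a".to_string(),
            Arc::new(Node { alias: "alice".to_string() }),
        );
    }

    // Readers can hold the lock concurrently; with a Mutex the second
    // acquisition would have to wait for the first guard to drop.
    let r1 = nodes.read().await;
    let r2 = nodes.read().await;
    if let (Some(a), Some(b)) = (r1.get("node-a"), r2.get("node-a")) {
        println!("both readers see {} / {}", a.alias, b.alias);
    }
}

fn main() {
    // async_lock is executor-agnostic; the futures crate's block_on is assumed here.
    futures::executor::block_on(demo());
}
```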
mutiny-core/src/lib.rs (2 changes: 1 addition & 1 deletion)
@@ -1067,7 +1067,7 @@ impl<S: MutinyStorage> MutinyWallet<S> {
if self
.node_manager
.nodes
- .lock()
+ .read()
.await
.iter()
.flat_map(|(_, n)| n.channel_manager.list_channels())
mutiny-core/src/nodemanager.rs (67 changes: 34 additions & 33 deletions)
@@ -24,6 +24,7 @@ use crate::{
storage::{MutinyStorage, DEVICE_ID_KEY, KEYCHAIN_STORE_KEY, NEED_FULL_SYNC_KEY},
};
use anyhow::anyhow;
+ use async_lock::RwLock;
use bdk::chain::{BlockId, ConfirmationTime};
use bdk::{wallet::AddressIndex, FeeRate, LocalUtxo};
use bitcoin::blockdata::script;
@@ -390,7 +391,7 @@ impl<S: MutinyStorage> NodeManagerBuilder<S> {
let nodes = if c.safe_mode {
// If safe mode is enabled, we don't start any nodes
log_warn!(logger, "Safe mode enabled, not starting any nodes");
- Arc::new(Mutex::new(HashMap::new()))
+ Arc::new(RwLock::new(HashMap::new()))
} else {
// Remove the archived nodes, we don't need to start them up.
let unarchived_nodes = node_storage
@@ -453,7 +454,7 @@ impl<S: MutinyStorage> NodeManagerBuilder<S> {

log_info!(logger, "inserted updated nodes");

- Arc::new(Mutex::new(nodes_map))
+ Arc::new(RwLock::new(nodes_map))
};

let price_cache = self
@@ -473,7 +474,7 @@ impl<S: MutinyStorage> NodeManagerBuilder<S> {
chain,
fee_estimator,
storage: self.storage,
- node_storage: Mutex::new(node_storage),
+ node_storage: RwLock::new(node_storage),
nodes,
#[cfg(target_arch = "wasm32")]
websocket_proxy_addr,
@@ -511,8 +512,8 @@ pub struct NodeManager<S: MutinyStorage> {
chain: Arc<MutinyChain<S>>,
fee_estimator: Arc<MutinyFeeEstimator<S>>,
pub(crate) storage: S,
- pub(crate) node_storage: Mutex<NodeStorage>,
- pub(crate) nodes: Arc<Mutex<HashMap<PublicKey, Arc<Node<S>>>>>,
+ pub(crate) node_storage: RwLock<NodeStorage>,
+ pub(crate) nodes: Arc<RwLock<HashMap<PublicKey, Arc<Node<S>>>>>,
pub(crate) lsp_config: Option<LspConfig>,
pub(crate) logger: Arc<MutinyLogger>,
bitcoin_price_cache: Arc<Mutex<HashMap<String, (f32, Duration)>>>,
@@ -532,7 +533,7 @@ impl<S: MutinyStorage> NodeManager<S> {
&self,
pk: Option<&PublicKey>,
) -> Result<Arc<Node<S>>, MutinyError> {
- let nodes = self.nodes.lock().await;
+ let nodes = self.nodes.read().await;
let node = match pk {
Some(pubkey) => nodes.get(pubkey),
None => nodes.iter().next().map(|(_, node)| node),
@@ -544,7 +545,7 @@ impl<S: MutinyStorage> NodeManager<S> {
/// Returns after node has been stopped.
pub async fn stop(&self) -> Result<(), MutinyError> {
self.stop.swap(true, Ordering::Relaxed);
- let mut nodes = self.nodes.lock().await;
+ let mut nodes = self.nodes.write().await;
let node_futures = nodes.iter().map(|(_, n)| async {
match n.stop().await {
Ok(_) => {
@@ -1050,7 +1051,7 @@ impl<S: MutinyStorage> NodeManager<S> {
return Err(MutinyError::WalletOperationFailed);
};

- let nodes = self.nodes.lock().await;
+ let nodes = self.nodes.read().await;
let lightning_msats: u64 = nodes
.iter()
.flat_map(|(_, n)| n.channel_manager.list_channels())
@@ -1097,13 +1098,17 @@ impl<S: MutinyStorage> NodeManager<S> {
/// This should be called before syncing the on-chain wallet
/// to ensure that new on-chain transactions are picked up.
async fn sync_ldk(&self) -> Result<(), MutinyError> {
- let nodes = self.nodes.lock().await;
+ // get nodes hashmap, immediately drop lock because sync can take a while
+ let nodes = {
+     let nodes = self.nodes.read().await;
+     nodes.deref().clone()
+ };
Comment on lines +1101 to +1105 (Collaborator, PR author):

this is the main change that fixed the issue
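
To illustrate the pattern this hunk introduces, a rough standalone sketch follows: take the read lock only long enough to clone the map of `Arc` handles, drop the guard, then run the slow per-node sync without holding the shared lock. The `Node` and `sync_all` names below are hypothetical stand-ins, not the project's API:

```rust
use std::collections::HashMap;
use std::ops::Deref;
use std::sync::Arc;

use async_lock::RwLock;

struct Node; // hypothetical stand-in for the real Node<S>

impl Node {
    async fn sync(&self) {
        // Pretend this hits the chain source and takes a while.
    }
}

// Hypothetical helper mirroring the shape of `sync_ldk`: snapshot the map,
// release the read guard, then do the slow work without the shared lock.
async fn sync_all(nodes: &Arc<RwLock<HashMap<String, Arc<Node>>>>) {
    // Hold the read lock only long enough to clone the Arc handles; cloning
    // an Arc is a cheap refcount bump, not a deep copy of the node.
    let snapshot = {
        let guard = nodes.read().await;
        guard.deref().clone()
    };
    // The guard is dropped here, so other tasks can read or write `nodes`
    // while each node syncs.
    for node in snapshot.values() {
        node.sync().await;
    }
}

fn main() {
    futures::executor::block_on(async {
        let nodes = Arc::new(RwLock::new(HashMap::from([(
            "node-a".to_string(),
            Arc::new(Node),
        )])));
        sync_all(&nodes).await;
    });
}
```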


// Lock all the nodes so we can sync them, make sure we keep the locks
// in scope so they don't get dropped and unlocked.
let futs = nodes
- .iter()
- .map(|(_, node)| node.sync_lock.lock())
+ .values()
+ .map(|node| node.sync_lock.lock())
.collect::<Vec<_>>();
let _locks = join_all(futs).await;

@@ -1224,7 +1229,7 @@ impl<S: MutinyStorage> NodeManager<S> {
/// If the node has any active channels it will fail to archive
#[allow(dead_code)]
pub(crate) async fn archive_node(&self, pubkey: PublicKey) -> Result<(), MutinyError> {
- if let Some(node) = self.nodes.lock().await.get(&pubkey) {
+ if let Some(node) = self.nodes.read().await.get(&pubkey) {
// disallow archiving nodes with active channels or
// claimable on-chain funds, so we don't lose funds
if node.channel_manager.list_channels().is_empty()
@@ -1244,7 +1249,7 @@ impl<S: MutinyStorage> NodeManager<S> {
/// If the node has any active channels it will fail to archive
#[allow(dead_code)]
pub(crate) async fn archive_node_by_uuid(&self, node_uuid: String) -> Result<(), MutinyError> {
- let mut node_storage = self.node_storage.lock().await;
+ let mut node_storage = self.node_storage.write().await;

match node_storage.nodes.get(&node_uuid).map(|n| n.to_owned()) {
None => Err(anyhow!("Could not find node to archive").into()),
@@ -1262,7 +1267,7 @@ impl<S: MutinyStorage> NodeManager<S> {

/// Lists the pubkeys of the lightning node in the manager.
pub async fn list_nodes(&self) -> Result<Vec<PublicKey>, MutinyError> {
- let nodes = self.nodes.lock().await;
+ let nodes = self.nodes.read().await;
let peers = nodes.iter().map(|(_, n)| n.pubkey).collect();
Ok(peers)
}
@@ -1279,7 +1284,7 @@ impl<S: MutinyStorage> NodeManager<S> {

// check if any nodes have active channels with the current LSP
// if they do, we can't change the LSP
- let nodes = self.nodes.lock().await;
+ let nodes = self.nodes.read().await;
if nodes.iter().any(|(_, n)| {
if let Some(lsp_pk) = n.lsp_client.as_ref().map(|x| x.get_lsp_pubkey()) {
!n.channel_manager
@@ -1294,7 +1299,7 @@ impl<S: MutinyStorage> NodeManager<S> {
drop(nodes);

// edit node storage
- let mut node_storage = self.node_storage.lock().await;
+ let mut node_storage = self.node_storage.write().await;
node_storage.nodes.iter_mut().for_each(|(_, n)| {
n.lsp = lsp_config.clone();
});
@@ -1374,7 +1379,7 @@ impl<S: MutinyStorage> NodeManager<S> {
amount: u64,
labels: Vec<String>,
) -> Result<(MutinyInvoice, u64), MutinyError> {
- let nodes = self.nodes.lock().await;
+ let nodes = self.nodes.read().await;
let use_phantom = nodes.len() > 1 && self.lsp_config.is_none();
if nodes.len() == 0 {
return Err(MutinyError::InvoiceCreationFailed);
@@ -1448,7 +1453,7 @@ impl<S: MutinyStorage> NodeManager<S> {
&self,
hash: &sha256::Hash,
) -> Result<MutinyInvoice, MutinyError> {
- let nodes = self.nodes.lock().await;
+ let nodes = self.nodes.read().await;
for (_, node) in nodes.iter() {
if let Ok(inv) = node.get_invoice_by_hash(hash) {
return Ok(inv);
@@ -1462,7 +1467,7 @@ impl<S: MutinyStorage> NodeManager<S> {
&self,
user_channel_id: u128,
) -> Result<ChannelClosure, MutinyError> {
- let nodes = self.nodes.lock().await;
+ let nodes = self.nodes.read().await;
for (_, node) in nodes.iter() {
if let Ok(Some(closure)) = node.get_channel_closure(user_channel_id) {
return Ok(closure);
@@ -1474,7 +1479,7 @@ impl<S: MutinyStorage> NodeManager<S> {

pub async fn list_channel_closures(&self) -> Result<Vec<ChannelClosure>, MutinyError> {
let mut channels: Vec<ChannelClosure> = vec![];
- let nodes = self.nodes.lock().await;
+ let nodes = self.nodes.read().await;
for (_, node) in nodes.iter() {
if let Ok(mut invs) = node.get_channel_closures() {
channels.append(&mut invs)
@@ -1593,7 +1598,7 @@ impl<S: MutinyStorage> NodeManager<S> {
return Err(MutinyError::ChannelClosingFailed);
}

- let nodes = self.nodes.lock().await;
+ let nodes = self.nodes.read().await;
let channel_opt: Option<(Arc<Node<S>>, ChannelDetails)> =
nodes.iter().find_map(|(_, n)| {
n.channel_manager
@@ -1679,7 +1684,7 @@ impl<S: MutinyStorage> NodeManager<S> {

/// Lists all the channels for all the nodes in the node manager.
pub async fn list_channels(&self) -> Result<Vec<MutinyChannel>, MutinyError> {
- let nodes = self.nodes.lock().await;
+ let nodes = self.nodes.read().await;
let channels: Vec<ChannelDetails> = nodes
.iter()
.flat_map(|(_, n)| n.channel_manager.list_channels())
@@ -1709,7 +1714,7 @@ impl<S: MutinyStorage> NodeManager<S> {
})
.collect();

- let nodes = self.nodes.lock().await;
+ let nodes = self.nodes.read().await;

// get peers we are connected to
let connected_peers: Vec<PublicKey> = nodes
@@ -1950,7 +1955,7 @@ pub(crate) async fn create_new_node_from_node_manager<S: MutinyStorage>(
// Begin with a mutex lock so that nothing else can
// save or alter the node list while it is about to
// be saved.
- let mut node_mutex = node_manager.node_storage.lock().await;
+ let mut node_mutex = node_manager.node_storage.write().await;

// Get the current nodes and their bip32 indices
// so that we can create another node with the next.
@@ -2009,15 +2014,11 @@ pub(crate) async fn create_new_node_from_node_manager<S: MutinyStorage>(

let new_node = node_builder.build().await?;
let node_pubkey = new_node.pubkey;
- node_manager
-     .nodes
-     .clone()
-     .lock()
-     .await
-     .insert(node_pubkey, Arc::new(new_node));
+ let mut nodes = node_manager.nodes.write().await;
+ nodes.insert(node_pubkey, Arc::new(new_node));

Ok(NodeIdentity {
- uuid: next_node_uuid.clone(),
+ uuid: next_node_uuid,
pubkey: node_pubkey,
})
}
@@ -2127,7 +2128,7 @@ mod tests {

{
let node_identity = nm.new_node().await.expect("should create new node");
- let node_storage = nm.node_storage.lock().await;
+ let node_storage = nm.node_storage.read().await;
assert_ne!("", node_identity.uuid);
assert_ne!("", node_identity.pubkey.to_string());
assert_eq!(1, node_storage.nodes.len());
@@ -2138,7 +2139,7 @@ mod tests {

{
let node_identity = nm.new_node().await.expect("node manager should initialize");
- let node_storage = nm.node_storage.lock().await;
+ let node_storage = nm.node_storage.read().await;

assert_ne!("", node_identity.uuid);
assert_ne!("", node_identity.pubkey.to_string());