Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(watchtower): activate utxo watchers #1859

Merged
merged 13 commits into from
Jun 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion mm2src/mm2_core/src/mm_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ impl MmCtx {

pub fn is_watcher(&self) -> bool { self.conf["is_watcher"].as_bool().unwrap_or_default() }

pub fn use_watchers(&self) -> bool { self.conf["use_watchers"].as_bool().unwrap_or_default() }
pub fn use_watchers(&self) -> bool { self.conf["use_watchers"].as_bool().unwrap_or(true) }
rozhkovdmitrii marked this conversation as resolved.
Show resolved Hide resolved

pub fn netid(&self) -> u16 {
let netid = self.conf["netid"].as_u64().unwrap_or(0);
Expand Down
32 changes: 26 additions & 6 deletions mm2src/mm2_main/src/lp_swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ pub use pubkey_banning::{ban_pubkey_rpc, is_pubkey_banned, list_banned_pubkeys_r
pub use recreate_swap_data::recreate_swap_data;
pub use saved_swap::{SavedSwap, SavedSwapError, SavedSwapIo, SavedSwapResult};
pub use swap_watcher::{process_watcher_msg, watcher_topic, TakerSwapWatcherData, MAKER_PAYMENT_SPEND_FOUND_LOG,
MAKER_PAYMENT_SPEND_SENT_LOG, TAKER_PAYMENT_REFUND_SENT_LOG, TAKER_SWAP_ENTRY_TIMEOUT,
MAKER_PAYMENT_SPEND_SENT_LOG, TAKER_PAYMENT_REFUND_SENT_LOG, TAKER_SWAP_ENTRY_TIMEOUT_SEC,
WATCHER_PREFIX};
use taker_swap::TakerSwapEvent;
pub use taker_swap::{calc_max_taker_vol, check_balance_for_taker_swap, max_taker_vol, max_taker_vol_from_available,
Expand Down Expand Up @@ -202,17 +202,35 @@ pub fn p2p_private_and_peer_id_to_broadcast(ctx: &MmArc, p2p_privkey: Option<&Ke

/// Spawns the loop that broadcasts message every `interval` seconds returning the AbortOnDropHandle
/// to stop it
pub fn broadcast_swap_message_every<T: 'static + Serialize + Clone + Send>(
pub fn broadcast_swap_msg_every<T: 'static + Serialize + Clone + Send>(
ctx: MmArc,
topic: String,
msg: T,
interval: f64,
interval_sec: f64,
p2p_privkey: Option<KeyPair>,
) -> AbortOnDropHandle {
let fut = async move {
loop {
broadcast_swap_message(&ctx, topic.clone(), msg.clone(), &p2p_privkey);
Timer::sleep(interval).await;
Timer::sleep(interval_sec).await;
}
};
spawn_abortable(fut)
}

/// Spawns the loop that broadcasts message every `interval` seconds returning the AbortOnDropHandle
/// to stop it. This function waits for interval seconds first before starting the broadcast.
pub fn broadcast_swap_msg_every_delayed<T: 'static + Serialize + Clone + Send>(
ctx: MmArc,
topic: String,
msg: T,
interval_sec: f64,
p2p_privkey: Option<KeyPair>,
) -> AbortOnDropHandle {
let fut = async move {
loop {
Timer::sleep(interval_sec).await;
broadcast_swap_message(&ctx, topic.clone(), msg.clone(), &p2p_privkey);
rozhkovdmitrii marked this conversation as resolved.
Show resolved Hide resolved
}
};
spawn_abortable(fut)
Expand Down Expand Up @@ -370,7 +388,7 @@ pub fn wait_for_maker_payment_conf_until(swap_started_at: u64, locktime: u64) ->
const _SWAP_DEFAULT_NUM_CONFIRMS: u32 = 1;
const _SWAP_DEFAULT_MAX_CONFIRMS: u32 = 6;
/// MM2 checks that swap payment is confirmed every WAIT_CONFIRM_INTERVAL seconds
const WAIT_CONFIRM_INTERVAL: u64 = 15;
const WAIT_CONFIRM_INTERVAL_SEC: u64 = 15;

#[derive(Debug, PartialEq, Serialize)]
pub enum RecoveredSwapAction {
Expand Down Expand Up @@ -437,7 +455,9 @@ impl SwapsContext {
running_swaps: Mutex::new(vec![]),
banned_pubkeys: Mutex::new(HashMap::new()),
swap_msgs: Mutex::new(HashMap::new()),
taker_swap_watchers: PaMutex::new(DuplicateCache::new(Duration::from_secs(TAKER_SWAP_ENTRY_TIMEOUT))),
taker_swap_watchers: PaMutex::new(DuplicateCache::new(Duration::from_secs(
TAKER_SWAP_ENTRY_TIMEOUT_SEC,
))),
#[cfg(target_arch = "wasm32")]
swap_db: ConstructibleDb::new(ctx),
})
Expand Down
36 changes: 21 additions & 15 deletions mm2src/mm2_main/src/lp_swap/maker_swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ use super::check_balance::{check_base_coin_balance_for_swap, check_my_coin_balan
use super::pubkey_banning::ban_pubkey_on_failed_swap;
use super::swap_lock::{SwapLock, SwapLockOps};
use super::trade_preimage::{TradePreimageRequest, TradePreimageRpcError, TradePreimageRpcResult};
use super::{broadcast_my_swap_status, broadcast_p2p_tx_msg, broadcast_swap_message_every,
use super::{broadcast_my_swap_status, broadcast_p2p_tx_msg, broadcast_swap_msg_every,
check_other_coin_balance_for_swap, detect_secret_hash_algo, dex_fee_amount_from_taker_coin,
get_locked_amount, recv_swap_msg, swap_topic, taker_payment_spend_deadline, tx_helper_topic,
wait_for_maker_payment_conf_until, AtomicSwap, LockedAmount, MySwapInfo, NegotiationDataMsg,
NegotiationDataV2, NegotiationDataV3, RecoveredSwap, RecoveredSwapAction, SavedSwap, SavedSwapIo,
SavedTradeFee, SecretHashAlgo, SwapConfirmationsSettings, SwapError, SwapMsg, SwapPubkeys, SwapTxDataMsg,
SwapsContext, TransactionIdentifier, WAIT_CONFIRM_INTERVAL};
SwapsContext, TransactionIdentifier, WAIT_CONFIRM_INTERVAL_SEC};
use crate::mm2::lp_dispatcher::{DispatcherContext, LpEvents};
use crate::mm2::lp_network::subscribe_to_topic;
use crate::mm2::lp_ordermatch::{MakerOrderBuilder, OrderConfirmationsSettings};
Expand Down Expand Up @@ -584,21 +584,21 @@ impl MakerSwap {
let negotiation_data = self.get_my_negotiation_data();

let maker_negotiation_data = SwapMsg::Negotiation(negotiation_data);
const NEGOTIATION_TIMEOUT: u64 = 90;
const NEGOTIATION_TIMEOUT_SEC: u64 = 90;

debug!("Sending maker negotiation data {:?}", maker_negotiation_data);
let send_abort_handle = broadcast_swap_message_every(
let send_abort_handle = broadcast_swap_msg_every(
self.ctx.clone(),
swap_topic(&self.uuid),
maker_negotiation_data,
NEGOTIATION_TIMEOUT as f64 / 6.,
NEGOTIATION_TIMEOUT_SEC as f64 / 6.,
self.p2p_privkey,
);
let recv_fut = recv_swap_msg(
self.ctx.clone(),
|store| store.negotiation_reply.take(),
&self.uuid,
NEGOTIATION_TIMEOUT,
NEGOTIATION_TIMEOUT_SEC,
);
let taker_data = match recv_fut.await {
Ok(d) => d,
Expand Down Expand Up @@ -688,21 +688,21 @@ impl MakerSwap {
}

async fn wait_taker_fee(&self) -> Result<(Option<MakerSwapCommand>, Vec<MakerSwapEvent>), String> {
const TAKER_FEE_RECV_TIMEOUT: u64 = 600;
const TAKER_FEE_RECV_TIMEOUT_SEC: u64 = 600;
let negotiated = SwapMsg::Negotiated(true);
let send_abort_handle = broadcast_swap_message_every(
let send_abort_handle = broadcast_swap_msg_every(
self.ctx.clone(),
swap_topic(&self.uuid),
negotiated,
TAKER_FEE_RECV_TIMEOUT as f64 / 6.,
TAKER_FEE_RECV_TIMEOUT_SEC as f64 / 6.,
self.p2p_privkey,
);

let recv_fut = recv_swap_msg(
self.ctx.clone(),
|store| store.taker_fee.take(),
&self.uuid,
TAKER_FEE_RECV_TIMEOUT,
TAKER_FEE_RECV_TIMEOUT_SEC,
);
let payload = match recv_fut.await {
Ok(d) => d,
Expand Down Expand Up @@ -888,6 +888,7 @@ impl MakerSwap {
}

async fn wait_for_taker_payment(&self) -> Result<(Option<MakerSwapCommand>, Vec<MakerSwapEvent>), String> {
const PAYMENT_MSG_INTERVAL_SEC: f64 = 600.;
let payment_data_msg = match self.get_my_payment_data().await {
Ok(data) => data,
Err(e) => {
Expand All @@ -900,8 +901,13 @@ impl MakerSwap {
},
};
let msg = SwapMsg::MakerPayment(payment_data_msg);
let abort_send_handle =
broadcast_swap_message_every(self.ctx.clone(), swap_topic(&self.uuid), msg, 600., self.p2p_privkey);
let abort_send_handle = broadcast_swap_msg_every(
self.ctx.clone(),
swap_topic(&self.uuid),
msg,
PAYMENT_MSG_INTERVAL_SEC,
self.p2p_privkey,
);

let maker_payment_wait_confirm =
wait_for_maker_payment_conf_until(self.r().data.started_at, self.r().data.lock_duration);
Expand All @@ -910,7 +916,7 @@ impl MakerSwap {
confirmations: self.r().data.maker_payment_confirmations,
requires_nota: self.r().data.maker_payment_requires_nota.unwrap_or(false),
wait_until: maker_payment_wait_confirm,
check_every: WAIT_CONFIRM_INTERVAL,
check_every: WAIT_CONFIRM_INTERVAL_SEC,
};

let f = self.maker_coin.wait_for_confirmations(confirm_maker_payment_input);
Expand Down Expand Up @@ -984,7 +990,7 @@ impl MakerSwap {
confirmations,
requires_nota: self.r().data.taker_payment_requires_nota.unwrap_or(false),
wait_until: wait_taker_payment,
check_every: WAIT_CONFIRM_INTERVAL,
check_every: WAIT_CONFIRM_INTERVAL_SEC,
};
let wait_f = self
.taker_coin
Expand Down Expand Up @@ -1140,7 +1146,7 @@ impl MakerSwap {
confirmations,
requires_nota,
wait_until: self.wait_refund_until(),
check_every: WAIT_CONFIRM_INTERVAL,
check_every: WAIT_CONFIRM_INTERVAL_SEC,
};
let wait_fut = self.taker_coin.wait_for_confirmations(confirm_taker_payment_input);
if let Err(err) = wait_fut.compat().await {
Expand Down
6 changes: 3 additions & 3 deletions mm2src/mm2_main/src/lp_swap/swap_watcher.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::{broadcast_p2p_tx_msg, get_payment_locktime, lp_coinfind, taker_payment_spend_deadline, tx_helper_topic,
H256Json, SwapsContext, WAIT_CONFIRM_INTERVAL};
H256Json, SwapsContext, WAIT_CONFIRM_INTERVAL_SEC};
use crate::mm2::lp_network::{P2PRequestError, P2PRequestResult};
use crate::mm2::MmError;
use async_trait::async_trait;
Expand All @@ -22,7 +22,7 @@ use uuid::Uuid;

pub const WATCHER_PREFIX: TopicPrefix = "swpwtchr";
const TAKER_SWAP_CONFIRMATIONS: u64 = 1;
pub const TAKER_SWAP_ENTRY_TIMEOUT: u64 = 21600;
pub const TAKER_SWAP_ENTRY_TIMEOUT_SEC: u64 = 21600;

pub const MAKER_PAYMENT_SPEND_SENT_LOG: &str = "Maker payment spend sent";
pub const MAKER_PAYMENT_SPEND_FOUND_LOG: &str = "Maker payment spend found by watcher";
Expand Down Expand Up @@ -224,7 +224,7 @@ impl State for ValidateTakerPayment {
confirmations,
requires_nota: watcher_ctx.data.taker_payment_requires_nota.unwrap_or(false),
wait_until: taker_payment_spend_deadline,
check_every: WAIT_CONFIRM_INTERVAL,
check_every: WAIT_CONFIRM_INTERVAL_SEC,
};

let wait_fut = watcher_ctx
Expand Down
56 changes: 30 additions & 26 deletions mm2src/mm2_main/src/lp_swap/taker_swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@ use super::pubkey_banning::ban_pubkey_on_failed_swap;
use super::swap_lock::{SwapLock, SwapLockOps};
use super::swap_watcher::{watcher_topic, SwapWatcherMsg};
use super::trade_preimage::{TradePreimageRequest, TradePreimageRpcError, TradePreimageRpcResult};
use super::{broadcast_my_swap_status, broadcast_swap_message, broadcast_swap_message_every,
use super::{broadcast_my_swap_status, broadcast_swap_message, broadcast_swap_msg_every,
check_other_coin_balance_for_swap, dex_fee_amount_from_taker_coin, dex_fee_rate, dex_fee_threshold,
get_locked_amount, recv_swap_msg, swap_topic, wait_for_maker_payment_conf_until, AtomicSwap, LockedAmount,
MySwapInfo, NegotiationDataMsg, NegotiationDataV2, NegotiationDataV3, RecoveredSwap, RecoveredSwapAction,
SavedSwap, SavedSwapIo, SavedTradeFee, SwapConfirmationsSettings, SwapError, SwapMsg, SwapPubkeys,
SwapTxDataMsg, SwapsContext, TransactionIdentifier, WAIT_CONFIRM_INTERVAL};
SwapTxDataMsg, SwapsContext, TransactionIdentifier, WAIT_CONFIRM_INTERVAL_SEC};
use crate::mm2::lp_network::subscribe_to_topic;
use crate::mm2::lp_ordermatch::{MatchBy, OrderConfirmationsSettings, TakerAction, TakerOrderBuilder};
use crate::mm2::lp_swap::{broadcast_p2p_tx_msg, tx_helper_topic, wait_for_maker_payment_conf_duration,
TakerSwapWatcherData};
use crate::mm2::lp_swap::{broadcast_p2p_tx_msg, broadcast_swap_msg_every_delayed, tx_helper_topic,
wait_for_maker_payment_conf_duration, TakerSwapWatcherData};
use coins::lp_price::fetch_swap_coins_price;
use coins::{lp_coinfind, CanRefundHtlc, CheckIfMyPaymentSentArgs, ConfirmPaymentInput, FeeApproxStage,
FoundSwapTxSpend, MmCoinEnum, PaymentInstructionArgs, PaymentInstructions, PaymentInstructionsErr,
Expand Down Expand Up @@ -113,14 +113,18 @@ async fn save_my_taker_swap_event(ctx: &MmArc, swap: &TakerSwap, event: TakerSav
gui: ctx.gui().map(|g| g.to_owned()),
mm_version: Some(ctx.mm_version.to_owned()),
events: vec![],
success_events: match ctx.use_watchers() {
true => TAKER_USING_WATCHERS_SUCCESS_EVENTS
success_events: if ctx.use_watchers()
&& swap.taker_coin.is_supported_by_watchers()
&& swap.maker_coin.is_supported_by_watchers()
{
TAKER_USING_WATCHERS_SUCCESS_EVENTS
.iter()
.map(|event| event.to_string())
.collect(),
false => TAKER_SUCCESS_EVENTS.iter().map(|event| event.to_string()).collect(),
.map(<&str>::to_string)
.collect()
} else {
TAKER_SUCCESS_EVENTS.iter().map(<&str>::to_string).collect()
},
error_events: TAKER_ERROR_EVENTS.iter().map(|event| event.to_string()).collect(),
error_events: TAKER_ERROR_EVENTS.iter().map(<&str>::to_string).collect(),
}),
Err(e) => return ERR!("{}", e),
};
Expand Down Expand Up @@ -1076,13 +1080,13 @@ impl TakerSwap {
}

async fn negotiate(&self) -> Result<(Option<TakerSwapCommand>, Vec<TakerSwapEvent>), String> {
const NEGOTIATE_TIMEOUT: u64 = 90;
const NEGOTIATE_TIMEOUT_SEC: u64 = 90;

let recv_fut = recv_swap_msg(
self.ctx.clone(),
|store| store.negotiation.take(),
&self.uuid,
NEGOTIATE_TIMEOUT,
NEGOTIATE_TIMEOUT_SEC,
);
let maker_data = match recv_fut.await {
Ok(d) => d,
Expand Down Expand Up @@ -1182,18 +1186,18 @@ impl TakerSwap {

let taker_data = SwapMsg::NegotiationReply(my_negotiation_data);
debug!("Sending taker negotiation data {:?}", taker_data);
let send_abort_handle = broadcast_swap_message_every(
let send_abort_handle = broadcast_swap_msg_every(
self.ctx.clone(),
swap_topic(&self.uuid),
taker_data,
NEGOTIATE_TIMEOUT as f64 / 6.,
NEGOTIATE_TIMEOUT_SEC as f64 / 6.,
self.p2p_privkey,
);
let recv_fut = recv_swap_msg(
self.ctx.clone(),
|store| store.negotiated.take(),
&self.uuid,
NEGOTIATE_TIMEOUT,
NEGOTIATE_TIMEOUT_SEC,
);
let negotiated = match recv_fut.await {
Ok(d) => d,
Expand Down Expand Up @@ -1264,7 +1268,7 @@ impl TakerSwap {
}

async fn wait_for_maker_payment(&self) -> Result<(Option<TakerSwapCommand>, Vec<TakerSwapEvent>), String> {
const MAKER_PAYMENT_WAIT_TIMEOUT: u64 = 600;
const MAKER_PAYMENT_WAIT_TIMEOUT_SEC: u64 = 600;

let payment_data_msg = match self.get_taker_fee_data().await {
Ok(data) => data,
Expand All @@ -1276,19 +1280,19 @@ impl TakerSwap {
};

let msg = SwapMsg::TakerFee(payment_data_msg);
let abort_send_handle = broadcast_swap_message_every(
let abort_send_handle = broadcast_swap_msg_every(
self.ctx.clone(),
swap_topic(&self.uuid),
msg,
MAKER_PAYMENT_WAIT_TIMEOUT as f64 / 6.,
MAKER_PAYMENT_WAIT_TIMEOUT_SEC as f64 / 6.,
self.p2p_privkey,
);

let recv_fut = recv_swap_msg(
self.ctx.clone(),
|store| store.maker_payment.take(),
&self.uuid,
MAKER_PAYMENT_WAIT_TIMEOUT,
MAKER_PAYMENT_WAIT_TIMEOUT_SEC,
);
let payload = match recv_fut.await {
Ok(p) => p,
Expand Down Expand Up @@ -1356,7 +1360,7 @@ impl TakerSwap {
confirmations,
requires_nota: self.r().data.maker_payment_requires_nota.unwrap_or(false),
wait_until: self.r().data.maker_payment_wait,
check_every: WAIT_CONFIRM_INTERVAL,
check_every: WAIT_CONFIRM_INTERVAL_SEC,
};

let f = self.maker_coin.wait_for_confirmations(confirm_maker_payment_input);
Expand Down Expand Up @@ -1606,7 +1610,7 @@ impl TakerSwap {
}

async fn wait_for_taker_payment_spend(&self) -> Result<(Option<TakerSwapCommand>, Vec<TakerSwapEvent>), String> {
const BROADCAST_SWAP_MESSAGE_INTERVAL: f64 = 600.;
const BROADCAST_MSG_INTERVAL_SEC: f64 = 600.;

let tx_hex = self.r().taker_payment.as_ref().unwrap().tx_hex.0.clone();
let mut watcher_broadcast_abort_handle = None;
Expand All @@ -1627,23 +1631,23 @@ impl TakerSwap {
);
let swpmsg_watcher = SwapWatcherMsg::TakerSwapWatcherMsg(watcher_data);
let htlc_keypair = self.taker_coin.derive_htlc_key_pair(&self.unique_swap_data());
watcher_broadcast_abort_handle = Some(broadcast_swap_message_every(
watcher_broadcast_abort_handle = Some(broadcast_swap_msg_every_delayed(
self.ctx.clone(),
watcher_topic(&self.r().data.taker_coin),
swpmsg_watcher,
BROADCAST_SWAP_MESSAGE_INTERVAL,
BROADCAST_MSG_INTERVAL_SEC,
Some(htlc_keypair),
));
}
}

// Todo: taker_payment should be a message on lightning network not a swap message
let msg = SwapMsg::TakerPayment(tx_hex);
let send_abort_handle = broadcast_swap_message_every(
let send_abort_handle = broadcast_swap_msg_every(
self.ctx.clone(),
swap_topic(&self.uuid),
msg,
BROADCAST_SWAP_MESSAGE_INTERVAL,
BROADCAST_MSG_INTERVAL_SEC,
self.p2p_privkey,
);

Expand All @@ -1652,7 +1656,7 @@ impl TakerSwap {
confirmations: self.r().data.taker_payment_confirmations,
requires_nota: self.r().data.taker_payment_requires_nota.unwrap_or(false),
wait_until: self.r().data.taker_payment_lock,
check_every: WAIT_CONFIRM_INTERVAL,
check_every: WAIT_CONFIRM_INTERVAL_SEC,
};
let wait_f = self
.taker_coin
Expand Down
Loading