Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(watchtower): activate utxo watchers #1859

Merged
merged 13 commits into from
Jun 19, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion mm2src/mm2_core/src/mm_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ impl MmCtx {

pub fn is_watcher(&self) -> bool { self.conf["is_watcher"].as_bool().unwrap_or_default() }

pub fn use_watchers(&self) -> bool { self.conf["use_watchers"].as_bool().unwrap_or_default() }
pub fn use_watchers(&self) -> bool { self.conf["use_watchers"].as_bool().unwrap_or(true) }
rozhkovdmitrii marked this conversation as resolved.
Show resolved Hide resolved

pub fn netid(&self) -> u16 {
let netid = self.conf["netid"].as_u64().unwrap_or(0);
Expand Down
24 changes: 21 additions & 3 deletions mm2src/mm2_main/src/lp_swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,17 +202,35 @@ pub fn p2p_private_and_peer_id_to_broadcast(ctx: &MmArc, p2p_privkey: Option<&Ke

/// Spawns the loop that broadcasts message every `interval` seconds returning the AbortOnDropHandle
/// to stop it
pub fn broadcast_swap_message_every<T: 'static + Serialize + Clone + Send>(
pub fn broadcast_swap_msg_every<T: 'static + Serialize + Clone + Send>(
ctx: MmArc,
topic: String,
msg: T,
interval: f64,
interval_sec: f64,
p2p_privkey: Option<KeyPair>,
) -> AbortOnDropHandle {
let fut = async move {
loop {
broadcast_swap_message(&ctx, topic.clone(), msg.clone(), &p2p_privkey);
Timer::sleep(interval).await;
Timer::sleep(interval_sec).await;
}
};
spawn_abortable(fut)
}

/// Spawns the loop that broadcasts message every `interval` seconds returning the AbortOnDropHandle
/// to stop it. This function waits for interval seconds first before starting the broadcast.
pub fn broadcast_swap_msg_every_delayed<T: 'static + Serialize + Clone + Send>(
ctx: MmArc,
topic: String,
msg: T,
interval_sec: f64,
p2p_privkey: Option<KeyPair>,
) -> AbortOnDropHandle {
let fut = async move {
loop {
Timer::sleep(interval_sec).await;
broadcast_swap_message(&ctx, topic.clone(), msg.clone(), &p2p_privkey);
rozhkovdmitrii marked this conversation as resolved.
Show resolved Hide resolved
}
};
spawn_abortable(fut)
Expand Down
16 changes: 11 additions & 5 deletions mm2src/mm2_main/src/lp_swap/maker_swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use super::check_balance::{check_base_coin_balance_for_swap, check_my_coin_balan
use super::pubkey_banning::ban_pubkey_on_failed_swap;
use super::swap_lock::{SwapLock, SwapLockOps};
use super::trade_preimage::{TradePreimageRequest, TradePreimageRpcError, TradePreimageRpcResult};
use super::{broadcast_my_swap_status, broadcast_p2p_tx_msg, broadcast_swap_message_every,
use super::{broadcast_my_swap_status, broadcast_p2p_tx_msg, broadcast_swap_msg_every,
check_other_coin_balance_for_swap, detect_secret_hash_algo, dex_fee_amount_from_taker_coin,
get_locked_amount, recv_swap_msg, swap_topic, taker_payment_spend_deadline, tx_helper_topic,
wait_for_maker_payment_conf_until, AtomicSwap, LockedAmount, MySwapInfo, NegotiationDataMsg,
Expand Down Expand Up @@ -587,7 +587,7 @@ impl MakerSwap {
const NEGOTIATION_TIMEOUT: u64 = 90;

debug!("Sending maker negotiation data {:?}", maker_negotiation_data);
let send_abort_handle = broadcast_swap_message_every(
let send_abort_handle = broadcast_swap_msg_every(
self.ctx.clone(),
swap_topic(&self.uuid),
maker_negotiation_data,
Expand Down Expand Up @@ -690,7 +690,7 @@ impl MakerSwap {
async fn wait_taker_fee(&self) -> Result<(Option<MakerSwapCommand>, Vec<MakerSwapEvent>), String> {
const TAKER_FEE_RECV_TIMEOUT: u64 = 600;
let negotiated = SwapMsg::Negotiated(true);
let send_abort_handle = broadcast_swap_message_every(
let send_abort_handle = broadcast_swap_msg_every(
self.ctx.clone(),
swap_topic(&self.uuid),
negotiated,
Expand Down Expand Up @@ -888,6 +888,7 @@ impl MakerSwap {
}

async fn wait_for_taker_payment(&self) -> Result<(Option<MakerSwapCommand>, Vec<MakerSwapEvent>), String> {
const PAYMENT_MSG_INTERVAL_SEC: f64 = 600.;
let payment_data_msg = match self.get_my_payment_data().await {
Ok(data) => data,
Err(e) => {
Expand All @@ -900,8 +901,13 @@ impl MakerSwap {
},
};
let msg = SwapMsg::MakerPayment(payment_data_msg);
let abort_send_handle =
broadcast_swap_message_every(self.ctx.clone(), swap_topic(&self.uuid), msg, 600., self.p2p_privkey);
let abort_send_handle = broadcast_swap_msg_every(
self.ctx.clone(),
swap_topic(&self.uuid),
msg,
PAYMENT_MSG_INTERVAL_SEC,
self.p2p_privkey,
);

let maker_payment_wait_confirm =
wait_for_maker_payment_conf_until(self.r().data.started_at, self.r().data.lock_duration);
Expand Down
30 changes: 17 additions & 13 deletions mm2src/mm2_main/src/lp_swap/taker_swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@ use super::pubkey_banning::ban_pubkey_on_failed_swap;
use super::swap_lock::{SwapLock, SwapLockOps};
use super::swap_watcher::{watcher_topic, SwapWatcherMsg};
use super::trade_preimage::{TradePreimageRequest, TradePreimageRpcError, TradePreimageRpcResult};
use super::{broadcast_my_swap_status, broadcast_swap_message, broadcast_swap_message_every,
use super::{broadcast_my_swap_status, broadcast_swap_message, broadcast_swap_msg_every,
check_other_coin_balance_for_swap, dex_fee_amount_from_taker_coin, dex_fee_rate, dex_fee_threshold,
get_locked_amount, recv_swap_msg, swap_topic, wait_for_maker_payment_conf_until, AtomicSwap, LockedAmount,
MySwapInfo, NegotiationDataMsg, NegotiationDataV2, NegotiationDataV3, RecoveredSwap, RecoveredSwapAction,
SavedSwap, SavedSwapIo, SavedTradeFee, SwapConfirmationsSettings, SwapError, SwapMsg, SwapPubkeys,
SwapTxDataMsg, SwapsContext, TransactionIdentifier, WAIT_CONFIRM_INTERVAL};
use crate::mm2::lp_network::subscribe_to_topic;
use crate::mm2::lp_ordermatch::{MatchBy, OrderConfirmationsSettings, TakerAction, TakerOrderBuilder};
use crate::mm2::lp_swap::{broadcast_p2p_tx_msg, tx_helper_topic, wait_for_maker_payment_conf_duration,
TakerSwapWatcherData};
use crate::mm2::lp_swap::{broadcast_p2p_tx_msg, broadcast_swap_msg_every_delayed, tx_helper_topic,
wait_for_maker_payment_conf_duration, TakerSwapWatcherData};
use coins::lp_price::fetch_swap_coins_price;
use coins::{lp_coinfind, CanRefundHtlc, CheckIfMyPaymentSentArgs, ConfirmPaymentInput, FeeApproxStage,
FoundSwapTxSpend, MmCoinEnum, PaymentInstructionArgs, PaymentInstructions, PaymentInstructionsErr,
Expand Down Expand Up @@ -113,14 +113,18 @@ async fn save_my_taker_swap_event(ctx: &MmArc, swap: &TakerSwap, event: TakerSav
gui: ctx.gui().map(|g| g.to_owned()),
mm_version: Some(ctx.mm_version.to_owned()),
events: vec![],
success_events: match ctx.use_watchers() {
true => TAKER_USING_WATCHERS_SUCCESS_EVENTS
success_events: if ctx.use_watchers()
&& swap.taker_coin.is_supported_by_watchers()
&& swap.maker_coin.is_supported_by_watchers()
{
TAKER_USING_WATCHERS_SUCCESS_EVENTS
.iter()
.map(|event| event.to_string())
.collect(),
false => TAKER_SUCCESS_EVENTS.iter().map(|event| event.to_string()).collect(),
.map(<&str>::to_string)
.collect()
} else {
TAKER_SUCCESS_EVENTS.iter().map(<&str>::to_string).collect()
},
error_events: TAKER_ERROR_EVENTS.iter().map(|event| event.to_string()).collect(),
error_events: TAKER_ERROR_EVENTS.iter().map(<&str>::to_string).collect(),
}),
Err(e) => return ERR!("{}", e),
};
Expand Down Expand Up @@ -1182,7 +1186,7 @@ impl TakerSwap {

let taker_data = SwapMsg::NegotiationReply(my_negotiation_data);
debug!("Sending taker negotiation data {:?}", taker_data);
let send_abort_handle = broadcast_swap_message_every(
let send_abort_handle = broadcast_swap_msg_every(
self.ctx.clone(),
swap_topic(&self.uuid),
taker_data,
Expand Down Expand Up @@ -1276,7 +1280,7 @@ impl TakerSwap {
};

let msg = SwapMsg::TakerFee(payment_data_msg);
let abort_send_handle = broadcast_swap_message_every(
let abort_send_handle = broadcast_swap_msg_every(
self.ctx.clone(),
swap_topic(&self.uuid),
msg,
Expand Down Expand Up @@ -1627,7 +1631,7 @@ impl TakerSwap {
);
let swpmsg_watcher = SwapWatcherMsg::TakerSwapWatcherMsg(watcher_data);
let htlc_keypair = self.taker_coin.derive_htlc_key_pair(&self.unique_swap_data());
watcher_broadcast_abort_handle = Some(broadcast_swap_message_every(
watcher_broadcast_abort_handle = Some(broadcast_swap_msg_every_delayed(
self.ctx.clone(),
watcher_topic(&self.r().data.taker_coin),
swpmsg_watcher,
Expand All @@ -1639,7 +1643,7 @@ impl TakerSwap {

// Todo: taker_payment should be a message on lightning network not a swap message
let msg = SwapMsg::TakerPayment(tx_hex);
let send_abort_handle = broadcast_swap_message_every(
let send_abort_handle = broadcast_swap_msg_every(
self.ctx.clone(),
swap_topic(&self.uuid),
msg,
Expand Down
8 changes: 4 additions & 4 deletions mm2src/mm2_main/tests/docker_tests/swap_watcher_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ fn start_swaps_and_get_balances(
String::from("spice describe gravity federal thank unfair blast come canal monkey style afraid")
};

let alice_conf = Mm2TestConf::seednode_using_watchers(&alice_passphrase, &coins);
let alice_conf = Mm2TestConf::seednode(&alice_passphrase, &coins);
let mut mm_alice = block_on(MarketMakerIt::start_with_envs(
alice_conf.conf.clone(),
alice_conf.rpc_password.clone(),
Expand All @@ -112,7 +112,7 @@ fn start_swaps_and_get_balances(
String::from("also shoot benefit prefer juice shell elder veteran woman mimic image kidney")
};

let bob_conf = Mm2TestConf::light_node_using_watchers(&bob_passphrase, &coins, &[&mm_alice.ip.to_string()]);
let bob_conf = Mm2TestConf::light_node(&bob_passphrase, &coins, &[&mm_alice.ip.to_string()]);
let mut mm_bob = block_on(MarketMakerIt::start_with_envs(
bob_conf.conf.clone(),
bob_conf.rpc_password,
Expand Down Expand Up @@ -574,13 +574,13 @@ fn test_two_watchers_spend_maker_payment_eth_erc20() {

let alice_passphrase =
String::from("spice describe gravity federal blast come thank unfair canal monkey style afraid");
let alice_conf = Mm2TestConf::seednode_using_watchers(&alice_passphrase, &coins);
let alice_conf = Mm2TestConf::seednode(&alice_passphrase, &coins);
let mut mm_alice = MarketMakerIt::start(alice_conf.conf.clone(), alice_conf.rpc_password.clone(), None).unwrap();
let (_alice_dump_log, _alice_dump_dashboard) = mm_alice.mm_dump();
log!("Alice log path: {}", mm_alice.log_path.display());

let bob_passphrase = String::from("also shoot benefit prefer juice shell elder veteran woman mimic image kidney");
let bob_conf = Mm2TestConf::light_node_using_watchers(&bob_passphrase, &coins, &[&mm_alice.ip.to_string()]);
let bob_conf = Mm2TestConf::light_node(&bob_passphrase, &coins, &[&mm_alice.ip.to_string()]);
let mut mm_bob = MarketMakerIt::start(bob_conf.conf, bob_conf.rpc_password, None).unwrap();
let (_bob_dump_log, _bob_dump_dashboard) = mm_bob.mm_dump();
log!("Bob log path: {}", mm_bob.log_path.display());
Expand Down
30 changes: 0 additions & 30 deletions mm2src/mm2_test_helpers/src/for_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,21 +175,6 @@ impl Mm2TestConf {
}
}

pub fn seednode_using_watchers(passphrase: &str, coins: &Json) -> Self {
Mm2TestConf {
conf: json!({
"gui": "nogui",
"netid": 9998,
"passphrase": passphrase,
"coins": coins,
"rpc_password": DEFAULT_RPC_PASSWORD,
"i_am_seed": true,
"use_watchers": true,
}),
rpc_password: DEFAULT_RPC_PASSWORD.into(),
}
}

pub fn seednode_with_hd_account(passphrase: &str, hd_account_id: u32, coins: &Json) -> Self {
Mm2TestConf {
conf: json!({
Expand Down Expand Up @@ -219,21 +204,6 @@ impl Mm2TestConf {
}
}

pub fn light_node_using_watchers(passphrase: &str, coins: &Json, seednodes: &[&str]) -> Self {
Mm2TestConf {
conf: json!({
"gui": "nogui",
"netid": 9998,
"passphrase": passphrase,
"coins": coins,
"rpc_password": DEFAULT_RPC_PASSWORD,
"seednodes": seednodes,
"use_watchers": true
}),
rpc_password: DEFAULT_RPC_PASSWORD.into(),
}
}

pub fn watcher_light_node(passphrase: &str, coins: &Json, seednodes: &[&str], conf: WatcherConf) -> Self {
Mm2TestConf {
conf: json!({
Expand Down