diff --git a/common/src/main/java/haveno/common/ThreadUtils.java b/common/src/main/java/haveno/common/ThreadUtils.java index c2b9aad932f..d366b869b09 100644 --- a/common/src/main/java/haveno/common/ThreadUtils.java +++ b/common/src/main/java/haveno/common/ThreadUtils.java @@ -48,14 +48,25 @@ public static void execute(Runnable command, String threadId) { } } + /** + * Awaits execution of the given command, but does not throw its exception. + * + * @param command the command to execute + * @param threadId the thread id + */ public static void await(Runnable command, String threadId) { if (isCurrentThread(Thread.currentThread(), threadId)) { command.run(); } else { CountDownLatch latch = new CountDownLatch(1); execute(() -> { - command.run(); - latch.countDown(); + try { + command.run(); + } catch (Exception e) { + throw e; + } finally { + latch.countDown(); + } }, threadId); try { latch.await(); diff --git a/core/src/main/java/haveno/core/trade/Trade.java b/core/src/main/java/haveno/core/trade/Trade.java index a5aceeca42d..93dc0dbd474 100644 --- a/core/src/main/java/haveno/core/trade/Trade.java +++ b/core/src/main/java/haveno/core/trade/Trade.java @@ -590,141 +590,140 @@ public void onAckMessage(AckMessage ackMessage, NodeAddress sender) { /////////////////////////////////////////////////////////////////////////////////////////// public void initialize(ProcessModelServiceProvider serviceProvider) { - ThreadUtils.await(() -> { - if (isInitialized) throw new IllegalStateException(getClass().getSimpleName() + " " + getId() + " is already initialized"); + if (isInitialized) throw new IllegalStateException(getClass().getSimpleName() + " " + getId() + " is already initialized"); - // check if done - if (isPayoutUnlocked()) { - clearAndShutDown(); - return; - } + // done if payout unlocked + if (isPayoutUnlocked()) { + clearAndShutDown(); + return; + } - // set arbitrator pub key ring once known - serviceProvider.getArbitratorManager().getDisputeAgentByNodeAddress(getArbitratorNodeAddress()).ifPresent(arbitrator -> { - getArbitrator().setPubKeyRing(arbitrator.getPubKeyRing()); - }); + // set arbitrator pub key ring once known + serviceProvider.getArbitratorManager().getDisputeAgentByNodeAddress(getArbitratorNodeAddress()).ifPresent(arbitrator -> { + getArbitrator().setPubKeyRing(arbitrator.getPubKeyRing()); + }); - // handle connection change on dedicated thread - xmrConnectionService.addConnectionListener(connection -> { - ThreadUtils.submitToPool(() -> { // TODO: remove this? - ThreadUtils.execute(() -> onConnectionChanged(connection), getConnectionChangedThreadId()); - }); + // handle connection change on dedicated thread + xmrConnectionService.addConnectionListener(connection -> { + ThreadUtils.submitToPool(() -> { // TODO: remove this? + ThreadUtils.execute(() -> onConnectionChanged(connection), getConnectionChangedThreadId()); }); + }); - // reset buyer's payment sent state if no ack receive - if (this instanceof BuyerTrade && getState().ordinal() >= Trade.State.BUYER_CONFIRMED_PAYMENT_SENT.ordinal() && getState().ordinal() < Trade.State.BUYER_STORED_IN_MAILBOX_PAYMENT_SENT_MSG.ordinal()) { - log.warn("Resetting state of {} {} from {} to {} because no ack was received", getClass().getSimpleName(), getId(), getState(), Trade.State.DEPOSIT_TXS_UNLOCKED_IN_BLOCKCHAIN); - setState(Trade.State.DEPOSIT_TXS_UNLOCKED_IN_BLOCKCHAIN); - } - - // reset seller's payment received state if no ack receive - if (this instanceof SellerTrade && getState().ordinal() >= Trade.State.SELLER_CONFIRMED_PAYMENT_RECEIPT.ordinal() && getState().ordinal() < Trade.State.SELLER_STORED_IN_MAILBOX_PAYMENT_RECEIVED_MSG.ordinal()) { - log.warn("Resetting state of {} {} from {} to {} because no ack was received", getClass().getSimpleName(), getId(), getState(), Trade.State.BUYER_SENT_PAYMENT_SENT_MSG); - setState(Trade.State.BUYER_SENT_PAYMENT_SENT_MSG); - } + // reset buyer's payment sent state if no ack receive + if (this instanceof BuyerTrade && getState().ordinal() >= Trade.State.BUYER_CONFIRMED_PAYMENT_SENT.ordinal() && getState().ordinal() < Trade.State.BUYER_STORED_IN_MAILBOX_PAYMENT_SENT_MSG.ordinal()) { + log.warn("Resetting state of {} {} from {} to {} because no ack was received", getClass().getSimpleName(), getId(), getState(), Trade.State.DEPOSIT_TXS_UNLOCKED_IN_BLOCKCHAIN); + setState(Trade.State.DEPOSIT_TXS_UNLOCKED_IN_BLOCKCHAIN); + } - // handle trade state events - tradeStateSubscription = EasyBind.subscribe(stateProperty, newValue -> { - if (isShutDownStarted) return; - ThreadUtils.execute(() -> { - if (newValue == Trade.State.MULTISIG_COMPLETED) { - updateWalletRefreshPeriod(); - startPolling(); - } - }, getId()); - }); + // reset seller's payment received state if no ack receive + if (this instanceof SellerTrade && getState().ordinal() >= Trade.State.SELLER_CONFIRMED_PAYMENT_RECEIPT.ordinal() && getState().ordinal() < Trade.State.SELLER_STORED_IN_MAILBOX_PAYMENT_RECEIVED_MSG.ordinal()) { + log.warn("Resetting state of {} {} from {} to {} because no ack was received", getClass().getSimpleName(), getId(), getState(), Trade.State.BUYER_SENT_PAYMENT_SENT_MSG); + setState(Trade.State.BUYER_SENT_PAYMENT_SENT_MSG); + } - // handle trade phase events - tradePhaseSubscription = EasyBind.subscribe(phaseProperty, newValue -> { - if (isShutDownStarted) return; - ThreadUtils.execute(() -> { - if (isDepositsPublished() && !isPayoutUnlocked()) updateWalletRefreshPeriod(); - if (isPaymentReceived()) { - UserThread.execute(() -> { - if (tradePhaseSubscription != null) { - tradePhaseSubscription.unsubscribe(); - tradePhaseSubscription = null; - } - }); - } - }, getId()); - }); + // handle trade state events + tradeStateSubscription = EasyBind.subscribe(stateProperty, newValue -> { + if (!isInitialized || isShutDownStarted) return; + ThreadUtils.execute(() -> { + if (newValue == Trade.State.MULTISIG_COMPLETED) { + updateWalletRefreshPeriod(); + startPolling(); + } + }, getId()); + }); - // handle payout events - payoutStateSubscription = EasyBind.subscribe(payoutStateProperty, newValue -> { - if (isShutDownStarted) return; - ThreadUtils.execute(() -> { - if (isPayoutPublished()) updateWalletRefreshPeriod(); - - // handle when payout published - if (newValue == Trade.PayoutState.PAYOUT_PUBLISHED) { - log.info("Payout published for {} {}", getClass().getSimpleName(), getId()); - - // sync main wallet to update pending balance - new Thread(() -> { - GenUtils.waitFor(1000); - if (isShutDownStarted) return; - if (Boolean.TRUE.equals(xmrConnectionService.isConnected())) xmrWalletService.syncWallet(xmrWalletService.getWallet()); - }).start(); - - // complete disputed trade - if (getDisputeState().isArbitrated() && !getDisputeState().isClosed()) { - processModel.getTradeManager().closeDisputedTrade(getId(), Trade.DisputeState.DISPUTE_CLOSED); - if (!isArbitrator()) for (Dispute dispute : getDisputes()) dispute.setIsClosed(); // auto close trader tickets + // handle trade phase events + tradePhaseSubscription = EasyBind.subscribe(phaseProperty, newValue -> { + if (!isInitialized || isShutDownStarted) return; + ThreadUtils.execute(() -> { + if (isDepositsPublished() && !isPayoutUnlocked()) updateWalletRefreshPeriod(); + if (isPaymentReceived()) { + UserThread.execute(() -> { + if (tradePhaseSubscription != null) { + tradePhaseSubscription.unsubscribe(); + tradePhaseSubscription = null; } + }); + } + }, getId()); + }); - // auto complete arbitrator trade - if (isArbitrator() && !isCompleted()) processModel.getTradeManager().onTradeCompleted(this); - - // reset address entries - processModel.getXmrWalletService().resetAddressEntriesForTrade(getId()); + // handle payout events + payoutStateSubscription = EasyBind.subscribe(payoutStateProperty, newValue -> { + if (!isInitialized || isShutDownStarted) return; + ThreadUtils.execute(() -> { + if (isPayoutPublished()) updateWalletRefreshPeriod(); + + // handle when payout published + if (newValue == Trade.PayoutState.PAYOUT_PUBLISHED) { + log.info("Payout published for {} {}", getClass().getSimpleName(), getId()); + + // sync main wallet to update pending balance + new Thread(() -> { + GenUtils.waitFor(1000); + if (isShutDownStarted) return; + if (Boolean.TRUE.equals(xmrConnectionService.isConnected())) xmrWalletService.syncWallet(xmrWalletService.getWallet()); + }).start(); + + // complete disputed trade + if (getDisputeState().isArbitrated() && !getDisputeState().isClosed()) { + processModel.getTradeManager().closeDisputedTrade(getId(), Trade.DisputeState.DISPUTE_CLOSED); + if (!isArbitrator()) for (Dispute dispute : getDisputes()) dispute.setIsClosed(); // auto close trader tickets } - // handle when payout unlocks - if (newValue == Trade.PayoutState.PAYOUT_UNLOCKED) { - if (!isInitialized) return; - log.info("Payout unlocked for {} {}, deleting multisig wallet", getClass().getSimpleName(), getId()); - clearAndShutDown(); - } - }, getId()); - }); + // auto complete arbitrator trade + if (isArbitrator() && !isCompleted()) processModel.getTradeManager().onTradeCompleted(this); - // arbitrator syncs idle wallet when payout unlock expected - if (this instanceof ArbitratorTrade) { - idlePayoutSyncer = new IdlePayoutSyncer(); - xmrWalletService.addWalletListener(idlePayoutSyncer); - } + // reset address entries + processModel.getXmrWalletService().resetAddressEntriesForTrade(getId()); + } - // TODO: buyer's payment sent message state property can become unsynced (after improper shut down?) - if (isBuyer()) { - MessageState expectedState = getPaymentSentMessageState(); - if (expectedState != null && expectedState != processModel.getPaymentSentMessageStateProperty().get()) { - log.warn("Updating unexpected payment sent message state for {} {}, expected={}, actual={}", getClass().getSimpleName(), getId(), expectedState, processModel.getPaymentSentMessageStateProperty().get()); - processModel.getPaymentSentMessageStateProperty().set(expectedState); + // handle when payout unlocks + if (newValue == Trade.PayoutState.PAYOUT_UNLOCKED) { + if (!isInitialized) return; + log.info("Payout unlocked for {} {}, deleting multisig wallet", getClass().getSimpleName(), getId()); + clearAndShutDown(); } + }, getId()); + }); + + // arbitrator syncs idle wallet when payout unlock expected + if (this instanceof ArbitratorTrade) { + idlePayoutSyncer = new IdlePayoutSyncer(); + xmrWalletService.addWalletListener(idlePayoutSyncer); + } + + // TODO: buyer's payment sent message state property can become unsynced (after improper shut down?) + if (isBuyer()) { + MessageState expectedState = getPaymentSentMessageState(); + if (expectedState != null && expectedState != processModel.getPaymentSentMessageStateProperty().get()) { + log.warn("Updating unexpected payment sent message state for {} {}, expected={}, actual={}", getClass().getSimpleName(), getId(), expectedState, processModel.getPaymentSentMessageStateProperty().get()); + processModel.getPaymentSentMessageStateProperty().set(expectedState); } + } - // trade is initialized - isInitialized = true; + // trade is initialized + isInitialized = true; - // done if payout unlocked or deposit not requested - if (!isDepositRequested() || isPayoutUnlocked()) return; + // done if deposit not requested or payout unlocked + if (!isDepositRequested() || isPayoutUnlocked()) return; - // done if wallet does not exist - if (!walletExists()) { - MoneroTx payoutTx = getPayoutTx(); - if (payoutTx != null && payoutTx.getNumConfirmations() >= 10) { - log.warn("Payout state for {} {} is {} but payout is unlocked, updating state", getClass().getSimpleName(), getId(), getPayoutState()); - setPayoutStateUnlocked(); - return; - } else { - throw new IllegalStateException("Missing trade wallet for " + getClass().getSimpleName() + " " + getId()); - } + // open wallet or done if wallet does not exist + if (walletExists()) getWallet(); + else { + MoneroTx payoutTx = getPayoutTx(); + if (payoutTx != null && payoutTx.getNumConfirmations() >= 10) { + log.warn("Payout state for {} {} is {} but payout is unlocked, updating state", getClass().getSimpleName(), getId(), getPayoutState()); + setPayoutStateUnlocked(); + return; + } else { + throw new IllegalStateException("Missing trade wallet for " + getClass().getSimpleName() + " " + getId()); } + } - // initialize syncing and polling - initSyncing(); - }, getId()); + // initialize syncing and polling + tryInitSyncing(); } public void requestPersistence() { @@ -1275,6 +1274,17 @@ public void shutDown() { shutDownThreads.add(() -> ThreadUtils.shutDown(getConnectionChangedThreadId())); ThreadUtils.awaitTasks(shutDownThreads); } + + // save wallet + if (wallet != null) { + try { + xmrWalletService.saveWallet(wallet, false); // skip backup + stopWallet(); + } catch (Exception e) { + // warning will be logged for main wallet, so skip logging here + //log.warn("Error closing monero-wallet-rpc subprocess for {} {}: {}. Was Haveno stopped manually with ctrl+c?", getClass().getSimpleName(), getId(), e.getMessage()); + } + } }; // shut down trade with timeout @@ -1299,15 +1309,6 @@ public void shutDown() { xmrWalletService.removeWalletListener(idlePayoutSyncer); idlePayoutSyncer = null; } - if (wallet != null) { - try { - xmrWalletService.saveWallet(wallet, false); // skip backup - stopWallet(); - } catch (Exception e) { - // warning will be logged for main wallet, so skip logging here - //log.warn("Error closing monero-wallet-rpc subprocess for {} {}: {}. Was Haveno stopped manually with ctrl+c?", getClass().getSimpleName(), getId(), e.getMessage()); - } - } UserThread.execute(() -> { if (tradeStateSubscription != null) tradeStateSubscription.unsubscribe(); if (tradePhaseSubscription != null) tradePhaseSubscription.unsubscribe(); @@ -1938,12 +1939,12 @@ private void onConnectionChanged(MoneroRpcConnection connection) { // sync and reprocess messages on new thread if (isInitialized && connection != null && !Boolean.FALSE.equals(connection.isConnected())) { - ThreadUtils.execute(() -> initSyncing(), getId()); + ThreadUtils.execute(() -> tryInitSyncing(), getId()); } } } - private void initSyncing() { + private void tryInitSyncing() { if (isShutDownStarted) return; if (!isIdling()) { initSyncingAux(); diff --git a/core/src/main/java/haveno/core/trade/protocol/ArbitratorProtocol.java b/core/src/main/java/haveno/core/trade/protocol/ArbitratorProtocol.java index 9958cc2b77e..6a2d18d74a1 100644 --- a/core/src/main/java/haveno/core/trade/protocol/ArbitratorProtocol.java +++ b/core/src/main/java/haveno/core/trade/protocol/ArbitratorProtocol.java @@ -1,5 +1,6 @@ package haveno.core.trade.protocol; +import haveno.common.ThreadUtils; import haveno.common.handlers.ErrorMessageHandler; import haveno.core.trade.ArbitratorTrade; import haveno.core.trade.Trade; @@ -43,7 +44,7 @@ public void onMailboxMessage(TradeMessage message, NodeAddress peer) { public void handleInitTradeRequest(InitTradeRequest message, NodeAddress peer, ErrorMessageHandler errorMessageHandler) { System.out.println("ArbitratorProtocol.handleInitTradeRequest()"); - new Thread(() -> { + ThreadUtils.execute(() -> { synchronized (trade) { latchTrade(); this.errorMessageHandler = errorMessageHandler; @@ -68,7 +69,7 @@ public void handleInitTradeRequest(InitTradeRequest message, NodeAddress peer, E .executeTasks(true); awaitTradeLatch(); } - }).start(); + }, trade.getId()); } @Override @@ -78,7 +79,7 @@ public void handleSignContractResponse(SignContractResponse message, NodeAddress public void handleDepositRequest(DepositRequest request, NodeAddress sender) { System.out.println("ArbitratorProtocol.handleDepositRequest() " + trade.getId()); - new Thread(() -> { + ThreadUtils.execute(() -> { synchronized (trade) { latchTrade(); Validator.checkTradeId(processModel.getOfferId(), request); @@ -103,7 +104,7 @@ public void handleDepositRequest(DepositRequest request, NodeAddress sender) { .executeTasks(true); awaitTradeLatch(); } - }).start(); + }, trade.getId()); } @Override