Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

more thread and shut down fixes #762

Merged
merged 4 commits into from
Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions common/src/main/java/haveno/common/ThreadUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,25 @@ public static void execute(Runnable command, String threadId) {
}
}

/**
* Awaits execution of the given command, but does not throw its exception.
*
* @param command the command to execute
* @param threadId the thread id
*/
public static void await(Runnable command, String threadId) {
if (isCurrentThread(Thread.currentThread(), threadId)) {
command.run();
} else {
CountDownLatch latch = new CountDownLatch(1);
execute(() -> {
command.run();
latch.countDown();
try {
command.run();
} catch (Exception e) {
throw e;
} finally {
latch.countDown();
}
}, threadId);
try {
latch.await();
Expand Down
251 changes: 126 additions & 125 deletions core/src/main/java/haveno/core/trade/Trade.java
Original file line number Diff line number Diff line change
Expand Up @@ -590,141 +590,140 @@ public void onAckMessage(AckMessage ackMessage, NodeAddress sender) {
///////////////////////////////////////////////////////////////////////////////////////////

public void initialize(ProcessModelServiceProvider serviceProvider) {
ThreadUtils.await(() -> {
if (isInitialized) throw new IllegalStateException(getClass().getSimpleName() + " " + getId() + " is already initialized");
if (isInitialized) throw new IllegalStateException(getClass().getSimpleName() + " " + getId() + " is already initialized");

// check if done
if (isPayoutUnlocked()) {
clearAndShutDown();
return;
}
// done if payout unlocked
if (isPayoutUnlocked()) {
clearAndShutDown();
return;
}

// set arbitrator pub key ring once known
serviceProvider.getArbitratorManager().getDisputeAgentByNodeAddress(getArbitratorNodeAddress()).ifPresent(arbitrator -> {
getArbitrator().setPubKeyRing(arbitrator.getPubKeyRing());
});
// set arbitrator pub key ring once known
serviceProvider.getArbitratorManager().getDisputeAgentByNodeAddress(getArbitratorNodeAddress()).ifPresent(arbitrator -> {
getArbitrator().setPubKeyRing(arbitrator.getPubKeyRing());
});

// handle connection change on dedicated thread
xmrConnectionService.addConnectionListener(connection -> {
ThreadUtils.submitToPool(() -> { // TODO: remove this?
ThreadUtils.execute(() -> onConnectionChanged(connection), getConnectionChangedThreadId());
});
// handle connection change on dedicated thread
xmrConnectionService.addConnectionListener(connection -> {
ThreadUtils.submitToPool(() -> { // TODO: remove this?
ThreadUtils.execute(() -> onConnectionChanged(connection), getConnectionChangedThreadId());
});
});

// reset buyer's payment sent state if no ack receive
if (this instanceof BuyerTrade && getState().ordinal() >= Trade.State.BUYER_CONFIRMED_PAYMENT_SENT.ordinal() && getState().ordinal() < Trade.State.BUYER_STORED_IN_MAILBOX_PAYMENT_SENT_MSG.ordinal()) {
log.warn("Resetting state of {} {} from {} to {} because no ack was received", getClass().getSimpleName(), getId(), getState(), Trade.State.DEPOSIT_TXS_UNLOCKED_IN_BLOCKCHAIN);
setState(Trade.State.DEPOSIT_TXS_UNLOCKED_IN_BLOCKCHAIN);
}

// reset seller's payment received state if no ack receive
if (this instanceof SellerTrade && getState().ordinal() >= Trade.State.SELLER_CONFIRMED_PAYMENT_RECEIPT.ordinal() && getState().ordinal() < Trade.State.SELLER_STORED_IN_MAILBOX_PAYMENT_RECEIVED_MSG.ordinal()) {
log.warn("Resetting state of {} {} from {} to {} because no ack was received", getClass().getSimpleName(), getId(), getState(), Trade.State.BUYER_SENT_PAYMENT_SENT_MSG);
setState(Trade.State.BUYER_SENT_PAYMENT_SENT_MSG);
}
// reset buyer's payment sent state if no ack receive
if (this instanceof BuyerTrade && getState().ordinal() >= Trade.State.BUYER_CONFIRMED_PAYMENT_SENT.ordinal() && getState().ordinal() < Trade.State.BUYER_STORED_IN_MAILBOX_PAYMENT_SENT_MSG.ordinal()) {
log.warn("Resetting state of {} {} from {} to {} because no ack was received", getClass().getSimpleName(), getId(), getState(), Trade.State.DEPOSIT_TXS_UNLOCKED_IN_BLOCKCHAIN);
setState(Trade.State.DEPOSIT_TXS_UNLOCKED_IN_BLOCKCHAIN);
}

// handle trade state events
tradeStateSubscription = EasyBind.subscribe(stateProperty, newValue -> {
if (isShutDownStarted) return;
ThreadUtils.execute(() -> {
if (newValue == Trade.State.MULTISIG_COMPLETED) {
updateWalletRefreshPeriod();
startPolling();
}
}, getId());
});
// reset seller's payment received state if no ack receive
if (this instanceof SellerTrade && getState().ordinal() >= Trade.State.SELLER_CONFIRMED_PAYMENT_RECEIPT.ordinal() && getState().ordinal() < Trade.State.SELLER_STORED_IN_MAILBOX_PAYMENT_RECEIVED_MSG.ordinal()) {
log.warn("Resetting state of {} {} from {} to {} because no ack was received", getClass().getSimpleName(), getId(), getState(), Trade.State.BUYER_SENT_PAYMENT_SENT_MSG);
setState(Trade.State.BUYER_SENT_PAYMENT_SENT_MSG);
}

// handle trade phase events
tradePhaseSubscription = EasyBind.subscribe(phaseProperty, newValue -> {
if (isShutDownStarted) return;
ThreadUtils.execute(() -> {
if (isDepositsPublished() && !isPayoutUnlocked()) updateWalletRefreshPeriod();
if (isPaymentReceived()) {
UserThread.execute(() -> {
if (tradePhaseSubscription != null) {
tradePhaseSubscription.unsubscribe();
tradePhaseSubscription = null;
}
});
}
}, getId());
});
// handle trade state events
tradeStateSubscription = EasyBind.subscribe(stateProperty, newValue -> {
if (!isInitialized || isShutDownStarted) return;
ThreadUtils.execute(() -> {
if (newValue == Trade.State.MULTISIG_COMPLETED) {
updateWalletRefreshPeriod();
startPolling();
}
}, getId());
});

// handle payout events
payoutStateSubscription = EasyBind.subscribe(payoutStateProperty, newValue -> {
if (isShutDownStarted) return;
ThreadUtils.execute(() -> {
if (isPayoutPublished()) updateWalletRefreshPeriod();

// handle when payout published
if (newValue == Trade.PayoutState.PAYOUT_PUBLISHED) {
log.info("Payout published for {} {}", getClass().getSimpleName(), getId());

// sync main wallet to update pending balance
new Thread(() -> {
GenUtils.waitFor(1000);
if (isShutDownStarted) return;
if (Boolean.TRUE.equals(xmrConnectionService.isConnected())) xmrWalletService.syncWallet(xmrWalletService.getWallet());
}).start();

// complete disputed trade
if (getDisputeState().isArbitrated() && !getDisputeState().isClosed()) {
processModel.getTradeManager().closeDisputedTrade(getId(), Trade.DisputeState.DISPUTE_CLOSED);
if (!isArbitrator()) for (Dispute dispute : getDisputes()) dispute.setIsClosed(); // auto close trader tickets
// handle trade phase events
tradePhaseSubscription = EasyBind.subscribe(phaseProperty, newValue -> {
if (!isInitialized || isShutDownStarted) return;
ThreadUtils.execute(() -> {
if (isDepositsPublished() && !isPayoutUnlocked()) updateWalletRefreshPeriod();
if (isPaymentReceived()) {
UserThread.execute(() -> {
if (tradePhaseSubscription != null) {
tradePhaseSubscription.unsubscribe();
tradePhaseSubscription = null;
}
});
}
}, getId());
});

// auto complete arbitrator trade
if (isArbitrator() && !isCompleted()) processModel.getTradeManager().onTradeCompleted(this);

// reset address entries
processModel.getXmrWalletService().resetAddressEntriesForTrade(getId());
// handle payout events
payoutStateSubscription = EasyBind.subscribe(payoutStateProperty, newValue -> {
if (!isInitialized || isShutDownStarted) return;
ThreadUtils.execute(() -> {
if (isPayoutPublished()) updateWalletRefreshPeriod();

// handle when payout published
if (newValue == Trade.PayoutState.PAYOUT_PUBLISHED) {
log.info("Payout published for {} {}", getClass().getSimpleName(), getId());

// sync main wallet to update pending balance
new Thread(() -> {
GenUtils.waitFor(1000);
if (isShutDownStarted) return;
if (Boolean.TRUE.equals(xmrConnectionService.isConnected())) xmrWalletService.syncWallet(xmrWalletService.getWallet());
}).start();

// complete disputed trade
if (getDisputeState().isArbitrated() && !getDisputeState().isClosed()) {
processModel.getTradeManager().closeDisputedTrade(getId(), Trade.DisputeState.DISPUTE_CLOSED);
if (!isArbitrator()) for (Dispute dispute : getDisputes()) dispute.setIsClosed(); // auto close trader tickets
}

// handle when payout unlocks
if (newValue == Trade.PayoutState.PAYOUT_UNLOCKED) {
if (!isInitialized) return;
log.info("Payout unlocked for {} {}, deleting multisig wallet", getClass().getSimpleName(), getId());
clearAndShutDown();
}
}, getId());
});
// auto complete arbitrator trade
if (isArbitrator() && !isCompleted()) processModel.getTradeManager().onTradeCompleted(this);

// arbitrator syncs idle wallet when payout unlock expected
if (this instanceof ArbitratorTrade) {
idlePayoutSyncer = new IdlePayoutSyncer();
xmrWalletService.addWalletListener(idlePayoutSyncer);
}
// reset address entries
processModel.getXmrWalletService().resetAddressEntriesForTrade(getId());
}

// TODO: buyer's payment sent message state property can become unsynced (after improper shut down?)
if (isBuyer()) {
MessageState expectedState = getPaymentSentMessageState();
if (expectedState != null && expectedState != processModel.getPaymentSentMessageStateProperty().get()) {
log.warn("Updating unexpected payment sent message state for {} {}, expected={}, actual={}", getClass().getSimpleName(), getId(), expectedState, processModel.getPaymentSentMessageStateProperty().get());
processModel.getPaymentSentMessageStateProperty().set(expectedState);
// handle when payout unlocks
if (newValue == Trade.PayoutState.PAYOUT_UNLOCKED) {
if (!isInitialized) return;
log.info("Payout unlocked for {} {}, deleting multisig wallet", getClass().getSimpleName(), getId());
clearAndShutDown();
}
}, getId());
});

// arbitrator syncs idle wallet when payout unlock expected
if (this instanceof ArbitratorTrade) {
idlePayoutSyncer = new IdlePayoutSyncer();
xmrWalletService.addWalletListener(idlePayoutSyncer);
}

// TODO: buyer's payment sent message state property can become unsynced (after improper shut down?)
if (isBuyer()) {
MessageState expectedState = getPaymentSentMessageState();
if (expectedState != null && expectedState != processModel.getPaymentSentMessageStateProperty().get()) {
log.warn("Updating unexpected payment sent message state for {} {}, expected={}, actual={}", getClass().getSimpleName(), getId(), expectedState, processModel.getPaymentSentMessageStateProperty().get());
processModel.getPaymentSentMessageStateProperty().set(expectedState);
}
}

// trade is initialized
isInitialized = true;
// trade is initialized
isInitialized = true;

// done if payout unlocked or deposit not requested
if (!isDepositRequested() || isPayoutUnlocked()) return;
// done if deposit not requested or payout unlocked
if (!isDepositRequested() || isPayoutUnlocked()) return;

// done if wallet does not exist
if (!walletExists()) {
MoneroTx payoutTx = getPayoutTx();
if (payoutTx != null && payoutTx.getNumConfirmations() >= 10) {
log.warn("Payout state for {} {} is {} but payout is unlocked, updating state", getClass().getSimpleName(), getId(), getPayoutState());
setPayoutStateUnlocked();
return;
} else {
throw new IllegalStateException("Missing trade wallet for " + getClass().getSimpleName() + " " + getId());
}
// open wallet or done if wallet does not exist
if (walletExists()) getWallet();
else {
MoneroTx payoutTx = getPayoutTx();
if (payoutTx != null && payoutTx.getNumConfirmations() >= 10) {
log.warn("Payout state for {} {} is {} but payout is unlocked, updating state", getClass().getSimpleName(), getId(), getPayoutState());
setPayoutStateUnlocked();
return;
} else {
throw new IllegalStateException("Missing trade wallet for " + getClass().getSimpleName() + " " + getId());
}
}

// initialize syncing and polling
initSyncing();
}, getId());
// initialize syncing and polling
tryInitSyncing();
}

public void requestPersistence() {
Expand Down Expand Up @@ -1275,6 +1274,17 @@ public void shutDown() {
shutDownThreads.add(() -> ThreadUtils.shutDown(getConnectionChangedThreadId()));
ThreadUtils.awaitTasks(shutDownThreads);
}

// save wallet
if (wallet != null) {
try {
xmrWalletService.saveWallet(wallet, false); // skip backup
stopWallet();
} catch (Exception e) {
// warning will be logged for main wallet, so skip logging here
//log.warn("Error closing monero-wallet-rpc subprocess for {} {}: {}. Was Haveno stopped manually with ctrl+c?", getClass().getSimpleName(), getId(), e.getMessage());
}
}
};

// shut down trade with timeout
Expand All @@ -1299,15 +1309,6 @@ public void shutDown() {
xmrWalletService.removeWalletListener(idlePayoutSyncer);
idlePayoutSyncer = null;
}
if (wallet != null) {
try {
xmrWalletService.saveWallet(wallet, false); // skip backup
stopWallet();
} catch (Exception e) {
// warning will be logged for main wallet, so skip logging here
//log.warn("Error closing monero-wallet-rpc subprocess for {} {}: {}. Was Haveno stopped manually with ctrl+c?", getClass().getSimpleName(), getId(), e.getMessage());
}
}
UserThread.execute(() -> {
if (tradeStateSubscription != null) tradeStateSubscription.unsubscribe();
if (tradePhaseSubscription != null) tradePhaseSubscription.unsubscribe();
Expand Down Expand Up @@ -1938,12 +1939,12 @@ private void onConnectionChanged(MoneroRpcConnection connection) {

// sync and reprocess messages on new thread
if (isInitialized && connection != null && !Boolean.FALSE.equals(connection.isConnected())) {
ThreadUtils.execute(() -> initSyncing(), getId());
ThreadUtils.execute(() -> tryInitSyncing(), getId());
}
}
}

private void initSyncing() {
private void tryInitSyncing() {
if (isShutDownStarted) return;
if (!isIdling()) {
initSyncingAux();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package haveno.core.trade.protocol;

import haveno.common.ThreadUtils;
import haveno.common.handlers.ErrorMessageHandler;
import haveno.core.trade.ArbitratorTrade;
import haveno.core.trade.Trade;
Expand Down Expand Up @@ -43,7 +44,7 @@ public void onMailboxMessage(TradeMessage message, NodeAddress peer) {

public void handleInitTradeRequest(InitTradeRequest message, NodeAddress peer, ErrorMessageHandler errorMessageHandler) {
System.out.println("ArbitratorProtocol.handleInitTradeRequest()");
new Thread(() -> {
ThreadUtils.execute(() -> {
synchronized (trade) {
latchTrade();
this.errorMessageHandler = errorMessageHandler;
Expand All @@ -68,7 +69,7 @@ public void handleInitTradeRequest(InitTradeRequest message, NodeAddress peer, E
.executeTasks(true);
awaitTradeLatch();
}
}).start();
}, trade.getId());
}

@Override
Expand All @@ -78,7 +79,7 @@ public void handleSignContractResponse(SignContractResponse message, NodeAddress

public void handleDepositRequest(DepositRequest request, NodeAddress sender) {
System.out.println("ArbitratorProtocol.handleDepositRequest() " + trade.getId());
new Thread(() -> {
ThreadUtils.execute(() -> {
synchronized (trade) {
latchTrade();
Validator.checkTradeId(processModel.getOfferId(), request);
Expand All @@ -103,7 +104,7 @@ public void handleDepositRequest(DepositRequest request, NodeAddress sender) {
.executeTasks(true);
awaitTradeLatch();
}
}).start();
}, trade.getId());
}

@Override
Expand Down
Loading