Skip to content

Commit

Permalink
optimize: async remove branch when commite/rollback
Browse files Browse the repository at this point in the history
  • Loading branch information
pengten committed Jan 19, 2022
1 parent 07ae2b0 commit 0703565
Showing 1 changed file with 35 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,9 @@ public static DefaultCoordinator getInstance() {
* @param branchSession the branchSession
*/
public void doBranchRemoveAsync(GlobalSession globalSession, BranchSession branchSession) {
if (globalSession == null) {
return;
}
branchRemoveExecutor.execute(new BranchRemoveTask(globalSession, branchSession));
}

Expand All @@ -219,6 +222,9 @@ public void doBranchRemoveAsync(GlobalSession globalSession, BranchSession branc
* @param globalSession the globalSession
*/
public void doBranchRemoveAllAsync(GlobalSession globalSession) {
if (globalSession == null) {
return;
}
branchRemoveExecutor.execute(new BranchRemoveTask(globalSession));
}

Expand Down Expand Up @@ -541,11 +547,19 @@ static class BranchRemoveTask implements Runnable {
*/
private final BranchSession branchSession;

/**
* If you use this construct, the task will remove the branchSession provided by the parameter
* @param globalSession the globalSession
*/
public BranchRemoveTask(GlobalSession globalSession, BranchSession branchSession) {
this.globalSession = globalSession;
this.branchSession = branchSession;
}

/**
* If you use this construct, the task will remove all branchSession
* @param globalSession the globalSession
*/
public BranchRemoveTask(GlobalSession globalSession) {
this.globalSession = globalSession;
this.branchSession = null;
Expand All @@ -557,25 +571,30 @@ public void run() {
return;
}
try {
MDC.put(RootContext.MDC_KEY_XID, globalSession.getXid());
if (branchSession != null) {
LOGGER.info("Asynchronous delete bt successfully, xid = {}, branchId = {}",
globalSession.getXid(), branchSession.getBranchId());
globalSession.removeBranch(branchSession);
doRemove(branchSession);
} else {
SessionHelper.forEach(globalSession.getSortedBranches(), bt -> {
try {
LOGGER.info("Asynchronous delete bt successfully, xid = {}, branchId = {}",
globalSession.getXid(), bt.getBranchId());
globalSession.removeBranch(bt);
} catch (TransactionException transactionException) {
LOGGER.warn("Asynchronous delete bt error, xid = {}, branchId = {}, msg = {}",
globalSession.getXid(), bt.getBranchId(), transactionException.getMessage());
}
return null;
});
globalSession.getSortedBranches().forEach(this::doRemove);
}
} catch (TransactionException e) {
LOGGER.warn("Asynchronous delete branchSession error, xid = {}, msg = {}", globalSession.getXid(), e.getMessage());
} catch (Exception unKnowException) {
LOGGER.error("Asynchronous delete branchSession error, xid = {}", globalSession.getXid(), unKnowException);
} finally {
MDC.remove(RootContext.MDC_KEY_XID);
}
}

private void doRemove(BranchSession bt) {
try {
MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(bt.getBranchId()));
globalSession.removeBranch(bt);
LOGGER.info("Asynchronous delete branchSession successfully, xid = {}, branchId = {}",
globalSession.getXid(), bt.getBranchId());
} catch (TransactionException transactionException) {
LOGGER.error("Asynchronous delete branchSession error, xid = {}, branchId = {}",
globalSession.getXid(), bt.getBranchId(), transactionException);
} finally {
MDC.remove(RootContext.MDC_KEY_BRANCH_ID);
}
}
}
Expand Down

0 comments on commit 0703565

Please sign in to comment.