diff --git a/changes/1.5.0.md b/changes/1.5.0.md index 13be8bb1723..e1a6df1c096 100644 --- a/changes/1.5.0.md +++ b/changes/1.5.0.md @@ -160,6 +160,8 @@ Seata 是一款开源的分布式事务解决方案,提供高性能和简单 - [[#4277](https://github.com/seata/seata/pull/4277)] 优化Redis-pipeline模式本地事务下的锁竞争机制 - [[#4284](https://github.com/seata/seata/pull/4284)] 支持MSE-Nacos 的 ak/sk 鉴权方式 - [[#4300](https://github.com/seata/seata/pull/4300)] 优化NettyRemotingServer的close()由DefaultCoordinator来调用,不再额外注册到ServerRunner + - [[#4270](https://github.com/seata/seata/pull/4270)] 提高全局提交和全局回滚的性能,分支事务清理异步化 + diff --git a/changes/en-us/1.5.0.md b/changes/en-us/1.5.0.md index a29dab8d1fc..59bd0299c0b 100644 --- a/changes/en-us/1.5.0.md +++ b/changes/en-us/1.5.0.md @@ -160,6 +160,9 @@ - [[#4277](https://github.com/seata/seata/pull/4277)] optimize acquire lock return fail-fast code in redis-pipeline mode. - [[#4284](https://github.com/seata/seata/pull/4284)] support authentication of MSE-Nacos with ak/sk - [[#4300](https://github.com/seata/seata/pull/4300)] optimize let DefaultCoordinator invoke NettyRemotingServer's close method,no longer closed by ServerRunner + - [[#4270](https://github.com/seata/seata/pull/4270)] improve the performance of global commit and global rollback, asynchronous branch transaction cleanup + + ### test: diff --git a/core/src/main/java/io/seata/core/constants/ConfigurationKeys.java b/core/src/main/java/io/seata/core/constants/ConfigurationKeys.java index 1aa8f888704..dd8e8c34d92 100644 --- a/core/src/main/java/io/seata/core/constants/ConfigurationKeys.java +++ b/core/src/main/java/io/seata/core/constants/ConfigurationKeys.java @@ -37,6 +37,16 @@ public interface ConfigurationKeys { */ String STORE_PREFIX = "store."; + /** + * The constant SESSION_PREFIX. + */ + String SESSION_PREFIX = "session."; + + /** + * The constant STORE_SESSION_PREFIX. + */ + String STORE_SESSION_PREFIX = STORE_PREFIX + SESSION_PREFIX; + /** * The constant MODE. */ @@ -65,12 +75,12 @@ public interface ConfigurationKeys { /** * The constant STORE_SESSION_MODE. */ - String STORE_SESSION_MODE = STORE_PREFIX + "session." + MODE; + String STORE_SESSION_MODE = STORE_SESSION_PREFIX + MODE; /** * The constant SERVER_STORE_SESSION_MODE. */ - String SERVER_STORE_SESSION_MODE = SEATA_PREFIX + STORE_PREFIX + "session." + MODE; + String SERVER_STORE_SESSION_MODE = SEATA_PREFIX + STORE_SESSION_PREFIX + MODE; /** * The constant STORE_PUBLIC_KEY. @@ -772,4 +782,14 @@ public interface ConfigurationKeys { * The constant RPC_TM_REQUEST_TIMEOUT */ String RPC_TC_REQUEST_TIMEOUT = TRANSPORT_PREFIX + "rpcTcRequestTimeout"; + + /** + * The constant SESSION_BRANCH_ASYNC_QUEUE_SIZE + */ + String SESSION_BRANCH_ASYNC_QUEUE_SIZE = SERVER_PREFIX + SESSION_PREFIX + "branchAsyncQueueSize"; + + /** + * The constant ENABLE_BRANCH_ASYNC_REMOVE + */ + String ENABLE_BRANCH_ASYNC_REMOVE = SERVER_PREFIX + SESSION_PREFIX + "enableBranchAsyncRemove"; } diff --git a/script/config-center/config.txt b/script/config-center/config.txt index 143484efc9b..145d93cf158 100644 --- a/script/config-center/config.txt +++ b/script/config-center/config.txt @@ -100,3 +100,5 @@ metrics.exporterList=prometheus metrics.exporterPrometheusPort=9898 tcc.fence.logTableName=tcc_fence_log tcc.fence.cleanPeriod=1h +server.session.branchAsyncQueueSize=5000 +server.session.enableBranchAsyncRemove=true diff --git a/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/io/seata/spring/boot/autoconfigure/StarterConstants.java b/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/io/seata/spring/boot/autoconfigure/StarterConstants.java index 4cf70db506c..6b6085916f6 100644 --- a/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/io/seata/spring/boot/autoconfigure/StarterConstants.java +++ b/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/io/seata/spring/boot/autoconfigure/StarterConstants.java @@ -81,6 +81,8 @@ public interface StarterConstants { String STORE_REDIS_SINGLE_PREFIX = STORE_REDIS_PREFIX + ".single"; String STORE_REDIS_SENTINEL_PREFIX = STORE_REDIS_PREFIX + ".sentinel"; + String SESSION_PREFIX = SERVER_PREFIX + ".session"; + String REGEX_SPLIT_CHAR = ";"; diff --git a/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/main/java/io/seata/spring/boot/autoconfigure/SeataServerEnvironmentPostProcessor.java b/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/main/java/io/seata/spring/boot/autoconfigure/SeataServerEnvironmentPostProcessor.java index c12e5586a94..e4857cb43dc 100644 --- a/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/main/java/io/seata/spring/boot/autoconfigure/SeataServerEnvironmentPostProcessor.java +++ b/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/main/java/io/seata/spring/boot/autoconfigure/SeataServerEnvironmentPostProcessor.java @@ -19,6 +19,7 @@ import io.seata.spring.boot.autoconfigure.properties.server.ServerProperties; import io.seata.spring.boot.autoconfigure.properties.server.ServerRecoveryProperties; import io.seata.spring.boot.autoconfigure.properties.server.ServerUndoProperties; +import io.seata.spring.boot.autoconfigure.properties.server.session.SessionProperties; import io.seata.spring.boot.autoconfigure.properties.server.store.StoreDBProperties; import io.seata.spring.boot.autoconfigure.properties.server.store.StoreFileProperties; import io.seata.spring.boot.autoconfigure.properties.server.store.StoreProperties; @@ -33,6 +34,7 @@ import static io.seata.spring.boot.autoconfigure.StarterConstants.SERVER_PREFIX; import static io.seata.spring.boot.autoconfigure.StarterConstants.SERVER_RECOVERY_PREFIX; import static io.seata.spring.boot.autoconfigure.StarterConstants.SERVER_UNDO_PREFIX; +import static io.seata.spring.boot.autoconfigure.StarterConstants.SESSION_PREFIX; import static io.seata.spring.boot.autoconfigure.StarterConstants.STORE_DB_PREFIX; import static io.seata.spring.boot.autoconfigure.StarterConstants.STORE_FILE_PREFIX; import static io.seata.spring.boot.autoconfigure.StarterConstants.STORE_LOCK_PREFIX; @@ -62,6 +64,7 @@ public void postProcessEnvironment(ConfigurableEnvironment environment, SpringAp PROPERTY_BEAN_MAP.put(STORE_REDIS_PREFIX, StoreRedisProperties.class); PROPERTY_BEAN_MAP.put(STORE_REDIS_SINGLE_PREFIX, StoreRedisProperties.Single.class); PROPERTY_BEAN_MAP.put(STORE_REDIS_SENTINEL_PREFIX, StoreRedisProperties.Sentinel.class); + PROPERTY_BEAN_MAP.put(SESSION_PREFIX, SessionProperties.class); } @Override diff --git a/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/main/java/io/seata/spring/boot/autoconfigure/properties/server/session/SessionProperties.java b/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/main/java/io/seata/spring/boot/autoconfigure/properties/server/session/SessionProperties.java new file mode 100644 index 00000000000..85fcf985168 --- /dev/null +++ b/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/main/java/io/seata/spring/boot/autoconfigure/properties/server/session/SessionProperties.java @@ -0,0 +1,60 @@ +/* + * Copyright 1999-2019 Seata.io Group. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.spring.boot.autoconfigure.properties.server.session; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +import static io.seata.spring.boot.autoconfigure.StarterConstants.SESSION_PREFIX; + +/** + * session properties + * + * @author yangwenpeng + * @since 2022-01-07 17:39 + */ +@Component +@ConfigurationProperties(prefix = SESSION_PREFIX) +public class SessionProperties { + + /** + * branch async remove queue size + */ + private Integer branchAsyncQueueSize; + + /** + * enable to asynchronous remove branchSession + */ + private Boolean enableBranchAsyncRemove = true; + + public Integer getBranchAsyncQueueSize() { + return branchAsyncQueueSize; + } + + public SessionProperties setBranchAsyncQueueSize(Integer branchAsyncQueueSize) { + this.branchAsyncQueueSize = branchAsyncQueueSize; + return this; + } + + public Boolean getEnableBranchAsync() { + return enableBranchAsyncRemove; + } + + public SessionProperties setEnableBranchAsync(Boolean enableBranchAsyncRemove) { + this.enableBranchAsyncRemove = enableBranchAsyncRemove; + return this; + } +} diff --git a/server/src/main/java/io/seata/server/coordinator/DefaultCoordinator.java b/server/src/main/java/io/seata/server/coordinator/DefaultCoordinator.java index a60f3f7cc8a..f265a90be80 100644 --- a/server/src/main/java/io/seata/server/coordinator/DefaultCoordinator.java +++ b/server/src/main/java/io/seata/server/coordinator/DefaultCoordinator.java @@ -18,7 +18,9 @@ import java.time.Duration; import java.util.Collection; import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import io.netty.channel.Channel; @@ -61,6 +63,7 @@ import io.seata.core.rpc.netty.NettyRemotingServer; import io.seata.server.AbstractTCInboundHandler; import io.seata.server.event.EventBusManager; +import io.seata.server.session.BranchSession; import io.seata.server.session.GlobalSession; import io.seata.server.session.SessionHelper; import io.seata.server.session.SessionHolder; @@ -119,6 +122,16 @@ public class DefaultCoordinator extends AbstractTCInboundHandler implements Tran private static final int ALWAYS_RETRY_BOUNDARY = 0; + /** + * default branch async queue size + */ + private static final int DEFAULT_BRANCH_ASYNC_QUEUE_SIZE = 5000; + + /** + * the pool size of branch asynchronous remove thread pool + */ + private static final int BRANCH_ASYNC_POOL_SIZE = Runtime.getRuntime().availableProcessors(); + private static final Duration MAX_COMMIT_RETRY_TIMEOUT = ConfigurationFactory.getInstance().getDuration( ConfigurationKeys.MAX_COMMIT_RETRY_TIMEOUT, DurationUtil.DEFAULT_DURATION, 100); @@ -143,6 +156,14 @@ public class DefaultCoordinator extends AbstractTCInboundHandler implements Tran private final ScheduledThreadPoolExecutor undoLogDelete = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("UndoLogDelete", 1)); + private final ThreadPoolExecutor branchRemoveExecutor = new ThreadPoolExecutor(BRANCH_ASYNC_POOL_SIZE, BRANCH_ASYNC_POOL_SIZE, + Integer.MAX_VALUE, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>( + CONFIG.getInt(ConfigurationKeys.SESSION_BRANCH_ASYNC_QUEUE_SIZE, DEFAULT_BRANCH_ASYNC_QUEUE_SIZE) + ), new NamedThreadFactory("branchSessionRemove", 2, true), + new ThreadPoolExecutor.CallerRunsPolicy()); + + private RemotingServer remotingServer; private final DefaultCore core; @@ -157,6 +178,9 @@ public class DefaultCoordinator extends AbstractTCInboundHandler implements Tran * @param remotingServer the remoting server */ private DefaultCoordinator(RemotingServer remotingServer) { + if (remotingServer == null) { + throw new IllegalArgumentException("RemotingServer not allowed be null."); + } this.remotingServer = remotingServer; this.core = new DefaultCore(remotingServer); } @@ -172,6 +196,38 @@ public static DefaultCoordinator getInstance(RemotingServer remotingServer) { return instance; } + public static DefaultCoordinator getInstance() { + if (null == instance) { + throw new IllegalArgumentException("The instance has not been created."); + } + return instance; + } + + /** + * Asynchronous remove branch + * + * @param globalSession the globalSession + * @param branchSession the branchSession + */ + public void doBranchRemoveAsync(GlobalSession globalSession, BranchSession branchSession) { + if (globalSession == null) { + return; + } + branchRemoveExecutor.execute(new BranchRemoveTask(globalSession, branchSession)); + } + + /** + * Asynchronous remove all branch + * + * @param globalSession the globalSession + */ + public void doBranchRemoveAllAsync(GlobalSession globalSession) { + if (globalSession == null) { + return; + } + branchRemoveExecutor.execute(new BranchRemoveTask(globalSession)); + } + @Override protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext) throws TransactionException { @@ -450,11 +506,13 @@ public void destroy() { retryCommitting.shutdown(); asyncCommitting.shutdown(); timeoutCheck.shutdown(); + branchRemoveExecutor.shutdown(); try { retryRollbacking.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS); retryCommitting.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS); asyncCommitting.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS); timeoutCheck.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS); + branchRemoveExecutor.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS); } catch (InterruptedException ignore) { } @@ -462,7 +520,7 @@ public void destroy() { if (remotingServer instanceof NettyRemotingServer) { ((NettyRemotingServer) remotingServer).destroy(); } - // 3. last destroy SessionHolder + // 3. third destroy SessionHolder SessionHolder.destroy(); } @@ -473,4 +531,71 @@ public void destroy() { public void setRemotingServer(RemotingServer remotingServer) { this.remotingServer = remotingServer; } + + /** + * the task to remove branchSession + */ + static class BranchRemoveTask implements Runnable { + + /** + * the globalSession + */ + private final GlobalSession globalSession; + + /** + * the branchSession + */ + private final BranchSession branchSession; + + /** + * If you use this construct, the task will remove the branchSession provided by the parameter + * @param globalSession the globalSession + */ + public BranchRemoveTask(GlobalSession globalSession, BranchSession branchSession) { + this.globalSession = globalSession; + this.branchSession = branchSession; + } + + /** + * If you use this construct, the task will remove all branchSession + * @param globalSession the globalSession + */ + public BranchRemoveTask(GlobalSession globalSession) { + this.globalSession = globalSession; + this.branchSession = null; + } + + @Override + public void run() { + if (globalSession == null) { + return; + } + try { + MDC.put(RootContext.MDC_KEY_XID, globalSession.getXid()); + if (branchSession != null) { + doRemove(branchSession); + } else { + globalSession.getSortedBranches().forEach(this::doRemove); + } + } catch (Exception unKnowException) { + LOGGER.error("Asynchronous delete branchSession error, xid = {}", globalSession.getXid(), unKnowException); + } finally { + MDC.remove(RootContext.MDC_KEY_XID); + } + } + + private void doRemove(BranchSession bt) { + try { + MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(bt.getBranchId())); + globalSession.removeBranch(bt); + LOGGER.info("Asynchronous delete branchSession successfully, xid = {}, branchId = {}", + globalSession.getXid(), bt.getBranchId()); + } catch (TransactionException transactionException) { + LOGGER.error("Asynchronous delete branchSession error, xid = {}, branchId = {}", + globalSession.getXid(), bt.getBranchId(), transactionException); + } finally { + MDC.remove(RootContext.MDC_KEY_BRANCH_ID); + } + } + } } diff --git a/server/src/main/java/io/seata/server/coordinator/DefaultCore.java b/server/src/main/java/io/seata/server/coordinator/DefaultCore.java index bbbd3fae216..0369a17ae3f 100644 --- a/server/src/main/java/io/seata/server/coordinator/DefaultCore.java +++ b/server/src/main/java/io/seata/server/coordinator/DefaultCore.java @@ -197,7 +197,7 @@ public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) thr BranchStatus currentStatus = branchSession.getStatus(); if (currentStatus == BranchStatus.PhaseOne_Failed) { - globalSession.removeBranch(branchSession); + SessionHelper.removeBranch(globalSession, branchSession, !retrying); return CONTINUE; } try { @@ -205,7 +205,7 @@ public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) thr switch (branchStatus) { case PhaseTwo_Committed: - globalSession.removeBranch(branchSession); + SessionHelper.removeBranch(globalSession, branchSession, !retrying); return CONTINUE; case PhaseTwo_CommitFailed_Unretryable: if (globalSession.canBeCommittedAsync()) { @@ -307,14 +307,14 @@ public boolean doGlobalRollback(GlobalSession globalSession, boolean retrying) t Boolean result = SessionHelper.forEach(globalSession.getReverseSortedBranches(), branchSession -> { BranchStatus currentBranchStatus = branchSession.getStatus(); if (currentBranchStatus == BranchStatus.PhaseOne_Failed) { - globalSession.removeBranch(branchSession); + SessionHelper.removeBranch(globalSession, branchSession, !retrying); return CONTINUE; } try { BranchStatus branchStatus = branchRollback(globalSession, branchSession); switch (branchStatus) { case PhaseTwo_Rollbacked: - globalSession.removeBranch(branchSession); + SessionHelper.removeBranch(globalSession, branchSession, !retrying); LOGGER.info("Rollback branch transaction successfully, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId()); return CONTINUE; case PhaseTwo_RollbackFailed_Unretryable: diff --git a/server/src/main/java/io/seata/server/session/SessionHelper.java b/server/src/main/java/io/seata/server/session/SessionHelper.java index cccd828b39e..2f1a80c40ec 100644 --- a/server/src/main/java/io/seata/server/session/SessionHelper.java +++ b/server/src/main/java/io/seata/server/session/SessionHelper.java @@ -16,12 +16,18 @@ package io.seata.server.session; import java.util.Collection; +import java.util.List; +import java.util.Objects; +import io.seata.config.Configuration; +import io.seata.config.ConfigurationFactory; +import io.seata.core.constants.ConfigurationKeys; import io.seata.core.context.RootContext; import io.seata.core.exception.TransactionException; import io.seata.core.model.BranchType; import io.seata.core.model.GlobalStatus; import io.seata.server.UUIDGenerator; +import io.seata.server.coordinator.DefaultCoordinator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; @@ -34,6 +40,20 @@ public class SessionHelper { private static final Logger LOGGER = LoggerFactory.getLogger(SessionHelper.class); + /** + * The constant CONFIG. + */ + private static final Configuration CONFIG = ConfigurationFactory.getInstance(); + + private static final Boolean ENABLE_BRANCH_ASYNC_REMOVE = CONFIG.getBoolean( + ConfigurationKeys.ENABLE_BRANCH_ASYNC_REMOVE, true); + + /** + * The instance of DefaultCoordinator + */ + private static final DefaultCoordinator COORDINATOR = DefaultCoordinator.getInstance(); + + private SessionHelper() {} public static BranchSession newBranchByGlobal(GlobalSession globalSession, BranchType branchType, String resourceId, String lockKeys, String clientId) { @@ -190,4 +210,40 @@ public static Boolean forEach(Collection sessions, BranchSessionH } return null; } + + + /** + * remove branchSession from globalSession + * @param globalSession the globalSession + * @param branchSession the branchSession + * @param isAsync if asynchronous remove + */ + public static void removeBranch(GlobalSession globalSession, BranchSession branchSession, boolean isAsync) + throws TransactionException { + if (Objects.equals(Boolean.TRUE, ENABLE_BRANCH_ASYNC_REMOVE) && isAsync) { + COORDINATOR.doBranchRemoveAsync(globalSession, branchSession); + } else { + globalSession.removeBranch(branchSession); + } + } + + /** + * remove branchSession from globalSession + * @param globalSession the globalSession + * @param isAsync if asynchronous remove + */ + public static void removeAllBranch(GlobalSession globalSession, boolean isAsync) + throws TransactionException { + List branchSessions = globalSession.getSortedBranches(); + if (branchSessions == null || branchSessions.isEmpty()) { + return; + } + if (Objects.equals(Boolean.TRUE, ENABLE_BRANCH_ASYNC_REMOVE) && isAsync) { + COORDINATOR.doBranchRemoveAllAsync(globalSession); + } else { + for (BranchSession branchSession : branchSessions) { + globalSession.removeBranch(branchSession); + } + } + } } diff --git a/server/src/main/java/io/seata/server/transaction/saga/SagaCore.java b/server/src/main/java/io/seata/server/transaction/saga/SagaCore.java index f24236bebb7..d665814429c 100644 --- a/server/src/main/java/io/seata/server/transaction/saga/SagaCore.java +++ b/server/src/main/java/io/seata/server/transaction/saga/SagaCore.java @@ -16,7 +16,6 @@ package io.seata.server.transaction.saga; import java.io.IOException; -import java.util.ArrayList; import java.util.Map; import java.util.concurrent.TimeoutException; @@ -106,12 +105,12 @@ public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) thr switch (branchStatus) { case PhaseTwo_Committed: - removeAllBranches(globalSession); + SessionHelper.removeAllBranch(globalSession, !retrying); LOGGER.info("Successfully committed SAGA global[" + globalSession.getXid() + "]"); break; case PhaseTwo_Rollbacked: LOGGER.info("Successfully rollbacked SAGA global[" + globalSession.getXid() + "]"); - removeAllBranches(globalSession); + SessionHelper.removeAllBranch(globalSession, !retrying); SessionHelper.endRollbacked(globalSession); return false; case PhaseTwo_RollbackFailed_Retryable: @@ -122,7 +121,7 @@ public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) thr return false; case PhaseOne_Failed: LOGGER.error("By [{}], finish SAGA global [{}]", branchStatus, globalSession.getXid()); - removeAllBranches(globalSession); + SessionHelper.removeAllBranch(globalSession, !retrying); globalSession.changeStatus(GlobalStatus.Finished); globalSession.end(); return false; @@ -164,7 +163,7 @@ public boolean doGlobalRollback(GlobalSession globalSession, boolean retrying) t switch (branchStatus) { case PhaseTwo_Rollbacked: - removeAllBranches(globalSession); + SessionHelper.removeAllBranch(globalSession, !retrying); LOGGER.info("Successfully rollbacked SAGA global[{}]",globalSession.getXid()); break; case PhaseTwo_RollbackFailed_Unretryable: @@ -196,12 +195,12 @@ public boolean doGlobalRollback(GlobalSession globalSession, boolean retrying) t @Override public void doGlobalReport(GlobalSession globalSession, String xid, GlobalStatus globalStatus) throws TransactionException { if (GlobalStatus.Committed.equals(globalStatus)) { - removeAllBranches(globalSession); + SessionHelper.removeAllBranch(globalSession, false); SessionHelper.endCommitted(globalSession); LOGGER.info("Global[{}] committed", globalSession.getXid()); } else if (GlobalStatus.Rollbacked.equals(globalStatus) || GlobalStatus.Finished.equals(globalStatus)) { - removeAllBranches(globalSession); + SessionHelper.removeAllBranch(globalSession, false); SessionHelper.endRollbacked(globalSession); LOGGER.info("Global[{}] rollbacked", globalSession.getXid()); } else { @@ -220,19 +219,6 @@ public void doGlobalReport(GlobalSession globalSession, String xid, GlobalStatus } } - /** - * remove all branches - * - * @param globalSession the globalSession - * @throws TransactionException the TransactionException - */ - private void removeAllBranches(GlobalSession globalSession) throws TransactionException { - ArrayList branchSessions = globalSession.getSortedBranches(); - for (BranchSession branchSession : branchSessions) { - globalSession.removeBranch(branchSession); - } - } - /** * get saga ResourceId * diff --git a/server/src/main/resources/application.example.yml b/server/src/main/resources/application.example.yml index fef5514ccf2..1257b2a1671 100644 --- a/server/src/main/resources/application.example.yml +++ b/server/src/main/resources/application.example.yml @@ -114,6 +114,9 @@ seata: undo: log-save-days: 7 log-delete-period: 86400000 + session: + branch-async-queue-size: 5000 #branch async remove queue size + enable-branch-async-remove: true #enable to asynchronous remove branchSession store: # support: file 、 db 、 redis mode: file