Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize: Improve the performance of global commit and global rollback #4270

Merged
merged 6 commits into from
Jan 19, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions changes/1.5.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ Seata 是一款开源的分布式事务解决方案,提供高性能和简单
- [[#4277](https://github.com/seata/seata/pull/4277)] 优化Redis-pipeline模式本地事务下的锁竞争机制
- [[#4284](https://github.com/seata/seata/pull/4284)] 支持MSE-Nacos 的 ak/sk 鉴权方式
- [[#4300](https://github.com/seata/seata/pull/4300)] 优化NettyRemotingServer的close()由DefaultCoordinator来调用,不再额外注册到ServerRunner
- [[#4270](https://github.com/seata/seata/pull/4270)] 提高全局提交和全局回滚的性能,分支事务清理异步化




Expand Down
3 changes: 3 additions & 0 deletions changes/en-us/1.5.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@
- [[#4277](https://github.com/seata/seata/pull/4277)] optimize acquire lock return fail-fast code in redis-pipeline mode.
- [[#4284](https://github.com/seata/seata/pull/4284)] support authentication of MSE-Nacos with ak/sk
- [[#4300](https://github.com/seata/seata/pull/4300)] optimize let DefaultCoordinator invoke NettyRemotingServer's close method,no longer closed by ServerRunner
- [[#4270](https://github.com/seata/seata/pull/4270)] improve the performance of global commit and global rollback, asynchronous branch transaction cleanup




### test:
Expand Down
24 changes: 22 additions & 2 deletions core/src/main/java/io/seata/core/constants/ConfigurationKeys.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,16 @@ public interface ConfigurationKeys {
*/
String STORE_PREFIX = "store.";

/**
* The constant SESSION_PREFIX.
*/
String SESSION_PREFIX = "session.";
pengten marked this conversation as resolved.
Show resolved Hide resolved

/**
* The constant STORE_SESSION_PREFIX.
*/
String STORE_SESSION_PREFIX = STORE_PREFIX + SESSION_PREFIX;

/**
* The constant MODE.
*/
Expand Down Expand Up @@ -65,12 +75,12 @@ public interface ConfigurationKeys {
/**
* The constant STORE_SESSION_MODE.
*/
String STORE_SESSION_MODE = STORE_PREFIX + "session." + MODE;
String STORE_SESSION_MODE = STORE_SESSION_PREFIX + MODE;

/**
* The constant SERVER_STORE_SESSION_MODE.
*/
String SERVER_STORE_SESSION_MODE = SEATA_PREFIX + STORE_PREFIX + "session." + MODE;
String SERVER_STORE_SESSION_MODE = SEATA_PREFIX + STORE_SESSION_PREFIX + MODE;

/**
* The constant STORE_PUBLIC_KEY.
Expand Down Expand Up @@ -772,4 +782,14 @@ public interface ConfigurationKeys {
* The constant RPC_TM_REQUEST_TIMEOUT
*/
String RPC_TC_REQUEST_TIMEOUT = TRANSPORT_PREFIX + "rpcTcRequestTimeout";

/**
* The constant SESSION_BRANCH_ASYNC_QUEUE_SIZE
*/
String SESSION_BRANCH_ASYNC_QUEUE_SIZE = SERVER_PREFIX + SESSION_PREFIX + "branchAsyncQueueSize";

/**
* The constant ENABLE_BRANCH_ASYNC_REMOVE
*/
String ENABLE_BRANCH_ASYNC_REMOVE = SERVER_PREFIX + SESSION_PREFIX + "enableBranchAsyncRemove";
}
2 changes: 2 additions & 0 deletions script/config-center/config.txt
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,5 @@ metrics.exporterList=prometheus
metrics.exporterPrometheusPort=9898
tcc.fence.logTableName=tcc_fence_log
tcc.fence.cleanPeriod=1h
server.session.branchAsyncQueueSize=5000
server.session.enableBranchAsyncRemove=true
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ public interface StarterConstants {
String STORE_REDIS_SINGLE_PREFIX = STORE_REDIS_PREFIX + ".single";
String STORE_REDIS_SENTINEL_PREFIX = STORE_REDIS_PREFIX + ".sentinel";

String SESSION_PREFIX = SERVER_PREFIX + ".session";

String REGEX_SPLIT_CHAR = ";";


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.seata.spring.boot.autoconfigure.properties.server.ServerProperties;
import io.seata.spring.boot.autoconfigure.properties.server.ServerRecoveryProperties;
import io.seata.spring.boot.autoconfigure.properties.server.ServerUndoProperties;
import io.seata.spring.boot.autoconfigure.properties.server.session.SessionProperties;
import io.seata.spring.boot.autoconfigure.properties.server.store.StoreDBProperties;
import io.seata.spring.boot.autoconfigure.properties.server.store.StoreFileProperties;
import io.seata.spring.boot.autoconfigure.properties.server.store.StoreProperties;
Expand All @@ -33,6 +34,7 @@
import static io.seata.spring.boot.autoconfigure.StarterConstants.SERVER_PREFIX;
import static io.seata.spring.boot.autoconfigure.StarterConstants.SERVER_RECOVERY_PREFIX;
import static io.seata.spring.boot.autoconfigure.StarterConstants.SERVER_UNDO_PREFIX;
import static io.seata.spring.boot.autoconfigure.StarterConstants.SESSION_PREFIX;
import static io.seata.spring.boot.autoconfigure.StarterConstants.STORE_DB_PREFIX;
import static io.seata.spring.boot.autoconfigure.StarterConstants.STORE_FILE_PREFIX;
import static io.seata.spring.boot.autoconfigure.StarterConstants.STORE_LOCK_PREFIX;
Expand Down Expand Up @@ -62,6 +64,7 @@ public void postProcessEnvironment(ConfigurableEnvironment environment, SpringAp
PROPERTY_BEAN_MAP.put(STORE_REDIS_PREFIX, StoreRedisProperties.class);
PROPERTY_BEAN_MAP.put(STORE_REDIS_SINGLE_PREFIX, StoreRedisProperties.Single.class);
PROPERTY_BEAN_MAP.put(STORE_REDIS_SENTINEL_PREFIX, StoreRedisProperties.Sentinel.class);
PROPERTY_BEAN_MAP.put(SESSION_PREFIX, SessionProperties.class);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.spring.boot.autoconfigure.properties.server.session;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

import static io.seata.spring.boot.autoconfigure.StarterConstants.SESSION_PREFIX;

/**
* session properties
*
* @author yangwenpeng
* @since 2022-01-07 17:39
*/
@Component
@ConfigurationProperties(prefix = SESSION_PREFIX)
public class SessionProperties {

/**
* branch async remove queue size
*/
private Integer branchAsyncQueueSize;
pengten marked this conversation as resolved.
Show resolved Hide resolved
pengten marked this conversation as resolved.
Show resolved Hide resolved

/**
* enable to asynchronous remove branchSession
*/
private Boolean enableBranchAsyncRemove = true;

public Integer getBranchAsyncQueueSize() {
return branchAsyncQueueSize;
}

public SessionProperties setBranchAsyncQueueSize(Integer branchAsyncQueueSize) {
this.branchAsyncQueueSize = branchAsyncQueueSize;
return this;
}

public Boolean getEnableBranchAsync() {
return enableBranchAsyncRemove;
}

public SessionProperties setEnableBranchAsync(Boolean enableBranchAsyncRemove) {
this.enableBranchAsyncRemove = enableBranchAsyncRemove;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import io.netty.channel.Channel;
Expand Down Expand Up @@ -61,6 +63,7 @@
import io.seata.core.rpc.netty.NettyRemotingServer;
import io.seata.server.AbstractTCInboundHandler;
import io.seata.server.event.EventBusManager;
import io.seata.server.session.BranchSession;
import io.seata.server.session.GlobalSession;
import io.seata.server.session.SessionHelper;
import io.seata.server.session.SessionHolder;
Expand Down Expand Up @@ -119,6 +122,16 @@ public class DefaultCoordinator extends AbstractTCInboundHandler implements Tran

private static final int ALWAYS_RETRY_BOUNDARY = 0;

/**
* default branch async queue size
*/
private static final int DEFAULT_BRANCH_ASYNC_QUEUE_SIZE = 5000;

/**
* the pool size of branch asynchronous remove thread pool
*/
private static final int BRANCH_ASYNC_POOL_SIZE = Runtime.getRuntime().availableProcessors();

private static final Duration MAX_COMMIT_RETRY_TIMEOUT = ConfigurationFactory.getInstance().getDuration(
ConfigurationKeys.MAX_COMMIT_RETRY_TIMEOUT, DurationUtil.DEFAULT_DURATION, 100);

Expand All @@ -143,6 +156,14 @@ public class DefaultCoordinator extends AbstractTCInboundHandler implements Tran
private final ScheduledThreadPoolExecutor undoLogDelete = new ScheduledThreadPoolExecutor(1,
new NamedThreadFactory("UndoLogDelete", 1));

private final ThreadPoolExecutor branchRemoveExecutor = new ThreadPoolExecutor(BRANCH_ASYNC_POOL_SIZE, BRANCH_ASYNC_POOL_SIZE,
Integer.MAX_VALUE, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(
CONFIG.getInt(ConfigurationKeys.SESSION_BRANCH_ASYNC_QUEUE_SIZE, DEFAULT_BRANCH_ASYNC_QUEUE_SIZE)
), new NamedThreadFactory("branchSessionRemove", 2, true),
new ThreadPoolExecutor.CallerRunsPolicy());


private RemotingServer remotingServer;

private final DefaultCore core;
Expand All @@ -157,6 +178,9 @@ public class DefaultCoordinator extends AbstractTCInboundHandler implements Tran
* @param remotingServer the remoting server
*/
private DefaultCoordinator(RemotingServer remotingServer) {
if (remotingServer == null) {
throw new IllegalArgumentException("RemotingServer not allowed be null.");
}
this.remotingServer = remotingServer;
this.core = new DefaultCore(remotingServer);
}
Expand All @@ -172,6 +196,32 @@ public static DefaultCoordinator getInstance(RemotingServer remotingServer) {
return instance;
}

public static DefaultCoordinator getInstance() {
if (null == instance) {
throw new IllegalArgumentException("The instance has not been created.");
}
return instance;
}

/**
* Asynchronous remove branch
*
* @param globalSession the globalSession
* @param branchSession the branchSession
*/
public void doBranchRemoveAsync(GlobalSession globalSession, BranchSession branchSession) {
branchRemoveExecutor.execute(new BranchRemoveTask(globalSession, branchSession));
wangliang181230 marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Asynchronous remove all branch
*
* @param globalSession the globalSession
*/
public void doBranchRemoveAllAsync(GlobalSession globalSession) {
branchRemoveExecutor.execute(new BranchRemoveTask(globalSession));
}

@Override
protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
throws TransactionException {
Expand Down Expand Up @@ -450,19 +500,21 @@ public void destroy() {
retryCommitting.shutdown();
asyncCommitting.shutdown();
timeoutCheck.shutdown();
branchRemoveExecutor.shutdown();
try {
retryRollbacking.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);
retryCommitting.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);
asyncCommitting.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);
timeoutCheck.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);
branchRemoveExecutor.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);
} catch (InterruptedException ignore) {

}
// 2. second close netty flow
if (remotingServer instanceof NettyRemotingServer) {
((NettyRemotingServer) remotingServer).destroy();
}
// 3. last destroy SessionHolder
// 3. third destroy SessionHolder
SessionHolder.destroy();
}

Expand All @@ -473,4 +525,58 @@ public void destroy() {
public void setRemotingServer(RemotingServer remotingServer) {
this.remotingServer = remotingServer;
}

/**
* the task to remove branchSession
*/
static class BranchRemoveTask implements Runnable {

/**
* the globalSession
*/
private final GlobalSession globalSession;

/**
* the branchSession
*/
private final BranchSession branchSession;

public BranchRemoveTask(GlobalSession globalSession, BranchSession branchSession) {
this.globalSession = globalSession;
this.branchSession = branchSession;
}

public BranchRemoveTask(GlobalSession globalSession) {
this.globalSession = globalSession;
this.branchSession = null;
}

@Override
public void run() {
if (globalSession == null) {
return;
}
try {
if (branchSession != null) {
wangliang181230 marked this conversation as resolved.
Show resolved Hide resolved
LOGGER.info("Asynchronous delete bt successfully, xid = {}, branchId = {}",
globalSession.getXid(), branchSession.getBranchId());
wangliang181230 marked this conversation as resolved.
Show resolved Hide resolved
globalSession.removeBranch(branchSession);
} else {
SessionHelper.forEach(globalSession.getSortedBranches(), bt -> {
try {
LOGGER.info("Asynchronous delete bt successfully, xid = {}, branchId = {}",
globalSession.getXid(), bt.getBranchId());
wangliang181230 marked this conversation as resolved.
Show resolved Hide resolved
globalSession.removeBranch(bt);
} catch (TransactionException transactionException) {
LOGGER.warn("Asynchronous delete bt error, xid = {}, branchId = {}, msg = {}",
globalSession.getXid(), bt.getBranchId(), transactionException.getMessage());
wangliang181230 marked this conversation as resolved.
Show resolved Hide resolved
}
return null;
});
}
} catch (TransactionException e) {
LOGGER.warn("Asynchronous delete branchSession error, xid = {}, msg = {}", globalSession.getXid(), e.getMessage());
wangliang181230 marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -197,15 +197,15 @@ public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) thr

BranchStatus currentStatus = branchSession.getStatus();
if (currentStatus == BranchStatus.PhaseOne_Failed) {
globalSession.removeBranch(branchSession);
SessionHelper.removeBranch(globalSession, branchSession, !retrying);
return CONTINUE;
}
try {
BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);

switch (branchStatus) {
case PhaseTwo_Committed:
globalSession.removeBranch(branchSession);
SessionHelper.removeBranch(globalSession, branchSession, !retrying);
return CONTINUE;
case PhaseTwo_CommitFailed_Unretryable:
if (globalSession.canBeCommittedAsync()) {
Expand Down Expand Up @@ -307,14 +307,14 @@ public boolean doGlobalRollback(GlobalSession globalSession, boolean retrying) t
Boolean result = SessionHelper.forEach(globalSession.getReverseSortedBranches(), branchSession -> {
BranchStatus currentBranchStatus = branchSession.getStatus();
if (currentBranchStatus == BranchStatus.PhaseOne_Failed) {
globalSession.removeBranch(branchSession);
SessionHelper.removeBranch(globalSession, branchSession, !retrying);
return CONTINUE;
}
try {
BranchStatus branchStatus = branchRollback(globalSession, branchSession);
switch (branchStatus) {
case PhaseTwo_Rollbacked:
globalSession.removeBranch(branchSession);
SessionHelper.removeBranch(globalSession, branchSession, !retrying);
LOGGER.info("Rollback branch transaction successfully, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
return CONTINUE;
case PhaseTwo_RollbackFailed_Unretryable:
Expand Down
Loading