From 02e1514012315b87d0693201898a76d6ab66d6b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BA=91=E6=B8=85?= <33415199+lightClouds917@users.noreply.github.com> Date: Fri, 17 Jan 2025 16:06:46 +0800 Subject: [PATCH] feature:[loom] replace the usages of synchronized with ReentrantLock (#7073) --- changes/en-us/2.x.md | 2 + changes/zh-cn/2.x.md | 2 + .../seata/common/lock/ResourceLock.java | 60 ++++++ .../seata/common/util/UUIDGenerator.java | 5 +- .../seata/common/lock/ResourceLockTest.java | 147 +++++++++++++++ .../eureka/EurekaRegistryServiceImpl.java | 7 +- .../tx/api/fence/hook/TccHookManager.java | 5 +- .../parser/DefaultRemotingParser.java | 5 +- .../integration/tx/api/util/ProxyUtil.java | 4 +- .../seata/rm/datasource/util/JdbcUtils.java | 4 +- .../rm/datasource/xa/ConnectionProxyXA.java | 178 ++++++++++-------- .../rm/datasource/xa/ResourceManagerXA.java | 6 +- .../rocketmq/SeataMQProducerFactory.java | 5 +- .../seata/server/session/GlobalSession.java | 6 +- 14 files changed, 345 insertions(+), 91 deletions(-) create mode 100644 common/src/main/java/org/apache/seata/common/lock/ResourceLock.java create mode 100644 common/src/test/java/org/apache/seata/common/lock/ResourceLockTest.java diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index 5940b569f7d..419f3f63371 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -4,6 +4,7 @@ Add changes here for all PR submitted to the 2.x branch. ### feature: +- [[#7073](https://github.com/apache/incubator-seata/pull/7073)] support virtual thread,replace the usages of synchronized with ReentrantLock - [[#7037](https://github.com/apache/incubator-seata/pull/7037)] support fury undolog parser - [[#7069](https://github.com/apache/incubator-seata/pull/7069)] Raft cluster mode supports address translation - [[#7038](https://github.com/apache/incubator-seata/pull/7038)] support fury serializer @@ -43,6 +44,7 @@ Thanks to these contributors for their code commits. Please report an unintended - [slievrly](https://github.com/slievrly) - [lyl2008dsg](https://github.com/lyl2008dsg) - [remind](https://github.com/remind) +- [lightClouds917](https://github.com/lightClouds917) - [GoodBoyCoder](https://github.com/GoodBoyCoder) - [PeppaO](https://github.com/PeppaO) - [funky-eyes](https://github.com/funky-eyes) diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index 13e4e531831..2f7d72c5ef5 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -4,6 +4,7 @@ ### feature: +- [[#7073](https://github.com/apache/incubator-seata/pull/7073)] 支持虚拟线程,用ReentrantLock替换synchronized的用法 - [[#7037](https://github.com/apache/incubator-seata/pull/7037)] 支持UndoLog的fury序列化方式 - [[#7069](https://github.com/apache/incubator-seata/pull/7069)] Raft集群模式支持地址转换 - [[#7038](https://github.com/apache/incubator-seata/pull/7038)] 支持Fury序列化器 @@ -42,6 +43,7 @@ - [slievrly](https://github.com/slievrly) - [lyl2008dsg](https://github.com/lyl2008dsg) - [remind](https://github.com/remind) +- [lightClouds917](https://github.com/lightClouds917) - [GoodBoyCoder](https://github.com/GoodBoyCoder) - [PeppaO](https://github.com/PeppaO) - [funky-eyes](https://github.com/funky-eyes) diff --git a/common/src/main/java/org/apache/seata/common/lock/ResourceLock.java b/common/src/main/java/org/apache/seata/common/lock/ResourceLock.java new file mode 100644 index 00000000000..22e815e5784 --- /dev/null +++ b/common/src/main/java/org/apache/seata/common/lock/ResourceLock.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.common.lock; + +import java.util.concurrent.locks.ReentrantLock; + +/** + * The ResourceLock extends ReentrantLock and implements AutoCloseable, + * allowing it to be used in try-with-resources blocks without needing + * to unlock in a finally block. + * + *

Example

+ *
+ * {@code
+ *   private final ResourceLock resourceLock = new ResourceLock();
+ *   try (ResourceLock lock = resourceLock.obtain()) {
+ *     // do something while holding the resource lock
+ *   }
+ * }
+ * 
+ */ +public class ResourceLock extends ReentrantLock implements AutoCloseable { + + /** + * Obtain the lock. + * + * @return this ResourceLock + */ + public ResourceLock obtain() { + lock(); + return this; + } + + + /** + * Unlock the resource lock. + * + *

This is typically used in try-with-resources blocks to automatically + * unlock the resource lock when the block is exited, regardless of whether + * an exception is thrown or not. + */ + @Override + public void close() { + this.unlock(); + } +} diff --git a/common/src/main/java/org/apache/seata/common/util/UUIDGenerator.java b/common/src/main/java/org/apache/seata/common/util/UUIDGenerator.java index 542de3ed1eb..7b8ea727599 100644 --- a/common/src/main/java/org/apache/seata/common/util/UUIDGenerator.java +++ b/common/src/main/java/org/apache/seata/common/util/UUIDGenerator.java @@ -16,12 +16,15 @@ */ package org.apache.seata.common.util; +import org.apache.seata.common.lock.ResourceLock; + /** * The type Uuid generator. */ public class UUIDGenerator { private static volatile IdWorker idWorker; + private final static ResourceLock RESOURCE_LOCK = new ResourceLock(); /** * generate UUID using snowflake algorithm @@ -30,7 +33,7 @@ public class UUIDGenerator { */ public static long generateUUID() { if (idWorker == null) { - synchronized (UUIDGenerator.class) { + try (ResourceLock ignored = RESOURCE_LOCK.obtain()) { if (idWorker == null) { init(null); } diff --git a/common/src/test/java/org/apache/seata/common/lock/ResourceLockTest.java b/common/src/test/java/org/apache/seata/common/lock/ResourceLockTest.java new file mode 100644 index 00000000000..0ed2e7e2b7d --- /dev/null +++ b/common/src/test/java/org/apache/seata/common/lock/ResourceLockTest.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.common.lock; + +import org.apache.seata.common.util.CollectionUtils; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.concurrent.ConcurrentHashMap; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@ExtendWith(MockitoExtension.class) +public class ResourceLockTest { + + @Test + public void testObtainAndClose() { + ResourceLock resourceLock = new ResourceLock(); + + // Test obtaining the lock + try (ResourceLock lock = resourceLock.obtain()) { + assertTrue(resourceLock.isHeldByCurrentThread(), "Lock should be held by current thread"); + } + + // After try-with-resources, lock should be released + assertFalse(resourceLock.isHeldByCurrentThread(), "Lock should be released after try-with-resources"); + } + + @Test + public void testMultipleObtainAndClose() { + ResourceLock resourceLock = new ResourceLock(); + + // First obtain and release + try (ResourceLock lock = resourceLock.obtain()) { + assertTrue(resourceLock.isHeldByCurrentThread(), "Lock should be held by current thread"); + } + assertFalse(resourceLock.isHeldByCurrentThread(), "Lock should be released after first try-with-resources"); + + // Second obtain and release + try (ResourceLock lock = resourceLock.obtain()) { + assertTrue(resourceLock.isHeldByCurrentThread(), "Lock should be held by current thread"); + } + assertFalse(resourceLock.isHeldByCurrentThread(), "Lock should be released after second try-with-resources"); + } + + @Test + public void testResourceLockAutoRemovalFromMap() { + ConcurrentHashMap lockMap = new ConcurrentHashMap<>(); + String key = "testKey"; + // Use try-with-resources to obtain and release the lock + try (ResourceLock ignored = CollectionUtils.computeIfAbsent(lockMap, key, k -> new ResourceLock()).obtain()) { + // Do something while holding the lock + assertTrue(lockMap.containsKey(key)); + assertTrue(lockMap.get(key).isHeldByCurrentThread()); + } finally { + assertFalse(lockMap.get(key).isHeldByCurrentThread()); + assertTrue(lockMap.containsKey(key)); + // Remove the lock from the map + lockMap.remove(key); + assertFalse(lockMap.containsKey(key)); + } + // Ensure the lock is removed from the map + assertFalse(lockMap.containsKey(key)); + } + + @Test + public void testConcurrentLocking() throws InterruptedException { + ResourceLock resourceLock = new ResourceLock(); + + Thread t1 = new Thread(() -> { + try (ResourceLock lock = resourceLock.obtain()) { + try { + Thread.sleep(100); // Hold the lock for 100ms + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + }); + + Thread t2 = new Thread(() -> { + assertFalse(resourceLock.isHeldByCurrentThread(), "Lock should not be held by current thread before t1 releases it"); + try (ResourceLock lock = resourceLock.obtain()) { + assertTrue(resourceLock.isHeldByCurrentThread(), "Lock should be held by current thread after t1 releases it"); + } + }); + + t1.start(); + t2.start(); + + t1.join(); + t2.join(); + + assertFalse(resourceLock.isHeldByCurrentThread(), "Lock should be released after both threads complete"); + } + + @Test + public void testLockInterruptibly() throws InterruptedException { + ResourceLock resourceLock = new ResourceLock(); + + Thread t1 = new Thread(() -> { + try (ResourceLock lock = resourceLock.obtain()) { + try { + Thread.sleep(1000); // Hold the lock for 1000ms + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + }); + + t1.start(); + Thread.sleep(50); // Wait for t1 to acquire the lock + + Thread t2 = new Thread(() -> { + try { + resourceLock.lockInterruptibly(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + + t2.start(); + Thread.sleep(50); // Wait for t2 to attempt to acquire the lock + + t2.interrupt(); // Interrupt t2 + + t1.join(); + t2.join(); + + assertFalse(resourceLock.isHeldByCurrentThread(), "Lock should be released after t1 completes"); + } +} diff --git a/discovery/seata-discovery-eureka/src/main/java/org/apache/seata/discovery/registry/eureka/EurekaRegistryServiceImpl.java b/discovery/seata-discovery-eureka/src/main/java/org/apache/seata/discovery/registry/eureka/EurekaRegistryServiceImpl.java index 5ab5191234d..ef441c34bb1 100644 --- a/discovery/seata-discovery-eureka/src/main/java/org/apache/seata/discovery/registry/eureka/EurekaRegistryServiceImpl.java +++ b/discovery/seata-discovery-eureka/src/main/java/org/apache/seata/discovery/registry/eureka/EurekaRegistryServiceImpl.java @@ -26,6 +26,7 @@ import com.netflix.discovery.EurekaEventListener; import com.netflix.discovery.shared.Application; import org.apache.seata.common.exception.EurekaRegistryException; +import org.apache.seata.common.lock.ResourceLock; import org.apache.seata.common.util.CollectionUtils; import org.apache.seata.common.util.NetUtil; import org.apache.seata.common.util.StringUtils; @@ -68,7 +69,7 @@ public class EurekaRegistryServiceImpl implements RegistryService> LISTENER_SERVICE_MAP = new ConcurrentHashMap<>(); private static final ConcurrentMap> CLUSTER_ADDRESS_MAP = new ConcurrentHashMap<>(); - private static final ConcurrentMap CLUSTER_LOCK = new ConcurrentHashMap<>(); + private static final ConcurrentMap CLUSTER_LOCK = new ConcurrentHashMap<>(); private static volatile ApplicationInfoManager applicationInfoManager; private static volatile CustomEurekaInstanceConfig instanceConfig; @@ -140,8 +141,8 @@ public List lookup(String key) throws Exception { } String clusterUpperName = clusterName.toUpperCase(); if (!LISTENER_SERVICE_MAP.containsKey(clusterUpperName)) { - Object lock = CLUSTER_LOCK.computeIfAbsent(clusterUpperName, k -> new Object()); - synchronized (lock) { + ResourceLock lock = CLUSTER_LOCK.computeIfAbsent(clusterUpperName, k -> new ResourceLock()); + try (ResourceLock ignored = lock.obtain()) { if (!LISTENER_SERVICE_MAP.containsKey(clusterUpperName)) { refreshCluster(clusterUpperName); subscribe(clusterUpperName, event -> { diff --git a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/fence/hook/TccHookManager.java b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/fence/hook/TccHookManager.java index e6d537c73f2..4a1048c40c2 100644 --- a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/fence/hook/TccHookManager.java +++ b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/fence/hook/TccHookManager.java @@ -20,11 +20,14 @@ import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; +import org.apache.seata.common.lock.ResourceLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public final class TccHookManager { private static final Logger LOGGER = LoggerFactory.getLogger(TccHookManager.class); + private static final ResourceLock LOCK = new ResourceLock(); + private TccHookManager() { @@ -40,7 +43,7 @@ private TccHookManager() { */ public static List getHooks() { if (CACHED_UNMODIFIABLE_HOOKS == null) { - synchronized (TccHookManager.class) { + try (ResourceLock ignored = LOCK.obtain()) { if (CACHED_UNMODIFIABLE_HOOKS == null) { CACHED_UNMODIFIABLE_HOOKS = Collections.unmodifiableList(TCC_HOOKS); } diff --git a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/remoting/parser/DefaultRemotingParser.java b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/remoting/parser/DefaultRemotingParser.java index 0ed9625e616..9a1f8d307d0 100644 --- a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/remoting/parser/DefaultRemotingParser.java +++ b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/remoting/parser/DefaultRemotingParser.java @@ -23,6 +23,7 @@ import org.apache.seata.common.exception.FrameworkException; import org.apache.seata.common.loader.EnhancedServiceLoader; +import org.apache.seata.common.lock.ResourceLock; import org.apache.seata.common.util.CollectionUtils; import org.apache.seata.integration.tx.api.remoting.RemotingDesc; import org.apache.seata.integration.tx.api.remoting.RemotingParser; @@ -43,6 +44,8 @@ public class DefaultRemotingParser { */ protected static Map remotingServiceMap = new ConcurrentHashMap<>(); + private final ResourceLock resourceLock = new ResourceLock(); + private static class SingletonHolder { private static final DefaultRemotingParser INSTANCE = new DefaultRemotingParser(); } @@ -79,7 +82,7 @@ protected void initRemotingParser() { * @param remotingParser */ public boolean registerRemotingParser(RemotingParser remotingParser) { - synchronized (this) { + try (ResourceLock ignored = resourceLock.obtain()) { return allRemotingParsers.add(remotingParser); } } diff --git a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/util/ProxyUtil.java b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/util/ProxyUtil.java index 964840c993a..c6c5ffdef46 100644 --- a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/util/ProxyUtil.java +++ b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/util/ProxyUtil.java @@ -16,6 +16,7 @@ */ package org.apache.seata.integration.tx.api.util; +import org.apache.seata.common.lock.ResourceLock; import org.apache.seata.integration.tx.api.interceptor.handler.DefaultInvocationHandler; import org.apache.seata.integration.tx.api.interceptor.handler.ProxyInvocationHandler; import org.apache.seata.integration.tx.api.interceptor.parser.DefaultInterfaceParser; @@ -31,6 +32,7 @@ public class ProxyUtil { private static final Map PROXYED_SET = new HashMap<>(); + private static final ResourceLock RESOURCE_LOCK = new ResourceLock(); public static T createProxy(T target) { return createProxy(target, target.getClass().getName()); @@ -53,7 +55,7 @@ public static T createProxy(T target) { */ public static T createProxy(T target, String beanName) { try { - synchronized (PROXYED_SET) { + try (ResourceLock ignored = RESOURCE_LOCK.obtain()) { if (PROXYED_SET.containsKey(target)) { return (T) PROXYED_SET.get(target); } diff --git a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/util/JdbcUtils.java b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/util/JdbcUtils.java index 2e5c8b1f336..1ee4c1ef63c 100644 --- a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/util/JdbcUtils.java +++ b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/util/JdbcUtils.java @@ -17,6 +17,7 @@ package org.apache.seata.rm.datasource.util; import org.apache.seata.common.loader.EnhancedServiceLoader; +import org.apache.seata.common.lock.ResourceLock; import org.apache.seata.rm.BaseDataSourceResource; import org.apache.seata.rm.DefaultResourceManager; import org.apache.seata.sqlparser.SqlParserType; @@ -33,10 +34,11 @@ public final class JdbcUtils { private static volatile DbTypeParser dbTypeParser; + private final static ResourceLock RESOURCE_LOCK = new ResourceLock(); static DbTypeParser getDbTypeParser() { if (dbTypeParser == null) { - synchronized (JdbcUtils.class) { + try (ResourceLock ignored = RESOURCE_LOCK.obtain()) { if (dbTypeParser == null) { dbTypeParser = EnhancedServiceLoader.load(DbTypeParser.class, SqlParserType.SQL_PARSER_TYPE_DRUID); } diff --git a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ConnectionProxyXA.java b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ConnectionProxyXA.java index 6d35dfd6301..709263d88cc 100644 --- a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ConnectionProxyXA.java +++ b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ConnectionProxyXA.java @@ -24,6 +24,7 @@ import javax.transaction.xa.XAResource; import org.apache.seata.common.DefaultValues; +import org.apache.seata.common.lock.ResourceLock; import org.apache.seata.common.util.StringUtils; import org.apache.seata.config.ConfigurationFactory; import org.apache.seata.core.exception.TransactionException; @@ -69,6 +70,8 @@ public class ConnectionProxyXA extends AbstractConnectionProxyXA implements Hold private boolean shouldBeHeld = false; + private final ResourceLock resourceLock = new ResourceLock(); + /** * Constructor of Connection Proxy for XA mode. * @@ -127,10 +130,12 @@ private void xaEnd(XAXid xaXid, int flags) throws XAException { * @param applicationData application data * @throws SQLException SQLException */ - public synchronized void xaCommit(String xid, long branchId, String applicationData) throws XAException { - XAXid xaXid = XAXidBuilder.build(xid, branchId); - xaResource.commit(xaXid, false); - releaseIfNecessary(); + public void xaCommit(String xid, long branchId, String applicationData) throws XAException { + try (ResourceLock ignored = resourceLock.obtain()) { + XAXid xaXid = XAXidBuilder.build(xid, branchId); + xaResource.commit(xaXid, false); + releaseIfNecessary(); + } } /** @@ -139,12 +144,14 @@ public synchronized void xaCommit(String xid, long branchId, String applicationD * @param branchId transaction branch id * @param applicationData application data */ - public synchronized void xaRollback(String xid, long branchId, String applicationData) throws XAException { - if (this.xaBranchXid != null) { - xaRollback(xaBranchXid); - } else { - XAXid xaXid = XAXidBuilder.build(xid, branchId); - xaRollback(xaXid); + public void xaRollback(String xid, long branchId, String applicationData) throws XAException { + try (ResourceLock ignored = resourceLock.obtain()) { + if (this.xaBranchXid != null) { + xaRollback(xaBranchXid); + } else { + XAXid xaXid = XAXidBuilder.build(xid, branchId); + xaRollback(xaXid); + } } } @@ -214,43 +221,45 @@ public boolean getAutoCommit() throws SQLException { } @Override - public synchronized void commit() throws SQLException { - if (currentAutoCommitStatus || isReadOnly()) { - // Ignore the committing on an autocommit session and read-only transaction. - return; - } - if (!xaActive || this.xaBranchXid == null) { - throw new SQLException("should NOT commit on an inactive session", SQLSTATE_XA_NOT_END); - } - try { - // XA End: Success - try { - end(XAResource.TMSUCCESS); - } catch (SQLException sqle) { - // Rollback immediately before the XA Branch Context is deleted. - String xaBranchXid = this.xaBranchXid.toString(); - rollback(); - throw new SQLException("Branch " + xaBranchXid + " was rollbacked on committing since " + sqle.getMessage(), SQLSTATE_XA_NOT_END, sqle); + public void commit() throws SQLException { + try (ResourceLock ignored = resourceLock.obtain()) { + if (currentAutoCommitStatus || isReadOnly()) { + // Ignore the committing on an autocommit session and read-only transaction. + return; } - long now = System.currentTimeMillis(); - checkTimeout(now); - setPrepareTime(now); - int prepare = xaResource.prepare(xaBranchXid); - // Based on the four databases: MySQL (8), Oracle (12c), Postgres (16), and MSSQL Server (2022), - // only Oracle has read-only optimization; the others do not provide read-only feedback. - // Therefore, the database type check can be eliminated here. - if (prepare == XAResource.XA_RDONLY) { - // Branch Report to TC: RDONLY - reportStatusToTC(BranchStatus.PhaseOne_RDONLY); + if (!xaActive || this.xaBranchXid == null) { + throw new SQLException("should NOT commit on an inactive session", SQLSTATE_XA_NOT_END); + } + try { + // XA End: Success + try { + end(XAResource.TMSUCCESS); + } catch (SQLException sqle) { + // Rollback immediately before the XA Branch Context is deleted. + String xaBranchXid = this.xaBranchXid.toString(); + rollback(); + throw new SQLException("Branch " + xaBranchXid + " was rollbacked on committing since " + sqle.getMessage(), SQLSTATE_XA_NOT_END, sqle); + } + long now = System.currentTimeMillis(); + checkTimeout(now); + setPrepareTime(now); + int prepare = xaResource.prepare(xaBranchXid); + // Based on the four databases: MySQL (8), Oracle (12c), Postgres (16), and MSSQL Server (2022), + // only Oracle has read-only optimization; the others do not provide read-only feedback. + // Therefore, the database type check can be eliminated here. + if (prepare == XAResource.XA_RDONLY) { + // Branch Report to TC: RDONLY + reportStatusToTC(BranchStatus.PhaseOne_RDONLY); + } + } catch (XAException xe) { + // Branch Report to TC: Failed + reportStatusToTC(BranchStatus.PhaseOne_Failed); + throw new SQLException( + "Failed to end(TMSUCCESS)/prepare xa branch on " + xid + "-" + xaBranchXid.getBranchId() + " since " + xe + .getMessage(), xe); + } finally { + cleanXABranchContext(); } - } catch (XAException xe) { - // Branch Report to TC: Failed - reportStatusToTC(BranchStatus.PhaseOne_Failed); - throw new SQLException( - "Failed to end(TMSUCCESS)/prepare xa branch on " + xid + "-" + xaBranchXid.getBranchId() + " since " + xe - .getMessage(), xe); - } finally { - cleanXABranchContext(); } } @@ -280,23 +289,25 @@ public void rollback() throws SQLException { } } - private synchronized void start() throws XAException, SQLException { - // 3. XA Start - if (JdbcConstants.ORACLE.equals(resource.getDbType())) { - xaResource.start(this.xaBranchXid, SeataXAResource.ORATRANSLOOSE); - } else { - xaResource.start(this.xaBranchXid, XAResource.TMNOFLAGS); - } + private void start() throws XAException, SQLException { + try (ResourceLock ignored = resourceLock.obtain()) { + // 3. XA Start + if (JdbcConstants.ORACLE.equals(resource.getDbType())) { + xaResource.start(this.xaBranchXid, SeataXAResource.ORATRANSLOOSE); + } else { + xaResource.start(this.xaBranchXid, XAResource.TMNOFLAGS); + } - try { - termination(); - } catch (SQLException e) { - // the framework layer does not actively call ROLLBACK when setAutoCommit throws an SQL exception - xaResource.end(this.xaBranchXid, XAResource.TMFAIL); - xaRollback(xaBranchXid); - // Branch Report to TC: Failed - reportStatusToTC(BranchStatus.PhaseOne_Failed); - throw e; + try { + termination(); + } catch (SQLException e) { + // the framework layer does not actively call ROLLBACK when setAutoCommit throws an SQL exception + xaResource.end(this.xaBranchXid, XAResource.TMFAIL); + xaRollback(xaBranchXid); + // Branch Report to TC: Failed + reportStatusToTC(BranchStatus.PhaseOne_Failed); + throw e; + } } } @@ -323,27 +334,31 @@ private void checkTimeout(Long now) throws XAException { } @Override - public synchronized void close() throws SQLException { - rollBacked = false; - if (isHeld() && shouldBeHeld()) { - // if kept by a keeper, just hold the connection. - return; + public void close() throws SQLException { + try (ResourceLock ignored = resourceLock.obtain()) { + rollBacked = false; + if (isHeld() && shouldBeHeld()) { + // if kept by a keeper, just hold the connection. + return; + } + cleanXABranchContext(); + originalConnection.close(); } - cleanXABranchContext(); - originalConnection.close(); } - protected synchronized void closeForce() throws SQLException { - Connection physicalConn = getWrappedConnection(); - if (physicalConn instanceof PooledConnection) { - physicalConn = ((PooledConnection) physicalConn).getConnection(); + protected void closeForce() throws SQLException { + try (ResourceLock ignored = resourceLock.obtain()) { + Connection physicalConn = getWrappedConnection(); + if (physicalConn instanceof PooledConnection) { + physicalConn = ((PooledConnection) physicalConn).getConnection(); + } + // Force close the physical connection + physicalConn.close(); + rollBacked = false; + cleanXABranchContext(); + originalConnection.close(); + releaseIfNecessary(); } - // Force close the physical connection - physicalConn.close(); - rollBacked = false; - cleanXABranchContext(); - originalConnection.close(); - releaseIfNecessary(); } @Override @@ -398,4 +413,11 @@ private void reportStatusToTC(BranchStatus status) { } } + /** + * Get the lock of the current connection + * @return the RESOURCE_LOCK + */ + public ResourceLock getResourceLock() { + return resourceLock; + } } diff --git a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ResourceManagerXA.java b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ResourceManagerXA.java index 8added9ff64..8d44495ddb6 100644 --- a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ResourceManagerXA.java +++ b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ResourceManagerXA.java @@ -23,6 +23,7 @@ import java.sql.SQLException; import javax.transaction.xa.XAException; import org.apache.seata.common.DefaultValues; +import org.apache.seata.common.lock.ResourceLock; import org.apache.seata.common.thread.NamedThreadFactory; import org.apache.seata.config.ConfigurationFactory; import org.apache.seata.core.exception.TransactionException; @@ -53,6 +54,7 @@ public class ResourceManagerXA extends AbstractDataSourceCacheResourceManager { * The Timer check xa branch two phase hold timeout. */ protected volatile ScheduledExecutorService xaTwoPhaseTimeoutChecker; + private final ResourceLock resourceLock = new ResourceLock(); @Override public void init() { @@ -61,7 +63,7 @@ public void init() { public void initXaTwoPhaseTimeoutChecker() { if (xaTwoPhaseTimeoutChecker == null) { - synchronized (this) { + try (ResourceLock ignored = resourceLock.obtain()) { if (xaTwoPhaseTimeoutChecker == null) { boolean shouldBeHold = dataSourceCache.values().parallelStream().anyMatch(resource -> { if (resource instanceof DataSourceProxyXA) { @@ -81,7 +83,7 @@ public void initXaTwoPhaseTimeoutChecker() { for (Map.Entry connectionEntry : keeper.entrySet()) { ConnectionProxyXA connection = connectionEntry.getValue(); long now = System.currentTimeMillis(); - synchronized (connection) { + try (ResourceLock ignored2 = connection.getResourceLock().obtain()) { if (connection.getPrepareTime() != null && now - connection.getPrepareTime() > TWO_PHASE_HOLD_TIMEOUT) { try { diff --git a/rocketmq/src/main/java/org/apache/seata/integration/rocketmq/SeataMQProducerFactory.java b/rocketmq/src/main/java/org/apache/seata/integration/rocketmq/SeataMQProducerFactory.java index 63414aa1290..c18e6f758a0 100644 --- a/rocketmq/src/main/java/org/apache/seata/integration/rocketmq/SeataMQProducerFactory.java +++ b/rocketmq/src/main/java/org/apache/seata/integration/rocketmq/SeataMQProducerFactory.java @@ -19,6 +19,7 @@ import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.remoting.RPCHook; import org.apache.seata.common.exception.NotSupportYetException; +import org.apache.seata.common.lock.ResourceLock; import org.apache.seata.core.model.BranchType; import org.apache.seata.integration.tx.api.util.ProxyUtil; @@ -29,7 +30,7 @@ public class SeataMQProducerFactory { public static final String ROCKET_TCC_NAME = "tccRocketMQ"; public static final BranchType ROCKET_BRANCH_TYPE = BranchType.TCC; - + private static final ResourceLock RESOURCE_LOCK = new ResourceLock(); /** * Default Producer, it can be replaced to Map after multi-resource is supported */ @@ -42,7 +43,7 @@ public static SeataMQProducer createSingle(String nameServer, String producerGro public static SeataMQProducer createSingle(String nameServer, String namespace, String groupName, RPCHook rpcHook) throws MQClientException { if (defaultProducer == null) { - synchronized (SeataMQProducerFactory.class) { + try (ResourceLock ignored = RESOURCE_LOCK.obtain()) { if (defaultProducer == null) { defaultProducer = new SeataMQProducer(namespace, groupName, rpcHook); defaultProducer.setNamesrvAddr(nameServer); diff --git a/server/src/main/java/org/apache/seata/server/session/GlobalSession.java b/server/src/main/java/org/apache/seata/server/session/GlobalSession.java index 251b9876e5b..57ee9eea985 100644 --- a/server/src/main/java/org/apache/seata/server/session/GlobalSession.java +++ b/server/src/main/java/org/apache/seata/server/session/GlobalSession.java @@ -31,6 +31,7 @@ import org.apache.seata.common.Constants; import org.apache.seata.common.DefaultValues; import org.apache.seata.common.XID; +import org.apache.seata.common.lock.ResourceLock; import org.apache.seata.common.util.BufferUtils; import org.apache.seata.common.util.StringUtils; import org.apache.seata.common.util.UUIDGenerator; @@ -107,6 +108,9 @@ public class GlobalSession implements SessionLifecycle, SessionStorable { private Set lifecycleListeners = new HashSet<>(2); + private final ResourceLock resourceLock = new ResourceLock(); + + /** * Add boolean. * @@ -129,7 +133,7 @@ public boolean add(BranchSession branchSession) { * @return the boolean */ public boolean remove(BranchSession branchSession) { - synchronized (this) { + try (ResourceLock ignored = resourceLock.obtain()) { return branchSessions.remove(branchSession); } }