Skip to content

Commit

Permalink
feature:[loom] replace the usages of synchronized with ReentrantLock (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
lightClouds917 authored Jan 17, 2025
1 parent 4632770 commit 02e1514
Show file tree
Hide file tree
Showing 14 changed files with 345 additions and 91 deletions.
2 changes: 2 additions & 0 deletions changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ Add changes here for all PR submitted to the 2.x branch.

### feature:

- [[#7073](https://github.com/apache/incubator-seata/pull/7073)] support virtual thread,replace the usages of synchronized with ReentrantLock
- [[#7037](https://github.com/apache/incubator-seata/pull/7037)] support fury undolog parser
- [[#7069](https://github.com/apache/incubator-seata/pull/7069)] Raft cluster mode supports address translation
- [[#7038](https://github.com/apache/incubator-seata/pull/7038)] support fury serializer
Expand Down Expand Up @@ -43,6 +44,7 @@ Thanks to these contributors for their code commits. Please report an unintended
- [slievrly](https://github.com/slievrly)
- [lyl2008dsg](https://github.com/lyl2008dsg)
- [remind](https://github.com/remind)
- [lightClouds917](https://github.com/lightClouds917)
- [GoodBoyCoder](https://github.com/GoodBoyCoder)
- [PeppaO](https://github.com/PeppaO)
- [funky-eyes](https://github.com/funky-eyes)
Expand Down
2 changes: 2 additions & 0 deletions changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

### feature:

- [[#7073](https://github.com/apache/incubator-seata/pull/7073)] 支持虚拟线程,用ReentrantLock替换synchronized的用法
- [[#7037](https://github.com/apache/incubator-seata/pull/7037)] 支持UndoLog的fury序列化方式
- [[#7069](https://github.com/apache/incubator-seata/pull/7069)] Raft集群模式支持地址转换
- [[#7038](https://github.com/apache/incubator-seata/pull/7038)] 支持Fury序列化器
Expand Down Expand Up @@ -42,6 +43,7 @@
- [slievrly](https://github.com/slievrly)
- [lyl2008dsg](https://github.com/lyl2008dsg)
- [remind](https://github.com/remind)
- [lightClouds917](https://github.com/lightClouds917)
- [GoodBoyCoder](https://github.com/GoodBoyCoder)
- [PeppaO](https://github.com/PeppaO)
- [funky-eyes](https://github.com/funky-eyes)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.common.lock;

import java.util.concurrent.locks.ReentrantLock;

/**
* The ResourceLock extends ReentrantLock and implements AutoCloseable,
* allowing it to be used in try-with-resources blocks without needing
* to unlock in a finally block.
*
* <h3>Example</h3>
* <pre>
* {@code
* private final ResourceLock resourceLock = new ResourceLock();
* try (ResourceLock lock = resourceLock.obtain()) {
* // do something while holding the resource lock
* }
* }
* </pre>
*/
public class ResourceLock extends ReentrantLock implements AutoCloseable {

/**
* Obtain the lock.
*
* @return this ResourceLock
*/
public ResourceLock obtain() {
lock();
return this;
}


/**
* Unlock the resource lock.
*
* <p>This is typically used in try-with-resources blocks to automatically
* unlock the resource lock when the block is exited, regardless of whether
* an exception is thrown or not.
*/
@Override
public void close() {
this.unlock();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@
*/
package org.apache.seata.common.util;

import org.apache.seata.common.lock.ResourceLock;

/**
* The type Uuid generator.
*/
public class UUIDGenerator {

private static volatile IdWorker idWorker;
private final static ResourceLock RESOURCE_LOCK = new ResourceLock();

/**
* generate UUID using snowflake algorithm
Expand All @@ -30,7 +33,7 @@ public class UUIDGenerator {
*/
public static long generateUUID() {
if (idWorker == null) {
synchronized (UUIDGenerator.class) {
try (ResourceLock ignored = RESOURCE_LOCK.obtain()) {
if (idWorker == null) {
init(null);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.common.lock;

import org.apache.seata.common.util.CollectionUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;

import java.util.concurrent.ConcurrentHashMap;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

@ExtendWith(MockitoExtension.class)
public class ResourceLockTest {

@Test
public void testObtainAndClose() {
ResourceLock resourceLock = new ResourceLock();

// Test obtaining the lock
try (ResourceLock lock = resourceLock.obtain()) {
assertTrue(resourceLock.isHeldByCurrentThread(), "Lock should be held by current thread");
}

// After try-with-resources, lock should be released
assertFalse(resourceLock.isHeldByCurrentThread(), "Lock should be released after try-with-resources");
}

@Test
public void testMultipleObtainAndClose() {
ResourceLock resourceLock = new ResourceLock();

// First obtain and release
try (ResourceLock lock = resourceLock.obtain()) {
assertTrue(resourceLock.isHeldByCurrentThread(), "Lock should be held by current thread");
}
assertFalse(resourceLock.isHeldByCurrentThread(), "Lock should be released after first try-with-resources");

// Second obtain and release
try (ResourceLock lock = resourceLock.obtain()) {
assertTrue(resourceLock.isHeldByCurrentThread(), "Lock should be held by current thread");
}
assertFalse(resourceLock.isHeldByCurrentThread(), "Lock should be released after second try-with-resources");
}

@Test
public void testResourceLockAutoRemovalFromMap() {
ConcurrentHashMap<String, ResourceLock> lockMap = new ConcurrentHashMap<>();
String key = "testKey";
// Use try-with-resources to obtain and release the lock
try (ResourceLock ignored = CollectionUtils.computeIfAbsent(lockMap, key, k -> new ResourceLock()).obtain()) {
// Do something while holding the lock
assertTrue(lockMap.containsKey(key));
assertTrue(lockMap.get(key).isHeldByCurrentThread());
} finally {
assertFalse(lockMap.get(key).isHeldByCurrentThread());
assertTrue(lockMap.containsKey(key));
// Remove the lock from the map
lockMap.remove(key);
assertFalse(lockMap.containsKey(key));
}
// Ensure the lock is removed from the map
assertFalse(lockMap.containsKey(key));
}

@Test
public void testConcurrentLocking() throws InterruptedException {
ResourceLock resourceLock = new ResourceLock();

Thread t1 = new Thread(() -> {
try (ResourceLock lock = resourceLock.obtain()) {
try {
Thread.sleep(100); // Hold the lock for 100ms
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});

Thread t2 = new Thread(() -> {
assertFalse(resourceLock.isHeldByCurrentThread(), "Lock should not be held by current thread before t1 releases it");
try (ResourceLock lock = resourceLock.obtain()) {
assertTrue(resourceLock.isHeldByCurrentThread(), "Lock should be held by current thread after t1 releases it");
}
});

t1.start();
t2.start();

t1.join();
t2.join();

assertFalse(resourceLock.isHeldByCurrentThread(), "Lock should be released after both threads complete");
}

@Test
public void testLockInterruptibly() throws InterruptedException {
ResourceLock resourceLock = new ResourceLock();

Thread t1 = new Thread(() -> {
try (ResourceLock lock = resourceLock.obtain()) {
try {
Thread.sleep(1000); // Hold the lock for 1000ms
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});

t1.start();
Thread.sleep(50); // Wait for t1 to acquire the lock

Thread t2 = new Thread(() -> {
try {
resourceLock.lockInterruptibly();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});

t2.start();
Thread.sleep(50); // Wait for t2 to attempt to acquire the lock

t2.interrupt(); // Interrupt t2

t1.join();
t2.join();

assertFalse(resourceLock.isHeldByCurrentThread(), "Lock should be released after t1 completes");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.netflix.discovery.EurekaEventListener;
import com.netflix.discovery.shared.Application;
import org.apache.seata.common.exception.EurekaRegistryException;
import org.apache.seata.common.lock.ResourceLock;
import org.apache.seata.common.util.CollectionUtils;
import org.apache.seata.common.util.NetUtil;
import org.apache.seata.common.util.StringUtils;
Expand Down Expand Up @@ -68,7 +69,7 @@ public class EurekaRegistryServiceImpl implements RegistryService<EurekaEventLis
private static final Configuration FILE_CONFIG = ConfigurationFactory.CURRENT_FILE_INSTANCE;
private static final ConcurrentMap<String, List<EurekaEventListener>> LISTENER_SERVICE_MAP = new ConcurrentHashMap<>();
private static final ConcurrentMap<String, List<InetSocketAddress>> CLUSTER_ADDRESS_MAP = new ConcurrentHashMap<>();
private static final ConcurrentMap<String, Object> CLUSTER_LOCK = new ConcurrentHashMap<>();
private static final ConcurrentMap<String, ResourceLock> CLUSTER_LOCK = new ConcurrentHashMap<>();

private static volatile ApplicationInfoManager applicationInfoManager;
private static volatile CustomEurekaInstanceConfig instanceConfig;
Expand Down Expand Up @@ -140,8 +141,8 @@ public List<InetSocketAddress> lookup(String key) throws Exception {
}
String clusterUpperName = clusterName.toUpperCase();
if (!LISTENER_SERVICE_MAP.containsKey(clusterUpperName)) {
Object lock = CLUSTER_LOCK.computeIfAbsent(clusterUpperName, k -> new Object());
synchronized (lock) {
ResourceLock lock = CLUSTER_LOCK.computeIfAbsent(clusterUpperName, k -> new ResourceLock());
try (ResourceLock ignored = lock.obtain()) {
if (!LISTENER_SERVICE_MAP.containsKey(clusterUpperName)) {
refreshCluster(clusterUpperName);
subscribe(clusterUpperName, event -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import org.apache.seata.common.lock.ResourceLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class TccHookManager {
private static final Logger LOGGER = LoggerFactory.getLogger(TccHookManager.class);
private static final ResourceLock LOCK = new ResourceLock();


private TccHookManager() {

Expand All @@ -40,7 +43,7 @@ private TccHookManager() {
*/
public static List<TccHook> getHooks() {
if (CACHED_UNMODIFIABLE_HOOKS == null) {
synchronized (TccHookManager.class) {
try (ResourceLock ignored = LOCK.obtain()) {
if (CACHED_UNMODIFIABLE_HOOKS == null) {
CACHED_UNMODIFIABLE_HOOKS = Collections.unmodifiableList(TCC_HOOKS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import org.apache.seata.common.exception.FrameworkException;
import org.apache.seata.common.loader.EnhancedServiceLoader;
import org.apache.seata.common.lock.ResourceLock;
import org.apache.seata.common.util.CollectionUtils;
import org.apache.seata.integration.tx.api.remoting.RemotingDesc;
import org.apache.seata.integration.tx.api.remoting.RemotingParser;
Expand All @@ -43,6 +44,8 @@ public class DefaultRemotingParser {
*/
protected static Map<Object, RemotingDesc> remotingServiceMap = new ConcurrentHashMap<>();

private final ResourceLock resourceLock = new ResourceLock();

private static class SingletonHolder {
private static final DefaultRemotingParser INSTANCE = new DefaultRemotingParser();
}
Expand Down Expand Up @@ -79,7 +82,7 @@ protected void initRemotingParser() {
* @param remotingParser
*/
public boolean registerRemotingParser(RemotingParser remotingParser) {
synchronized (this) {
try (ResourceLock ignored = resourceLock.obtain()) {
return allRemotingParsers.add(remotingParser);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.seata.integration.tx.api.util;

import org.apache.seata.common.lock.ResourceLock;
import org.apache.seata.integration.tx.api.interceptor.handler.DefaultInvocationHandler;
import org.apache.seata.integration.tx.api.interceptor.handler.ProxyInvocationHandler;
import org.apache.seata.integration.tx.api.interceptor.parser.DefaultInterfaceParser;
Expand All @@ -31,6 +32,7 @@
public class ProxyUtil {

private static final Map<Object, Object> PROXYED_SET = new HashMap<>();
private static final ResourceLock RESOURCE_LOCK = new ResourceLock();

public static <T> T createProxy(T target) {
return createProxy(target, target.getClass().getName());
Expand All @@ -53,7 +55,7 @@ public static <T> T createProxy(T target) {
*/
public static <T> T createProxy(T target, String beanName) {
try {
synchronized (PROXYED_SET) {
try (ResourceLock ignored = RESOURCE_LOCK.obtain()) {
if (PROXYED_SET.containsKey(target)) {
return (T) PROXYED_SET.get(target);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.seata.rm.datasource.util;

import org.apache.seata.common.loader.EnhancedServiceLoader;
import org.apache.seata.common.lock.ResourceLock;
import org.apache.seata.rm.BaseDataSourceResource;
import org.apache.seata.rm.DefaultResourceManager;
import org.apache.seata.sqlparser.SqlParserType;
Expand All @@ -33,10 +34,11 @@
public final class JdbcUtils {

private static volatile DbTypeParser dbTypeParser;
private final static ResourceLock RESOURCE_LOCK = new ResourceLock();

static DbTypeParser getDbTypeParser() {
if (dbTypeParser == null) {
synchronized (JdbcUtils.class) {
try (ResourceLock ignored = RESOURCE_LOCK.obtain()) {
if (dbTypeParser == null) {
dbTypeParser = EnhancedServiceLoader.load(DbTypeParser.class, SqlParserType.SQL_PARSER_TYPE_DRUID);
}
Expand Down
Loading

0 comments on commit 02e1514

Please sign in to comment.