From 7f33664949daa8071779ec24c17abc573d26dc71 Mon Sep 17 00:00:00 2001 From: lightClouds917 Date: Mon, 23 Dec 2024 18:48:11 +0800 Subject: [PATCH 01/10] feature:init and seata-server --- .../seata/common/lock/ResourceLock.java | 60 +++++++ .../seata/common/lock/ResourceLockTest.java | 146 ++++++++++++++++++ .../seata/server/session/GlobalSession.java | 6 +- 3 files changed, 211 insertions(+), 1 deletion(-) create mode 100644 common/src/main/java/org/apache/seata/common/lock/ResourceLock.java create mode 100644 common/src/test/java/org/apache/seata/common/lock/ResourceLockTest.java diff --git a/common/src/main/java/org/apache/seata/common/lock/ResourceLock.java b/common/src/main/java/org/apache/seata/common/lock/ResourceLock.java new file mode 100644 index 00000000000..22e815e5784 --- /dev/null +++ b/common/src/main/java/org/apache/seata/common/lock/ResourceLock.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.common.lock; + +import java.util.concurrent.locks.ReentrantLock; + +/** + * The ResourceLock extends ReentrantLock and implements AutoCloseable, + * allowing it to be used in try-with-resources blocks without needing + * to unlock in a finally block. + * + *

Example

+ *
+ * {@code
+ *   private final ResourceLock resourceLock = new ResourceLock();
+ *   try (ResourceLock lock = resourceLock.obtain()) {
+ *     // do something while holding the resource lock
+ *   }
+ * }
+ * 
+ */ +public class ResourceLock extends ReentrantLock implements AutoCloseable { + + /** + * Obtain the lock. + * + * @return this ResourceLock + */ + public ResourceLock obtain() { + lock(); + return this; + } + + + /** + * Unlock the resource lock. + * + *

This is typically used in try-with-resources blocks to automatically + * unlock the resource lock when the block is exited, regardless of whether + * an exception is thrown or not. + */ + @Override + public void close() { + this.unlock(); + } +} diff --git a/common/src/test/java/org/apache/seata/common/lock/ResourceLockTest.java b/common/src/test/java/org/apache/seata/common/lock/ResourceLockTest.java new file mode 100644 index 00000000000..1ceee9ca8cb --- /dev/null +++ b/common/src/test/java/org/apache/seata/common/lock/ResourceLockTest.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.common.lock; + +import org.apache.seata.common.util.CollectionUtils; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.concurrent.ConcurrentHashMap; + +import static org.junit.jupiter.api.Assertions.*; + +@ExtendWith(MockitoExtension.class) +public class ResourceLockTest { + + @Test + public void testObtainAndClose() { + ResourceLock resourceLock = new ResourceLock(); + + // Test obtaining the lock + try (ResourceLock lock = resourceLock.obtain()) { + assertTrue(resourceLock.isHeldByCurrentThread(), "Lock should be held by current thread"); + } + + // After try-with-resources, lock should be released + assertFalse(resourceLock.isHeldByCurrentThread(), "Lock should be released after try-with-resources"); + } + + @Test + public void testMultipleObtainAndClose() { + ResourceLock resourceLock = new ResourceLock(); + + // First obtain and release + try (ResourceLock lock = resourceLock.obtain()) { + assertTrue(resourceLock.isHeldByCurrentThread(), "Lock should be held by current thread"); + } + assertFalse(resourceLock.isHeldByCurrentThread(), "Lock should be released after first try-with-resources"); + + // Second obtain and release + try (ResourceLock lock = resourceLock.obtain()) { + assertTrue(resourceLock.isHeldByCurrentThread(), "Lock should be held by current thread"); + } + assertFalse(resourceLock.isHeldByCurrentThread(), "Lock should be released after second try-with-resources"); + } + + @Test + public void testResourceLockAutoRemovalFromMap() { + ConcurrentHashMap lockMap = new ConcurrentHashMap<>(); + String key = "testKey"; + // Use try-with-resources to obtain and release the lock + try (ResourceLock ignored = CollectionUtils.computeIfAbsent(lockMap, key, k -> new ResourceLock()).obtain()) { + // Do something while holding the lock + assertTrue(lockMap.containsKey(key)); + assertTrue(lockMap.get(key).isHeldByCurrentThread()); + } finally { + assertFalse(lockMap.get(key).isHeldByCurrentThread()); + assertTrue(lockMap.containsKey(key)); + // Remove the lock from the map + lockMap.remove(key); + assertFalse(lockMap.containsKey(key)); + } + // Ensure the lock is removed from the map + assertFalse(lockMap.containsKey(key)); + } + + @Test + public void testConcurrentLocking() throws InterruptedException { + ResourceLock resourceLock = new ResourceLock(); + + Thread t1 = new Thread(() -> { + try (ResourceLock lock = resourceLock.obtain()) { + try { + Thread.sleep(100); // Hold the lock for 100ms + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + }); + + Thread t2 = new Thread(() -> { + assertFalse(resourceLock.isHeldByCurrentThread(), "Lock should not be held by current thread before t1 releases it"); + try (ResourceLock lock = resourceLock.obtain()) { + assertTrue(resourceLock.isHeldByCurrentThread(), "Lock should be held by current thread after t1 releases it"); + } + }); + + t1.start(); + t2.start(); + + t1.join(); + t2.join(); + + assertFalse(resourceLock.isHeldByCurrentThread(), "Lock should be released after both threads complete"); + } + + @Test + public void testLockInterruptibly() throws InterruptedException { + ResourceLock resourceLock = new ResourceLock(); + + Thread t1 = new Thread(() -> { + try (ResourceLock lock = resourceLock.obtain()) { + try { + Thread.sleep(1000); // Hold the lock for 1000ms + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + }); + + t1.start(); + Thread.sleep(50); // Wait for t1 to acquire the lock + + Thread t2 = new Thread(() -> { + try { + resourceLock.lockInterruptibly(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + + t2.start(); + Thread.sleep(50); // Wait for t2 to attempt to acquire the lock + + t2.interrupt(); // Interrupt t2 + + t1.join(); + t2.join(); + + assertFalse(resourceLock.isHeldByCurrentThread(), "Lock should be released after t1 completes"); + } +} diff --git a/server/src/main/java/org/apache/seata/server/session/GlobalSession.java b/server/src/main/java/org/apache/seata/server/session/GlobalSession.java index 251b9876e5b..57ee9eea985 100644 --- a/server/src/main/java/org/apache/seata/server/session/GlobalSession.java +++ b/server/src/main/java/org/apache/seata/server/session/GlobalSession.java @@ -31,6 +31,7 @@ import org.apache.seata.common.Constants; import org.apache.seata.common.DefaultValues; import org.apache.seata.common.XID; +import org.apache.seata.common.lock.ResourceLock; import org.apache.seata.common.util.BufferUtils; import org.apache.seata.common.util.StringUtils; import org.apache.seata.common.util.UUIDGenerator; @@ -107,6 +108,9 @@ public class GlobalSession implements SessionLifecycle, SessionStorable { private Set lifecycleListeners = new HashSet<>(2); + private final ResourceLock resourceLock = new ResourceLock(); + + /** * Add boolean. * @@ -129,7 +133,7 @@ public boolean add(BranchSession branchSession) { * @return the boolean */ public boolean remove(BranchSession branchSession) { - synchronized (this) { + try (ResourceLock ignored = resourceLock.obtain()) { return branchSessions.remove(branchSession); } } From 6504bbeaac9f16333b4d51ef1262ae13be8c7132 Mon Sep 17 00:00:00 2001 From: lightClouds917 Date: Mon, 23 Dec 2024 19:45:37 +0800 Subject: [PATCH 02/10] feature:init and seata-rm-datasource,seata-rocketmq --- .../seata/rm/datasource/util/JdbcUtils.java | 4 +- .../rm/datasource/xa/ConnectionProxyXA.java | 178 ++++++++++-------- .../rm/datasource/xa/ResourceManagerXA.java | 6 +- .../rocketmq/SeataMQProducerFactory.java | 5 +- 4 files changed, 110 insertions(+), 83 deletions(-) diff --git a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/util/JdbcUtils.java b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/util/JdbcUtils.java index 2e5c8b1f336..1ee4c1ef63c 100644 --- a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/util/JdbcUtils.java +++ b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/util/JdbcUtils.java @@ -17,6 +17,7 @@ package org.apache.seata.rm.datasource.util; import org.apache.seata.common.loader.EnhancedServiceLoader; +import org.apache.seata.common.lock.ResourceLock; import org.apache.seata.rm.BaseDataSourceResource; import org.apache.seata.rm.DefaultResourceManager; import org.apache.seata.sqlparser.SqlParserType; @@ -33,10 +34,11 @@ public final class JdbcUtils { private static volatile DbTypeParser dbTypeParser; + private final static ResourceLock RESOURCE_LOCK = new ResourceLock(); static DbTypeParser getDbTypeParser() { if (dbTypeParser == null) { - synchronized (JdbcUtils.class) { + try (ResourceLock ignored = RESOURCE_LOCK.obtain()) { if (dbTypeParser == null) { dbTypeParser = EnhancedServiceLoader.load(DbTypeParser.class, SqlParserType.SQL_PARSER_TYPE_DRUID); } diff --git a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ConnectionProxyXA.java b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ConnectionProxyXA.java index 6d35dfd6301..24b30c444fd 100644 --- a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ConnectionProxyXA.java +++ b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ConnectionProxyXA.java @@ -24,6 +24,7 @@ import javax.transaction.xa.XAResource; import org.apache.seata.common.DefaultValues; +import org.apache.seata.common.lock.ResourceLock; import org.apache.seata.common.util.StringUtils; import org.apache.seata.config.ConfigurationFactory; import org.apache.seata.core.exception.TransactionException; @@ -69,6 +70,8 @@ public class ConnectionProxyXA extends AbstractConnectionProxyXA implements Hold private boolean shouldBeHeld = false; + private final ResourceLock RESOURCE_LOCK = new ResourceLock(); + /** * Constructor of Connection Proxy for XA mode. * @@ -127,10 +130,12 @@ private void xaEnd(XAXid xaXid, int flags) throws XAException { * @param applicationData application data * @throws SQLException SQLException */ - public synchronized void xaCommit(String xid, long branchId, String applicationData) throws XAException { - XAXid xaXid = XAXidBuilder.build(xid, branchId); - xaResource.commit(xaXid, false); - releaseIfNecessary(); + public void xaCommit(String xid, long branchId, String applicationData) throws XAException { + try (ResourceLock ignored = RESOURCE_LOCK.obtain()) { + XAXid xaXid = XAXidBuilder.build(xid, branchId); + xaResource.commit(xaXid, false); + releaseIfNecessary(); + } } /** @@ -139,12 +144,14 @@ public synchronized void xaCommit(String xid, long branchId, String applicationD * @param branchId transaction branch id * @param applicationData application data */ - public synchronized void xaRollback(String xid, long branchId, String applicationData) throws XAException { - if (this.xaBranchXid != null) { - xaRollback(xaBranchXid); - } else { - XAXid xaXid = XAXidBuilder.build(xid, branchId); - xaRollback(xaXid); + public void xaRollback(String xid, long branchId, String applicationData) throws XAException { + try (ResourceLock ignored = RESOURCE_LOCK.obtain()) { + if (this.xaBranchXid != null) { + xaRollback(xaBranchXid); + } else { + XAXid xaXid = XAXidBuilder.build(xid, branchId); + xaRollback(xaXid); + } } } @@ -214,43 +221,45 @@ public boolean getAutoCommit() throws SQLException { } @Override - public synchronized void commit() throws SQLException { - if (currentAutoCommitStatus || isReadOnly()) { - // Ignore the committing on an autocommit session and read-only transaction. - return; - } - if (!xaActive || this.xaBranchXid == null) { - throw new SQLException("should NOT commit on an inactive session", SQLSTATE_XA_NOT_END); - } - try { - // XA End: Success - try { - end(XAResource.TMSUCCESS); - } catch (SQLException sqle) { - // Rollback immediately before the XA Branch Context is deleted. - String xaBranchXid = this.xaBranchXid.toString(); - rollback(); - throw new SQLException("Branch " + xaBranchXid + " was rollbacked on committing since " + sqle.getMessage(), SQLSTATE_XA_NOT_END, sqle); + public void commit() throws SQLException { + try (ResourceLock ignored = RESOURCE_LOCK.obtain()) { + if (currentAutoCommitStatus || isReadOnly()) { + // Ignore the committing on an autocommit session and read-only transaction. + return; } - long now = System.currentTimeMillis(); - checkTimeout(now); - setPrepareTime(now); - int prepare = xaResource.prepare(xaBranchXid); - // Based on the four databases: MySQL (8), Oracle (12c), Postgres (16), and MSSQL Server (2022), - // only Oracle has read-only optimization; the others do not provide read-only feedback. - // Therefore, the database type check can be eliminated here. - if (prepare == XAResource.XA_RDONLY) { - // Branch Report to TC: RDONLY - reportStatusToTC(BranchStatus.PhaseOne_RDONLY); + if (!xaActive || this.xaBranchXid == null) { + throw new SQLException("should NOT commit on an inactive session", SQLSTATE_XA_NOT_END); + } + try { + // XA End: Success + try { + end(XAResource.TMSUCCESS); + } catch (SQLException sqle) { + // Rollback immediately before the XA Branch Context is deleted. + String xaBranchXid = this.xaBranchXid.toString(); + rollback(); + throw new SQLException("Branch " + xaBranchXid + " was rollbacked on committing since " + sqle.getMessage(), SQLSTATE_XA_NOT_END, sqle); + } + long now = System.currentTimeMillis(); + checkTimeout(now); + setPrepareTime(now); + int prepare = xaResource.prepare(xaBranchXid); + // Based on the four databases: MySQL (8), Oracle (12c), Postgres (16), and MSSQL Server (2022), + // only Oracle has read-only optimization; the others do not provide read-only feedback. + // Therefore, the database type check can be eliminated here. + if (prepare == XAResource.XA_RDONLY) { + // Branch Report to TC: RDONLY + reportStatusToTC(BranchStatus.PhaseOne_RDONLY); + } + } catch (XAException xe) { + // Branch Report to TC: Failed + reportStatusToTC(BranchStatus.PhaseOne_Failed); + throw new SQLException( + "Failed to end(TMSUCCESS)/prepare xa branch on " + xid + "-" + xaBranchXid.getBranchId() + " since " + xe + .getMessage(), xe); + } finally { + cleanXABranchContext(); } - } catch (XAException xe) { - // Branch Report to TC: Failed - reportStatusToTC(BranchStatus.PhaseOne_Failed); - throw new SQLException( - "Failed to end(TMSUCCESS)/prepare xa branch on " + xid + "-" + xaBranchXid.getBranchId() + " since " + xe - .getMessage(), xe); - } finally { - cleanXABranchContext(); } } @@ -280,23 +289,25 @@ public void rollback() throws SQLException { } } - private synchronized void start() throws XAException, SQLException { - // 3. XA Start - if (JdbcConstants.ORACLE.equals(resource.getDbType())) { - xaResource.start(this.xaBranchXid, SeataXAResource.ORATRANSLOOSE); - } else { - xaResource.start(this.xaBranchXid, XAResource.TMNOFLAGS); - } + private void start() throws XAException, SQLException { + try (ResourceLock ignored = RESOURCE_LOCK.obtain()) { + // 3. XA Start + if (JdbcConstants.ORACLE.equals(resource.getDbType())) { + xaResource.start(this.xaBranchXid, SeataXAResource.ORATRANSLOOSE); + } else { + xaResource.start(this.xaBranchXid, XAResource.TMNOFLAGS); + } - try { - termination(); - } catch (SQLException e) { - // the framework layer does not actively call ROLLBACK when setAutoCommit throws an SQL exception - xaResource.end(this.xaBranchXid, XAResource.TMFAIL); - xaRollback(xaBranchXid); - // Branch Report to TC: Failed - reportStatusToTC(BranchStatus.PhaseOne_Failed); - throw e; + try { + termination(); + } catch (SQLException e) { + // the framework layer does not actively call ROLLBACK when setAutoCommit throws an SQL exception + xaResource.end(this.xaBranchXid, XAResource.TMFAIL); + xaRollback(xaBranchXid); + // Branch Report to TC: Failed + reportStatusToTC(BranchStatus.PhaseOne_Failed); + throw e; + } } } @@ -323,27 +334,31 @@ private void checkTimeout(Long now) throws XAException { } @Override - public synchronized void close() throws SQLException { - rollBacked = false; - if (isHeld() && shouldBeHeld()) { - // if kept by a keeper, just hold the connection. - return; + public void close() throws SQLException { + try (ResourceLock ignored = RESOURCE_LOCK.obtain()) { + rollBacked = false; + if (isHeld() && shouldBeHeld()) { + // if kept by a keeper, just hold the connection. + return; + } + cleanXABranchContext(); + originalConnection.close(); } - cleanXABranchContext(); - originalConnection.close(); } - protected synchronized void closeForce() throws SQLException { - Connection physicalConn = getWrappedConnection(); - if (physicalConn instanceof PooledConnection) { - physicalConn = ((PooledConnection) physicalConn).getConnection(); + protected void closeForce() throws SQLException { + try (ResourceLock ignored = RESOURCE_LOCK.obtain()) { + Connection physicalConn = getWrappedConnection(); + if (physicalConn instanceof PooledConnection) { + physicalConn = ((PooledConnection) physicalConn).getConnection(); + } + // Force close the physical connection + physicalConn.close(); + rollBacked = false; + cleanXABranchContext(); + originalConnection.close(); + releaseIfNecessary(); } - // Force close the physical connection - physicalConn.close(); - rollBacked = false; - cleanXABranchContext(); - originalConnection.close(); - releaseIfNecessary(); } @Override @@ -398,4 +413,11 @@ private void reportStatusToTC(BranchStatus status) { } } + /** + * Get the lock of the current connection + * @return the RESOURCE_LOCK + */ + public ResourceLock getResourceLock() { + return RESOURCE_LOCK; + } } diff --git a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ResourceManagerXA.java b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ResourceManagerXA.java index 8added9ff64..8d44495ddb6 100644 --- a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ResourceManagerXA.java +++ b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ResourceManagerXA.java @@ -23,6 +23,7 @@ import java.sql.SQLException; import javax.transaction.xa.XAException; import org.apache.seata.common.DefaultValues; +import org.apache.seata.common.lock.ResourceLock; import org.apache.seata.common.thread.NamedThreadFactory; import org.apache.seata.config.ConfigurationFactory; import org.apache.seata.core.exception.TransactionException; @@ -53,6 +54,7 @@ public class ResourceManagerXA extends AbstractDataSourceCacheResourceManager { * The Timer check xa branch two phase hold timeout. */ protected volatile ScheduledExecutorService xaTwoPhaseTimeoutChecker; + private final ResourceLock resourceLock = new ResourceLock(); @Override public void init() { @@ -61,7 +63,7 @@ public void init() { public void initXaTwoPhaseTimeoutChecker() { if (xaTwoPhaseTimeoutChecker == null) { - synchronized (this) { + try (ResourceLock ignored = resourceLock.obtain()) { if (xaTwoPhaseTimeoutChecker == null) { boolean shouldBeHold = dataSourceCache.values().parallelStream().anyMatch(resource -> { if (resource instanceof DataSourceProxyXA) { @@ -81,7 +83,7 @@ public void initXaTwoPhaseTimeoutChecker() { for (Map.Entry connectionEntry : keeper.entrySet()) { ConnectionProxyXA connection = connectionEntry.getValue(); long now = System.currentTimeMillis(); - synchronized (connection) { + try (ResourceLock ignored2 = connection.getResourceLock().obtain()) { if (connection.getPrepareTime() != null && now - connection.getPrepareTime() > TWO_PHASE_HOLD_TIMEOUT) { try { diff --git a/rocketmq/src/main/java/org/apache/seata/integration/rocketmq/SeataMQProducerFactory.java b/rocketmq/src/main/java/org/apache/seata/integration/rocketmq/SeataMQProducerFactory.java index 63414aa1290..c18e6f758a0 100644 --- a/rocketmq/src/main/java/org/apache/seata/integration/rocketmq/SeataMQProducerFactory.java +++ b/rocketmq/src/main/java/org/apache/seata/integration/rocketmq/SeataMQProducerFactory.java @@ -19,6 +19,7 @@ import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.remoting.RPCHook; import org.apache.seata.common.exception.NotSupportYetException; +import org.apache.seata.common.lock.ResourceLock; import org.apache.seata.core.model.BranchType; import org.apache.seata.integration.tx.api.util.ProxyUtil; @@ -29,7 +30,7 @@ public class SeataMQProducerFactory { public static final String ROCKET_TCC_NAME = "tccRocketMQ"; public static final BranchType ROCKET_BRANCH_TYPE = BranchType.TCC; - + private static final ResourceLock RESOURCE_LOCK = new ResourceLock(); /** * Default Producer, it can be replaced to Map after multi-resource is supported */ @@ -42,7 +43,7 @@ public static SeataMQProducer createSingle(String nameServer, String producerGro public static SeataMQProducer createSingle(String nameServer, String namespace, String groupName, RPCHook rpcHook) throws MQClientException { if (defaultProducer == null) { - synchronized (SeataMQProducerFactory.class) { + try (ResourceLock ignored = RESOURCE_LOCK.obtain()) { if (defaultProducer == null) { defaultProducer = new SeataMQProducer(namespace, groupName, rpcHook); defaultProducer.setNamesrvAddr(nameServer); From fec2f9f3aa2e538ca06c9ccf5e9ed591b76b69c5 Mon Sep 17 00:00:00 2001 From: lightClouds917 Date: Mon, 23 Dec 2024 19:49:27 +0800 Subject: [PATCH 03/10] feature:changes --- changes/en-us/2.x.md | 3 ++- changes/zh-cn/2.x.md | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index 0dc00393b96..a90c610a721 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -5,6 +5,7 @@ Add changes here for all PR submitted to the 2.x branch. ### feature: - [[#PR_NO](https://github.com/seata/seata/pull/PR_NO)] support XXX +- [[#7073](https://github.com/apache/incubator-seata/pull/7073)] support virtual thread,replace the usages of synchronized with ReentrantLock ### bugfix: @@ -31,6 +32,6 @@ Thanks to these contributors for their code commits. Please report an unintended - [slievrly](https://github.com/slievrly) -- [GITHUB_ID](https://github.com/GITHUB_ID) +- [lightClouds917](https://github.com/lightClouds917) Also, we receive many valuable issues, questions and advices from our community. Thanks for you all. \ No newline at end of file diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index 5c76d15a1b0..f4e82eabc48 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -5,6 +5,7 @@ ### feature: - [[#PR_NO](https://github.com/seata/seata/pull/PR_NO)] 支持XXX +- [[#7073](https://github.com/apache/incubator-seata/pull/7073)] 支持虚拟线程,用ReentrantLock替换synchronized的用法 ### bugfix: @@ -31,6 +32,6 @@ - [slievrly](https://github.com/slievrly) -- [GITHUB_ID](https://github.com/GITHUB_ID) +- [lightClouds917](https://github.com/lightClouds917) 同时,我们收到了社区反馈的很多有价值的issue和建议,非常感谢大家。 \ No newline at end of file From 16beebf8533e1fb5cfa6234775d24271d83fda64 Mon Sep 17 00:00:00 2001 From: lightClouds917 Date: Mon, 23 Dec 2024 19:58:40 +0800 Subject: [PATCH 04/10] feature:fix ConnectionProxyXA --- .../rm/datasource/xa/ConnectionProxyXA.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ConnectionProxyXA.java b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ConnectionProxyXA.java index 24b30c444fd..709263d88cc 100644 --- a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ConnectionProxyXA.java +++ b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ConnectionProxyXA.java @@ -70,7 +70,7 @@ public class ConnectionProxyXA extends AbstractConnectionProxyXA implements Hold private boolean shouldBeHeld = false; - private final ResourceLock RESOURCE_LOCK = new ResourceLock(); + private final ResourceLock resourceLock = new ResourceLock(); /** * Constructor of Connection Proxy for XA mode. @@ -131,7 +131,7 @@ private void xaEnd(XAXid xaXid, int flags) throws XAException { * @throws SQLException SQLException */ public void xaCommit(String xid, long branchId, String applicationData) throws XAException { - try (ResourceLock ignored = RESOURCE_LOCK.obtain()) { + try (ResourceLock ignored = resourceLock.obtain()) { XAXid xaXid = XAXidBuilder.build(xid, branchId); xaResource.commit(xaXid, false); releaseIfNecessary(); @@ -145,7 +145,7 @@ public void xaCommit(String xid, long branchId, String applicationData) throws X * @param applicationData application data */ public void xaRollback(String xid, long branchId, String applicationData) throws XAException { - try (ResourceLock ignored = RESOURCE_LOCK.obtain()) { + try (ResourceLock ignored = resourceLock.obtain()) { if (this.xaBranchXid != null) { xaRollback(xaBranchXid); } else { @@ -222,7 +222,7 @@ public boolean getAutoCommit() throws SQLException { @Override public void commit() throws SQLException { - try (ResourceLock ignored = RESOURCE_LOCK.obtain()) { + try (ResourceLock ignored = resourceLock.obtain()) { if (currentAutoCommitStatus || isReadOnly()) { // Ignore the committing on an autocommit session and read-only transaction. return; @@ -290,7 +290,7 @@ public void rollback() throws SQLException { } private void start() throws XAException, SQLException { - try (ResourceLock ignored = RESOURCE_LOCK.obtain()) { + try (ResourceLock ignored = resourceLock.obtain()) { // 3. XA Start if (JdbcConstants.ORACLE.equals(resource.getDbType())) { xaResource.start(this.xaBranchXid, SeataXAResource.ORATRANSLOOSE); @@ -335,7 +335,7 @@ private void checkTimeout(Long now) throws XAException { @Override public void close() throws SQLException { - try (ResourceLock ignored = RESOURCE_LOCK.obtain()) { + try (ResourceLock ignored = resourceLock.obtain()) { rollBacked = false; if (isHeld() && shouldBeHeld()) { // if kept by a keeper, just hold the connection. @@ -347,7 +347,7 @@ public void close() throws SQLException { } protected void closeForce() throws SQLException { - try (ResourceLock ignored = RESOURCE_LOCK.obtain()) { + try (ResourceLock ignored = resourceLock.obtain()) { Connection physicalConn = getWrappedConnection(); if (physicalConn instanceof PooledConnection) { physicalConn = ((PooledConnection) physicalConn).getConnection(); @@ -418,6 +418,6 @@ private void reportStatusToTC(BranchStatus status) { * @return the RESOURCE_LOCK */ public ResourceLock getResourceLock() { - return RESOURCE_LOCK; + return resourceLock; } } From 45c5c785db6ded8dcba38740ec21247a781a6399 Mon Sep 17 00:00:00 2001 From: lightClouds917 Date: Mon, 23 Dec 2024 20:14:06 +0800 Subject: [PATCH 05/10] feature:seata-integration-tx-api --- .../seata/integration/tx/api/fence/hook/TccHookManager.java | 5 ++++- .../tx/api/remoting/parser/DefaultRemotingParser.java | 5 ++++- .../org/apache/seata/integration/tx/api/util/ProxyUtil.java | 4 +++- 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/fence/hook/TccHookManager.java b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/fence/hook/TccHookManager.java index e6d537c73f2..4a1048c40c2 100644 --- a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/fence/hook/TccHookManager.java +++ b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/fence/hook/TccHookManager.java @@ -20,11 +20,14 @@ import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; +import org.apache.seata.common.lock.ResourceLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public final class TccHookManager { private static final Logger LOGGER = LoggerFactory.getLogger(TccHookManager.class); + private static final ResourceLock LOCK = new ResourceLock(); + private TccHookManager() { @@ -40,7 +43,7 @@ private TccHookManager() { */ public static List getHooks() { if (CACHED_UNMODIFIABLE_HOOKS == null) { - synchronized (TccHookManager.class) { + try (ResourceLock ignored = LOCK.obtain()) { if (CACHED_UNMODIFIABLE_HOOKS == null) { CACHED_UNMODIFIABLE_HOOKS = Collections.unmodifiableList(TCC_HOOKS); } diff --git a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/remoting/parser/DefaultRemotingParser.java b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/remoting/parser/DefaultRemotingParser.java index 0ed9625e616..9a1f8d307d0 100644 --- a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/remoting/parser/DefaultRemotingParser.java +++ b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/remoting/parser/DefaultRemotingParser.java @@ -23,6 +23,7 @@ import org.apache.seata.common.exception.FrameworkException; import org.apache.seata.common.loader.EnhancedServiceLoader; +import org.apache.seata.common.lock.ResourceLock; import org.apache.seata.common.util.CollectionUtils; import org.apache.seata.integration.tx.api.remoting.RemotingDesc; import org.apache.seata.integration.tx.api.remoting.RemotingParser; @@ -43,6 +44,8 @@ public class DefaultRemotingParser { */ protected static Map remotingServiceMap = new ConcurrentHashMap<>(); + private final ResourceLock resourceLock = new ResourceLock(); + private static class SingletonHolder { private static final DefaultRemotingParser INSTANCE = new DefaultRemotingParser(); } @@ -79,7 +82,7 @@ protected void initRemotingParser() { * @param remotingParser */ public boolean registerRemotingParser(RemotingParser remotingParser) { - synchronized (this) { + try (ResourceLock ignored = resourceLock.obtain()) { return allRemotingParsers.add(remotingParser); } } diff --git a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/util/ProxyUtil.java b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/util/ProxyUtil.java index 964840c993a..c6c5ffdef46 100644 --- a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/util/ProxyUtil.java +++ b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/util/ProxyUtil.java @@ -16,6 +16,7 @@ */ package org.apache.seata.integration.tx.api.util; +import org.apache.seata.common.lock.ResourceLock; import org.apache.seata.integration.tx.api.interceptor.handler.DefaultInvocationHandler; import org.apache.seata.integration.tx.api.interceptor.handler.ProxyInvocationHandler; import org.apache.seata.integration.tx.api.interceptor.parser.DefaultInterfaceParser; @@ -31,6 +32,7 @@ public class ProxyUtil { private static final Map PROXYED_SET = new HashMap<>(); + private static final ResourceLock RESOURCE_LOCK = new ResourceLock(); public static T createProxy(T target) { return createProxy(target, target.getClass().getName()); @@ -53,7 +55,7 @@ public static T createProxy(T target) { */ public static T createProxy(T target, String beanName) { try { - synchronized (PROXYED_SET) { + try (ResourceLock ignored = RESOURCE_LOCK.obtain()) { if (PROXYED_SET.containsKey(target)) { return (T) PROXYED_SET.get(target); } From 99c514a2d16a2ea8076578aeafc2ac37c4b4d93e Mon Sep 17 00:00:00 2001 From: lightClouds917 Date: Tue, 24 Dec 2024 10:20:30 +0800 Subject: [PATCH 06/10] feature:seata-discovery-eureka --- .../registry/eureka/EurekaRegistryServiceImpl.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/discovery/seata-discovery-eureka/src/main/java/org/apache/seata/discovery/registry/eureka/EurekaRegistryServiceImpl.java b/discovery/seata-discovery-eureka/src/main/java/org/apache/seata/discovery/registry/eureka/EurekaRegistryServiceImpl.java index 5ab5191234d..ef441c34bb1 100644 --- a/discovery/seata-discovery-eureka/src/main/java/org/apache/seata/discovery/registry/eureka/EurekaRegistryServiceImpl.java +++ b/discovery/seata-discovery-eureka/src/main/java/org/apache/seata/discovery/registry/eureka/EurekaRegistryServiceImpl.java @@ -26,6 +26,7 @@ import com.netflix.discovery.EurekaEventListener; import com.netflix.discovery.shared.Application; import org.apache.seata.common.exception.EurekaRegistryException; +import org.apache.seata.common.lock.ResourceLock; import org.apache.seata.common.util.CollectionUtils; import org.apache.seata.common.util.NetUtil; import org.apache.seata.common.util.StringUtils; @@ -68,7 +69,7 @@ public class EurekaRegistryServiceImpl implements RegistryService> LISTENER_SERVICE_MAP = new ConcurrentHashMap<>(); private static final ConcurrentMap> CLUSTER_ADDRESS_MAP = new ConcurrentHashMap<>(); - private static final ConcurrentMap CLUSTER_LOCK = new ConcurrentHashMap<>(); + private static final ConcurrentMap CLUSTER_LOCK = new ConcurrentHashMap<>(); private static volatile ApplicationInfoManager applicationInfoManager; private static volatile CustomEurekaInstanceConfig instanceConfig; @@ -140,8 +141,8 @@ public List lookup(String key) throws Exception { } String clusterUpperName = clusterName.toUpperCase(); if (!LISTENER_SERVICE_MAP.containsKey(clusterUpperName)) { - Object lock = CLUSTER_LOCK.computeIfAbsent(clusterUpperName, k -> new Object()); - synchronized (lock) { + ResourceLock lock = CLUSTER_LOCK.computeIfAbsent(clusterUpperName, k -> new ResourceLock()); + try (ResourceLock ignored = lock.obtain()) { if (!LISTENER_SERVICE_MAP.containsKey(clusterUpperName)) { refreshCluster(clusterUpperName); subscribe(clusterUpperName, event -> { From 01421105626ebac1b7e398856d2238196a0bbcd4 Mon Sep 17 00:00:00 2001 From: lightClouds917 Date: Tue, 24 Dec 2024 19:41:19 +0800 Subject: [PATCH 07/10] feature:seata-common --- .../java/org/apache/seata/common/util/UUIDGenerator.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/common/src/main/java/org/apache/seata/common/util/UUIDGenerator.java b/common/src/main/java/org/apache/seata/common/util/UUIDGenerator.java index 542de3ed1eb..7b8ea727599 100644 --- a/common/src/main/java/org/apache/seata/common/util/UUIDGenerator.java +++ b/common/src/main/java/org/apache/seata/common/util/UUIDGenerator.java @@ -16,12 +16,15 @@ */ package org.apache.seata.common.util; +import org.apache.seata.common.lock.ResourceLock; + /** * The type Uuid generator. */ public class UUIDGenerator { private static volatile IdWorker idWorker; + private final static ResourceLock RESOURCE_LOCK = new ResourceLock(); /** * generate UUID using snowflake algorithm @@ -30,7 +33,7 @@ public class UUIDGenerator { */ public static long generateUUID() { if (idWorker == null) { - synchronized (UUIDGenerator.class) { + try (ResourceLock ignored = RESOURCE_LOCK.obtain()) { if (idWorker == null) { init(null); } From 8f86af11a9bf8511e236f7d21b3e02f7e28d605a Mon Sep 17 00:00:00 2001 From: lightClouds917 Date: Sat, 28 Dec 2024 19:48:09 +0800 Subject: [PATCH 08/10] feature:seata-common --- .../exception/SkipCallbackWrapperException.java | 12 +++++++++--- .../loader/EnhancedServiceNotFoundException.java | 9 +++++++-- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/common/src/main/java/org/apache/seata/common/exception/SkipCallbackWrapperException.java b/common/src/main/java/org/apache/seata/common/exception/SkipCallbackWrapperException.java index 5282954324a..e112dc35b10 100644 --- a/common/src/main/java/org/apache/seata/common/exception/SkipCallbackWrapperException.java +++ b/common/src/main/java/org/apache/seata/common/exception/SkipCallbackWrapperException.java @@ -16,6 +16,8 @@ */ package org.apache.seata.common.exception; +import org.apache.seata.common.lock.ResourceLock; + /** * Skip Callback Wrapper Exception. * This exception class will make the semantics clearer. @@ -24,13 +26,17 @@ */ public class SkipCallbackWrapperException extends RuntimeException { + private final ResourceLock stackTraceLock = new ResourceLock(); + public SkipCallbackWrapperException(Throwable cause) { super(cause); } @Override - public synchronized Throwable fillInStackTrace() { - // do nothing - return null; + public Throwable fillInStackTrace() { + try (ResourceLock ignored = stackTraceLock.obtain()) { + // do nothing + return null; + } } } diff --git a/common/src/main/java/org/apache/seata/common/loader/EnhancedServiceNotFoundException.java b/common/src/main/java/org/apache/seata/common/loader/EnhancedServiceNotFoundException.java index 88bc211d5d3..5636e25b365 100644 --- a/common/src/main/java/org/apache/seata/common/loader/EnhancedServiceNotFoundException.java +++ b/common/src/main/java/org/apache/seata/common/loader/EnhancedServiceNotFoundException.java @@ -17,6 +17,7 @@ package org.apache.seata.common.loader; import org.apache.commons.lang.exception.NestableRuntimeException; +import org.apache.seata.common.lock.ResourceLock; /** * The type Enhanced service not found exception. @@ -25,6 +26,8 @@ public class EnhancedServiceNotFoundException extends NestableRuntimeException { private static final long serialVersionUID = 7748438218914409019L; + private final ResourceLock stackTraceLock = new ResourceLock(); + /** * Instantiates a new Enhanced service not found exception. * @@ -75,7 +78,9 @@ public EnhancedServiceNotFoundException(Throwable cause) { } @Override - public synchronized Throwable fillInStackTrace() { - return this; + public Throwable fillInStackTrace() { + try (ResourceLock ignored = stackTraceLock.obtain()) { + return this; + } } } From 457752f10a0ddecfc0908b75ec667522aca57054 Mon Sep 17 00:00:00 2001 From: lightClouds917 Date: Sat, 28 Dec 2024 19:54:31 +0800 Subject: [PATCH 09/10] Revert "feature:seata-common" This reverts commit 8f86af11a9bf8511e236f7d21b3e02f7e28d605a. --- .../exception/SkipCallbackWrapperException.java | 12 +++--------- .../loader/EnhancedServiceNotFoundException.java | 9 ++------- 2 files changed, 5 insertions(+), 16 deletions(-) diff --git a/common/src/main/java/org/apache/seata/common/exception/SkipCallbackWrapperException.java b/common/src/main/java/org/apache/seata/common/exception/SkipCallbackWrapperException.java index e112dc35b10..5282954324a 100644 --- a/common/src/main/java/org/apache/seata/common/exception/SkipCallbackWrapperException.java +++ b/common/src/main/java/org/apache/seata/common/exception/SkipCallbackWrapperException.java @@ -16,8 +16,6 @@ */ package org.apache.seata.common.exception; -import org.apache.seata.common.lock.ResourceLock; - /** * Skip Callback Wrapper Exception. * This exception class will make the semantics clearer. @@ -26,17 +24,13 @@ */ public class SkipCallbackWrapperException extends RuntimeException { - private final ResourceLock stackTraceLock = new ResourceLock(); - public SkipCallbackWrapperException(Throwable cause) { super(cause); } @Override - public Throwable fillInStackTrace() { - try (ResourceLock ignored = stackTraceLock.obtain()) { - // do nothing - return null; - } + public synchronized Throwable fillInStackTrace() { + // do nothing + return null; } } diff --git a/common/src/main/java/org/apache/seata/common/loader/EnhancedServiceNotFoundException.java b/common/src/main/java/org/apache/seata/common/loader/EnhancedServiceNotFoundException.java index 5636e25b365..88bc211d5d3 100644 --- a/common/src/main/java/org/apache/seata/common/loader/EnhancedServiceNotFoundException.java +++ b/common/src/main/java/org/apache/seata/common/loader/EnhancedServiceNotFoundException.java @@ -17,7 +17,6 @@ package org.apache.seata.common.loader; import org.apache.commons.lang.exception.NestableRuntimeException; -import org.apache.seata.common.lock.ResourceLock; /** * The type Enhanced service not found exception. @@ -26,8 +25,6 @@ public class EnhancedServiceNotFoundException extends NestableRuntimeException { private static final long serialVersionUID = 7748438218914409019L; - private final ResourceLock stackTraceLock = new ResourceLock(); - /** * Instantiates a new Enhanced service not found exception. * @@ -78,9 +75,7 @@ public EnhancedServiceNotFoundException(Throwable cause) { } @Override - public Throwable fillInStackTrace() { - try (ResourceLock ignored = stackTraceLock.obtain()) { - return this; - } + public synchronized Throwable fillInStackTrace() { + return this; } } From 8cffcb2d86a2616600d65f2895c052cb6a36b84b Mon Sep 17 00:00:00 2001 From: lightClouds917 Date: Sat, 28 Dec 2024 20:33:19 +0800 Subject: [PATCH 10/10] feature:opt ResourceLockTest --- .../java/org/apache/seata/common/lock/ResourceLockTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/common/src/test/java/org/apache/seata/common/lock/ResourceLockTest.java b/common/src/test/java/org/apache/seata/common/lock/ResourceLockTest.java index 1ceee9ca8cb..0ed2e7e2b7d 100644 --- a/common/src/test/java/org/apache/seata/common/lock/ResourceLockTest.java +++ b/common/src/test/java/org/apache/seata/common/lock/ResourceLockTest.java @@ -23,7 +23,8 @@ import java.util.concurrent.ConcurrentHashMap; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; @ExtendWith(MockitoExtension.class) public class ResourceLockTest {