Skip to content

Commit 7accdf8

Browse files
authored
feature: Introduce Cleanup API for TableMetaRefreshHolder Instance (#7559)
1 parent d1ae495 commit 7accdf8

File tree

6 files changed

+87
-2
lines changed

6 files changed

+87
-2
lines changed

changes/en-us/2.x.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ Add changes here for all PR submitted to the 2.x branch.
2323
- [[#7485](https://github.com/apache/incubator-seata/pull/7485)] Add http request filter for seata-server
2424
- [[#7509](https://github.com/apache/incubator-seata/pull/7509)] Reuse connection to merge branch transactions
2525
- [[#7492](https://github.com/apache/incubator-seata/pull/7492)] upgrade HTTP client in common module to support HTTP/2
26-
[[#7551](https://github.com/apache/incubator-seata/pull/7551)] XAUtils add support for DM Database
26+
- [[#7551](https://github.com/apache/incubator-seata/pull/7551)] XAUtils add support for DM Database
27+
- [[#7559](https://github.com/apache/incubator-seata/pull/7559)] Introduce Cleanup API for TableMetaRefreshHolder Instance
2728

2829
### bugfix:
2930

changes/zh-cn/2.x.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
- [[#7509](https://github.com/apache/incubator-seata/pull/7509)] 复用连接合并分支事务
2525
- [[#7492](https://github.com/apache/incubator-seata/pull/7492)] 升级 common 模块中的 HTTP 客户端以支持 HTTP/2
2626
- [[#7551](https://github.com/apache/incubator-seata/pull/7551)] XAUtils支持达梦数据库
27+
- [[#7559](https://github.com/apache/incubator-seata/pull/7559)] 为 TableMetaRefreshHolder 实例引入清理 API
2728

2829

2930
### bugfix:

rm-datasource/src/main/java/org/apache/seata/rm/datasource/DataSourceProxy.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -452,4 +452,9 @@ private void validMySQLVersion(Connection connection) {
452452
LOGGER.error("check mysql version fail error: {}", e.getMessage());
453453
}
454454
}
455+
456+
public void close() throws Exception {
457+
// TODO: Need to unregister resource from DefaultResourceManager
458+
TableMetaCacheFactory.shutdown(resourceId);
459+
}
455460
}

rm-datasource/src/main/java/org/apache/seata/rm/datasource/sql/struct/TableMetaCacheFactory.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,19 @@ private static void removeHolderFromMap(String resourceId) {
108108
LOGGER.info("Removed TableMetaRefreshHolder for resourceId: {}", resourceId);
109109
}
110110

111+
/**
112+
* Shutdown all TableMetaRefreshHolder threads.
113+
*/
114+
public static void shutdown(String resourceId) {
115+
TableMetaRefreshHolder holder = TABLE_META_REFRESH_HOLDER_MAP.remove(resourceId);
116+
if (holder != null) {
117+
holder.shutdown();
118+
LOGGER.info("TableMetaRefreshHolder for resourceId: {} has been shutdown.", resourceId);
119+
}
120+
}
121+
111122
static class TableMetaRefreshHolder {
123+
private volatile boolean stopped = false;
112124
private long lastRefreshFinishTime;
113125
private DataSourceProxy dataSource;
114126
private BlockingQueue<Long> tableMetaRefreshQueue;
@@ -128,7 +140,7 @@ static class TableMetaRefreshHolder {
128140
this.tableMetaRefreshQueue = new LinkedBlockingQueue<>(MAX_QUEUE_SIZE);
129141

130142
tableMetaRefreshExecutor.execute(() -> {
131-
while (true) {
143+
while (!stopped) {
132144
// 1. check table meta
133145
if (ENABLE_TABLE_META_CHECKER_ENABLE
134146
&& System.nanoTime() - lastRefreshFinishTime
@@ -187,5 +199,13 @@ private boolean isDataSourceClosedException(SQLException ex) {
187199
}
188200
return StringUtils.isNotBlank(message) && message.contains("datasource") && message.contains("close");
189201
}
202+
203+
private void shutdown() {
204+
stopped = true;
205+
if (tableMetaRefreshExecutor instanceof ThreadPoolExecutor) {
206+
((ThreadPoolExecutor) tableMetaRefreshExecutor).shutdownNow();
207+
}
208+
LOGGER.info("TableMetaRefreshHolder shutdown for resourceId: {}", dataSource.getResourceId());
209+
}
190210
}
191211
}

rm-datasource/src/test/java/org/apache/seata/rm/datasource/DataSourceProxyTest.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.apache.seata.rm.datasource;
1818

1919
import com.alibaba.druid.pool.DruidDataSource;
20+
import org.apache.seata.rm.DefaultResourceManager;
2021
import org.apache.seata.rm.datasource.mock.MockDataSource;
2122
import org.apache.seata.rm.datasource.mock.MockDriver;
2223
import org.apache.seata.rm.datasource.sql.struct.TableMetaCacheFactory;
@@ -215,4 +216,28 @@ public static DataSourceProxy getDataSourceProxy(DataSource dataSource) {
215216
return new DataSourceProxy(dataSource);
216217
}
217218
}
219+
220+
@Test
221+
public void testCloseRemovesResource() throws Exception {
222+
final MockDriver mockDriver = new MockDriver();
223+
final String username = "username";
224+
final String jdbcUrl = "jdbc:mock:xxx";
225+
226+
final DruidDataSource dataSource = new DruidDataSource();
227+
dataSource.setUrl(jdbcUrl);
228+
dataSource.setDriver(mockDriver);
229+
dataSource.setUsername(username);
230+
dataSource.setPassword("password");
231+
232+
DataSourceProxy proxy = getDataSourceProxy(dataSource);
233+
try (MockedStatic<DefaultResourceManager> drmStatic = Mockito.mockStatic(DefaultResourceManager.class);
234+
MockedStatic<TableMetaCacheFactory> tmcfStatic = Mockito.mockStatic(TableMetaCacheFactory.class)) {
235+
DefaultResourceManager drm = Mockito.mock(DefaultResourceManager.class);
236+
drmStatic.when(DefaultResourceManager::get).thenReturn(drm);
237+
238+
proxy.close();
239+
240+
tmcfStatic.verify(() -> TableMetaCacheFactory.shutdown(proxy.getResourceId()));
241+
}
242+
}
218243
}

rm-datasource/src/test/java/org/apache/seata/rm/datasource/sql/struct/TableMetaCacheFactoryTest.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,33 @@
1717
package org.apache.seata.rm.datasource.sql.struct;
1818

1919
import org.apache.seata.common.loader.EnhancedServiceNotFoundException;
20+
import org.apache.seata.rm.datasource.DataSourceProxy;
21+
import org.apache.seata.rm.datasource.mock.MockDataSource;
2022
import org.apache.seata.rm.datasource.sql.struct.cache.MariadbTableMetaCache;
2123
import org.apache.seata.rm.datasource.sql.struct.cache.MysqlTableMetaCache;
2224
import org.apache.seata.rm.datasource.sql.struct.cache.OceanBaseTableMetaCache;
2325
import org.apache.seata.rm.datasource.sql.struct.cache.OracleTableMetaCache;
2426
import org.apache.seata.rm.datasource.sql.struct.cache.PolarDBXTableMetaCache;
2527
import org.apache.seata.sqlparser.util.JdbcConstants;
2628
import org.junit.jupiter.api.Assertions;
29+
import org.junit.jupiter.api.BeforeEach;
2730
import org.junit.jupiter.api.Test;
2831

32+
import java.lang.reflect.Field;
33+
import java.util.Map;
34+
2935
public class TableMetaCacheFactoryTest {
3036

3137
private static final String NOT_EXIST_SQL_TYPE = "not_exist_sql_type";
3238

39+
@BeforeEach
40+
public void clearTableMetaRefreshHolderMap() throws Exception {
41+
Field field = TableMetaCacheFactory.class.getDeclaredField("TABLE_META_REFRESH_HOLDER_MAP");
42+
field.setAccessible(true);
43+
Map<?, ?> map = (Map<?, ?>) field.get(null);
44+
map.clear();
45+
}
46+
3347
@Test
3448
public void getTableMetaCache() {
3549
Assertions.assertTrue(
@@ -52,4 +66,23 @@ public void getTableMetaCache() {
5266
TableMetaCacheFactory.getTableMetaCache(NOT_EXIST_SQL_TYPE);
5367
});
5468
}
69+
70+
@Test
71+
public void shutdownTest() throws NoSuchFieldException, IllegalAccessException {
72+
DataSourceProxy dummy = new DataSourceProxy(new MockDataSource(), "dummy1");
73+
TableMetaCacheFactory.registerTableMeta(dummy);
74+
75+
Map<String, TableMetaCacheFactory.TableMetaRefreshHolder> holderMap = getTableMetaRefreshHolderMap();
76+
Assertions.assertEquals(1, holderMap.size());
77+
78+
TableMetaCacheFactory.shutdown("jdbc:mysql://127.0.0.1:3306/seata");
79+
Assertions.assertTrue(holderMap.isEmpty(), "TableMetaRefreshHolder map should be empty after shutdown");
80+
}
81+
82+
private Map<String, TableMetaCacheFactory.TableMetaRefreshHolder> getTableMetaRefreshHolderMap()
83+
throws NoSuchFieldException, IllegalAccessException {
84+
Field field = TableMetaCacheFactory.class.getDeclaredField("TABLE_META_REFRESH_HOLDER_MAP");
85+
field.setAccessible(true);
86+
return (Map<String, TableMetaCacheFactory.TableMetaRefreshHolder>) field.get(null);
87+
}
5588
}

0 commit comments

Comments
 (0)