Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/main/java/io/seata/core/protocol/Version.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public static boolean isAboveOrEqualVersion150(String version) {
return isAboveOrEqualVersion150;
}

private static long convertVersion(String version) throws IncompatibleVersionException {
public static long convertVersion(String version) throws IncompatibleVersionException {
String[] parts = StringUtils.split(version, '.');
long result = 0L;
int i = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ public abstract class BaseDataSourceResource<T extends Holdable> implements Seat

protected Driver driver;

private boolean shouldBeHeld = false;

private Map<String, T> keeper = new ConcurrentHashMap<>();

private static final Cache<String, BranchStatus> BRANCH_STATUS_CACHE =
Expand Down Expand Up @@ -217,4 +219,11 @@ public Map<String, T> getKeeper() {
return keeper;
}

public boolean isShouldBeHeld() {
return shouldBeHeld;
}

public void setShouldBeHeld(boolean shouldBeHeld) {
this.shouldBeHeld = shouldBeHeld;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import io.seata.core.model.BranchType;
import io.seata.rm.BaseDataSourceResource;
import io.seata.rm.DefaultResourceManager;
import io.seata.sqlparser.util.JdbcConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -63,6 +62,8 @@ public class ConnectionProxyXA extends AbstractConnectionProxyXA implements Hold
private volatile Long prepareTime = null;

private volatile Integer timeout = null;

private boolean shouldBeHeld = false;

/**
* Constructor of Connection Proxy for XA mode.
Expand All @@ -72,8 +73,10 @@ public class ConnectionProxyXA extends AbstractConnectionProxyXA implements Hold
* @param resource The corresponding Resource(DataSource proxy) from which the connections was created.
* @param xid Seata global transaction xid.
*/
public ConnectionProxyXA(Connection originalConnection, XAConnection xaConnection, BaseDataSourceResource resource, String xid) {
public ConnectionProxyXA(Connection originalConnection, XAConnection xaConnection, BaseDataSourceResource resource,
String xid) {
super(originalConnection, xaConnection, resource, xid);
this.shouldBeHeld = resource.isShouldBeHeld();
}

public void init() {
Expand All @@ -95,14 +98,18 @@ public void init() {
}

private void keepIfNecessary() {
resource.hold(xaBranchXid.toString(), this);
if (shouldBeHeld()) {
resource.hold(xaBranchXid.toString(), this);
}
}

private void releaseIfNecessary() {
if (this.xaBranchXid != null) {
String xaBranchXid = this.xaBranchXid.toString();
if (isHeld()) {
resource.release(xaBranchXid, this);
if (shouldBeHeld()) {
if (this.xaBranchXid != null) {
String xaBranchXid = this.xaBranchXid.toString();
if (isHeld()) {
resource.release(xaBranchXid, this);
}
}
}
}
Expand Down Expand Up @@ -328,8 +335,7 @@ public boolean isHeld() {

@Override
public boolean shouldBeHeld() {
return JdbcConstants.MYSQL.equals(resource.getDbType()) || JdbcConstants.MARIADB.equals(resource.getDbType())
|| StringUtils.isBlank(resource.getDbType());
return shouldBeHeld || StringUtils.isBlank(resource.getDbType());
}

public Long getPrepareTime() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,18 @@
package io.seata.rm.datasource.xa;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Optional;
import javax.sql.DataSource;
import javax.sql.XAConnection;

import io.seata.core.constants.DBType;
import io.seata.core.context.RootContext;
import io.seata.core.model.BranchType;
import io.seata.core.protocol.Version;
import io.seata.rm.DefaultResourceManager;
import io.seata.rm.datasource.SeataDataSourceProxy;
import io.seata.rm.datasource.util.JdbcUtils;
import io.seata.rm.datasource.util.XAUtils;
Expand Down Expand Up @@ -49,7 +55,29 @@ public DataSourceProxyXA(DataSource dataSource, String resourceGroupId) {
this.dataSource = dataSource;
this.branchType = BranchType.XA;
JdbcUtils.initDataSourceResource(this, dataSource, resourceGroupId);

if (DBType.MYSQL.name().equalsIgnoreCase(dbType)) {
try (Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement("SELECT VERSION()");
ResultSet versionResult = preparedStatement.executeQuery()) {
if (versionResult.next()) {
long currentVersion = Version.convertVersion(versionResult.getString("VERSION()"));
long version = Version.convertVersion("8.0.29");
if (currentVersion < version) {
setShouldBeHeld(true);
}
}
} catch (Exception e) {
setShouldBeHeld(true);
LOGGER.info("get mysql version fail error: {}", e.getMessage());
}
} else if (DBType.MARIADB.name().equalsIgnoreCase(dbType)) {
setShouldBeHeld(true);
}
Optional.ofNullable(DefaultResourceManager.get().getResourceManager(BranchType.XA)).ifPresent(resourceManager -> {
if (resourceManager instanceof ResourceManagerXA) {
((ResourceManagerXA)resourceManager).initXaTwoPhaseTimeoutChecker();
}
});
//Set the default branch type to 'XA' in the RootContext.
RootContext.setDefaultBranchType(this.getBranchType());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,35 +52,54 @@ public class ResourceManagerXA extends AbstractDataSourceCacheResourceManager {
/**
* The Timer check xa branch two phase hold timeout.
*/
protected final ScheduledExecutorService xaTwoPhaseTimeoutChecker = new ScheduledThreadPoolExecutor(1,
new NamedThreadFactory("xaTwoPhaseTimeoutChecker", 1, true));
protected volatile ScheduledExecutorService xaTwoPhaseTimeoutChecker;

@Override
public void init() {
LOGGER.info("ResourceManagerXA init ...");
xaTwoPhaseTimeoutChecker.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
for (Map.Entry<String, Resource> entry : dataSourceCache.entrySet()) {
BaseDataSourceResource resource = (BaseDataSourceResource) entry.getValue();
Map<String, ConnectionProxyXA> keeper = resource.getKeeper();
for (Map.Entry<String, ConnectionProxyXA> connectionEntry : keeper.entrySet()) {
ConnectionProxyXA connection = connectionEntry.getValue();
long now = System.currentTimeMillis();
synchronized (connection) {
if (connection.getPrepareTime() != null &&
now - connection.getPrepareTime() > TWO_PHASE_HOLD_TIMEOUT) {
try {
connection.closeForce();
} catch (SQLException e) {
LOGGER.info("Force close the xa physical connection fail", e);
}

public void initXaTwoPhaseTimeoutChecker() {
if (xaTwoPhaseTimeoutChecker == null) {
synchronized (this) {
if (xaTwoPhaseTimeoutChecker == null) {
boolean shouldBeHold = dataSourceCache.values().parallelStream().anyMatch(resource -> {
if (resource instanceof DataSourceProxyXA) {
return ((DataSourceProxyXA)resource).isShouldBeHeld();
}
return false;
});
if (shouldBeHold) {
xaTwoPhaseTimeoutChecker = new ScheduledThreadPoolExecutor(1,
new NamedThreadFactory("xaTwoPhaseTimeoutChecker", 1, true));
xaTwoPhaseTimeoutChecker.scheduleAtFixedRate(() -> {
for (Map.Entry<String, Resource> entry : dataSourceCache.entrySet()) {
BaseDataSourceResource resource = (BaseDataSourceResource)entry.getValue();
if (resource.isShouldBeHeld()) {
if (resource instanceof DataSourceProxyXA) {
Map<String, ConnectionProxyXA> keeper = resource.getKeeper();
for (Map.Entry<String, ConnectionProxyXA> connectionEntry : keeper.entrySet()) {
ConnectionProxyXA connection = connectionEntry.getValue();
long now = System.currentTimeMillis();
synchronized (connection) {
if (connection.getPrepareTime() != null
&& now - connection.getPrepareTime() > TWO_PHASE_HOLD_TIMEOUT) {
try {
connection.closeForce();
} catch (SQLException e) {
LOGGER.info("Force close the xa physical connection fail", e);
}
}
}
}
}
}
}
}
}, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
}
}
}
}, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
}
}

@Override
Expand Down