Skip to content
2 changes: 1 addition & 1 deletion changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ Add changes here for all PR submitted to the 2.x branch.

### refactor:

- [[#PR_NO](https://github.com/seata/seata/pull/PR_NO)] refactor for XXX
- [[#7615](https://github.com/seata/seata/pull/7615)] Refactor DataSourceProxy


### doc:
Expand Down
2 changes: 1 addition & 1 deletion changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@

### refactor:

- [[#PR_NO](https://github.com/seata/seata/pull/PR_NO)] 重构 XXX
- [[#7615](https://github.com/seata/seata/pull/7615)] 重构 DataSourceProxy


### doc:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@

import org.apache.commons.lang.StringUtils;
import org.apache.seata.common.ConfigurationKeys;
import org.apache.seata.common.Constants;
import org.apache.seata.common.loader.EnhancedServiceNotFoundException;
import org.apache.seata.config.ConfigurationFactory;
import org.apache.seata.core.context.RootContext;
import org.apache.seata.core.model.BranchType;
import org.apache.seata.core.model.Resource;
import org.apache.seata.rm.DefaultResourceManager;
import org.apache.seata.rm.datasource.initializer.ResourceIdInitializer;
import org.apache.seata.rm.datasource.initializer.ResourceIdInitializerRegistry;
import org.apache.seata.rm.datasource.sql.struct.TableMetaCacheFactory;
import org.apache.seata.rm.datasource.undo.UndoLogManager;
import org.apache.seata.rm.datasource.undo.UndoLogManagerFactory;
Expand All @@ -45,7 +46,6 @@

/**
* The type Data source proxy.
*
*/
public class DataSourceProxy extends AbstractDataSourceProxy implements Resource {

Expand Down Expand Up @@ -131,7 +131,6 @@ private void init(DataSource dataSource, String resourceGroupId) {

/**
* Define derivative product version for MySQL Kernel
*
*/
private void checkDerivativeProduct() {
if (!JdbcConstants.MYSQL.equals(dbType)) {
Expand Down Expand Up @@ -234,179 +233,8 @@ public String getResourceId() {
}

private void initResourceId() {
if (JdbcConstants.POSTGRESQL.equals(dbType)) {
initPGResourceId();
} else if (JdbcConstants.ORACLE.equals(dbType) && userName != null) {
initOracleResourceId();
} else if (JdbcConstants.MYSQL.equals(dbType) || JdbcConstants.POLARDBX.equals(dbType)) {
initMysqlResourceId();
} else if (JdbcConstants.SQLSERVER.equals(dbType)) {
initSqlServerResourceId();
} else if (JdbcConstants.DM.equals(dbType)) {
initDMResourceId();
} else if (JdbcConstants.OSCAR.equals(dbType)) {
initOscarResourceId();
} else {
initDefaultResourceId();
}
}

/**
* init the default resource id
*/
private void initDefaultResourceId() {
if (jdbcUrl.contains("?")) {
resourceId = jdbcUrl.substring(0, jdbcUrl.indexOf('?'));
} else {
resourceId = jdbcUrl;
}
}

/**
* init the oracle resource id
*/
private void initOracleResourceId() {
if (jdbcUrl.contains("?")) {
resourceId = jdbcUrl.substring(0, jdbcUrl.indexOf('?')) + "/" + userName;
} else {
resourceId = jdbcUrl + "/" + userName;
}
}

/**
* prevent mysql url like
* jdbc:mysql:loadbalance://192.168.100.2:3306,192.168.100.1:3306/seata
* it will cause the problem like
* 1.rm client is not connected
*/
private void initMysqlResourceId() {
String startsWith = "jdbc:mysql:loadbalance://";
if (jdbcUrl.startsWith(startsWith)) {
String url;
if (jdbcUrl.contains("?")) {
url = jdbcUrl.substring(0, jdbcUrl.indexOf('?'));
} else {
url = jdbcUrl;
}
resourceId = url.replace(",", "|");
} else {
initDefaultResourceId();
}
}

private void initDMResourceId() {
LOGGER.warn("support for the dameng database is currently an experimental feature ");
if (jdbcUrl.contains("?")) {
StringBuilder jdbcUrlBuilder = new StringBuilder();
jdbcUrlBuilder.append(jdbcUrl, 0, jdbcUrl.indexOf('?'));

StringBuilder paramsBuilder = new StringBuilder();
String paramUrl = jdbcUrl.substring(jdbcUrl.indexOf('?') + 1);
String[] urlParams = paramUrl.split("&");
for (String urlParam : urlParams) {
if (urlParam.contains("schema")) {
// remove the '"'
if (urlParam.contains("\"")) {
urlParam = urlParam.replaceAll("\"", "");
}
paramsBuilder.append(urlParam);
break;
}
}

if (paramsBuilder.length() > 0) {
jdbcUrlBuilder.append("?");
jdbcUrlBuilder.append(paramsBuilder);
}
resourceId = jdbcUrlBuilder.toString();
} else {
resourceId = jdbcUrl;
}
}

/**
* init the oscar resource id
* jdbc:oscar://192.168.x.xx:2003/OSRDB
*/
private void initOscarResourceId() {
if (jdbcUrl.contains("?")) {
resourceId = jdbcUrl.substring(0, jdbcUrl.indexOf('?')) + "/" + userName;
} else {
resourceId = jdbcUrl + "/" + userName;
}
}

/**
* prevent pg sql url like
* jdbc:postgresql://127.0.0.1:5432/seata?currentSchema=public
* jdbc:postgresql://127.0.0.1:5432/seata?currentSchema=seata
* cause the duplicated resourceId
* it will cause the problem like
* 1.get file lock fail
* 2.error table meta cache
*/
private void initPGResourceId() {
if (jdbcUrl.contains("?")) {
StringBuilder jdbcUrlBuilder = new StringBuilder();
jdbcUrlBuilder.append(jdbcUrl, 0, jdbcUrl.indexOf('?'));

StringBuilder paramsBuilder = new StringBuilder();
String paramUrl = jdbcUrl.substring(jdbcUrl.indexOf('?') + 1);
String[] urlParams = paramUrl.split("&");
for (String urlParam : urlParams) {
if (urlParam.contains("currentSchema")) {
if (urlParam.contains(Constants.DBKEYS_SPLIT_CHAR)) {
urlParam = urlParam.replace(Constants.DBKEYS_SPLIT_CHAR, "!");
}
paramsBuilder.append(urlParam);
break;
}
}

if (paramsBuilder.length() > 0) {
jdbcUrlBuilder.append("?");
jdbcUrlBuilder.append(paramsBuilder);
}
resourceId = jdbcUrlBuilder.toString();
} else {
resourceId = jdbcUrl;
}
if (resourceId.contains(",")) {
resourceId = resourceId.replace(",", "|");
}
}

/**
* The general form of the connection URL for SqlServer is
* jdbc:sqlserver://[serverName[\instanceName][:portNumber]][;property=value[;property=value]]
* required connection properties: [INSTANCENAME], [databaseName,database]
*
*/
private void initSqlServerResourceId() {
if (jdbcUrl.contains(";")) {
StringBuilder jdbcUrlBuilder = new StringBuilder();
jdbcUrlBuilder.append(jdbcUrl, 0, jdbcUrl.indexOf(';'));
StringBuilder paramsBuilder = new StringBuilder();
String paramUrl = jdbcUrl.substring(jdbcUrl.indexOf(';') + 1);
String[] urlParams = paramUrl.split(";");
for (String urlParam : urlParams) {
String[] paramSplit = urlParam.split("=");
String propertyName = paramSplit[0];
if ("INSTANCENAME".equalsIgnoreCase(propertyName)
|| "databaseName".equalsIgnoreCase(propertyName)
|| "database".equalsIgnoreCase(propertyName)) {
paramsBuilder.append(urlParam);
}
}

if (paramsBuilder.length() > 0) {
jdbcUrlBuilder.append(";");
jdbcUrlBuilder.append(paramsBuilder);
}
resourceId = jdbcUrlBuilder.toString();
} else {
resourceId = jdbcUrl;
}
ResourceIdInitializer initializer = ResourceIdInitializerRegistry.getInitializer(dbType, this);
initializer.initResourceId(this);
}

@Override
Expand Down Expand Up @@ -453,6 +281,18 @@ private void validMySQLVersion(Connection connection) {
}
}

public String getJdbcUrl() {
return jdbcUrl;
}

public void setResourceId(String resourceId) {
this.resourceId = resourceId;
}

public String getUserName() {
return userName;
}

public void close() throws Exception {
// TODO: Need to unregister resource from DefaultResourceManager
TableMetaCacheFactory.shutdown(resourceId);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seata.rm.datasource.initializer;

import org.apache.seata.rm.datasource.DataSourceProxy;

public abstract class AbstractResourceIdInitializer implements ResourceIdInitializer {
@Override
public void initResourceId(DataSourceProxy proxy) {
doInitResourceId(proxy);
}

protected abstract void doInitResourceId(DataSourceProxy proxy);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seata.rm.datasource.initializer;

import org.apache.seata.rm.datasource.DataSourceProxy;

public interface ResourceIdInitializer {
String JDBC_URL_SPLIT_CHAR = "?";

boolean supports(String dbType, DataSourceProxy proxy);

void initResourceId(DataSourceProxy proxy);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seata.rm.datasource.initializer;

import org.apache.seata.rm.datasource.DataSourceProxy;
import org.apache.seata.rm.datasource.initializer.db.DMResourceIdInitializer;
import org.apache.seata.rm.datasource.initializer.db.DefaultResourceIdInitializer;
import org.apache.seata.rm.datasource.initializer.db.MysqlResourceIdInitializer;
import org.apache.seata.rm.datasource.initializer.db.OracleResourceIdInitializer;
import org.apache.seata.rm.datasource.initializer.db.OscarResourceIdInitializer;
import org.apache.seata.rm.datasource.initializer.db.PostgresqlResourceIdInitializer;
import org.apache.seata.rm.datasource.initializer.db.SqlServerResourceIdInitializer;

import java.util.ArrayList;
import java.util.List;

public class ResourceIdInitializerRegistry {
private static final List<ResourceIdInitializer> INITIALIZERS = new ArrayList<>();

static {
INITIALIZERS.add(new PostgresqlResourceIdInitializer());
INITIALIZERS.add(new OracleResourceIdInitializer());
INITIALIZERS.add(new MysqlResourceIdInitializer());
INITIALIZERS.add(new SqlServerResourceIdInitializer());
INITIALIZERS.add(new DMResourceIdInitializer());
INITIALIZERS.add(new OscarResourceIdInitializer());
}

public static ResourceIdInitializer getInitializer(String dbType, DataSourceProxy proxy) {
for (ResourceIdInitializer initializer : INITIALIZERS) {
if (initializer.supports(dbType, proxy)) {
return initializer;
}
}
return new DefaultResourceIdInitializer();
}
}
Loading
Loading