Skip to content
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
8edf4d2
license
Bughue Dec 27, 2023
5397478
rocket
Bughue Dec 28, 2023
f67c651
style
Bughue Jan 2, 2024
304df90
check listener
Bughue Jan 2, 2024
93b96f3
bean holder
Bughue Jan 2, 2024
052532b
bean holder
Bughue Jan 2, 2024
b90978b
TCCRocketMQ
Bughue Jan 22, 2024
8803a39
rocketmq
Bughue Jan 25, 2024
0c335bc
Merge branch '2.x' of https://github.com/seata/seata into dev-rocketmq
Bughue Jan 26, 2024
625ff48
rocketmq
Bughue Jan 26, 2024
e11a818
rocketmq
Bughue Jan 26, 2024
1d76507
rocketmq
Bughue Jan 26, 2024
4f74db2
rocketmq
Bughue Jan 26, 2024
4f10e9e
rocketmq
Bughue Jan 26, 2024
bafee71
independent module
Bughue Jan 26, 2024
7fb2ee1
style
Bughue Jan 29, 2024
cc7cec4
style
Bughue Jan 29, 2024
daf0888
style bom
Bughue Jan 29, 2024
5f42ad9
module
Bughue Jan 29, 2024
b85c799
style
Bughue Jan 29, 2024
87643f7
style
Bughue Jan 29, 2024
0a16810
XID
Bughue Feb 1, 2024
768f862
rocket mq factory
Bughue Feb 4, 2024
6efaa5c
optimize producer
Bughue Feb 6, 2024
2e9d615
Merge branch '2.x' of https://github.com/seata/seata into dev-rocketmq
Bughue Feb 6, 2024
1580478
style
Bughue Feb 6, 2024
f1f37c0
fix configuration
Bughue Feb 7, 2024
748e733
getGlobalStatus in AbstractResourceManager
Bughue Feb 19, 2024
d31fc70
test
Bughue Feb 19, 2024
fc71d14
test
Bughue Feb 20, 2024
2b700ab
remove configuration
Bughue Feb 20, 2024
bb41a20
Merge branch '2.x' of https://github.com/seata/seata into dev-rocketmq
Bughue Feb 20, 2024
b8cc88f
tcc api
Bughue Feb 20, 2024
dc9e7c5
tcc api
Bughue Feb 20, 2024
e5a4790
style
Bughue Feb 20, 2024
4faffc7
to many getXX
Bughue Feb 26, 2024
2554991
unit test
Bughue Feb 26, 2024
2e53a3a
unit test
Bughue Feb 26, 2024
3b57588
style
Bughue Feb 27, 2024
547df7e
style
Bughue Feb 27, 2024
cfbb664
style
Bughue Feb 28, 2024
1be3c83
style
Bughue Mar 1, 2024
345baad
mock test
Bughue Mar 1, 2024
30bef40
mock test
Bughue Mar 1, 2024
c91c58d
includeFromCI
Bughue Mar 1, 2024
343c586
excludeCI
Bughue Mar 1, 2024
d17c95e
excludeCI
Bughue Mar 2, 2024
b7af430
excludeCI
Bughue Mar 2, 2024
1318eb0
test
Bughue Mar 2, 2024
de54524
test
Bughue Mar 2, 2024
5b16241
test
Bughue Mar 2, 2024
4d74963
test
Bughue Mar 2, 2024
96e91c4
update
funky-eyes Mar 3, 2024
e273e87
Merge branch '2.x' of github.com:seata/seata into dev-rocketmq
funky-eyes Mar 3, 2024
1de6fd8
update
funky-eyes Mar 3, 2024
c67ad75
update
funky-eyes Mar 3, 2024
eef9e66
update
funky-eyes Mar 3, 2024
8fd38b7
update
funky-eyes Mar 3, 2024
0c6e901
update
funky-eyes Mar 3, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions all/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.seata</groupId>
<artifactId>seata-rocketmq</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.seata</groupId>
<artifactId>seata-sqlparser-core</artifactId>
Expand Down
5 changes: 5 additions & 0 deletions bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,11 @@
<artifactId>seata-http</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.seata</groupId>
<artifactId>seata-rocketmq</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.seata</groupId>
<artifactId>seata-rm</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1011,4 +1011,9 @@ public interface ConfigurationKeys {
* The constant SERVER_APPLICATION_DATA_SIZE_CHECK
*/
String SERVER_APPLICATION_DATA_SIZE_CHECK = SERVER_PREFIX + "applicationDataLimitCheck";

/**
* The constant ROCKET_MQ_MSG_TIMEOUT
*/
String ROCKET_MQ_MSG_TIMEOUT = SERVER_PREFIX + "rocketmqMsgTimeout";
}
Original file line number Diff line number Diff line change
Expand Up @@ -314,4 +314,6 @@ public interface DefaultValues {
* Default druid location in classpath
*/
String DRUID_LOCATION = "lib/sqlparser/druid.jar";

int DEFAULT_ROCKET_MQ_MSG_TIMEOUT = 60 * 1000;
}
7 changes: 7 additions & 0 deletions dependencies/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@
<!-- for jdbc driver when package -->
<mysql5.version>${mysql.version}</mysql5.version>
<mysql8.version>8.0.27</mysql8.version>
<!-- rocketmq -->
<rocketmq-version>5.0.0</rocketmq-version>

<!-- # for kotlin -->
<kotlin.version>1.4.32</kotlin.version>
Expand Down Expand Up @@ -776,6 +778,11 @@
<artifactId>janino</artifactId>
<version>${janino-version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq-version}</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
<module>integration/brpc</module>
<module>rm</module>
<module>rm-datasource</module>
<module>rocketmq</module>
<module>spring</module>
<module>tcc</module>
<module>test</module>
Expand Down
12 changes: 12 additions & 0 deletions rm/src/main/java/org/apache/seata/rm/DefaultResourceManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,20 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;

import org.apache.seata.common.exception.FrameworkException;
import org.apache.seata.common.loader.EnhancedServiceLoader;
import org.apache.seata.common.util.CollectionUtils;
import org.apache.seata.core.exception.TransactionException;
import org.apache.seata.core.model.BranchStatus;
import org.apache.seata.core.model.BranchType;
import org.apache.seata.core.model.GlobalStatus;
import org.apache.seata.core.model.Resource;
import org.apache.seata.core.model.ResourceManager;
import org.apache.seata.core.protocol.transaction.GlobalStatusRequest;
import org.apache.seata.core.protocol.transaction.GlobalStatusResponse;
import org.apache.seata.core.rpc.netty.RmNettyRemotingClient;

/**
* default resource manager, adapt all resource managers
Expand Down Expand Up @@ -150,6 +155,13 @@ public BranchType getBranchType() {
throw new FrameworkException("DefaultResourceManager isn't a real ResourceManager");
}

public GlobalStatus getGlobalStatus(String xid) throws TimeoutException {
GlobalStatusRequest queryGlobalStatus = new GlobalStatusRequest();
queryGlobalStatus.setXid(xid);
GlobalStatusResponse response = (GlobalStatusResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(queryGlobalStatus);
return response.getGlobalStatus();
}

private static class SingletonHolder {
private static DefaultResourceManager INSTANCE = new DefaultResourceManager();
}
Expand Down
53 changes: 53 additions & 0 deletions rocketmq/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.seata</groupId>
<artifactId>seata-parent</artifactId>
<version>${revision}</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>seata-rocketmq</artifactId>
<packaging>jar</packaging>
<name>seata-rocketmq ${project.version}</name>
<description>rocketmq integration for Seata built with Maven</description>

<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>seata-tcc</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>
</dependencies>


</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.integration.rocketmq;

import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.seata.common.util.StringUtils;
import org.apache.seata.core.context.RootContext;
import org.apache.seata.core.model.GlobalStatus;
import org.apache.seata.rm.DefaultResourceManager;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeoutException;

/**
* Seata MQ Producer
**/
public class SeataMQProducer extends TransactionMQProducer {

private static final Logger LOGGER = LoggerFactory.getLogger(SeataMQProducer.class);

public static String PROPERTY_SEATA_XID = "__SEATA_XID";
public static String PROPERTY_SEATA_BRANCHID = "__SEATA_BRANCHID";
private TransactionListener transactionListener;

private TCCRocketMQ tccRocketMQ;

public SeataMQProducer(final String producerGroup) {
this(null, producerGroup, null);
}

public SeataMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {
super(namespace, producerGroup, rpcHook);
this.transactionListener = new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
return LocalTransactionState.UNKNOW;
}

@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String xid = msg.getProperty(PROPERTY_SEATA_XID);
if (StringUtils.isBlank(xid)) {
LOGGER.error("msg has no xid, msgTransactionId: {}, msg will be rollback", msg.getTransactionId());
return LocalTransactionState.ROLLBACK_MESSAGE;
}
List<GlobalStatus> commitStatuses = Arrays.asList(GlobalStatus.Committed, GlobalStatus.Committing, GlobalStatus.CommitRetrying);
List<GlobalStatus> rollbackStatuses = Arrays.asList(GlobalStatus.Rollbacked, GlobalStatus.Rollbacking, GlobalStatus.RollbackRetrying);
try {
GlobalStatus globalStatus = DefaultResourceManager.get().getGlobalStatus(xid);
if (commitStatuses.contains(globalStatus)) {
return LocalTransactionState.COMMIT_MESSAGE;
} else if (rollbackStatuses.contains(globalStatus) || GlobalStatus.isOnePhaseTimeout(globalStatus)) {
return LocalTransactionState.ROLLBACK_MESSAGE;
} else if (GlobalStatus.Finished.equals(globalStatus)) {
LOGGER.error("global transaction finished, msg will be rollback, xid: {}", xid);
return LocalTransactionState.ROLLBACK_MESSAGE;
}
} catch (TimeoutException e) {
LOGGER.error("getGlobalStatus error, xid: {}, msgTransactionId: {}", xid, msg.getTransactionId(), e);
}
return LocalTransactionState.UNKNOW;
}
};
}


public void setTccRocketMQ(TCCRocketMQ tccRocketMQ) {
this.tccRocketMQ = tccRocketMQ;
}

public SendResult send(Message msg) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
return send(msg, this.defaultMQProducerImpl.getDefaultMQProducer().getSendMsgTimeout());
}

@Override
public SendResult send(Message msg, long timeout) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
if (RootContext.inGlobalTransaction()) {
if (tccRocketMQ == null) {
throw new RuntimeException("TCCRocketMQ is null");
}
return tccRocketMQ.prepare(msg, timeout);
} else {
return super.send(msg, timeout);
}
}

public SendResult doSendMessageInTransaction(final Message msg, long timeout, String xid, long branchId) throws MQClientException {
msg.setTopic(withNamespace(msg.getTopic()));
if (msg.getDelayTimeLevel() != 0) {
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
}

Validators.checkMessage(msg, this.defaultMQProducerImpl.getDefaultMQProducer());

SendResult sendResult = null;
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducerImpl.getDefaultMQProducer().getProducerGroup());
MessageAccessor.putProperty(msg, PROPERTY_SEATA_XID, xid);
MessageAccessor.putProperty(msg, PROPERTY_SEATA_BRANCHID, String.valueOf(branchId));
try {
sendResult = this.send(msg, timeout);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}

if (SendStatus.SEND_OK != sendResult.getSendStatus()) {
throw new RuntimeException("Message send fail.status=" + sendResult.getSendStatus());
}
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
return sendResult;
}


public static SeataMQProducer create(String groupName, String ak, String sk, boolean isEnableMsgTrace, String customizedTraceTopic) {
boolean isEnableAcl = !StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk);
if (isEnableAcl) {
LOGGER.warn("ACL is not supported yet in SeataMQProducer");
}
if (isEnableMsgTrace) {
LOGGER.warn("MessageTrace is not supported yet in SeataMQProducer");
}
return new SeataMQProducer(groupName);
}


@Override
public TransactionListener getTransactionListener() {
return transactionListener;
}
}
Loading