Skip to content

Commit 66baa8d

Browse files
committed
[improve][cli] PIP-353: Improve transaction message visibility for peek-message (#22762)
(cherry picked from commit 20e83b9)
1 parent 7ca884b commit 66baa8d

File tree

7 files changed

+316
-27
lines changed

7 files changed

+316
-27
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@
9797
import org.apache.pulsar.client.api.MessageId;
9898
import org.apache.pulsar.client.api.PulsarClientException;
9999
import org.apache.pulsar.client.api.SubscriptionType;
100+
import org.apache.pulsar.client.api.transaction.TxnID;
100101
import org.apache.pulsar.client.impl.MessageIdImpl;
101102
import org.apache.pulsar.client.impl.MessageImpl;
102103
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
@@ -2707,7 +2708,7 @@ public void readEntryFailed(ManagedLedgerException exception,
27072708
@Override
27082709
public void readEntryComplete(Entry entry, Object ctx) {
27092710
try {
2710-
results.complete(generateResponseWithEntry(entry));
2711+
results.complete(generateResponseWithEntry(entry, (PersistentTopic) topic));
27112712
} catch (IOException exception) {
27122713
throw new RestException(exception);
27132714
} finally {
@@ -2848,10 +2849,12 @@ protected CompletableFuture<Response> internalPeekNthMessageAsync(String subName
28482849
entry = sub.peekNthMessage(messagePosition);
28492850
}
28502851
}
2851-
return entry;
2852-
}).thenCompose(entry -> {
2852+
return entry.thenApply(e -> Pair.of(e, (PersistentTopic) topic));
2853+
}).thenCompose(entryTopicPair -> {
2854+
Entry entry = entryTopicPair.getLeft();
2855+
PersistentTopic persistentTopic = entryTopicPair.getRight();
28532856
try {
2854-
Response response = generateResponseWithEntry(entry);
2857+
Response response = generateResponseWithEntry(entry, persistentTopic);
28552858
return CompletableFuture.completedFuture(response);
28562859
} catch (NullPointerException npe) {
28572860
throw new RestException(Status.NOT_FOUND, "Message not found");
@@ -2930,17 +2933,18 @@ public String toString() {
29302933
PersistentTopicsBase.this.topicName);
29312934
}
29322935
}, null);
2933-
return future;
2936+
return future.thenApply(entry -> Pair.of(entry, (PersistentTopic) topic));
29342937
} catch (ManagedLedgerException exception) {
29352938
log.error("[{}] Failed to examine message at position {} from {} due to {}", clientAppId(),
29362939
messagePosition,
29372940
topicName, exception);
29382941
throw new RestException(exception);
29392942
}
2940-
2941-
}).thenApply(entry -> {
2943+
}).thenApply(entryTopicPair -> {
2944+
Entry entry = entryTopicPair.getLeft();
2945+
PersistentTopic persistentTopic = entryTopicPair.getRight();
29422946
try {
2943-
return generateResponseWithEntry(entry);
2947+
return generateResponseWithEntry(entry, persistentTopic);
29442948
} catch (IOException exception) {
29452949
throw new RestException(exception);
29462950
} finally {
@@ -2951,7 +2955,7 @@ public String toString() {
29512955
});
29522956
}
29532957

2954-
private Response generateResponseWithEntry(Entry entry) throws IOException {
2958+
private Response generateResponseWithEntry(Entry entry, PersistentTopic persistentTopic) throws IOException {
29552959
checkNotNull(entry);
29562960
PositionImpl pos = (PositionImpl) entry.getPosition();
29572961
ByteBuf metadataAndPayload = entry.getDataBuffer();
@@ -3069,6 +3073,14 @@ private Response generateResponseWithEntry(Entry entry) throws IOException {
30693073
if (metadata.hasNullPartitionKey()) {
30703074
responseBuilder.header("X-Pulsar-null-partition-key", metadata.isNullPartitionKey());
30713075
}
3076+
if (metadata.hasTxnidMostBits() && metadata.hasTxnidLeastBits()) {
3077+
TxnID txnID = new TxnID(metadata.getTxnidMostBits(), metadata.getTxnidLeastBits());
3078+
boolean isTxnAborted = persistentTopic.isTxnAborted(txnID, (PositionImpl) entry.getPosition());
3079+
responseBuilder.header("X-Pulsar-txn-aborted", isTxnAborted);
3080+
}
3081+
boolean isTxnUncommitted = ((PositionImpl) entry.getPosition())
3082+
.compareTo(persistentTopic.getMaxReadPosition()) > 0;
3083+
responseBuilder.header("X-Pulsar-txn-uncommitted", isTxnUncommitted);
30723084

30733085
// Decode if needed
30743086
CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(metadata.getCompression());

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import lombok.Cleanup;
3939
import org.apache.bookkeeper.mledger.impl.PositionImpl;
4040
import org.apache.http.HttpStatus;
41+
import org.apache.pulsar.broker.BrokerTestUtil;
4142
import org.apache.pulsar.broker.ServiceConfiguration;
4243
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
4344
import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
@@ -48,12 +49,16 @@
4849
import org.apache.pulsar.client.api.Producer;
4950
import org.apache.pulsar.client.api.PulsarClient;
5051
import org.apache.pulsar.client.api.Schema;
52+
import org.apache.pulsar.client.api.TransactionIsolationLevel;
5153
import org.apache.pulsar.client.api.SubscriptionType;
5254
import org.apache.pulsar.client.api.transaction.Transaction;
5355
import org.apache.pulsar.client.api.transaction.TxnID;
5456
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
5557
import org.apache.pulsar.client.impl.MessageIdImpl;
58+
import org.apache.pulsar.client.impl.MessageImpl;
5659
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
60+
import org.apache.pulsar.common.api.proto.MarkerType;
61+
import org.apache.pulsar.common.api.proto.MessageMetadata;
5762
import org.apache.pulsar.common.naming.NamespaceName;
5863
import org.apache.pulsar.common.naming.SystemTopicNames;
5964
import org.apache.pulsar.common.naming.TopicDomain;
@@ -917,6 +922,127 @@ public void testAbortTransaction() throws Exception {
917922
}
918923
}
919924

925+
@Test
926+
public void testPeekMessageForSkipTxnMarker() throws Exception {
927+
initTransaction(1);
928+
929+
final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/peek_marker");
930+
931+
@Cleanup
932+
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
933+
int n = 10;
934+
for (int i = 0; i < n; i++) {
935+
Transaction txn = pulsarClient.newTransaction().build().get();
936+
producer.newMessage(txn).value("msg").send();
937+
txn.commit().get();
938+
}
939+
940+
List<Message<byte[]>> peekMsgs = admin.topics().peekMessages(topic, "t-sub", n,
941+
false, TransactionIsolationLevel.READ_UNCOMMITTED);
942+
assertEquals(peekMsgs.size(), n);
943+
for (Message<byte[]> peekMsg : peekMsgs) {
944+
assertEquals(new String(peekMsg.getValue()), "msg");
945+
}
946+
}
947+
948+
@Test
949+
public void testPeekMessageFoReadCommittedMessages() throws Exception {
950+
initTransaction(1);
951+
952+
final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/peek_txn");
953+
954+
@Cleanup
955+
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
956+
int n = 10;
957+
// Alternately sends `n` committed transactional messages and `n` abort transactional messages.
958+
for (int i = 0; i < 2 * n; i++) {
959+
Transaction txn = pulsarClient.newTransaction().build().get();
960+
if (i % 2 == 0) {
961+
producer.newMessage(txn).value("msg").send();
962+
txn.commit().get();
963+
} else {
964+
producer.newMessage(txn).value("msg-aborted").send();
965+
txn.abort();
966+
}
967+
}
968+
// Then sends 1 uncommitted transactional messages.
969+
Transaction txn = pulsarClient.newTransaction().build().get();
970+
producer.newMessage(txn).value("msg-uncommitted").send();
971+
// Then sends n-1 no transaction messages.
972+
for (int i = 0; i < n - 1; i++) {
973+
producer.newMessage().value("msg-after-uncommitted").send();
974+
}
975+
976+
// peek n message, all messages value should be "msg"
977+
{
978+
List<Message<byte[]>> peekMsgs = admin.topics().peekMessages(topic, "t-sub", n,
979+
false, TransactionIsolationLevel.READ_COMMITTED);
980+
assertEquals(peekMsgs.size(), n);
981+
for (Message<byte[]> peekMsg : peekMsgs) {
982+
assertEquals(new String(peekMsg.getValue()), "msg");
983+
}
984+
}
985+
986+
// peek 3 * n message, and still get n message, all messages value should be "msg"
987+
{
988+
List<Message<byte[]>> peekMsgs = admin.topics().peekMessages(topic, "t-sub", 2 * n,
989+
false, TransactionIsolationLevel.READ_COMMITTED);
990+
assertEquals(peekMsgs.size(), n);
991+
for (Message<byte[]> peekMsg : peekMsgs) {
992+
assertEquals(new String(peekMsg.getValue()), "msg");
993+
}
994+
}
995+
}
996+
997+
@Test
998+
public void testPeekMessageForShowAllMessages() throws Exception {
999+
initTransaction(1);
1000+
1001+
final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/peek_all");
1002+
1003+
@Cleanup
1004+
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
1005+
int n = 10;
1006+
// Alternately sends `n` committed transactional messages and `n` abort transactional messages.
1007+
for (int i = 0; i < 2 * n; i++) {
1008+
Transaction txn = pulsarClient.newTransaction().build().get();
1009+
if (i % 2 == 0) {
1010+
producer.newMessage(txn).value("msg").send();
1011+
txn.commit().get();
1012+
} else {
1013+
producer.newMessage(txn).value("msg-aborted").send();
1014+
txn.abort();
1015+
}
1016+
}
1017+
// Then sends `n` uncommitted transactional messages.
1018+
Transaction txn = pulsarClient.newTransaction().build().get();
1019+
for (int i = 0; i < n; i++) {
1020+
producer.newMessage(txn).value("msg-uncommitted").send();
1021+
}
1022+
1023+
// peek 5 * n message, will get 5 * n msg.
1024+
List<Message<byte[]>> peekMsgs = admin.topics().peekMessages(topic, "t-sub", 5 * n,
1025+
true, TransactionIsolationLevel.READ_UNCOMMITTED);
1026+
assertEquals(peekMsgs.size(), 5 * n);
1027+
1028+
for (int i = 0; i < 4 * n; i++) {
1029+
Message<byte[]> peekMsg = peekMsgs.get(i);
1030+
MessageImpl peekMsgImpl = (MessageImpl) peekMsg;
1031+
MessageMetadata metadata = peekMsgImpl.getMessageBuilder();
1032+
if (metadata.hasMarkerType()) {
1033+
assertTrue(metadata.getMarkerType() == MarkerType.TXN_COMMIT_VALUE ||
1034+
metadata.getMarkerType() == MarkerType.TXN_ABORT_VALUE);
1035+
} else {
1036+
String value = new String(peekMsg.getValue());
1037+
assertTrue(value.equals("msg") || value.equals("msg-aborted"));
1038+
}
1039+
}
1040+
for (int i = 4 * n; i < peekMsgs.size(); i++) {
1041+
Message<byte[]> peekMsg = peekMsgs.get(i);
1042+
assertEquals(new String(peekMsg.getValue()), "msg-uncommitted");
1043+
}
1044+
}
1045+
9201046
private static void verifyCoordinatorStats(String state,
9211047
long sequenceId, long lowWaterMark) {
9221048
assertEquals(state, "Ready");

pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java

Lines changed: 58 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.pulsar.client.api.Message;
3232
import org.apache.pulsar.client.api.MessageId;
3333
import org.apache.pulsar.client.api.SubscriptionType;
34+
import org.apache.pulsar.client.api.TransactionIsolationLevel;
3435
import org.apache.pulsar.common.naming.TopicDomain;
3536
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
3637
import org.apache.pulsar.common.policies.data.AuthAction;
@@ -1653,7 +1654,53 @@ void expireMessagesForAllSubscriptions(String topic, long expireTimeInSeconds)
16531654
* @throws PulsarAdminException
16541655
* Unexpected error
16551656
*/
1656-
List<Message<byte[]>> peekMessages(String topic, String subName, int numMessages) throws PulsarAdminException;
1657+
default List<Message<byte[]>> peekMessages(String topic, String subName, int numMessages)
1658+
throws PulsarAdminException {
1659+
return peekMessages(topic, subName, numMessages, false, TransactionIsolationLevel.READ_COMMITTED);
1660+
}
1661+
1662+
/**
1663+
* Peek messages from a topic subscription.
1664+
*
1665+
* @param topic
1666+
* topic name
1667+
* @param subName
1668+
* Subscription name
1669+
* @param numMessages
1670+
* Number of messages
1671+
* @param showServerMarker
1672+
* Enables the display of internal server write markers
1673+
* @param transactionIsolationLevel
1674+
* Sets the isolation level for peeking messages within transactions.
1675+
* - 'READ_COMMITTED' allows peeking only committed transactional messages.
1676+
* - 'READ_UNCOMMITTED' allows peeking all messages,
1677+
* even transactional messages which have been aborted.
1678+
* @return
1679+
* @throws NotAuthorizedException
1680+
* Don't have admin permission
1681+
* @throws NotFoundException
1682+
* Topic or subscription does not exist
1683+
* @throws PulsarAdminException
1684+
* Unexpected error
1685+
*/
1686+
List<Message<byte[]>> peekMessages(String topic, String subName, int numMessages,
1687+
boolean showServerMarker, TransactionIsolationLevel transactionIsolationLevel)
1688+
throws PulsarAdminException;
1689+
1690+
/**
1691+
* Peek messages from a topic subscription asynchronously.
1692+
*
1693+
* @param topic
1694+
* topic name
1695+
* @param subName
1696+
* Subscription name
1697+
* @param numMessages
1698+
* Number of messages
1699+
* @return a future that can be used to track when the messages are returned
1700+
*/
1701+
default CompletableFuture<List<Message<byte[]>>> peekMessagesAsync(String topic, String subName, int numMessages) {
1702+
return peekMessagesAsync(topic, subName, numMessages, false, TransactionIsolationLevel.READ_COMMITTED);
1703+
}
16571704

16581705
/**
16591706
* Peek messages from a topic subscription asynchronously.
@@ -1664,9 +1711,18 @@ void expireMessagesForAllSubscriptions(String topic, long expireTimeInSeconds)
16641711
* Subscription name
16651712
* @param numMessages
16661713
* Number of messages
1714+
* @param showServerMarker
1715+
* Enables the display of internal server write markers
1716+
@param transactionIsolationLevel
1717+
* Sets the isolation level for peeking messages within transactions.
1718+
* - 'READ_COMMITTED' allows peeking only committed transactional messages.
1719+
* - 'READ_UNCOMMITTED' allows peeking all messages,
1720+
* even transactional messages which have been aborted.
16671721
* @return a future that can be used to track when the messages are returned
16681722
*/
1669-
CompletableFuture<List<Message<byte[]>>> peekMessagesAsync(String topic, String subName, int numMessages);
1723+
CompletableFuture<List<Message<byte[]>>> peekMessagesAsync(
1724+
String topic, String subName, int numMessages,
1725+
boolean showServerMarker, TransactionIsolationLevel transactionIsolationLevel);
16701726

16711727
/**
16721728
* Get a message by its messageId via a topic subscription.

0 commit comments

Comments
 (0)