Skip to content

Commit 20e83b9

Browse files
authored
[improve][cli] PIP-353: Improve transaction message visibility for peek-message (#22762)
1 parent 55ad4b2 commit 20e83b9

File tree

8 files changed

+343
-51
lines changed

8 files changed

+343
-51
lines changed

pip/pip-353.md

Lines changed: 27 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ This behavior can confuse users and lead to incorrect data handling. The proposa
2525

2626
### In Scope
2727

28-
- Implement flags to selectively display `server markers`, `uncommitted messages`, and `aborted messages` in peek operations.
28+
- Implement flags to selectively display `server markers`, `uncommitted messages(include aborted messages) for transaction` in peek operations.
2929
- Set the default behavior to only show messages from committed transactions to ensure data integrity.
3030

3131
### Out of Scope
@@ -37,16 +37,17 @@ This behavior can confuse users and lead to incorrect data handling. The proposa
3737
The proposal introduces three new flags to the `peek-messages` command:
3838

3939
1. `--show-server-marker`: Controls the visibility of server markers (default: `false`).
40-
2. `--show-txn-uncommitted`: Controls the visibility of messages from uncommitted transactions (default: `false`).
41-
3. `--show-txn-aborted`: Controls the visibility of messages from aborted transactions (default: `false`).
40+
2. `---transaction-isolation-level`: Controls the visibility of messages for transactions. (default: `READ_COMMITTED`). Options:
41+
- READ_COMMITTED: Can only consume all transactional messages which have been committed.
42+
- READ_UNCOMMITTED: Can consume all messages, even transactional messages which have been aborted.
4243

4344
These flags will allow administrators and developers to tailor the peek functionality to their needs, improving the usability and security of message handling in transactional contexts.
4445

4546
## Detailed Design
4647

4748
### Design & Implementation Details
4849

49-
To support the `--show-server-marker` and `--show-txn-aborted`, `--show-txn-uncommitted` flags, needs to introduce specific tag into the `headers` of messages returned by the
50+
To support the `--show-server-marker` and `---transaction-isolation-level` flags, needs to introduce specific tag into the `headers` of messages returned by the
5051
[peekNthMessage REST API](https://github.com/apache/pulsar/blob/8ca01cd42edfd4efd986f752f6f8538ea5bf4f94/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java#L1892-L1905).
5152

5253
- `X-Pulsar-marker-type`: Already exists.
@@ -62,11 +63,10 @@ see the following code: [https://github.com/shibd/pulsar/pull/34](https://github
6263

6364
New command line flags added for the `bin/pulsar-admin topics peek-messages` command:
6465

65-
| Flag | Abbreviation | Type | Default | Description |
66-
|--------------------------|--------------|---------|---------|----------------------------------------------------------------|
67-
| `--show-server-marker` | `-ssm` | Boolean | `false` | Enables the display of internal server write markers. |
68-
| `--show-txn-uncommitted` | `-stu` | Boolean | `false` | Enables the display of messages from uncommitted transactions. |
69-
| `--show-txn-aborted` | `-sta` | Boolean | `false` | Enables the display of messages from aborted transactions. |
66+
| Flag | Abbreviation | Type | Default | Description |
67+
|----------------------------------|--------------|---------|---------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
68+
| `--show-server-marker` | `-ssm` | Boolean | `false` | Enables the display of internal server write markers. |
69+
| `---transaction-isolation-level` | `-til` | Enum | `false` | Enables theSets the isolation level for consuming messages within transactions. </br> - 'READ_COMMITTED' allows consuming only committed transactional messages. </br> - 'READ_UNCOMMITTED' allows consuming all messages, even transactional messages which have been aborted. |
7070

7171

7272
## Public-facing Changes
@@ -85,10 +85,11 @@ Add two methods to the admin.Topics() interface.
8585
* Number of messages
8686
* @param showServerMarker
8787
* Enables the display of internal server write markers
88-
* @param showTxnAborted
89-
* Enables the display of messages from aborted transactions
90-
* @param showTxnUncommitted
91-
* Enables the display of messages from uncommitted transactions
88+
* @param transactionIsolationLevel
89+
* Sets the isolation level for consuming messages within transactions.
90+
* - 'READ_COMMITTED' allows consuming only committed transactional messages.
91+
* - 'READ_UNCOMMITTED' allows consuming all messages,
92+
* even transactional messages which have been aborted.
9293
* @return
9394
* @throws NotAuthorizedException
9495
* Don't have admin permission
@@ -98,8 +99,9 @@ Add two methods to the admin.Topics() interface.
9899
* Unexpected error
99100
*/
100101
List<Message<byte[]>> peekMessages(String topic, String subName, int numMessages,
101-
boolean showServerMarker, boolean showTxnAborted,
102-
boolean showTxnUncommitted) throws PulsarAdminException;
102+
boolean showServerMarker, TransactionIsolationLevel transactionIsolationLevel)
103+
throws PulsarAdminException;
104+
103105

104106
/**
105107
* Peek messages from a topic subscription asynchronously.
@@ -112,15 +114,16 @@ Add two methods to the admin.Topics() interface.
112114
* Number of messages
113115
* @param showServerMarker
114116
* Enables the display of internal server write markers
115-
* @param showTxnAborted
116-
* Enables the display of messages from aborted transactions
117-
* @param showTxnUncommitted
118-
* Enables the display of messages from uncommitted transactions
119-
* @return a future that can be used to track when the messages are returned
117+
@param transactionIsolationLevel
118+
* Sets the isolation level for consuming messages within transactions.
119+
* - 'READ_COMMITTED' allows consuming only committed transactional messages.
120+
* - 'READ_UNCOMMITTED' allows consuming all messages,
121+
* even transactional messages which have been aborted.
122+
* @return a future that can be used to track when the messages are returned
120123
*/
121-
CompletableFuture<List<Message<byte[]>>> peekMessagesAsync(String topic, String subName, int numMessages,
122-
boolean showServerMarker, boolean showTxnAborted,
123-
boolean showTxnUncommitted);
124+
CompletableFuture<List<Message<byte[]>>> peekMessagesAsync(
125+
String topic, String subName, int numMessages,
126+
boolean showServerMarker, TransactionIsolationLevel transactionIsolationLevel);
124127
```
125128

126129
## Backward & Forward Compatibility
@@ -130,5 +133,5 @@ Reverting to a previous version of Pulsar without this feature will remove the a
130133

131134
### Upgrade
132135
While upgrading to the new version of Pulsar that includes these changes, the default behavior of the `peek-messages` command will change.
133-
Existing scripts or commands that rely on the old behavior (where transaction markers and messages from uncommitted or aborted transactions are visible) will need to explicitly set the new flags (`--show-server-marker`, `--show-txn-uncommitted`, `--show-txn-aborted`) to `true` to maintain the old behavior.
136+
Existing scripts or commands that rely on the old behavior (where transaction markers and messages from uncommitted or aborted transactions are visible) will need to explicitly set the new flags (`--show-server-marker true` and `--transaction-isolation-level READ_UNCOMMITTED` to maintain the old behavior.
134137
This change is necessary as the previous default behavior did not align with typical expectations around data visibility and integrity in transactional systems.

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@
9898
import org.apache.pulsar.client.api.MessageId;
9999
import org.apache.pulsar.client.api.PulsarClientException;
100100
import org.apache.pulsar.client.api.SubscriptionType;
101+
import org.apache.pulsar.client.api.transaction.TxnID;
101102
import org.apache.pulsar.client.impl.MessageIdImpl;
102103
import org.apache.pulsar.client.impl.MessageImpl;
103104
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
@@ -2726,7 +2727,7 @@ public void readEntryFailed(ManagedLedgerException exception,
27262727
@Override
27272728
public void readEntryComplete(Entry entry, Object ctx) {
27282729
try {
2729-
results.complete(generateResponseWithEntry(entry));
2730+
results.complete(generateResponseWithEntry(entry, (PersistentTopic) topic));
27302731
} catch (IOException exception) {
27312732
throw new RestException(exception);
27322733
} finally {
@@ -2867,10 +2868,12 @@ protected CompletableFuture<Response> internalPeekNthMessageAsync(String subName
28672868
entry = sub.peekNthMessage(messagePosition);
28682869
}
28692870
}
2870-
return entry;
2871-
}).thenCompose(entry -> {
2871+
return entry.thenApply(e -> Pair.of(e, (PersistentTopic) topic));
2872+
}).thenCompose(entryTopicPair -> {
2873+
Entry entry = entryTopicPair.getLeft();
2874+
PersistentTopic persistentTopic = entryTopicPair.getRight();
28722875
try {
2873-
Response response = generateResponseWithEntry(entry);
2876+
Response response = generateResponseWithEntry(entry, persistentTopic);
28742877
return CompletableFuture.completedFuture(response);
28752878
} catch (NullPointerException npe) {
28762879
throw new RestException(Status.NOT_FOUND, "Message not found");
@@ -2949,17 +2952,18 @@ public String toString() {
29492952
PersistentTopicsBase.this.topicName);
29502953
}
29512954
}, null);
2952-
return future;
2955+
return future.thenApply(entry -> Pair.of(entry, (PersistentTopic) topic));
29532956
} catch (ManagedLedgerException exception) {
29542957
log.error("[{}] Failed to examine message at position {} from {} due to {}", clientAppId(),
29552958
messagePosition,
29562959
topicName, exception);
29572960
throw new RestException(exception);
29582961
}
2959-
2960-
}).thenApply(entry -> {
2962+
}).thenApply(entryTopicPair -> {
2963+
Entry entry = entryTopicPair.getLeft();
2964+
PersistentTopic persistentTopic = entryTopicPair.getRight();
29612965
try {
2962-
return generateResponseWithEntry(entry);
2966+
return generateResponseWithEntry(entry, persistentTopic);
29632967
} catch (IOException exception) {
29642968
throw new RestException(exception);
29652969
} finally {
@@ -2970,7 +2974,7 @@ public String toString() {
29702974
});
29712975
}
29722976

2973-
private Response generateResponseWithEntry(Entry entry) throws IOException {
2977+
private Response generateResponseWithEntry(Entry entry, PersistentTopic persistentTopic) throws IOException {
29742978
checkNotNull(entry);
29752979
PositionImpl pos = (PositionImpl) entry.getPosition();
29762980
ByteBuf metadataAndPayload = entry.getDataBuffer();
@@ -3088,6 +3092,14 @@ private Response generateResponseWithEntry(Entry entry) throws IOException {
30883092
if (metadata.hasNullPartitionKey()) {
30893093
responseBuilder.header("X-Pulsar-null-partition-key", metadata.isNullPartitionKey());
30903094
}
3095+
if (metadata.hasTxnidMostBits() && metadata.hasTxnidLeastBits()) {
3096+
TxnID txnID = new TxnID(metadata.getTxnidMostBits(), metadata.getTxnidLeastBits());
3097+
boolean isTxnAborted = persistentTopic.isTxnAborted(txnID, (PositionImpl) entry.getPosition());
3098+
responseBuilder.header("X-Pulsar-txn-aborted", isTxnAborted);
3099+
}
3100+
boolean isTxnUncommitted = ((PositionImpl) entry.getPosition())
3101+
.compareTo(persistentTopic.getMaxReadPosition()) > 0;
3102+
responseBuilder.header("X-Pulsar-txn-uncommitted", isTxnUncommitted);
30913103

30923104
// Decode if needed
30933105
CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(metadata.getCompression());

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import lombok.Cleanup;
3939
import org.apache.bookkeeper.mledger.impl.PositionImpl;
4040
import org.apache.http.HttpStatus;
41+
import org.apache.pulsar.broker.BrokerTestUtil;
4142
import org.apache.pulsar.broker.ServiceConfiguration;
4243
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
4344
import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
@@ -48,12 +49,16 @@
4849
import org.apache.pulsar.client.api.Producer;
4950
import org.apache.pulsar.client.api.PulsarClient;
5051
import org.apache.pulsar.client.api.Schema;
52+
import org.apache.pulsar.client.api.TransactionIsolationLevel;
5153
import org.apache.pulsar.client.api.SubscriptionType;
5254
import org.apache.pulsar.client.api.transaction.Transaction;
5355
import org.apache.pulsar.client.api.transaction.TxnID;
5456
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
5557
import org.apache.pulsar.client.impl.MessageIdImpl;
58+
import org.apache.pulsar.client.impl.MessageImpl;
5659
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
60+
import org.apache.pulsar.common.api.proto.MarkerType;
61+
import org.apache.pulsar.common.api.proto.MessageMetadata;
5762
import org.apache.pulsar.common.naming.NamespaceName;
5863
import org.apache.pulsar.common.naming.SystemTopicNames;
5964
import org.apache.pulsar.common.naming.TopicDomain;
@@ -917,6 +922,127 @@ public void testAbortTransaction() throws Exception {
917922
}
918923
}
919924

925+
@Test
926+
public void testPeekMessageForSkipTxnMarker() throws Exception {
927+
initTransaction(1);
928+
929+
final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/peek_marker");
930+
931+
@Cleanup
932+
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
933+
int n = 10;
934+
for (int i = 0; i < n; i++) {
935+
Transaction txn = pulsarClient.newTransaction().build().get();
936+
producer.newMessage(txn).value("msg").send();
937+
txn.commit().get();
938+
}
939+
940+
List<Message<byte[]>> peekMsgs = admin.topics().peekMessages(topic, "t-sub", n,
941+
false, TransactionIsolationLevel.READ_UNCOMMITTED);
942+
assertEquals(peekMsgs.size(), n);
943+
for (Message<byte[]> peekMsg : peekMsgs) {
944+
assertEquals(new String(peekMsg.getValue()), "msg");
945+
}
946+
}
947+
948+
@Test
949+
public void testPeekMessageFoReadCommittedMessages() throws Exception {
950+
initTransaction(1);
951+
952+
final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/peek_txn");
953+
954+
@Cleanup
955+
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
956+
int n = 10;
957+
// Alternately sends `n` committed transactional messages and `n` abort transactional messages.
958+
for (int i = 0; i < 2 * n; i++) {
959+
Transaction txn = pulsarClient.newTransaction().build().get();
960+
if (i % 2 == 0) {
961+
producer.newMessage(txn).value("msg").send();
962+
txn.commit().get();
963+
} else {
964+
producer.newMessage(txn).value("msg-aborted").send();
965+
txn.abort();
966+
}
967+
}
968+
// Then sends 1 uncommitted transactional messages.
969+
Transaction txn = pulsarClient.newTransaction().build().get();
970+
producer.newMessage(txn).value("msg-uncommitted").send();
971+
// Then sends n-1 no transaction messages.
972+
for (int i = 0; i < n - 1; i++) {
973+
producer.newMessage().value("msg-after-uncommitted").send();
974+
}
975+
976+
// peek n message, all messages value should be "msg"
977+
{
978+
List<Message<byte[]>> peekMsgs = admin.topics().peekMessages(topic, "t-sub", n,
979+
false, TransactionIsolationLevel.READ_COMMITTED);
980+
assertEquals(peekMsgs.size(), n);
981+
for (Message<byte[]> peekMsg : peekMsgs) {
982+
assertEquals(new String(peekMsg.getValue()), "msg");
983+
}
984+
}
985+
986+
// peek 3 * n message, and still get n message, all messages value should be "msg"
987+
{
988+
List<Message<byte[]>> peekMsgs = admin.topics().peekMessages(topic, "t-sub", 2 * n,
989+
false, TransactionIsolationLevel.READ_COMMITTED);
990+
assertEquals(peekMsgs.size(), n);
991+
for (Message<byte[]> peekMsg : peekMsgs) {
992+
assertEquals(new String(peekMsg.getValue()), "msg");
993+
}
994+
}
995+
}
996+
997+
@Test
998+
public void testPeekMessageForShowAllMessages() throws Exception {
999+
initTransaction(1);
1000+
1001+
final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/peek_all");
1002+
1003+
@Cleanup
1004+
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
1005+
int n = 10;
1006+
// Alternately sends `n` committed transactional messages and `n` abort transactional messages.
1007+
for (int i = 0; i < 2 * n; i++) {
1008+
Transaction txn = pulsarClient.newTransaction().build().get();
1009+
if (i % 2 == 0) {
1010+
producer.newMessage(txn).value("msg").send();
1011+
txn.commit().get();
1012+
} else {
1013+
producer.newMessage(txn).value("msg-aborted").send();
1014+
txn.abort();
1015+
}
1016+
}
1017+
// Then sends `n` uncommitted transactional messages.
1018+
Transaction txn = pulsarClient.newTransaction().build().get();
1019+
for (int i = 0; i < n; i++) {
1020+
producer.newMessage(txn).value("msg-uncommitted").send();
1021+
}
1022+
1023+
// peek 5 * n message, will get 5 * n msg.
1024+
List<Message<byte[]>> peekMsgs = admin.topics().peekMessages(topic, "t-sub", 5 * n,
1025+
true, TransactionIsolationLevel.READ_UNCOMMITTED);
1026+
assertEquals(peekMsgs.size(), 5 * n);
1027+
1028+
for (int i = 0; i < 4 * n; i++) {
1029+
Message<byte[]> peekMsg = peekMsgs.get(i);
1030+
MessageImpl peekMsgImpl = (MessageImpl) peekMsg;
1031+
MessageMetadata metadata = peekMsgImpl.getMessageBuilder();
1032+
if (metadata.hasMarkerType()) {
1033+
assertTrue(metadata.getMarkerType() == MarkerType.TXN_COMMIT_VALUE ||
1034+
metadata.getMarkerType() == MarkerType.TXN_ABORT_VALUE);
1035+
} else {
1036+
String value = new String(peekMsg.getValue());
1037+
assertTrue(value.equals("msg") || value.equals("msg-aborted"));
1038+
}
1039+
}
1040+
for (int i = 4 * n; i < peekMsgs.size(); i++) {
1041+
Message<byte[]> peekMsg = peekMsgs.get(i);
1042+
assertEquals(new String(peekMsg.getValue()), "msg-uncommitted");
1043+
}
1044+
}
1045+
9201046
private static void verifyCoordinatorStats(String state,
9211047
long sequenceId, long lowWaterMark) {
9221048
assertEquals(state, "Ready");

0 commit comments

Comments
 (0)