[Java Client] Fix wrong behavior of deduplication for key based batching #15413
@BewareMyPower:Thanks for your contribution. For this PR, do we need to update docs? |
Though this PR changed the current behavior that the |
@BewareMyPower:Thanks for providing doc info! |
Hi @BewareMyPower. I think it will cause other problems.
I don't know whether I have correctly understood your meaning. If there is any misunderstanding, thank you for telling me. |
First, neither case should be valid. Users are responsible for making the sequence id unique. What this PR wants to solve is the resend case of an existing message. For example, 4 messages might be split into two groups in key based batching:
If the client didn't receive the ack receipt of batch A, it might resend batch A (msg-0 and msg-3). There are two possible cases:
Assuming msg-0 and msg-3 are sent individually (without batch). For the 1st case, we can verify it easily with the unit test.

    public void test() throws Exception {
        final String topic = "persistent://my-property/my-ns/test";
        admin.namespaces().setDeduplicationStatus("my-property/my-ns", true);
        final Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .topic(topic)
                .batcherBuilder(BatcherBuilder.KEY_BASED)
                .create();
        final List<TypedMessageBuilder<String>> messagesToSend = new ArrayList<>();
        messagesToSend.add(producer.newMessage().value("msg-0").key("A").sequenceId(0));
        messagesToSend.add(producer.newMessage().value("msg-1").key("B").sequenceId(1));
        messagesToSend.add(producer.newMessage().value("msg-2").key("B").sequenceId(2));
        messagesToSend.add(producer.newMessage().value("msg-3").key("A").sequenceId(3));
        final List<CompletableFuture<MessageId>> futures = new ArrayList<>();
        messagesToSend.forEach(msg -> futures.add(msg.sendAsync()));
        producer.flush(); // flush the messages in the batch container
        final List<MessageId> ids = futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
        // Resend messages in batch A
        ids.add(messagesToSend.get(0).send());
        ids.add(messagesToSend.get(3).send());
        // TODO: print the ids
    }
Before this patch, the resend of

For the 2nd case, we can modify the test to:

    final List<CompletableFuture<MessageId>> futures = new ArrayList<>();
    // Assume msg-0 and msg-3 are lost
    futures.add(messagesToSend.get(1).sendAsync());
    futures.add(messagesToSend.get(2).sendAsync());
    producer.flush(); // flush the messages in the batch container
The behaviors are the same. Though before this patch, It's a pity to see However, if we resent

    // Resend messages in batch A
    futures.add(messagesToSend.get(0).sendAsync());
    futures.add(messagesToSend.get(3).sendAsync());
    final List<MessageId> newIds = futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
In short, this PR solves the problem that duplicated messages might be accepted even if the previous messages were persisted successfully.
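The two resend cases can be walked through with a toy model. This is only a sketch under a stated assumption: the broker is modelled as a single per-producer "highest accepted sequence id" check, which is a simplification and not Pulsar's actual broker code. The class `DedupSketch`, the method `tryPersist` and the producer name `"p"` are hypothetical names introduced for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of sequence-id deduplication; an assumption, not the broker's code. */
class DedupSketch {
    // Highest sequence id accepted so far, per producer name (hypothetical field).
    private final Map<String, Long> highestAcceptedId = new HashMap<>();

    /** Accepts a send only if its sequence id is above everything accepted so far. */
    boolean tryPersist(String producer, long sequenceId) {
        long last = highestAcceptedId.getOrDefault(producer, -1L);
        if (sequenceId <= last) {
            return false; // treated as a duplicate and dropped
        }
        highestAcceptedId.put(producer, sequenceId);
        return true;
    }

    public static void main(String[] args) {
        // Case 1: both batches were persisted, so ids up to 3 have been seen.
        DedupSketch case1 = new DedupSketch();
        case1.tryPersist("p", 3);
        System.out.println("resend msg-0 accepted: " + case1.tryPersist("p", 0));
        System.out.println("resend msg-3 accepted: " + case1.tryPersist("p", 3));

        // Case 2: only batch B (msg-1, msg-2) was persisted, so ids up to 2 have been seen.
        DedupSketch case2 = new DedupSketch();
        case2.tryPersist("p", 2);
        // msg-0 is rejected in this model although it was never stored.
        System.out.println("resend msg-0 accepted: " + case2.tryPersist("p", 0));
        // msg-3 is above the highest accepted id, so it goes through.
        System.out.println("resend msg-3 accepted: " + case2.tryPersist("p", 3));
    }
}
```

In this model the first case drops both resends, while the second case drops msg-0 even though it was lost. Under the same assumption, that is why the result is closer to at-most-once than to effectively-once.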
To make it more clear and simple, #5491 introduced the highest sequence id to |
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
/pulsarbot run-failure-checks |
db6ad4f to dd62fff
    messagesToSend.add(producer.newMessage().value("msg-0").key("A").sequenceId(0));
    messagesToSend.add(producer.newMessage().value("msg-1").key("B").sequenceId(1));
    messagesToSend.add(producer.newMessage().value("msg-2").key("B").sequenceId(2));
    messagesToSend.add(producer.newMessage().value("msg-3").key("A").sequenceId(3));

What is the expected behavior for the above case? The deduplication depends on the monotonically increasing sequence ID, but the key-based
I've explained this in detail in the previous comment. It's true that even after this patch, message deduplication cannot achieve effectively-once; instead, it's at-most-once. But it can avoid duplicated messages. It doesn't solve the problem fundamentally, but it makes it better.

I noticed that when #4435 added the key based batch container, it also sorted the batches by sequence id.

pulsar/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java Lines 154 to 156 in a1fb200
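To make the grouping in this thread concrete, here is a minimal sketch of how the four example messages split by key. It is not the `BatchMessageKeyBasedContainer` implementation; `KeyGroupingSketch` and `groupByKey` are hypothetical names, and the sketch assumes sequence ids increase within each key so that the first and last entries are the lowest and highest ids.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative grouping of sequence ids by message key (hypothetical helper). */
class KeyGroupingSketch {
    /** Groups sequence ids by key, keeping the order in which keys were first seen. */
    static Map<String, List<Long>> groupByKey(String[] keys, long[] sequenceIds) {
        Map<String, List<Long>> batches = new LinkedHashMap<>();
        for (int i = 0; i < keys.length; i++) {
            batches.computeIfAbsent(keys[i], k -> new ArrayList<>()).add(sequenceIds[i]);
        }
        return batches;
    }

    public static void main(String[] args) {
        // msg-0 and msg-3 use key A; msg-1 and msg-2 use key B.
        String[] keys = {"A", "B", "B", "A"};
        long[] ids = {0, 1, 2, 3};
        // Assumes ids increase within a key, so first = lowest and last = highest.
        groupByKey(keys, ids).forEach((key, batch) ->
                System.out.println(key + " -> " + batch
                        + " lowest=" + batch.get(0)
                        + " highest=" + batch.get(batch.size() - 1)));
        // prints "A -> [0, 3] lowest=0 highest=3" then "B -> [1, 2] lowest=1 highest=2"
    }
}
```

The sequence id ranges of the two batches interleave: batch A spans 0 to 3 while batch B sits inside that range. That is the situation the question about monotonically increasing sequence ids is pointing at.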
At that time, #5491 was not pushed and there was no highest sequence id field in

Even without considering the enhancement (maybe meaningless at this time) to the deduplication, the refactor of this PR that reuses the

BTW, maybe in the future, we can support deduplication by keys and
The initial purpose of this PR was that I'm wondering why adding a
So during the refactoring, I tend to adopt the more correct behavior rather than keeping the previous behavior (might not be on purpose, but a mistake). |
Oh, I see. I previously thought the PR was for supporting deduplication for key-based batching. |
The change looks good to me, just left some minor comments.
pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
@codelipenghui All comments are addressed, PTAL again. |
@hangc0276 Could you also take a look? |
### Motivation

Currently message deduplication doesn't work well for key based batching. First, the key based batch container doesn't update the `lastSequenceIdPushed`, so a batch could contain both duplicated and non-duplicated messages. Second, when `createOpSendMsgs` is called, the `OpSendMsg` objects are sorted by the lowest sequence ids, and the highest sequence id is not set. If a batch contains sequence ids 0, 1, 2, then the message with sequence id 1 or 2 won't be dropped.

### Modifications

- Refactor the key based batch container so that `BatchMessageContainerImpl` is reused instead of maintaining a `KeyedBatch` class.
- When `createOpSendMsgs` is called, clear the highest sequence id field and configure the sequence id field with the highest sequence id to fix the second issue described above.
- Add `testKeyBasedBatchingOrder` to show and verify the current behavior.
- Add a test for key based batching to `testProducerDeduplicationWithDiscontinuousSequenceId` to verify that `lastSequenceIdPushed` is updated correctly.
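The second issue can be illustrated with a small sketch. It assumes, as a simplification, that the receiving side only remembers the single sequence id a batch was tagged with and drops anything at or below it. `BatchTagSketch`, `tag` and `tryPersist` are hypothetical names, not Pulsar APIs.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Toy comparison of tagging a batch with its lowest vs. highest sequence id. */
class BatchTagSketch {
    // Highest sequence id accepted so far (assumed dedup state, not broker code).
    private long highestAcceptedId = -1L;

    /** Accepts a send only if its sequence id is above everything accepted so far. */
    boolean tryPersist(long sequenceId) {
        if (sequenceId <= highestAcceptedId) {
            return false; // treated as a duplicate
        }
        highestAcceptedId = sequenceId;
        return true;
    }

    /** Sequence id a batch is tagged with: its lowest or its highest id. */
    static long tag(List<Long> batchIds, boolean useHighest) {
        return useHighest ? Collections.max(batchIds) : Collections.min(batchIds);
    }

    public static void main(String[] args) {
        List<Long> batch = Arrays.asList(0L, 1L, 2L);

        // Tagged with the lowest id: only 0 is remembered, so a resend of id 1 slips through.
        BatchTagSketch lowestTagged = new BatchTagSketch();
        lowestTagged.tryPersist(tag(batch, false));
        System.out.println("lowest tag, resend id 1 accepted: " + lowestTagged.tryPersist(1L));

        // Tagged with the highest id: 2 is remembered, so a resend of id 1 is dropped.
        BatchTagSketch highestTagged = new BatchTagSketch();
        highestTagged.tryPersist(tag(batch, true));
        System.out.println("highest tag, resend id 1 accepted: " + highestTagged.tryPersist(1L));
    }
}
```

Under this assumption, tagging the batch with its highest sequence id is what lets a later resend of id 1 or 2 be recognised as a duplicate.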
79e7e47 to 3ad7fa9
…ing (#15413) (cherry picked from commit a773337)
### Documentation
Check the box below or label this PR directly.

Need to update docs?

- `doc-required` (Your PR needs to update docs and you will update later)
- `no-need-doc` (Please explain why)
- `doc` (Your PR contains doc changes)
- `doc-added` (Docs have been already added)