Skip to content

Commit 7bd26fc

Browse files
RobertIndieTechnoboy-
authored andcommitted
[fix][client] Fix DLQ producer name conflicts when multiples consumers send messages to DLQ (#21890)
1 parent 106b838 commit 7bd26fc

File tree

2 files changed

+8
-2
lines changed

2 files changed

+8
-2
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,8 @@ public void testDeadLetterTopicWithMessageKey() throws Exception {
140140
public void testDeadLetterTopicWithProducerName() throws Exception {
141141
final String topic = "persistent://my-property/my-ns/dead-letter-topic";
142142
final String subscription = "my-subscription";
143-
String deadLetterProducerName = String.format("%s-%s-DLQ", topic, subscription);
143+
final String consumerName = "my-consumer";
144+
String deadLetterProducerName = String.format("%s-%s-%s-DLQ", topic, subscription, consumerName);
144145

145146
final int maxRedeliveryCount = 1;
146147

@@ -149,6 +150,7 @@ public void testDeadLetterTopicWithProducerName() throws Exception {
149150
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
150151
.topic(topic)
151152
.subscriptionName(subscription)
153+
.consumerName(consumerName)
152154
.subscriptionType(SubscriptionType.Shared)
153155
.ackTimeout(1, TimeUnit.SECONDS)
154156
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
@@ -929,6 +931,9 @@ public void testDeadLetterTopicWithInitialSubscriptionAndMultiConsumers() throws
929931
assertTrue(admin.topics().getSubscriptions(deadLetterTopic).contains(dlqInitialSub));
930932
});
931933

934+
// We should assert that all consumers are able to produce messages to DLQ
935+
assertEquals(admin.topics().getStats(deadLetterTopic).getPublishers().size(), 2);
936+
932937
Consumer<byte[]> deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES)
933938
.topic(deadLetterTopic)
934939
.subscriptionName(dlqInitialSub)

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2172,7 +2172,8 @@ private void initDeadLetterProducerIfNeeded() {
21722172
((ProducerBuilderImpl<byte[]>) client.newProducer(Schema.AUTO_PRODUCE_BYTES(schema)))
21732173
.initialSubscriptionName(this.deadLetterPolicy.getInitialSubscriptionName())
21742174
.topic(this.deadLetterPolicy.getDeadLetterTopic())
2175-
.producerName(String.format("%s-%s-DLQ", this.topicName, this.subscription))
2175+
.producerName(String.format("%s-%s-%s-DLQ", this.topicName, this.subscription,
2176+
this.consumerName))
21762177
.blockIfQueueFull(false)
21772178
.enableBatching(false)
21782179
.enableChunking(true)

0 commit comments

Comments
 (0)