Skip to content

Commit 6c6775e

Browse files
committed
[Fix] fix dlq message RealTopic property is empty
1 parent 1422129 commit 6c6775e

File tree

5 files changed

+98
-21
lines changed

5 files changed

+98
-21
lines changed

pulsar/consumer_impl.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -632,17 +632,21 @@ func (c *consumer) ReconsumeLaterWithCustomProperties(msg Message, customPropert
632632
} else {
633633
props[SysPropertyRealTopic] = msg.Topic()
634634
props[SysPropertyOriginMessageID] = msgID.messageID.String()
635+
props[PropertyOriginMessageID] = msgID.messageID.String()
635636
}
636637
props[SysPropertyReconsumeTimes] = strconv.Itoa(reconsumeTimes)
637638
props[SysPropertyDelayTime] = fmt.Sprintf("%d", int64(delay)/1e6)
638639

639640
consumerMsg := ConsumerMessage{
640641
Consumer: c,
642+
// Copy msgID so that dlq/rlq router can ack this msg after successfully sent to new topic
641643
Message: &message{
642-
payLoad: msg.Payload(),
643-
properties: props,
644-
msgID: msgID,
645-
eventTime: msg.EventTime(),
644+
payLoad: msg.Payload(),
645+
key: msg.Key(),
646+
orderingKey: msg.OrderingKey(),
647+
properties: props,
648+
eventTime: msg.EventTime(),
649+
msgID: msgID,
646650
},
647651
}
648652
if uint32(reconsumeTimes) > c.dlq.policy.MaxDeliveries {

pulsar/consumer_partition.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1606,6 +1606,26 @@ func (pc *partitionConsumer) dispatcher() {
16061606
if !pc.isSeeking.Load() {
16071607
if pc.dlq.shouldSendToDlq(&nextMessage) {
16081608
// pass the message to the DLQ router
1609+
// we need to create a new ConsumerMessage and add dlq related metadata properties
1610+
properties := make(map[string]string)
1611+
properties[SysPropertyRealTopic] = messages[0].Topic()
1612+
properties[SysPropertyOriginMessageID] = messages[0].msgID.String()
1613+
properties[PropertyOriginMessageID] = messages[0].msgID.String()
1614+
for key, value := range messages[0].properties {
1615+
properties[key] = value
1616+
}
1617+
nextMessage = ConsumerMessage{
1618+
Consumer: pc.parentConsumer,
1619+
// Copy msgID so that dlq/rlq router can ack this msg after successfully sent to new topic
1620+
Message: &message{
1621+
payLoad: messages[0].Payload(),
1622+
key: messages[0].Key(),
1623+
orderingKey: messages[0].OrderingKey(),
1624+
properties: properties,
1625+
eventTime: messages[0].EventTime(),
1626+
msgID: messages[0].msgID,
1627+
},
1628+
}
16091629
pc.metrics.DlqCounter.Inc()
16101630
messageCh = pc.dlq.Chan()
16111631
} else {

pulsar/consumer_test.go

Lines changed: 68 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1580,12 +1580,23 @@ func DLQWithProducerOptions(t *testing.T, prodOpt *ProducerOptions) {
15801580
defer producer.Close()
15811581

15821582
// send 10 messages
1583+
eventTimeList := make([]time.Time, 10)
1584+
msgIDList := make([]string, 10)
1585+
msgKeyList := make([]string, 10)
15831586
for i := 0; i < 10; i++ {
1584-
if _, err := producer.Send(ctx, &ProducerMessage{
1585-
Payload: []byte(fmt.Sprintf("hello-%d", i)),
1586-
}); err != nil {
1587+
eventTime := time.Now()
1588+
eventTimeList[i] = eventTime
1589+
msgKeyList[i] = fmt.Sprintf("key-%d", i)
1590+
msgID, err := producer.Send(ctx, &ProducerMessage{
1591+
Payload: []byte(fmt.Sprintf("hello-%d", i)),
1592+
Key: fmt.Sprintf("key-%d", i),
1593+
OrderingKey: fmt.Sprintf("key-%d", i),
1594+
EventTime: eventTime,
1595+
})
1596+
if err != nil {
15871597
log.Fatal(err)
15881598
}
1599+
msgIDList[i] = msgID.String()
15891600
}
15901601

15911602
// receive 10 messages and only ack half-of-them
@@ -1624,10 +1635,25 @@ func DLQWithProducerOptions(t *testing.T, prodOpt *ProducerOptions) {
16241635
assert.True(t, regex.MatchString(msg.ProducerName()))
16251636

16261637
// check original messageId
1638+
assert.NotEmpty(t, msg.Properties()[SysPropertyOriginMessageID])
1639+
assert.Equal(t, msgIDList[expectedMsgIdx], msg.Properties()[SysPropertyOriginMessageID])
16271640
assert.NotEmpty(t, msg.Properties()[PropertyOriginMessageID])
1641+
assert.Equal(t, msgIDList[expectedMsgIdx], msg.Properties()[PropertyOriginMessageID])
16281642

16291643
// check original topic
1630-
assert.NotEmpty(t, msg.Properties()[SysPropertyRealTopic])
1644+
assert.Contains(t, msg.Properties()[SysPropertyRealTopic], topic)
1645+
1646+
// check original key
1647+
assert.NotEmpty(t, msg.Key())
1648+
assert.Equal(t, msgKeyList[expectedMsgIdx], msg.Key())
1649+
assert.NotEmpty(t, msg.OrderingKey())
1650+
assert.Equal(t, msgKeyList[expectedMsgIdx], msg.OrderingKey())
1651+
1652+
// check original event time
1653+
// Broker will ignore event time microsecond(us) level precision,
1654+
// so that we need to check eventTime precision in millisecond level
1655+
assert.NotEqual(t, 0, msg.EventTime())
1656+
assert.True(t, eventTimeList[expectedMsgIdx].Sub(msg.EventTime()).Abs() < 2*time.Millisecond)
16311657
}
16321658

16331659
// No more messages on the DLQ
@@ -1855,9 +1881,21 @@ func TestRLQ(t *testing.T) {
18551881
assert.Nil(t, err)
18561882
defer producer.Close()
18571883

1884+
eventTimeList := make([]time.Time, N)
1885+
msgIDList := make([]string, N)
1886+
msgKeyList := make([]string, N)
18581887
for i := 0; i < N; i++ {
1859-
_, err = producer.Send(ctx, &ProducerMessage{Payload: []byte(fmt.Sprintf("MESSAGE_%d", i))})
1888+
eventTime := time.Now()
1889+
eventTimeList[i] = eventTime
1890+
msgKeyList[i] = fmt.Sprintf("key-%d", i)
1891+
msgID, err := producer.Send(ctx, &ProducerMessage{
1892+
Payload: []byte(fmt.Sprintf("MESSAGE_%d", i)),
1893+
Key: fmt.Sprintf("key-%d", i),
1894+
OrderingKey: fmt.Sprintf("key-%d", i),
1895+
EventTime: eventTime,
1896+
})
18601897
assert.Nil(t, err)
1898+
msgIDList[i] = msgID.String()
18611899
}
18621900

18631901
// 2. Create consumer on the Retry Topic to reconsume N messages (maxRedeliveries+1) times
@@ -1903,6 +1941,31 @@ func TestRLQ(t *testing.T) {
19031941
dlqReceived := 0
19041942
for dlqReceived < N {
19051943
msg, err := dlqConsumer.Receive(ctx)
1944+
// check original messageId
1945+
// we create a topic with three partitions,
1946+
// so that messages maybe not be received as the same order as we produced
1947+
assert.NotEmpty(t, msg.Properties()[SysPropertyOriginMessageID])
1948+
assert.Contains(t, msgIDList, msg.Properties()[SysPropertyOriginMessageID])
1949+
assert.NotEmpty(t, msg.Properties()[PropertyOriginMessageID])
1950+
assert.Contains(t, msgIDList, msg.Properties()[PropertyOriginMessageID])
1951+
1952+
// check original topic
1953+
assert.Contains(t, msg.Properties()[SysPropertyRealTopic], topic)
1954+
1955+
// check original key
1956+
assert.NotEmpty(t, msg.Key())
1957+
assert.Contains(t, msgKeyList, msg.Key())
1958+
assert.NotEmpty(t, msg.OrderingKey())
1959+
assert.Contains(t, msgKeyList, msg.OrderingKey())
1960+
1961+
// check original event time
1962+
assert.NotEqual(t, 0, msg.EventTime())
1963+
// check original event time
1964+
// Broker will ignore event time microsecond(us) level precision,
1965+
// so that we need to check eventTime precision in millisecond level
1966+
assert.LessOrEqual(t, eventTimeList[0].Add(-2*time.Millisecond), msg.EventTime())
1967+
assert.LessOrEqual(t, msg.EventTime(), eventTimeList[N-1].Add(2*time.Millisecond))
1968+
19061969
assert.Nil(t, err)
19071970
dlqConsumer.Ack(msg)
19081971
dlqReceived++

pulsar/dlq_router.go

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -111,21 +111,11 @@ func (r *dlqRouter) run() {
111111
producer := r.getProducer(cm.Consumer.(*consumer).options.Schema)
112112
msg := cm.Message.(*message)
113113
msgID := msg.ID()
114-
115-
// properties associated with original message
116-
properties := msg.Properties()
117-
118-
// include orinal message id in string format in properties
119-
properties[PropertyOriginMessageID] = msgID.String()
120-
121-
// include original topic name of the message in properties
122-
properties[SysPropertyRealTopic] = msg.Topic()
123-
124114
producer.SendAsync(context.Background(), &ProducerMessage{
125115
Payload: msg.Payload(),
126116
Key: msg.Key(),
127117
OrderingKey: msg.OrderingKey(),
128-
Properties: properties,
118+
Properties: msg.Properties(),
129119
EventTime: msg.EventTime(),
130120
ReplicationClusters: msg.replicationClusters,
131121
}, func(_ MessageID, _ *ProducerMessage, err error) {

pulsar/message.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ type ProducerMessage struct {
4040

4141
// EventTime set the event time for a given message
4242
// By default, messages don't have an event time associated, while the publish
43-
// time will be be always present.
43+
// time will be always present.
4444
// Set the event time to a non-zero timestamp to explicitly declare the time
4545
// that the event "happened", as opposed to when the message is being published.
4646
EventTime time.Time

0 commit comments

Comments
 (0)