Skip to content

Commit 9729b65

Browse files
committed
[Fix] fix dlq message RealTopic property is empty
1 parent 1422129 commit 9729b65

File tree

5 files changed

+108
-21
lines changed

5 files changed

+108
-21
lines changed

pulsar/consumer_impl.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -632,17 +632,21 @@ func (c *consumer) ReconsumeLaterWithCustomProperties(msg Message, customPropert
632632
} else {
633633
props[SysPropertyRealTopic] = msg.Topic()
634634
props[SysPropertyOriginMessageID] = msgID.messageID.String()
635+
props[PropertyOriginMessageID] = msgID.messageID.String()
635636
}
636637
props[SysPropertyReconsumeTimes] = strconv.Itoa(reconsumeTimes)
637638
props[SysPropertyDelayTime] = fmt.Sprintf("%d", int64(delay)/1e6)
638639

639640
consumerMsg := ConsumerMessage{
640641
Consumer: c,
642+
// Copy msgID so that dlq/rlq router can ack this msg after successfully sent to new topic
641643
Message: &message{
642-
payLoad: msg.Payload(),
643-
properties: props,
644-
msgID: msgID,
645-
eventTime: msg.EventTime(),
644+
payLoad: msg.Payload(),
645+
key: msg.Key(),
646+
orderingKey: msg.OrderingKey(),
647+
properties: props,
648+
eventTime: msg.EventTime(),
649+
msgID: msgID,
646650
},
647651
}
648652
if uint32(reconsumeTimes) > c.dlq.policy.MaxDeliveries {

pulsar/consumer_partition.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1606,6 +1606,26 @@ func (pc *partitionConsumer) dispatcher() {
16061606
if !pc.isSeeking.Load() {
16071607
if pc.dlq.shouldSendToDlq(&nextMessage) {
16081608
// pass the message to the DLQ router
1609+
// we need to create a new ConsumerMessage and add dlq related metadata properties
1610+
properties := make(map[string]string)
1611+
properties[SysPropertyRealTopic] = messages[0].Topic()
1612+
properties[SysPropertyOriginMessageID] = messages[0].msgID.String()
1613+
properties[PropertyOriginMessageID] = messages[0].msgID.String()
1614+
for key, value := range messages[0].properties {
1615+
properties[key] = value
1616+
}
1617+
nextMessage = ConsumerMessage{
1618+
Consumer: pc.parentConsumer,
1619+
// Copy msgID so that dlq/rlq router can ack this msg after successfully sent to new topic
1620+
Message: &message{
1621+
payLoad: messages[0].Payload(),
1622+
key: messages[0].Key(),
1623+
orderingKey: messages[0].OrderingKey(),
1624+
properties: properties,
1625+
eventTime: messages[0].EventTime(),
1626+
msgID: messages[0].msgID,
1627+
},
1628+
}
16091629
pc.metrics.DlqCounter.Inc()
16101630
messageCh = pc.dlq.Chan()
16111631
} else {

pulsar/consumer_test.go

Lines changed: 78 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1580,12 +1580,26 @@ func DLQWithProducerOptions(t *testing.T, prodOpt *ProducerOptions) {
15801580
defer producer.Close()
15811581

15821582
// send 10 messages
1583+
eventTimeList := make([]time.Time, 10)
1584+
msgIDList := make([]string, 10)
1585+
msgKeyList := make([]string, 10)
15831586
for i := 0; i < 10; i++ {
1584-
if _, err := producer.Send(ctx, &ProducerMessage{
1585-
Payload: []byte(fmt.Sprintf("hello-%d", i)),
1586-
}); err != nil {
1587+
eventTime := time.Now()
1588+
eventTimeList[i] = eventTime
1589+
msgKeyList[i] = fmt.Sprintf("key-%d", i)
1590+
msgID, err := producer.Send(ctx, &ProducerMessage{
1591+
Payload: []byte(fmt.Sprintf("hello-%d", i)),
1592+
Key: fmt.Sprintf("key-%d", i),
1593+
OrderingKey: fmt.Sprintf("key-%d", i),
1594+
EventTime: eventTime,
1595+
Properties: map[string]string{
1596+
"key": fmt.Sprintf("key-%d", i),
1597+
},
1598+
})
1599+
if err != nil {
15871600
log.Fatal(err)
15881601
}
1602+
msgIDList[i] = msgID.String()
15891603
}
15901604

15911605
// receive 10 messages and only ack half-of-them
@@ -1624,10 +1638,27 @@ func DLQWithProducerOptions(t *testing.T, prodOpt *ProducerOptions) {
16241638
assert.True(t, regex.MatchString(msg.ProducerName()))
16251639

16261640
// check original messageId
1641+
assert.NotEmpty(t, msg.Properties()[SysPropertyOriginMessageID])
1642+
assert.Equal(t, msgIDList[expectedMsgIdx], msg.Properties()[SysPropertyOriginMessageID])
16271643
assert.NotEmpty(t, msg.Properties()[PropertyOriginMessageID])
1644+
assert.Equal(t, msgIDList[expectedMsgIdx], msg.Properties()[PropertyOriginMessageID])
16281645

16291646
// check original topic
1630-
assert.NotEmpty(t, msg.Properties()[SysPropertyRealTopic])
1647+
assert.Contains(t, msg.Properties()[SysPropertyRealTopic], topic)
1648+
1649+
// check original key
1650+
assert.NotEmpty(t, msg.Key())
1651+
assert.Equal(t, msgKeyList[expectedMsgIdx], msg.Key())
1652+
assert.NotEmpty(t, msg.OrderingKey())
1653+
assert.Equal(t, msgKeyList[expectedMsgIdx], msg.OrderingKey())
1654+
assert.NotEmpty(t, msg.Properties()["key"])
1655+
assert.Equal(t, msg.Key(), msg.Properties()["key"])
1656+
1657+
// check original event time
1658+
// Broker will ignore event time microsecond(us) level precision,
1659+
// so that we need to check eventTime precision in millisecond level
1660+
assert.NotEqual(t, 0, msg.EventTime())
1661+
assert.True(t, eventTimeList[expectedMsgIdx].Sub(msg.EventTime()).Abs() < 2*time.Millisecond)
16311662
}
16321663

16331664
// No more messages on the DLQ
@@ -1855,9 +1886,24 @@ func TestRLQ(t *testing.T) {
18551886
assert.Nil(t, err)
18561887
defer producer.Close()
18571888

1889+
eventTimeList := make([]time.Time, N)
1890+
msgIDList := make([]string, N)
1891+
msgKeyList := make([]string, N)
18581892
for i := 0; i < N; i++ {
1859-
_, err = producer.Send(ctx, &ProducerMessage{Payload: []byte(fmt.Sprintf("MESSAGE_%d", i))})
1893+
eventTime := time.Now()
1894+
eventTimeList[i] = eventTime
1895+
msgKeyList[i] = fmt.Sprintf("key-%d", i)
1896+
msgID, err := producer.Send(ctx, &ProducerMessage{
1897+
Payload: []byte(fmt.Sprintf("MESSAGE_%d", i)),
1898+
Key: fmt.Sprintf("key-%d", i),
1899+
OrderingKey: fmt.Sprintf("key-%d", i),
1900+
EventTime: eventTime,
1901+
Properties: map[string]string{
1902+
"key": fmt.Sprintf("key-%d", i),
1903+
},
1904+
})
18601905
assert.Nil(t, err)
1906+
msgIDList[i] = msgID.String()
18611907
}
18621908

18631909
// 2. Create consumer on the Retry Topic to reconsume N messages (maxRedeliveries+1) times
@@ -1903,6 +1949,33 @@ func TestRLQ(t *testing.T) {
19031949
dlqReceived := 0
19041950
for dlqReceived < N {
19051951
msg, err := dlqConsumer.Receive(ctx)
1952+
// check original messageId
1953+
// we create a topic with three partitions,
1954+
// so that messages maybe not be received as the same order as we produced
1955+
assert.NotEmpty(t, msg.Properties()[SysPropertyOriginMessageID])
1956+
assert.Contains(t, msgIDList, msg.Properties()[SysPropertyOriginMessageID])
1957+
assert.NotEmpty(t, msg.Properties()[PropertyOriginMessageID])
1958+
assert.Contains(t, msgIDList, msg.Properties()[PropertyOriginMessageID])
1959+
1960+
// check original topic
1961+
assert.Contains(t, msg.Properties()[SysPropertyRealTopic], topic)
1962+
1963+
// check original key
1964+
assert.NotEmpty(t, msg.Key())
1965+
assert.Contains(t, msgKeyList, msg.Key())
1966+
assert.NotEmpty(t, msg.OrderingKey())
1967+
assert.Contains(t, msgKeyList, msg.OrderingKey())
1968+
assert.NotEmpty(t, msg.Properties()["key"])
1969+
assert.Equal(t, msg.Key(), msg.Properties()["key"])
1970+
1971+
// check original event time
1972+
assert.NotEqual(t, 0, msg.EventTime())
1973+
// check original event time
1974+
// Broker will ignore event time microsecond(us) level precision,
1975+
// so that we need to check eventTime precision in millisecond level
1976+
assert.LessOrEqual(t, eventTimeList[0].Add(-2*time.Millisecond), msg.EventTime())
1977+
assert.LessOrEqual(t, msg.EventTime(), eventTimeList[N-1].Add(2*time.Millisecond))
1978+
19061979
assert.Nil(t, err)
19071980
dlqConsumer.Ack(msg)
19081981
dlqReceived++

pulsar/dlq_router.go

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -111,21 +111,11 @@ func (r *dlqRouter) run() {
111111
producer := r.getProducer(cm.Consumer.(*consumer).options.Schema)
112112
msg := cm.Message.(*message)
113113
msgID := msg.ID()
114-
115-
// properties associated with original message
116-
properties := msg.Properties()
117-
118-
// include orinal message id in string format in properties
119-
properties[PropertyOriginMessageID] = msgID.String()
120-
121-
// include original topic name of the message in properties
122-
properties[SysPropertyRealTopic] = msg.Topic()
123-
124114
producer.SendAsync(context.Background(), &ProducerMessage{
125115
Payload: msg.Payload(),
126116
Key: msg.Key(),
127117
OrderingKey: msg.OrderingKey(),
128-
Properties: properties,
118+
Properties: msg.Properties(),
129119
EventTime: msg.EventTime(),
130120
ReplicationClusters: msg.replicationClusters,
131121
}, func(_ MessageID, _ *ProducerMessage, err error) {

pulsar/message.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ type ProducerMessage struct {
4040

4141
// EventTime set the event time for a given message
4242
// By default, messages don't have an event time associated, while the publish
43-
// time will be be always present.
43+
// time will be always present.
4444
// Set the event time to a non-zero timestamp to explicitly declare the time
4545
// that the event "happened", as opposed to when the message is being published.
4646
EventTime time.Time

0 commit comments

Comments
 (0)