Skip to content

Commit 682bf5f

Browse files
authored
[Issue 1233] Fix the issue where the AckIDCumulativ cannot return error. (#1235)
1 parent 63ae154 commit 682bf5f

File tree

2 files changed

+9
-5
lines changed

2 files changed

+9
-5
lines changed

pulsar/consumer_partition.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -730,7 +730,7 @@ func (pc *partitionConsumer) internalAckIDCumulative(msgID MessageID, withRespon
730730

731731
var ackReq *ackRequest
732732
if withResponse {
733-
ackReq := pc.sendCumulativeAck(msgIDToAck)
733+
ackReq = pc.sendCumulativeAck(msgIDToAck)
734734
<-ackReq.doneCh
735735
} else {
736736
pc.ackGroupingTracker.addCumulative(msgIDToAck)

pulsar/consumer_test.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1010,11 +1010,13 @@ func TestConsumerBatchCumulativeAck(t *testing.T) {
10101010

10111011
if i == N-1 {
10121012
// cumulative ack the first half of messages
1013-
c1.AckCumulative(msg)
1013+
err := c1.AckCumulative(msg)
1014+
assert.Nil(t, err)
10141015
} else if i == N {
10151016
// the N+1 msg is in the second batch
10161017
// cumulative ack it to test if the first batch can be acked
1017-
c2.AckCumulative(msg)
1018+
err := c2.AckCumulative(msg)
1019+
assert.Nil(t, err)
10181020
}
10191021
}
10201022

@@ -3950,7 +3952,8 @@ func runBatchIndexAckTest(t *testing.T, ackWithResponse bool, cumulative bool, o
39503952
// Acknowledge half of the messages
39513953
if cumulative {
39523954
msgID := msgIds[BatchingMaxSize/2-1]
3953-
consumer.AckIDCumulative(msgID)
3955+
err := consumer.AckIDCumulative(msgID)
3956+
assert.Nil(t, err)
39543957
log.Printf("Acknowledge %v:%d cumulatively\n", msgID, msgID.BatchIdx())
39553958
} else {
39563959
for i := 0; i < BatchingMaxSize; i++ {
@@ -3985,7 +3988,8 @@ func runBatchIndexAckTest(t *testing.T, ackWithResponse bool, cumulative bool, o
39853988
}
39863989
if cumulative {
39873990
msgID := msgIds[BatchingMaxSize-1]
3988-
consumer.AckIDCumulative(msgID)
3991+
err := consumer.AckIDCumulative(msgID)
3992+
assert.Nil(t, err)
39893993
log.Printf("Acknowledge %v:%d cumulatively\n", msgID, msgID.BatchIdx())
39903994
}
39913995
consumer.Close()

0 commit comments

Comments
 (0)