Skip to content

Commit 14cbf16

Browse files
committed
Address comments
1 parent f726a2c commit 14cbf16

File tree

2 files changed

+19
-20
lines changed

2 files changed

+19
-20
lines changed

pulsar/consumer_partition.go

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -702,7 +702,6 @@ func (pc *partitionConsumer) AckIDList(msgIDs []MessageID) error {
702702
pendingAcks := make(map[position]*bitset.BitSet)
703703

704704
// They might be complete after the whole for loop
705-
incompleteTrackingIDs := make([]*trackingMessageID, 0)
706705
for _, msgID := range msgIDs {
707706
if msgID.PartitionIdx() != pc.partitionIdx {
708707
pc.log.Errorf("%v inconsistent partition index %v (current: %v)", msgID, msgID.PartitionIdx(), pc.partitionIdx)
@@ -712,10 +711,11 @@ func (pc *partitionConsumer) AckIDList(msgIDs []MessageID) error {
712711
} else {
713712
switch convertedMsgID := msgID.(type) {
714713
case *trackingMessageID:
714+
position := newPosition(msgID)
715715
if convertedMsgID.ack() {
716-
pendingAcks[newPosition(msgID)] = nil
717-
} else {
718-
incompleteTrackingIDs = append(incompleteTrackingIDs, convertedMsgID)
716+
pendingAcks[position] = nil
717+
} else if pc.options.enableBatchIndexAck {
718+
pendingAcks[position] = convertedMsgID.tracker.getAckBitSet()
719719
}
720720
case *chunkMessageID:
721721
for _, id := range pc.unAckChunksTracker.get(convertedMsgID) {
@@ -730,15 +730,6 @@ func (pc *partitionConsumer) AckIDList(msgIDs []MessageID) error {
730730
}
731731
}
732732

733-
if pc.options.enableBatchIndexAck {
734-
for _, trackingID := range incompleteTrackingIDs {
735-
position := newPosition(trackingID)
736-
if _, found := pendingAcks[position]; !found {
737-
pendingAcks[position] = trackingID.tracker.getAckBitSet()
738-
}
739-
}
740-
}
741-
742733
if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
743734
pc.log.WithField("state", state).Error("Failed to ack by closing or closed consumer")
744735
return errors.New("consumer state is closed")

pulsar/consumer_test.go

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4827,8 +4827,11 @@ func createSharedConsumer(t *testing.T, client Client, topic string, enableBatch
48274827

48284828
func sendMessages(t *testing.T, client Client, topic string, startIndex int, numMessages int, batching bool) {
48294829
producer, err := client.CreateProducer(ProducerOptions{
4830-
Topic: topic,
4831-
DisableBatching: !batching,
4830+
Topic: topic,
4831+
DisableBatching: !batching,
4832+
BatchingMaxMessages: uint(numMessages),
4833+
BatchingMaxSize: 1024 * 1024 * 10,
4834+
BatchingMaxPublishDelay: 1 * time.Hour,
48324835
})
48334836
assert.Nil(t, err)
48344837
defer producer.Close()
@@ -4837,12 +4840,18 @@ func sendMessages(t *testing.T, client Client, topic string, startIndex int, num
48374840
for i := 0; i < numMessages; i++ {
48384841
msg := &ProducerMessage{Payload: []byte(fmt.Sprintf("msg-%d", startIndex+i))}
48394842
if batching {
4840-
producer.SendAsync(ctx, msg, func(_ MessageID, _ *ProducerMessage, _ error) {})
4843+
producer.SendAsync(ctx, msg, func(_ MessageID, _ *ProducerMessage, err error) {
4844+
if err != nil {
4845+
t.Logf("Failed to send message: %v", err)
4846+
}
4847+
})
48414848
} else {
4842-
producer.Send(ctx, msg)
4849+
if _, err := producer.Send(ctx, msg); err != nil {
4850+
assert.Fail(t, "Failed to send message: %v", err)
4851+
}
48434852
}
48444853
}
4845-
producer.Flush()
4854+
assert.Nil(t, producer.Flush())
48464855
}
48474856

48484857
func receiveMessages(t *testing.T, consumer Consumer, numMessages int) []Message {
@@ -4851,10 +4860,9 @@ func receiveMessages(t *testing.T, consumer Consumer, numMessages int) []Message
48514860
msgs := make([]Message, 0)
48524861
for i := 0; i < numMessages; i++ {
48534862
if msg, err := consumer.Receive(ctx); err == nil {
4854-
fmt.Println("Received message: ", string(msg.Payload()))
48554863
msgs = append(msgs, msg)
48564864
} else {
4857-
fmt.Printf("Failed to receive message: %v", err)
4865+
t.Logf("Failed to receive message: %v", err)
48584866
break
48594867
}
48604868
}

0 commit comments

Comments
 (0)