Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -746,6 +746,8 @@ func (pc *partitionConsumer) AckIDList(msgIDs []MessageID) error {
pendingAcks := make(map[position]*bitset.BitSet)
validMsgIDs := make([]MessageID, 0, len(msgIDs))

pc.metrics.AcksCounter.Add(float64(len(msgIDs)))

// They might be complete after the whole for loop
for _, msgID := range msgIDs {
if msgID.PartitionIdx() != pc.partitionIdx {
Expand All @@ -760,6 +762,7 @@ func (pc *partitionConsumer) AckIDList(msgIDs []MessageID) error {
position := newPosition(msgID)
if convertedMsgID.ack() {
pendingAcks[position] = nil
pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-convertedMsgID.receivedTime.UnixNano()) / 1.0e9)
} else if pc.options.enableBatchIndexAck {
pendingAcks[position] = convertedMsgID.tracker.getAckBitSet()
}
Expand Down
60 changes: 60 additions & 0 deletions pulsar/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"time"

"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin"
"github.com/prometheus/client_golang/prometheus"

"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go"
Expand Down Expand Up @@ -5160,3 +5161,62 @@ func TestSelectConnectionForSameConsumer(t *testing.T) {
"The consumer uses a different connection when reconnecting")
}
}

func TestAckIDListWillIncreaseAckCounterMetrics(t *testing.T) {
topicName := newTopicName()

// Create a custom metrics registry
registry := prometheus.NewRegistry()

client, err := NewClient(ClientOptions{
URL: serviceURL,
MetricsRegisterer: registry,
})
assert.NoError(t, err)
defer client.Close()

p, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
})
assert.NoError(t, err)

for i := 0; i < 10; i++ {
_, err = p.Send(context.Background(), &ProducerMessage{
Payload: []byte(fmt.Sprintf("msg-%d", i)),
})
assert.NoError(t, err)
}

c, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: "my-sub",
SubscriptionInitialPosition: SubscriptionPositionEarliest,
})

assert.NoError(t, err)

msgIDs := make([]MessageID, 10)
for i := 0; i < 10; i++ {
msg, err := c.Receive(context.Background())
assert.NoError(t, err)
msgIDs[i] = msg.ID()
}

err = c.AckIDList(msgIDs)
assert.NoError(t, err)

// Get metrics directly from the registry
metrics, err := registry.Gather()
assert.NoError(t, err)

var ackCount float64
for _, metric := range metrics {
if metric.GetName() == "pulsar_client_consumer_acks" {
for _, m := range metric.GetMetric() {
ackCount += m.GetCounter().GetValue()
}
}
}

assert.Equal(t, float64(10), ackCount)
}
Loading