Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 10 additions & 11 deletions pulsar/consumer_multitopic.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,18 +189,17 @@ func ackIDListFromMultiTopics(log log.Logger, msgIDs []MessageID, findConsumer f
}
}

ackError := AckError{}
subErrCh := make(chan error, len(consumerToMsgIDs))
for consumer, ids := range consumerToMsgIDs {
if err := consumer.AckIDList(ids); err != nil {
if topicAckError := err.(AckError); topicAckError != nil {
for id, err := range topicAckError {
ackError[id] = err
}
} else {
// It should not reach here
for _, id := range ids {
ackError[id] = err
}
go func() {
subErrCh <- consumer.AckIDList(ids)
}()
}
ackError := AckError{}
for i := 0; i < len(consumerToMsgIDs); i++ {
if topicAckError, ok := (<-subErrCh).(AckError); ok {
for id, err := range topicAckError {
ackError[id] = err
}
}
}
Expand Down
102 changes: 101 additions & 1 deletion pulsar/consumer_multitopic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,17 @@
package pulsar

import (
"errors"
"fmt"
"strings"
"testing"
"time"

"github.com/apache/pulsar-client-go/pulsar/internal"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/apache/pulsar-client-go/pulsaradmin"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"

"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -317,3 +319,101 @@ func runMultiTopicAckIDList(t *testing.T, regex bool) {
assert.Fail(t, "AckIDList should return AckError")
}
}

type dummyConnection struct {
}

func (dummyConnection) SendRequest(_ uint64, _ *pb.BaseCommand, _ func(*pb.BaseCommand, error)) {
}

func (dummyConnection) SendRequestNoWait(_ *pb.BaseCommand) error {
return nil
}

func (dummyConnection) WriteData(_ internal.Buffer) {
}

func (dummyConnection) RegisterListener(_ uint64, _ internal.ConnectionListener) error {
return nil
}

func (dummyConnection) UnregisterListener(_ uint64) {
}

func (dummyConnection) AddConsumeHandler(_ uint64, _ internal.ConsumerHandler) error {
return nil
}

func (dummyConnection) DeleteConsumeHandler(_ uint64) {
}

func (dummyConnection) ID() string {
return "cnx"
}

func (dummyConnection) GetMaxMessageSize() int32 {
return 0
}

func (dummyConnection) Close() {
}

func (dummyConnection) WaitForClose() <-chan struct{} {
return nil
}

func (dummyConnection) IsProxied() bool {
return false
}

func TestMultiTopicAckIDListTimeout(t *testing.T) {
topic := fmt.Sprintf("multiTopicAckIDListTimeout%v", time.Now().UnixNano())
assert.NoError(t, createPartitionedTopic(topic, 5))

cli, err := NewClient(ClientOptions{
URL: "pulsar://localhost:6650",
OperationTimeout: 3 * time.Second,
})
assert.Nil(t, err)
defer cli.Close()

consumer, err := cli.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "sub",
AckWithResponse: true,
})
assert.Nil(t, err)
defer consumer.Close()

const numMessages = 5
sendMessages(t, cli, topic, 0, numMessages, false)
msgs := receiveMessages(t, consumer, numMessages)
msgIDs := make([]MessageID, len(msgs))

var conn internal.Connection
for i := 0; i < len(msgs); i++ {
msgIDs[i] = msgs[i].ID()
pc, ok := msgIDs[i].(*trackingMessageID).consumer.(*partitionConsumer)
assert.True(t, ok)
conn = pc._getConn()
pc._setConn(dummyConnection{})
}

start := time.Now()
err = consumer.AckIDList(msgIDs)
elapsed := time.Since(start)
t.Logf("AckIDList takes %v ms", elapsed)
assert.True(t, elapsed < 5*time.Second && elapsed >= 3*time.Second)
var ackError AckError
if errors.As(err, &ackError) {
for _, err := range ackError {
assert.Equal(t, "request timed out", err.Error())
}
}

for i := 0; i < len(msgs); i++ {
pc, ok := msgIDs[i].(*trackingMessageID).consumer.(*partitionConsumer)
assert.True(t, ok)
pc._setConn(conn)
}
}
6 changes: 3 additions & 3 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ type partitionConsumer struct {
state uAtomic.Int32
options *partitionConsumerOpts

conn uAtomic.Value
conn atomic.Pointer[internal.Connection]

topic string
name string
Expand Down Expand Up @@ -2205,7 +2205,7 @@ func (pc *partitionConsumer) hasMoreMessages() bool {
// _setConn sets the internal connection field of this partition consumer atomically.
// Note: should only be called by this partition consumer when a new connection is available.
func (pc *partitionConsumer) _setConn(conn internal.Connection) {
pc.conn.Store(conn)
pc.conn.Store(&conn)
}

// _getConn returns internal connection field of this partition consumer atomically.
Expand All @@ -2214,7 +2214,7 @@ func (pc *partitionConsumer) _getConn() internal.Connection {
// Invariant: The conn must be non-nill for the lifetime of the partitionConsumer.
// For this reason we leave this cast unchecked and panic() if the
// invariant is broken
return pc.conn.Load().(internal.Connection)
return *pc.conn.Load()
}

func convertToMessageIDData(msgID *trackingMessageID) *pb.MessageIdData {
Expand Down
Loading