Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 18 additions & 13 deletions pulsar/consumer_multitopic.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,25 +189,30 @@ func ackIDListFromMultiTopics(log log.Logger, msgIDs []MessageID, findConsumer f
}
}

ackError := AckError{}
for consumer, ids := range consumerToMsgIDs {
if err := consumer.AckIDList(ids); err != nil {
if topicAckError := err.(AckError); topicAckError != nil {
subErrCh := make(chan error, len(consumerToMsgIDs))
errCh := make(chan error)
go func() {
ackError := AckError{}
for i := 0; i < len(consumerToMsgIDs); i++ {
err := <-subErrCh
if topicAckError, ok := err.(AckError); ok {
for id, err := range topicAckError {
ackError[id] = err
}
} else {
// It should not reach here
for _, id := range ids {
ackError[id] = err
}
}
}
if len(ackError) == 0 {
errCh <- nil
} else {
errCh <- ackError
}
}()
for consumer, ids := range consumerToMsgIDs {
go func() {
subErrCh <- consumer.AckIDList(ids)
}()
}
if len(ackError) == 0 {
return nil
}
return ackError
return <-errCh
}

// AckWithTxn the consumption of a single message with a transaction
Expand Down
102 changes: 101 additions & 1 deletion pulsar/consumer_multitopic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ import (
"testing"
"time"

"github.com/apache/pulsar-client-go/pulsar/internal"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/apache/pulsar-client-go/pulsaradmin"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"

"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -317,3 +318,102 @@ func runMultiTopicAckIDList(t *testing.T, regex bool) {
assert.Fail(t, "AckIDList should return AckError")
}
}

type dummyConnection struct {
}

func (dummyConnection) SendRequest(_ uint64, _ *pb.BaseCommand, _ func(*pb.BaseCommand, error)) {
}

func (dummyConnection) SendRequestNoWait(_ *pb.BaseCommand) error {
return nil
}

func (dummyConnection) WriteData(_ internal.Buffer) {
}

func (dummyConnection) RegisterListener(_ uint64, _ internal.ConnectionListener) error {
return nil
}

func (dummyConnection) UnregisterListener(_ uint64) {
}

func (dummyConnection) AddConsumeHandler(_ uint64, _ internal.ConsumerHandler) error {
return nil
}

func (dummyConnection) DeleteConsumeHandler(_ uint64) {
}

func (dummyConnection) ID() string {
return "cnx"
}

func (dummyConnection) GetMaxMessageSize() int32 {
return 0
}

func (dummyConnection) Close() {
}

func (dummyConnection) WaitForClose() <-chan struct{} {
return nil
}

func (dummyConnection) IsProxied() bool {
return false
}

func TestMultiTopicAckIDListTimeout(t *testing.T) {
topic := fmt.Sprintf("multiTopicAckIDListTimeout%v", time.Now().UnixNano())
createPartitionedTopic(topic, 5)

cli, err := NewClient(ClientOptions{
URL: "pulsar://localhost:6650",
OperationTimeout: 3 * time.Second,
})
assert.Nil(t, err)
defer cli.Close()

consumer, err := cli.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "sub",
AckWithResponse: true,
})
assert.Nil(t, err)
defer consumer.Close()

const numMessages = 5
sendMessages(t, cli, topic, 0, numMessages, false)
msgs := receiveMessages(t, consumer, numMessages)
msgIDs := make([]MessageID, len(msgs))

var conn internal.Connection
for i := 0; i < len(msgs); i++ {
msgIDs[i] = msgs[i].ID()
pc, ok := msgIDs[i].(*trackingMessageID).consumer.(*partitionConsumer)
assert.True(t, ok)
conn = pc._getConn()
pc._setConn(dummyConnection{})
}

start := time.Now()
err = consumer.AckIDList(msgIDs)
elapsed := time.Since(start)
t.Logf("AckIDList takes %v ms", elapsed)
assert.True(t, elapsed < 4*time.Second && elapsed >= 3*time.Second)
if ackError, ok := err.(AckError); ok {
for _, err := range ackError {
assert.Equal(t, "request timed out", err.Error())
}
} else {
assert.Fail(t, "AckIDList should return AckError")
}

for i := 0; i < len(msgs); i++ {
pc, ok := msgIDs[i].(*trackingMessageID).consumer.(*partitionConsumer)
assert.True(t, ok)
pc._setConn(conn)
}
}
6 changes: 3 additions & 3 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ type partitionConsumer struct {
state uAtomic.Int32
options *partitionConsumerOpts

conn uAtomic.Value
conn atomic.Pointer[internal.Connection]

topic string
name string
Expand Down Expand Up @@ -2205,7 +2205,7 @@ func (pc *partitionConsumer) hasMoreMessages() bool {
// _setConn sets the internal connection field of this partition consumer atomically.
// Note: should only be called by this partition consumer when a new connection is available.
func (pc *partitionConsumer) _setConn(conn internal.Connection) {
pc.conn.Store(conn)
pc.conn.Store(&conn)
}

// _getConn returns internal connection field of this partition consumer atomically.
Expand All @@ -2214,7 +2214,7 @@ func (pc *partitionConsumer) _getConn() internal.Connection {
// Invariant: The conn must be non-nill for the lifetime of the partitionConsumer.
// For this reason we leave this cast unchecked and panic() if the
// invariant is broken
return pc.conn.Load().(internal.Connection)
return *pc.conn.Load()
}

func convertToMessageIDData(msgID *trackingMessageID) *pb.MessageIdData {
Expand Down
Loading