Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ type partitionConsumer struct {
state uAtomic.Int32
options *partitionConsumerOpts

conn atomic.Pointer[internal.Connection]
conn atomic.Pointer[internal.Connection]
cnxKeySuffix int32

topic string
name string
Expand Down Expand Up @@ -351,6 +352,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
parentConsumer: parent,
client: client,
options: options,
cnxKeySuffix: client.cnxPool.GenerateRoundRobinIndex(),
topic: options.topic,
name: options.consumerName,
consumerID: client.rpcClient.NewConsumerID(),
Expand Down Expand Up @@ -1964,7 +1966,7 @@ func (pc *partitionConsumer) grabConn(assignedBrokerURL string) error {
cmdSubscribe.ForceTopicCreation = proto.Bool(false)
}

res, err := pc.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, requestID,
res, err := pc.client.rpcClient.RequestWithCnxKeySuffix(lr.LogicalAddr, lr.PhysicalAddr, pc.cnxKeySuffix, requestID,
pb.BaseCommand_SUBSCRIBE, cmdSubscribe)

if err != nil {
Expand All @@ -1975,7 +1977,7 @@ func (pc *partitionConsumer) grabConn(assignedBrokerURL string) error {
ConsumerId: proto.Uint64(pc.consumerID),
RequestId: proto.Uint64(requestID),
}
_, _ = pc.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, requestID,
_, _ = pc.client.rpcClient.RequestWithCnxKeySuffix(lr.LogicalAddr, lr.PhysicalAddr, pc.cnxKeySuffix, requestID,
pb.BaseCommand_CLOSE_CONSUMER, cmdClose)
}
return err
Expand Down
28 changes: 28 additions & 0 deletions pulsar/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5038,3 +5038,31 @@ func TestClientVersion(t *testing.T) {
assert.True(t, strings.HasSuffix(publisher.ClientVersion, "-test-client"))

}

func TestSelectConnectionForSameConsumer(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: serviceURL,
MaxConnectionsPerBroker: 10,
})
assert.NoError(t, err)
defer client.Close()

topicName := newTopicName()

_consumer, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: "sub-1",
Type: Shared,
})
assert.NoError(t, err)
defer _consumer.Close()

partitionConsumerImpl := _consumer.(*consumer).consumers[0]
conn := partitionConsumerImpl._getConn()

for i := 0; i < 5; i++ {
assert.NoError(t, partitionConsumerImpl.grabConn(""))
assert.Equal(t, conn.ID(), partitionConsumerImpl._getConn().ID(),
"The consumer uses a different connection when reconnecting")
}
}
33 changes: 17 additions & 16 deletions pulsar/internal/connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,14 @@ import (
// ConnectionPool is a interface of connection pool.
type ConnectionPool interface {
// GetConnection get a connection from ConnectionPool.
GetConnection(logicalAddr *url.URL, physicalAddr *url.URL) (Connection, error)
GetConnection(logicalAddr *url.URL, physicalAddr *url.URL, keySuffix int32) (Connection, error)

// GetConnections get all connections in the pool.
GetConnections() map[string]Connection

// GenerateRoundRobinIndex generates a round-robin index.
GenerateRoundRobinIndex() int32

// Close all the connections in the pool
Close()
}
Expand All @@ -47,8 +50,8 @@ type connectionPool struct {
connectionTimeout time.Duration
tlsOptions *TLSOptions
auth auth.Provider
maxConnectionsPerHost int32
roundRobinCnt int32
maxConnectionsPerHost uint32
roundRobinCnt uint32
keepAliveInterval time.Duration
closeCh chan struct{}

Expand All @@ -73,7 +76,7 @@ func NewConnectionPool(
tlsOptions: tlsOptions,
auth: auth,
connectionTimeout: connectionTimeout,
maxConnectionsPerHost: int32(maxConnectionsPerHost),
maxConnectionsPerHost: uint32(maxConnectionsPerHost),
keepAliveInterval: keepAliveInterval,
log: logger,
metrics: metrics,
Expand All @@ -84,9 +87,12 @@ func NewConnectionPool(
return p
}

func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.URL) (Connection, error) {
p.log.WithField("logicalAddr", logicalAddr).WithField("physicalAddr", physicalAddr).Debug("Getting pooled connection")
key := p.getMapKey(logicalAddr, physicalAddr)
func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.URL,
keySuffix int32) (Connection, error) {
p.log.WithField("logicalAddr", logicalAddr).
WithField("physicalAddr", physicalAddr).
WithField("keySuffix", keySuffix).Debug("Getting pooled connection")
key := fmt.Sprint(logicalAddr.Host, "-", physicalAddr.Host, "-", keySuffix)

p.Lock()
conn, ok := p.connections[key]
Expand Down Expand Up @@ -141,6 +147,10 @@ func (p *connectionPool) GetConnections() map[string]Connection {
return conns
}

func (p *connectionPool) GenerateRoundRobinIndex() int32 {
return int32(atomic.AddUint32(&p.roundRobinCnt, 1) % p.maxConnectionsPerHost)
}

func (p *connectionPool) Close() {
p.Lock()
close(p.closeCh)
Expand All @@ -151,15 +161,6 @@ func (p *connectionPool) Close() {
p.Unlock()
}

func (p *connectionPool) getMapKey(logicalAddr *url.URL, physicalAddr *url.URL) string {
cnt := atomic.AddInt32(&p.roundRobinCnt, 1)
if cnt < 0 {
cnt = -cnt
}
idx := cnt % p.maxConnectionsPerHost
return fmt.Sprint(logicalAddr.Host, "-", physicalAddr.Host, "-", idx)
}

func (p *connectionPool) checkAndCleanIdleConnections(maxIdleTime time.Duration) {
if maxIdleTime < 0 {
return
Expand Down
12 changes: 12 additions & 0 deletions pulsar/internal/lookup_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,12 @@ func (c *mockedLookupRPCClient) Request(logicalAddr *url.URL, physicalAddr *url.
}, nil
}

func (c *mockedLookupRPCClient) RequestWithCnxKeySuffix(_ *url.URL, _ *url.URL,
_ int32, _ uint64, _ pb.BaseCommand_Type, _ proto.Message) (*RPCResult, error) {
assert.Fail(c.t, "Shouldn't be called")
return nil, nil
}

func (c *mockedLookupRPCClient) RequestOnCnx(_ Connection, _ uint64, _ pb.BaseCommand_Type,
_ proto.Message) (*RPCResult, error) {
assert.Fail(c.t, "Shouldn't be called")
Expand Down Expand Up @@ -492,6 +498,12 @@ func (m mockedPartitionedTopicMetadataRPCClient) Request(_ *url.URL, _ *url.URL,
return nil, nil
}

func (m *mockedPartitionedTopicMetadataRPCClient) RequestWithCnxKeySuffix(_ *url.URL, _ *url.URL,
_ int32, _ uint64, _ pb.BaseCommand_Type, _ proto.Message) (*RPCResult, error) {
assert.Fail(m.t, "Shouldn't be called")
return nil, nil
}

func (m mockedPartitionedTopicMetadataRPCClient) RequestOnCnxNoWait(_ Connection, _ pb.BaseCommand_Type,
_ proto.Message) error {
assert.Fail(m.t, "Shouldn't be called")
Expand Down
11 changes: 10 additions & 1 deletion pulsar/internal/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ type RPCClient interface {
RequestToHost(serviceNameResolver *ServiceNameResolver, requestID uint64,
cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)

RequestWithCnxKeySuffix(logicalAddr *url.URL, physicalAddr *url.URL, cnxKeySuffix int32, requestID uint64,
cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)

Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64,
cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)

Expand Down Expand Up @@ -154,7 +157,13 @@ func (c *rpcClient) RequestToHost(serviceNameResolver *ServiceNameResolver, requ

func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64,
cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) {
cnx, err := c.pool.GetConnection(logicalAddr, physicalAddr)
return c.RequestWithCnxKeySuffix(logicalAddr, physicalAddr, c.pool.GenerateRoundRobinIndex(),
requestID, cmdType, message)
}

func (c *rpcClient) RequestWithCnxKeySuffix(logicalAddr *url.URL, physicalAddr *url.URL, cnxKeySuffix int32,
requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) {
cnx, err := c.pool.GetConnection(logicalAddr, physicalAddr, cnxKeySuffix)
if err != nil {
return nil, err
}
Expand Down
6 changes: 4 additions & 2 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ type partitionProducer struct {
topic string
log log.Logger

conn uAtomic.Value
conn uAtomic.Value
cnxKeySuffix int32

options *ProducerOptions
producerName string
Expand Down Expand Up @@ -179,6 +180,7 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
client: client,
topic: topic,
log: logger,
cnxKeySuffix: client.cnxPool.GenerateRoundRobinIndex(),
options: options,
producerID: client.rpcClient.NewProducerID(),
dataChan: make(chan *sendRequest, maxPendingMessages),
Expand Down Expand Up @@ -301,7 +303,7 @@ func (p *partitionProducer) grabCnx(assignedBrokerURL string) error {
cmdProducer.Metadata = toKeyValues(p.options.Properties)
}

cnx, err := p.client.cnxPool.GetConnection(lr.LogicalAddr, lr.PhysicalAddr)
cnx, err := p.client.cnxPool.GetConnection(lr.LogicalAddr, lr.PhysicalAddr, p.cnxKeySuffix)
// registering the producer first in case broker sends commands in the middle
if err != nil {
p.log.Error("Failed to get connection")
Expand Down
31 changes: 31 additions & 0 deletions pulsar/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2574,3 +2574,34 @@ func TestProducerKeepReconnectingAndThenCallClose(t *testing.T) {
return true
}, 30*time.Second, 1*time.Second)
}

func TestSelectConnectionForSameProducer(t *testing.T) {
topicName := newTopicName()

client, err := NewClient(ClientOptions{
URL: serviceURL,
MaxConnectionsPerBroker: 10,
})
assert.NoError(t, err)
defer client.Close()

reconnectNum := uint(1)
_producer, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
MaxReconnectToBroker: &reconnectNum,
})
assert.NoError(t, err)
defer _producer.Close()

partitionProducerImp := _producer.(*producer).producers[0].(*partitionProducer)
conn := partitionProducerImp._getConn()

for i := 0; i < 5; i++ {
partitionProducerImp.grabCnx("")
currentConn := partitionProducerImp._getConn()
assert.Equal(t, conn.ID(), currentConn.ID(),
"The producer uses a different connection when reconnecting")
}

client.Close()
}
6 changes: 4 additions & 2 deletions pulsar/transaction_coordinator_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type transactionHandler struct {
tc *transactionCoordinatorClient
state uAtomic.Int32
conn uAtomic.Value
cnxKeySuffix int32
partition uint64
closeCh chan any
requestCh chan any
Expand All @@ -67,6 +68,7 @@ func (t *transactionHandler) getState() txnHandlerState {
func (tc *transactionCoordinatorClient) newTransactionHandler(partition uint64) (*transactionHandler, error) {
handler := &transactionHandler{
tc: tc,
cnxKeySuffix: tc.client.cnxPool.GenerateRoundRobinIndex(),
partition: partition,
closeCh: make(chan any),
requestCh: make(chan any),
Expand Down Expand Up @@ -95,8 +97,8 @@ func (t *transactionHandler) grabConn() error {
TcId: proto.Uint64(t.partition),
}

res, err := t.tc.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, requestID,
pb.BaseCommand_TC_CLIENT_CONNECT_REQUEST, &cmdTCConnect)
res, err := t.tc.client.rpcClient.RequestWithCnxKeySuffix(lr.LogicalAddr, lr.PhysicalAddr, t.cnxKeySuffix,
requestID, pb.BaseCommand_TC_CLIENT_CONNECT_REQUEST, &cmdTCConnect)

if err != nil {
t.log.WithError(err).Error("Failed to connect transaction_impl coordinator " +
Expand Down
Loading