Skip to content

Commit 191685f

Browse files
michaeljmarshallRobertIndie
authored andcommitted
[fix] Close consumer resources if creation fails (#1070)
### Motivation When a consumer fails to get created, we should close any resources that it created to prevent leaks of internal resources and leaks of the consumer on the broker side. The broker leak could happen if the connection was left open. These fixes are similar to #1061. ### Modifications * Close `ackGroupingTracker` and `chunkedMsgCtxMap` if `grabConn` fails. We cannot call `Close` on the consumer because the state is not `Ready`. If we re-design the consumer, it could be nice to be able to call `Close` in this scenario. * Call `Close` on the consumer in cases where we move it to `Ready` but determine it is not able to be created. * Fix typo in comment (cherry picked from commit a3fcc9a)
1 parent 1724dc9 commit 191685f

File tree

2 files changed

+5
-3
lines changed

2 files changed

+5
-3
lines changed

pulsar/consumer_partition.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -372,6 +372,8 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
372372
if err != nil {
373373
pc.log.WithError(err).Error("Failed to create consumer")
374374
pc.nackTracker.Close()
375+
pc.ackGroupingTracker.close()
376+
pc.chunkedMsgCtxMap.Close()
375377
return nil, err
376378
}
377379
pc.log.Info("Created consumer")
@@ -381,7 +383,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
381383
if pc.options.startMessageIDInclusive && startingMessageID != nil && startingMessageID.equal(latestMessageID) {
382384
msgID, err := pc.requestGetLastMessageID()
383385
if err != nil {
384-
pc.nackTracker.Close()
386+
pc.Close()
385387
return nil, err
386388
}
387389
if msgID.entryID != noMessageEntry {
@@ -390,7 +392,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
390392
// use the WithoutClear version because the dispatcher is not started yet
391393
err = pc.requestSeekWithoutClear(msgID.messageID)
392394
if err != nil {
393-
pc.nackTracker.Close()
395+
pc.Close()
394396
return nil, err
395397
}
396398
}

pulsar/producer_partition.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1405,7 +1405,7 @@ func (p *partitionProducer) setProducerState(state producerState) {
14051405
p.state.Swap(int32(state))
14061406
}
14071407

1408-
// set a new consumerState and return the last state
1408+
// set a new producerState and return the last state
14091409
// returns bool if the new state has been set or not
14101410
func (p *partitionProducer) casProducerState(oldState, newState producerState) bool {
14111411
return p.state.CAS(int32(oldState), int32(newState))

0 commit comments

Comments
 (0)