Skip to content

Commit a4d1f0b

Browse files
committed
Fix race condition in AmqpConnection
1 parent 85930d6 commit a4d1f0b

File tree

1 file changed

+23
-0
lines changed

1 file changed

+23
-0
lines changed

pkg/rabbitmqamqp/amqp_connection.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,8 @@ type AmqpConnection struct {
121121
session *amqp.Session
122122
refMap *sync.Map
123123
entitiesTracker *entitiesTracker
124+
mutex sync.RWMutex
125+
closed bool
124126
}
125127

126128
func (a *AmqpConnection) Properties() map[string]any {
@@ -456,6 +458,13 @@ func (a *AmqpConnection) open(ctx context.Context, address string, connOptions *
456458
return nil
457459
}
458460
func (a *AmqpConnection) maybeReconnect() {
461+
a.mutex.RLock()
462+
isClosed := a.closed
463+
a.mutex.RUnlock()
464+
if isClosed {
465+
Info("Connection is closed, not reconnecting", "ID", a.Id())
466+
return
467+
}
459468

460469
if !a.amqpConnOptions.RecoveryConfiguration.ActiveRecovery {
461470
Info("Recovery is disabled, closing connection", "ID", a.Id())
@@ -467,6 +476,13 @@ func (a *AmqpConnection) maybeReconnect() {
467476
maxDelay := 1 * time.Minute
468477

469478
for attempt := 1; attempt <= a.amqpConnOptions.RecoveryConfiguration.MaxReconnectAttempts; attempt++ {
479+
a.mutex.RLock()
480+
isClosed := a.closed
481+
a.mutex.RUnlock()
482+
if isClosed {
483+
Info("Connection is closed, aborting reconnection attempts", "ID", a.Id())
484+
return
485+
}
470486

471487
///wait for before reconnecting
472488
// add some random milliseconds to the wait time to avoid thundering herd
@@ -548,6 +564,13 @@ Close closes the connection to the AMQP 1.0 server and the management interface.
548564
All the publishers and consumers are closed as well.
549565
*/
550566
func (a *AmqpConnection) Close(ctx context.Context) error {
567+
a.mutex.Lock()
568+
if a.closed {
569+
a.mutex.Unlock()
570+
return nil
571+
}
572+
a.closed = true
573+
defer a.mutex.Unlock()
551574
// the status closed (lifeCycle.SetState(&StateClosed{error: nil})) is not set here
552575
// it is set in the connection.Done() channel
553576
// the channel is called anyway

0 commit comments

Comments
 (0)