Skip to content

Commit c6a5ab1

Browse files
committed
fix data race in connection shutdown and channel close
1 parent 9831e04 commit c6a5ab1

File tree

4 files changed

+44
-11
lines changed

4 files changed

+44
-11
lines changed

channel.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,11 @@ func (me *Channel) open() error {
141141
// Performs a request/response call for when the message is not NoWait and is
142142
// specified as Synchronous.
143143
func (me *Channel) call(req message, res ...message) error {
144-
if err := me.send(me, req); err != nil {
144+
me.m.Lock()
145+
send := me.send
146+
me.m.Unlock()
147+
148+
if err := send(me, req); err != nil {
145149
return err
146150
}
147151

@@ -273,22 +277,26 @@ func (me *Channel) dispatch(msg message) {
273277
}
274278

275279
case *basicAck:
280+
me.m.Lock()
276281
if me.confirming {
277282
if m.Multiple {
278283
me.confirms.Multiple(Confirmation{m.DeliveryTag, true})
279284
} else {
280285
me.confirms.One(Confirmation{m.DeliveryTag, true})
281286
}
282287
}
288+
me.m.Unlock()
283289

284290
case *basicNack:
291+
me.m.Lock()
285292
if me.confirming {
286293
if m.Multiple {
287294
me.confirms.Multiple(Confirmation{m.DeliveryTag, false})
288295
} else {
289296
me.confirms.One(Confirmation{m.DeliveryTag, false})
290297
}
291298
}
299+
me.m.Unlock()
292300

293301
case *basicDeliver:
294302
me.consumers.send(m.ConsumerTag, newDelivery(me, m))
@@ -1476,17 +1484,16 @@ exception could occur if the server does not support this method.
14761484
14771485
*/
14781486
func (me *Channel) Confirm(noWait bool) error {
1479-
me.m.Lock()
1480-
defer me.m.Unlock()
1481-
14821487
if err := me.call(
14831488
&confirmSelect{Nowait: noWait},
14841489
&confirmSelectOk{},
14851490
); err != nil {
14861491
return err
14871492
}
14881493

1494+
me.m.Lock()
14891495
me.confirming = true
1496+
me.m.Unlock()
14901497

14911498
return nil
14921499
}

client_test.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -601,3 +601,30 @@ func TestPublishAndShutdownDeadlockIssue84(t *testing.T) {
601601
}
602602
}
603603
}
604+
605+
func TestChannelCloseRace(t *testing.T) {
606+
rwc, srv := newSession(t)
607+
608+
done := make(chan bool)
609+
610+
go func() {
611+
srv.connectionOpen()
612+
srv.channelOpen(1)
613+
614+
rwc.Close()
615+
done <- true
616+
}()
617+
618+
c, err := Open(rwc, defaultConfig())
619+
if err != nil {
620+
t.Fatalf("could not create connection: %v (%s)", c, err)
621+
}
622+
623+
ch, err := c.Channel()
624+
if err != nil {
625+
t.Fatalf("could not open channel: %v (%s)", ch, err)
626+
}
627+
<-done
628+
ch.Close()
629+
c.Close()
630+
}

connection.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -357,14 +357,16 @@ func (me *Connection) shutdown(err *Error) {
357357
closes := make([]chan *Error, len(me.closes))
358358
copy(closes, me.closes)
359359
me.m.Unlock()
360+
360361
if err != nil {
361362
for _, c := range closes {
362363
c <- err
363364
}
364365
}
365366

366367
for _, ch := range me.channels {
367-
me.closeChannel(ch, err)
368+
ch.shutdown(err)
369+
me.releaseChannel(ch.id)
368370
}
369371

370372
if err != nil {
@@ -381,9 +383,7 @@ func (me *Connection) shutdown(err *Error) {
381383
close(c)
382384
}
383385

384-
me.m.Lock()
385386
me.noNotify = true
386-
me.m.Unlock()
387387
})
388388
}
389389

@@ -570,9 +570,6 @@ func (me *Connection) allocateChannel() (*Channel, error) {
570570
// releaseChannel removes a channel from the registry as the final part of the
571571
// channel lifecycle
572572
func (me *Connection) releaseChannel(id uint16) {
573-
me.m.Lock()
574-
defer me.m.Unlock()
575-
576573
delete(me.channels, id)
577574
me.allocator.release(int(id))
578575
}
@@ -595,7 +592,9 @@ func (me *Connection) openChannel() (*Channel, error) {
595592
// this connection.
596593
func (me *Connection) closeChannel(ch *Channel, e *Error) {
597594
ch.shutdown(e)
595+
me.m.Lock()
598596
me.releaseChannel(ch.id)
597+
me.m.Unlock()
599598
}
600599

601600
/*

shared_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ func (me *logIO) Close() (err error) {
6565
if err != nil {
6666
me.t.Logf("%s close : %v\n", me.prefix, err)
6767
} else {
68-
me.t.Logf("%s close\n", me.prefix, err)
68+
me.t.Logf("%s close\n", me.prefix)
6969
}
7070
return
7171
}

0 commit comments

Comments
 (0)