Skip to content

Commit bc44d43

Browse files
committed
fix data race in connection shutdown
1 parent 2e25825 commit bc44d43

File tree

4 files changed

+40
-10
lines changed

4 files changed

+40
-10
lines changed

.travis.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,4 @@ services:
1111
env:
1212
- AMQP_URL=amqp://guest:[email protected]:5672/ GOMAXPROCS=2
1313

14-
script: go test -v -tags integration ./...
14+
script: go test -v -race -tags integration ./...

channel.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -141,9 +141,12 @@ func (me *Channel) open() error {
141141
// Performs a request/response call for when the message is not NoWait and is
142142
// specified as Synchronous.
143143
func (me *Channel) call(req message, res ...message) error {
144+
me.m.Lock()
144145
if err := me.send(me, req); err != nil {
146+
me.m.Unlock()
145147
return err
146148
}
149+
me.m.Unlock()
147150

148151
if req.wait() {
149152
select {
@@ -1476,17 +1479,16 @@ exception could occur if the server does not support this method.
14761479
14771480
*/
14781481
func (me *Channel) Confirm(noWait bool) error {
1479-
me.m.Lock()
1480-
defer me.m.Unlock()
1481-
14821482
if err := me.call(
14831483
&confirmSelect{Nowait: noWait},
14841484
&confirmSelectOk{},
14851485
); err != nil {
14861486
return err
14871487
}
14881488

1489+
me.m.Lock()
14891490
me.confirming = true
1491+
me.m.Unlock()
14901492

14911493
return nil
14921494
}

client_test.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -601,3 +601,30 @@ func TestPublishAndShutdownDeadlockIssue84(t *testing.T) {
601601
}
602602
}
603603
}
604+
605+
func TestChannelCloseRace(t *testing.T) {
606+
rwc, srv := newSession(t)
607+
608+
done := make(chan bool)
609+
610+
go func() {
611+
srv.connectionOpen()
612+
srv.channelOpen(1)
613+
614+
rwc.Close()
615+
done <- true
616+
}()
617+
618+
c, err := Open(rwc, defaultConfig())
619+
if err != nil {
620+
t.Fatalf("could not create connection: %v (%s)", c, err)
621+
}
622+
623+
ch, err := c.Channel()
624+
if err != nil {
625+
t.Fatalf("could not open channel: %v (%s)", ch, err)
626+
}
627+
<-done
628+
ch.Close()
629+
c.Close()
630+
}

connection.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -340,14 +340,18 @@ func (me *Connection) send(f frame) error {
340340

341341
func (me *Connection) shutdown(err *Error) {
342342
me.destructor.Do(func() {
343+
me.m.Lock()
344+
defer me.m.Unlock()
345+
343346
if err != nil {
344347
for _, c := range me.closes {
345348
c <- err
346349
}
347350
}
348351

349352
for _, ch := range me.channels {
350-
me.closeChannel(ch, err)
353+
ch.shutdown(err)
354+
me.releaseChannel(ch.id)
351355
}
352356

353357
if err != nil {
@@ -364,9 +368,7 @@ func (me *Connection) shutdown(err *Error) {
364368
close(c)
365369
}
366370

367-
me.m.Lock()
368371
me.noNotify = true
369-
me.m.Unlock()
370372
})
371373
}
372374

@@ -553,9 +555,6 @@ func (me *Connection) allocateChannel() (*Channel, error) {
553555
// releaseChannel removes a channel from the registry as the final part of the
554556
// channel lifecycle
555557
func (me *Connection) releaseChannel(id uint16) {
556-
me.m.Lock()
557-
defer me.m.Unlock()
558-
559558
delete(me.channels, id)
560559
me.allocator.release(int(id))
561560
}
@@ -578,6 +577,8 @@ func (me *Connection) openChannel() (*Channel, error) {
578577
// this connection.
579578
func (me *Connection) closeChannel(ch *Channel, e *Error) {
580579
ch.shutdown(e)
580+
me.m.Lock()
581+
defer me.m.Unlock()
581582
me.releaseChannel(ch.id)
582583
}
583584

0 commit comments

Comments
 (0)