@@ -9,22 +9,31 @@ import (
99 "bytes"
1010 "io"
1111 "reflect"
12+ "sync"
1213 "testing"
1314 "time"
1415)
1516
1617type server struct {
1718 * testing.T
18- r reader // framer <- client
19- w writer // framer -> client
20- S io.ReadWriteCloser // Server IO
21- C io.ReadWriteCloser // Client IO
19+ destructor sync.Once
20+ r reader // framer <- client
21+ w writer // framer -> client
22+ S io.ReadWriteCloser // Server IO
23+ C io.ReadWriteCloser // Client IO
2224
2325 // captured client frames
2426 start connectionStartOk
2527 tune connectionTuneOk
2628}
2729
30+ func (srv * server ) close () {
31+ srv .destructor .Do (func () {
32+ srv .C .Close ()
33+ srv .S .Close ()
34+ })
35+ }
36+
2837func defaultConfig () Config {
2938 return Config {SASL : []Authentication {& PlainAuth {"guest" , "guest" }}, Vhost : "/" }
3039}
@@ -33,8 +42,8 @@ func newSession(t *testing.T) (io.ReadWriteCloser, *server) {
3342 rs , wc := io .Pipe ()
3443 rc , ws := io .Pipe ()
3544
36- rws := & logIO {t , "server" , pipe {rs , ws }}
37- rwc := & logIO {t , "client" , pipe {rc , wc }}
45+ rws := & logIO {t : t , prefix : "server" , proxy : & pipe {r : rs , w : ws }}
46+ rwc := & logIO {t : t , prefix : "client" , proxy : & pipe {r : rc , w : wc }}
3847
3948 server := server {
4049 T : t ,
@@ -175,13 +184,16 @@ func (t *server) channelOpen(id int) {
175184
176185func TestDefaultClientProperties (t * testing.T ) {
177186 rwc , srv := newSession (t )
187+ defer srv .close ()
178188
179189 go func () {
190+ defer srv .close ()
180191 srv .connectionOpen ()
181- rwc .Close ()
182192 }()
183193
184- if c , err := Open (rwc , defaultConfig ()); err != nil {
194+ c , err := Open (rwc , defaultConfig ())
195+ defer c .Close ()
196+ if err != nil {
185197 t .Fatalf ("could not create connection: %v (%s)" , c , err )
186198 }
187199
@@ -196,6 +208,7 @@ func TestDefaultClientProperties(t *testing.T) {
196208
197209func TestCustomClientProperties (t * testing.T ) {
198210 rwc , srv := newSession (t )
211+ defer srv .close ()
199212
200213 config := defaultConfig ()
201214 config .Properties = Table {
@@ -204,11 +217,13 @@ func TestCustomClientProperties(t *testing.T) {
204217 }
205218
206219 go func () {
220+ defer srv .close ()
207221 srv .connectionOpen ()
208- rwc .Close ()
209222 }()
210223
211- if c , err := Open (rwc , config ); err != nil {
224+ c , err := Open (rwc , config )
225+ defer c .Close ()
226+ if err != nil {
212227 t .Fatalf ("could not create connection: %v (%s)" , c , err )
213228 }
214229
@@ -223,27 +238,31 @@ func TestCustomClientProperties(t *testing.T) {
223238
224239func TestOpen (t * testing.T ) {
225240 rwc , srv := newSession (t )
241+ defer srv .close ()
226242 go func () {
243+ defer srv .close ()
227244 srv .connectionOpen ()
228- rwc .Close ()
229245 }()
230246
231- if c , err := Open (rwc , defaultConfig ()); err != nil {
247+ c , err := Open (rwc , defaultConfig ())
248+ defer c .Close ()
249+ if err != nil {
232250 t .Fatalf ("could not create connection: %v (%s)" , c , err )
233251 }
234252}
235253
236254func TestChannelOpen (t * testing.T ) {
237255 rwc , srv := newSession (t )
256+ defer srv .close ()
238257
239258 go func () {
259+ defer srv .close ()
240260 srv .connectionOpen ()
241261 srv .channelOpen (1 )
242-
243- rwc .Close ()
244262 }()
245263
246264 c , err := Open (rwc , defaultConfig ())
265+ defer c .Close ()
247266 if err != nil {
248267 t .Fatalf ("could not create connection: %v (%s)" , c , err )
249268 }
@@ -256,8 +275,10 @@ func TestChannelOpen(t *testing.T) {
256275
257276func TestOpenFailedSASLUnsupportedMechanisms (t * testing.T ) {
258277 rwc , srv := newSession (t )
278+ defer srv .close ()
259279
260280 go func () {
281+ defer srv .close ()
261282 srv .expectAMQP ()
262283 srv .send (0 , & connectionStart {
263284 VersionMajor : 0 ,
@@ -268,51 +289,56 @@ func TestOpenFailedSASLUnsupportedMechanisms(t *testing.T) {
268289 }()
269290
270291 c , err := Open (rwc , defaultConfig ())
292+ defer c .Close ()
271293 if err != ErrSASL {
272294 t .Fatalf ("expected ErrSASL got: %+v on %+v" , err , c )
273295 }
274296}
275297
276298func TestOpenFailedCredentials (t * testing.T ) {
277299 rwc , srv := newSession (t )
300+ defer srv .close ()
278301
279302 go func () {
303+ // kill/timeout the connection indicating bad auth
304+ defer srv .close ()
280305 srv .expectAMQP ()
281306 srv .connectionStart ()
282- // Now kill/timeout the connection indicating bad auth
283- rwc .Close ()
284307 }()
285308
286309 c , err := Open (rwc , defaultConfig ())
310+ defer c .Close ()
287311 if err != ErrCredentials {
288312 t .Fatalf ("expected ErrCredentials got: %+v on %+v" , err , c )
289313 }
290314}
291315
292316func TestOpenFailedVhost (t * testing.T ) {
293317 rwc , srv := newSession (t )
318+ defer srv .close ()
294319
295320 go func () {
321+ // kill/timeout the connection on bad Vhost
322+ defer srv .close ()
296323 srv .expectAMQP ()
297324 srv .connectionStart ()
298325 srv .connectionTune ()
299326 srv .recv (0 , & connectionOpen {})
300-
301- // Now kill/timeout the connection on bad Vhost
302- rwc .Close ()
303327 }()
304328
305329 c , err := Open (rwc , defaultConfig ())
330+ defer c .Close ()
306331 if err != ErrVhost {
307332 t .Fatalf ("expected ErrVhost got: %+v on %+v" , err , c )
308333 }
309334}
310335
311336func TestConfirmMultipleOrdersDeliveryTags (t * testing.T ) {
312337 rwc , srv := newSession (t )
313- defer rwc . Close ()
338+ defer srv . close ()
314339
315340 go func () {
341+ defer srv .close ()
316342 srv .connectionOpen ()
317343 srv .channelOpen (1 )
318344
@@ -343,6 +369,7 @@ func TestConfirmMultipleOrdersDeliveryTags(t *testing.T) {
343369 }()
344370
345371 c , err := Open (rwc , defaultConfig ())
372+ defer c .Close ()
346373 if err != nil {
347374 t .Fatalf ("could not create connection: %v (%s)" , c , err )
348375 }
@@ -387,8 +414,10 @@ func TestConfirmMultipleOrdersDeliveryTags(t *testing.T) {
387414
388415func TestNotifyClosesReusedPublisherConfirmChan (t * testing.T ) {
389416 rwc , srv := newSession (t )
417+ defer srv .close ()
390418
391419 go func () {
420+ defer srv .close ()
392421 srv .connectionOpen ()
393422 srv .channelOpen (1 )
394423
@@ -400,6 +429,7 @@ func TestNotifyClosesReusedPublisherConfirmChan(t *testing.T) {
400429 }()
401430
402431 c , err := Open (rwc , defaultConfig ())
432+ defer c .Close ()
403433 if err != nil {
404434 t .Fatalf ("could not create connection: %v (%s)" , c , err )
405435 }
@@ -423,8 +453,10 @@ func TestNotifyClosesReusedPublisherConfirmChan(t *testing.T) {
423453
424454func TestNotifyClosesAllChansAfterConnectionClose (t * testing.T ) {
425455 rwc , srv := newSession (t )
456+ defer srv .close ()
426457
427458 go func () {
459+ defer srv .close ()
428460 srv .connectionOpen ()
429461 srv .channelOpen (1 )
430462
@@ -433,6 +465,7 @@ func TestNotifyClosesAllChansAfterConnectionClose(t *testing.T) {
433465 }()
434466
435467 c , err := Open (rwc , defaultConfig ())
468+ defer c .Close ()
436469 if err != nil {
437470 t .Fatalf ("could not create connection: %v (%s)" , c , err )
438471 }
@@ -488,7 +521,7 @@ func TestNotifyClosesAllChansAfterConnectionClose(t *testing.T) {
488521// Should not panic when sending bodies split at different boundaries
489522func TestPublishBodySliceIssue74 (t * testing.T ) {
490523 rwc , srv := newSession (t )
491- defer rwc . Close ()
524+ defer srv . close ()
492525
493526 const frameSize = 100
494527 const publishings = frameSize * 3
@@ -497,6 +530,7 @@ func TestPublishBodySliceIssue74(t *testing.T) {
497530 base := make ([]byte , publishings )
498531
499532 go func () {
533+ defer srv .close ()
500534 srv .connectionOpen ()
501535 srv .channelOpen (1 )
502536
@@ -511,6 +545,7 @@ func TestPublishBodySliceIssue74(t *testing.T) {
511545 cfg .FrameSize = frameSize
512546
513547 c , err := Open (rwc , cfg )
548+ defer c .Close ()
514549 if err != nil {
515550 t .Fatalf ("could not create connection: %v (%s)" , c , err )
516551 }
@@ -530,13 +565,14 @@ func TestPublishBodySliceIssue74(t *testing.T) {
530565// Should not panic when server and client have frame_size of 0
531566func TestPublishZeroFrameSizeIssue161 (t * testing.T ) {
532567 rwc , srv := newSession (t )
533- defer rwc . Close ()
568+ defer srv . close ()
534569
535570 const frameSize = 0
536571 const publishings = 1
537572 done := make (chan bool )
538573
539574 go func () {
575+ defer srv .close ()
540576 srv .connectionOpen ()
541577 srv .channelOpen (1 )
542578
@@ -551,6 +587,7 @@ func TestPublishZeroFrameSizeIssue161(t *testing.T) {
551587 cfg .FrameSize = frameSize
552588
553589 c , err := Open (rwc , cfg )
590+ defer c .Close ()
554591
555592 // override the tuned framesize with a hard 0, as would happen when rabbit is configured with 0
556593 c .Config .FrameSize = frameSize
@@ -573,7 +610,7 @@ func TestPublishZeroFrameSizeIssue161(t *testing.T) {
573610
574611func TestPublishAndShutdownDeadlockIssue84 (t * testing.T ) {
575612 rwc , srv := newSession (t )
576- defer rwc . Close ()
613+ defer srv . close ()
577614
578615 go func () {
579616 srv .connectionOpen ()
@@ -584,6 +621,7 @@ func TestPublishAndShutdownDeadlockIssue84(t *testing.T) {
584621 }()
585622
586623 c , err := Open (rwc , defaultConfig ())
624+ defer c .Close ()
587625 if err != nil {
588626 t .Fatalf ("couldn't create connection: %v (%s)" , c , err )
589627 }
@@ -604,18 +642,20 @@ func TestPublishAndShutdownDeadlockIssue84(t *testing.T) {
604642
605643func TestChannelCloseRace (t * testing.T ) {
606644 rwc , srv := newSession (t )
645+ defer srv .close ()
607646
608647 done := make (chan bool )
609648
610649 go func () {
650+ defer srv .close ()
611651 srv .connectionOpen ()
612652 srv .channelOpen (1 )
613653
614- rwc .Close ()
615654 done <- true
616655 }()
617656
618657 c , err := Open (rwc , defaultConfig ())
658+ defer c .Close ()
619659 if err != nil {
620660 t .Fatalf ("could not create connection: %v (%s)" , c , err )
621661 }
0 commit comments