@@ -113,27 +113,29 @@ type buffer<T: send> = {
113113
114114struct packet_header {
115115 let mut state : state ;
116- let mut blocked_task: option < * rust_task > ;
116+ let mut blocked_task: * rust_task ;
117117
118118 // This is a reinterpret_cast of a ~buffer, that can also be cast
119119 // to a buffer_header if need be.
120120 let mut buffer: * libc:: c_void ;
121121
122122 new ( ) {
123123 self . state = empty;
124- self . blocked_task = none ;
124+ self . blocked_task = ptr :: null ( ) ;
125125 self . buffer = ptr:: null ( ) ;
126126 }
127127
128128 // Returns the old state.
129129 unsafe fn mark_blocked ( this : * rust_task ) -> state {
130- self . blocked_task = some ( this) ;
130+ rustrt:: rust_task_ref ( this) ;
131+ let old_task = swap_task ( self . blocked_task , this) ;
132+ assert old_task. is_null ( ) ;
131133 swap_state_acq ( self . state , blocked)
132134 }
133135
134136 unsafe fn unblock ( ) {
135- assert self . state != blocked || self . blocked_task != none ;
136- self . blocked_task = none ;
137+ let old_task = swap_task ( self . blocked_task , ptr :: null ( ) ) ;
138+ if !old_task . is_null ( ) { rustrt :: rust_task_deref ( old_task ) }
137139 alt swap_state_acq ( self . state , empty) {
138140 empty | blocked => ( ) ,
139141 terminated => self . state = terminated,
@@ -240,12 +242,26 @@ fn atomic_sub_rel(&dst: int, src: int) -> int {
240242 rusti:: atomic_sub_rel ( dst, src)
241243}
242244
245+ #[ doc( hidden) ]
246+ fn swap_task ( & dst: * rust_task , src : * rust_task ) -> * rust_task {
247+ // It might be worth making both acquire and release versions of
248+ // this.
249+ unsafe {
250+ reinterpret_cast ( rusti:: atomic_xchng (
251+ * ( ptr:: mut_addr_of ( dst) as * mut int ) ,
252+ src as int ) )
253+ }
254+ }
255+
243256#[ doc( hidden) ]
244257type rust_task = libc:: c_void ;
245258
246259extern mod rustrt {
247260 #[ rust_stack]
248261 fn rust_get_task ( ) -> * rust_task ;
262+ #[ rust_stack]
263+ fn rust_task_ref ( task : * rust_task ) ;
264+ fn rust_task_deref ( task : * rust_task ) ;
249265
250266 #[ rust_stack]
251267 fn task_clear_event_reject ( task : * rust_task ) ;
@@ -334,10 +350,11 @@ fn send<T: send, Tbuffer: send>(-p: send_packet_buffered<T, Tbuffer>,
334350 full => fail ~"duplicate send",
335351 blocked => {
336352 debug ! { "waking up task for %?" , p_} ;
337- alt p. header . blocked_task {
338- some ( task) => rustrt:: task_signal_event (
339- task, ptr:: addr_of ( p. header ) as * libc:: c_void ) ,
340- none => debug ! { "just kidding!" }
353+ let old_task = swap_task ( p. header . blocked_task , ptr:: null ( ) ) ;
354+ if !old_task. is_null ( ) {
355+ rustrt:: task_signal_event (
356+ old_task, ptr:: addr_of ( p. header ) as * libc:: c_void ) ;
357+ rustrt:: rust_task_deref ( old_task) ;
341358 }
342359
343360 // The receiver will eventually clean this up.
@@ -372,7 +389,9 @@ fn try_recv<T: send, Tbuffer: send>(-p: recv_packet_buffered<T, Tbuffer>)
372389 let p = unsafe { & * p_ } ;
373390 let this = rustrt:: rust_get_task ( ) ;
374391 rustrt:: task_clear_event_reject ( this) ;
375- p. header . blocked_task = some ( this) ;
392+ rustrt:: rust_task_ref ( this) ;
393+ let old_task = swap_task ( p. header . blocked_task , this) ;
394+ assert old_task. is_null ( ) ;
376395 let mut first = true ;
377396 let mut count = SPIN_COUNT ;
378397 loop {
@@ -402,14 +421,22 @@ fn try_recv<T: send, Tbuffer: send>(-p: recv_packet_buffered<T, Tbuffer>)
402421 full => {
403422 let mut payload = none;
404423 payload <-> p. payload ;
405- p. header . blocked_task = none;
424+ let old_task = swap_task ( p. header . blocked_task , ptr:: null ( ) ) ;
425+ if !old_task. is_null ( ) {
426+ rustrt:: rust_task_deref ( old_task) ;
427+ }
406428 p. header . state = empty;
407429 return some ( option:: unwrap ( payload) )
408430 }
409431 terminated => {
410432 // This assert detects when we've accidentally unsafely
411433 // casted too big of a number to a state.
412434 assert old_state == terminated;
435+
436+ let old_task = swap_task ( p. header . blocked_task , ptr:: null ( ) ) ;
437+ if !old_task. is_null ( ) {
438+ rustrt:: rust_task_deref ( old_task) ;
439+ }
413440 return none;
414441 }
415442 }
@@ -437,17 +464,18 @@ fn sender_terminate<T: send>(p: *packet<T>) {
437464 let p = unsafe { & * p } ;
438465 alt swap_state_rel ( p. header . state , terminated) {
439466 empty => {
467+ assert p. header . blocked_task . is_null ( ) ;
440468 // The receiver will eventually clean up.
441469 //unsafe { forget(p) }
442470 }
443471 blocked => {
444472 // wake up the target
445- alt p. header . blocked_task {
446- some ( target ) =>
473+ let old_task = swap_task ( p. header . blocked_task , ptr :: null ( ) ) ;
474+ if !old_task . is_null ( ) {
447475 rustrt:: task_signal_event (
448- target ,
449- ptr:: addr_of ( p. header ) as * libc:: c_void ) ,
450- none => { debug ! { "receiver is already shutting down" } }
476+ old_task ,
477+ ptr:: addr_of ( p. header ) as * libc:: c_void ) ;
478+ rustrt :: rust_task_deref ( old_task ) ;
451479 }
452480 // The receiver will eventually clean up.
453481 //unsafe { forget(p) }
@@ -457,6 +485,7 @@ fn sender_terminate<T: send>(p: *packet<T>) {
457485 fail ~"you dun goofed"
458486 }
459487 terminated => {
488+ assert p. header . blocked_task . is_null ( ) ;
460489 // I have to clean up, use drop_glue
461490 }
462491 }
@@ -465,7 +494,7 @@ fn sender_terminate<T: send>(p: *packet<T>) {
465494#[ doc( hidden) ]
466495fn receiver_terminate < T : send > ( p : * packet < T > ) {
467496 let p = unsafe { & * p } ;
468- assert p. header . blocked_task == none ;
497+ assert p. header . blocked_task . is_null ( ) ;
469498 alt swap_state_rel ( p. header . state , terminated) {
470499 empty => {
471500 // the sender will clean up
0 commit comments