77//! The thread pool is implemented entirely using
88//! the threading utilities in [`crate::thread`].
99
10- use std:: sync:: {
11- atomic:: { AtomicUsize , Ordering } ,
12- Arc ,
10+ use std:: {
11+ panic:: { self , UnwindSafe } ,
12+ sync:: {
13+ atomic:: { AtomicUsize , Ordering } ,
14+ Arc ,
15+ } ,
1316} ;
1417
1518use crossbeam_channel:: { Receiver , Sender } ;
@@ -25,13 +28,13 @@ pub struct Pool {
2528 // so that the channel is actually closed
2629 // before we join the worker threads!
2730 job_sender : Sender < Job > ,
28- _handles : Vec < JoinHandle > ,
31+ _handles : Box < [ JoinHandle ] > ,
2932 extant_tasks : Arc < AtomicUsize > ,
3033}
3134
3235struct Job {
3336 requested_intent : ThreadIntent ,
34- f : Box < dyn FnOnce ( ) + Send + ' static > ,
37+ f : Box < dyn FnOnce ( ) + Send + UnwindSafe + ' static > ,
3538}
3639
3740impl Pool {
@@ -47,6 +50,7 @@ impl Pool {
4750 let handle = Builder :: new ( INITIAL_INTENT )
4851 . stack_size ( STACK_SIZE )
4952 . name ( "Worker" . into ( ) )
53+ . allow_leak ( true )
5054 . spawn ( {
5155 let extant_tasks = Arc :: clone ( & extant_tasks) ;
5256 let job_receiver: Receiver < Job > = job_receiver. clone ( ) ;
@@ -58,7 +62,8 @@ impl Pool {
5862 current_intent = job. requested_intent ;
5963 }
6064 extant_tasks. fetch_add ( 1 , Ordering :: SeqCst ) ;
61- ( job. f ) ( ) ;
65+ // discard the panic, we should've logged the backtrace already
66+ _ = panic:: catch_unwind ( job. f ) ;
6267 extant_tasks. fetch_sub ( 1 , Ordering :: SeqCst ) ;
6368 }
6469 }
@@ -68,12 +73,12 @@ impl Pool {
6873 handles. push ( handle) ;
6974 }
7075
71- Pool { _handles : handles, extant_tasks, job_sender }
76+ Pool { _handles : handles. into_boxed_slice ( ) , extant_tasks, job_sender }
7277 }
7378
7479 pub fn spawn < F > ( & self , intent : ThreadIntent , f : F )
7580 where
76- F : FnOnce ( ) + Send + ' static ,
81+ F : FnOnce ( ) + Send + UnwindSafe + ' static ,
7782 {
7883 let f = Box :: new ( move || {
7984 if cfg ! ( debug_assertions) {
0 commit comments