@@ -58,6 +58,46 @@ void WaitForWorkerInspectorToStop(Environment* child) {
5858
5959} // anonymous namespace
6060
61+ void AsyncRequest::Install (Environment* env, void * data, uv_async_cb target) {
62+ Mutex::ScopedLock lock (mutex_);
63+ env_ = env;
64+ async_ = new uv_async_t ;
65+ if (data != nullptr ) async_->data = data;
66+ CHECK_EQ (uv_async_init (env_->event_loop (), async_, target), 0 );
67+ }
68+
69+ void AsyncRequest::Uninstall () {
70+ Mutex::ScopedLock lock (mutex_);
71+ if (async_ != nullptr )
72+ env_->CloseHandle (async_, [](uv_async_t * async) { delete async; });
73+ }
74+
75+ void AsyncRequest::Stop () {
76+ Mutex::ScopedLock lock (mutex_);
77+ stop_ = true ;
78+ if (async_ != nullptr ) uv_async_send (async_);
79+ }
80+
81+ void AsyncRequest::SetStopped (bool flag) {
82+ Mutex::ScopedLock lock (mutex_);
83+ stop_ = flag;
84+ }
85+
86+ bool AsyncRequest::IsStopped () const {
87+ Mutex::ScopedLock lock (mutex_);
88+ return stop_;
89+ }
90+
91+ uv_async_t * AsyncRequest::GetHandle () {
92+ Mutex::ScopedLock lock (mutex_);
93+ return async_;
94+ }
95+
96+ void AsyncRequest::MemoryInfo (MemoryTracker* tracker) const {
97+ Mutex::ScopedLock lock (mutex_);
98+ if (async_ != nullptr ) tracker->TrackField (" async_request" , *async_);
99+ }
100+
61101Worker::Worker (Environment* env,
62102 Local<Object> wrap,
63103 const std::string& url,
@@ -98,8 +138,7 @@ Worker::Worker(Environment* env,
98138}
99139
100140bool Worker::is_stopped () const {
101- Mutex::ScopedLock stopped_lock (stopped_mutex_);
102- return stopped_;
141+ return thread_stopper_.IsStopped ();
103142}
104143
105144// This class contains data that is only relevant to the child thread itself,
@@ -207,6 +246,8 @@ void Worker::Run() {
207246 Context::Scope context_scope (env_->context ());
208247 if (child_port != nullptr )
209248 child_port->Close ();
249+ thread_stopper_.Uninstall ();
250+ thread_stopper_.SetStopped (true );
210251 env_->stop_sub_worker_contexts ();
211252 env_->RunCleanup ();
212253 RunAtExit (env_.get ());
@@ -215,23 +256,19 @@ void Worker::Run() {
215256 WaitForWorkerInspectorToStop (env_.get ());
216257#endif
217258
218- {
219- Mutex::ScopedLock stopped_lock (stopped_mutex_);
220- stopped_ = true ;
221- }
222-
223259 // This call needs to be made while the `Environment` is still alive
224260 // because we assume that it is available for async tracking in the
225261 // NodePlatform implementation.
226262 platform_->DrainTasks (isolate_);
227263 }
228264 });
229265
266+ if (thread_stopper_.IsStopped ()) return ;
230267 {
231268 HandleScope handle_scope (isolate_);
232269 Local<Context> context = NewContext (isolate_);
233- if (is_stopped ()) return ;
234270
271+ if (thread_stopper_.IsStopped ()) return ;
235272 CHECK (!context.IsEmpty ());
236273 Context::Scope context_scope (context);
237274 {
@@ -253,6 +290,14 @@ void Worker::Run() {
253290 Debug (this , " Created Environment for worker with id %llu" , thread_id_);
254291
255292 if (is_stopped ()) return ;
293+ thread_stopper_.Install (env_.get (), env_.get (), [](uv_async_t * handle) {
294+ Environment* env_ = static_cast <Environment*>(handle->data );
295+ uv_stop (env_->event_loop ());
296+ });
297+ uv_unref (reinterpret_cast <uv_handle_t *>(thread_stopper_.GetHandle ()));
298+
299+ Debug (this , " Created Environment for worker with id %llu" , thread_id_);
300+ if (thread_stopper_.IsStopped ()) return ;
256301 {
257302 HandleScope handle_scope (isolate_);
258303 Mutex::ScopedLock lock (mutex_);
@@ -268,7 +313,7 @@ void Worker::Run() {
268313 Debug (this , " Created message port for worker %llu" , thread_id_);
269314 }
270315
271- if (is_stopped ()) return ;
316+ if (thread_stopper_. IsStopped ()) return ;
272317 {
273318#if NODE_USE_V8_PLATFORM && HAVE_INSPECTOR
274319 StartWorkerInspector (env_.get (),
@@ -289,22 +334,21 @@ void Worker::Run() {
289334 Debug (this , " Loaded environment for worker %llu" , thread_id_);
290335 }
291336
292- if (is_stopped ()) return ;
337+ if (thread_stopper_. IsStopped ()) return ;
293338 {
294339 SealHandleScope seal (isolate_);
295340 bool more;
296341 env_->performance_state ()->Mark (
297342 node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START);
298343 do {
299- if (is_stopped ()) break ;
344+ if (thread_stopper_. IsStopped ()) break ;
300345 uv_run (&data.loop_ , UV_RUN_DEFAULT);
301- if (is_stopped ()) break ;
346+ if (thread_stopper_. IsStopped ()) break ;
302347
303348 platform_->DrainTasks (isolate_);
304349
305350 more = uv_loop_alive (&data.loop_ );
306- if (more && !is_stopped ())
307- continue ;
351+ if (more && !thread_stopper_.IsStopped ()) continue ;
308352
309353 EmitBeforeExit (env_.get ());
310354
@@ -319,7 +363,7 @@ void Worker::Run() {
319363
320364 {
321365 int exit_code;
322- bool stopped = is_stopped ();
366+ bool stopped = thread_stopper_. IsStopped ();
323367 if (!stopped)
324368 exit_code = EmitExit (env_.get ());
325369 Mutex::ScopedLock lock (mutex_);
@@ -341,34 +385,11 @@ void Worker::JoinThread() {
341385 thread_joined_ = true ;
342386
343387 env ()->remove_sub_worker_context (this );
344-
345- if (thread_exit_async_) {
346- env ()->CloseHandle (thread_exit_async_.release (), [](uv_async_t * async) {
347- delete async;
348- });
349-
350- if (scheduled_on_thread_stopped_)
351- OnThreadStopped ();
352- }
388+ OnThreadStopped ();
389+ on_thread_finished_.Uninstall ();
353390}
354391
355392void Worker::OnThreadStopped () {
356- {
357- Mutex::ScopedLock lock (mutex_);
358- scheduled_on_thread_stopped_ = false ;
359-
360- Debug (this , " Worker %llu thread stopped" , thread_id_);
361-
362- {
363- Mutex::ScopedLock stopped_lock (stopped_mutex_);
364- CHECK (stopped_);
365- }
366-
367- parent_port_ = nullptr ;
368- }
369-
370- JoinThread ();
371-
372393 {
373394 HandleScope handle_scope (env ()->isolate ());
374395 Context::Scope context_scope (env ()->context ());
@@ -391,7 +412,7 @@ Worker::~Worker() {
391412 Mutex::ScopedLock lock (mutex_);
392413 JoinThread ();
393414
394- CHECK (stopped_ );
415+ CHECK (thread_stopper_. IsStopped () );
395416 CHECK (thread_joined_);
396417
397418 // This has most likely already happened within the worker thread -- this
@@ -480,16 +501,15 @@ void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
480501 Mutex::ScopedLock lock (w->mutex_ );
481502
482503 w->env ()->add_sub_worker_context (w);
483- w->stopped_ = false ;
484504 w->thread_joined_ = false ;
505+ w->thread_stopper_ .SetStopped (false );
485506
486- w->thread_exit_async_ .reset (new uv_async_t );
487- w->thread_exit_async_ ->data = w;
488- CHECK_EQ (uv_async_init (w->env ()->event_loop (),
489- w->thread_exit_async_ .get (),
490- [](uv_async_t * handle) {
491- static_cast <Worker*>(handle->data )->OnThreadStopped ();
492- }), 0 );
507+ w->on_thread_finished_ .Install (w->env (), w, [](uv_async_t * handle) {
508+ Worker* w_ = static_cast <Worker*>(handle->data );
509+ CHECK (w_->thread_stopper_ .IsStopped ());
510+ w_->parent_port_ = nullptr ;
511+ w_->JoinThread ();
512+ });
493513
494514 uv_thread_options_t thread_options;
495515 thread_options.flags = UV_THREAD_HAS_STACK_SIZE;
@@ -505,9 +525,7 @@ void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
505525 w->Run ();
506526
507527 Mutex::ScopedLock lock (w->mutex_ );
508- CHECK (w->thread_exit_async_ );
509- w->scheduled_on_thread_stopped_ = true ;
510- uv_async_send (w->thread_exit_async_ .get ());
528+ w->on_thread_finished_ .Stop ();
511529 }, static_cast <void *>(w)), 0 );
512530}
513531
@@ -523,28 +541,23 @@ void Worker::StopThread(const FunctionCallbackInfo<Value>& args) {
523541void Worker::Ref (const FunctionCallbackInfo<Value>& args) {
524542 Worker* w;
525543 ASSIGN_OR_RETURN_UNWRAP (&w, args.This ());
526- if (w->thread_exit_async_ )
527- uv_ref (reinterpret_cast <uv_handle_t *>(w->thread_exit_async_ .get ()));
544+ uv_ref (reinterpret_cast <uv_handle_t *>(w->on_thread_finished_ .GetHandle ()));
528545}
529546
530547void Worker::Unref (const FunctionCallbackInfo<Value>& args) {
531548 Worker* w;
532549 ASSIGN_OR_RETURN_UNWRAP (&w, args.This ());
533- if (w->thread_exit_async_ )
534- uv_unref (reinterpret_cast <uv_handle_t *>(w->thread_exit_async_ .get ()));
550+ uv_unref (reinterpret_cast <uv_handle_t *>(w->on_thread_finished_ .GetHandle ()));
535551}
536552
537553void Worker::Exit (int code) {
538554 Mutex::ScopedLock lock (mutex_);
539- Mutex::ScopedLock stopped_lock (stopped_mutex_);
540555
541556 Debug (this , " Worker %llu called Exit(%d)" , thread_id_, code);
542-
543- if (!stopped_) {
544- stopped_ = true ;
557+ if (!thread_stopper_.IsStopped ()) {
545558 exit_code_ = code;
546- if (child_port_ != nullptr )
547- child_port_-> StopEventLoop ();
559+ Debug ( this , " Received StopEventLoop request " );
560+ thread_stopper_. Stop ();
548561 if (isolate_ != nullptr )
549562 isolate_->TerminateExecution ();
550563 }
0 commit comments