@@ -569,6 +569,40 @@ MessagePort* MessagePort::New(
569569 return port;
570570}
571571
572+ MaybeLocal<Value> MessagePort::ReceiveMessage (Local<Context> context,
573+ bool only_if_receiving) {
574+ Message received;
575+ {
576+ // Get the head of the message queue.
577+ Mutex::ScopedLock lock (data_->mutex_ );
578+
579+ Debug (this , " MessagePort has message" );
580+
581+ bool wants_message = receiving_messages_ || !only_if_receiving;
582+ // We have nothing to do if:
583+ // - There are no pending messages
584+ // - We are not intending to receive messages, and the message we would
585+ // receive is not the final "close" message.
586+ if (data_->incoming_messages_ .empty () ||
587+ (!wants_message &&
588+ !data_->incoming_messages_ .front ().IsCloseMessage ())) {
589+ return env ()->no_message_symbol ();
590+ }
591+
592+ received = std::move (data_->incoming_messages_ .front ());
593+ data_->incoming_messages_ .pop_front ();
594+ }
595+
596+ if (received.IsCloseMessage ()) {
597+ Close ();
598+ return env ()->no_message_symbol ();
599+ }
600+
601+ if (!env ()->can_call_into_js ()) return MaybeLocal<Value>();
602+
603+ return received.Deserialize (env (), context);
604+ }
605+
572606void MessagePort::OnMessage () {
573607 Debug (this , " Running MessagePort::OnMessage()" );
574608 HandleScope handle_scope (env ()->isolate ());
@@ -579,61 +613,33 @@ void MessagePort::OnMessage() {
579613 // messages, so we need to check that this handle still owns its `data_` field
580614 // on every iteration.
581615 while (data_) {
582- Message received;
583- {
584- // Get the head of the message queue.
585- Mutex::ScopedLock lock (data_->mutex_ );
586-
587- Debug (this , " MessagePort has message, receiving = %d" ,
588- static_cast <int >(receiving_messages_));
589-
590- // We have nothing to do if:
591- // - There are no pending messages
592- // - We are not intending to receive messages, and the message we would
593- // receive is not the final "close" message.
594- if (data_->incoming_messages_ .empty () ||
595- (!receiving_messages_ &&
596- !data_->incoming_messages_ .front ().IsCloseMessage ())) {
597- break ;
598- }
616+ HandleScope handle_scope (env ()->isolate ());
617+ Context::Scope context_scope (context);
599618
600- received = std::move (data_->incoming_messages_ .front ());
601- data_->incoming_messages_ .pop_front ();
602- }
603-
604- if (received.IsCloseMessage ()) {
605- Close ();
606- return ;
607- }
619+ Local<Value> payload;
620+ if (!ReceiveMessage (context, true ).ToLocal (&payload)) break ;
621+ if (payload == env ()->no_message_symbol ()) break ;
608622
609623 if (!env ()->can_call_into_js ()) {
610624 Debug (this , " MessagePort drains queue because !can_call_into_js()" );
611625 // In this case there is nothing to do but to drain the current queue.
612626 continue ;
613627 }
614628
615- {
616- // Call the JS .onmessage() callback.
617- HandleScope handle_scope (env ()->isolate ());
618- Context::Scope context_scope (context);
619-
620- Local<Object> event;
621- Local<Value> payload;
622- Local<Value> cb_args[1 ];
623- if (!received.Deserialize (env (), context).ToLocal (&payload) ||
624- !env ()->message_event_object_template ()->NewInstance (context)
625- .ToLocal (&event) ||
626- event->Set (context, env ()->data_string (), payload).IsNothing () ||
627- event->Set (context, env ()->target_string (), object ()).IsNothing () ||
628- (cb_args[0 ] = event, false ) ||
629- MakeCallback (env ()->onmessage_string (),
630- arraysize (cb_args),
631- cb_args).IsEmpty ()) {
632- // Re-schedule OnMessage() execution in case of failure.
633- if (data_)
634- TriggerAsync ();
635- return ;
636- }
629+ Local<Object> event;
630+ Local<Value> cb_args[1 ];
631+ if (!env ()->message_event_object_template ()->NewInstance (context)
632+ .ToLocal (&event) ||
633+ event->Set (context, env ()->data_string (), payload).IsNothing () ||
634+ event->Set (context, env ()->target_string (), object ()).IsNothing () ||
635+ (cb_args[0 ] = event, false ) ||
636+ MakeCallback (env ()->onmessage_string (),
637+ arraysize (cb_args),
638+ cb_args).IsEmpty ()) {
639+ // Re-schedule OnMessage() execution in case of failure.
640+ if (data_)
641+ TriggerAsync ();
642+ return ;
637643 }
638644 }
639645}
@@ -754,11 +760,26 @@ void MessagePort::Stop(const FunctionCallbackInfo<Value>& args) {
754760
755761void MessagePort::Drain (const FunctionCallbackInfo<Value>& args) {
756762 MessagePort* port;
757- CHECK (args[0 ]->IsObject ());
758763 ASSIGN_OR_RETURN_UNWRAP (&port, args[0 ].As <Object>());
759764 port->OnMessage ();
760765}
761766
767+ void MessagePort::ReceiveMessage (const FunctionCallbackInfo<Value>& args) {
768+ CHECK (args[0 ]->IsObject ());
769+ MessagePort* port = Unwrap<MessagePort>(args[0 ].As <Object>());
770+ if (port == nullptr ) {
771+ // Return 'no messages' for a closed port.
772+ args.GetReturnValue ().Set (
773+ Environment::GetCurrent (args)->no_message_symbol ());
774+ return ;
775+ }
776+
777+ MaybeLocal<Value> payload =
778+ port->ReceiveMessage (port->object ()->CreationContext (), false );
779+ if (!payload.IsEmpty ())
780+ args.GetReturnValue ().Set (payload.ToLocalChecked ());
781+ }
782+
762783void MessagePort::MoveToContext (const FunctionCallbackInfo<Value>& args) {
763784 Environment* env = Environment::GetCurrent (args);
764785 if (!args[0 ]->IsObject () ||
@@ -875,6 +896,7 @@ static void InitMessaging(Local<Object> target,
875896 // the browser equivalents do not provide them.
876897 env->SetMethod (target, " stopMessagePort" , MessagePort::Stop);
877898 env->SetMethod (target, " drainMessagePort" , MessagePort::Drain);
899+ env->SetMethod (target, " receiveMessageOnPort" , MessagePort::ReceiveMessage);
878900 env->SetMethod (target, " moveMessagePortToContext" ,
879901 MessagePort::MoveToContext);
880902}
0 commit comments