@@ -37,6 +37,19 @@ using v8::ValueSerializer;
3737using v8::WasmModuleObject;
3838
3939namespace node {
40+
41+ BaseObject::TransferMode BaseObject::GetTransferMode () const {
42+ return BaseObject::TransferMode::kUntransferable ;
43+ }
44+
45+ std::unique_ptr<worker::TransferData> BaseObject::TransferForMessaging () {
46+ return CloneForMessaging ();
47+ }
48+
49+ std::unique_ptr<worker::TransferData> BaseObject::CloneForMessaging () const {
50+ return {};
51+ }
52+
4053namespace worker {
4154
4255Message::Message (MallocedBuffer<char >&& buffer)
@@ -55,21 +68,20 @@ class DeserializerDelegate : public ValueDeserializer::Delegate {
5568 DeserializerDelegate (
5669 Message* m,
5770 Environment* env,
58- const std::vector<MessagePort*>& message_ports ,
71+ const std::vector<BaseObjectPtr<BaseObject>>& host_objects ,
5972 const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers,
6073 const std::vector<CompiledWasmModule>& wasm_modules)
61- : message_ports_(message_ports ),
74+ : host_objects_(host_objects ),
6275 shared_array_buffers_ (shared_array_buffers),
6376 wasm_modules_(wasm_modules) {}
6477
6578 MaybeLocal<Object> ReadHostObject (Isolate* isolate) override {
66- // Currently, only MessagePort hosts objects are supported, so identifying
67- // by the index in the message's MessagePort array is sufficient.
79+ // Identifying the index in the message's BaseObject array is sufficient.
6880 uint32_t id;
6981 if (!deserializer->ReadUint32 (&id))
7082 return MaybeLocal<Object>();
71- CHECK_LE (id, message_ports_ .size ());
72- return message_ports_ [id]->object (isolate);
83+ CHECK_LE (id, host_objects_ .size ());
84+ return host_objects_ [id]->object (isolate);
7385 }
7486
7587 MaybeLocal<SharedArrayBuffer> GetSharedArrayBufferFromId (
@@ -88,7 +100,7 @@ class DeserializerDelegate : public ValueDeserializer::Delegate {
88100 ValueDeserializer* deserializer = nullptr ;
89101
90102 private:
91- const std::vector<MessagePort*>& message_ports_ ;
103+ const std::vector<BaseObjectPtr<BaseObject>>& host_objects_ ;
92104 const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers_;
93105 const std::vector<CompiledWasmModule>& wasm_modules_;
94106};
@@ -102,22 +114,25 @@ MaybeLocal<Value> Message::Deserialize(Environment* env,
102114 EscapableHandleScope handle_scope (env->isolate ());
103115 Context::Scope context_scope (context);
104116
105- // Create all necessary MessagePort handles.
106- std::vector<MessagePort*> ports (message_ports_.size ());
107- for (uint32_t i = 0 ; i < message_ports_.size (); ++i) {
108- ports[i] = MessagePort::New (env,
109- context,
110- std::move (message_ports_[i]));
111- if (ports[i] == nullptr ) {
112- for (MessagePort* port : ports) {
113- // This will eventually release the MessagePort object itself.
114- if (port != nullptr )
115- port->Close ();
117+ // Create all necessary objects for transferables, e.g. MessagePort handles.
118+ std::vector<BaseObjectPtr<BaseObject>> host_objects (transferables_.size ());
119+ for (uint32_t i = 0 ; i < transferables_.size (); ++i) {
120+ TransferData* data = transferables_[i].get ();
121+ host_objects[i] = data->Deserialize (
122+ env, context, std::move (transferables_[i]));
123+ if (!host_objects[i]) {
124+ for (BaseObjectPtr<BaseObject> object : host_objects) {
125+ if (!object) continue ;
126+
127+ // Since creating one of the objects failed, we don't want to have the
128+ // other objects lying around in memory. We act as if the object has
129+ // been garbage-collected.
130+ object->Detach ();
116131 }
117132 return MaybeLocal<Value>();
118133 }
119134 }
120- message_ports_ .clear ();
135+ transferables_ .clear ();
121136
122137 std::vector<Local<SharedArrayBuffer>> shared_array_buffers;
123138 // Attach all transferred SharedArrayBuffers to their new Isolate.
@@ -130,7 +145,7 @@ MaybeLocal<Value> Message::Deserialize(Environment* env,
130145 shared_array_buffers_.clear ();
131146
132147 DeserializerDelegate delegate (
133- this , env, ports , shared_array_buffers, wasm_modules_);
148+ this , env, host_objects , shared_array_buffers, wasm_modules_);
134149 ValueDeserializer deserializer (
135150 env->isolate (),
136151 reinterpret_cast <const uint8_t *>(main_message_buf_.data ),
@@ -157,8 +172,8 @@ void Message::AddSharedArrayBuffer(
157172 shared_array_buffers_.emplace_back (std::move (backing_store));
158173}
159174
160- void Message::AddMessagePort (std::unique_ptr<MessagePortData >&& data) {
161- message_ports_ .emplace_back (std::move (data));
175+ void Message::AddTransferable (std::unique_ptr<TransferData >&& data) {
176+ transferables_ .emplace_back (std::move (data));
162177}
163178
164179uint32_t Message::AddWASMModule (CompiledWasmModule&& mod) {
@@ -224,8 +239,8 @@ class SerializerDelegate : public ValueSerializer::Delegate {
224239 }
225240
226241 Maybe<bool > WriteHostObject (Isolate* isolate, Local<Object> object) override {
227- if (env_->message_port_constructor_template ()->HasInstance (object)) {
228- return WriteMessagePort (Unwrap<MessagePort >(object));
242+ if (env_->base_object_ctor_template ()->HasInstance (object)) {
243+ return WriteHostObject (Unwrap<BaseObject >(object));
229244 }
230245
231246 ThrowDataCloneError (env_->clone_unsupported_type_str ());
@@ -257,32 +272,61 @@ class SerializerDelegate : public ValueSerializer::Delegate {
257272 void Finish () {
258273 // Only close the MessagePort handles and actually transfer them
259274 // once we know that serialization succeeded.
260- for (MessagePort* port : ports_) {
261- port->Close ();
262- msg_->AddMessagePort (port->Detach ());
275+ for (uint32_t i = 0 ; i < host_objects_.size (); i++) {
276+ BaseObject* host_object = host_objects_[i];
277+ std::unique_ptr<TransferData> data;
278+ if (i < first_cloned_object_index_)
279+ data = host_object->TransferForMessaging ();
280+ if (!data)
281+ data = host_object->CloneForMessaging ();
282+ CHECK (data);
283+ msg_->AddTransferable (std::move (data));
263284 }
264285 }
265286
287+ inline void AddHostObject (BaseObject* host_object) {
288+ // Make sure we have not started serializing the value itself yet.
289+ CHECK_EQ (first_cloned_object_index_, SIZE_MAX);
290+ host_objects_.push_back (host_object);
291+ }
292+
266293 ValueSerializer* serializer = nullptr ;
267294
268295 private:
269- Maybe<bool > WriteMessagePort (MessagePort* port ) {
270- for (uint32_t i = 0 ; i < ports_ .size (); i++) {
271- if (ports_ [i] == port ) {
296+ Maybe<bool > WriteHostObject (BaseObject* host_object ) {
297+ for (uint32_t i = 0 ; i < host_objects_ .size (); i++) {
298+ if (host_objects_ [i] == host_object ) {
272299 serializer->WriteUint32 (i);
273300 return Just (true );
274301 }
275302 }
276303
277- THROW_ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST (env_);
278- return Nothing<bool >();
304+ BaseObject::TransferMode mode = host_object->GetTransferMode ();
305+ if (mode == BaseObject::TransferMode::kUntransferable ) {
306+ ThrowDataCloneError (env_->clone_unsupported_type_str ());
307+ return Nothing<bool >();
308+ } else if (mode == BaseObject::TransferMode::kTransferable ) {
309+ // TODO(addaleax): This message code is too specific. Fix that in a
310+ // semver-major follow-up.
311+ THROW_ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST (env_);
312+ return Nothing<bool >();
313+ }
314+
315+ CHECK_EQ (mode, BaseObject::TransferMode::kCloneable );
316+ uint32_t index = host_objects_.size ();
317+ if (first_cloned_object_index_ == SIZE_MAX)
318+ first_cloned_object_index_ = index;
319+ serializer->WriteUint32 (index);
320+ host_objects_.push_back (host_object);
321+ return Just (true );
279322 }
280323
281324 Environment* env_;
282325 Local<Context> context_;
283326 Message* msg_;
284327 std::vector<Global<SharedArrayBuffer>> seen_shared_array_buffers_;
285- std::vector<MessagePort*> ports_;
328+ std::vector<BaseObject*> host_objects_;
329+ size_t first_cloned_object_index_ = SIZE_MAX;
286330
287331 friend class worker ::Message;
288332};
@@ -344,8 +388,7 @@ Maybe<bool> Message::Serialize(Environment* env,
344388 array_buffers.push_back (ab);
345389 serializer.TransferArrayBuffer (id, ab);
346390 continue ;
347- } else if (env->message_port_constructor_template ()
348- ->HasInstance (entry)) {
391+ } else if (env->base_object_ctor_template ()->HasInstance (entry)) {
349392 // Check if the source MessagePort is being transferred.
350393 if (!source_port.IsEmpty () && entry == source_port) {
351394 ThrowDataCloneException (
@@ -354,26 +397,34 @@ Maybe<bool> Message::Serialize(Environment* env,
354397 " Transfer list contains source port" ));
355398 return Nothing<bool >();
356399 }
357- MessagePort* port = Unwrap<MessagePort>(entry.As <Object>());
358- if (port == nullptr || port->IsDetached ()) {
400+ BaseObject* host_object = Unwrap<BaseObject>(entry.As <Object>());
401+ if (env->message_port_constructor_template ()->HasInstance (entry) &&
402+ (host_object == nullptr ||
403+ static_cast <MessagePort*>(host_object)->IsDetached ())) {
359404 ThrowDataCloneException (
360405 context,
361406 FIXED_ONE_BYTE_STRING (
362407 env->isolate (),
363408 " MessagePort in transfer list is already detached" ));
364409 return Nothing<bool >();
365410 }
366- if (std::find (delegate.ports_ .begin (), delegate.ports_ .end (), port) !=
367- delegate.ports_ .end ()) {
411+ if (std::find (delegate.host_objects_ .begin (),
412+ delegate.host_objects_ .end (),
413+ host_object) != delegate.host_objects_ .end ()) {
368414 ThrowDataCloneException (
369415 context,
370- FIXED_ONE_BYTE_STRING (
371- env->isolate (),
372- " Transfer list contains duplicate MessagePort" ));
416+ String::Concat (env->isolate (),
417+ FIXED_ONE_BYTE_STRING (
418+ env->isolate (),
419+ " Transfer list contains duplicate " ),
420+ entry.As <Object>()->GetConstructorName ()));
373421 return Nothing<bool >();
374422 }
375- delegate.ports_ .push_back (port);
376- continue ;
423+ if (host_object != nullptr && host_object->GetTransferMode () !=
424+ BaseObject::TransferMode::kUntransferable ) {
425+ delegate.AddHostObject (host_object);
426+ continue ;
427+ }
377428 }
378429
379430 THROW_ERR_INVALID_TRANSFER_OBJECT (env);
@@ -406,7 +457,7 @@ Maybe<bool> Message::Serialize(Environment* env,
406457void Message::MemoryInfo (MemoryTracker* tracker) const {
407458 tracker->TrackField (" array_buffers_" , array_buffers_);
408459 tracker->TrackField (" shared_array_buffers" , shared_array_buffers_);
409- tracker->TrackField (" message_ports " , message_ports_ );
460+ tracker->TrackField (" transferables " , transferables_ );
410461}
411462
412463MessagePortData::MessagePortData (MessagePort* owner) : owner_(owner) { }
@@ -672,6 +723,25 @@ std::unique_ptr<MessagePortData> MessagePort::Detach() {
672723 return std::move (data_);
673724}
674725
726+ BaseObject::TransferMode MessagePort::GetTransferMode () const {
727+ if (IsDetached ())
728+ return BaseObject::TransferMode::kUntransferable ;
729+ return BaseObject::TransferMode::kTransferable ;
730+ }
731+
732+ std::unique_ptr<TransferData> MessagePort::TransferForMessaging () {
733+ Close ();
734+ return Detach ();
735+ }
736+
737+ BaseObjectPtr<BaseObject> MessagePortData::Deserialize (
738+ Environment* env,
739+ Local<Context> context,
740+ std::unique_ptr<TransferData> self) {
741+ return BaseObjectPtr<MessagePort> { MessagePort::New (
742+ env, context,
743+ static_unique_pointer_cast<MessagePortData>(std::move (self))) };
744+ }
675745
676746Maybe<bool > MessagePort::PostMessage (Environment* env,
677747 Local<Value> message_v,
@@ -699,8 +769,8 @@ Maybe<bool> MessagePort::PostMessage(Environment* env,
699769
700770 // Check if the target port is posted to itself.
701771 if (data_->sibling_ != nullptr ) {
702- for (const auto & port_data : msg.message_ports ()) {
703- if (data_->sibling_ == port_data .get ()) {
772+ for (const auto & transferable : msg.transferables ()) {
773+ if (data_->sibling_ == transferable .get ()) {
704774 doomed = true ;
705775 ProcessEmitWarning (env, " The target port was posted to itself, and "
706776 " the communication channel was lost" );
0 commit comments