Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 27 additions & 4 deletions src/node_messaging.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ using v8::MaybeLocal;
using v8::Nothing;
using v8::Object;
using v8::SharedArrayBuffer;
using v8::SharedValueConveyor;
using v8::String;
using v8::Symbol;
using v8::Value;
Expand Down Expand Up @@ -92,10 +93,12 @@ class DeserializerDelegate : public ValueDeserializer::Delegate {
Environment* env,
const std::vector<BaseObjectPtr<BaseObject>>& host_objects,
const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers,
const std::vector<CompiledWasmModule>& wasm_modules)
const std::vector<CompiledWasmModule>& wasm_modules,
const std::optional<SharedValueConveyor>& shared_value_conveyor)
: host_objects_(host_objects),
shared_array_buffers_(shared_array_buffers),
wasm_modules_(wasm_modules) {}
wasm_modules_(wasm_modules),
shared_value_conveyor_(shared_value_conveyor) {}

MaybeLocal<Object> ReadHostObject(Isolate* isolate) override {
// Identifying the index in the message's BaseObject array is sufficient.
Expand Down Expand Up @@ -128,12 +131,18 @@ class DeserializerDelegate : public ValueDeserializer::Delegate {
isolate, wasm_modules_[transfer_id]);
}

const SharedValueConveyor* GetSharedValueConveyor(Isolate* isolate) override {
CHECK(shared_value_conveyor_.has_value());
return &shared_value_conveyor_.value();
}

ValueDeserializer* deserializer = nullptr;

private:
const std::vector<BaseObjectPtr<BaseObject>>& host_objects_;
const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers_;
const std::vector<CompiledWasmModule>& wasm_modules_;
const std::optional<SharedValueConveyor>& shared_value_conveyor_;
};

} // anonymous namespace
Expand Down Expand Up @@ -198,8 +207,12 @@ MaybeLocal<Value> Message::Deserialize(Environment* env,
shared_array_buffers.push_back(sab);
}

DeserializerDelegate delegate(
this, env, host_objects, shared_array_buffers, wasm_modules_);
DeserializerDelegate delegate(this,
env,
host_objects,
shared_array_buffers,
wasm_modules_,
shared_value_conveyor_);
ValueDeserializer deserializer(
env->isolate(),
reinterpret_cast<const uint8_t*>(main_message_buf_.data),
Expand Down Expand Up @@ -243,6 +256,10 @@ uint32_t Message::AddWASMModule(CompiledWasmModule&& mod) {
return wasm_modules_.size() - 1;
}

void Message::AdoptSharedValueConveyor(SharedValueConveyor&& conveyor) {
shared_value_conveyor_.emplace(std::move(conveyor));
}

namespace {

MaybeLocal<Function> GetEmitMessageFunction(Local<Context> context) {
Expand Down Expand Up @@ -347,6 +364,12 @@ class SerializerDelegate : public ValueSerializer::Delegate {
return Just(msg_->AddWASMModule(module->GetCompiledModule()));
}

bool AdoptSharedValueConveyor(Isolate* isolate,
SharedValueConveyor&& conveyor) override {
msg_->AdoptSharedValueConveyor(std::move(conveyor));
return true;
}

Maybe<bool> Finish(Local<Context> context) {
for (uint32_t i = 0; i < host_objects_.size(); i++) {
BaseObjectPtr<BaseObject> host_object = std::move(host_objects_[i]);
Expand Down
4 changes: 4 additions & 0 deletions src/node_messaging.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ class Message : public MemoryRetainer {
// Internal method of Message that is called when a new WebAssembly.Module
// object is encountered in the incoming value's structure.
uint32_t AddWASMModule(v8::CompiledWasmModule&& mod);
// Internal method of Message that is called when a shared value is
// encountered for the first time in the incoming value's structure.
void AdoptSharedValueConveyor(v8::SharedValueConveyor&& conveyor);

// The host objects that will be transferred, as recorded by Serialize()
// (e.g. MessagePorts).
Expand All @@ -114,6 +117,7 @@ class Message : public MemoryRetainer {
std::vector<std::shared_ptr<v8::BackingStore>> shared_array_buffers_;
std::vector<std::unique_ptr<TransferData>> transferables_;
std::vector<v8::CompiledWasmModule> wasm_modules_;
std::optional<v8::SharedValueConveyor> shared_value_conveyor_;

friend class MessagePort;
};
Expand Down
23 changes: 23 additions & 0 deletions test/parallel/test-experimental-shared-value-conveyor.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
'use strict';

// Flags: --harmony-struct

const common = require('../common');
const assert = require('assert');
const { Worker, isMainThread, parentPort } = require('worker_threads');

if (isMainThread) {
const m = new globalThis.SharedArray(16);

const worker = new Worker(__filename);
worker.once('message', common.mustCall((message) => {
assert.strictEqual(message, m);
}));

worker.postMessage(m);
} else {
parentPort.once('message', common.mustCall((message) => {
// Simple echo.
parentPort.postMessage(message);
}));
}