Skip to content
Merged
13 changes: 13 additions & 0 deletions doc/api/worker_threads.md
Original file line number Diff line number Diff line change
Expand Up @@ -1337,6 +1337,18 @@ If the Worker thread is no longer running, which may occur before the
[`'exit'` event][] is emitted, the returned `Promise` is rejected
immediately with an [`ERR_WORKER_NOT_RUNNING`][] error.

### `worker.getHeapStatistics()`

<!-- YAML
added: REPLACEME
-->

* Returns: {Promise}

This method returns a `Promise` that will resolve to an object identical to [`v8.getHeapStatistics()`][],
or reject with an [`ERR_WORKER_NOT_RUNNING`][] error if the worker is no longer running.
This methods allows the statistics to be observed from outside the actual thread.

### `worker.performance`

<!-- YAML
Expand Down Expand Up @@ -1631,6 +1643,7 @@ thread spawned will spawn another until the application crashes.
[`require('node:worker_threads').workerData`]: #workerworkerdata
[`trace_events`]: tracing.md
[`v8.getHeapSnapshot()`]: v8.md#v8getheapsnapshotoptions
[`v8.getHeapStatistics()`]: v8.md#v8getheapstatistics
[`vm`]: vm.md
[`worker.SHARE_ENV`]: #workershare_env
[`worker.on('message')`]: #event-message_1
Expand Down
11 changes: 11 additions & 0 deletions lib/internal/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,17 @@ class Worker extends EventEmitter {
};
});
}

getHeapStatistics() {
const taker = this[kHandle]?.getHeapStatistics();

return new Promise((resolve, reject) => {
if (!taker) return reject(new ERR_WORKER_NOT_RUNNING());
taker.ondone = (handle) => {
resolve(handle);
};
});
}
}

/**
Expand Down
1 change: 1 addition & 0 deletions src/async_wrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ namespace node {
V(SIGINTWATCHDOG) \
V(WORKER) \
V(WORKERHEAPSNAPSHOT) \
V(WORKERHEAPSTATISTICS) \
V(WRITEWRAP) \
V(ZLIB)

Expand Down
1 change: 1 addition & 0 deletions src/env_properties.h
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,7 @@
V(tty_constructor_template, v8::FunctionTemplate) \
V(write_wrap_template, v8::ObjectTemplate) \
V(worker_heap_snapshot_taker_template, v8::ObjectTemplate) \
V(worker_heap_statistics_taker_template, v8::ObjectTemplate) \
V(x509_constructor_template, v8::FunctionTemplate)

#define PER_REALM_STRONG_PERSISTENT_VALUES(V) \
Expand Down
126 changes: 126 additions & 0 deletions src/node_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,116 @@ void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
}
}

class WorkerHeapStatisticsTaker : public AsyncWrap {
public:
WorkerHeapStatisticsTaker(Environment* env, Local<Object> obj)
: AsyncWrap(env, obj, AsyncWrap::PROVIDER_WORKERHEAPSTATISTICS) {}

SET_NO_MEMORY_INFO()
SET_MEMORY_INFO_NAME(WorkerHeapStatisticsTaker)
SET_SELF_SIZE(WorkerHeapStatisticsTaker)
};

void Worker::GetHeapStatistics(const FunctionCallbackInfo<Value>& args) {
Worker* w;
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());

Environment* env = w->env();
AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(w);
Local<Object> wrap;
if (!env->worker_heap_statistics_taker_template()
->NewInstance(env->context())
.ToLocal(&wrap)) {
return;
}

// The created WorkerHeapStatisticsTaker is an object owned by main
// thread's Isolate, it can not be accessed by worker thread
std::unique_ptr<BaseObjectPtr<WorkerHeapStatisticsTaker>> taker =
std::make_unique<BaseObjectPtr<WorkerHeapStatisticsTaker>>(
MakeDetachedBaseObject<WorkerHeapStatisticsTaker>(env, wrap));

// Interrupt the worker thread and take a snapshot, then schedule a call
// on the parent thread that turns that snapshot into a readable stream.
bool scheduled = w->RequestInterrupt([taker = std::move(taker),
env](Environment* worker_env) mutable {
// We create a unique pointer to HeapStatistics so that the actual object
// it's not copied in the lambda, but only the pointer is.
auto heap_stats = std::make_unique<v8::HeapStatistics>();
worker_env->isolate()->GetHeapStatistics(heap_stats.get());

// Here, the worker thread temporarily owns the WorkerHeapStatisticsTaker
// object.

env->SetImmediateThreadsafe(
[taker = std::move(taker),
heap_stats = std::move(heap_stats)](Environment* env) mutable {
Isolate* isolate = env->isolate();
HandleScope handle_scope(isolate);
Context::Scope context_scope(env->context());

AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(taker->get());

Local<v8::Name> heap_stats_names[] = {
FIXED_ONE_BYTE_STRING(isolate, "total_heap_size"),
FIXED_ONE_BYTE_STRING(isolate, "total_heap_size_executable"),
FIXED_ONE_BYTE_STRING(isolate, "total_physical_size"),
FIXED_ONE_BYTE_STRING(isolate, "total_available_size"),
FIXED_ONE_BYTE_STRING(isolate, "used_heap_size"),
FIXED_ONE_BYTE_STRING(isolate, "heap_size_limit"),
FIXED_ONE_BYTE_STRING(isolate, "malloced_memory"),
FIXED_ONE_BYTE_STRING(isolate, "peak_malloced_memory"),
FIXED_ONE_BYTE_STRING(isolate, "does_zap_garbage"),
FIXED_ONE_BYTE_STRING(isolate, "number_of_native_contexts"),
FIXED_ONE_BYTE_STRING(isolate, "number_of_detached_contexts"),
FIXED_ONE_BYTE_STRING(isolate, "total_global_handles_size"),
FIXED_ONE_BYTE_STRING(isolate, "used_global_handles_size"),
FIXED_ONE_BYTE_STRING(isolate, "external_memory")};

// Define an array of property values
Local<Value> heap_stats_values[] = {
Number::New(isolate, heap_stats->total_heap_size()),
Number::New(isolate, heap_stats->total_heap_size_executable()),
Number::New(isolate, heap_stats->total_physical_size()),
Number::New(isolate, heap_stats->total_available_size()),
Number::New(isolate, heap_stats->used_heap_size()),
Number::New(isolate, heap_stats->heap_size_limit()),
Number::New(isolate, heap_stats->malloced_memory()),
Number::New(isolate, heap_stats->peak_malloced_memory()),
Boolean::New(isolate, heap_stats->does_zap_garbage()),
Number::New(isolate, heap_stats->number_of_native_contexts()),
Number::New(isolate, heap_stats->number_of_detached_contexts()),
Number::New(isolate, heap_stats->total_global_handles_size()),
Number::New(isolate, heap_stats->used_global_handles_size()),
Number::New(isolate, heap_stats->external_memory())};

DCHECK_EQ(arraysize(heap_stats_names), arraysize(heap_stats_values));

// Create the object with the property names and values
Local<Object> stats = Object::New(isolate,
Null(isolate),
heap_stats_names,
heap_stats_values,
arraysize(heap_stats_names));

Local<Value> args[] = {stats};
taker->get()->MakeCallback(
env->ondone_string(), arraysize(args), args);
// implicitly delete `taker`
},
CallbackFlags::kUnrefed);

// Now, the lambda is delivered to the main thread, as a result, the
// WorkerHeapStatisticsTaker object is delivered to the main thread, too.
});

if (scheduled) {
args.GetReturnValue().Set(wrap);
} else {
args.GetReturnValue().Set(Local<Object>());
}
}

void Worker::GetResourceLimits(const FunctionCallbackInfo<Value>& args) {
Worker* w;
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
Expand Down Expand Up @@ -996,6 +1106,7 @@ void CreateWorkerPerIsolateProperties(IsolateData* isolate_data,
SetProtoMethod(isolate, w, "takeHeapSnapshot", Worker::TakeHeapSnapshot);
SetProtoMethod(isolate, w, "loopIdleTime", Worker::LoopIdleTime);
SetProtoMethod(isolate, w, "loopStartTime", Worker::LoopStartTime);
SetProtoMethod(isolate, w, "getHeapStatistics", Worker::GetHeapStatistics);

SetConstructorFunction(isolate, target, "Worker", w);
}
Expand All @@ -1014,6 +1125,20 @@ void CreateWorkerPerIsolateProperties(IsolateData* isolate_data,
wst->InstanceTemplate());
}

{
Local<FunctionTemplate> wst = NewFunctionTemplate(isolate, nullptr);

wst->InstanceTemplate()->SetInternalFieldCount(
WorkerHeapSnapshotTaker::kInternalFieldCount);
wst->Inherit(AsyncWrap::GetConstructorTemplate(isolate_data));

Local<String> wst_string =
FIXED_ONE_BYTE_STRING(isolate, "WorkerHeapStatisticsTaker");
wst->SetClassName(wst_string);
isolate_data->set_worker_heap_statistics_taker_template(
wst->InstanceTemplate());
}

SetMethod(isolate, target, "getEnvMessagePort", GetEnvMessagePort);
}

Expand Down Expand Up @@ -1079,6 +1204,7 @@ void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
registry->Register(Worker::TakeHeapSnapshot);
registry->Register(Worker::LoopIdleTime);
registry->Register(Worker::LoopStartTime);
registry->Register(Worker::GetHeapStatistics);
}

} // anonymous namespace
Expand Down
2 changes: 2 additions & 0 deletions src/node_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ class Worker : public AsyncWrap {
static void TakeHeapSnapshot(const v8::FunctionCallbackInfo<v8::Value>& args);
static void LoopIdleTime(const v8::FunctionCallbackInfo<v8::Value>& args);
static void LoopStartTime(const v8::FunctionCallbackInfo<v8::Value>& args);
static void GetHeapStatistics(
const v8::FunctionCallbackInfo<v8::Value>& args);

private:
bool CreateEnvMessagePort(Environment* env);
Expand Down
63 changes: 63 additions & 0 deletions test/parallel/test-worker-heap-statistics.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
'use strict';

const common = require('../common');
const fixtures = require('../common/fixtures');

common.skipIfInspectorDisabled();

const {
Worker,
isMainThread,
} = require('worker_threads');

if (!isMainThread) {
common.skip('This test only works on a main thread');
}

// Ensures that worker.getHeapStatistics() returns valid data

const assert = require('assert');

if (isMainThread) {
const name = 'Hello Thread';
const worker = new Worker(fixtures.path('worker-name.js'), {
name,
});
worker.once('message', common.mustCall(async (message) => {
const stats = await worker.getHeapStatistics();
const keys = [
`total_heap_size`,
`total_heap_size_executable`,
`total_physical_size`,
`total_available_size`,
`used_heap_size`,
`heap_size_limit`,
`malloced_memory`,
`peak_malloced_memory`,
`does_zap_garbage`,
`number_of_native_contexts`,
`number_of_detached_contexts`,
`total_global_handles_size`,
`used_global_handles_size`,
`external_memory`,
].sort();
assert.deepStrictEqual(keys, Object.keys(stats).sort());
for (const key of keys) {
if (key === 'does_zap_garbage') {
assert.strictEqual(typeof stats[key], 'boolean', `Expected ${key} to be a boolean`);
continue;
}
assert.strictEqual(typeof stats[key], 'number', `Expected ${key} to be a number`);
assert.ok(stats[key] >= 0, `Expected ${key} to be >= 0`);
}

worker.postMessage('done');
}));

worker.once('exit', common.mustCall(async (code) => {
assert.strictEqual(code, 0);
await assert.rejects(worker.getHeapStatistics(), {
code: 'ERR_WORKER_NOT_RUNNING'
});
}));
}
1 change: 1 addition & 0 deletions test/sequential/test-async-wrap-getasyncid.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ const { getSystemErrorName } = require('util');
delete providers.ELDHISTOGRAM;
delete providers.SIGINTWATCHDOG;
delete providers.WORKERHEAPSNAPSHOT;
delete providers.WORKERHEAPSTATISTICS;
delete providers.BLOBREADER;
delete providers.RANDOMPRIMEREQUEST;
delete providers.CHECKPRIMEREQUEST;
Expand Down
1 change: 1 addition & 0 deletions typings/internalBinding/worker.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ declare namespace InternalWorkerBinding {
unref(): void;
getResourceLimits(): Float64Array;
takeHeapSnapshot(): object;
getHeapStatistics(): Promise<object>;
loopIdleTime(): number;
loopStartTime(): number;
}
Expand Down
Loading