Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
226 changes: 225 additions & 1 deletion packages/base-manager/src/manager-base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
import * as services from '@jupyterlab/services';
import * as widgets from '@jupyter-widgets/base';

import { JSONObject, PartialJSONObject } from '@lumino/coreutils';
import {
JSONObject,
PartialJSONObject,
PromiseDelegate,
} from '@lumino/coreutils';

import {
DOMWidgetView,
Expand Down Expand Up @@ -32,6 +36,21 @@ import sanitize from 'sanitize-html';

const PROTOCOL_MAJOR_VERSION = PROTOCOL_VERSION.split('.', 1)[0];

/**
* The control comm target name.
*/
export const CONTROL_COMM_TARGET = 'jupyter.widget.control';

/**
* The supported version for the control comm channel.
*/
export const CONTROL_COMM_PROTOCOL_VERSION = '1.0.0';

/**
* Time (in ms) after which we consider the control comm target not responding.
*/
export const CONTROL_COMM_TIMEOUT = 4000;

/**
* Sanitize HTML-formatted descriptions.
*/
Expand Down Expand Up @@ -342,6 +361,201 @@ export abstract class ManagerBase implements IWidgetManager {
return await modelPromise;
}

/**
* Fetch all widgets states from the kernel using the control comm channel
* If this fails (control comm handler not implemented kernel side),
* it will fallback to `_loadFromKernelSlow`.
*
* This is a utility function that can be used in subclasses.
*/
protected async _loadFromKernel(): Promise<void> {
// Try fetching all widget states through the control comm
let data: any;
let buffers: any;
try {
const initComm = await this._create_comm(
CONTROL_COMM_TARGET,
uuid(),
{},
{ version: CONTROL_COMM_PROTOCOL_VERSION }
);

await new Promise((resolve, reject) => {
initComm.on_msg((msg: any) => {
data = msg['content']['data'];

if (data.method !== 'update_states') {
console.warn(`
Unknown ${data.method} message on the Control channel
`);
return;
}

buffers = (msg.buffers || []).map((b: any) => {
if (b instanceof DataView) {
return b;
} else {
return new DataView(b instanceof ArrayBuffer ? b : b.buffer);
}
});

resolve(null);
});

initComm.on_close(() => reject('Control comm was closed too early'));

// Send a states request msg
initComm.send({ method: 'request_states' }, {});

// Reject if we didn't get a response in time
setTimeout(
() => reject('Control comm did not respond in time'),
CONTROL_COMM_TIMEOUT
);
});

initComm.close();
} catch (error) {
console.warn(
'Failed to fetch widgets through the "jupyter.widget.control" comm channel, fallback to slow fetching of widgets. Reason:',
error
);
// Fallback to the old implementation for old ipywidgets backend versions (<=7.6)
return this._loadFromKernelSlow();
}

const states: any = data.states;

// Extract buffer paths
const bufferPaths: any = {};
for (const bufferPath of data.buffer_paths) {
if (!bufferPaths[bufferPath[0]]) {
bufferPaths[bufferPath[0]] = [];
}
bufferPaths[bufferPath[0]].push(bufferPath.slice(1));
}

// Start creating all widgets
await Promise.all(
Object.keys(states).map(async (widget_id) => {
try {
const state = states[widget_id];
const comm = await this._create_comm('jupyter.widget', widget_id);

// Put binary buffers
if (widget_id in bufferPaths) {
const nBuffers = bufferPaths[widget_id].length;
put_buffers(
state,
bufferPaths[widget_id],
buffers.splice(0, nBuffers)
);
}

await this.new_model(
{
model_name: state.model_name,
model_module: state.model_module,
model_module_version: state.model_module_version,
model_id: widget_id,
comm: comm,
},
state.state
);
} catch (error) {
// Failed to create a widget model, we continue creating other models so that
// other widgets can render
console.error(error);
}
})
);
}

/**
* Old implementation of fetching widgets one by one using
* the request_state message on each comm.
*
* This is a utility function that can be used in subclasses.
*/
protected async _loadFromKernelSlow(): Promise<void> {
const comm_ids = await this._get_comm_info();

// For each comm id that we do not know about, create the comm, and request the state.
const widgets_info = await Promise.all(
Object.keys(comm_ids).map(async (comm_id) => {
try {
const model = this.get_model(comm_id);
// TODO Have the same this.get_model implementation for
// the widgetsnbextension and labextension, the one that
// throws an error if the model is not found instead of
// returning undefined
if (model === undefined) {
throw new Error('widget model not found');
}
await model;
// If we successfully get the model, do no more.
return;
} catch (e) {
// If we have the widget model not found error, then we can create the
// widget. Otherwise, rethrow the error. We have to check the error
// message text explicitly because the get_model function in this
// class throws a generic error with this specific text.
if (e.message !== 'widget model not found') {
throw e;
}
const comm = await this._create_comm(this.comm_target_name, comm_id);

let msg_id = '';
const info = new PromiseDelegate<Private.ICommUpdateData>();
comm.on_msg((msg: services.KernelMessage.ICommMsgMsg) => {
if (
(msg.parent_header as any).msg_id === msg_id &&
msg.header.msg_type === 'comm_msg' &&
msg.content.data.method === 'update'
) {
const data = msg.content.data as any;
const buffer_paths = data.buffer_paths || [];
const buffers = msg.buffers || [];
put_buffers(data.state, buffer_paths, buffers);
info.resolve({ comm, msg });
}
});
msg_id = comm.send(
{
method: 'request_state',
},
this.callbacks(undefined)
);

return info.promise;
}
})
);

// We put in a synchronization barrier here so that we don't have to
// topologically sort the restored widgets. `new_model` synchronously
// registers the widget ids before reconstructing their state
// asynchronously, so promises to every widget reference should be available
// by the time they are used.
await Promise.all(
widgets_info.map(async (widget_info) => {
if (!widget_info) {
return;
}
const content = widget_info.msg.content as any;
await this.new_model(
{
model_name: content.data.state._model_name,
model_module: content.data.state._model_module,
model_module_version: content.data.state._model_module_version,
comm: widget_info.comm,
},
content.data.state
);
})
);
}

async _make_model(
options: RequiredSome<IModelOptions, 'model_id'>,
serialized_state: any = {}
Expand Down Expand Up @@ -690,3 +904,13 @@ export function serialize_state(
});
return { version_major: 2, version_minor: 0, state: state };
}

namespace Private {
/**
* Data promised when a comm info request resolves.
*/
export interface ICommUpdateData {
comm: IClassicComm;
msg: services.KernelMessage.ICommMsgMsg;
}
}
81 changes: 2 additions & 79 deletions python/jupyterlab_widgets/src/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import {
ExportData,
WidgetModel,
WidgetView,
put_buffers,
ICallbacks,
} from '@jupyter-widgets/base';

Expand All @@ -21,7 +20,7 @@ import {

import { IDisposable } from '@lumino/disposable';

import { PromiseDelegate, ReadonlyPartialJSONValue } from '@lumino/coreutils';
import { ReadonlyPartialJSONValue } from '@lumino/coreutils';

import { INotebookModel } from '@jupyterlab/notebook';

Expand Down Expand Up @@ -106,74 +105,8 @@ export abstract class LabWidgetManager
// A "load" for a kernel that does not handle comms does nothing.
return;
}
const comm_ids = await this._get_comm_info();

// For each comm id that we do not know about, create the comm, and request the state.
const widgets_info = await Promise.all(
Object.keys(comm_ids).map(async (comm_id) => {
try {
await this.get_model(comm_id);
// If we successfully get the model, do no more.
return;
} catch (e) {
// If we have the widget model not found error, then we can create the
// widget. Otherwise, rethrow the error. We have to check the error
// message text explicitly because the get_model function in this
// class throws a generic error with this specific text.
if (e.message !== 'widget model not found') {
throw e;
}
const comm = await this._create_comm(this.comm_target_name, comm_id);

let msg_id = '';
const info = new PromiseDelegate<Private.ICommUpdateData>();
comm.on_msg((msg: KernelMessage.ICommMsgMsg) => {
if (
(msg.parent_header as any).msg_id === msg_id &&
msg.header.msg_type === 'comm_msg' &&
msg.content.data.method === 'update'
) {
const data = msg.content.data as any;
const buffer_paths = data.buffer_paths || [];
const buffers = msg.buffers || [];
put_buffers(data.state, buffer_paths, buffers);
info.resolve({ comm, msg });
}
});
msg_id = comm.send(
{
method: 'request_state',
},
this.callbacks(undefined)
);

return info.promise;
}
})
);

// We put in a synchronization barrier here so that we don't have to
// topologically sort the restored widgets. `new_model` synchronously
// registers the widget ids before reconstructing their state
// asynchronously, so promises to every widget reference should be available
// by the time they are used.
await Promise.all(
widgets_info.map(async (widget_info) => {
if (!widget_info) {
return;
}
const content = widget_info.msg.content as any;
await this.new_model(
{
model_name: content.data.state._model_name,
model_module: content.data.state._model_module,
model_module_version: content.data.state._model_module_version,
comm: widget_info.comm,
},
content.data.state
);
})
);
return super._loadFromKernel();
}

/**
Expand Down Expand Up @@ -668,13 +601,3 @@ export namespace WidgetManager {
saveState: boolean;
};
}

namespace Private {
/**
* Data promised when a comm info request resolves.
*/
export interface ICommUpdateData {
comm: IClassicComm;
msg: KernelMessage.ICommMsgMsg;
}
}
Loading