Skip to content

Commit 5a184b9

Browse files
committed
Implement fallback for when failing to open control channel
1 parent 694ed40 commit 5a184b9

File tree

2 files changed

+113
-3
lines changed

2 files changed

+113
-3
lines changed

packages/voila/src/manager.ts

Lines changed: 78 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ import { Widget } from '@lumino/widgets';
4040

4141
import { requireLoader } from './loader';
4242

43+
import { batchRateMap } from './utils';
44+
4345
if (typeof window !== 'undefined' && typeof window.define !== 'undefined') {
4446
window.define('@jupyter-widgets/base', base);
4547
window.define('@jupyter-widgets/controls', controls);
@@ -192,6 +194,10 @@ export class WidgetManager extends JupyterLabManager {
192194
});
193195
}
194196

197+
/**
198+
* This is the implementation of building widgets models making use of the
199+
* jupyter.widget.control comm channel
200+
*/
195201
async _build_models(): Promise<{ [key: string]: base.WidgetModel }> {
196202
const models: { [key: string]: base.WidgetModel } = {};
197203
const commId = base.uuid();
@@ -234,9 +240,8 @@ export class WidgetManager extends JupyterLabManager {
234240
}
235241
catch {
236242
console.warn('Failed to open "jupyter.widget.control" comm channel, fallback to slow fetching of widgets.');
237-
// TODO Fallback to the old implementation for old ipywidgets versions
238-
// return this._build_models_slow();
239-
return {};
243+
// Fallback to the old implementation for old ipywidgets versions (<=7.6)
244+
return this._build_models_slow();
240245
}
241246

242247
initComm.close();
@@ -296,5 +301,75 @@ export class WidgetManager extends JupyterLabManager {
296301
return models;
297302
}
298303

304+
/**
305+
* This is the old implementation of building widgets models
306+
* We keep it around for supporting old ipywidgets versions (<=7.6)
307+
*/
308+
async _build_models_slow(): Promise<{ [key: string]: base.WidgetModel }> {
309+
const comm_ids = await this._get_comm_info();
310+
const models: { [key: string]: base.WidgetModel } = {};
311+
/**
312+
* For the classical notebook, iopub_msg_rate_limit=1000 (default)
313+
* And for zmq, we are affected by the default ZMQ_SNDHWM setting of 1000
314+
* See https://github.com/voila-dashboards/voila/issues/534 for a discussion
315+
*/
316+
const maxMessagesInTransit = 100; // really save limit compared to ZMQ_SNDHWM
317+
const maxMessagesPerSecond = 500; // lets be on the save side, in case the kernel sends more msg'es
318+
const widgets_info = await Promise.all(
319+
batchRateMap(
320+
Object.keys(comm_ids),
321+
async comm_id => {
322+
const comm = await this._create_comm(this.comm_target_name, comm_id);
323+
return this._update_comm(comm);
324+
},
325+
{ room: maxMessagesInTransit, rate: maxMessagesPerSecond }
326+
)
327+
);
328+
329+
await Promise.all(
330+
widgets_info.map(async widget_info => {
331+
const state = (widget_info as any).msg.content.data.state;
332+
try {
333+
const modelPromise = this.new_model(
334+
{
335+
model_name: state._model_name,
336+
model_module: state._model_module,
337+
model_module_version: state._model_module_version,
338+
comm: (widget_info as any).comm
339+
},
340+
state
341+
);
342+
const model = await modelPromise;
343+
models[model.model_id] = model;
344+
} catch (error) {
345+
// Failed to create a widget model, we continue creating other models so that
346+
// other widgets can render
347+
console.error(error);
348+
}
349+
})
350+
);
351+
return models;
352+
}
353+
354+
async _update_comm(
355+
comm: base.IClassicComm
356+
): Promise<{ comm: base.IClassicComm; msg: any }> {
357+
return new Promise((resolve, reject) => {
358+
comm.on_msg(async msg => {
359+
if (msg.content.data.buffer_paths) {
360+
base.put_buffers(
361+
msg.content.data.state,
362+
msg.content.data.buffer_paths,
363+
msg.buffers
364+
);
365+
}
366+
if (msg.content.data.method === 'update') {
367+
resolve({ comm: comm, msg: msg });
368+
}
369+
});
370+
comm.send({ method: 'request_state' }, {});
371+
});
372+
}
373+
299374
private _loader: (name: any, version: any) => Promise<any>;
300375
}

packages/voila/src/utils.ts

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
import pLimit from 'p-limit';
2+
3+
const delay = (sec: number) =>
4+
new Promise(resolve => setTimeout(resolve, sec * 1000));
5+
6+
/**
7+
* Map a function onto a list where fn is being called at a limit of 'rate' number of calls per second.
8+
* and 'room' number of parallel calls.
9+
* Note that the minimum window at which rate is respected is room/rate seconds.
10+
*/
11+
export const batchRateMap = (
12+
list: string[],
13+
fn: (...args: any[]) => Promise<any>,
14+
{ room, rate }: { room: number; rate: number }
15+
): Promise<any>[] => {
16+
const limit = pLimit(room);
17+
return list.map(async value => {
18+
return new Promise((valueResolve, reject) => {
19+
limit(() => {
20+
// We may not want to start the next job directly, we want to respect the
21+
// throttling/rate, e.g.:
22+
// If we have room for 10 parallel jobs, at a max rate of 100/second, each job
23+
// should take at least 10/100=0.1 seconds.
24+
// If we have room for 100 parallel jobs, and a max rate of 10/second, each
25+
// job should take 100/10=10 seconds. But it will have a burst of 100 calls.
26+
const throttlePromise = delay(room / rate);
27+
// If the job is done, resolve the promise immediately, don't want for the throttle Promise
28+
// This means that if we have room for 10 parallel jobs
29+
// and just 9 jobs, we will never have to wait for the throttlePromise
30+
const resultPromise = fn(value).then(valueResolve);
31+
return Promise.all([resultPromise, throttlePromise]);
32+
});
33+
});
34+
});
35+
};

0 commit comments

Comments
 (0)