Skip to content

Commit 9527b32

Browse files
authored
Merge pull request #1205 from caolan/dll
Implment queues using DLLs
2 parents 7a634cc + 81e002d commit 9527b32

File tree

5 files changed

+106
-57
lines changed

5 files changed

+106
-57
lines changed

lib/auto.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ import indexOf from 'lodash/_baseIndexOf';
44
import isArray from 'lodash/isArray';
55
import okeys from 'lodash/keys';
66
import noop from 'lodash/noop';
7-
import once from './internal/once';
87
import rest from 'lodash/rest';
98

9+
import once from './internal/once';
1010
import onlyOnce from './internal/onlyOnce';
1111

1212
/**

lib/internal/DoublyLinkedList.js

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// Simple doubly linked list (https://en.wikipedia.org/wiki/Doubly_linked_list) implementation
2+
// used for queues. This implementation assumes that the node provided by the user can be modified
3+
// to adjust the next and last properties. We implement only the minimal functionality
4+
// for queue support.
5+
export default function DLL() {
6+
this.head = this.tail = null;
7+
this.length = 0;
8+
}
9+
10+
function setInitial(dll, node) {
11+
dll.length = 1;
12+
dll.head = dll.tail = node;
13+
}
14+
15+
DLL.prototype.removeLink = function(node) {
16+
if (node.prev) node.prev.next = node.next;
17+
else this.head = node.next
18+
if (node.next) node.next.prev = node.prev;
19+
else this.tail = node.prev;
20+
21+
node.prev = node.next = null;
22+
this.length -= 1;
23+
return node;
24+
}
25+
26+
DLL.prototype.empty = DLL;
27+
28+
DLL.prototype.insertAfter = function(node, newNode) {
29+
newNode.prev = node;
30+
newNode.next = node.next;
31+
if (node.next) node.next.prev = newNode;
32+
else this.tail = newNode;
33+
node.next = newNode;
34+
this.length += 1;
35+
}
36+
37+
DLL.prototype.insertBefore = function(node, newNode) {
38+
newNode.prev = node.prev;
39+
newNode.next = node;
40+
if (node.prev) node.prev.next = newNode;
41+
else this.head = newNode;
42+
node.prev = newNode;
43+
this.length += 1;
44+
}
45+
46+
DLL.prototype.unshift = function(node) {
47+
if (this.head) this.insertBefore(this.head, node);
48+
else setInitial(this, node);
49+
};
50+
51+
DLL.prototype.push = function(node) {
52+
if (this.tail) this.insertAfter(this.tail, node);
53+
else setInitial(this, node);
54+
};
55+
56+
DLL.prototype.shift = function() {
57+
return this.head && this.removeLink(this.head);
58+
};
59+
60+
DLL.prototype.pop = function() {
61+
return this.tail && this.removeLink(this.tail);
62+
};

lib/internal/queue.js

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
import arrayEach from 'lodash/_arrayEach';
2-
import arrayMap from 'lodash/_arrayMap';
32
import isArray from 'lodash/isArray';
43
import noop from 'lodash/noop';
5-
import property from 'lodash/_baseProperty';
64

75
import onlyOnce from './onlyOnce';
86
import setImmediate from './setImmediate';
7+
import DLL from './DoublyLinkedList';
98

109
export default function queue(worker, concurrency, payload) {
1110
if (concurrency == null) {
@@ -14,7 +13,8 @@ export default function queue(worker, concurrency, payload) {
1413
else if(concurrency === 0) {
1514
throw new Error('Concurrency must not be zero');
1615
}
17-
function _insert(q, data, pos, callback) {
16+
17+
function _insert(data, pos, callback) {
1818
if (callback != null && typeof callback !== 'function') {
1919
throw new Error('task callback must be a function');
2020
}
@@ -35,19 +35,19 @@ export default function queue(worker, concurrency, payload) {
3535
};
3636

3737
if (pos) {
38-
q.tasks.unshift(item);
38+
q._tasks.unshift(item);
3939
} else {
40-
q.tasks.push(item);
40+
q._tasks.push(item);
4141
}
4242

4343
});
4444
setImmediate(q.process);
4545
}
46-
function _next(q, tasks) {
46+
47+
function _next(tasks) {
4748
return function(){
4849
workers -= 1;
4950

50-
5151
var removed = false;
5252
var args = arguments;
5353
arrayEach(tasks, function (task) {
@@ -69,7 +69,7 @@ export default function queue(worker, concurrency, payload) {
6969
q.unsaturated();
7070
}
7171

72-
if (q.tasks.length + workers === 0) {
72+
if (q._tasks.length + workers === 0) {
7373
q.drain();
7474
}
7575
q.process();
@@ -79,7 +79,7 @@ export default function queue(worker, concurrency, payload) {
7979
var workers = 0;
8080
var workersList = [];
8181
var q = {
82-
tasks: [],
82+
_tasks: new DLL(),
8383
concurrency: concurrency,
8484
payload: payload,
8585
saturated: noop,
@@ -91,25 +91,27 @@ export default function queue(worker, concurrency, payload) {
9191
started: false,
9292
paused: false,
9393
push: function (data, callback) {
94-
_insert(q, data, false, callback);
94+
_insert(data, false, callback);
9595
},
9696
kill: function () {
9797
q.drain = noop;
98-
q.tasks = [];
98+
q._tasks.empty();
9999
},
100100
unshift: function (data, callback) {
101-
_insert(q, data, true, callback);
101+
_insert(data, true, callback);
102102
},
103103
process: function () {
104-
while(!q.paused && workers < q.concurrency && q.tasks.length){
105-
106-
var tasks = q.payload ?
107-
q.tasks.splice(0, q.payload) :
108-
q.tasks.splice(0, q.tasks.length);
109-
110-
var data = arrayMap(tasks, property('data'));
104+
while(!q.paused && workers < q.concurrency && q._tasks.length){
105+
var tasks = [], data = [];
106+
var l = q._tasks.length;
107+
if (q.payload) l = Math.min(l, q.payload);
108+
for (var i = 0; i < l; i++) {
109+
var node = q._tasks.shift();
110+
tasks.push(node);
111+
data.push(node.data);
112+
}
111113

112-
if (q.tasks.length === 0) {
114+
if (q._tasks.length === 0) {
113115
q.empty();
114116
}
115117
workers += 1;
@@ -119,14 +121,12 @@ export default function queue(worker, concurrency, payload) {
119121
q.saturated();
120122
}
121123

122-
var cb = onlyOnce(_next(q, tasks));
124+
var cb = onlyOnce(_next(tasks));
123125
worker(data, cb);
124-
125-
126126
}
127127
},
128128
length: function () {
129-
return q.tasks.length;
129+
return q._tasks.length;
130130
},
131131
running: function () {
132132
return workers;
@@ -135,15 +135,15 @@ export default function queue(worker, concurrency, payload) {
135135
return workersList;
136136
},
137137
idle: function() {
138-
return q.tasks.length + workers === 0;
138+
return q._tasks.length + workers === 0;
139139
},
140140
pause: function () {
141141
q.paused = true;
142142
},
143143
resume: function () {
144144
if (q.paused === false) { return; }
145145
q.paused = false;
146-
var resumeCount = Math.min(q.concurrency, q.tasks.length);
146+
var resumeCount = Math.min(q.concurrency, q._tasks.length);
147147
// Need to call q.process once per concurrent
148148
// worker to preserve full concurrency after pause
149149
for (var w = 1; w <= resumeCount; w++) {

lib/priorityQueue.js

Lines changed: 16 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -31,25 +31,11 @@ import queue from './queue';
3131
* * The `unshift` method was removed.
3232
*/
3333
export default function(worker, concurrency) {
34-
function _compareTasks(a, b) {
35-
return a.priority - b.priority;
36-
}
37-
38-
function _binarySearch(sequence, item, compare) {
39-
var beg = -1,
40-
end = sequence.length - 1;
41-
while (beg < end) {
42-
var mid = beg + ((end - beg + 1) >>> 1);
43-
if (compare(item, sequence[mid]) >= 0) {
44-
beg = mid;
45-
} else {
46-
end = mid - 1;
47-
}
48-
}
49-
return beg;
50-
}
34+
// Start with a normal queue
35+
var q = queue(worker, concurrency);
5136

52-
function _insert(q, data, priority, callback) {
37+
// Override push to accept second parameter representing priority
38+
q.push = function(data, priority, callback) {
5339
if (callback != null && typeof callback !== 'function') {
5440
throw new Error('task callback must be a function');
5541
}
@@ -63,25 +49,26 @@ export default function(worker, concurrency) {
6349
q.drain();
6450
});
6551
}
52+
53+
var nextNode = q._tasks.head;
54+
while (nextNode && priority >= nextNode.priority) {
55+
nextNode = nextNode.next;
56+
}
57+
6658
arrayEach(data, function(task) {
6759
var item = {
6860
data: task,
6961
priority: priority,
7062
callback: typeof callback === 'function' ? callback : noop
7163
};
7264

73-
q.tasks.splice(_binarySearch(q.tasks, item, _compareTasks) + 1, 0, item);
74-
75-
setImmediate(q.process);
65+
if (nextNode) {
66+
q._tasks.insertBefore(nextNode, item);
67+
} else {
68+
q._tasks.push(item);
69+
}
7670
});
77-
}
78-
79-
// Start with a normal queue
80-
var q = queue(worker, concurrency);
81-
82-
// Override push to accept second parameter representing priority
83-
q.push = function(data, priority, callback) {
84-
_insert(q, data, priority, callback);
71+
setImmediate(q.process);
8572
};
8673

8774
// Remove unshift function

mocha_test/queue.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -496,7 +496,7 @@ describe('queue', function(){
496496
}, 5);
497497

498498
setTimeout(function () {
499-
expect(q.tasks.length).to.equal(1);
499+
expect(q._tasks.length).to.equal(1);
500500
expect(q.running()).to.equal(2);
501501
q.resume();
502502
}, 15);

0 commit comments

Comments
 (0)