Skip to content

Commit edd2e8a

Browse files
authored
Add add and next events (#111)
1 parent 1e9dcf8 commit edd2e8a

File tree

3 files changed

+123
-1
lines changed

3 files changed

+123
-1
lines changed

readme.md

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,40 @@ await queue.add(() => delay(600));
245245

246246
The `idle` event is emitted every time the queue reaches an idle state. On the other hand, the promise the `onIdle()` function returns resolves once the queue becomes idle instead of every time the queue is idle.
247247

248+
#### add
249+
250+
Emitted every time the add method is called and the number of pending or queued tasks is increased.
251+
252+
#### next
253+
254+
Emitted every time a task is completed and the number of pending or queued tasks is decreased.
255+
256+
```js
257+
const delay = require('delay');
258+
const {default: PQueue} = require('p-queue');
259+
260+
const queue = new PQueue();
261+
262+
queue.on('add', () => {
263+
console.log(`Task is added. Size: ${queue.size} Pending: ${queue.pending}`);
264+
});
265+
queue.on('next', () => {
266+
console.log(`Task is completed. Size: ${queue.size} Pending: ${queue.pending}`);
267+
});
268+
269+
const job1 = queue.add(() => delay(2000));
270+
const job2 = queue.add(() => delay(500));
271+
272+
await job1;
273+
await job2;
274+
// => 'Task is added. Size: 0 Pending: 1'
275+
// => 'Task is added. Size: 0 Pending: 2'
276+
277+
await queue.add(() => delay(600));
278+
// => 'Task is completed. Size: 0 Pending: 1'
279+
// => 'Task is completed. Size: 0 Pending: 0'
280+
```
281+
248282
## Advanced example
249283

250284
A more advanced example to help you understand the flow.

source/index.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ const timeoutError = new TimeoutError();
1818
/**
1919
Promise queue with concurrency control.
2020
*/
21-
export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsType> = PriorityQueue, EnqueueOptionsType extends QueueAddOptions = DefaultAddOptions> extends EventEmitter<'active' | 'idle'> {
21+
export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsType> = PriorityQueue, EnqueueOptionsType extends QueueAddOptions = DefaultAddOptions> extends EventEmitter<'active' | 'idle' | 'add' | 'next'> {
2222
private readonly _carryoverConcurrencyCount: boolean;
2323

2424
private readonly _isIntervalIgnored: boolean;
@@ -99,6 +99,7 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
9999
private _next(): void {
100100
this._pendingCount--;
101101
this._tryToStartAnother();
102+
this.emit('next');
102103
}
103104

104105
private _resolvePromises(): void {
@@ -255,6 +256,7 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
255256

256257
this._queue.enqueue(run, options);
257258
this._tryToStartAnother();
259+
this.emit('add');
258260
});
259261
}
260262

test/test.ts

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -853,6 +853,92 @@ test('should emit idle event when idle', async t => {
853853
t.is(queue.size, 0);
854854
t.is(timesCalled, 2);
855855
});
856+
test('should emit add event when adding task', async t => {
857+
const queue = new PQueue({concurrency: 1});
858+
859+
let timesCalled = 0;
860+
queue.on('add', () => {
861+
timesCalled++;
862+
});
863+
864+
const job1 = queue.add(async () => delay(100));
865+
866+
t.is(queue.pending, 1);
867+
t.is(queue.size, 0);
868+
t.is(timesCalled, 1);
869+
870+
const job2 = queue.add(async () => delay(100));
871+
872+
t.is(queue.pending, 1);
873+
t.is(queue.size, 1);
874+
t.is(timesCalled, 2);
875+
876+
await job1;
877+
878+
t.is(queue.pending, 1);
879+
t.is(queue.size, 0);
880+
t.is(timesCalled, 2);
881+
882+
await job2;
883+
884+
t.is(queue.pending, 0);
885+
t.is(queue.size, 0);
886+
t.is(timesCalled, 2);
887+
888+
const job3 = queue.add(async () => delay(100));
889+
890+
t.is(queue.pending, 1);
891+
t.is(queue.size, 0);
892+
t.is(timesCalled, 3);
893+
894+
await job3;
895+
t.is(queue.pending, 0);
896+
t.is(queue.size, 0);
897+
t.is(timesCalled, 3);
898+
});
899+
test('should emit next event when completing task', async t => {
900+
const queue = new PQueue({concurrency: 1});
901+
902+
let timesCalled = 0;
903+
queue.on('next', () => {
904+
timesCalled++;
905+
});
906+
907+
const job1 = queue.add(async () => delay(100));
908+
909+
t.is(queue.pending, 1);
910+
t.is(queue.size, 0);
911+
t.is(timesCalled, 0);
912+
913+
const job2 = queue.add(async () => delay(100));
914+
915+
t.is(queue.pending, 1);
916+
t.is(queue.size, 1);
917+
t.is(timesCalled, 0);
918+
919+
await job1;
920+
921+
t.is(queue.pending, 1);
922+
t.is(queue.size, 0);
923+
t.is(timesCalled, 1);
924+
925+
await job2;
926+
927+
t.is(queue.pending, 0);
928+
t.is(queue.size, 0);
929+
t.is(timesCalled, 2);
930+
931+
const job3 = queue.add(async () => delay(100));
932+
933+
t.is(queue.pending, 1);
934+
t.is(queue.size, 0);
935+
t.is(timesCalled, 2);
936+
937+
await job3;
938+
t.is(queue.pending, 0);
939+
t.is(queue.size, 0);
940+
t.is(timesCalled, 3);
941+
});
856942

857943
test('should verify timeout overrides passed to add', async t => {
858944
const queue = new PQueue({timeout: 200, throwOnTimeout: true});

0 commit comments

Comments
 (0)