Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
'use strict';

const EventEmitter = require('events');

// Port of lower_bound from http://en.cppreference.com/w/cpp/algorithm/lower_bound
// Used to compute insertion index to keep queue sorted after insertion
function lowerBound(array, value, comp) {
Expand Down Expand Up @@ -51,8 +53,10 @@ class PriorityQueue {
}
}

class PQueue {
class PQueue extends EventEmitter {
constructor(options) {
super();

options = Object.assign({
carryoverConcurrencyCount: false,
intervalCap: Infinity,
Expand Down Expand Up @@ -102,6 +106,7 @@ class PQueue {

_next() {
this._pendingCount--;
this.emit('next');
this._tryToStartAnother();
}

Expand Down
5 changes: 5 additions & 0 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,11 @@ Number of pending promises.

Whether the queue is currently paused.

## Events

#### next
Emitted as each item is processed in the queue for the purpose of tracking progress.

## Advanced example

A more advanced example to help you understand the flow.
Expand Down
20 changes: 20 additions & 0 deletions test.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import EventEmitter from 'events';
import test from 'ava';
import delay from 'delay';
import inRange from 'in-range';
Expand Down Expand Up @@ -402,3 +403,22 @@ test('pause should work when throttled', async t => {
delay(2200).then(() => t.deepEqual(result, secondV));
await delay(2500);
});

test('should be an event emitter', t => {
const queue = new PQueue();
t.true(queue instanceof EventEmitter);
});

test('should emit next event per item', async t => {
const items = [0, 1, 2, 3, 4];
let eventCount = 0;
const queue = new PQueue();

queue.on('next', () => eventCount++);

items.forEach(item => queue.add(() => item));

await queue.onIdle();

t.is(eventCount, items.length);
});