Skip to content

Commit 7b3f53e

Browse files
committed
Fix interval timing when queue becomes empty between task additions
Fixes #163
1 parent 439d512 commit 7b3f53e

File tree

3 files changed

+218
-20
lines changed

3 files changed

+218
-20
lines changed

readme.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ npm install p-queue
4646

4747
## Usage
4848

49-
Here we run only one promise at the time. For example, set `concurrency` to 4 to run four promises at the same time.
49+
Here we run only one promise at a time. For example, set `concurrency` to 4 to run four promises at the same time.
5050

5151
```js
5252
import PQueue from 'p-queue';

source/index.ts

Lines changed: 45 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
2626

2727
#intervalEnd = 0;
2828

29+
#lastExecutionTime = 0;
30+
2931
#intervalId?: NodeJS.Timeout;
3032

3133
#timeoutId?: NodeJS.Timeout;
@@ -114,40 +116,64 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
114116
if (this.#intervalId === undefined) {
115117
const delay = this.#intervalEnd - now;
116118
if (delay < 0) {
117-
// Act as the interval was done
118-
// We don't need to resume it here because it will be resumed on line 160
119+
// If the interval has expired while idle, check if we should enforce the interval
120+
// from the last task execution. This ensures proper spacing between tasks even
121+
// when the queue becomes empty and then new tasks are added.
122+
if (this.#lastExecutionTime > 0) {
123+
const timeSinceLastExecution = now - this.#lastExecutionTime;
124+
if (timeSinceLastExecution < this.#interval) {
125+
// Not enough time has passed since the last task execution
126+
this.#createIntervalTimeout(this.#interval - timeSinceLastExecution);
127+
return true;
128+
}
129+
}
130+
131+
// Enough time has passed or no previous execution, allow execution
119132
this.#intervalCount = (this.#carryoverConcurrencyCount) ? this.#pending : 0;
120133
} else {
121134
// Act as the interval is pending
122-
if (this.#timeoutId === undefined) {
123-
this.#timeoutId = setTimeout(
124-
() => {
125-
this.#onResumeInterval();
126-
},
127-
delay,
128-
);
129-
}
130-
135+
this.#createIntervalTimeout(delay);
131136
return true;
132137
}
133138
}
134139

135140
return false;
136141
}
137142

143+
#createIntervalTimeout(delay: number): void {
144+
if (this.#timeoutId !== undefined) {
145+
return;
146+
}
147+
148+
this.#timeoutId = setTimeout(() => {
149+
this.#onResumeInterval();
150+
}, delay);
151+
}
152+
153+
#clearIntervalTimer(): void {
154+
if (this.#intervalId) {
155+
clearInterval(this.#intervalId);
156+
this.#intervalId = undefined;
157+
}
158+
}
159+
160+
#clearTimeoutTimer(): void {
161+
if (this.#timeoutId) {
162+
clearTimeout(this.#timeoutId);
163+
this.#timeoutId = undefined;
164+
}
165+
}
166+
138167
#tryToStartAnother(): boolean {
139168
if (this.#queue.size === 0) {
140169
// We can clear the interval ("pause")
141170
// Because we can redo it later ("resume")
142-
if (this.#intervalId) {
143-
clearInterval(this.#intervalId);
144-
}
145-
146-
this.#intervalId = undefined;
147-
171+
this.#clearIntervalTimer();
148172
this.emit('empty');
149173

150174
if (this.#pending === 0) {
175+
// Clear timeout as well when completely idle
176+
this.#clearTimeoutTimer();
151177
this.emit('idle');
152178
}
153179

@@ -160,6 +186,7 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
160186
const job = this.#queue.dequeue()!;
161187

162188
this.emit('active');
189+
this.#lastExecutionTime = Date.now();
163190
job();
164191

165192
if (canInitializeInterval) {
@@ -190,8 +217,7 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
190217

191218
#onInterval(): void {
192219
if (this.#intervalCount === 0 && this.#pending === 0 && this.#intervalId) {
193-
clearInterval(this.#intervalId);
194-
this.#intervalId = undefined;
220+
this.#clearIntervalTimer();
195221
}
196222

197223
this.#intervalCount = this.#carryoverConcurrencyCount ? this.#pending : 0;

test/test.ts

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1313,6 +1313,157 @@ test('.setPriority() - execute a promise before planned', async t => {
13131313
t.deepEqual(result, ['🐌', '🐢', '🦆']);
13141314
});
13151315

1316+
test('interval should be maintained when using await between adds (issue #182)', async t => {
1317+
const queue = new PQueue({
1318+
intervalCap: 1,
1319+
interval: 100,
1320+
});
1321+
1322+
const timestamps: number[] = [];
1323+
1324+
// Add first 3 tasks without await
1325+
queue.add(() => {
1326+
timestamps.push(Date.now());
1327+
return 'task1';
1328+
});
1329+
queue.add(() => {
1330+
timestamps.push(Date.now());
1331+
return 'task2';
1332+
});
1333+
queue.add(() => {
1334+
timestamps.push(Date.now());
1335+
return 'task3';
1336+
});
1337+
1338+
// Add task 4 with await
1339+
await queue.add(() => {
1340+
timestamps.push(Date.now());
1341+
return 'task4';
1342+
});
1343+
1344+
// Add task 5 with await - this should still respect interval
1345+
await queue.add(() => {
1346+
timestamps.push(Date.now());
1347+
return 'task5';
1348+
});
1349+
1350+
// Add task 6 with await
1351+
await queue.add(() => {
1352+
timestamps.push(Date.now());
1353+
return 'task6';
1354+
});
1355+
1356+
// Check intervals between tasks
1357+
for (let i = 1; i < timestamps.length; i++) {
1358+
const interval = timestamps[i] - timestamps[i - 1];
1359+
// Allow 10ms tolerance for timing
1360+
t.true(interval >= 90, `Interval between task ${i} and ${i + 1} was ${interval}ms, expected >= 90ms`);
1361+
}
1362+
});
1363+
1364+
test('interval maintained when queue becomes empty multiple times', async t => {
1365+
const queue = new PQueue({
1366+
intervalCap: 1,
1367+
interval: 100,
1368+
});
1369+
1370+
const timestamps: number[] = [];
1371+
1372+
// First batch
1373+
await queue.add(() => {
1374+
timestamps.push(Date.now());
1375+
return 'task1';
1376+
});
1377+
await queue.add(() => {
1378+
timestamps.push(Date.now());
1379+
return 'task2';
1380+
});
1381+
1382+
// Queue is empty, wait a bit
1383+
await delay(50);
1384+
1385+
// Second batch - should still respect interval from task 2
1386+
await queue.add(() => {
1387+
timestamps.push(Date.now());
1388+
return 'task3';
1389+
});
1390+
await queue.add(() => {
1391+
timestamps.push(Date.now());
1392+
return 'task4';
1393+
});
1394+
1395+
// Check all intervals
1396+
for (let i = 1; i < timestamps.length; i++) {
1397+
const interval = timestamps[i] - timestamps[i - 1];
1398+
t.true(interval >= 90, `Interval between task ${i} and ${i + 1} was ${interval}ms, expected >= 90ms`);
1399+
}
1400+
});
1401+
1402+
test('interval reset after long idle period', async t => {
1403+
const queue = new PQueue({
1404+
intervalCap: 1,
1405+
interval: 100,
1406+
});
1407+
1408+
const timestamps: number[] = [];
1409+
1410+
// Run first task
1411+
await queue.add(() => {
1412+
timestamps.push(Date.now());
1413+
return 'task1';
1414+
});
1415+
1416+
// Wait much longer than interval
1417+
await delay(250);
1418+
1419+
// This task should run immediately since enough time has passed
1420+
await queue.add(() => {
1421+
timestamps.push(Date.now());
1422+
return 'task2';
1423+
});
1424+
1425+
// But this one should wait for interval
1426+
await queue.add(() => {
1427+
timestamps.push(Date.now());
1428+
return 'task3';
1429+
});
1430+
1431+
const interval1to2 = timestamps[1] - timestamps[0];
1432+
const interval2to3 = timestamps[2] - timestamps[1];
1433+
1434+
t.true(interval1to2 >= 240, `Task 2 ran after ${interval1to2}ms, expected >= 240ms`);
1435+
t.true(interval2to3 >= 90, `Task 3 should respect interval: ${interval2to3}ms`);
1436+
});
1437+
1438+
test('interval with carryoverConcurrencyCount after queue empty', async t => {
1439+
const queue = new PQueue({
1440+
intervalCap: 1,
1441+
interval: 100,
1442+
carryoverConcurrencyCount: true,
1443+
});
1444+
1445+
const timestamps: number[] = [];
1446+
1447+
// Run first task
1448+
await queue.add(() => {
1449+
timestamps.push(Date.now());
1450+
return 'task1';
1451+
});
1452+
1453+
// Queue becomes empty
1454+
t.is(queue.size, 0);
1455+
t.is(queue.pending, 0);
1456+
1457+
// Add new task - should respect interval
1458+
await queue.add(() => {
1459+
timestamps.push(Date.now());
1460+
return 'task2';
1461+
});
1462+
1463+
const interval = timestamps[1] - timestamps[0];
1464+
t.true(interval >= 90, `Interval was ${interval}ms, expected >= 90ms`);
1465+
});
1466+
13161467
test('.setPriority() - execute a promise after planned', async t => {
13171468
const result: string[] = [];
13181469
const queue = new PQueue({concurrency: 1});
@@ -1453,3 +1604,24 @@ test('.setPriority() - execute a promise before planned - concurrency 3 and unsp
14531604
await queue.onIdle();
14541605
t.deepEqual(result, ['🐌', '🦆', '🐢', '🦀', '⚡️']);
14551606
});
1607+
1608+
test('process exits cleanly after interval tasks complete', async t => {
1609+
const queue = new PQueue({
1610+
concurrency: 100,
1611+
intervalCap: 500,
1612+
interval: 60 * 1000,
1613+
});
1614+
1615+
// Execute tasks that complete quickly with long interval
1616+
const tasks = [];
1617+
for (let i = 0; i < 4; i++) {
1618+
tasks.push(queue.add(() => `result-${i}`));
1619+
}
1620+
1621+
await Promise.all(tasks);
1622+
await queue.onIdle();
1623+
1624+
// Test that no timers are hanging by checking process can exit naturally
1625+
// This ensures both intervalId and timeoutId are cleared when idle
1626+
t.pass();
1627+
});

0 commit comments

Comments
 (0)