Skip to content

Commit 7fea658

Browse files
committed
Fix interval cap race condition with high concurrency
Fixes #126
1 parent 7b3f53e commit 7fea658

File tree

2 files changed

+54
-10
lines changed

2 files changed

+54
-10
lines changed

source/index.ts

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,11 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
185185
if (this.#doesIntervalAllowAnother && this.#doesConcurrentAllowAnother) {
186186
const job = this.#queue.dequeue()!;
187187

188+
// Increment interval count immediately to prevent race conditions
189+
if (!this.#isIntervalIgnored) {
190+
this.#intervalCount++;
191+
}
192+
188193
this.emit('active');
189194
this.#lastExecutionTime = Date.now();
190195
job();
@@ -314,8 +319,18 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
314319
this.#pending++;
315320

316321
try {
317-
options.signal?.throwIfAborted();
318-
this.#intervalCount++;
322+
// Check abort signal - if aborted, need to decrement the counter
323+
// that was incremented in tryToStartAnother
324+
try {
325+
options.signal?.throwIfAborted();
326+
} catch (error) {
327+
// Decrement the counter that was already incremented
328+
if (!this.#isIntervalIgnored) {
329+
this.#intervalCount--;
330+
}
331+
332+
throw error;
333+
}
319334

320335
let operation = function_({signal: options.signal});
321336

test/test.ts

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1354,10 +1354,10 @@ test('interval should be maintained when using await between adds (issue #182)',
13541354
});
13551355

13561356
// Check intervals between tasks
1357-
for (let i = 1; i < timestamps.length; i++) {
1358-
const interval = timestamps[i] - timestamps[i - 1];
1357+
for (let index = 1; index < timestamps.length; index++) {
1358+
const interval = timestamps[index] - timestamps[index - 1];
13591359
// Allow 10ms tolerance for timing
1360-
t.true(interval >= 90, `Interval between task ${i} and ${i + 1} was ${interval}ms, expected >= 90ms`);
1360+
t.true(interval >= 90, `Interval between task ${index} and ${index + 1} was ${interval}ms, expected >= 90ms`);
13611361
}
13621362
});
13631363

@@ -1393,9 +1393,9 @@ test('interval maintained when queue becomes empty multiple times', async t => {
13931393
});
13941394

13951395
// Check all intervals
1396-
for (let i = 1; i < timestamps.length; i++) {
1397-
const interval = timestamps[i] - timestamps[i - 1];
1398-
t.true(interval >= 90, `Interval between task ${i} and ${i + 1} was ${interval}ms, expected >= 90ms`);
1396+
for (let index = 1; index < timestamps.length; index++) {
1397+
const interval = timestamps[index] - timestamps[index - 1];
1398+
t.true(interval >= 90, `Interval between task ${index} and ${index + 1} was ${interval}ms, expected >= 90ms`);
13991399
}
14001400
});
14011401

@@ -1614,8 +1614,8 @@ test('process exits cleanly after interval tasks complete', async t => {
16141614

16151615
// Execute tasks that complete quickly with long interval
16161616
const tasks = [];
1617-
for (let i = 0; i < 4; i++) {
1618-
tasks.push(queue.add(() => `result-${i}`));
1617+
for (let index = 0; index < 4; index++) {
1618+
tasks.push(queue.add(() => `result-${index}`));
16191619
}
16201620

16211621
await Promise.all(tasks);
@@ -1625,3 +1625,32 @@ test('process exits cleanly after interval tasks complete', async t => {
16251625
// This ensures both intervalId and timeoutId are cleared when idle
16261626
t.pass();
16271627
});
1628+
1629+
test('intervalCap should be respected with high concurrency (issue #126)', async t => {
1630+
const queue = new PQueue({
1631+
concurrency: 5000,
1632+
intervalCap: 1000,
1633+
interval: 1000,
1634+
carryoverConcurrencyCount: true,
1635+
});
1636+
1637+
const results: number[] = [];
1638+
const startTime = Date.now();
1639+
1640+
// Add 5000 tasks that complete immediately
1641+
const promises = [];
1642+
for (let index = 0; index < 5000; index++) {
1643+
promises.push(queue.add(async () => {
1644+
results.push(Date.now() - startTime);
1645+
}));
1646+
}
1647+
1648+
await Promise.all(promises);
1649+
1650+
// Check that no more than intervalCap tasks started in the first interval
1651+
const firstInterval = results.filter(timestamp => timestamp < 1000);
1652+
t.true(firstInterval.length <= 1000, `Expected ≤1000 tasks in first interval, got ${firstInterval.length}`);
1653+
1654+
// Check that tasks actually completed (basic sanity check)
1655+
t.is(results.length, 5000, 'All tasks should complete');
1656+
});

0 commit comments

Comments
 (0)