Skip to content

Commit fad8ee4

Browse files
committed
Add .runningTasks and .isSaturated properties
Fixes #110
1 parent 84edc56 commit fad8ee4

File tree

6 files changed

+934
-427
lines changed

6 files changed

+934
-427
lines changed

readme.md

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,68 @@ Whether the queue is currently paused.
367367

368368
Whether the queue is currently rate-limited due to `intervalCap`. Returns `true` when the number of tasks executed in the current interval has reached the `intervalCap` and there are still tasks waiting to be processed.
369369

370+
#### .isSaturated
371+
372+
Whether the queue is saturated. Returns `true` when:
373+
- All concurrency slots are occupied and tasks are waiting, OR
374+
- The queue is rate-limited and tasks are waiting
375+
376+
Useful for detecting backpressure and potential hanging tasks.
377+
378+
```js
379+
import PQueue from 'p-queue';
380+
381+
const queue = new PQueue({concurrency: 2});
382+
383+
// Backpressure handling
384+
if (queue.isSaturated) {
385+
console.log('Queue is saturated, waiting for capacity...');
386+
await queue.onSizeLessThan(queue.concurrency);
387+
}
388+
389+
// Monitoring for stuck tasks
390+
setInterval(() => {
391+
if (queue.isSaturated) {
392+
console.warn(`Queue saturated: ${queue.pending} running, ${queue.size} waiting`);
393+
}
394+
}, 60000);
395+
```
396+
397+
#### .runningTasks
398+
399+
The tasks currently being executed. Each task includes its `id`, `priority`, `startTime`, and `timeout` (if set).
400+
401+
Returns an array of task info objects.
402+
403+
```js
404+
import PQueue from 'p-queue';
405+
406+
const queue = new PQueue({concurrency: 2});
407+
408+
// Add tasks with IDs for better debugging
409+
queue.add(() => fetchUser(123), {id: 'user-123'});
410+
queue.add(() => fetchPosts(456), {id: 'posts-456', priority: 1});
411+
412+
// Check what's running
413+
console.log(queue.runningTasks);
414+
/*
415+
[
416+
{
417+
id: 'user-123',
418+
priority: 0,
419+
startTime: 1759253001716,
420+
timeout: undefined
421+
},
422+
{
423+
id: 'posts-456',
424+
priority: 1,
425+
startTime: 1759253001916,
426+
timeout: undefined
427+
}
428+
]
429+
*/
430+
```
431+
370432
## Events
371433

372434
#### active
@@ -739,6 +801,77 @@ for await (const result of pMapIterable(
739801
}
740802
```
741803

804+
#### How do I debug a queue that stops processing tasks?
805+
806+
If your queue stops processing tasks after extended use, it's likely that some tasks are hanging indefinitely, exhausting the concurrency limit. Use the `.runningTasks` property to identify which specific tasks are stuck.
807+
808+
Common causes:
809+
- Network requests without timeouts
810+
- Database queries that hang
811+
- File operations on unresponsive network drives
812+
- Unhandled promise rejections
813+
814+
Debugging steps:
815+
816+
```js
817+
// 1. Add timeouts to prevent hanging
818+
const queue = new PQueue({
819+
concurrency: 2,
820+
timeout: 30000 // 30 seconds
821+
});
822+
823+
// 2. Always add IDs to tasks for debugging
824+
queue.add(() => processItem(item), {id: `item-${item.id}`});
825+
826+
// 3. Monitor for stuck tasks using runningTasks
827+
setInterval(() => {
828+
const now = Date.now();
829+
const stuckTasks = queue.runningTasks.filter(task =>
830+
now - task.startTime > 30000 // Running for over 30 seconds
831+
);
832+
833+
if (stuckTasks.length > 0) {
834+
console.error('Stuck tasks:', stuckTasks);
835+
// Consider aborting or logging more details
836+
}
837+
838+
// Detect saturation (potential hanging if persistent)
839+
if (queue.isSaturated) {
840+
console.warn(`Queue saturated: ${queue.pending} running, ${queue.size} waiting`);
841+
}
842+
}, 60000);
843+
844+
// 4. Track task lifecycle
845+
queue.on('completed', result => {
846+
console.log('Task completed');
847+
});
848+
queue.on('error', error => {
849+
console.error('Task failed:', error);
850+
});
851+
852+
// 5. Wrap tasks with debugging
853+
const debugTask = async (fn, name) => {
854+
const start = Date.now();
855+
console.log(`Starting: ${name}`);
856+
try {
857+
const result = await fn();
858+
console.log(`Completed: ${name} (${Date.now() - start}ms)`);
859+
return result;
860+
} catch (error) {
861+
console.error(`Failed: ${name} (${Date.now() - start}ms)`, error);
862+
throw error;
863+
}
864+
};
865+
866+
queue.add(() => debugTask(() => fetchData(), 'fetchData'), {id: 'fetchData'});
867+
```
868+
869+
Prevention:
870+
- Always use timeouts for I/O operations
871+
- Ensure all async functions properly resolve or reject
872+
- Use the `timeout` option to enforce task time limits
873+
- Monitor `queue.size` and `queue.pending` in production
874+
742875
#### How do I test code that uses `p-queue` with Jest fake timers?
743876

744877
Jest fake timers don't work well with `p-queue` because it uses `queueMicrotask` internally.

source/index.ts

Lines changed: 101 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,14 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
4949
// Use to assign a unique identifier to a promise function, if not explicitly specified
5050
#idAssigner = 1n;
5151

52+
// Track currently running tasks for debugging
53+
readonly #runningTasks = new Map<symbol, {
54+
id?: string;
55+
priority: number;
56+
startTime: number;
57+
timeout?: number;
58+
}>();
59+
5260
/**
5361
Per-operation timeout in milliseconds. Operations will throw a `TimeoutError` if they don't complete within the specified time.
5462
@@ -335,9 +343,20 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
335343
};
336344

337345
return new Promise((resolve, reject) => {
346+
// Create a unique symbol for tracking this task
347+
const taskSymbol = Symbol(`task-${options.id}`);
348+
338349
this.#queue.enqueue(async () => {
339350
this.#pending++;
340351

352+
// Track this running task
353+
this.#runningTasks.set(taskSymbol, {
354+
id: options.id,
355+
priority: options.priority ?? 0, // Match priority-queue default
356+
startTime: Date.now(),
357+
timeout: options.timeout,
358+
});
359+
341360
try {
342361
// Check abort signal - if aborted, need to decrement the counter
343362
// that was incremented in tryToStartAnother
@@ -349,13 +368,19 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
349368
this.#intervalCount--;
350369
}
351370

371+
// Clean up tracking before throwing
372+
this.#runningTasks.delete(taskSymbol);
373+
352374
throw error;
353375
}
354376

355377
let operation = function_({signal: options.signal});
356378

357379
if (options.timeout) {
358-
operation = pTimeout(Promise.resolve(operation), {milliseconds: options.timeout});
380+
operation = pTimeout(Promise.resolve(operation), {
381+
milliseconds: options.timeout,
382+
message: `Task timed out after ${options.timeout}ms (queue has ${this.#pending} running, ${this.#queue.size} waiting)`,
383+
});
359384
}
360385

361386
if (options.signal) {
@@ -369,6 +394,9 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
369394
reject(error);
370395
this.emit('error', error);
371396
} finally {
397+
// Remove from running tasks
398+
this.#runningTasks.delete(taskSymbol);
399+
372400
// Use queueMicrotask to prevent deep recursion while maintaining timing
373401
queueMicrotask(() => {
374402
this.#next();
@@ -424,6 +452,8 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
424452
*/
425453
clear(): void {
426454
this.#queue = new this.#queueClass();
455+
// Note: We don't clear #runningTasks as those tasks are still running
456+
// They will be removed when they complete in the finally block
427457
// Force synchronous update since clear() should have immediate effect
428458
this.#updateRateLimitState();
429459
}
@@ -603,6 +633,76 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
603633
get isRateLimited(): boolean {
604634
return this.#rateLimitedInInterval;
605635
}
636+
637+
/**
638+
Whether the queue is saturated. Returns `true` when:
639+
- All concurrency slots are occupied and tasks are waiting, OR
640+
- The queue is rate-limited and tasks are waiting
641+
642+
Useful for detecting backpressure and potential hanging tasks.
643+
644+
```js
645+
import PQueue from 'p-queue';
646+
647+
const queue = new PQueue({concurrency: 2});
648+
649+
// Backpressure handling
650+
if (queue.isSaturated) {
651+
console.log('Queue is saturated, waiting for capacity...');
652+
await queue.onSizeLessThan(queue.concurrency);
653+
}
654+
655+
// Monitoring for stuck tasks
656+
setInterval(() => {
657+
if (queue.isSaturated) {
658+
console.warn(`Queue saturated: ${queue.pending} running, ${queue.size} waiting`);
659+
}
660+
}, 60000);
661+
```
662+
*/
663+
get isSaturated(): boolean {
664+
return (this.#pending === this.#concurrency && this.#queue.size > 0)
665+
|| (this.isRateLimited && this.#queue.size > 0);
666+
}
667+
668+
/**
669+
The tasks currently being executed. Each task includes its `id`, `priority`, `startTime`, and `timeout` (if set).
670+
671+
Returns an array of task info objects.
672+
673+
```js
674+
import PQueue from 'p-queue';
675+
676+
const queue = new PQueue({concurrency: 2});
677+
678+
// Add tasks with IDs for better debugging
679+
queue.add(() => fetchUser(123), {id: 'user-123'});
680+
queue.add(() => fetchPosts(456), {id: 'posts-456', priority: 1});
681+
682+
// Check what's running
683+
console.log(queue.runningTasks);
684+
// => [{
685+
// id: 'user-123',
686+
// priority: 0,
687+
// startTime: 1759253001716,
688+
// timeout: undefined
689+
// }, {
690+
// id: 'posts-456',
691+
// priority: 1,
692+
// startTime: 1759253001916,
693+
// timeout: undefined
694+
// }]
695+
```
696+
*/
697+
get runningTasks(): ReadonlyArray<{
698+
readonly id?: string;
699+
readonly priority: number;
700+
readonly startTime: number;
701+
readonly timeout?: number;
702+
}> {
703+
// Return fresh array with fresh objects to prevent mutations
704+
return [...this.#runningTasks.values()].map(task => ({...task}));
705+
}
606706
}
607707

608708
export type {Queue} from './queue.js';

0 commit comments

Comments
 (0)