Skip to content

Commit 7c27e1d

Browse files
committed
Add onError() method
Fixes #29
1 parent 69d25d5 commit 7c27e1d

File tree

3 files changed

+193
-0
lines changed

3 files changed

+193
-0
lines changed

readme.md

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,36 @@ await queue.onRateLimitCleared();
288288
console.log('Rate limit cleared - can add more tasks');
289289
```
290290

291+
#### .onError()
292+
293+
Returns a promise that rejects when any task in the queue errors.
294+
295+
Use with `Promise.race([queue.onError(), queue.onIdle()])` to fail fast on the first error while still resolving normally when the queue goes idle.
296+
297+
> [!IMPORTANT]
298+
> The promise returned by `add()` still rejects. You must handle each `add()` promise (for example, `.catch(() => {})`) to avoid unhandled rejections.
299+
300+
```js
301+
import PQueue from 'p-queue';
302+
303+
const queue = new PQueue({concurrency: 2});
304+
305+
queue.add(() => fetchData(1)).catch(() => {});
306+
queue.add(() => fetchData(2)).catch(() => {});
307+
queue.add(() => fetchData(3)).catch(() => {});
308+
309+
// Stop processing on first error
310+
try {
311+
await Promise.race([
312+
queue.onError(),
313+
queue.onIdle()
314+
]);
315+
} catch (error) {
316+
queue.pause(); // Stop processing remaining tasks
317+
console.error('Queue failed:', error);
318+
}
319+
```
320+
291321
#### .onSizeLessThan(limit)
292322

293323
Returns a promise that settles when the queue size is less than the given limit: `queue.size < limit`.

source/index.ts

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -547,6 +547,47 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
547547
await this.#onEvent('rateLimitCleared');
548548
}
549549

550+
/**
551+
@returns A promise that rejects when any task in the queue errors.
552+
553+
Use with `Promise.race([queue.onError(), queue.onIdle()])` to fail fast on the first error while still resolving normally when the queue goes idle.
554+
555+
Important: The promise returned by `add()` still rejects. You must handle each `add()` promise (for example, `.catch(() => {})`) to avoid unhandled rejections.
556+
557+
@example
558+
```
559+
import PQueue from 'p-queue';
560+
561+
const queue = new PQueue({concurrency: 2});
562+
563+
queue.add(() => fetchData(1)).catch(() => {});
564+
queue.add(() => fetchData(2)).catch(() => {});
565+
queue.add(() => fetchData(3)).catch(() => {});
566+
567+
// Stop processing on first error
568+
try {
569+
await Promise.race([
570+
queue.onError(),
571+
queue.onIdle()
572+
]);
573+
} catch (error) {
574+
queue.pause(); // Stop processing remaining tasks
575+
console.error('Queue failed:', error);
576+
}
577+
```
578+
*/
579+
// eslint-disable-next-line @typescript-eslint/promise-function-async
580+
async onError(): Promise<never> {
581+
return new Promise<never>((_resolve, reject) => {
582+
const handleError = (error: unknown) => {
583+
this.off('error', handleError);
584+
reject(error);
585+
};
586+
587+
this.on('error', handleError);
588+
});
589+
}
590+
550591
async #onEvent(event: EventName, filter?: () => boolean): Promise<void> {
551592
return new Promise(resolve => {
552593
const listener = () => {

test/basic.ts

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1100,3 +1100,125 @@ test('pause should work when throttled', async () => {
11001100

11011101
await delay(2500);
11021102
});
1103+
1104+
test('.onError() - rejects when task errors', async () => {
1105+
const queue = new PQueue({concurrency: 1});
1106+
1107+
queue.add(async () => delay(50));
1108+
1109+
// Set up onError listener first
1110+
const errorPromise = queue.onError();
1111+
1112+
// Add task that will fail
1113+
const task = queue.add(async () => {
1114+
throw new Error('Task failed');
1115+
});
1116+
1117+
await assert.rejects(
1118+
errorPromise,
1119+
{message: 'Task failed'},
1120+
);
1121+
1122+
// Clean up unhandled rejection
1123+
await Promise.allSettled([task]);
1124+
});
1125+
1126+
test('.onError() - captures first error in queue', async () => {
1127+
const queue = new PQueue({concurrency: 2});
1128+
1129+
// Set up onError listener first
1130+
const errorPromise = queue.onError();
1131+
1132+
// Add tasks
1133+
const task1 = queue.add(async () => {
1134+
await delay(100);
1135+
throw new Error('First error');
1136+
});
1137+
1138+
const task2 = queue.add(async () => {
1139+
await delay(200);
1140+
throw new Error('Second error');
1141+
});
1142+
1143+
// Set up cleanup
1144+
const cleanup = Promise.allSettled([task1, task2]);
1145+
1146+
// Wait for onError to reject with first error
1147+
await assert.rejects(
1148+
errorPromise,
1149+
{message: 'First error'},
1150+
'Should reject with first error',
1151+
);
1152+
1153+
// Ensure all tasks completed
1154+
await cleanup;
1155+
});
1156+
1157+
test('.onError() - works with Promise.race pattern', async () => {
1158+
const queue = new PQueue({concurrency: 2});
1159+
1160+
queue.add(async () => delay(50));
1161+
queue.add(async () => delay(100));
1162+
1163+
const racePromise = Promise.race([
1164+
queue.onError(),
1165+
queue.onIdle(),
1166+
]);
1167+
1168+
const task = queue.add(async () => {
1169+
throw new Error('Failed task');
1170+
});
1171+
queue.add(async () => delay(150));
1172+
1173+
// Stop processing on first error
1174+
try {
1175+
await racePromise;
1176+
assert.fail('Should have thrown error');
1177+
} catch (error) {
1178+
assert.equal((error as Error).message, 'Failed task');
1179+
queue.pause(); // Stop processing remaining tasks
1180+
}
1181+
1182+
// Clean up unhandled rejection
1183+
await Promise.allSettled([task]);
1184+
1185+
assert.equal(queue.isPaused, true);
1186+
});
1187+
1188+
test('.onError() - multiple listeners', async () => {
1189+
const queue = new PQueue({concurrency: 1});
1190+
1191+
queue.add(async () => delay(50));
1192+
1193+
const error1 = queue.onError();
1194+
const error2 = queue.onError();
1195+
1196+
const task = queue.add(async () => {
1197+
throw new Error('Task error');
1198+
});
1199+
1200+
await assert.rejects(error1, {message: 'Task error'});
1201+
await assert.rejects(error2, {message: 'Task error'});
1202+
1203+
// Clean up unhandled rejection
1204+
await Promise.allSettled([task]);
1205+
});
1206+
1207+
test('.onError() - works when called before adding tasks', async () => {
1208+
const queue = new PQueue({concurrency: 1});
1209+
1210+
// Call onError() before adding any tasks
1211+
const errorPromise = queue.onError();
1212+
1213+
// Add a task that errors
1214+
const task = queue.add(async () => {
1215+
throw new Error('Early error');
1216+
});
1217+
1218+
await Promise.allSettled([task]);
1219+
1220+
await assert.rejects(
1221+
errorPromise,
1222+
{message: 'Early error'},
1223+
);
1224+
});

0 commit comments

Comments
 (0)