Skip to content

Commit 05ab2e6

Browse files
committed
feat: Adds the ability to disable strict ack checks
1 parent 786cfe5 commit 05ab2e6

File tree

12 files changed

+389
-140
lines changed

12 files changed

+389
-140
lines changed

README.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@
1616
>
1717
> DocMQ is also a strong choice when your messaging queue needs to care about time and time zones. When you want to "send a message at 4:00 am", it matters if you mean 4am in Los Angeles or 4am in Phoenix because only one of those locations implements Daylight Savings Time. DocMQ reduces the pain associated with figuring out if one day is `86400`, `90000`, or `85800` seconds in the future.
1818
>
19+
> Finally, DocMQ is database agnostic. You can run one backend in development where you are less concerned about scale, and run a robust solution in production. A [suite of tests](./test/driver/) makes it easy to ensure your beahvior is consistent across deployments.
20+
>
1921
> **Why AVOID This** :grey_question:
2022
>
21-
> Simple. Performance. This kind of solution will never be as fast as an in-memory Redis queue or an event bus. If fast FIFO is your goal, you should consider BullMQ, Kue, Bee, Owl, and others.
23+
> Simple. Performance. This kind of solution will never be as fast as an in-memory Redis queue or an event bus. If fast FIFO is your goal, you should consider BullMQ, Kue, Bee, Owl, and others. They're all excellent libraries, and I can't recommend them enough if they fit your use case!
2224
2325
# DocMQ
2426

@@ -32,7 +34,7 @@
3234

3335
| Feature | [BullMQ](https://github.com/taskforcesh/bullmq) | [Agenda](https://github.com/agenda/agenda) | [DocMQ](https://github.com/jakobo/docmq) |
3436
| :----------------- | :---------------------------------------------: | :----------------------------------------: | :--------------------------------------: |
35-
| Backend | redis | mongo | any |
37+
| Backend | redis | mongo | **(any)[#-custom-driver-support]** |
3638
| Parent/Child || | |
3739
| Priorities ||| |
3840
| Concurrency ||||

src/driver/base.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,16 @@ export class BaseDriver<Schema = unknown, Table = unknown, TxInfo = unknown>
2020
implements Driver<Schema, Table, TxInfo>
2121
{
2222
events: DriverEmitter;
23-
private conn: unknown;
2423
private schema: string;
2524
private table: string;
2625
private init: Promise<boolean>;
26+
private strict: boolean;
2727
constructor(connection: unknown, options?: DriverOptions) {
28-
this.conn = connection;
2928
this.events = new EventEmitter() as DriverEmitter;
3029
this.schema = options?.schema ?? "docmq";
3130
this.table = options?.table ?? "jobs";
3231
this.init = this.initialize(connection);
32+
this.strict = options?.strict === false ? false : true;
3333
}
3434

3535
/** Initialize and connect to the driver. Operation should be treated as idempoetent */
@@ -44,6 +44,10 @@ export class BaseDriver<Schema = unknown, Table = unknown, TxInfo = unknown>
4444
return true;
4545
}
4646

47+
protected isStrict(): boolean {
48+
return this.strict;
49+
}
50+
4751
/** Gets the schema object or name */
4852
async getSchema(): Promise<Schema> {
4953
await asynced();

src/driver/loki.ts

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,12 @@ export class LokiDriver extends BaseDriver<
239239
.data()?.[0];
240240

241241
if (!next) {
242-
throw new DriverNoMatchingAckError(ack);
242+
const err = new DriverNoMatchingAckError(ack);
243+
if (this.isStrict()) {
244+
throw err;
245+
} else {
246+
this.events.emit("warn", err);
247+
}
243248
}
244249
}
245250

@@ -272,7 +277,12 @@ export class LokiDriver extends BaseDriver<
272277
.data()?.[0];
273278

274279
if (!next) {
275-
throw new DriverNoMatchingAckError(ack);
280+
const err = new DriverNoMatchingAckError(ack);
281+
if (this.isStrict()) {
282+
throw err;
283+
} else {
284+
this.events.emit("warn", err);
285+
}
276286
}
277287
}
278288

@@ -314,7 +324,12 @@ export class LokiDriver extends BaseDriver<
314324
.data()?.[0];
315325

316326
if (!next) {
317-
throw new DriverNoMatchingAckError(ackVal);
327+
const err = new DriverNoMatchingAckError(ackVal);
328+
if (this.isStrict()) {
329+
throw err;
330+
} else {
331+
this.events.emit("warn", err);
332+
}
318333
}
319334
}
320335

@@ -345,7 +360,12 @@ export class LokiDriver extends BaseDriver<
345360
.data()?.[0];
346361

347362
if (!next) {
348-
throw new DriverNoMatchingAckError(ack);
363+
const err = new DriverNoMatchingAckError(ack);
364+
if (this.isStrict()) {
365+
throw err;
366+
} else {
367+
this.events.emit("warn", err);
368+
}
349369
}
350370
}
351371

@@ -374,7 +394,12 @@ export class LokiDriver extends BaseDriver<
374394
.data()?.[0];
375395

376396
if (!next) {
377-
throw new DriverNoMatchingRefError(ref);
397+
const err = new DriverNoMatchingRefError(ref);
398+
if (this.isStrict()) {
399+
throw err;
400+
} else {
401+
this.events.emit("warn", err);
402+
}
378403
}
379404
}
380405

@@ -403,7 +428,12 @@ export class LokiDriver extends BaseDriver<
403428
.data()?.[0];
404429

405430
if (!next) {
406-
throw new DriverNoMatchingRefError(ref);
431+
const err = new DriverNoMatchingRefError(ref);
432+
if (this.isStrict()) {
433+
throw err;
434+
} else {
435+
this.events.emit("warn", err);
436+
}
407437
}
408438
}
409439

@@ -434,7 +464,12 @@ export class LokiDriver extends BaseDriver<
434464
.data()?.[0];
435465

436466
if (!last) {
437-
throw new DriverNoMatchingRefError(ref);
467+
const err = new DriverNoMatchingRefError(ref);
468+
if (this.isStrict()) {
469+
throw err;
470+
} else {
471+
this.events.emit("warn", err);
472+
}
438473
}
439474

440475
const next: LokiDoc = JSON.parse(JSON.stringify(last));

src/driver/mongo.ts

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -409,10 +409,13 @@ export class MongoDriver extends BaseDriver<Db, Collection<QueueDoc>, MDBTxn> {
409409
}
410410
);
411411

412-
// a missing update is a warning. It may represent a problem
413-
// or it may be the item was updated by another request
414412
if (!next.value) {
415-
this.events.emit("warn", new DriverNoMatchingAckError(ack));
413+
const err = new DriverNoMatchingAckError(ack);
414+
if (this.isStrict()) {
415+
throw err;
416+
} else {
417+
this.events.emit("warn", err);
418+
}
416419
}
417420
}
418421

@@ -455,10 +458,13 @@ export class MongoDriver extends BaseDriver<Db, Collection<QueueDoc>, MDBTxn> {
455458
}
456459
);
457460

458-
// a missing update is a warning. It may represent a problem
459-
// or it may be the item was updated by another request
460461
if (!next.value) {
461-
this.events.emit("warn", new DriverNoMatchingAckError(ack));
462+
const err = new DriverNoMatchingAckError(ack);
463+
if (this.isStrict()) {
464+
throw err;
465+
} else {
466+
this.events.emit("warn", err);
467+
}
462468
}
463469
}
464470

@@ -503,7 +509,12 @@ export class MongoDriver extends BaseDriver<Db, Collection<QueueDoc>, MDBTxn> {
503509
);
504510

505511
if (next.matchedCount < 1) {
506-
throw new DriverNoMatchingAckError(ackVal);
512+
const err = new DriverNoMatchingAckError(ackVal);
513+
if (this.isStrict()) {
514+
throw err;
515+
} else {
516+
this.events.emit("warn", err);
517+
}
507518
}
508519
}
509520

@@ -538,7 +549,12 @@ export class MongoDriver extends BaseDriver<Db, Collection<QueueDoc>, MDBTxn> {
538549
);
539550

540551
if (!next.value) {
541-
throw new DriverNoMatchingAckError(ack);
552+
const err = new DriverNoMatchingAckError(ack);
553+
if (this.isStrict()) {
554+
throw err;
555+
} else {
556+
this.events.emit("warn", err);
557+
}
542558
}
543559
}
544560

@@ -569,7 +585,12 @@ export class MongoDriver extends BaseDriver<Db, Collection<QueueDoc>, MDBTxn> {
569585
);
570586

571587
if (!next.value) {
572-
throw new DriverNoMatchingRefError(ref);
588+
const err = new DriverNoMatchingRefError(ref);
589+
if (this.isStrict()) {
590+
throw err;
591+
} else {
592+
this.events.emit("warn", err);
593+
}
573594
}
574595
}
575596

@@ -604,7 +625,12 @@ export class MongoDriver extends BaseDriver<Db, Collection<QueueDoc>, MDBTxn> {
604625
);
605626

606627
if (!next.value) {
607-
throw new DriverNoMatchingRefError(ref);
628+
const err = new DriverNoMatchingRefError(ref);
629+
if (this.isStrict()) {
630+
throw err;
631+
} else {
632+
this.events.emit("warn", err);
633+
}
608634
}
609635
}
610636

src/driver/postgres.ts

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -572,15 +572,18 @@ export class PgDriver extends BaseDriver<never, never, PGTransaction> {
572572
const v = QUERIES.ack.variables({
573573
ack,
574574
});
575-
576575
try {
577576
const results = await client.query<QueueRow>(q, v);
578577
if (results.rowCount < 1) {
579578
throw new DriverNoMatchingAckError(ack);
580579
}
581580
} catch (e) {
582581
if (e instanceof DriverNoMatchingAckError) {
583-
throw e; // rethrow as immediate error
582+
if (this.isStrict()) {
583+
throw e;
584+
} else {
585+
this.events.emit("warn", e);
586+
}
584587
}
585588
const err = new DriverError(
586589
"Encountered an error running a postgres query: " +
@@ -626,7 +629,11 @@ export class PgDriver extends BaseDriver<never, never, PGTransaction> {
626629
}
627630
} catch (e) {
628631
if (e instanceof DriverNoMatchingAckError) {
629-
throw e; // rethrow as immediate error
632+
if (this.isStrict()) {
633+
throw e;
634+
} else {
635+
this.events.emit("warn", e);
636+
}
630637
}
631638
const err = new DriverError(
632639
"Encountered an error running a postgres query: " +
@@ -674,7 +681,11 @@ export class PgDriver extends BaseDriver<never, never, PGTransaction> {
674681
}
675682
} catch (e) {
676683
if (e instanceof DriverNoMatchingAckError) {
677-
throw e; // rethrow as immediate error
684+
if (this.isStrict()) {
685+
throw e;
686+
} else {
687+
this.events.emit("warn", e);
688+
}
678689
}
679690
const err = new DriverError(
680691
"Encountered an error running a postgres query: " +
@@ -714,7 +725,11 @@ export class PgDriver extends BaseDriver<never, never, PGTransaction> {
714725
}
715726
} catch (e) {
716727
if (e instanceof DriverNoMatchingAckError) {
717-
throw e; // rethrow as immediate error
728+
if (this.isStrict()) {
729+
throw e;
730+
} else {
731+
this.events.emit("warn", e);
732+
}
718733
}
719734
const err = new DriverError(
720735
"Encountered an error running a postgres query: " +
@@ -749,7 +764,11 @@ export class PgDriver extends BaseDriver<never, never, PGTransaction> {
749764
}
750765
} catch (e) {
751766
if (e instanceof DriverNoMatchingRefError) {
752-
throw e; // rethrow as immediate error
767+
if (this.isStrict()) {
768+
throw e;
769+
} else {
770+
this.events.emit("warn", e);
771+
}
753772
}
754773
const err = new DriverError(
755774
"Encountered an error running a postgres query: " +
@@ -786,7 +805,11 @@ export class PgDriver extends BaseDriver<never, never, PGTransaction> {
786805
}
787806
} catch (e) {
788807
if (e instanceof DriverNoMatchingRefError) {
789-
throw e; // rethrow as immediate error
808+
if (this.isStrict()) {
809+
throw e;
810+
} else {
811+
this.events.emit("warn", e);
812+
}
790813
}
791814
const err = new DriverError(
792815
"Encountered an error running a postgres query: " +

src/queue.ts

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -123,24 +123,34 @@ export class Queue<
123123
this.stats = resetStats();
124124

125125
// emit driver errors & warnings externally
126-
this.driver.events.on("error", (e) => {
127-
this.events.emit("error", e);
128-
});
129-
this.driver.events.on("warn", (e) => {
130-
this.events.emit("warn", e);
131-
});
132-
this.driver.events.on("halt", (e) => {
133-
this.driver.destroy();
134-
this.destroy();
135-
this.events.emit("error", e);
136-
this.events.emit("halt", e);
137-
});
138-
this.driver.events.on("reconnect", () => {
139-
this.events.emit(
140-
"log",
141-
"Driver disconnected, but reconnected successfully"
142-
);
143-
});
126+
// register when the driver is ready
127+
driver
128+
.ready()
129+
.then(() => {
130+
this.driver.events.on("error", (e) => {
131+
this.events.emit("error", e);
132+
});
133+
this.driver.events.on("warn", (e) => {
134+
this.events.emit("warn", e);
135+
});
136+
this.driver.events.on("halt", (e) => {
137+
this.driver.destroy();
138+
this.destroy();
139+
this.events.emit("error", e);
140+
this.events.emit("halt", e);
141+
});
142+
this.driver.events.on("reconnect", () => {
143+
this.events.emit(
144+
"log",
145+
"Driver disconnected, but reconnected successfully"
146+
);
147+
});
148+
})
149+
.catch((e) => {
150+
const err = new DocMQError("Unable to initialize events for driver");
151+
err.original = e;
152+
this.events.emit("error", err);
153+
});
144154
}
145155

146156
/** A function that returns a promise resolving once all init dependenices are resolved */

src/types.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,12 @@ export interface DriverOptions {
278278
schema?: string;
279279
/** Specifies the DB table or Document DB Collection to use */
280280
table?: string;
281+
/**
282+
* Disable strict-checks (default `true`).
283+
* When strict mode is disabled, mismatched ack values or calling ack()/fail()/dead() on
284+
* expired objects generates a warning instead of throwing an error
285+
*/
286+
strict?: boolean;
281287
}
282288

283289
/** Describes a DB Driver for DocMQ */

0 commit comments

Comments
 (0)