Skip to content

Commit eb83954

Browse files
authored
chore(workflow-engine-*): Align event subscribers management (#12976)
* chore(workflow-engine-*): Align event subscribers management * Create nervous-eels-build.md
1 parent aac2feb commit eb83954

File tree

3 files changed

+61
-26
lines changed

3 files changed

+61
-26
lines changed

.changeset/nervous-eels-build.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"@medusajs/workflow-engine-inmemory": patch
3+
"@medusajs/workflow-engine-redis": patch
4+
---
5+
6+
chore(workflow-engine-*): Align event subscribers management

packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -521,14 +521,16 @@ export class WorkflowOrchestratorService {
521521
const subscribers = this.subscribers.get(workflowId) ?? new Map()
522522

523523
const handlerIndex = (handlers) => {
524-
return handlers.indexOf((s) => s === subscriber || s._id === subscriberId)
524+
return handlers.findIndex(
525+
(s) => s === subscriber || s._id === subscriberId
526+
)
525527
}
526528

527529
if (transactionId) {
528530
const transactionSubscribers = subscribers.get(transactionId) ?? []
529531
const subscriberIndex = handlerIndex(transactionSubscribers)
530532
if (subscriberIndex !== -1) {
531-
transactionSubscribers.slice(subscriberIndex, 1)
533+
transactionSubscribers.splice(subscriberIndex, 1)
532534
}
533535

534536
transactionSubscribers.push(subscriber)
@@ -540,7 +542,7 @@ export class WorkflowOrchestratorService {
540542
const workflowSubscribers = subscribers.get(AnySubscriber) ?? []
541543
const subscriberIndex = handlerIndex(workflowSubscribers)
542544
if (subscriberIndex !== -1) {
543-
workflowSubscribers.slice(subscriberIndex, 1)
545+
workflowSubscribers.splice(subscriberIndex, 1)
544546
}
545547

546548
workflowSubscribers.push(subscriber)
@@ -568,14 +570,22 @@ export class WorkflowOrchestratorService {
568570
const newTransactionSubscribers = filterSubscribers(
569571
transactionSubscribers
570572
)
571-
subscribers.set(transactionId, newTransactionSubscribers)
573+
if (newTransactionSubscribers.length) {
574+
subscribers.set(transactionId, newTransactionSubscribers)
575+
} else {
576+
subscribers.delete(transactionId)
577+
}
572578
this.subscribers.set(workflowId, subscribers)
573579
return
574580
}
575581

576582
const workflowSubscribers = subscribers.get(AnySubscriber) ?? []
577583
const newWorkflowSubscribers = filterSubscribers(workflowSubscribers)
578-
subscribers.set(AnySubscriber, newWorkflowSubscribers)
584+
if (newWorkflowSubscribers.length) {
585+
subscribers.set(AnySubscriber, newWorkflowSubscribers)
586+
} else {
587+
subscribers.delete(AnySubscriber)
588+
}
579589
this.subscribers.set(workflowId, subscribers)
580590
}
581591

@@ -610,6 +620,10 @@ export class WorkflowOrchestratorService {
610620
if (transactionId) {
611621
const transactionSubscribers = subscribers.get(transactionId) ?? []
612622
notifySubscribers(transactionSubscribers)
623+
624+
if (options.eventType === "onFinish") {
625+
subscribers.delete(transactionId)
626+
}
613627
}
614628

615629
const workflowSubscribers = subscribers.get(AnySubscriber) ?? []
@@ -664,7 +678,6 @@ export class WorkflowOrchestratorService {
664678
notify({ eventType: "onCompensateBegin" })
665679
},
666680
onFinish: ({ transaction, result, errors }) => {
667-
// TODO: unsubscribe transaction handlers on finish
668681
customEventHandlers?.onFinish?.({ transaction, result, errors })
669682
},
670683

packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts

Lines changed: 36 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -572,14 +572,16 @@ export class WorkflowOrchestratorService {
572572
}
573573

574574
const handlerIndex = (handlers) => {
575-
return handlers.indexOf((s) => s === subscriber || s._id === subscriberId)
575+
return handlers.findIndex(
576+
(s) => s === subscriber || s._id === subscriberId
577+
)
576578
}
577579

578580
if (transactionId) {
579581
const transactionSubscribers = subscribers.get(transactionId) ?? []
580582
const subscriberIndex = handlerIndex(transactionSubscribers)
581583
if (subscriberIndex !== -1) {
582-
transactionSubscribers.slice(subscriberIndex, 1)
584+
transactionSubscribers.splice(subscriberIndex, 1)
583585
}
584586

585587
transactionSubscribers.push(subscriber)
@@ -591,7 +593,7 @@ export class WorkflowOrchestratorService {
591593
const workflowSubscribers = subscribers.get(AnySubscriber) ?? []
592594
const subscriberIndex = handlerIndex(workflowSubscribers)
593595
if (subscriberIndex !== -1) {
594-
workflowSubscribers.slice(subscriberIndex, 1)
596+
workflowSubscribers.splice(subscriberIndex, 1)
595597
}
596598

597599
workflowSubscribers.push(subscriber)
@@ -604,7 +606,10 @@ export class WorkflowOrchestratorService {
604606
transactionId,
605607
subscriberOrId,
606608
}: UnsubscribeOptions) {
607-
const subscribers = this.subscribers.get(workflowId) ?? new Map()
609+
const subscribers = this.subscribers.get(workflowId)
610+
if (!subscribers) {
611+
return
612+
}
608613

609614
const filterSubscribers = (handlers: SubscriberHandler[]) => {
610615
return handlers.filter((handler) => {
@@ -614,25 +619,36 @@ export class WorkflowOrchestratorService {
614619
})
615620
}
616621

617-
// Unsubscribe instance
618-
if (!this.subscribers.has(workflowId)) {
619-
void this.redisSubscriber.unsubscribe(this.getChannelName(workflowId))
620-
}
621-
622622
if (transactionId) {
623-
const transactionSubscribers = subscribers.get(transactionId) ?? []
624-
const newTransactionSubscribers = filterSubscribers(
625-
transactionSubscribers
626-
)
627-
subscribers.set(transactionId, newTransactionSubscribers)
628-
this.subscribers.set(workflowId, subscribers)
629-
return
623+
const transactionSubscribers = subscribers.get(transactionId)
624+
if (transactionSubscribers) {
625+
const newTransactionSubscribers = filterSubscribers(
626+
transactionSubscribers
627+
)
628+
629+
if (newTransactionSubscribers.length) {
630+
subscribers.set(transactionId, newTransactionSubscribers)
631+
} else {
632+
subscribers.delete(transactionId)
633+
}
634+
}
635+
} else {
636+
const workflowSubscribers = subscribers.get(AnySubscriber)
637+
if (workflowSubscribers) {
638+
const newWorkflowSubscribers = filterSubscribers(workflowSubscribers)
639+
640+
if (newWorkflowSubscribers.length) {
641+
subscribers.set(AnySubscriber, newWorkflowSubscribers)
642+
} else {
643+
subscribers.delete(AnySubscriber)
644+
}
645+
}
630646
}
631647

632-
const workflowSubscribers = subscribers.get(AnySubscriber) ?? []
633-
const newWorkflowSubscribers = filterSubscribers(workflowSubscribers)
634-
subscribers.set(AnySubscriber, newWorkflowSubscribers)
635-
this.subscribers.set(workflowId, subscribers)
648+
if (subscribers.size === 0) {
649+
this.subscribers.delete(workflowId)
650+
void this.redisSubscriber.unsubscribe(this.getChannelName(workflowId))
651+
}
636652
}
637653

638654
private async notify(

0 commit comments

Comments
 (0)