Skip to content

Commit 06cad01

Browse files
committed
Add extension in front of workflow binding for manually disposing engine stubs
1 parent 0de7511 commit 06cad01

File tree

8 files changed

+144
-21
lines changed

8 files changed

+144
-21
lines changed

.changeset/tricky-squids-wait.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
"@cloudflare/vitest-pool-workers": minor
3+
"@cloudflare/workflows-shared": minor
4+
"miniflare": minor
5+
---
6+
7+
migrate workflow to use a wrapped binding

fixtures/workflow/tests/index.test.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,11 @@ describe("Workflows", () => {
4848
await expect(fetchJson(`http://${ip}:${port}/create?workflowName=test`))
4949
.resolves.toMatchInlineSnapshot(`
5050
{
51-
"__LOCAL_DEV_STEP_OUTPUTS": [],
51+
"__LOCAL_DEV_STEP_OUTPUTS": [
52+
{
53+
"output": "First step result",
54+
},
55+
],
5256
"output": null,
5357
"status": "running",
5458
}

packages/miniflare/src/plugins/workflows/index.ts

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import fs from "fs/promises";
22
import SCRIPT_WORKFLOWS_BINDING from "worker:workflows/binding";
3+
import SCRIPT_WORKFLOWS_WRAPPED_BINDING from "worker:workflows/wrapped-binding";
34
import { z } from "zod";
45
import { Service } from "../../runtime";
56
import { getUserServiceName } from "../core";
@@ -43,13 +44,21 @@ export const WORKFLOWS_PLUGIN: Plugin<
4344
return Object.entries(options.workflows ?? {}).map(
4445
([bindingName, workflow]) => ({
4546
name: bindingName,
46-
service: {
47-
name: getUserBindingServiceName(
48-
WORKFLOWS_PLUGIN_NAME,
49-
workflow.name,
50-
workflow.remoteProxyConnectionString
51-
),
52-
entrypoint: "WorkflowBinding",
47+
wrapped: {
48+
moduleName: `${WORKFLOWS_PLUGIN_NAME}:local-wrapped-binding`,
49+
innerBindings: [
50+
{
51+
name: "binding",
52+
service: {
53+
name: getUserBindingServiceName(
54+
WORKFLOWS_PLUGIN_NAME,
55+
workflow.name,
56+
workflow.remoteProxyConnectionString
57+
),
58+
entrypoint: "WorkflowBinding",
59+
},
60+
},
61+
],
5362
},
5463
})
5564
);
@@ -64,6 +73,20 @@ export const WORKFLOWS_PLUGIN: Plugin<
6473
);
6574
},
6675

76+
getExtensions({}) {
77+
return [
78+
{
79+
modules: [
80+
{
81+
name: `${WORKFLOWS_PLUGIN_NAME}:local-wrapped-binding`,
82+
esModule: SCRIPT_WORKFLOWS_WRAPPED_BINDING(),
83+
internal: true,
84+
},
85+
],
86+
},
87+
];
88+
},
89+
6790
async getServices({ options, sharedOptions, tmpPath, defaultPersistRoot }) {
6891
const persistPath = getPersistPath(
6992
WORKFLOWS_PLUGIN_NAME,
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
import { WorkflowBinding } from "@cloudflare/workflows-shared/src/binding";
2+
3+
class WorkflowImpl {
4+
constructor(private binding: WorkflowBinding) {
5+
this.binding = binding;
6+
}
7+
8+
async get(id: string): Promise<WorkflowInstance> {
9+
using result = await this.binding.get(id);
10+
return result;
11+
}
12+
13+
async create(
14+
options?: WorkflowInstanceCreateOptions
15+
): Promise<WorkflowInstance> {
16+
using result = await this.binding.create(options);
17+
return result;
18+
}
19+
20+
async createBatch(
21+
options: WorkflowInstanceCreateOptions[]
22+
): Promise<WorkflowInstance[]> {
23+
// no need to dispose here since the binding already does so when mapping
24+
// through created instances
25+
return this.binding.createBatch(options);
26+
}
27+
28+
async unsafeGetBindingName(): Promise<string> {
29+
return this.binding.unsafeGetBindingName();
30+
}
31+
32+
async unsafeAbort(instanceId: string, reason?: string): Promise<void> {
33+
return this.binding.unsafeAbort(instanceId, reason);
34+
}
35+
36+
async unsafeGetInstanceModifier(instanceId: string): Promise<unknown> {
37+
return this.binding.unsafeGetInstanceModifier(instanceId);
38+
}
39+
40+
async unsafeWaitForStepResult(
41+
instanceId: string,
42+
name: string,
43+
index?: number
44+
): Promise<unknown> {
45+
return this.binding.unsafeWaitForStepResult(instanceId, name, index);
46+
}
47+
48+
async unsafeWaitForStatus(instanceId: string, status: string): Promise<void> {
49+
await this.binding.unsafeWaitForStatus(instanceId, status);
50+
}
51+
}
52+
53+
export function makeBinding(env: { binding: WorkflowBinding }): Workflow {
54+
return new WorkflowImpl(env.binding);
55+
}
56+
57+
export default makeBinding;

packages/vitest-pool-workers/src/worker/workflows.ts

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,18 +45,29 @@ class WorkflowInstanceIntrospectorHandle
4545
{
4646
#workflow: WorkflowBinding;
4747
#instanceId: string;
48-
#instanceModifier: WorkflowInstanceModifier;
48+
#instanceModifier: WorkflowInstanceModifier | undefined;
49+
#instanceModifierPromise: Promise<WorkflowInstanceModifier> | undefined;
4950

5051
constructor(workflow: WorkflowBinding, instanceId: string) {
5152
this.#workflow = workflow;
5253
this.#instanceId = instanceId;
53-
54-
this.#instanceModifier = workflow.unsafeGetInstanceModifier(
55-
instanceId
56-
) as WorkflowInstanceModifier;
54+
this.#instanceModifierPromise = workflow
55+
.unsafeGetInstanceModifier(instanceId)
56+
.then((res) => {
57+
this.#instanceModifier = res as WorkflowInstanceModifier;
58+
this.#instanceModifierPromise = undefined;
59+
return this.#instanceModifier;
60+
});
5761
}
5862

5963
async modify(fn: ModifierCallback): Promise<WorkflowInstanceIntrospector> {
64+
if (this.#instanceModifierPromise !== undefined) {
65+
this.#instanceModifier = await this.#instanceModifierPromise;
66+
}
67+
if (this.#instanceModifier === undefined) {
68+
throw new Error("instance modifier is undefined");
69+
}
70+
6071
await fn(this.#instanceModifier);
6172

6273
return this;

packages/workflows-shared/src/binding.ts

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,16 @@ type Env = {
1717

1818
// this.env.WORKFLOW is WorkflowBinding
1919
export class WorkflowBinding extends WorkerEntrypoint<Env> implements Workflow {
20+
constructor(ctx: ExecutionContext, env: Env) {
21+
super(ctx, env);
22+
}
23+
2024
public async create({
2125
id = crypto.randomUUID(),
2226
params = {},
23-
}: WorkflowInstanceCreateOptions = {}): Promise<WorkflowInstance> {
27+
}: WorkflowInstanceCreateOptions = {}): Promise<
28+
WorkflowInstance & Disposable
29+
> {
2430
if (!isValidWorkflowInstanceId(id)) {
2531
throw new WorkflowError("Workflow instance has invalid id");
2632
}
@@ -51,10 +57,11 @@ export class WorkflowBinding extends WorkerEntrypoint<Env> implements Workflow {
5157
restart: handle.restart.bind(handle),
5258
status: handle.status.bind(handle),
5359
sendEvent: handle.sendEvent.bind(handle),
60+
[Symbol.dispose]: handle.dispose.bind(handle),
5461
};
5562
}
5663

57-
public async get(id: string): Promise<WorkflowInstance> {
64+
public async get(id: string): Promise<WorkflowInstance & Disposable> {
5865
const engineStubId = this.env.ENGINE.idFromName(id);
5966
const engineStub = this.env.ENGINE.get(engineStubId);
6067
const handle = new WorkflowHandle(id, engineStub);
@@ -73,6 +80,7 @@ export class WorkflowBinding extends WorkerEntrypoint<Env> implements Workflow {
7380
restart: handle.restart.bind(handle),
7481
status: handle.status.bind(handle),
7582
sendEvent: handle.sendEvent.bind(handle),
83+
[Symbol.dispose]: handle.dispose.bind(handle),
7684
};
7785
}
7886
public async createBatch(
@@ -84,14 +92,21 @@ export class WorkflowBinding extends WorkerEntrypoint<Env> implements Workflow {
8492
);
8593
}
8694

87-
return await Promise.all(batch.map((val) => this.create(val)));
95+
return await Promise.all(
96+
batch.map(async (val) => {
97+
using res = await this.create(val);
98+
return res;
99+
})
100+
);
88101
}
89102

90-
public unsafeGetBindingName(): string {
103+
public async unsafeGetBindingName(): Promise<string> {
104+
// async because of rpc
91105
return this.env.BINDING_NAME;
92106
}
93107

94-
public unsafeGetInstanceModifier(instanceId: string): unknown {
108+
public async unsafeGetInstanceModifier(instanceId: string): Promise<unknown> {
109+
// async because of rpc
95110
const stubId = this.env.ENGINE.idFromName(instanceId);
96111
const stub = this.env.ENGINE.get(stubId);
97112

@@ -108,7 +123,8 @@ export class WorkflowBinding extends WorkerEntrypoint<Env> implements Workflow {
108123
const stubId = this.env.ENGINE.idFromName(instanceId);
109124
const stub = this.env.ENGINE.get(stubId);
110125

111-
return await stub.waitForStepResult(name, index);
126+
using res = await stub.waitForStepResult(name, index);
127+
return res;
112128
}
113129

114130
public async unsafeAbort(instanceId: string, reason?: string): Promise<void> {
@@ -207,4 +223,9 @@ export class WorkflowHandle extends RpcTarget implements WorkflowInstance {
207223
timestamp: new Date(),
208224
});
209225
}
226+
227+
public dispose() {
228+
// @ts-expect-error - DurableObjectStub should be disposable but types don't reflect it
229+
this.stub[Symbol.dispose]?.();
230+
}
210231
}

packages/workflows-shared/src/engine.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import type { Event } from "./context";
2121
import type { InstanceMetadata, RawInstanceLog } from "./instance";
2222
import type { WorkflowEntrypoint, WorkflowEvent } from "cloudflare:workers";
2323

24-
export interface Env {
24+
interface Env {
2525
USER_WORKFLOW: WorkflowEntrypoint;
2626
}
2727

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
export { WorkflowBinding } from "./binding";
21
export { Engine } from "./engine";
2+
export { WorkflowBinding } from "./binding";

0 commit comments

Comments
 (0)