Skip to content

Commit 5b7e3c0

Browse files
chore: configurable database migration in concurrency (#14004)
> [!NOTE] > Adds a configurable concurrency for link migrations (CLI/commands), forces concurrency=1 when pgstream is detected, and ignores duplicate link-table inserts. > > - **CLI** > - Add `--concurrency` option to `db:migrate` and `db:sync-links`. > - **Medusa commands** > - `migrate` and `sync-links` accept `concurrency`; set `DB_MIGRATION_CONCURRENCY` and force `1` when `pgstream` schema exists via new `isPgstreamEnabled`. > - **Link Modules (migrations)** > - Execute plan actions with `executeWithConcurrency` using `DB_MIGRATION_CONCURRENCY`. > - Make link-table tracking inserts idempotent with `ON CONFLICT DO NOTHING` (including bulk/upsert and per-create). > > <sup>Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit 0743229. This will update automatically on new commits. Configure [here](https://cursor.com/dashboard?tab=bugbot).</sup>
1 parent a33ef1e commit 5b7e3c0

File tree

6 files changed

+103
-27
lines changed

6 files changed

+103
-27
lines changed

.changeset/tender-clocks-talk.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
"@medusajs/link-modules": patch
3+
"@medusajs/cli": patch
4+
"@medusajs/medusa": patch
5+
---
6+
7+
chore(link-modules): ignore duplicates link creation

packages/cli/medusa-cli/src/create-cli.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,10 @@ function buildLocalCommands(cli, isLocalProject) {
196196
describe:
197197
"Skip prompts and execute only safe actions from sync links",
198198
})
199+
builder.option("concurrency", {
200+
type: "number",
201+
describe: "Number of concurrent migrations to run",
202+
})
199203
},
200204
handler: handlerP(
201205
getCommandHandler("db/migrate", (args, cmd) => {
@@ -270,6 +274,10 @@ function buildLocalCommands(cli, isLocalProject) {
270274
type: "boolean",
271275
describe: "Skip prompts and execute only safe actions",
272276
})
277+
builder.option("concurrency", {
278+
type: "number",
279+
describe: "Number of concurrent migrations to run",
280+
})
273281
},
274282
handler: handlerP(
275283
getCommandHandler("db/sync-links", (args, cmd) => {

packages/medusa/src/commands/db/migrate.ts

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,14 @@ import { LinkLoader } from "@medusajs/framework/links"
33
import {
44
ContainerRegistrationKeys,
55
getResolvedPlugins,
6+
isDefined,
67
mergePluginModules,
78
} from "@medusajs/framework/utils"
89
import { Logger, MedusaContainer } from "@medusajs/types"
910
import { fork } from "child_process"
1011
import path, { join } from "path"
1112
import { initializeContainer } from "../../loaders"
12-
import { ensureDbExists } from "../utils"
13+
import { ensureDbExists, isPgstreamEnabled } from "../utils"
1314
import { syncLinks } from "./sync-links"
1415

1516
const TERMINAL_SIZE = process.stdout.columns
@@ -26,6 +27,7 @@ export async function migrate({
2627
skipScripts,
2728
executeAllLinks,
2829
executeSafeLinks,
30+
concurrency,
2931
logger,
3032
container,
3133
}: {
@@ -34,6 +36,7 @@ export async function migrate({
3436
skipScripts: boolean
3537
executeAllLinks: boolean
3638
executeSafeLinks: boolean
39+
concurrency?: number
3740
logger: Logger
3841
container: MedusaContainer
3942
}): Promise<boolean> {
@@ -43,6 +46,16 @@ export async function migrate({
4346

4447
await ensureDbExists(container)
4548

49+
// If pgstream is enabled, force concurrency to 1
50+
const pgstreamEnabled = await isPgstreamEnabled(container)
51+
if (pgstreamEnabled) {
52+
concurrency = 1
53+
}
54+
55+
if (isDefined(concurrency)) {
56+
process.env.DB_MIGRATION_CONCURRENCY = String(concurrency)
57+
}
58+
4659
const medusaAppLoader = new MedusaAppLoader()
4760
const configModule = container.resolve(
4861
ContainerRegistrationKeys.CONFIG_MODULE
@@ -80,6 +93,7 @@ export async function migrate({
8093
executeSafe: executeSafeLinks,
8194
directory,
8295
container,
96+
concurrency,
8397
})
8498
}
8599

@@ -112,6 +126,7 @@ const main = async function ({
112126
skipScripts,
113127
executeAllLinks,
114128
executeSafeLinks,
129+
concurrency,
115130
}) {
116131
const container = await initializeContainer(directory)
117132
const logger = container.resolve(ContainerRegistrationKeys.LOGGER)
@@ -123,6 +138,7 @@ const main = async function ({
123138
skipScripts,
124139
executeAllLinks,
125140
executeSafeLinks,
141+
concurrency,
126142
logger,
127143
container,
128144
})

packages/medusa/src/commands/db/sync-links.ts

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,15 @@ import {
99
import {
1010
ContainerRegistrationKeys,
1111
getResolvedPlugins,
12+
isDefined,
1213
mergePluginModules,
1314
} from "@medusajs/framework/utils"
1415
import boxen from "boxen"
1516
import chalk from "chalk"
1617
import { join } from "path"
1718

1819
import { initializeContainer } from "../../loaders"
19-
import { ensureDbExists } from "../utils"
20+
import { ensureDbExists, isPgstreamEnabled } from "../utils"
2021

2122
/**
2223
* Groups action tables by their "action" property
@@ -102,15 +103,27 @@ export async function syncLinks(
102103
executeSafe,
103104
directory,
104105
container,
106+
concurrency,
105107
}: {
106108
executeSafe: boolean
107109
executeAll: boolean
108110
directory: string
109111
container: MedusaContainer
112+
concurrency?: number
110113
}
111114
) {
112115
const logger = container.resolve(ContainerRegistrationKeys.LOGGER)
113116

117+
// Check if pgstream is enabled - if so, force concurrency to 1
118+
const pgstreamEnabled = await isPgstreamEnabled(container)
119+
if (pgstreamEnabled) {
120+
concurrency = 1
121+
}
122+
123+
if (isDefined(concurrency)) {
124+
process.env.DB_MIGRATION_CONCURRENCY = String(concurrency)
125+
}
126+
114127
const planner = await medusaAppLoader.getLinksExecutionPlanner()
115128

116129
logger.info("Syncing links...")
@@ -192,7 +205,12 @@ export async function syncLinks(
192205
}
193206
}
194207

195-
const main = async function ({ directory, executeSafe, executeAll }) {
208+
const main = async function ({
209+
directory,
210+
executeSafe,
211+
executeAll,
212+
concurrency,
213+
}) {
196214
const container = await initializeContainer(directory)
197215
const logger = container.resolve(ContainerRegistrationKeys.LOGGER)
198216

@@ -218,6 +236,7 @@ const main = async function ({ directory, executeSafe, executeAll }) {
218236
executeSafe,
219237
directory,
220238
container,
239+
concurrency,
221240
})
222241
process.exit()
223242
} catch (error) {

packages/medusa/src/commands/utils/index.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,21 @@ export async function ensureDbExists(container: MedusaContainer) {
1919
process.exit(1)
2020
}
2121
}
22+
23+
export async function isPgstreamEnabled(
24+
container: MedusaContainer
25+
): Promise<boolean> {
26+
const pgConnection = container.resolve(
27+
ContainerRegistrationKeys.PG_CONNECTION
28+
)
29+
30+
try {
31+
const result = await pgConnection.raw(
32+
"SELECT schema_name FROM information_schema.schemata WHERE schema_name = 'pgstream'"
33+
)
34+
return result.rows.length > 0
35+
} catch (error) {
36+
// If there's an error checking, assume pgstream is not enabled
37+
return false
38+
}
39+
}

packages/modules/link-modules/src/migration/index.ts

Lines changed: 32 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,18 @@ import {
66
PlannerActionLinkDescriptor,
77
} from "@medusajs/framework/types"
88

9+
import { EntitySchema, MikroORM } from "@medusajs/framework/mikro-orm/core"
10+
import {
11+
DatabaseSchema,
12+
PostgreSqlDriver,
13+
} from "@medusajs/framework/mikro-orm/postgresql"
914
import {
1015
arrayDifference,
1116
DALUtils,
17+
executeWithConcurrency,
1218
ModulesSdkUtils,
1319
normalizeMigrationSQL,
14-
promiseAll,
1520
} from "@medusajs/framework/utils"
16-
import { EntitySchema, MikroORM } from "@medusajs/framework/mikro-orm/core"
17-
import {
18-
DatabaseSchema,
19-
PostgreSqlDriver,
20-
} from "@medusajs/framework/mikro-orm/postgresql"
2121
import { generateEntity } from "../utils"
2222

2323
/**
@@ -195,7 +195,7 @@ export class MigrationsExecutionPlanner implements ILinkMigrationsPlanner {
195195
196196
INSERT INTO "${
197197
this.tableName
198-
}" (table_name, link_descriptor) VALUES (?, ?);
198+
}" (table_name, link_descriptor) VALUES (?, ?) ON CONFLICT DO NOTHING;
199199
200200
${sql}
201201
`,
@@ -513,22 +513,30 @@ export class MigrationsExecutionPlanner implements ILinkMigrationsPlanner {
513513
async executePlan(actionPlan: LinkMigrationsPlannerAction[]): Promise<void> {
514514
const orm = await this.createORM()
515515

516-
await promiseAll(
517-
actionPlan.map(async (action) => {
518-
switch (action.action) {
519-
case "delete":
520-
return await this.dropLinkTable(orm, action.tableName)
521-
case "create":
522-
return await this.createLinkTable(orm, action)
523-
case "update":
524-
const sql = `SET LOCAL search_path TO "${this.#schema}"; \n\n${
525-
action.sql
526-
}`
527-
return await orm.em.getDriver().getConnection().execute(sql)
528-
default:
529-
return
530-
}
531-
})
532-
).finally(() => orm.close(true))
516+
try {
517+
const concurrency = parseInt(process.env.DB_MIGRATION_CONCURRENCY ?? "1")
518+
await executeWithConcurrency(
519+
actionPlan.map((action) => {
520+
return async () => {
521+
switch (action.action) {
522+
case "delete":
523+
return await this.dropLinkTable(orm, action.tableName)
524+
case "create":
525+
return await this.createLinkTable(orm, action)
526+
case "update":
527+
const sql = `SET LOCAL search_path TO "${this.#schema}"; \n\n${
528+
action.sql
529+
}`
530+
return await orm.em.getDriver().getConnection().execute(sql)
531+
default:
532+
return
533+
}
534+
}
535+
}),
536+
concurrency
537+
)
538+
} finally {
539+
await orm.close(true)
540+
}
533541
}
534542
}

0 commit comments

Comments
 (0)