import FluentKit
import Logging
import SQLKit

/// A migration to upgrade the data from the old 1.x and 2.x versions of this driver to the current version.
///
/// This migration is compatible with all known released versions of the driver. It is _not_ compatible with any
/// of the 3.x-beta tags. It is known to be compatible with MySQL 8.0+, PostgreSQL 11+, and SQLite 3.38.0+, and to be
/// _incompatible_ with MySQL 5.7 and earlier.
///
/// Once run, this migration is not reversible. See discussion in ``JobModelOldFormatMigration/revert(on:)-3xv3q`` for
/// more details. This migration should be used **_in place of_** ``JobModelMigration``, not in addition to it. Using
/// both migrations will cause database errors. If an error occurs during migration, a reasonable attempt is made to
/// restore everything to its original state. Even under extreme conditions, the original data is guaranteed to remain
/// intact until the migration is successfully completed; the original data is never modified, and is deleted only after
/// everything has finished without errors.
///
/// > Note: The `payload` format used by the MySQL-specific logic is a bit bizarre; instead of the plain binary string
/// > used for the other databases, MySQL's version is Base64-encoded and double-quoted. This is an artifact of the
/// > missing conformance of `Data` to `MySQLDataConvertible` in `MySQLNIO`, a bug that cannot be fixed at the present
/// > time without causing problematic behavioral changes.
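///
/// A minimal registration sketch, assuming the usual Vapor/Fluent migration setup (`app.migrations.add(_:)`
/// followed by `app.autoMigrate()`); adapt the setup to your application:
///
/// ```swift
/// // When upgrading a 1.x/2.x database, register this migration *instead of* JobModelMigration.
/// app.migrations.add(JobModelOldFormatMigration())
/// try await app.autoMigrate()
/// ```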
public struct JobModelOldFormatMigration: AsyncSQLMigration {
    private let jobsTableName: String
    private let jobsTableSpace: String?

    /// Public initializer.
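    ///
    /// - Parameters:
    ///   - jobsTableName: The name of the jobs metadata table (both the existing old-format table and its
    ///     new-format replacement).
    ///   - jobsTableSpace: The space (e.g. schema) in which the jobs table lives, if any.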
    public init(
        jobsTableName: String = "_jobs_meta",
        jobsTableSpace: String? = nil
    ) {
        self.jobsTableName = jobsTableName
        self.jobsTableSpace = jobsTableSpace
    }

    // See `AsyncSQLMigration.prepare(on:)`.
    public func prepare(on database: any SQLDatabase) async throws {
        /// Return a `SQLQueryString` which extracts the field with the given name from the old-format "data" JSON as the given type.
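        ///
        /// For example, on PostgreSQL, `dataGet("attempts", as: "bigint")` expands roughly to
        /// `((convert_from("data", 'UTF8')::jsonb)->>'attempts')::bigint` - an illustrative reading of the
        /// interpolations below, not separately verified output.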
        func dataGet(_ name: String, as type: String) -> SQLQueryString {
            switch database.dialect.name {
            case "postgresql": "((convert_from(\"data\", 'UTF8')::jsonb)->>\(literal: name))::\(unsafeRaw: type == "double" ? "double precision" : type)"
            case "mysql":      "json_value(convert(data USING utf8mb4), \(literal: "$.\(name)") RETURNING \(unsafeRaw: type == "text" ? "CHAR" : (type == "double" ? "DOUBLE" : "SIGNED")))"
            case "sqlite":     "data->>\(literal: "$.\(name)")"
            default: ""
            }
        }
        /// Return a `SQLQueryString` which extracts the timestamp field with the given name from the old-format "data" JSON as a UNIX
        /// timestamp, compensating for the difference between the UNIX epoch and Date's reference date (978,307,200 seconds).
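        ///
        /// (978,307,200 seconds is the offset between 1970-01-01 and Foundation's 2001-01-01 reference date.)
        /// For example, on MySQL, `dataTimestamp("queuedAt")` expands roughly to
        /// `from_unixtime(json_value(convert(data USING utf8mb4), '$.queuedAt' RETURNING DOUBLE) + 978307200.0)`.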
        func dataTimestamp(_ name: String) -> SQLQueryString {
            switch database.dialect.name {
            case "postgresql": "to_timestamp(\(dataGet(name, as: "double")) + 978307200.0)"
            case "mysql":      "from_unixtime(\(dataGet(name, as: "double")) + 978307200.0)"
            case "sqlite":     "\(dataGet(name, as: "double")) + 978307200.0"
            default: ""
            }
        }
        /// Return a `SQLQueryString` which extracts the payload from the old-format "data" JSON, converting the original array of one-byte
        /// integers to the database's appropriate binary representation (`bytea` for Postgres, `BINARY` collation with Base64 encoding and
        /// surrounding quotes for MySQL (don't ask...), `BLOB` affinity for SQLite).
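        ///
        /// For instance, a stored payload of `[104, 105]` is intended to come back as the two-byte blob
        /// `0x6869` on PostgreSQL and SQLite, and as the quoted Base64 string `"aGk="` on MySQL (see the
        /// note on the type above) - an illustration of the queries' intent, not verified output.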
        func dataPayload() -> SQLQueryString {
            switch database.dialect.name {
            case "postgresql": #"coalesce((SELECT decode(string_agg(lpad(to_hex(b::int), 2, '0'), ''), 'hex') FROM jsonb_array_elements_text((convert_from("data", 'UTF8')::jsonb)->'payload') AS a(b)), '\x')"#
            case "mysql":      #"coalesce((SELECT /*+SET_VAR(group_concat_max_len=1048576)*/ concat('"',to_base64(group_concat(char(b) SEPARATOR '')),'"') FROM json_table(convert(data USING utf8mb4), '$.payload[*]' COLUMNS (b INT PATH '$')) t), X'')"#
            case "sqlite":     #"coalesce((SELECT unhex(group_concat(format('%02x',b.value), '')) FROM json_each(data, '$.payload') as b), '')"#
            default: ""
            }
        }

        // Make sure that we keep the old table in the same space when we move it aside.
        let tempTable = SQLQualifiedTable("_temp_old_\(self.jobsTableName)", space: self.jobsTableSpace)
        let jobsTable = SQLQualifiedTable(self.jobsTableName, space: self.jobsTableSpace)
        let enumType = SQLQualifiedTable("\(self.jobsTableName)_storedjobstatus", space: self.jobsTableSpace)
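        // `enumType` only matters on PostgreSQL, where the new table's `state` column is a native enum
        // (presumably created by the `JobModelMigration` run below); `SQLQualifiedTable` is reused here
        // purely to get space-qualified quoting of that type name for the cast in the SELECT.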

        // 1. Rename the existing table so we can create the new format in its place.
        try await database.alter(table: jobsTable).rename(to: tempTable).run()

        do {
            // 2. Run the "real" migration to create the correct table structure and any associated objects.
            try await JobModelMigration(jobsTableName: self.jobsTableName, jobsTableSpace: self.jobsTableSpace).prepare(on: database)

            // 3. Migrate the data from the old table.
            try await database.insert(into: jobsTable)
                .columns("id", "queue_name", "job_name", "queued_at", "delay_until", "state", "max_retry_count", "attempts", "payload", "updated_at")
                .select { $0
                    .column("job_id")
                    .column("queue")
                    .column(.function("coalesce", dataGet("jobName", as: "text"), .literal("")))
                    .column(.function("coalesce", dataTimestamp("queuedAt"), .identifier("created_at")))
                    .column(dataTimestamp("delayUntil"))
                    .column(database.dialect.name == "postgresql" ? "state::\(enumType)" as SQLQueryString : "state")
                    .column(.function("coalesce", dataGet("maxRetryCount", as: "bigint"), .literal(0)))
                    .column(.function("coalesce", dataGet("attempts", as: "bigint"), .literal(0)))
                    .column(dataPayload())
                    .column("updated_at")
                    .from(tempTable)
                }
                .run()
        } catch {
            // Attempt to clean up after ourselves by deleting the new table and moving the old one back into place.
            try? await database.drop(table: jobsTable).run()
            try? await database.alter(table: tempTable).rename(to: jobsTable).run()
            throw error
        }

        // 4. Drop the old table.
        try await database.drop(table: tempTable).run()
    }

    // See `AsyncSQLMigration.revert(on:)`.
    public func revert(on database: any SQLDatabase) async throws {
        /// This migration technically can be reverted, if one is willing to consider the values of the original
        /// `id`, `created_at`, and `deleted_at` columns spurious, and therefore disposable. However, it would be
        /// a good deal of work - in particular, turning the binary payload back into a JSON byte array would be
        /// even more involved than the forward conversion, and the utility of doing so is insufficient to
        /// justify the effort unless this feature ends up being commonly requested. We could call through to
        /// ``JobModelMigration``'s revert, but it seems best to err on the side of caution for this migration.

        // TODO: Should we throw an error instead of logging an easily-missed message?
        database.logger.warning("Reverting the \(self.name) migration is not implemented; your job metadata table is unchanged!")
    }
}