|
29 | 29 | logger = logging.getLogger(__name__)
|
30 | 30 |
|
31 | 31 |
|
# SQL run, in order, inside a single transaction to finish the
# stream_ordering -> BIGINT migration: copy any stragglers, then swap the
# new column into place. Uses DROP RULE, so this is Postgres-specific
# (matching the "only run on postgres" registration in __init__).
# NOTE(review): "ORDRING" in the name is a typo for "ORDERING"; kept as-is
# because the handler below references this exact name.
_REPLACE_STREAM_ORDRING_SQL_COMMANDS = (
    # there should be no leftover rows without a stream_ordering2, but just in case...
    "UPDATE events SET stream_ordering2 = stream_ordering WHERE stream_ordering2 IS NULL",
    # finally, we can drop the rule and switch the columns
    "DROP RULE populate_stream_ordering2 ON events",
    "ALTER TABLE events DROP COLUMN stream_ordering",
    "ALTER TABLE events RENAME COLUMN stream_ordering2 TO stream_ordering",
)
| 40 | + |
| 41 | + |
class _BackgroundUpdates:
    """Names of the background updates registered by this store.

    Each constant is the string key under which the corresponding update
    handler is registered with the background-updates machinery.
    """

    EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
    EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
    DELETE_SOFT_FAILED_EXTREMITIES = "delete_soft_failed_extremities"

    # Three-stage migration of events.stream_ordering to a BIGINT column
    # (registered as Postgres-only in __init__): populate the shadow column,
    # build a unique index on it, then swap it into place.
    POPULATE_STREAM_ORDERING2 = "populate_stream_ordering2"
    INDEX_STREAM_ORDERING2 = "index_stream_ordering2"
    REPLACE_STREAM_ORDERING_COLUMN = "replace_stream_ordering_column"
36 | 49 |
|
37 | 50 |
|
38 | 51 | @attr.s(slots=True, frozen=True)
|
@@ -142,6 +155,24 @@ def __init__(self, database: DatabasePool, db_conn, hs):
|
142 | 155 | self._purged_chain_cover_index,
|
143 | 156 | )
|
144 | 157 |
|
| 158 | + # bg updates for replacing stream_ordering with a BIGINT |
| 159 | + # (these only run on postgres.) |
| 160 | + self.db_pool.updates.register_background_update_handler( |
| 161 | + _BackgroundUpdates.POPULATE_STREAM_ORDERING2, |
| 162 | + self._background_populate_stream_ordering2, |
| 163 | + ) |
| 164 | + self.db_pool.updates.register_background_index_update( |
| 165 | + _BackgroundUpdates.INDEX_STREAM_ORDERING2, |
| 166 | + index_name="events_stream_ordering", |
| 167 | + table="events", |
| 168 | + columns=["stream_ordering2"], |
| 169 | + unique=True, |
| 170 | + ) |
| 171 | + self.db_pool.updates.register_background_update_handler( |
| 172 | + _BackgroundUpdates.REPLACE_STREAM_ORDERING_COLUMN, |
| 173 | + self._background_replace_stream_ordering_column, |
| 174 | + ) |
| 175 | + |
145 | 176 | async def _background_reindex_fields_sender(self, progress, batch_size):
|
146 | 177 | target_min_stream_id = progress["target_min_stream_id_inclusive"]
|
147 | 178 | max_stream_id = progress["max_stream_id_exclusive"]
|
@@ -1012,3 +1043,75 @@ def purged_chain_cover_txn(txn) -> int:
|
1012 | 1043 | await self.db_pool.updates._end_background_update("purged_chain_cover")
|
1013 | 1044 |
|
1014 | 1045 | return result
|
| 1046 | + |
| 1047 | + async def _background_populate_stream_ordering2( |
| 1048 | + self, progress: JsonDict, batch_size: int |
| 1049 | + ) -> int: |
| 1050 | + """Populate events.stream_ordering2, then replace stream_ordering |
| 1051 | +
|
| 1052 | + This is to deal with the fact that stream_ordering was initially created as a |
| 1053 | + 32-bit integer field. |
| 1054 | + """ |
| 1055 | + batch_size = max(batch_size, 1) |
| 1056 | + |
| 1057 | + def process(txn: Cursor) -> int: |
| 1058 | + # if this is the first pass, find the minimum stream ordering |
| 1059 | + last_stream = progress.get("last_stream") |
| 1060 | + if last_stream is None: |
| 1061 | + txn.execute( |
| 1062 | + """ |
| 1063 | + SELECT stream_ordering FROM events ORDER BY stream_ordering LIMIT 1 |
| 1064 | + """ |
| 1065 | + ) |
| 1066 | + rows = txn.fetchall() |
| 1067 | + if not rows: |
| 1068 | + return 0 |
| 1069 | + last_stream = rows[0][0] - 1 |
| 1070 | + |
| 1071 | + txn.execute( |
| 1072 | + """ |
| 1073 | + UPDATE events SET stream_ordering2=stream_ordering |
| 1074 | + WHERE stream_ordering > ? AND stream_ordering <= ? |
| 1075 | + """, |
| 1076 | + (last_stream, last_stream + batch_size), |
| 1077 | + ) |
| 1078 | + row_count = txn.rowcount |
| 1079 | + |
| 1080 | + self.db_pool.updates._background_update_progress_txn( |
| 1081 | + txn, |
| 1082 | + _BackgroundUpdates.POPULATE_STREAM_ORDERING2, |
| 1083 | + {"last_stream": last_stream + batch_size}, |
| 1084 | + ) |
| 1085 | + return row_count |
| 1086 | + |
| 1087 | + result = await self.db_pool.runInteraction( |
| 1088 | + "_background_populate_stream_ordering2", process |
| 1089 | + ) |
| 1090 | + |
| 1091 | + if result != 0: |
| 1092 | + return result |
| 1093 | + |
| 1094 | + await self.db_pool.updates._end_background_update( |
| 1095 | + _BackgroundUpdates.POPULATE_STREAM_ORDERING2 |
| 1096 | + ) |
| 1097 | + return 0 |
| 1098 | + |
| 1099 | + async def _background_replace_stream_ordering_column( |
| 1100 | + self, progress: JsonDict, batch_size: int |
| 1101 | + ) -> int: |
| 1102 | + """Drop the old 'stream_ordering' column and rename 'stream_ordering2' into its place.""" |
| 1103 | + |
| 1104 | + def process(txn: Cursor) -> None: |
| 1105 | + for sql in _REPLACE_STREAM_ORDRING_SQL_COMMANDS: |
| 1106 | + logger.info("completing stream_ordering migration: %s", sql) |
| 1107 | + txn.execute(sql) |
| 1108 | + |
| 1109 | + await self.db_pool.runInteraction( |
| 1110 | + "_background_replace_stream_ordering_column", process |
| 1111 | + ) |
| 1112 | + |
| 1113 | + await self.db_pool.updates._end_background_update( |
| 1114 | + _BackgroundUpdates.REPLACE_STREAM_ORDERING_COLUMN |
| 1115 | + ) |
| 1116 | + |
| 1117 | + return 0 |
0 commit comments