 logger = logging.getLogger(__name__)
 
 
-_REPLACE_STREAM_ORDRING_SQL_COMMANDS = (
+_REPLACE_STREAM_ORDERING_SQL_COMMANDS = (
     # there should be no leftover rows without a stream_ordering2, but just in case...
     "UPDATE events SET stream_ordering2 = stream_ordering WHERE stream_ordering2 IS NULL",
-    # finally, we can drop the rule and switch the columns
+    # now we can drop the rule and switch the columns
     "DROP RULE populate_stream_ordering2 ON events",
     "ALTER TABLE events DROP COLUMN stream_ordering",
     "ALTER TABLE events RENAME COLUMN stream_ordering2 TO stream_ordering",
+    # ... and finally, rename the indexes into place for consistency with sqlite
+    "ALTER INDEX event_contains_url_index2 RENAME TO event_contains_url_index",
+    "ALTER INDEX events_order_room2 RENAME TO events_order_room",
+    "ALTER INDEX events_room_stream2 RENAME TO events_room_stream",
+    "ALTER INDEX events_ts2 RENAME TO events_ts",
 )
 
 
@@ -45,6 +50,10 @@ class _BackgroundUpdates:
     DELETE_SOFT_FAILED_EXTREMITIES = "delete_soft_failed_extremities"
     POPULATE_STREAM_ORDERING2 = "populate_stream_ordering2"
     INDEX_STREAM_ORDERING2 = "index_stream_ordering2"
+    INDEX_STREAM_ORDERING2_CONTAINS_URL = "index_stream_ordering2_contains_url"
+    INDEX_STREAM_ORDERING2_ROOM_ORDER = "index_stream_ordering2_room_order"
+    INDEX_STREAM_ORDERING2_ROOM_STREAM = "index_stream_ordering2_room_stream"
+    INDEX_STREAM_ORDERING2_TS = "index_stream_ordering2_ts"
     REPLACE_STREAM_ORDERING_COLUMN = "replace_stream_ordering_column"
 
 
@@ -155,24 +164,59 @@ def __init__(self, database: DatabasePool, db_conn, hs):
             self._purged_chain_cover_index,
         )
 
+        ################################################################################
+
         # bg updates for replacing stream_ordering with a BIGINT
         # (these only run on postgres.)
+
         self.db_pool.updates.register_background_update_handler(
             _BackgroundUpdates.POPULATE_STREAM_ORDERING2,
             self._background_populate_stream_ordering2,
         )
+        # CREATE UNIQUE INDEX events_stream_ordering ON events(stream_ordering2);
         self.db_pool.updates.register_background_index_update(
             _BackgroundUpdates.INDEX_STREAM_ORDERING2,
             index_name="events_stream_ordering",
             table="events",
             columns=["stream_ordering2"],
             unique=True,
         )
+        # CREATE INDEX event_contains_url_index ON events(room_id, topological_ordering, stream_ordering) WHERE contains_url = true AND outlier = false;
+        self.db_pool.updates.register_background_index_update(
+            _BackgroundUpdates.INDEX_STREAM_ORDERING2_CONTAINS_URL,
+            index_name="event_contains_url_index2",
+            table="events",
+            columns=["room_id", "topological_ordering", "stream_ordering2"],
+            where_clause="contains_url = true AND outlier = false",
+        )
+        # CREATE INDEX events_order_room ON events(room_id, topological_ordering, stream_ordering);
+        self.db_pool.updates.register_background_index_update(
+            _BackgroundUpdates.INDEX_STREAM_ORDERING2_ROOM_ORDER,
+            index_name="events_order_room2",
+            table="events",
+            columns=["room_id", "topological_ordering", "stream_ordering2"],
+        )
+        # CREATE INDEX events_room_stream ON events(room_id, stream_ordering);
+        self.db_pool.updates.register_background_index_update(
+            _BackgroundUpdates.INDEX_STREAM_ORDERING2_ROOM_STREAM,
+            index_name="events_room_stream2",
+            table="events",
+            columns=["room_id", "stream_ordering2"],
+        )
+        # CREATE INDEX events_ts ON events(origin_server_ts, stream_ordering);
+        self.db_pool.updates.register_background_index_update(
+            _BackgroundUpdates.INDEX_STREAM_ORDERING2_TS,
+            index_name="events_ts2",
+            table="events",
+            columns=["origin_server_ts", "stream_ordering2"],
+        )
         self.db_pool.updates.register_background_update_handler(
             _BackgroundUpdates.REPLACE_STREAM_ORDERING_COLUMN,
             self._background_replace_stream_ordering_column,
         )
 
+        ################################################################################
+
     async def _background_reindex_fields_sender(self, progress, batch_size):
         target_min_stream_id = progress["target_min_stream_id_inclusive"]
         max_stream_id = progress["max_stream_id_exclusive"]
@@ -1098,7 +1142,7 @@ async def _background_replace_stream_ordering_column(
         """Drop the old 'stream_ordering' column and rename 'stream_ordering2' into its place."""
 
         def process(txn: Cursor) -> None:
-            for sql in _REPLACE_STREAM_ORDRING_SQL_COMMANDS:
+            for sql in _REPLACE_STREAM_ORDERING_SQL_COMMANDS:
                 logger.info("completing stream_ordering migration: %s", sql)
                 txn.execute(sql)
 
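Taken together, these hunks build stream_ordering2 counterparts of the existing indexes in the background and then swap the column and index names into place. The net effect on Postgres is roughly the SQL sequence sketched below. This is illustrative only: the real code drives it through Synapse's background-update machinery (the index builds are registered via register_background_index_update and, on postgres, run concurrently one at a time, while the finalisation statements run inside the single transaction of the process() callback shown in the last hunk). The psycopg2 connection and DSN here are assumptions, not part of the change.

# Illustrative sketch only -- names and SQL are taken from the diff above;
# the connection handling and DSN are hypothetical.
import psycopg2

# Background index builds on the new stream_ordering2 column.
CREATE_INDEX_SQL = (
    "CREATE UNIQUE INDEX CONCURRENTLY events_stream_ordering"
    " ON events(stream_ordering2)",
    "CREATE INDEX CONCURRENTLY event_contains_url_index2"
    " ON events(room_id, topological_ordering, stream_ordering2)"
    " WHERE contains_url = true AND outlier = false",
    "CREATE INDEX CONCURRENTLY events_order_room2"
    " ON events(room_id, topological_ordering, stream_ordering2)",
    "CREATE INDEX CONCURRENTLY events_room_stream2"
    " ON events(room_id, stream_ordering2)",
    "CREATE INDEX CONCURRENTLY events_ts2"
    " ON events(origin_server_ts, stream_ordering2)",
)

# Finalisation, as in _REPLACE_STREAM_ORDERING_SQL_COMMANDS: drop the old
# column, rename the new one, and rename the indexes back to their old names.
FINALISE_SQL = (
    "UPDATE events SET stream_ordering2 = stream_ordering WHERE stream_ordering2 IS NULL",
    "DROP RULE populate_stream_ordering2 ON events",
    "ALTER TABLE events DROP COLUMN stream_ordering",
    "ALTER TABLE events RENAME COLUMN stream_ordering2 TO stream_ordering",
    "ALTER INDEX event_contains_url_index2 RENAME TO event_contains_url_index",
    "ALTER INDEX events_order_room2 RENAME TO events_order_room",
    "ALTER INDEX events_room_stream2 RENAME TO events_room_stream",
    "ALTER INDEX events_ts2 RENAME TO events_ts",
)

conn = psycopg2.connect("dbname=synapse")  # hypothetical DSN
conn.autocommit = True  # CREATE INDEX CONCURRENTLY cannot run inside a transaction block
with conn.cursor() as cur:
    for sql in CREATE_INDEX_SQL:
        cur.execute(sql)

conn.autocommit = False
with conn:  # one transaction for the column/index swap, mirroring process()
    with conn.cursor() as cur:
        for sql in FINALISE_SQL:
            cur.execute(sql)
conn.close()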