
Commit 373c485

Handle half-created indices in receipts index background update (#14650)
When Synapse is terminated while running the background update to create the `receipts_graph` or `receipts_linearized` indexes, the indexes may be successfully created (or marked as invalid on postgres) while the background update remains unfinished. When Synapse next starts up, the background update will fail because the index already exists, or exists but is invalid on postgres.

Use the existing code to create indices in background updates, since it handles these edge cases.

Signed-off-by: Sean Quah <[email protected]>
1 parent 3ac412b commit 373c485
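
The failure mode comes down to index creation not being safe to retry. A minimal sketch of the retry-safe pattern that the shared helper relies on (illustrative only, not Synapse's actual code; the function name and raw SQL are assumptions) might look like this:

```python
# Illustrative sketch only (not Synapse's actual helper): the pattern that makes
# index creation safe to retry after an interruption.  On postgres, an
# interrupted CREATE INDEX CONCURRENTLY leaves an INVALID index behind, so any
# leftover index is dropped before retrying; on sqlite, IF NOT EXISTS turns a
# retry into a no-op if the index already exists.


def create_receipts_index_restart_safe(cur, postgres: bool) -> None:
    if postgres:
        # Remove a half-built (INVALID) index left over from a previous run.
        # Note: CREATE INDEX CONCURRENTLY must run on an autocommit connection.
        cur.execute("DROP INDEX IF EXISTS receipts_linearized_unique_index")
        cur.execute(
            """
            CREATE UNIQUE INDEX CONCURRENTLY receipts_linearized_unique_index
            ON receipts_linearized (room_id, receipt_type, user_id)
            WHERE thread_id IS NULL
            """
        )
    else:
        # sqlite has no CONCURRENTLY, but IF NOT EXISTS is enough here.
        cur.execute(
            """
            CREATE UNIQUE INDEX IF NOT EXISTS receipts_linearized_unique_index
            ON receipts_linearized (room_id, receipt_type, user_id)
            WHERE thread_id IS NULL
            """
        )
```

On postgres the `CREATE INDEX CONCURRENTLY` statement additionally has to run with autocommit enabled, which is why both the old receipts-specific code being removed below and the shared helper in `background_updates.py` toggle the session's autocommit mode around it.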

File tree

3 files changed: +60 −48 lines changed


changelog.d/14650.bugfix

Lines changed: 2 additions & 0 deletions
```diff
@@ -0,0 +1,2 @@
+Fix a bug introduced in Synapse 1.72.0 where the background updates to add non-thread unique indexes on receipts would fail if they were previously interrupted.
+
```

synapse/storage/background_updates.py

Lines changed: 46 additions & 9 deletions
```diff
@@ -544,6 +544,48 @@ def register_background_index_update(
                 The named index will be dropped upon completion of the new index.
         """
 
+        async def updater(progress: JsonDict, batch_size: int) -> int:
+            await self.create_index_in_background(
+                index_name=index_name,
+                table=table,
+                columns=columns,
+                where_clause=where_clause,
+                unique=unique,
+                psql_only=psql_only,
+                replaces_index=replaces_index,
+            )
+            await self._end_background_update(update_name)
+            return 1
+
+        self._background_update_handlers[update_name] = _BackgroundUpdateHandler(
+            updater, oneshot=True
+        )
+
+    async def create_index_in_background(
+        self,
+        index_name: str,
+        table: str,
+        columns: Iterable[str],
+        where_clause: Optional[str] = None,
+        unique: bool = False,
+        psql_only: bool = False,
+        replaces_index: Optional[str] = None,
+    ) -> None:
+        """Add an index in the background.
+
+        Args:
+            update_name: update_name to register for
+            index_name: name of index to add
+            table: table to add index to
+            columns: columns/expressions to include in index
+            where_clause: A WHERE clause to specify a partial unique index.
+            unique: true to make a UNIQUE index
+            psql_only: true to only create this index on psql databases (useful
+                for virtual sqlite tables)
+            replaces_index: The name of an index that this index replaces.
+                The named index will be dropped upon completion of the new index.
+        """
+
         def create_index_psql(conn: Connection) -> None:
             conn.rollback()
             # postgres insists on autocommit for the index
@@ -618,16 +660,11 @@ def create_index_sqlite(conn: Connection) -> None:
         else:
             runner = create_index_sqlite
 
-        async def updater(progress: JsonDict, batch_size: int) -> int:
-            if runner is not None:
-                logger.info("Adding index %s to %s", index_name, table)
-                await self.db_pool.runWithConnection(runner)
-            await self._end_background_update(update_name)
-            return 1
+        if runner is None:
+            return
 
-        self._background_update_handlers[update_name] = _BackgroundUpdateHandler(
-            updater, oneshot=True
-        )
+        logger.info("Adding index %s to %s", index_name, table)
+        await self.db_pool.runWithConnection(runner)
 
     async def _end_background_update(self, update_name: str) -> None:
         """Removes a completed background update task from the queue.
```

synapse/storage/databases/main/receipts.py

Lines changed: 12 additions & 39 deletions
```diff
@@ -924,39 +924,6 @@ def _populate_receipt_event_stream_ordering_txn(
 
         return batch_size
 
-    async def _create_receipts_index(self, index_name: str, table: str) -> None:
-        """Adds a unique index on `(room_id, receipt_type, user_id)` to the given
-        receipts table, for non-thread receipts."""
-
-        def _create_index(conn: LoggingDatabaseConnection) -> None:
-            conn.rollback()
-
-            # we have to set autocommit, because postgres refuses to
-            # CREATE INDEX CONCURRENTLY without it.
-            if isinstance(self.database_engine, PostgresEngine):
-                conn.set_session(autocommit=True)
-
-            try:
-                c = conn.cursor()
-
-                # Now that the duplicates are gone, we can create the index.
-                concurrently = (
-                    "CONCURRENTLY"
-                    if isinstance(self.database_engine, PostgresEngine)
-                    else ""
-                )
-                sql = f"""
-                    CREATE UNIQUE INDEX {concurrently} {index_name}
-                    ON {table}(room_id, receipt_type, user_id)
-                    WHERE thread_id IS NULL
-                """
-                c.execute(sql)
-            finally:
-                if isinstance(self.database_engine, PostgresEngine):
-                    conn.set_session(autocommit=False)
-
-        await self.db_pool.runWithConnection(_create_index)
-
     async def _background_receipts_linearized_unique_index(
         self, progress: dict, batch_size: int
     ) -> int:
@@ -999,9 +966,12 @@ def _remote_duplicate_receipts_txn(txn: LoggingTransaction) -> None:
             _remote_duplicate_receipts_txn,
         )
 
-        await self._create_receipts_index(
-            "receipts_linearized_unique_index",
-            "receipts_linearized",
+        await self.db_pool.updates.create_index_in_background(
+            index_name="receipts_linearized_unique_index",
+            table="receipts_linearized",
+            columns=["room_id", "receipt_type", "user_id"],
+            where_clause="thread_id IS NULL",
+            unique=True,
         )
 
         await self.db_pool.updates._end_background_update(
@@ -1050,9 +1020,12 @@ def _remote_duplicate_receipts_txn(txn: LoggingTransaction) -> None:
             _remote_duplicate_receipts_txn,
        )
 
-        await self._create_receipts_index(
-            "receipts_graph_unique_index",
-            "receipts_graph",
+        await self.db_pool.updates.create_index_in_background(
+            index_name="receipts_graph_unique_index",
+            table="receipts_graph",
+            columns=["room_id", "receipt_type", "user_id"],
+            where_clause="thread_id IS NULL",
+            unique=True,
         )
 
         await self.db_pool.updates._end_background_update(
```
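
As an aside, the 'invalid index' state mentioned in the commit message is visible in postgres's catalogs; a hypothetical diagnostic (not part of this commit) could list any indexes left invalid by an interrupted `CREATE INDEX CONCURRENTLY`:

```python
# Hypothetical diagnostic, not part of this commit: list the indexes that
# postgres has marked INVALID, e.g. after an interrupted
# CREATE INDEX CONCURRENTLY.  Requires psycopg2 and a postgres DSN.
from typing import List

import psycopg2


def find_invalid_indexes(dsn: str) -> List[str]:
    with psycopg2.connect(dsn) as conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT c.relname
                FROM pg_index AS i
                JOIN pg_class AS c ON c.oid = i.indexrelid
                WHERE NOT i.indisvalid
                """
            )
            return [row[0] for row in cur.fetchall()]
```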
