This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 684d19a

Add support for MSC2716 marker events (#10498)
* Make historical messages available to federated servers

  Part of MSC2716: matrix-org/matrix-spec-proposals#2716
  Follow-up to #9247

* Debug message not available on federation
* Add base starting insertion point when no chunk ID is provided
* Fix messages from multiple senders in historical chunk

  Follow-up to #9247
  Part of MSC2716: matrix-org/matrix-spec-proposals#2716

  Previously, Synapse would throw a 403, `Cannot force another user to join.`, because we were trying to use `?user_id` from a single virtual user which did not match with messages from other users in the chunk.

* Remove debug lines
* Messing with selecting insertion event extremities
* Move db schema change to new version
* Add better comments
* Make a fake requester with just what we need

  See #10276 (comment)

* Store insertion events in table
* Make base insertion event float off on its own

  See #10250 (comment)

  Conflicts: synapse/rest/client/v1/room.py

* Validate that the app service can actually control the given user

  See #10276 (comment)

  Conflicts: synapse/rest/client/v1/room.py

* Add some better comments on what we're trying to check for
* Continue debugging
* Share validation logic
* Add inserted historical messages to /backfill response
* Remove debug sql queries
* Some marker event implementation trials
* Clean up PR
* Rename insertion_event_id to just event_id
* Add some better sql comments
* More accurate description
* Add changelog
* Make it clear what MSC the change is part of
* Add more detail on which insertion event came through
* Address review and improve sql queries
* Only use event_id as unique constraint
* Fix test case where insertion event is already in the normal DAG
* Remove debug changes
* Add support for MSC2716 marker events
* Process markers when we receive them over federation
* WIP: make hs2 backfill historical messages after marker event
* hs2 to better ask for insertion event extremity

  But running into the `sqlite3.IntegrityError: NOT NULL constraint failed: event_to_state_groups.state_group` error

* Add insertion_event_extremities table
* Switch to chunk events so we can auth via power_levels

  Previously, we were using `content.chunk_id` to connect one chunk to another. But these events can be from any `sender` and we can't tell who should be able to send historical events. We know we only want the application service to do it but these events have the sender of a real historical message, not the application service user ID as the sender. Other federated homeservers also have no indicator which senders are an application service on the originating homeserver.

  So we want to auth all of the MSC2716 events via power_levels and have them be sent by the application service with proper PL levels in the room.

* Switch to chunk events for federation
* Add unstable room version to support new historical PL
* Messy: Fix undefined state_group for federated historical events

  ```
  2021-07-13 02:27:57,810 - synapse.handlers.federation - 1248 - ERROR - GET-4 - Failed to backfill from hs1 because NOT NULL constraint failed: event_to_state_groups.state_group
  Traceback (most recent call last):
    File "/usr/local/lib/python3.8/site-packages/synapse/handlers/federation.py", line 1216, in try_backfill
      await self.backfill(
    File "/usr/local/lib/python3.8/site-packages/synapse/handlers/federation.py", line 1035, in backfill
      await self._auth_and_persist_event(dest, event, context, backfilled=True)
    File "/usr/local/lib/python3.8/site-packages/synapse/handlers/federation.py", line 2222, in _auth_and_persist_event
      await self._run_push_actions_and_persist_event(event, context, backfilled)
    File "/usr/local/lib/python3.8/site-packages/synapse/handlers/federation.py", line 2244, in _run_push_actions_and_persist_event
      await self.persist_events_and_notify(
    File "/usr/local/lib/python3.8/site-packages/synapse/handlers/federation.py", line 3290, in persist_events_and_notify
      events, max_stream_token = await self.storage.persistence.persist_events(
    File "/usr/local/lib/python3.8/site-packages/synapse/logging/opentracing.py", line 774, in _trace_inner
      return await func(*args, **kwargs)
    File "/usr/local/lib/python3.8/site-packages/synapse/storage/persist_events.py", line 320, in persist_events
      ret_vals = await yieldable_gather_results(enqueue, partitioned.items())
    File "/usr/local/lib/python3.8/site-packages/synapse/storage/persist_events.py", line 237, in handle_queue_loop
      ret = await self._per_item_callback(
    File "/usr/local/lib/python3.8/site-packages/synapse/storage/persist_events.py", line 577, in _persist_event_batch
      await self.persist_events_store._persist_events_and_state_updates(
    File "/usr/local/lib/python3.8/site-packages/synapse/storage/databases/main/events.py", line 176, in _persist_events_and_state_updates
      await self.db_pool.runInteraction(
    File "/usr/local/lib/python3.8/site-packages/synapse/storage/database.py", line 681, in runInteraction
      result = await self.runWithConnection(
    File "/usr/local/lib/python3.8/site-packages/synapse/storage/database.py", line 770, in runWithConnection
      return await make_deferred_yieldable(
    File "/usr/local/lib/python3.8/site-packages/twisted/python/threadpool.py", line 238, in inContext
      result = inContext.theWork()  # type: ignore[attr-defined]
    File "/usr/local/lib/python3.8/site-packages/twisted/python/threadpool.py", line 254, in <lambda>
      inContext.theWork = lambda: context.call(  # type: ignore[attr-defined]
    File "/usr/local/lib/python3.8/site-packages/twisted/python/context.py", line 118, in callWithContext
      return self.currentContext().callWithContext(ctx, func, *args, **kw)
    File "/usr/local/lib/python3.8/site-packages/twisted/python/context.py", line 83, in callWithContext
      return func(*args, **kw)
    File "/usr/local/lib/python3.8/site-packages/twisted/enterprise/adbapi.py", line 293, in _runWithConnection
      compat.reraise(excValue, excTraceback)
    File "/usr/local/lib/python3.8/site-packages/twisted/python/deprecate.py", line 298, in deprecatedFunction
      return function(*args, **kwargs)
    File "/usr/local/lib/python3.8/site-packages/twisted/python/compat.py", line 403, in reraise
      raise exception.with_traceback(traceback)
    File "/usr/local/lib/python3.8/site-packages/twisted/enterprise/adbapi.py", line 284, in _runWithConnection
      result = func(conn, *args, **kw)
    File "/usr/local/lib/python3.8/site-packages/synapse/storage/database.py", line 765, in inner_func
      return func(db_conn, *args, **kwargs)
    File "/usr/local/lib/python3.8/site-packages/synapse/storage/database.py", line 549, in new_transaction
      r = func(cursor, *args, **kwargs)
    File "/usr/local/lib/python3.8/site-packages/synapse/logging/utils.py", line 69, in wrapped
      return f(*args, **kwargs)
    File "/usr/local/lib/python3.8/site-packages/synapse/storage/databases/main/events.py", line 385, in _persist_events_txn
      self._store_event_state_mappings_txn(txn, events_and_contexts)
    File "/usr/local/lib/python3.8/site-packages/synapse/storage/databases/main/events.py", line 2065, in _store_event_state_mappings_txn
      self.db_pool.simple_insert_many_txn(
    File "/usr/local/lib/python3.8/site-packages/synapse/storage/database.py", line 923, in simple_insert_many_txn
      txn.execute_batch(sql, vals)
    File "/usr/local/lib/python3.8/site-packages/synapse/storage/database.py", line 280, in execute_batch
      self.executemany(sql, args)
    File "/usr/local/lib/python3.8/site-packages/synapse/storage/database.py", line 300, in executemany
      self._do_execute(self.txn.executemany, sql, *args)
    File "/usr/local/lib/python3.8/site-packages/synapse/storage/database.py", line 330, in _do_execute
      return func(sql, *args)
  sqlite3.IntegrityError: NOT NULL constraint failed: event_to_state_groups.state_group
  ```

* Revert "Messy: Fix undefined state_group for federated historical events"

  This reverts commit 187ab28.

* Fix federated events being rejected for no state_groups

  Add fix from #10439 until it merges.

* Adapting to experimental room version
* Some log cleanup
* Add better comments around extremity fetching code and why
* Rename to be more accurate to what the function returns
* Add changelog
* Ignore rejected events
* Use simplified upsert
* Add Erik's explanation of extra event checks

  See #10498 (comment)

* Clarify that the depth is not directly correlated to the backwards extremity that we return

  See #10498 (comment)

* `lock` only matters for SQLite

  See #10498 (comment)

* Move new SQL changes to its own delta file
* Clean up upsert docstring
* Bump database schema version (62)
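The commit message above centers on "marker" events that point remote servers at an insertion event to backfill. As a minimal sketch (not Synapse code), this is roughly the shape of a marker event and how the insertion event reference is pulled out of its content; the unstable type and field names are assumptions based on the MSC2716 draft's `org.matrix.msc2716.*` prefixes.

```python
# Hypothetical unstable identifiers from the MSC2716 draft (assumptions, not
# taken verbatim from this commit's diff).
MSC2716_MARKER_INSERTION = "org.matrix.msc2716.marker.insertion"

marker_event = {
    "type": "org.matrix.msc2716.marker",
    "room_id": "!room:hs1",
    "sender": "@appservice:hs1",
    "content": {
        # Points at the insertion event a remote server should backfill
        MSC2716_MARKER_INSERTION: "$insertion_event_id",
    },
}

# A receiving server extracts the insertion event ID from the marker content;
# a missing field means the marker is invalid and gets ignored.
insertion_event_id = marker_event["content"].get(MSC2716_MARKER_INSERTION)
print(insertion_event_id)  # -> $insertion_event_id
```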
1 parent c37dad6 commit 684d19a

File tree

8 files changed (+265 additions, -35 deletions)

changelog.d/10498.feature

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+Add support for "marker" events which make historical events discoverable for servers that already have all of the scrollback history (part of MSC2716).

scripts-dev/complement.sh

Lines changed: 1 addition & 1 deletion
@@ -65,4 +65,4 @@ if [[ -n "$1" ]]; then
 fi

 # Run the tests!
-go test -v -tags synapse_blacklist,msc2946,msc3083,msc2403 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests/...
+go test -v -tags synapse_blacklist,msc2946,msc3083,msc2403,msc2716 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests/...

synapse/handlers/federation.py

Lines changed: 113 additions & 6 deletions
@@ -42,6 +42,7 @@

 from synapse import event_auth
 from synapse.api.constants import (
+    EventContentFields,
     EventTypes,
     Membership,
     RejectedReason,
@@ -262,7 +263,12 @@ async def on_receive_pdu(

         state = None

-        # Get missing pdus if necessary.
+        # Check that the event passes auth based on the state at the event. This is
+        # done for events that are to be added to the timeline (non-outliers).
+        #
+        # Get missing pdus if necessary:
+        #  - Fetching any missing prev events to fill in gaps in the graph
+        #  - Fetching state if we have a hole in the graph
         if not pdu.internal_metadata.is_outlier():
             # We only backfill backwards to the min depth.
             min_depth = await self.get_min_depth_for_context(pdu.room_id)
@@ -432,6 +438,13 @@ async def on_receive_pdu(
                 affected=event_id,
             )

+        # A second round of checks for all events. Check that the event passes auth
+        # based on `auth_events`; this allows us to assert that the event would
+        # have been allowed at some point. If an event passes this check it's OK
+        # for it to be used as part of a returned `/state` request, as either
+        # a) we received the event as part of the original join and so trust it, or
+        # b) we'll do a state resolution with existing state before it becomes
+        # part of the "current state", which adds more protection.
         await self._process_received_pdu(origin, pdu, state=state)

     async def _get_missing_events_for_pdu(
@@ -889,6 +902,79 @@ async def _process_received_pdu(
                 "resync_device_due_to_pdu", self._resync_device, event.sender
             )

+        await self._handle_marker_event(origin, event)
+
+    async def _handle_marker_event(self, origin: str, marker_event: EventBase):
+        """Handles backfilling the insertion event when we receive a marker
+        event that points to one.
+
+        Args:
+            origin: Origin of the event. Will be queried to get the insertion event
+            marker_event: The event to process
+        """
+
+        if marker_event.type != EventTypes.MSC2716_MARKER:
+            # Not a marker event
+            return
+
+        if marker_event.rejected_reason is not None:
+            # Rejected event
+            return
+
+        # Skip processing a marker event if the room version doesn't
+        # support it.
+        room_version = await self.store.get_room_version(marker_event.room_id)
+        if not room_version.msc2716_historical:
+            return
+
+        logger.debug("_handle_marker_event: received %s", marker_event)
+
+        insertion_event_id = marker_event.content.get(
+            EventContentFields.MSC2716_MARKER_INSERTION
+        )
+
+        if insertion_event_id is None:
+            # Nothing to retrieve then (invalid marker)
+            return
+
+        logger.debug(
+            "_handle_marker_event: backfilling insertion event %s", insertion_event_id
+        )
+
+        await self._get_events_and_persist(
+            origin,
+            marker_event.room_id,
+            [insertion_event_id],
+        )
+
+        insertion_event = await self.store.get_event(
+            insertion_event_id, allow_none=True
+        )
+        if insertion_event is None:
+            logger.warning(
+                "_handle_marker_event: server %s didn't return insertion event %s for marker %s",
+                origin,
+                insertion_event_id,
+                marker_event.event_id,
+            )
+            return
+
+        logger.debug(
+            "_handle_marker_event: successfully backfilled insertion event %s from marker event %s",
+            insertion_event,
+            marker_event,
+        )
+
+        await self.store.insert_insertion_extremity(
+            insertion_event_id, marker_event.room_id
+        )
+
+        logger.debug(
+            "_handle_marker_event: insertion extremity added for %s from marker event %s",
+            insertion_event,
+            marker_event,
+        )
+
     async def _resync_device(self, sender: str) -> None:
         """We have detected that the device list for the given user may be out
         of sync, so we try and resync them.
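The flow of `_handle_marker_event` above can be sketched as a self-contained simulation with a stubbed-out datastore. `FakeStore`, the hardcoded unstable identifiers, and the flattened control flow are all illustrative assumptions; the real code also fetches the insertion event from the origin server over federation before the `get_event` check.

```python
import asyncio

# Assumed unstable MSC2716 identifiers (illustrative, not from the diff)
MSC2716_MARKER = "org.matrix.msc2716.marker"
MSC2716_MARKER_INSERTION = "org.matrix.msc2716.marker.insertion"

class FakeStore:
    """Hypothetical stand-in for the Synapse datastore used by the handler."""
    def __init__(self, events):
        self.events = events          # event_id -> event dict
        self.extremities = set()      # (event_id, room_id) rows

    async def get_event(self, event_id):
        return self.events.get(event_id)

    async def insert_insertion_extremity(self, event_id, room_id):
        self.extremities.add((event_id, room_id))

async def handle_marker_event(store, marker_event):
    if marker_event["type"] != MSC2716_MARKER:
        return  # not a marker event
    insertion_event_id = marker_event["content"].get(MSC2716_MARKER_INSERTION)
    if insertion_event_id is None:
        return  # invalid marker, nothing to retrieve
    # (the real handler first asks the origin server for the event here)
    insertion_event = await store.get_event(insertion_event_id)
    if insertion_event is None:
        return  # origin didn't return the insertion event
    # Record it so the backfill logic treats it as a backwards extremity
    await store.insert_insertion_extremity(
        insertion_event_id, marker_event["room_id"]
    )

store = FakeStore({"$insert": {"type": "org.matrix.msc2716.insertion"}})
marker = {
    "type": MSC2716_MARKER,
    "room_id": "!r:hs1",
    "content": {MSC2716_MARKER_INSERTION: "$insert"},
}
asyncio.run(handle_marker_event(store, marker))
print(store.extremities)  # -> {('$insert', '!r:hs1')}
```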
@@ -1057,9 +1143,19 @@ async def maybe_backfill(
     async def _maybe_backfill_inner(
         self, room_id: str, current_depth: int, limit: int
     ) -> bool:
-        extremities = await self.store.get_oldest_events_with_depth_in_room(room_id)
+        oldest_events_with_depth = (
+            await self.store.get_oldest_event_ids_with_depth_in_room(room_id)
+        )
+        insertion_events_to_be_backfilled = (
+            await self.store.get_insertion_event_backwards_extremities_in_room(room_id)
+        )
+        logger.debug(
+            "_maybe_backfill_inner: extremities oldest_events_with_depth=%s insertion_events_to_be_backfilled=%s",
+            oldest_events_with_depth,
+            insertion_events_to_be_backfilled,
+        )

-        if not extremities:
+        if not oldest_events_with_depth and not insertion_events_to_be_backfilled:
             logger.debug("Not backfilling as no extremeties found.")
             return False

@@ -1089,10 +1185,12 @@ async def _maybe_backfill_inner(
         # state *before* the event, ignoring the special casing certain event
         # types have.

-        forward_events = await self.store.get_successor_events(list(extremities))
+        forward_event_ids = await self.store.get_successor_events(
+            list(oldest_events_with_depth)
+        )

         extremities_events = await self.store.get_events(
-            forward_events,
+            forward_event_ids,
             redact_behaviour=EventRedactBehaviour.AS_IS,
             get_prev_content=False,
         )
@@ -1106,10 +1204,19 @@ async def _maybe_backfill_inner(
             redact=False,
             check_history_visibility_only=True,
         )
+        logger.debug(
+            "_maybe_backfill_inner: filtered_extremities %s", filtered_extremities
+        )

-        if not filtered_extremities:
+        if not filtered_extremities and not insertion_events_to_be_backfilled:
             return False

+        extremities = {
+            **oldest_events_with_depth,
+            # TODO: insertion_events_to_be_backfilled is currently skipping the filtered_extremities checks
+            **insertion_events_to_be_backfilled,
+        }
+
         # Check if we reached a point where we should start backfilling.
         sorted_extremeties_tuple = sorted(extremities.items(), key=lambda e: -int(e[1]))
         max_depth = sorted_extremeties_tuple[0][1]
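The hunk above merges the two extremity sources into one `event_id -> depth` map and then sorts deepest-first. A tiny sketch of that merge-and-sort step, with made-up event IDs and depths:

```python
# Depth maps as returned by the two store methods (values are invented)
oldest_events_with_depth = {"$backward_extrem": 5}
insertion_events_to_be_backfilled = {"$insertion_extrem": 12}

# Merge both sources into one candidate map, as the diff does
extremities = {
    **oldest_events_with_depth,
    **insertion_events_to_be_backfilled,
}

# Sort by depth, largest depth first, matching
# `sorted(extremities.items(), key=lambda e: -int(e[1]))`
sorted_extremities = sorted(extremities.items(), key=lambda e: -int(e[1]))
max_depth = sorted_extremities[0][1]
print(sorted_extremities, max_depth)
```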

synapse/storage/database.py

Lines changed: 7 additions & 7 deletions
@@ -941,13 +941,13 @@ async def simple_upsert(

         `lock` should generally be set to True (the default), but can be set
         to False if either of the following are true:
-
-        * there is a UNIQUE INDEX on the key columns. In this case a conflict
-          will cause an IntegrityError in which case this function will retry
-          the update.
-
-        * we somehow know that we are the only thread which will be updating
-          this table.
+        1. there is a UNIQUE INDEX on the key columns. In this case a conflict
+           will cause an IntegrityError in which case this function will retry
+           the update.
+        2. we somehow know that we are the only thread which will be updating
+           this table.
+        As an additional note, this parameter only matters for old SQLite versions
+        because we will use native upserts otherwise.

         Args:
             table: The table to upsert into
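The docstring's point — that `lock` only matters for old SQLite — is because SQLite 3.24+ supports a native `ON CONFLICT` upsert, so there is no lock/retry-on-IntegrityError dance. A standalone demo of that native upsert (assuming a Python build linked against SQLite >= 3.24), using a table shaped like `insertion_event_extremities`:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE insertion_event_extremities "
    "(event_id TEXT PRIMARY KEY, room_id TEXT)"
)

# Native upsert: insert, or update the existing row on key conflict
sql = (
    "INSERT INTO insertion_event_extremities (event_id, room_id) VALUES (?, ?) "
    "ON CONFLICT (event_id) DO UPDATE SET room_id = excluded.room_id"
)
conn.execute(sql, ("$insert", "!r:hs1"))
# Second call conflicts on event_id but raises no IntegrityError
conn.execute(sql, ("$insert", "!r:hs1"))

rows = conn.execute("SELECT * FROM insertion_event_extremities").fetchall()
print(rows)  # -> [('$insert', '!r:hs1')]
```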

synapse/storage/databases/main/event_federation.py

Lines changed: 98 additions & 16 deletions
@@ -671,27 +671,97 @@ def _get_auth_chain_difference_txn(
         # Return all events where not all sets can reach them.
         return {eid for eid, n in event_to_missing_sets.items() if n}

-    async def get_oldest_events_with_depth_in_room(self, room_id):
+    async def get_oldest_event_ids_with_depth_in_room(self, room_id) -> Dict[str, int]:
+        """Gets the oldest events (backwards extremities) in the room along with
+        the approximate depth.
+
+        We use this function so that we can compare and see if someone's current
+        depth at their current scrollback is within pagination range of the
+        event extremities. If the current depth is close to the depth of the given
+        oldest event, we can trigger a backfill.
+
+        Args:
+            room_id: Room where we want to find the oldest events
+
+        Returns:
+            Map from event_id to depth
+        """
+
+        def get_oldest_event_ids_with_depth_in_room_txn(txn, room_id):
+            # Assemble a dictionary with event_id -> depth for the oldest events
+            # we know of in the room. Backwards extremities are the oldest
+            # events we know of in the room but we only know of them because
+            # some other event referenced them by prev_event and they aren't
+            # persisted in our database yet (meaning we don't know their depth
+            # specifically). So we need to look for the approximate depth from
+            # the events connected to the current backwards extremities.
+            sql = """
+                SELECT b.event_id, MAX(e.depth) FROM events as e
+                /**
+                 * Get the edge connections from the event_edges table
+                 * so we can see whether this event's prev_events points
+                 * to a backward extremity in the next join.
+                 */
+                INNER JOIN event_edges as g
+                    ON g.event_id = e.event_id
+                /**
+                 * We find the "oldest" events in the room by looking for
+                 * events connected to backwards extremities (oldest events
+                 * in the room that we know of so far).
+                 */
+                INNER JOIN event_backward_extremities as b
+                    ON g.prev_event_id = b.event_id
+                WHERE b.room_id = ? AND g.is_state is ?
+                GROUP BY b.event_id
+            """
+
+            txn.execute(sql, (room_id, False))
+
+            return dict(txn)
+
         return await self.db_pool.runInteraction(
-            "get_oldest_events_with_depth_in_room",
-            self.get_oldest_events_with_depth_in_room_txn,
+            "get_oldest_event_ids_with_depth_in_room",
+            get_oldest_event_ids_with_depth_in_room_txn,
             room_id,
         )

-    def get_oldest_events_with_depth_in_room_txn(self, txn, room_id):
-        sql = (
-            "SELECT b.event_id, MAX(e.depth) FROM events as e"
-            " INNER JOIN event_edges as g"
-            " ON g.event_id = e.event_id"
-            " INNER JOIN event_backward_extremities as b"
-            " ON g.prev_event_id = b.event_id"
-            " WHERE b.room_id = ? AND g.is_state is ?"
-            " GROUP BY b.event_id"
-        )
-
-        txn.execute(sql, (room_id, False))
-
-        return dict(txn)
+    async def get_insertion_event_backwards_extremities_in_room(
+        self, room_id
+    ) -> Dict[str, int]:
+        """Get the insertion events we know about that we haven't backfilled yet.
+
+        We use this function so that we can compare and see if someone's current
+        depth at their current scrollback is within pagination range of the
+        insertion event. If the current depth is close to the depth of the given
+        insertion event, we can trigger a backfill.
+
+        Args:
+            room_id: Room where we want to find the oldest events
+
+        Returns:
+            Map from event_id to depth
+        """
+
+        def get_insertion_event_backwards_extremities_in_room_txn(txn, room_id):
+            sql = """
+                SELECT b.event_id, MAX(e.depth) FROM insertion_events as i
+                /* We only want insertion events that are also marked as backwards extremities */
+                INNER JOIN insertion_event_extremities as b USING (event_id)
+                /* Get the depth of the insertion event from the events table */
+                INNER JOIN events AS e USING (event_id)
+                WHERE b.room_id = ?
+                GROUP BY b.event_id
+            """
+
+            txn.execute(sql, (room_id,))
+
+            return dict(txn)
+
+        return await self.db_pool.runInteraction(
+            "get_insertion_event_backwards_extremities_in_room",
+            get_insertion_event_backwards_extremities_in_room_txn,
+            room_id,
+        )

     async def get_max_depth_of(self, event_ids: List[str]) -> Tuple[str, int]:
         """Returns the event ID and depth for the event that has the max depth from a set of event IDs
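The backward-extremity query above can be exercised standalone against an in-memory SQLite database. The table definitions below are trimmed to just the columns the query touches (the real schemas have more), and the single-row dataset is invented: `$known` references an unpersisted `$missing` via `prev_event`, so `$missing` is a backwards extremity whose depth is approximated from `$known`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE events (event_id TEXT, room_id TEXT, depth INTEGER);
    CREATE TABLE event_edges (event_id TEXT, prev_event_id TEXT, is_state BOOLEAN);
    CREATE TABLE event_backward_extremities (event_id TEXT, room_id TEXT);
""")
conn.execute("INSERT INTO events VALUES ('$known', '!r', 10)")
conn.execute("INSERT INTO event_edges VALUES ('$known', '$missing', 0)")
conn.execute("INSERT INTO event_backward_extremities VALUES ('$missing', '!r')")

# Same query shape as get_oldest_event_ids_with_depth_in_room_txn
rows = conn.execute(
    """
    SELECT b.event_id, MAX(e.depth) FROM events as e
    INNER JOIN event_edges as g ON g.event_id = e.event_id
    INNER JOIN event_backward_extremities as b ON g.prev_event_id = b.event_id
    WHERE b.room_id = ? AND g.is_state is ?
    GROUP BY b.event_id
    """,
    ("!r", False),
).fetchall()
print(dict(rows))  # -> {'$missing': 10}
```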
@@ -1041,7 +1111,6 @@ def _get_backfill_events(self, txn, room_id, event_list, limit):
                 if row[1] not in event_results:
                     queue.put((-row[0], row[1]))

-            # Navigate up the DAG by prev_event
             txn.execute(query, (event_id, False, limit - len(event_results)))
             prev_event_id_results = txn.fetchall()
             logger.debug(
@@ -1136,6 +1205,19 @@ def _delete_old_forward_extrem_cache_txn(txn):
             _delete_old_forward_extrem_cache_txn,
         )

+    async def insert_insertion_extremity(self, event_id: str, room_id: str) -> None:
+        await self.db_pool.simple_upsert(
+            table="insertion_event_extremities",
+            keyvalues={"event_id": event_id},
+            values={
+                "event_id": event_id,
+                "room_id": room_id,
+            },
+            insertion_values={},
+            desc="insert_insertion_extremity",
+            lock=False,
+        )
+
     async def insert_received_event_to_staging(
         self, origin: str, event: EventBase
     ) -> None:

synapse/storage/databases/main/events.py

Lines changed: 20 additions & 4 deletions
@@ -1845,6 +1845,18 @@ def _handle_chunk_event(self, txn: LoggingTransaction, event: EventBase):
             },
         )

+        # When we receive an event with a `chunk_id` referencing the
+        # `next_chunk_id` of the insertion event, we can remove it from the
+        # `insertion_event_extremities` table.
+        sql = """
+            DELETE FROM insertion_event_extremities WHERE event_id IN (
+                SELECT event_id FROM insertion_events
+                WHERE next_chunk_id = ?
+            )
+        """
+
+        txn.execute(sql, (chunk_id,))
+
     def _handle_redaction(self, txn, redacted_event_id):
         """Handles receiving a redaction and checking whether we need to remove
         any redacted relations from the database.
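The cleanup above can be demonstrated standalone: once a chunk event arrives whose `chunk_id` matches an insertion event's `next_chunk_id`, that insertion event is no longer a backfill target, so its extremity row is deleted. Table shapes are trimmed to the columns the statement uses and the data is invented.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE insertion_events (event_id TEXT, next_chunk_id TEXT);
    CREATE TABLE insertion_event_extremities (event_id TEXT, room_id TEXT);
""")
conn.execute("INSERT INTO insertion_events VALUES ('$insert', 'chunk_1')")
conn.execute("INSERT INTO insertion_event_extremities VALUES ('$insert', '!r')")

chunk_id = "chunk_1"  # would come from the incoming chunk event's content
conn.execute(
    """
    DELETE FROM insertion_event_extremities WHERE event_id IN (
        SELECT event_id FROM insertion_events
        WHERE next_chunk_id = ?
    )
    """,
    (chunk_id,),
)
remaining = conn.execute("SELECT * FROM insertion_event_extremities").fetchall()
print(remaining)  # -> []
```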
@@ -2101,15 +2113,17 @@ def _update_backward_extremeties(self, txn, events):

         Forward extremities are handled when we first start persisting the events.
         """
+        # From the events passed in, add all of the prev events as backwards extremities.
+        # Ignore any events that are already backwards extrems or outliers.
         query = (
             "INSERT INTO event_backward_extremities (event_id, room_id)"
             " SELECT ?, ? WHERE NOT EXISTS ("
-            " SELECT 1 FROM event_backward_extremities"
-            " WHERE event_id = ? AND room_id = ?"
+            "     SELECT 1 FROM event_backward_extremities"
+            "     WHERE event_id = ? AND room_id = ?"
             " )"
             " AND NOT EXISTS ("
-            " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
-            " AND outlier = ?"
+            "     SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
+            "     AND outlier = ?"
             " )"
         )

@@ -2123,6 +2137,8 @@ def _update_backward_extremeties(self, txn, events):
             ],
         )

+        # Delete all these events that we've already fetched and now know that their
+        # prev events are the new backwards extremities.
         query = (
             "DELETE FROM event_backward_extremities"
             " WHERE event_id = ? AND room_id = ?"
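The two-step bookkeeping the new comments describe can be sketched standalone: insert each prev event as a backwards extremity unless it is already known or already persisted as a non-outlier, then delete the freshly persisted events from the extremities table. Table shapes are trimmed and the event IDs are invented.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE events (event_id TEXT, room_id TEXT, outlier BOOLEAN);
    CREATE TABLE event_backward_extremities (event_id TEXT, room_id TEXT);
""")

# Same conditional-insert shape as the diff's query
insert_sql = (
    "INSERT INTO event_backward_extremities (event_id, room_id)"
    " SELECT ?, ? WHERE NOT EXISTS ("
    "     SELECT 1 FROM event_backward_extremities"
    "     WHERE event_id = ? AND room_id = ?"
    " )"
    " AND NOT EXISTS ("
    "     SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
    "     AND outlier = ?"
    " )"
)
# $prev is referenced but not yet persisted -> becomes a backwards extremity
conn.execute(insert_sql, ("$prev", "!r", "$prev", "!r", "$prev", "!r", False))

# The event we just persisted ($new) is no longer an extremity itself
conn.execute(
    "DELETE FROM event_backward_extremities WHERE event_id = ? AND room_id = ?",
    ("$new", "!r"),
)
rows = conn.execute("SELECT event_id FROM event_backward_extremities").fetchall()
print(rows)  # -> [('$prev',)]
```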
