-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Handle old staged inbound events #10303
Changes from 2 commits
d1ecbff
7cccdf0
801e3ab
e47b116
e58ada4
1dfc6ad
0b5b8f7
0d7bfe4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Ensure that inbound events from federation that were being processed when Synapse was restarted get promptly processed on start up. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -148,6 +148,52 @@ def __init__(self, hs: "HomeServer"): | |
|
||
self._room_prejoin_state_types = hs.config.api.room_prejoin_state | ||
|
||
# Whether we have started handling old events in the staging area. | ||
self._started_handling_of_staged_events = False | ||
|
||
def _start_handling_old_staged_events(self) -> None: | ||
"""Start handling old events in the staging area. This should be called | ||
when we receive a new incoming transaction (and are thus an instance | ||
that handles incoming federation). | ||
""" | ||
if self._started_handling_of_staged_events: | ||
return | ||
|
||
self._started_handling_of_staged_events = True | ||
|
||
self._handle_old_staged_events() | ||
|
||
@wrap_as_background_process("_handle_old_staged_events") | ||
async def _handle_old_staged_events(self) -> None: | ||
"""Handle old staged events by fetching all rooms that have staged | ||
events and starting the processing of each of those rooms. | ||
""" | ||
|
||
# Get all the room IDs with staged events. | ||
room_ids = await self.store.get_all_staged_rooms() | ||
|
||
# We then shuffle them so that if there are multiple instances doing | ||
# this work they're less likely to collide. | ||
random.shuffle(room_ids) | ||
|
||
for room_id in room_ids: | ||
room_version = await self.store.get_room_version(room_id) | ||
|
||
# Try to acquire the processing lock for the room; if we get it, start a | ||
# background process for handling the events in the room. | ||
lock = await self.store.try_acquire_lock( | ||
_INBOUND_EVENT_HANDLING_LOCK_NAME, room_id | ||
) | ||
if lock: | ||
self._process_incoming_pdus_in_room_inner( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. probably worth logging something here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the common case is that we get the lock, so I wonder if it'd be better to log when we don't acquire the lock? (From reducing our log lines perspective?) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've done that for now, but I'm not particularly happy about the wording. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (Also note we log There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hrmmm. given this should only happen once (per room with staged events) at startup - I don't feel like the logging would be too onerous? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I completely misread which line this was talking about. Sorry and fixed |
||
room_id, | ||
room_version, | ||
lock, | ||
) | ||
|
||
# We pause a bit so that we don't start handling all rooms at once. | ||
await self._clock.sleep(random.uniform(0, 0.1)) | ||
|
||
async def on_backfill_request( | ||
self, origin: str, room_id: str, versions: List[str], limit: int | ||
) -> Tuple[int, Dict[str, Any]]: | ||
|
@@ -166,6 +212,10 @@ async def on_backfill_request( | |
async def on_incoming_transaction( | ||
self, origin: str, transaction_data: JsonDict | ||
) -> Tuple[int, Dict[str, Any]]: | ||
# If we receive a transaction we should make sure that we kick off handling | ||
# any old events in the staging area. | ||
self._start_handling_old_staged_events() | ||
|
||
# keep this as early as possible to make the calculated origin ts as | ||
# accurate as possible. | ||
request_time = self._clock.time_msec() | ||
|
@@ -882,32 +932,40 @@ async def _process_incoming_pdus_in_room_inner( | |
room_id: str, | ||
room_version: RoomVersion, | ||
lock: Lock, | ||
latest_origin: str, | ||
latest_event: EventBase, | ||
latest_origin: Optional[str] = None, | ||
latest_event: Optional[EventBase] = None, | ||
) -> None: | ||
"""Process events in the staging area for the given room. | ||
|
||
The latest_origin and latest_event args are the latest origin and event | ||
received. | ||
received (or None to simply pull the next event from the database). | ||
""" | ||
|
||
# The common path is for the event we just received to be the only event in | ||
# the room, so instead of pulling the event out of the DB and parsing | ||
# the event we just pull out the next event ID and check if that matches. | ||
next_origin, next_event_id = await self.store.get_next_staged_event_id_for_room( | ||
room_id | ||
) | ||
if next_origin == latest_origin and next_event_id == latest_event.event_id: | ||
origin = latest_origin | ||
event = latest_event | ||
else: | ||
found_origin = None | ||
found_event = None | ||
if latest_event is not None and latest_origin is not None: | ||
( | ||
next_origin, | ||
next_event_id, | ||
) = await self.store.get_next_staged_event_id_for_room(room_id) | ||
if next_origin == latest_origin and next_event_id == latest_event.event_id: | ||
found_origin = latest_origin | ||
found_event = latest_event | ||
|
||
if found_origin is None or found_event is None: | ||
erikjohnston marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
next = await self.store.get_next_staged_event_for_room( | ||
room_id, room_version | ||
) | ||
if not next: | ||
return | ||
|
||
origin, event = next | ||
else: | ||
origin = found_origin | ||
event = found_event | ||
erikjohnston marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
||
# We loop round until there are no more events in the room in the | ||
# staging area, or we fail to get the lock (which means another process | ||
|
Uh oh!
There was an error while loading. Please reload this page.