Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/10303.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Ensure that inbound events from federation that were being processed when Synapse was restarted get promptly processed on start up.
67 changes: 57 additions & 10 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,41 @@ def __init__(self, hs: "HomeServer"):

self._room_prejoin_state_types = hs.config.api.room_prejoin_state

# Whether we have started handling old events in the staging area.
self._started_handling_of_staged_events = False

@wrap_as_background_process("_handle_old_staged_events")
async def _handle_old_staged_events(self) -> None:
"""Handle old staged events by fetching all rooms that have staged
events and start the processing of each of those rooms.
"""

# Get all the rooms IDs with staged events.
room_ids = await self.store.get_all_rooms_with_staged_incoming_events()

# We then shuffle them so that if there are multiple instances doing
# this work they're less likely to collide.
random.shuffle(room_ids)

for room_id in room_ids:
room_version = await self.store.get_room_version(room_id)

# Try and acquire the processing lock for the room, if we get it start a
# background process for handling the events in the room.
lock = await self.store.try_acquire_lock(
_INBOUND_EVENT_HANDLING_LOCK_NAME, room_id
)
if lock:
logger.info("Handling old staged inbound events in %s", room_id)
self._process_incoming_pdus_in_room_inner(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably worth logging something here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the common case is that we get the lock, so I wonder if it'd be better to log when we don't acquire the lock? (From reducing our log lines perspective?)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've done that for now, but I'm not particularly happy about the wording.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Also note we log handling received PDU when we come round to actually handling an event)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hrmmm. given this should only happen once (per room with staged events) at startup - I don't feel like the logging would be too onerous?

Copy link
Member Author

@erikjohnston erikjohnston Jul 6, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I completely misread which line this was talking about. Sorry and fixed

room_id,
room_version,
lock,
)

# We pause a bit so that we don't start handling all rooms at once.
await self._clock.sleep(random.uniform(0, 0.1))

async def on_backfill_request(
self, origin: str, room_id: str, versions: List[str], limit: int
) -> Tuple[int, Dict[str, Any]]:
Expand All @@ -166,6 +201,12 @@ async def on_backfill_request(
async def on_incoming_transaction(
self, origin: str, transaction_data: JsonDict
) -> Tuple[int, Dict[str, Any]]:
# If we receive a transaction we should make sure that kick off handling
# any old events in the staging area.
if not self._started_handling_of_staged_events:
self._started_handling_of_staged_events = True
self._handle_old_staged_events()

# keep this as early as possible to make the calculated origin ts as
# accurate as possible.
request_time = self._clock.time_msec()
Expand Down Expand Up @@ -882,32 +923,38 @@ async def _process_incoming_pdus_in_room_inner(
room_id: str,
room_version: RoomVersion,
lock: Lock,
latest_origin: str,
latest_event: EventBase,
latest_origin: Optional[str] = None,
latest_event: Optional[EventBase] = None,
) -> None:
"""Process events in the staging area for the given room.

The latest_origin and latest_event args are the latest origin and event
received.
received (or None to simply pull the next event from the database).
"""

# The common path is for the event we just received be the only event in
# the room, so instead of pulling the event out of the DB and parsing
# the event we just pull out the next event ID and check if that matches.
next_origin, next_event_id = await self.store.get_next_staged_event_id_for_room(
room_id
)
if next_origin == latest_origin and next_event_id == latest_event.event_id:
origin = latest_origin
event = latest_event
else:
if latest_event is not None and latest_origin is not None:
(
next_origin,
next_event_id,
) = await self.store.get_next_staged_event_id_for_room(room_id)
if next_origin != latest_origin or next_event_id != latest_event.event_id:
latest_origin = None
latest_event = None

if latest_origin is None or latest_event is None:
next = await self.store.get_next_staged_event_for_room(
room_id, room_version
)
if not next:
return

origin, event = next
else:
origin = latest_origin
event = latest_event

# We loop round until there are no more events in the room in the
# staging area, or we fail to get the lock (which means another process
Expand Down
9 changes: 9 additions & 0 deletions synapse/storage/databases/main/event_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -1207,6 +1207,15 @@ def _get_next_staged_event_for_room_txn(txn):

return origin, event

async def get_all_rooms_with_staged_incoming_events(self) -> List[str]:
"""Get the room IDs of all events currently staged."""
return await self.db_pool.simple_select_onecol(
table="federation_inbound_events_staging",
keyvalues={},
retcol="DISTINCT room_id",
desc="get_all_rooms_with_staged_incoming_events",
)

@wrap_as_background_process("_get_stats_for_federation_staging")
async def _get_stats_for_federation_staging(self):
"""Update the prometheus metrics for the inbound federation staging area."""
Expand Down