Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.
23 changes: 15 additions & 8 deletions synapse/replication/tcp/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,17 +184,24 @@ async def _run_notifier_loop(self):
# The token has advanced but there is no data to
# send, so we send a `POSITION` to inform other
# workers of the updated position.
logger.info(
"Sending position: %s -> %s", stream.NAME, current_token
)
self.command_handler.send_command(
PositionCommand(
if stream.NAME == EventsStream.NAME:
# XXX: We only do this for the EventStream as it
# turns out that e.g. account data streams share
# their "current token" with each other, meaning
# that it is *not* safe to send a POSITION.
Comment on lines +188 to +191
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why exactly is it not safe to share positions with each other? Can you only have one worker advancing the account data stream?

Are you only doing the eventstream here out of caution? Would this be a problem for streams other than account?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Take the account data streams as an example, say we persist a bunch of room account data rows at 3, 4, 5, then if before we handle the room account data stream we send a POSITION global_account_data 2 5. The workers will update the shared account data current token to 5, so when they finally do receive RDATA room_account_data 3 ... etc they will compare the stream ID with the current token (i.e. 3 vs 5) and drop the RDATA as old.

Really, we should have one replication stream per stream ID generator, and I plan to do that but didn't want to change the world in this PR

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, that makes sense. For the event persisters this is fine as an event only needs to be persisted once.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeeeah, specifically it's fine as we're using an ID gen that supports sharded streams (rather than reusing the same ID gen across different replication streams)

logger.info(
"Sending position: %s -> %s",
stream.NAME,
self._instance_name,
last_token,
current_token,
)
)
self.command_handler.send_command(
PositionCommand(
stream.NAME,
self._instance_name,
last_token,
current_token,
)
)
continue

# Some streams return multiple rows with the same stream IDs,
Expand Down