Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.
Merged
1 change: 1 addition & 0 deletions changelog.d/11247.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Clean up some areas of the codebase relating to to-device messages and sending ephemeral events to application services.
22 changes: 18 additions & 4 deletions synapse/handlers/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ def notify_interested_services_ephemeral(
self,
stream_key: str,
new_token: Union[int, RoomStreamToken],
users: Optional[Collection[Union[str, UserID]]] = None,
users: Collection[Union[str, UserID]],
) -> None:
"""
This is called by the notifier in the background when an ephemeral event is handled
Expand All @@ -203,7 +203,7 @@ def notify_interested_services_ephemeral(
value for `stream_key` will cause this function to return early.

Ephemeral events will only be pushed to appservices that have opted into
them.
them via the ephemeral AS registration file option.

Appservices will only receive ephemeral events that fall within their
registered user and room namespaces.
Expand All @@ -214,6 +214,7 @@ def notify_interested_services_ephemeral(
if not self.notify_appservices:
return

# Ignore any unsupported streams
if stream_key not in ("typing_key", "receipt_key", "presence_key"):
return

Expand All @@ -230,18 +231,25 @@ def notify_interested_services_ephemeral(
# Additional context: https://github.com/matrix-org/synapse/pull/11137
assert isinstance(new_token, int)

# Check whether there are any appservices which have registered to receive
# ephemeral events.
#
# Note that whether these events are actually relevant to these appservices
# is decided later on.
services = [
service
for service in self.store.get_app_services()
if service.supports_ephemeral
]
if not services:
# Bail out early if none of the target appservices have explicitly registered
# to receive these ephemeral events.
return

# We only start a new background process if necessary rather than
# optimistically (to cut down on overhead).
self._notify_interested_services_ephemeral(
services, stream_key, new_token, users or []
services, stream_key, new_token, users
)

@wrap_as_background_process("notify_interested_services_ephemeral")
Expand All @@ -252,7 +260,7 @@ async def _notify_interested_services_ephemeral(
new_token: int,
users: Collection[Union[str, UserID]],
) -> None:
logger.debug("Checking interested services for %s" % (stream_key))
logger.debug("Checking interested services for %s" % stream_key)
with Measure(self.clock, "notify_interested_services_ephemeral"):
for service in services:
if stream_key == "typing_key":
Expand Down Expand Up @@ -345,6 +353,9 @@ async def _handle_receipts(

Args:
service: The application service to check for which events it should receive.
new_token: A receipts event stream token. Purely used to double-check that the
from_token we pull from the database isn't greater than or equal to this
token. Prevents accidentally duplicating work.

Returns:
A list of JSON dictionaries containing data derived from the read receipts that
Expand Down Expand Up @@ -382,6 +393,9 @@ async def _handle_presence(
Args:
service: The application service that ephemeral events are being sent to.
users: The users that should receive the presence update.
new_token: A presence update stream token. Purely used to double-check that the
from_token we pull from the database isn't greater than or equal to this
token. Prevents accidentally duplicating work.

Returns:
A list of json dictionaries containing data derived from the presence events
Expand Down
31 changes: 27 additions & 4 deletions synapse/handlers/devicemessage.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,13 @@ def __init__(self, hs: "HomeServer"):
)

async def on_direct_to_device_edu(self, origin: str, content: JsonDict) -> None:
"""
Handle receiving to-device messages from remote homeservers.

Args:
origin: The remote homeserver.
content: The JSON dictionary containing the to-device messages.
"""
local_messages = {}
sender_user_id = content["sender"]
if origin != get_domain_from_id(sender_user_id):
Expand Down Expand Up @@ -135,12 +142,16 @@ async def on_direct_to_device_edu(self, origin: str, content: JsonDict) -> None:
message_type, sender_user_id, by_device
)

stream_id = await self.store.add_messages_from_remote_to_device_inbox(
# Add messages to the database.
# Retrieve the stream id of the last-processed to-device message.
max_stream_id = await self.store.add_messages_from_remote_to_device_inbox(
origin, message_id, local_messages
)

# Notify listeners that there are new to-device messages to process,
# handing them the latest stream id.
self.notifier.on_new_event(
"to_device_key", stream_id, users=local_messages.keys()
"to_device_key", max_stream_id, users=local_messages.keys()
)

async def _check_for_unknown_devices(
Expand Down Expand Up @@ -195,6 +206,14 @@ async def send_device_message(
message_type: str,
messages: Dict[str, Dict[str, JsonDict]],
) -> None:
"""
Handle a request from a user to send to-device message(s).

Args:
requester: The user that is sending the to-device messages.
message_type: The type of to-device messages that are being sent.
messages: A dictionary containing recipients mapped to messages intended for them.
"""
sender_user_id = requester.user.to_string()

message_id = random_string(16)
Expand Down Expand Up @@ -257,12 +276,16 @@ async def send_device_message(
"org.matrix.opentracing_context": json_encoder.encode(context),
}

stream_id = await self.store.add_messages_to_device_inbox(
# Add messages to the database.
# Retrieve the stream id of the last-processed to-device message.
max_stream_id = await self.store.add_messages_to_device_inbox(
local_messages, remote_edu_contents
)

# Notify listeners that there are new to-device messages to process,
# handing them the latest stream id.
self.notifier.on_new_event(
"to_device_key", stream_id, users=local_messages.keys()
"to_device_key", max_stream_id, users=local_messages.keys()
)

if self.federation_sender:
Expand Down
8 changes: 4 additions & 4 deletions synapse/storage/databases/main/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,16 +412,16 @@ def get_type_stream_id_for_appservice_txn(txn):
)

async def set_type_stream_id_for_appservice(
self, service: ApplicationService, type: str, pos: Optional[int]
self, service: ApplicationService, stream_type: str, pos: Optional[int]
) -> None:
if type not in ("read_receipt", "presence"):
if stream_type not in ("read_receipt", "presence"):
raise ValueError(
"Expected type to be a valid application stream id type, got %s"
% (type,)
% (stream_type,)
)

def set_type_stream_id_for_appservice_txn(txn):
stream_id_type = "%s_stream_id" % type
stream_id_type = "%s_stream_id" % stream_type
txn.execute(
"UPDATE application_services_state SET %s = ? WHERE as_id=?"
% stream_id_type,
Expand Down
25 changes: 22 additions & 3 deletions synapse/storage/databases/main/deviceinbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,10 @@ async def get_new_messages_for_device(
limit: The maximum number of messages to retrieve.

Returns:
A list of messages for the device and where in the stream the messages got to.
A tuple containing:
* A list of messages for the device.
* The max stream token of these messages. There may be more to retrieve
if the given limit was reached.
"""
has_changed = self._device_inbox_stream_cache.has_entity_changed(
user_id, last_stream_id
Expand All @@ -153,12 +156,21 @@ def get_new_messages_for_device_txn(txn):
txn.execute(
sql, (user_id, device_id, last_stream_id, current_stream_id, limit)
)

messages = []
stream_pos = current_stream_id

for row in txn:
stream_pos = row[0]
messages.append(db_to_json(row[1]))

# If we don't end up hitting the limit, we still want to return the equivalent
# value of `current_stream_id` to the calling function. This is needed as we'll
# be processing up to `current_stream_id`, without necessarily fetching a message
# with a stream token of `current_stream_id`.
if len(messages) < limit:
stream_pos = current_stream_id

return messages, stream_pos

return await self.db_pool.runInteraction(
Expand Down Expand Up @@ -261,12 +273,19 @@ def get_new_messages_for_remote_destination_txn(txn):
)
txn.execute(sql, (destination, last_stream_id, current_stream_id, limit))
messages = []

for row in txn:
stream_pos = row[0]
messages.append(db_to_json(row[1]))

# If we don't end up hitting the limit, we still want to return the equivalent
# value of `current_stream_id` to the calling function. This is needed as we'll
# be processing up to `current_stream_id`, without necessarily fetching a message
# with a stream token of `current_stream_id`.
if len(messages) < limit:
log_kv({"message": "Set stream position to current position"})
stream_pos = current_stream_id

return messages, stream_pos

return await self.db_pool.runInteraction(
Expand Down Expand Up @@ -372,8 +391,8 @@ async def add_messages_to_device_inbox(
"""Used to send messages from this server.

Args:
local_messages_by_user_and_device:
Dictionary of user_id to device_id to message.
local_messages_by_user_then_device:
Dictionary of recipient user_id to recipient device_id to message.
remote_messages_by_destination:
Dictionary of destination server_name to the EDU JSON to send.

Expand Down
8 changes: 6 additions & 2 deletions tests/handlers/test_appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,9 @@ def test_notify_interested_services_ephemeral(self):
make_awaitable(([event], None))
)

self.handler.notify_interested_services_ephemeral("receipt_key", 580)
self.handler.notify_interested_services_ephemeral(
"receipt_key", 580, ["@fakerecipient:example.com"]
)
self.mock_scheduler.submit_ephemeral_events_for_as.assert_called_once_with(
interested_service, [event]
)
Expand Down Expand Up @@ -300,7 +302,9 @@ def test_notify_interested_services_ephemeral_out_of_order(self):
make_awaitable(([event], None))
)

self.handler.notify_interested_services_ephemeral("receipt_key", 579)
self.handler.notify_interested_services_ephemeral(
"receipt_key", 580, ["@fakerecipient:example.com"]
)
self.mock_scheduler.submit_ephemeral_events_for_as.assert_not_called()

def _mkservice(self, is_interested, protocols=None):
Expand Down