Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 5 additions & 19 deletions synapse/push/bulk_push_rule_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ async def action_for_event_by_user(self, event, context) -> None:
if event.type == EventTypes.Member and event.state_key == uid:
display_name = event.content.get("displayname", None)

actions_by_user[uid] = []

for rule in rules:
if "enabled" in rule and not rule["enabled"]:
continue
Expand All @@ -233,28 +235,12 @@ async def action_for_event_by_user(self, event, context) -> None:
actions_by_user[uid] = actions
break

# If the event should be counted as unread, add mark_unread to its actions.
# mark_unread is an internal action we use to tell add_push_actions_to_staging
# that we want this event to have the unread bit set to 1 in the push action
# tables.
# The push rules endpoint on the CS API checks the actions on new push rules
# and limit them to spec'd ones, so we shouldn't have to worry about users
# changing their push rules to include this action.
# An alternative way to do this would be to pass count_as_unread directly to
# add_push_actions_to_staging, but that wouldn't work as that function relies
# on actions_by_user to determine the list of users to insert a row for,
# therefore it wouldn't add a row for any user that's in the room but doesn't
# get notified by the event.
if count_as_unread:
if uid in actions_by_user:
actions_by_user[uid].append("mark_unread")
else:
actions_by_user[uid] = ["mark_unread"]

# Mark in the DB staging area the push actions for users who should be
# notified for this event. (This will then get handled when we persist
# the event)
await self.store.add_push_actions_to_staging(event.event_id, actions_by_user)
await self.store.add_push_actions_to_staging(
event.event_id, actions_by_user, count_as_unread,
)


def _condition_checker(evaluator, conditions, uid, display_name, cache):
Expand Down
153 changes: 50 additions & 103 deletions synapse/storage/databases/main/event_push_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# limitations under the License.

import logging
from typing import Dict, List, Optional, Tuple
from typing import Dict, List, Optional, Tuple, Union

import attr

Expand Down Expand Up @@ -66,16 +66,6 @@ def _deserialize_action(actions, is_highlight):
return DEFAULT_NOTIF_ACTION


@attr.s
class EventPushSummary:
"""Summary of pending event push actions for a given user in a given room."""

unread_count = attr.ib(type=int)
stream_ordering = attr.ib(type=int)
old_user_id = attr.ib(type=str)
notif_count = attr.ib(type=int)


class EventPushActionsWorkerStore(SQLBaseStore):
def __init__(self, database: DatabasePool, db_conn, hs):
super(EventPushActionsWorkerStore, self).__init__(database, db_conn, hs)
Expand Down Expand Up @@ -106,7 +96,7 @@ async def get_unread_event_push_actions_by_room_for_user(
for a given user in a given room after the given read receipt.

Note that this function assumes the user to be a current member of the room,
since it's either call by the sync handler to handle joined room entries, or by
since it's either called by the sync handler to handle joined room entries, or by
the HTTP pusher to calculate the badge of unread joined rooms.

Args:
Expand Down Expand Up @@ -157,101 +147,43 @@ def _get_unread_counts_by_receipt_txn(
)

def _get_unread_counts_by_pos_txn(self, txn, room_id, user_id, stream_ordering):

# First get number of notifications.
# We need to look specifically for events with notif = 1 because otherwise we'll
# count unread events (from MSC2654) that might not notify.
notify_count = self._get_count_from_push_actions_txn(
txn=txn,
user_id=user_id,
room_id=room_id,
stream_ordering=stream_ordering,
push_actions_column="notif",
push_summary_column="notif_count",
)

# Now get the number of highlights
highlight_count = self._get_count_from_push_actions_txn(
txn=txn,
user_id=user_id,
room_id=room_id,
stream_ordering=stream_ordering,
push_actions_column="highlight",
push_summary_column=None,
)

# Finally, get the number of unread messages
unread_count = self._get_count_from_push_actions_txn(
txn=txn,
user_id=user_id,
room_id=room_id,
stream_ordering=stream_ordering,
push_actions_column="unread",
push_summary_column="unread_count",
sql = (
"SELECT"
" COUNT(CASE WHEN notif = 1 THEN 1 END),"
" COUNT(CASE WHEN highlight = 1 THEN 1 END),"
" COUNT(CASE WHEN unread = 1 THEN 1 END)"
" FROM event_push_actions ea"
" WHERE user_id = ?"
" AND room_id = ?"
" AND stream_ordering > ?"
)

return {
"notify_count": notify_count,
"unread_count": unread_count,
"highlight_count": highlight_count,
}

def _get_count_from_push_actions_txn(
self,
txn: LoggingTransaction,
user_id: str,
room_id: str,
stream_ordering: int,
push_actions_column: str,
push_summary_column: Optional[str],
) -> int:
"""Counts the number of rows in event_push_actions with the given flag set to
true, and adds it up with its matching count in event_push_summary if any.
txn.execute(sql, (user_id, room_id, stream_ordering))
row = txn.fetchone()

Args:
user_id: The user to calculate the count for.
room_id: The room to calculate the count for.
stream_ordering: The stream ordering to use in the conditional clause when
querying from event_push_actions.
push_actions_column: The column to filter by when querying from
event_push_actions. The filtering will be done on the condition
"[column] = 1".
push_summary_column: The count in event_push_summary to retrieve and add to
the results from the first query. None if there's no such count.
(notif_count, highlight_count, unread_count) = (0, 0, 0)

Returns:
The desired count.
"""
sql = (
"SELECT count(*)"
" FROM event_push_actions ea"
" WHERE"
" user_id = ?"
" AND room_id = ?"
" AND stream_ordering > ?"
" AND %s = 1"
)
if row:
(notif_count, highlight_count, unread_count) = row

txn.execute(
sql % push_actions_column, (user_id, room_id, stream_ordering),
"""
SELECT notif_count, unread_count FROM event_push_summary
WHERE room_id = ? AND user_id = ? AND stream_ordering > ?
""",
(room_id, user_id, stream_ordering),
)
row = txn.fetchone()
count = row[0] if row else 0

if push_summary_column:
txn.execute(
"""
SELECT %s FROM event_push_summary
WHERE room_id = ? AND user_id = ? AND stream_ordering > ?
"""
% push_summary_column,
(room_id, user_id, stream_ordering),
)
rows = txn.fetchall()
if rows:
count += rows[0][0]
if row:
notif_count += row[0]
unread_count += row[1]

return count
return {
"notify_count": notif_count,
"unread_count": unread_count,
"highlight_count": highlight_count,
}

async def get_push_action_users_in_range(
self, min_stream_ordering, max_stream_ordering
Expand Down Expand Up @@ -509,14 +441,18 @@ def _get_if_maybe_push_in_range_for_user_txn(txn):
)

async def add_push_actions_to_staging(
self, event_id: str, user_id_actions: Dict[str, List[Union[dict, str]]]
self,
event_id: str,
user_id_actions: Dict[str, List[Union[dict, str]]],
count_as_unread=False,
) -> None:
"""Add the push actions for the event to the push action staging area.

Args:
event_id
user_id_actions: A mapping of user_id to list of push actions, where
an action can either be a string or dict.
count_as_unread: Whether this event should increment unread counts.
"""
if not user_id_actions:
return
Expand All @@ -526,14 +462,13 @@ async def add_push_actions_to_staging(
def _gen_entry(user_id, actions):
is_highlight = 1 if _action_has_highlight(actions) else 0
notif = 1 if "notify" in actions else 0
unread = 1 if "mark_unread" in actions else 0
return (
event_id, # event_id column
user_id, # user_id column
_serialize_action(actions, is_highlight), # actions column
notif, # notif column
is_highlight, # highlight column
unread, # unread column
int(count_as_unread), # unread column
)

def _add_push_actions_to_staging_txn(txn):
Expand Down Expand Up @@ -933,9 +868,9 @@ def _rotate_notifs_before_txn(self, txn, rotate_to_stream_ordering):
# object because we might not have the same amount of rows in each of them. To do
# this, we use a dict indexed on the user ID and room ID to make it easier to
# populate.
summaries = {} # type: Dict[Tuple[str, str], EventPushSummary]
summaries = {} # type: Dict[Tuple[str, str], _EventPushSummary]
for row in txn:
summaries[(row[0], row[1])] = EventPushSummary(
summaries[(row[0], row[1])] = _EventPushSummary(
unread_count=row[2],
stream_ordering=row[3],
old_user_id=row[4],
Expand All @@ -956,7 +891,7 @@ def _rotate_notifs_before_txn(self, txn, rotate_to_stream_ordering):
# a message unread, we might end up with messages that notify but aren't
# marked unread, so we might not have a summary for this (user, room)
# tuple to complete.
summaries[(row[0], row[1])] = EventPushSummary(
summaries[(row[0], row[1])] = _EventPushSummary(
unread_count=0,
stream_ordering=row[3],
old_user_id=row[4],
Expand Down Expand Up @@ -1026,3 +961,15 @@ def _action_has_highlight(actions):
pass

return False


@attr.s
class _EventPushSummary:
"""Summary of pending event push actions for a given user in a given room.
Used in _rotate_notifs_before_txn to manipulate results from event_push_actions.
"""

unread_count = attr.ib(type=int)
stream_ordering = attr.ib(type=int)
old_user_id = attr.ib(type=str)
notif_count = attr.ib(type=int)