Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/5706.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Reduce database IO usage by optimising queries for current membership.
18 changes: 16 additions & 2 deletions synapse/storage/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,21 @@
class LoggingTransaction(object):
"""An object that almost-transparently proxies for the 'txn' object
passed to the constructor. Adds logging and metrics to the .execute()
method."""
method.

Args:
txn: The database transcation object to wrap.
name (str): The name of this transactions for logging.
database_engine (Sqlite3Engine|PostgresEngine)
after_callbacks(list|None): A list that callbacks will be appended to
that have been added by `call_after` which should be run on
successful completion of the transaction. None indicates that no
callbacks should be allowed to be scheduled to run.
exception_callbacks(list|None): A list that callbacks will be appended
to that have been added by `call_on_exception` which should be run
if transaction ends with an error. None indicates that no callbacks
should be allowed to be scheduled to run.
"""

__slots__ = [
"txn",
Expand All @@ -97,7 +111,7 @@ class LoggingTransaction(object):
]

def __init__(
self, txn, name, database_engine, after_callbacks, exception_callbacks
self, txn, name, database_engine, after_callbacks=None, exception_callbacks=None
):
object.__setattr__(self, "txn", txn)
object.__setattr__(self, "name", name)
Expand Down
2 changes: 0 additions & 2 deletions synapse/storage/event_push_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,6 @@ def __init__(self, db_conn, hs):
db_conn.cursor(),
name="_find_stream_orderings_for_times_txn",
database_engine=self.database_engine,
after_callbacks=[],
exception_callbacks=[],
)
self._find_stream_orderings_for_times_txn(cur)
cur.close()
Expand Down
26 changes: 14 additions & 12 deletions synapse/storage/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -918,8 +918,6 @@ def _persist_events_txn(
min_stream_order = events_and_contexts[0][0].internal_metadata.stream_ordering
max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering

self._update_current_state_txn(txn, state_delta_for_room, min_stream_order)

self._update_forward_extremities_txn(
txn,
new_forward_extremities=new_forward_extremeties,
Expand Down Expand Up @@ -993,6 +991,10 @@ def _persist_events_txn(
backfilled=backfilled,
)

# We call this last as it assumes we've inserted the events into
# room_memberships, where applicable.
self._update_current_state_txn(txn, state_delta_for_room, min_stream_order)

def _update_current_state_txn(self, txn, state_delta_by_room, stream_id):
for room_id, current_state_tuple in iteritems(state_delta_by_room):
to_delete, to_insert = current_state_tuple
Expand Down Expand Up @@ -1062,16 +1064,16 @@ def _update_current_state_txn(self, txn, state_delta_by_room, stream_id):
),
)

self._simple_insert_many_txn(
txn,
table="current_state_events",
values=[
{
"event_id": ev_id,
"room_id": room_id,
"type": key[0],
"state_key": key[1],
}
# We include the membership in the current state table, hence we do
# a lookup when we insert. This assumes that all events have already
# been inserted into room_memberships.
txn.executemany(
"""INSERT INTO current_state_events
(room_id, type, state_key, event_id, membership)
VALUES (?, ?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
""",
[
(room_id, key[0], key[1], ev_id, ev_id)
for key, ev_id in iteritems(to_insert)
],
)
Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/prepare_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

# Remember to update this number every time a change is made to database
# schema files, so the users will be informed on server restarts.
SCHEMA_VERSION = 55
SCHEMA_VERSION = 56
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the thinking behind the schema bump?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mainly that this stops people from rolling back, which will cause membership column to not be correctly updated.


dir_path = os.path.abspath(os.path.dirname(__file__))

Expand Down
154 changes: 134 additions & 20 deletions synapse/storage/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
from twisted.internet import defer

from synapse.api.constants import EventTypes, Membership
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage._base import LoggingTransaction
from synapse.storage.events_worker import EventsWorkerStore
from synapse.types import get_domain_from_id
from synapse.util.async_helpers import Linearizer
Expand Down Expand Up @@ -53,9 +55,51 @@
MemberSummary = namedtuple("MemberSummary", ("members", "count"))

_MEMBERSHIP_PROFILE_UPDATE_NAME = "room_membership_profile_update"
_CURRENT_STATE_MEMBERSHIP_UPDATE_NAME = "current_state_events_membership"


class RoomMemberWorkerStore(EventsWorkerStore):
def __init__(self, db_conn, hs):
super(RoomMemberWorkerStore, self).__init__(db_conn, hs)

# Is the current_state_events.membership up to date? Or is the
# background update still running?
self._current_state_events_membership_up_to_date = False

txn = LoggingTransaction(
db_conn.cursor(),
name="_check_safe_current_state_events_membership_updated",
database_engine=self.database_engine,
)
self._check_safe_current_state_events_membership_updated_txn(txn)
txn.close()

def _check_safe_current_state_events_membership_updated_txn(self, txn):
"""Checks if it is safe to assume the new current_state_events
membership column is up to date
"""

pending_update = self._simple_select_one_txn(
txn,
table="background_updates",
keyvalues={"update_name": _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME},
retcols=["update_name"],
allow_none=True,
)

self._current_state_events_membership_up_to_date = not pending_update

# If the update is still running, reschedule to run.
if pending_update:
self._clock.call_later(
15.0,
run_as_background_process,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

\o/

"_check_safe_current_state_events_membership_updated",
self.runInteraction,
"_check_safe_current_state_events_membership_updated",
self._check_safe_current_state_events_membership_updated_txn,
)

@cachedInlineCallbacks(max_entries=100000, iterable=True, cache_context=True)
def get_hosts_in_room(self, room_id, cache_context):
"""Returns the set of all hosts currently in the room
Expand All @@ -69,14 +113,23 @@ def get_hosts_in_room(self, room_id, cache_context):
@cached(max_entries=100000, iterable=True)
def get_users_in_room(self, room_id):
def f(txn):
sql = (
"SELECT m.user_id FROM room_memberships as m"
" INNER JOIN current_state_events as c"
" ON m.event_id = c.event_id "
" AND m.room_id = c.room_id "
" AND m.user_id = c.state_key"
" WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?"
)
# If we can assume current_state_events.membership is up to date
# then we can avoid a join, which is a Very Good Thing given how
# frequently this function gets called.
if self._current_state_events_membership_up_to_date:
sql = """
SELECT state_key FROM current_state_events
WHERE type = 'm.room.member' AND room_id = ? AND membership = ?
"""
else:
sql = """
SELECT state_key FROM room_memberships as m
INNER JOIN current_state_events as c
ON m.event_id = c.event_id
AND m.room_id = c.room_id
AND m.user_id = c.state_key
WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?
"""

txn.execute(sql, (room_id, Membership.JOIN))
return [to_ascii(r[0]) for r in txn]
Expand All @@ -98,15 +151,26 @@ def _get_room_summary_txn(txn):
# first get counts.
# We do this all in one transaction to keep the cache small.
# FIXME: get rid of this when we have room_stats
sql = """
SELECT count(*), m.membership FROM room_memberships as m
INNER JOIN current_state_events as c
ON m.event_id = c.event_id
AND m.room_id = c.room_id
AND m.user_id = c.state_key
WHERE c.type = 'm.room.member' AND c.room_id = ?
GROUP BY m.membership
"""

# If we can assume current_state_events.membership is up to date
# then we can avoid a join, which is a Very Good Thing given how
# frequently this function gets called.
if self._current_state_events_membership_up_to_date:
sql = """
SELECT count(*), membership FROM current_state_events
WHERE type = 'm.room.member' AND room_id = ?
GROUP BY membership
"""
else:
sql = """
SELECT count(*), m.membership FROM room_memberships as m
INNER JOIN current_state_events as c
ON m.event_id = c.event_id
AND m.room_id = c.room_id
AND m.user_id = c.state_key
WHERE c.type = 'm.room.member' AND c.room_id = ?
GROUP BY m.membership
"""

txn.execute(sql, (room_id,))
res = {}
Expand Down Expand Up @@ -224,7 +288,7 @@ def _get_rooms_for_user_where_membership_is_txn(
results = []
if membership_list:
where_clause = "user_id = ? AND (%s) AND forgotten = 0" % (
" OR ".join(["membership = ?" for _ in membership_list]),
" OR ".join(["m.membership = ?" for _ in membership_list]),
)

args = [user_id]
Expand Down Expand Up @@ -453,8 +517,8 @@ def is_host_joined(self, room_id, host):

sql = """
SELECT state_key FROM current_state_events AS c
INNER JOIN room_memberships USING (event_id)
WHERE membership = 'join'
INNER JOIN room_memberships AS m USING (event_id)
WHERE m.membership = 'join'
AND type = 'm.room.member'
AND c.room_id = ?
AND state_key LIKE ?
Expand Down Expand Up @@ -602,6 +666,10 @@ def __init__(self, db_conn, hs):
self.register_background_update_handler(
_MEMBERSHIP_PROFILE_UPDATE_NAME, self._background_add_membership_profile
)
self.register_background_update_handler(
_CURRENT_STATE_MEMBERSHIP_UPDATE_NAME,
self._background_current_state_membership,
)

def _store_room_members_txn(self, txn, events, backfilled):
"""Store a room member in the database.
Expand Down Expand Up @@ -781,6 +849,52 @@ def add_membership_profile_txn(txn):

defer.returnValue(result)

@defer.inlineCallbacks
def _background_current_state_membership(self, progress, batch_size):
"""Update the new membership column on current_state_events.
"""

if "rooms" not in progress:
rooms = yield self._simple_select_onecol(
table="current_state_events",
keyvalues={},
retcol="DISTINCT room_id",
desc="_background_current_state_membership_get_rooms",
)
progress["rooms"] = rooms

rooms = progress["rooms"]

def _background_current_state_membership_txn(txn):
processed = 0
while rooms and processed < batch_size:
sql = """
UPDATE current_state_events AS c
SET membership = (
SELECT membership FROM room_memberships
WHERE event_id = c.event_id
)
WHERE room_id = ?
"""
txn.execute(sql, (rooms.pop(),))
processed += txn.rowcount

self._background_update_progress_txn(
txn, _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME, progress
)

return processed

result = yield self.runInteraction(
"_background_current_state_membership_update",
_background_current_state_membership_txn,
)

if not rooms:
yield self._end_background_update(_CURRENT_STATE_MEMBERSHIP_UPDATE_NAME)

defer.returnValue(result)


class _JoinedHostsCache(object):
"""Cache for joined hosts in a room that is optimised to handle updates
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/* Copyright 2019 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- We add membership to current state so that we don't need to join against
-- room_memberships, which can be surprisingly costly (we do such queries
-- very frequently).
-- This will be null for non-membership events and the content.membership key
-- for membership events. (Will also be null for membership events until the
-- background update job has finished).
ALTER TABLE current_state_events ADD membership TEXT;

INSERT INTO background_updates (update_name, progress_json) VALUES
('current_state_events_membership', '{}');
8 changes: 4 additions & 4 deletions synapse/storage/user_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -618,15 +618,15 @@ def get_rooms_in_common_for_users(self, user_id, other_user_id):
sql = """
SELECT room_id FROM (
SELECT c.room_id FROM current_state_events AS c
INNER JOIN room_memberships USING (event_id)
INNER JOIN room_memberships AS m USING (event_id)
WHERE type = 'm.room.member'
AND membership = 'join'
AND m.membership = 'join'
AND state_key = ?
) AS f1 INNER JOIN (
SELECT c.room_id FROM current_state_events AS c
INNER JOIN room_memberships USING (event_id)
INNER JOIN room_memberships AS m USING (event_id)
WHERE type = 'm.room.member'
AND membership = 'join'
AND m.membership = 'join'
AND state_key = ?
) f2 USING (room_id)
"""
Expand Down