Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit a6c3810

Browse files
committed
Add a mechanism to await un-partial-stating
When we join a room via the faster-joins mechanism, we end up with "partial state" at some points on the event DAG. Many parts of the codebase need to wait for the full state to load. So, we implement a mechanism to keep track of which events have partial state, and wait for them to be fully-populated.
1 parent 32545d2 commit a6c3810

File tree

7 files changed

+237
-1
lines changed

7 files changed

+237
-1
lines changed

changelog.d/12399.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Preparation for faster-room-join work: Implement a tracking mechanism to allow functions to wait for full room state to arrive.

synapse/handlers/federation_event.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -515,6 +515,7 @@ async def update_state_for_partial_state_event(
515515
)
516516
return
517517
await self._store.update_state_for_partial_state_event(event, context)
518+
self._state_store.notify_event_un_partial_stated(event.event_id)
518519

519520
async def backfill(
520521
self, dest: str, room_id: str, limit: int, extremities: Collection[str]

synapse/storage/databases/main/events_worker.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1956,7 +1956,15 @@ def get_event_id_for_timestamp_txn(txn: LoggingTransaction) -> Optional[str]:
19561956
async def get_partial_state_events(
19571957
self, event_ids: Collection[str]
19581958
) -> Dict[str, bool]:
1959-
"""Checks which of the given events have partial state"""
1959+
"""Checks which of the given events have partial state
1960+
1961+
Args:
1962+
event_ids: the events we want to check for partial state.
1963+
1964+
Returns:
1965+
a dict mapping from event id to partial-stateness. We return True for
1966+
any of the events which are unknown (or are outliers).
1967+
"""
19601968
result = await self.db_pool.simple_select_many_batch(
19611969
table="partial_state_events",
19621970
column="event_id",

synapse/storage/state.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
from synapse.api.constants import EventTypes
3333
from synapse.events import EventBase
34+
from synapse.storage.util.partial_state_events_tracker import PartialStateEventsTracker
3435
from synapse.types import MutableStateMap, StateKey, StateMap
3536

3637
if TYPE_CHECKING:
@@ -542,6 +543,10 @@ class StateGroupStorage:
542543

543544
def __init__(self, hs: "HomeServer", stores: "Databases"):
544545
self.stores = stores
546+
self._partial_state_events_tracker = PartialStateEventsTracker(stores.main)
547+
548+
def notify_event_un_partial_stated(self, event_id: str) -> None:
549+
self._partial_state_events_tracker.notify_un_partial_stated(event_id)
545550

546551
async def get_state_group_delta(
547552
self, state_group: int
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
# Copyright 2022 The Matrix.org Foundation C.I.C.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import logging
16+
from collections import defaultdict
17+
from typing import Collection, Dict, Set
18+
19+
from twisted.internet import defer
20+
from twisted.internet.defer import Deferred
21+
22+
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
23+
from synapse.storage.databases.main.events_worker import EventsWorkerStore
24+
from synapse.util import unwrapFirstError
25+
26+
logger = logging.getLogger(__name__)
27+
28+
29+
class PartialStateEventsTracker:
30+
"""Keeps track of which events have partial state, after a partial-state join"""
31+
32+
def __init__(self, store: EventsWorkerStore):
33+
self._store = store
34+
self._observers: Dict[str, Set[Deferred]] = defaultdict(set)
35+
36+
def notify_un_partial_stated(self, event_id: str) -> None:
37+
"""Notify that we now have full state for a given event
38+
39+
Called by the state-resynchronization loop whenever we resynchronize the state
40+
for a particular event. Unblocks any callers to await_full_state() for that
41+
event.
42+
43+
Args:
44+
event_id: the event that now has full state.
45+
"""
46+
observers = self._observers.pop(event_id, None)
47+
if not observers:
48+
return
49+
logger.info(
50+
"Notifying %i things waiting for un-partial-stating of event %s",
51+
len(observers),
52+
event_id,
53+
)
54+
with PreserveLoggingContext():
55+
for o in observers:
56+
o.callback(None)
57+
58+
async def await_full_state(self, event_ids: Collection[str]) -> None:
59+
"""Wait for all the given events to have full state.
60+
61+
Args:
62+
event_ids: the list of event ids that we want full state for
63+
"""
64+
# first try the happy path: if there are no partial-state events, we can return
65+
# quickly
66+
partial_state_event_ids = [
67+
ev
68+
for ev, p in (await self._store.get_partial_state_events(event_ids)).items()
69+
if p
70+
]
71+
72+
if not partial_state_event_ids:
73+
return
74+
75+
logger.info(
76+
"Awaiting un-partial-stating of events %s",
77+
partial_state_event_ids,
78+
stack_info=True,
79+
)
80+
81+
# create an observer for each lazy-joined event
82+
observers = {event_id: Deferred() for event_id in partial_state_event_ids}
83+
for event_id, observer in observers.items():
84+
self._observers[event_id].add(observer)
85+
86+
try:
87+
# some of them may have been un-lazy-joined between us checking the db and
88+
# registering the observer, in which case we'd wait forever for the
89+
# notification. Call back the observers now.
90+
for event_id, partial in (
91+
await self._store.get_partial_state_events(observers.keys())
92+
).items():
93+
if not partial:
94+
observers[event_id].callback(None)
95+
96+
await make_deferred_yieldable(
97+
defer.gatherResults(
98+
observers.values(),
99+
consumeErrors=True,
100+
)
101+
).addErrback(unwrapFirstError)
102+
logger.info("Events %s all un-partial-stated", observers.keys())
103+
finally:
104+
# remove any observers we created. This should happen when the notification
105+
# is received, but that might not happen for two reasons:
106+
# (a) we're bailing out early on an exception (including us being
107+
# cancelled during the await)
108+
# (b) the event got de-lazy-joined before we set up the observer.
109+
for event_id, observer in observers.items():
110+
observer_set = self._observers.get(event_id)
111+
if observer_set:
112+
observer_set.discard(observer)
113+
if not observer_set:
114+
del self._observers[event_id]

tests/storage/util/__init__.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# Copyright 2022 The Matrix.org Foundation C.I.C.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
# Copyright 2022 The Matrix.org Foundation C.I.C.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from typing import Dict
16+
from unittest import mock
17+
18+
from twisted.internet.defer import CancelledError, ensureDeferred
19+
20+
from synapse.storage.util.partial_state_events_tracker import PartialStateEventsTracker
21+
22+
from tests.unittest import TestCase
23+
24+
25+
class PartialStateEventsTrackerTestCase(TestCase):
26+
def setUp(self) -> None:
27+
# the results to be returned by the mocked get_partial_state_events
28+
self._events_dict: Dict[str, bool] = {}
29+
30+
async def get_partial_state_events(events):
31+
return {e: self._events_dict[e] for e in events}
32+
33+
self.mock_store = mock.Mock(spec_set=["get_partial_state_events"])
34+
self.mock_store.get_partial_state_events.side_effect = get_partial_state_events
35+
36+
self.tracker = PartialStateEventsTracker(self.mock_store)
37+
38+
def test_does_not_block_for_full_state_events(self):
39+
self._events_dict = {"event1": False, "event2": False}
40+
41+
self.successResultOf(
42+
ensureDeferred(self.tracker.await_full_state(["event1", "event2"]))
43+
)
44+
45+
self.mock_store.get_partial_state_events.assert_called_once_with(
46+
["event1", "event2"]
47+
)
48+
49+
def test_blocks_for_partial_state_events(self):
50+
self._events_dict = {"event1": True, "event2": False}
51+
52+
d = ensureDeferred(self.tracker.await_full_state(["event1", "event2"]))
53+
54+
# there should be no result yet
55+
self.assertNoResult(d)
56+
57+
# notifying that the event has been de-partial-stated should unblock
58+
self.tracker.notify_un_partial_stated("event1")
59+
self.successResultOf(d)
60+
61+
def test_un_partial_state_race(self):
62+
# if the event is un-partial-stated between the initial check and the
63+
# registration of the listener, it should not block.
64+
self._events_dict = {"event1": True, "event2": False}
65+
66+
async def get_partial_state_events(events):
67+
res = {e: self._events_dict[e] for e in events}
68+
# change the result for next time
69+
self._events_dict = {"event1": False, "event2": False}
70+
return res
71+
72+
self.mock_store.get_partial_state_events.side_effect = get_partial_state_events
73+
74+
self.successResultOf(
75+
ensureDeferred(self.tracker.await_full_state(["event1", "event2"]))
76+
)
77+
78+
def test_cancellation(self):
79+
self._events_dict = {"event1": True, "event2": False}
80+
81+
d1 = ensureDeferred(self.tracker.await_full_state(["event1", "event2"]))
82+
self.assertNoResult(d1)
83+
84+
d2 = ensureDeferred(self.tracker.await_full_state(["event1"]))
85+
self.assertNoResult(d2)
86+
87+
d1.cancel()
88+
self.assertFailure(d1, CancelledError)
89+
90+
# d2 should still be waiting!
91+
self.assertNoResult(d2)
92+
93+
self.tracker.notify_un_partial_stated("event1")
94+
self.successResultOf(d2)

0 commit comments

Comments
 (0)