-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Store arbitrary relations from events #11391
Changes from 6 commits
9d2497a
f9952d6
5333205
08a2048
6f5fd12
c8b866a
cf90b40
999ab1c
3df00a3
1e65f14
8699b4a
d918d20
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,7 +17,7 @@ | |
|
|
||
| import attr | ||
|
|
||
| from synapse.api.constants import EventContentFields | ||
| from synapse.api.constants import EventContentFields, RelationTypes | ||
| from synapse.api.room_versions import KNOWN_ROOM_VERSIONS | ||
| from synapse.events import make_event_from_dict | ||
| from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause | ||
|
|
@@ -171,6 +171,9 @@ def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): | |
| self._purged_chain_cover_index, | ||
| ) | ||
|
|
||
| # The event_thread_relation background update was replaced with the | ||
| # event_arbitrary_relations one, which handles any relation to avoid | ||
| # needing to potentially crawl the entire events table in the future. | ||
| self.db_pool.updates.register_noop_background_update("event_thread_relation") | ||
|
|
||
| self.db_pool.updates.register_background_update_handler( | ||
|
|
@@ -1109,21 +1112,20 @@ async def _event_arbitrary_relations( | |
| last_event_id = progress.get("last_event_id", "") | ||
|
|
||
| def _event_arbitrary_relations_txn(txn: LoggingTransaction) -> int: | ||
| # Iterate over events which do not appear in the event_relations | ||
| # table -- they might have a relation that was not previously stored | ||
| # due to an unknown relation type. | ||
| # Fetch events and then filter based on whether the event has a | ||
| # relation or not. | ||
| txn.execute( | ||
| """ | ||
| SELECT event_id, json FROM event_json | ||
| LEFT JOIN event_relations USING (event_id) | ||
| WHERE event_id > ? AND event_relations.event_id IS NULL | ||
| WHERE event_id > ? | ||
| ORDER BY event_id LIMIT ? | ||
| """, | ||
| (last_event_id, batch_size), | ||
| ) | ||
|
|
||
| results = list(txn) | ||
| missing_relations = [] | ||
| # (event_id, parent_id, rel_type) for each relation | ||
| missing_relations: List[Tuple[str, str, str]] = [] | ||
clokep marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| for (event_id, event_json_raw) in results: | ||
| try: | ||
| event_json = db_to_json(event_json_raw) | ||
|
|
@@ -1140,32 +1142,49 @@ def _event_arbitrary_relations_txn(txn: LoggingTransaction) -> int: | |
| if not relates_to or not isinstance(relates_to, dict): | ||
| continue | ||
|
|
||
| # The only expected relation type would be from threads, but | ||
| # there could be other unknown ones. | ||
| # If the relation type or parent event ID is not a | ||
| # string, skip it. | ||
clokep marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| # | ||
| # Do not consider relation types that have existed for a long time, | ||
| # only include the new thread relation and any unknown relations. | ||
clokep marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| rel_type = relates_to.get("rel_type") | ||
| if not isinstance(rel_type, str): | ||
| if not isinstance(rel_type, str) or rel_type in ( | ||
| RelationTypes.ANNOTATION, | ||
| RelationTypes.REFERENCE, | ||
| RelationTypes.REPLACE, | ||
| ): | ||
| continue | ||
|
|
||
| # Get the parent ID. | ||
| parent_id = relates_to.get("event_id") | ||
| if not isinstance(parent_id, str): | ||
| continue | ||
|
|
||
| missing_relations.append((event_id, parent_id, rel_type)) | ||
|
|
||
| # Insert the missing data. | ||
| self.db_pool.simple_insert_many_txn( | ||
| txn=txn, | ||
| table="event_relations", | ||
| values=[ | ||
| { | ||
| "event_id": event_id, | ||
| "relates_to_id": parent_id, | ||
| "relation_type": rel_type, | ||
| } | ||
| for event_id, parent_id, rel_type in missing_relations | ||
| ], | ||
| ) | ||
| # Insert the missing data, note that we upsert here in case the event | ||
clokep marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| # has already been processed. | ||
| if missing_relations: | ||
| self.db_pool.simple_upsert_many_txn( | ||
| txn=txn, | ||
| table="event_relations", | ||
| key_names=("event_id",), | ||
| key_values=[(r[0],) for r in missing_relations], | ||
| value_names=("relates_to_id", "relation_type"), | ||
| value_values=[r[1:] for r in missing_relations], | ||
| ) | ||
|
|
||
| # Iterate the parent IDs and invalidate caches. | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't know if we usually clear caches as part of background updates, but in this case I think it could be a bit broken without doing it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. seems sensible |
||
| for parent_id in {r[1] for r in missing_relations}: | ||
| cache_tuple = (parent_id,) | ||
| self._invalidate_cache_and_stream( | ||
| txn, self.get_relations_for_event, cache_tuple | ||
| ) | ||
| self._invalidate_cache_and_stream( | ||
| txn, self.get_aggregation_groups_for_event, cache_tuple | ||
| ) | ||
| self._invalidate_cache_and_stream( | ||
| txn, self.get_thread_summary, cache_tuple | ||
| ) | ||
|
|
||
| if results: | ||
| latest_event_id = results[-1][0] | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -47,6 +47,8 @@ def default_config(self) -> dict: | |
| return config | ||
|
|
||
| def prepare(self, reactor, clock, hs): | ||
| self.store = hs.get_datastore() | ||
|
|
||
| self.user_id, self.user_token = self._create_user("alice") | ||
| self.user2_id, self.user2_token = self._create_user("bob") | ||
|
|
||
|
|
@@ -858,3 +860,65 @@ def _create_user(self, localpart: str) -> Tuple[str, str]: | |
| access_token = self.login(localpart, "abc123") | ||
|
|
||
| return user_id, access_token | ||
|
|
||
| def test_background_update(self): | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Putting this in this file seems dirty, but it seems best to keep relations tests together? Maybe it would be better to start a storage test file and we can add to it in the future when fleshing out some unit tests? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it's best here. |
||
| """Test the event_arbitrary_relations background update.""" | ||
| channel = self._send_relation(RelationTypes.ANNOTATION, "m.reaction", key="👍") | ||
| self.assertEquals(200, channel.code, channel.json_body) | ||
| annotation_event_id_good = channel.json_body["event_id"] | ||
|
|
||
| channel = self._send_relation(RelationTypes.ANNOTATION, "m.reaction", key="A") | ||
| self.assertEquals(200, channel.code, channel.json_body) | ||
| annotation_event_id_bad = channel.json_body["event_id"] | ||
|
|
||
| channel = self._send_relation(RelationTypes.THREAD, "m.room.test") | ||
| self.assertEquals(200, channel.code, channel.json_body) | ||
| thread_event_id = channel.json_body["event_id"] | ||
|
|
||
| # Clean-up the table as if the inserts did not happen during event creation. | ||
| self.get_success( | ||
| self.store.db_pool.simple_delete_many( | ||
| table="event_relations", | ||
| column="event_id", | ||
| iterable=(annotation_event_id_bad, thread_event_id), | ||
| keyvalues={}, | ||
| desc="RelationsTestCase.test_background_update", | ||
| ) | ||
| ) | ||
|
|
||
| # Only the "good" annotation should be found. | ||
| channel = self.make_request( | ||
| "GET", | ||
| f"/_matrix/client/unstable/rooms/{self.room}/relations/{self.parent_id}?limit=10", | ||
| access_token=self.user_token, | ||
| ) | ||
| self.assertEquals(200, channel.code, channel.json_body) | ||
| self.assertEquals( | ||
| [ev["event_id"] for ev in channel.json_body["chunk"]], | ||
| [annotation_event_id_good], | ||
| ) | ||
|
|
||
| # Insert and run the background update. | ||
| self.get_success( | ||
| self.store.db_pool.simple_insert( | ||
| "background_updates", | ||
| {"update_name": "event_arbitrary_relations", "progress_json": "{}"}, | ||
| ) | ||
| ) | ||
|
|
||
| # Ugh, have to reset this flag | ||
| self.store.db_pool.updates._all_done = False | ||
| self.wait_for_background_updates() | ||
|
|
||
| # The "good" annotation and the thread should be found, but not the "bad" | ||
| # annotation. | ||
| channel = self.make_request( | ||
| "GET", | ||
| f"/_matrix/client/unstable/rooms/{self.room}/relations/{self.parent_id}?limit=10", | ||
| access_token=self.user_token, | ||
| ) | ||
| self.assertEquals(200, channel.code, channel.json_body) | ||
| self.assertCountEqual( | ||
| [ev["event_id"] for ev in channel.json_body["chunk"]], | ||
| [annotation_event_id_good, thread_event_id], | ||
| ) | ||
Uh oh!
There was an error while loading. Please reload this page.