Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit d5fd6b0

Browse files
committed
add a cache to have_seen_event
1 parent 6c84778 commit d5fd6b0

File tree

4 files changed

+53
-10
lines changed

4 files changed

+53
-10
lines changed

changelog.d/9953.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add a cache to `have_seen_events`.

synapse/storage/databases/main/cache.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,7 @@ def _invalidate_caches_for_event(
168168
backfilled,
169169
):
170170
self._invalidate_get_event_cache(event_id)
171+
self.have_seen_event.prefill((event_id,), True)
171172

172173
self.get_latest_event_ids_in_room.invalidate((room_id,))
173174

synapse/storage/databases/main/events_worker.py

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
Iterable,
2323
List,
2424
Optional,
25+
Set,
2526
Tuple,
2627
overload,
2728
)
@@ -55,7 +56,7 @@
5556
from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
5657
from synapse.storage.util.sequence import build_sequence_generator
5758
from synapse.types import JsonDict, get_domain_from_id
58-
from synapse.util.caches.descriptors import cached
59+
from synapse.util.caches.descriptors import cached, cachedList
5960
from synapse.util.caches.lrucache import LruCache
6061
from synapse.util.iterutils import batch_iter
6162
from synapse.util.metrics import Measure
@@ -1046,7 +1047,7 @@ async def have_events_in_timeline(self, event_ids):
10461047

10471048
return {r["event_id"] for r in rows}
10481049

1049-
async def have_seen_events(self, event_ids):
1050+
async def have_seen_events(self, event_ids: Collection[str]) -> Set[str]:
10501051
"""Given a list of event ids, check if we have already processed them.
10511052
10521053
Args:
@@ -1055,23 +1056,46 @@ async def have_seen_events(self, event_ids):
10551056
Returns:
10561057
set[str]: The events we have already seen.
10571058
"""
1059+
res = await self._have_seen_events_dict(event_ids)
1060+
return {x for (x, y) in res.items() if y}
1061+
1062+
@cachedList("have_seen_event", "event_ids")
1063+
async def _have_seen_events_dict(
1064+
self, event_ids: Collection[str]
1065+
) -> Dict[str, bool]:
1066+
"""Helper for have_seen_events
1067+
1068+
Returns a dict mapping each event ID to a boolean (True if seen), which is the result format that @cachedList requires.
1069+
"""
10581070
# if the event cache contains the event, obviously we've seen it.
1059-
results = {x for x in event_ids if self._get_event_cache.contains(x)}
1071+
cache_results = {x for x in event_ids if self._get_event_cache.contains(x)}
1072+
results = {x: True for x in cache_results}
10601073

10611074
def have_seen_events_txn(txn, chunk):
1075+
# assume everything in this chunk is not found initially
1076+
results.update({x: False for x in chunk})
1077+
1078+
# check the db and update the results for any row that is found
10621079
sql = "SELECT event_id FROM events as e WHERE "
10631080
clause, args = make_in_list_sql_clause(
10641081
txn.database_engine, "e.event_id", chunk
10651082
)
10661083
txn.execute(sql + clause, args)
1067-
results.update(row[0] for row in txn)
1084+
results.update({row[0]: True for row in txn})
10681085

1069-
for chunk in batch_iter((x for x in event_ids if x not in results), 100):
1086+
for chunk in batch_iter((x for x in event_ids if x not in cache_results), 100):
10701087
await self.db_pool.runInteraction(
10711088
"have_seen_events", have_seen_events_txn, chunk
10721089
)
1090+
10731091
return results
10741092

1093+
@cached(max_entries=100000)
1094+
async def have_seen_event(self, event_id):
1095+
# this only exists for the benefit of the @cachedList descriptor on
1096+
# _have_seen_events_dict
1097+
raise NotImplementedError()
1098+
10751099
def _get_current_state_event_counts_txn(self, txn, room_id):
10761100
"""
10771101
See get_current_state_event_counts.

synapse/storage/databases/main/purge_events.py

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,14 @@
1616
from typing import Any, List, Set, Tuple
1717

1818
from synapse.api.errors import SynapseError
19-
from synapse.storage._base import SQLBaseStore
19+
from synapse.storage.databases.main import CacheInvalidationWorkerStore
2020
from synapse.storage.databases.main.state import StateGroupWorkerStore
2121
from synapse.types import RoomStreamToken
2222

2323
logger = logging.getLogger(__name__)
2424

2525

26-
class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
26+
class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
2727
async def purge_history(
2828
self, room_id: str, token: str, delete_local_events: bool
2929
) -> Set[int]:
@@ -203,8 +203,6 @@ def _purge_history_txn(
203203
"DELETE FROM event_to_state_groups "
204204
"WHERE event_id IN (SELECT event_id from events_to_purge)"
205205
)
206-
for event_id, _ in event_rows:
207-
txn.call_after(self._get_state_group_for_event.invalidate, (event_id,))
208206

209207
# Delete all remote non-state events
210208
for table in (
@@ -283,6 +281,18 @@ def _purge_history_txn(
283281
# so make sure to keep this actually last.
284282
txn.execute("DROP TABLE events_to_purge")
285283

284+
for event_id, should_delete in event_rows:
285+
self._invalidate_cache_and_stream(
286+
txn, self._get_state_group_for_event, (event_id,)
287+
)
288+
289+
# FIXME: this is racy - what if have_seen_event gets called between the
290+
# transaction completing and the invalidation running?
291+
if should_delete:
292+
self._invalidate_cache_and_stream(
293+
txn, self.have_seen_event, (event_id,)
294+
)
295+
286296
logger.info("[purge] done")
287297

288298
return referenced_state_groups
@@ -422,7 +432,14 @@ def _purge_room_txn(self, txn, room_id: str) -> List[int]:
422432
# index on them. In any case we should be clearing out 'stream' tables
423433
# periodically anyway (#5888)
424434

425-
# TODO: we could probably usefully do a bunch of cache invalidation here
435+
# TODO: we could probably usefully do a bunch more cache invalidation here
436+
437+
# we have no way to know which events to clear out of have_seen_event
438+
# so we just have to drop the whole cache
439+
#
440+
# FIXME: this is racy - what if have_seen_event gets called between the
441+
# DELETE completing and the invalidation running?
442+
self._invalidate_all_cache_and_stream(txn, self.have_seen_event)
426443

427444
logger.info("[purge] done")
428445

0 commit comments

Comments
 (0)