
Commit febb10e

add a cache to have_seen_event
1 parent 6c84778 commit febb10e

File tree

4 files changed: +53 -8 lines changed


changelog.d/9953.misc

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+Add a cache to `have_seen_events`.

synapse/storage/databases/main/cache.py

Lines changed: 1 addition & 0 deletions
@@ -168,6 +168,7 @@ def _invalidate_caches_for_event(
         backfilled,
     ):
         self._invalidate_get_event_cache(event_id)
+        self.have_seen_event.prefill((event_id,), True)
 
         self.get_latest_event_ids_in_room.invalidate((room_id,))
 
synapse/storage/databases/main/events_worker.py

Lines changed: 29 additions & 5 deletions
@@ -22,6 +22,7 @@
     Iterable,
     List,
     Optional,
+    Set,
     Tuple,
     overload,
 )
@@ -55,7 +56,7 @@
 from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
 from synapse.storage.util.sequence import build_sequence_generator
 from synapse.types import JsonDict, get_domain_from_id
-from synapse.util.caches.descriptors import cached
+from synapse.util.caches.descriptors import cached, cachedList
 from synapse.util.caches.lrucache import LruCache
 from synapse.util.iterutils import batch_iter
 from synapse.util.metrics import Measure
@@ -1046,7 +1047,7 @@ async def have_events_in_timeline(self, event_ids):
 
         return {r["event_id"] for r in rows}
 
-    async def have_seen_events(self, event_ids):
+    async def have_seen_events(self, event_ids: Collection[str]) -> Set[str]:
         """Given a list of event ids, check if we have already processed them.
 
         Args:
@@ -1055,23 +1056,46 @@ async def have_seen_events(self, event_ids):
         Returns:
             set[str]: The events we have already seen.
         """
+        res = await self._have_seen_events_dict(event_ids)
+        return {x for (x, y) in res.items() if y}
+
+    @cachedList("have_seen_event", "event_ids")
+    async def _have_seen_events_dict(
+        self, event_ids: Collection[str]
+    ) -> Dict[str, bool]:
+        """Helper for have_seen_events
+
+        Returns a dict, which is the right format for @cachedList
+        """
         # if the event cache contains the event, obviously we've seen it.
-        results = {x for x in event_ids if self._get_event_cache.contains(x)}
+        cache_results = {x for x in event_ids if self._get_event_cache.contains(x)}
+        results = {x: True for x in cache_results}
 
         def have_seen_events_txn(txn, chunk):
+            # assume everything in this chunk is not found initially
+            results.update({x: False for x in chunk})
+
+            # check the db and update the results for any row that is found
             sql = "SELECT event_id FROM events as e WHERE "
             clause, args = make_in_list_sql_clause(
                 txn.database_engine, "e.event_id", chunk
             )
             txn.execute(sql + clause, args)
-            results.update(row[0] for row in txn)
+            results.update({row[0]: True for row in txn})
 
-        for chunk in batch_iter((x for x in event_ids if x not in results), 100):
+        for chunk in batch_iter((x for x in event_ids if x not in cache_results), 100):
             await self.db_pool.runInteraction(
                 "have_seen_events", have_seen_events_txn, chunk
             )
+
         return results
 
+    @cached(max_entries=100000)
+    async def have_seen_event(self, event_id):
+        # this only exists for the benefit of the @cachedList descriptor on
+        # _have_seen_events_dict
+        raise NotImplementedError()
+
     def _get_current_state_event_counts_txn(self, txn, room_id):
         """
         See get_current_state_event_counts.
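
The `have_seen_event` stub above is never called directly: `@cached` gives it a per-key cache, and the `@cachedList` descriptor on `_have_seen_events_dict` reads and populates that same cache, invoking the wrapped function only for the keys that miss. Below is a rough, self-contained imitation of that split, with a plain dict standing in for Synapse's cache descriptors; every name in it is illustrative.

# A rough imitation of the @cached / @cachedList split: per-key results
# live in one cache, and the batch function is only asked about the keys
# that miss. Synapse's descriptors add deferreds, invalidation and
# metrics on top of this.
from typing import Dict, Iterable, Set

_cache: Dict[str, bool] = {}  # stands in for the have_seen_event cache

def have_seen_events(event_ids: Iterable[str]) -> Set[str]:
    res = _have_seen_events_dict(event_ids)
    return {eid for eid, seen in res.items() if seen}

def _have_seen_events_dict(event_ids: Iterable[str]) -> Dict[str, bool]:
    event_ids = list(event_ids)
    # answer what we can from the cache
    results = {eid: _cache[eid] for eid in event_ids if eid in _cache}
    misses = [eid for eid in event_ids if eid not in results]
    if misses:
        # one batched "query" for everything the cache could not answer
        found = _query_events_table(misses)
        for eid in misses:
            results[eid] = eid in found
            _cache[eid] = results[eid]  # populate the per-key cache
    return results

def _query_events_table(event_ids: Iterable[str]) -> Set[str]:
    # hypothetical stand-in for the SELECT against the events table
    known = {"$a:example.org", "$b:example.org"}
    return known & set(event_ids)

print(have_seen_events(["$a:example.org", "$x:example.org"]))
# -> {'$a:example.org'}; a repeat call is now answered from _cache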

synapse/storage/databases/main/purge_events.py

Lines changed: 22 additions & 3 deletions
@@ -203,8 +203,6 @@ def _purge_history_txn(
             "DELETE FROM event_to_state_groups "
             "WHERE event_id IN (SELECT event_id from events_to_purge)"
         )
-        for event_id, _ in event_rows:
-            txn.call_after(self._get_state_group_for_event.invalidate, (event_id,))
 
         # Delete all remote non-state events
         for table in (
@@ -283,6 +281,17 @@ def _purge_history_txn(
         # so make sure to keep this actually last.
         txn.execute("DROP TABLE events_to_purge")
 
+        def invalidate_caches():
+            for event_id, should_delete in event_rows:
+                self._get_state_group_for_event.invalidate((event_id,))
+
+                # FIXME: this is racy - what if have_seen_event gets called between the
+                # DELETE completing and the invalidation running?
+                if should_delete:
+                    self._have_seen_events.invalidate((event_id,))
+
+        txn.call_after(invalidate_caches)
+
         logger.info("[purge] done")
 
         return referenced_state_groups
@@ -422,7 +431,17 @@ def _purge_room_txn(self, txn, room_id: str) -> List[int]:
         # index on them. In any case we should be clearing out 'stream' tables
         # periodically anyway (#5888)
 
-        # TODO: we could probably usefully do a bunch of cache invalidation here
+        def invalidate_caches():
+            # TODO: we could probably usefully do a bunch more cache invalidation here
+
+            # FIXME: this is racy - what if have_seen_event gets called between the
+            # DELETE completing and the invalidation running?
+
+            # we have no way to know which events to clear out of have_seen_event
+            # so just have to drop the whole cache
+            self.have_seen_event.invalidate_all()
+
+        txn.call_after(invalidate_caches)
 
         logger.info("[purge] done")
 