Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit dbd858e

Browse files
committed
Address review comments
1 parent 2d19155 commit dbd858e

File tree

4 files changed

+49
-29
lines changed

4 files changed

+49
-29
lines changed

synapse/handlers/federation.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -576,7 +576,9 @@ async def _get_state_for_room(
576576

577577
# Fetch the state events from the DB, and check we have the auth events.
578578
event_map = await self.store.get_events(state_event_ids, allow_rejected=True)
579-
auth_events_in_store = await self.store.have_seen_events(auth_event_ids)
579+
auth_events_in_store = await self.store.have_seen_events(
580+
room_id, auth_event_ids
581+
)
580582

581583
# Check for missing events. We handle state and auth event separately,
582584
# as we want to pull the state from the DB, but we don't for the auth
@@ -609,7 +611,7 @@ async def _get_state_for_room(
609611

610612
if missing_auth_events:
611613
auth_events_in_store = await self.store.have_seen_events(
612-
missing_auth_events
614+
room_id, missing_auth_events
613615
)
614616
missing_auth_events.difference_update(auth_events_in_store)
615617

@@ -709,7 +711,7 @@ async def _get_state_after_missing_prev_event(
709711

710712
missing_auth_events = set(auth_event_ids) - fetched_events.keys()
711713
missing_auth_events.difference_update(
712-
await self.store.have_seen_events(missing_auth_events)
714+
await self.store.have_seen_events(room_id, missing_auth_events)
713715
)
714716
logger.debug("We are also missing %i auth events", len(missing_auth_events))
715717

@@ -2485,7 +2487,7 @@ async def _update_auth_events_and_context_for_auth(
24852487
#
24862488
# we start by checking if they are in the store, and then try calling /event_auth/.
24872489
if missing_auth:
2488-
have_events = await self.store.have_seen_events(missing_auth)
2490+
have_events = await self.store.have_seen_events(event.room_id, missing_auth)
24892491
logger.debug("Events %s are in the store", have_events)
24902492
missing_auth.difference_update(have_events)
24912493

@@ -2504,7 +2506,7 @@ async def _update_auth_events_and_context_for_auth(
25042506
return context
25052507

25062508
seen_remotes = await self.store.have_seen_events(
2507-
[e.event_id for e in remote_auth_chain]
2509+
event.room_id, [e.event_id for e in remote_auth_chain]
25082510
)
25092511

25102512
for e in remote_auth_chain:

synapse/storage/databases/main/cache.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ def _invalidate_caches_for_event(
168168
backfilled,
169169
):
170170
self._invalidate_get_event_cache(event_id)
171-
self.have_seen_event.prefill((event_id,), True)
171+
self.have_seen_event.invalidate((room_id, event_id))
172172

173173
self.get_latest_event_ids_in_room.invalidate((room_id,))
174174

synapse/storage/databases/main/events_worker.py

Lines changed: 33 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1046,51 +1046,70 @@ async def have_events_in_timeline(self, event_ids):
10461046

10471047
return {r["event_id"] for r in rows}
10481048

1049-
async def have_seen_events(self, event_ids: Collection[str]) -> Set[str]:
1049+
async def have_seen_events(
1050+
self, room_id: str, event_ids: Iterable[str]
1051+
) -> Set[str]:
10501052
"""Given a list of event ids, check if we have already processed them.
10511053
1054+
The room_id is only used to structure the cache (so that it can later be
1055+
invalidated by room_id) - there is no guarantee that the events are actually
1056+
in the room in question.
1057+
10521058
Args:
1053-
event_ids (iterable[str]):
1059+
room_id: Room we are polling
1060+
event_ids: events we are looking for
10541061
10551062
Returns:
10561063
set[str]: The events we have already seen.
10571064
"""
1058-
res = await self._have_seen_events_dict(event_ids)
1059-
return {x for (x, y) in res.items() if y}
1065+
res = await self._have_seen_events_dict(
1066+
(room_id, event_id) for event_id in event_ids
1067+
)
1068+
return {eid for ((_rid, eid), have_event) in res.items() if have_event}
10601069

1061-
@cachedList("have_seen_event", "event_ids")
1070+
@cachedList("have_seen_event", "keys")
10621071
async def _have_seen_events_dict(
1063-
self, event_ids: Collection[str]
1064-
) -> Dict[str, bool]:
1072+
self, keys: Iterable[Tuple[str, str]]
1073+
) -> Dict[Tuple[str, str], bool]:
10651074
"""Helper for have_seen_events
10661075
1067-
Returns a dict, which is the right format for @cachedList
1076+
Returns:
1077+
a dict {(room_id, event_id)-> bool}
10681078
"""
10691079
# if the event cache contains the event, obviously we've seen it.
1070-
cache_results = {x for x in event_ids if self._get_event_cache.contains(x)}
1080+
cache_results = {
1081+
eid for (_rid, eid) in keys if self._get_event_cache.contains(eid)
1082+
}
10711083
results = {x: True for x in cache_results}
10721084

1073-
def have_seen_events_txn(txn, chunk):
1085+
def have_seen_events_txn(txn, chunk: Tuple[str, ...]):
10741086
# assume everything in this chunk is not found initially
10751087
results.update({x: False for x in chunk})
10761088

10771089
# check the db and update the results for any row that is found
1078-
sql = "SELECT event_id FROM events as e WHERE "
1090+
# NB this deliberately does *not* query on room_id, to make this an
1091+
# index-only lookup on `events_event_id_key`.
1092+
sql = "SELECT event_id FROM events AS e WHERE "
10791093
clause, args = make_in_list_sql_clause(
10801094
txn.database_engine, "e.event_id", chunk
10811095
)
10821096
txn.execute(sql + clause, args)
10831097
results.update({row[0]: True for row in txn})
10841098

1085-
for chunk in batch_iter((x for x in event_ids if x not in cache_results), 100):
1099+
# each batch requires its own index scan, so we make the batches as big as
1100+
# possible.
1101+
for chunk in batch_iter(
1102+
(eid for (_rid, eid) in keys if eid not in cache_results),
1103+
500,
1104+
):
10861105
await self.db_pool.runInteraction(
10871106
"have_seen_events", have_seen_events_txn, chunk
10881107
)
10891108

10901109
return results
10911110

1092-
@cached(max_entries=100000)
1093-
async def have_seen_event(self, event_id):
1111+
@cached(max_entries=100000, tree=True)
1112+
async def have_seen_event(self, room_id: str, event_id: str):
10941113
# this only exists for the benefit of the @cachedList descriptor on
10951114
# _have_seen_events_dict
10961115
raise NotImplementedError()

synapse/storage/databases/main/purge_events.py

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -286,11 +286,13 @@ def _purge_history_txn(
286286
txn, self._get_state_group_for_event, (event_id,)
287287
)
288288

289-
# FIXME: this is racy - what if have_seen_event gets called between the
290-
# transaction completing and the invalidation running?
289+
# XXX: This is racy, since have_seen_events could be called between the
290+
# transaction completing and the invalidation running. On the other hand,
291+
# that's no different to calling `have_seen_events` just before the
292+
# event is deleted from the database.
291293
if should_delete:
292294
self._invalidate_cache_and_stream(
293-
txn, self.have_seen_event, (event_id,)
295+
txn, self.have_seen_event, (room_id, event_id)
294296
)
295297

296298
logger.info("[purge] done")
@@ -434,12 +436,9 @@ def _purge_room_txn(self, txn, room_id: str) -> List[int]:
434436

435437
# TODO: we could probably usefully do a bunch more cache invalidation here
436438

437-
# we have no way to know which events to clear out of have_seen_event
438-
# so just have to drop the whole cache
439-
#
440-
# FIXME: this is racy - what if have_seen_event gets called between the
441-
# DELETE completing and the invalidation running?
442-
self._invalidate_all_cache_and_stream(txn, self.have_seen_event)
439+
# XXX: as with purge_history, this is racy, but no worse than other races
440+
# that already exist.
441+
self._invalidate_cache_and_stream(txn, self.have_seen_event, (room_id,))
443442

444443
logger.info("[purge] done")
445444

0 commit comments

Comments
 (0)