
Commit 77156a4

Process previously failed backfill events in the background (#15585)
Process previously failed backfill events in the background, since they are bound to fail again and we don't need to hold up the request waiting on them.

Fixes #13623

Follow-up to #13621 and #13622

Part of making `/messages` faster: #13356
1 parent: 8839b6c

File tree: 6 files changed, +252 -9 lines changed
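In outline, the handler now checks which of the newly pulled events have failed to process before, pushes those to the background, and only awaits the first-time events. Below is a runnable sketch of that control flow using standalone asyncio stand-ins (`handle_pulled_events`, `process`, and `asyncio.create_task` are illustrative substitutes, not Synapse's actual code):

```python
import asyncio
from typing import Callable, Iterable, List, Set, Tuple, TypeVar

T = TypeVar("T")


def partition(
    iterable: Iterable[T], predicate: Callable[[T], bool]
) -> Tuple[List[T], List[T]]:
    # Same behavior as the helper this commit adds to synapse.util.iterutils.
    true_results: List[T] = []
    false_results: List[T] = []
    for item in iterable:
        (true_results if predicate(item) else false_results).append(item)
    return true_results, false_results


async def process(events: List[str]) -> None:
    print("processing", events)


async def handle_pulled_events(new_events: List[str], failed_ids: Set[str]) -> None:
    # Split events that have failed to process before from first-time events.
    previously_failed, fresh = partition(new_events, lambda e: e in failed_ids)
    if previously_failed:
        # Bound to fail again: fire-and-forget so the request isn't held up.
        # (Synapse uses run_as_background_process rather than create_task.)
        asyncio.create_task(process(previously_failed))
    if fresh:
        # First attempt: process in the foreground and wait.
        await process(fresh)


async def main() -> None:
    await handle_pulled_events(["$failed", "$fresh"], failed_ids={"$failed"})
    # Yield once so the detached task gets a chance to run before we exit,
    # much like the new test pumps the reactor.
    await asyncio.sleep(0)


asyncio.run(main())
```

The real change wires the previously-failed bucket through `run_as_background_process` and tags both buckets for tracing, as the `federation_event.py` diff below shows.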

changelog.d/15585.feature

Lines changed: 1 addition & 0 deletions

@@ -0,0 +1 @@
+Process previously failed backfill events in the background to avoid blocking requests for something that is bound to fail again.

synapse/handlers/federation_event.py

Lines changed: 62 additions & 8 deletions

@@ -88,7 +88,7 @@
 )
 from synapse.types.state import StateFilter
 from synapse.util.async_helpers import Linearizer, concurrently_execute
-from synapse.util.iterutils import batch_iter
+from synapse.util.iterutils import batch_iter, partition
 from synapse.util.retryutils import NotRetryingDestination
 from synapse.util.stringutils import shortstr
 
@@ -865,7 +865,7 @@ async def _process_pulled_events(
             [event.event_id for event in events]
         )
 
-        new_events = []
+        new_events: List[EventBase] = []
         for event in events:
             event_id = event.event_id
 
@@ -895,12 +895,66 @@ async def _process_pulled_events(
             str(len(new_events)),
         )
 
-        # We want to sort these by depth so we process them and
-        # tell clients about them in order.
-        sorted_events = sorted(new_events, key=lambda x: x.depth)
-        for ev in sorted_events:
-            with nested_logging_context(ev.event_id):
-                await self._process_pulled_event(origin, ev, backfilled=backfilled)
+        @trace
+        async def _process_new_pulled_events(new_events: Collection[EventBase]) -> None:
+            # We want to sort these by depth so we process them and tell clients about
+            # them in order. It's also more efficient to backfill this way (`depth`
+            # ascending) because one backfill event is likely to be the `prev_event` of
+            # the next event we're going to process.
+            sorted_events = sorted(new_events, key=lambda x: x.depth)
+            for ev in sorted_events:
+                with nested_logging_context(ev.event_id):
+                    await self._process_pulled_event(origin, ev, backfilled=backfilled)
+
+        # Check if we've already tried to process these events at some point in the
+        # past. We aren't concerned with the exponential backoff here, just whether it
+        # has failed to be processed before.
+        event_ids_with_failed_pull_attempts = (
+            await self._store.get_event_ids_with_failed_pull_attempts(
+                [event.event_id for event in new_events]
+            )
+        )
+
+        # We construct the event lists in source order from `/backfill` response because
+        # it's a) easiest, but also b) the order in which we process things matters for
+        # MSC2716 historical batches because many historical events are all at the same
+        # `depth` and we rely on the tenuous sort that the other server gave us and hope
+        # they're doing their best. The brittle nature of this ordering for historical
+        # messages over federation is one of the reasons why we don't want to continue
+        # on MSC2716 until we have online topological ordering.
+        events_with_failed_pull_attempts, fresh_events = partition(
+            new_events, lambda e: e.event_id in event_ids_with_failed_pull_attempts
+        )
+        set_tag(
+            SynapseTags.FUNC_ARG_PREFIX + "events_with_failed_pull_attempts",
+            str(event_ids_with_failed_pull_attempts),
+        )
+        set_tag(
+            SynapseTags.RESULT_PREFIX + "events_with_failed_pull_attempts.length",
+            str(len(events_with_failed_pull_attempts)),
+        )
+        set_tag(
+            SynapseTags.FUNC_ARG_PREFIX + "fresh_events",
+            str([event.event_id for event in fresh_events]),
+        )
+        set_tag(
+            SynapseTags.RESULT_PREFIX + "fresh_events.length",
+            str(len(fresh_events)),
+        )
+
+        # Process previously failed backfill events in the background to not waste
+        # time on something that is likely to fail again.
+        if len(events_with_failed_pull_attempts) > 0:
+            run_as_background_process(
+                "_process_new_pulled_events_with_failed_pull_attempts",
+                _process_new_pulled_events,
+                events_with_failed_pull_attempts,
+            )
+
+        # We can optimistically try to process and wait for the event to be fully
+        # persisted if we've never tried before.
+        if len(fresh_events) > 0:
+            await _process_new_pulled_events(fresh_events)
 
     @trace
     @tag_args

synapse/storage/databases/main/event_federation.py

Lines changed: 30 additions & 1 deletion

@@ -46,7 +46,7 @@
 from synapse.storage.databases.main.events_worker import EventsWorkerStore
 from synapse.storage.databases.main.signatures import SignatureWorkerStore
 from synapse.storage.engines import PostgresEngine, Sqlite3Engine
-from synapse.types import JsonDict
+from synapse.types import JsonDict, StrCollection
 from synapse.util import json_encoder
 from synapse.util.caches.descriptors import cached
 from synapse.util.caches.lrucache import LruCache
 
@@ -1583,6 +1583,35 @@ def _record_event_failed_pull_attempt_upsert_txn(
 
         txn.execute(sql, (room_id, event_id, 1, self._clock.time_msec(), cause))
 
+    @trace
+    async def get_event_ids_with_failed_pull_attempts(
+        self, event_ids: StrCollection
+    ) -> Set[str]:
+        """
+        Filter the given list of `event_ids` and return events which have any failed
+        pull attempts.
+
+        Args:
+            event_ids: A list of events to filter down.
+
+        Returns:
+            A filtered down list of `event_ids` that have previous failed pull attempts.
+        """
+
+        rows = await self.db_pool.simple_select_many_batch(
+            table="event_failed_pull_attempts",
+            column="event_id",
+            iterable=event_ids,
+            keyvalues={},
+            retcols=("event_id",),
+            desc="get_event_ids_with_failed_pull_attempts",
+        )
+        event_ids_with_failed_pull_attempts: Set[str] = {
+            row["event_id"] for row in rows
+        }
+
+        return event_ids_with_failed_pull_attempts
+
     @trace
     async def get_event_ids_to_not_pull_from_backoff(
         self,
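The new method leans on `simple_select_many_batch`, which issues the `SELECT ... WHERE event_id IN (...)` in chunks so an arbitrarily long event-ID list stays within SQL placeholder limits. A minimal standalone sketch of that batching pattern (plain `sqlite3`, a single-column stand-in for the real table, and an illustrative chunk size; not Synapse's actual database layer):

```python
import sqlite3
from typing import Iterable, List, Set


def failed_pull_event_ids(conn: sqlite3.Connection, event_ids: Iterable[str]) -> Set[str]:
    # Chunk the parameter list so a huge input can't blow past SQL
    # placeholder limits; this is the idea behind the "_batch" in
    # `simple_select_many_batch`.
    ids: List[str] = list(event_ids)
    found: Set[str] = set()
    batch_size = 500  # illustrative chunk size, not Synapse's actual value
    for i in range(0, len(ids), batch_size):
        chunk = ids[i : i + batch_size]
        placeholders = ", ".join("?" for _ in chunk)
        rows = conn.execute(
            "SELECT event_id FROM event_failed_pull_attempts"
            f" WHERE event_id IN ({placeholders})",
            chunk,
        ).fetchall()
        found.update(row[0] for row in rows)
    return found


# Quick demo against an in-memory, simplified version of the table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE event_failed_pull_attempts (event_id TEXT)")
conn.execute("INSERT INTO event_failed_pull_attempts VALUES ('$failed')")
assert failed_pull_event_ids(conn, ["$failed", "$fresh"]) == {"$failed"}
```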

synapse/util/iterutils.py

Lines changed: 27 additions & 0 deletions

@@ -15,11 +15,13 @@
 import heapq
 from itertools import islice
 from typing import (
+    Callable,
     Collection,
     Dict,
     Generator,
     Iterable,
     Iterator,
+    List,
     Mapping,
     Set,
     Sized,
 
@@ -71,6 +73,31 @@ def chunk_seq(iseq: S, maxlen: int) -> Iterator[S]:
     return (iseq[i : i + maxlen] for i in range(0, len(iseq), maxlen))
 
 
+def partition(
+    iterable: Iterable[T], predicate: Callable[[T], bool]
+) -> Tuple[List[T], List[T]]:
+    """
+    Separate a given iterable into two lists based on the result of a predicate function.
+
+    Args:
+        iterable: the iterable to partition (separate)
+        predicate: a function that takes an item from the iterable and returns a boolean
+
+    Returns:
+        A tuple of two lists, the first containing all items for which the predicate
+        returned True, the second containing all items for which the predicate returned
+        False
+    """
+    true_results = []
+    false_results = []
+    for item in iterable:
+        if predicate(item):
+            true_results.append(item)
+        else:
+            false_results.append(item)
+    return true_results, false_results
+
+
 def sorted_topologically(
     nodes: Iterable[T],
     graph: Mapping[T, Collection[T]],
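To make the helper's behavior concrete, a small usage example (illustrative values; assumes a Synapse checkout on the import path):

```python
from synapse.util.iterutils import partition

# Items matching the predicate land in the first list, the rest in the second.
evens, odds = partition(range(10), lambda n: n % 2 == 0)
assert evens == [0, 2, 4, 6, 8]
assert odds == [1, 3, 5, 7, 9]
```

This mirrors how `_process_pulled_events` splits `new_events` into the previously-failed and fresh buckets.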

tests/handlers/test_federation_event.py

Lines changed: 95 additions & 0 deletions

@@ -664,6 +664,101 @@ async def get_room_state(
             StoreError,
         )
 
+    def test_backfill_process_previously_failed_pull_attempt_event_in_the_background(
+        self,
+    ) -> None:
+        """
+        Sanity check that events with previous failed pull attempts are still
+        processed, even though that processing happens in the background.
+        """
+        OTHER_USER = f"@user:{self.OTHER_SERVER_NAME}"
+        main_store = self.hs.get_datastores().main
+
+        # Create the room
+        user_id = self.register_user("kermit", "test")
+        tok = self.login("kermit", "test")
+        room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)
+        room_version = self.get_success(main_store.get_room_version(room_id))
+
+        # Allow the remote user to send state events
+        self.helper.send_state(
+            room_id,
+            "m.room.power_levels",
+            {"events_default": 0, "state_default": 0},
+            tok=tok,
+        )
+
+        # Add the remote user to the room
+        member_event = self.get_success(
+            event_injection.inject_member_event(self.hs, room_id, OTHER_USER, "join")
+        )
+
+        initial_state_map = self.get_success(
+            main_store.get_partial_current_state_ids(room_id)
+        )
+
+        auth_event_ids = [
+            initial_state_map[("m.room.create", "")],
+            initial_state_map[("m.room.power_levels", "")],
+            member_event.event_id,
+        ]
+
+        # Create a regular event that should process
+        pulled_event = make_event_from_dict(
+            self.add_hashes_and_signatures_from_other_server(
+                {
+                    "type": "test_regular_type",
+                    "room_id": room_id,
+                    "sender": OTHER_USER,
+                    "prev_events": [
+                        member_event.event_id,
+                    ],
+                    "auth_events": auth_event_ids,
+                    "origin_server_ts": 1,
+                    "depth": 12,
+                    "content": {"body": "pulled_event"},
+                }
+            ),
+            room_version,
+        )
+
+        # Record a failed pull attempt for this event which will cause us to backfill it
+        # in the background from here on out.
+        self.get_success(
+            main_store.record_event_failed_pull_attempt(
+                room_id, pulled_event.event_id, "fake cause"
+            )
+        )
+
+        # We expect an outbound request to /backfill, so stub that out
+        self.mock_federation_transport_client.backfill.return_value = make_awaitable(
+            {
+                "origin": self.OTHER_SERVER_NAME,
+                "origin_server_ts": 123,
+                "pdus": [
+                    pulled_event.get_pdu_json(),
+                ],
+            }
+        )
+
+        # The function under test: try to backfill and process the pulled event
+        with LoggingContext("test"):
+            self.get_success(
+                self.hs.get_federation_event_handler().backfill(
+                    self.OTHER_SERVER_NAME,
+                    room_id,
+                    limit=1,
+                    extremities=["$some_extremity"],
+                )
+            )
+
+        # Ensure `run_as_background_process(...)` has a chance to run (essentially
+        # `wait_for_background_processes()`)
+        self.reactor.pump((0.1,))
+
+        # Make sure we processed and persisted the pulled event
+        self.get_success(main_store.get_event(pulled_event.event_id, allow_none=False))
+
     def test_process_pulled_event_with_rejected_missing_state(self) -> None:
         """Ensure that we correctly handle pulled events with missing state containing a
         rejected state event

tests/storage/test_event_federation.py

Lines changed: 37 additions & 0 deletions

@@ -1134,6 +1134,43 @@ def test_get_insertion_event_backward_extremities_in_room_attempted_event_retry_
         backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
         self.assertEqual(backfill_event_ids, ["insertion_eventA"])
 
+    def test_get_event_ids_with_failed_pull_attempts(self) -> None:
+        """
+        Test to make sure we properly get event_ids based on whether they have any
+        failed pull attempts.
+        """
+        # Create the room
+        user_id = self.register_user("alice", "test")
+        tok = self.login("alice", "test")
+        room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)
+
+        self.get_success(
+            self.store.record_event_failed_pull_attempt(
+                room_id, "$failed_event_id1", "fake cause"
+            )
+        )
+        self.get_success(
+            self.store.record_event_failed_pull_attempt(
+                room_id, "$failed_event_id2", "fake cause"
+            )
+        )
+
+        event_ids_with_failed_pull_attempts = self.get_success(
+            self.store.get_event_ids_with_failed_pull_attempts(
+                event_ids=[
+                    "$failed_event_id1",
+                    "$fresh_event_id1",
+                    "$failed_event_id2",
+                    "$fresh_event_id2",
+                ]
+            )
+        )
+
+        self.assertEqual(
+            event_ids_with_failed_pull_attempts,
+            {"$failed_event_id1", "$failed_event_id2"},
+        )
+
     def test_get_event_ids_to_not_pull_from_backoff(self) -> None:
         """
         Test to make sure only event IDs we should backoff from are returned.
