-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Catch-up after Federation Outage (split, 1) #8230
Changes from 11 commits
e7866a5
9251ac2
8839eea
27b5d29
e6671ae
7893b14
a577751
a58ef8e
ae92ef5
59417df
d02e56b
24da031
9c933b7
f36855e
17b3f40
91b0607
5777854
4c47a1d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Track the latest event for every destination and room for catch-up after federation outage. | ||
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -69,6 +69,7 @@ def _purge_history_txn(self, txn, room_id, token_str, delete_local_events): | |
# room_depth | ||
# state_groups | ||
# state_groups_state | ||
# destination_rooms | ||
|
||
# we will build a temporary table listing the events so that we don't | ||
# have to keep shovelling the list back and forth across the | ||
|
@@ -201,6 +202,14 @@ def _purge_history_txn(self, txn, room_id, token_str, delete_local_events): | |
for event_id, _ in event_rows: | ||
txn.call_after(self._get_state_group_for_event.invalidate, (event_id,)) | ||
|
||
logger.info("[purge] removing events from destination_rooms") | ||
reivilibre marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
txn.execute( | ||
"DELETE FROM destination_rooms WHERE stream_ordering IN (" | ||
" SELECT stream_ordering FROM events_to_purge" | ||
" JOIN events USING (event_id)" | ||
")" | ||
|
||
) | ||
|
||
# Delete all remote non-state events | ||
for table in ( | ||
"events", | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
/* Copyright 2020 The Matrix.org Foundation C.I.C | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
-- This schema delta alters the schema to enable 'catching up' remote homeservers | ||
-- after there has been a connectivity problem for any reason. | ||
|
||
-- This stores, for each (destination, room) pair, the stream_ordering of the | ||
-- latest event for that destination. | ||
CREATE TABLE IF NOT EXISTS destination_rooms ( | ||
-- the destination in question. | ||
destination TEXT NOT NULL REFERENCES destinations (destination), | ||
-- the ID of the room in question | ||
room_id TEXT NOT NULL, | ||
-- the stream_ordering of the event | ||
stream_ordering INTEGER NOT NULL, | ||
PRIMARY KEY (destination, room_id), | ||
FOREIGN KEY (room_id) REFERENCES rooms (room_id), | ||
FOREIGN KEY (stream_ordering) REFERENCES events (stream_ordering) | ||
); |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
from mock import Mock | ||
|
||
from synapse.rest import admin | ||
from synapse.rest.client.v1 import login, room | ||
|
||
from tests.test_utils import event_injection, make_awaitable | ||
from tests.unittest import FederatingHomeserverTestCase, override_config | ||
|
||
|
||
class FederationCatchUpTestCases(FederatingHomeserverTestCase): | ||
servlets = [ | ||
admin.register_servlets, | ||
room.register_servlets, | ||
login.register_servlets, | ||
] | ||
|
||
def make_homeserver(self, reactor, clock): | ||
return self.setup_test_homeserver( | ||
federation_transport_client=Mock(spec=["send_transaction"]), | ||
) | ||
|
||
def prepare(self, reactor, clock, hs): | ||
# stub out get_current_hosts_in_room | ||
state_handler = hs.get_state_handler() | ||
|
||
# This mock is crucial for destination_rooms to be populated. | ||
state_handler.get_current_hosts_in_room = Mock( | ||
return_value=make_awaitable(["test", "host2"]) | ||
) | ||
|
||
def get_destination_room(self, room: str, destination: str = "host2") -> dict: | ||
""" | ||
Gets the destination_rooms entry for a (destination, room_id) pair. | ||
|
||
Args: | ||
room: room ID | ||
destination: what destination, default is "host2" | ||
|
||
Returns: | ||
Dictionary of { event_id: str, stream_ordering: int } | ||
""" | ||
event_id, stream_ordering = self.get_success( | ||
self.hs.get_datastore().db_pool.execute( | ||
"test:get_destination_rooms", | ||
None, | ||
""" | ||
SELECT event_id, stream_ordering | ||
FROM destination_rooms dr | ||
JOIN events USING (stream_ordering) | ||
WHERE dr.destination = ? AND dr.room_id = ? | ||
""", | ||
destination, | ||
room, | ||
) | ||
)[0] | ||
return {"event_id": event_id, "stream_ordering": stream_ordering} | ||
|
||
@override_config({"send_federation": True}) | ||
def test_catch_up_destination_rooms_tracking(self): | ||
""" | ||
Tests that we populate the `destination_rooms` table as needed. | ||
""" | ||
self.register_user("u1", "you the one") | ||
u1_token = self.login("u1", "you the one") | ||
room = self.helper.create_room_as("u1", tok=u1_token) | ||
|
||
self.get_success( | ||
event_injection.inject_member_event(self.hs, room, "@user:host2", "join") | ||
) | ||
|
||
event_id_1 = self.helper.send(room, "wombats!", tok=u1_token)["event_id"] | ||
|
||
row_1 = self.get_destination_room(room) | ||
|
||
event_id_2 = self.helper.send(room, "rabbits!", tok=u1_token)["event_id"] | ||
|
||
row_2 = self.get_destination_room(room) | ||
|
||
# check: events correctly registered in order | ||
self.assertEqual(row_1["event_id"], event_id_1) | ||
self.assertEqual(row_2["event_id"], event_id_2) | ||
self.assertEqual(row_1["stream_ordering"], row_2["stream_ordering"] - 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
once you land the other parts of this feature, it'll be good to update this so that the changelog entries get combined.