
Commit 96f6293

Add endpoints for backfilling history (MSC2716) (#9247)
Work on matrix-org/matrix-spec-proposals#2716
1 parent 756fd51 commit 96f6293

14 files changed, +584 -23 lines

changelog.d/9247.feature

Lines changed: 1 addition & 0 deletions

@@ -0,0 +1 @@
+Add experimental support for backfilling history into rooms ([MSC2716](https://github.com/matrix-org/matrix-doc/pull/2716)).

scripts-dev/complement.sh

Lines changed: 1 addition & 1 deletion

@@ -65,4 +65,4 @@ if [[ -n "$1" ]]; then
 fi

 # Run the tests!
-go test -v -tags synapse_blacklist,msc2946,msc3083 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests
+go test -v -tags synapse_blacklist,msc2946,msc3083,msc2716 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests

synapse/api/auth.py

Lines changed: 2 additions & 5 deletions

@@ -92,11 +92,8 @@ def __init__(self, hs: "HomeServer"):
     async def check_from_context(
         self, room_version: str, event, context, do_sig_check=True
     ) -> None:
-        prev_state_ids = await context.get_prev_state_ids()
-        auth_events_ids = self.compute_auth_events(
-            event, prev_state_ids, for_verification=True
-        )
-        auth_events_by_id = await self.store.get_events(auth_events_ids)
+        auth_event_ids = event.auth_event_ids()
+        auth_events_by_id = await self.store.get_events(auth_event_ids)
         auth_events = {(e.type, e.state_key): e for e in auth_events_by_id.values()}

         room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
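For context, the change above trusts the auth event IDs already recorded on the event rather than recomputing them from the prior room state. A minimal standalone sketch of how the resulting auth_events map is keyed (stub classes only; not Synapse's real EventBase or datastore):

from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass
class StubEvent:
    type: str
    state_key: str
    event_id: str


def auth_events_by_type_and_state_key(
    events: List[StubEvent],
) -> Dict[Tuple[str, str], StubEvent]:
    # Mirrors the dict comprehension in check_from_context: the map is keyed
    # by (event type, state_key), so each state "slot" appears at most once.
    return {(e.type, e.state_key): e for e in events}


fetched = [
    StubEvent("m.room.create", "", "$create"),
    StubEvent("m.room.power_levels", "", "$power"),
    StubEvent("m.room.member", "@alice:example.org", "$alice_join"),
]
auth_events = auth_events_by_type_and_state_key(fetched)
print(auth_events[("m.room.member", "@alice:example.org")].event_id)  # $alice_join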

synapse/api/constants.py

Lines changed: 15 additions & 0 deletions

@@ -119,6 +119,9 @@ class EventTypes:
     SpaceChild = "m.space.child"
     SpaceParent = "m.space.parent"

+    MSC2716_INSERTION = "org.matrix.msc2716.insertion"
+    MSC2716_MARKER = "org.matrix.msc2716.marker"
+

 class ToDeviceEventTypes:
     RoomKeyRequest = "m.room_key_request"
@@ -185,6 +188,18 @@ class EventContentFields:
     # cf https://github.com/matrix-org/matrix-doc/pull/1772
     ROOM_TYPE = "type"

+    # Used on normal messages to indicate they were historically imported after the fact
+    MSC2716_HISTORICAL = "org.matrix.msc2716.historical"
+    # For "insertion" events
+    MSC2716_NEXT_CHUNK_ID = "org.matrix.msc2716.next_chunk_id"
+    # Used on normal message events to indicate where the chunk connects to
+    MSC2716_CHUNK_ID = "org.matrix.msc2716.chunk_id"
+    # For "marker" events
+    MSC2716_MARKER_INSERTION = "org.matrix.msc2716.marker.insertion"
+    MSC2716_MARKER_INSERTION_PREV_EVENTS = (
+        "org.matrix.msc2716.marker.insertion_prev_events"
+    )
+

 class RoomEncryptionAlgorithms:
     MEGOLM_V1_AES_SHA2 = "m.megolm.v1.aes-sha2"
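As a rough illustration (not part of this commit), these constants become event type strings and content keys on imported events. The exact event shapes are defined by MSC2716; the field layout below is only an assumption based on the names added above:

# Hypothetical event bodies, for illustration only; the layout is assumed from
# the constant names above, not taken from this diff or the final MSC text.
insertion_event = {
    "type": "org.matrix.msc2716.insertion",  # EventTypes.MSC2716_INSERTION
    "content": {
        "org.matrix.msc2716.next_chunk_id": "chunk-1",  # MSC2716_NEXT_CHUNK_ID
    },
}

historical_message = {
    "type": "m.room.message",
    "content": {
        "msgtype": "m.text",
        "body": "imported message",
        "org.matrix.msc2716.historical": True,      # MSC2716_HISTORICAL
        "org.matrix.msc2716.chunk_id": "chunk-1",   # MSC2716_CHUNK_ID
    },
}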

synapse/config/experimental.py

Lines changed: 3 additions & 0 deletions

@@ -29,3 +29,6 @@ def read_config(self, config: JsonDict, **kwargs):

         # MSC3026 (busy presence state)
         self.msc3026_enabled = experimental.get("msc3026_enabled", False)  # type: bool
+
+        # MSC2716 (backfill existing history)
+        self.msc2716_enabled = experimental.get("msc2716_enabled", False)  # type: bool
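The flag is looked up on the experimental-features config section, so enabling it amounts to setting msc2716_enabled: true there in homeserver.yaml. A minimal sketch of the same read pattern with a plain dict standing in for the parsed config (the section name is an assumption; it is not visible in this hunk):

# Plain-dict sketch of the config read above; "experimental_features" is the
# section name this is assumed to live under (not shown in this hunk).
config = {"experimental_features": {"msc2716_enabled": True}}

experimental = config.get("experimental_features") or {}
msc2716_enabled = experimental.get("msc2716_enabled", False)

print(msc2716_enabled)  # True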

synapse/events/__init__.py

Lines changed: 9 additions & 0 deletions

@@ -119,6 +119,7 @@ def __init__(self, internal_metadata_dict: JsonDict):
     redacted = DictProperty("redacted")  # type: bool
     txn_id = DictProperty("txn_id")  # type: str
     token_id = DictProperty("token_id")  # type: str
+    historical = DictProperty("historical")  # type: bool

     # XXX: These are set by StreamWorkerStore._set_before_and_after.
     # I'm pretty sure that these are never persisted to the database, so shouldn't
@@ -204,6 +205,14 @@ def is_redacted(self):
         """
         return self._dict.get("redacted", False)

+    def is_historical(self) -> bool:
+        """Whether this is a historical message.
+        This is used by the batchsend historical message endpoint and
+        is needed to and mark the event as backfilled and skip some checks
+        like push notifications.
+        """
+        return self._dict.get("historical", False)
+

 class EventBase(metaclass=abc.ABCMeta):
     @property
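For readers unfamiliar with DictProperty: it exposes a key of the wrapped internal-metadata dict as an attribute. A minimal standalone sketch of that idea (not Synapse's actual descriptor), showing how the new historical attribute is set and then read back via is_historical():

# Simplified descriptor, for illustration only; Synapse's DictProperty differs.
class DictProperty:
    def __init__(self, key):
        self.key = key

    def __get__(self, obj, objtype=None):
        if obj is None:
            return self
        return obj._dict[self.key]

    def __set__(self, obj, value):
        obj._dict[self.key] = value


class InternalMetadata:
    historical = DictProperty("historical")

    def __init__(self, internal_metadata_dict):
        self._dict = internal_metadata_dict

    def is_historical(self) -> bool:
        return self._dict.get("historical", False)


meta = InternalMetadata({})
meta.historical = True
print(meta.is_historical())  # True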

synapse/events/builder.py

Lines changed: 15 additions & 2 deletions

@@ -11,6 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
 from typing import Any, Dict, List, Optional, Tuple, Union

 import attr
@@ -33,6 +34,8 @@
 from synapse.util import Clock
 from synapse.util.stringutils import random_string

+logger = logging.getLogger(__name__)
+

 @attr.s(slots=True, cmp=False, frozen=True)
 class EventBuilder:
@@ -100,6 +103,7 @@ async def build(
         self,
         prev_event_ids: List[str],
         auth_event_ids: Optional[List[str]],
+        depth: Optional[int] = None,
     ) -> EventBase:
         """Transform into a fully signed and hashed event

@@ -108,6 +112,9 @@ async def build(
             auth_event_ids: The event IDs to use as the auth events.
                 Should normally be set to None, which will cause them to be calculated
                 based on the room state at the prev_events.
+            depth: Override the depth used to order the event in the DAG.
+                Should normally be set to None, which will cause the depth to be calculated
+                based on the prev_events.

         Returns:
             The signed and hashed event.
@@ -131,8 +138,14 @@ async def build(
             auth_events = auth_event_ids
             prev_events = prev_event_ids

-        old_depth = await self._store.get_max_depth_of(prev_event_ids)
-        depth = old_depth + 1
+        # Otherwise, progress the depth as normal
+        if depth is None:
+            (
+                _,
+                most_recent_prev_event_depth,
+            ) = await self._store.get_max_depth_of(prev_event_ids)
+
+            depth = most_recent_prev_event_depth + 1

         # we cap depth of generated events, to ensure that they are not
         # rejected by other servers (and so that they can be persisted in
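The depth handling above boils down to: use the caller-supplied depth when one is given (as the MSC2716 batch-send path does), otherwise take the depth of the most recent prev event plus one, then cap it so other servers will accept the event. A standalone sketch of that logic (the cap value and the inputs are stand-ins, not Synapse's real store API):

from typing import Iterable, Optional

# Stand-in for the cap Synapse applies; the real constant lives elsewhere in
# the codebase and its value here is purely illustrative.
MAX_DEPTH = 2**63 - 1


def choose_depth(prev_event_depths: Iterable[int], depth: Optional[int] = None) -> int:
    """Pick the depth for a new event.

    An explicit `depth` override is used as-is; otherwise progress one past
    the deepest prev event.
    """
    if depth is None:
        most_recent_prev_event_depth = max(prev_event_depths, default=0)
        depth = most_recent_prev_event_depth + 1

    # Cap depth so generated events are not rejected by other servers.
    return min(depth, MAX_DEPTH)


print(choose_depth([10, 12]))           # 13: normal forward progression
print(choose_depth([10, 12], depth=5))  # 5: explicit override for inserted history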

synapse/handlers/message.py

Lines changed: 99 additions & 5 deletions

@@ -482,6 +482,9 @@ async def create_event(
         prev_event_ids: Optional[List[str]] = None,
         auth_event_ids: Optional[List[str]] = None,
         require_consent: bool = True,
+        outlier: bool = False,
+        historical: bool = False,
+        depth: Optional[int] = None,
     ) -> Tuple[EventBase, EventContext]:
         """
         Given a dict from a client, create a new event.
@@ -508,6 +511,14 @@ async def create_event(

             require_consent: Whether to check if the requester has
                 consented to the privacy policy.
+
+            outlier: Indicates whether the event is an `outlier`, i.e. if
+                it's from an arbitrary point and floating in the DAG as
+                opposed to being inline with the current DAG.
+            depth: Override the depth used to order the event in the DAG.
+                Should normally be set to None, which will cause the depth to be calculated
+                based on the prev_events.
+
         Raises:
             ResourceLimitError if server is blocked to some resource being
             exceeded
@@ -563,11 +574,36 @@ async def create_event(
         if txn_id is not None:
             builder.internal_metadata.txn_id = txn_id

+        builder.internal_metadata.outlier = outlier
+
+        builder.internal_metadata.historical = historical
+
+        # Strip down the auth_event_ids to only what we need to auth the event.
+        # For example, we don't need extra m.room.member that don't match event.sender
+        if auth_event_ids is not None:
+            temp_event = await builder.build(
+                prev_event_ids=prev_event_ids,
+                auth_event_ids=auth_event_ids,
+                depth=depth,
+            )
+            auth_events = await self.store.get_events_as_list(auth_event_ids)
+            # Create a StateMap[str]
+            auth_event_state_map = {
+                (e.type, e.state_key): e.event_id for e in auth_events
+            }
+            # Actually strip down and use the necessary auth events
+            auth_event_ids = self.auth.compute_auth_events(
+                event=temp_event,
+                current_state_ids=auth_event_state_map,
+                for_verification=False,
+            )
+
         event, context = await self.create_new_client_event(
             builder=builder,
             requester=requester,
             prev_event_ids=prev_event_ids,
             auth_event_ids=auth_event_ids,
+            depth=depth,
         )

         # In an ideal world we wouldn't need the second part of this condition. However,
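The new auth_event_ids stripping relies on compute_auth_events to keep only the events actually needed to authorise the new event (per the comment above, extra m.room.member events that don't match event.sender are dropped). A rough standalone sketch of that selection for a plain message event — a simplification of Matrix's auth-event selection rules, not Synapse's implementation:

from typing import Dict, List, Tuple

StateMap = Dict[Tuple[str, str], str]  # (event type, state_key) -> event_id


def select_auth_events_for_message(sender: str, current_state_ids: StateMap) -> List[str]:
    """Keep only the state events a plain message needs to be authorised:
    the room create event, the power levels, and the sender's own membership.
    (Membership and third-party-invite events need a few more; omitted here.)
    """
    wanted = [
        ("m.room.create", ""),
        ("m.room.power_levels", ""),
        ("m.room.member", sender),
    ]
    return [current_state_ids[key] for key in wanted if key in current_state_ids]


state = {
    ("m.room.create", ""): "$create",
    ("m.room.power_levels", ""): "$power",
    ("m.room.member", "@alice:example.org"): "$alice_join",
    ("m.room.member", "@bob:example.org"): "$bob_join",  # dropped: not the sender
}
print(select_auth_events_for_message("@alice:example.org", state))
# ['$create', '$power', '$alice_join']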
@@ -724,9 +760,13 @@ async def create_and_send_nonmember_event(
         self,
         requester: Requester,
         event_dict: dict,
+        prev_event_ids: Optional[List[str]] = None,
+        auth_event_ids: Optional[List[str]] = None,
         ratelimit: bool = True,
         txn_id: Optional[str] = None,
         ignore_shadow_ban: bool = False,
+        outlier: bool = False,
+        depth: Optional[int] = None,
     ) -> Tuple[EventBase, int]:
         """
         Creates an event, then sends it.
@@ -736,10 +776,24 @@ async def create_and_send_nonmember_event(
         Args:
             requester: The requester sending the event.
             event_dict: An entire event.
+            prev_event_ids:
+                The event IDs to use as the prev events.
+                Should normally be left as None to automatically request them
+                from the database.
+            auth_event_ids:
+                The event ids to use as the auth_events for the new event.
+                Should normally be left as None, which will cause them to be calculated
+                based on the room state at the prev_events.
             ratelimit: Whether to rate limit this send.
             txn_id: The transaction ID.
             ignore_shadow_ban: True if shadow-banned users should be allowed to
                 send this event.
+            outlier: Indicates whether the event is an `outlier`, i.e. if
+                it's from an arbitrary point and floating in the DAG as
+                opposed to being inline with the current DAG.
+            depth: Override the depth used to order the event in the DAG.
+                Should normally be set to None, which will cause the depth to be calculated
+                based on the prev_events.

         Returns:
             The event, and its stream ordering (if deduplication happened,
@@ -779,7 +833,13 @@ async def create_and_send_nonmember_event(
                     return event, event.internal_metadata.stream_ordering

             event, context = await self.create_event(
-                requester, event_dict, txn_id=txn_id
+                requester,
+                event_dict,
+                txn_id=txn_id,
+                prev_event_ids=prev_event_ids,
+                auth_event_ids=auth_event_ids,
+                outlier=outlier,
+                depth=depth,
             )

             assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % (
@@ -811,6 +871,7 @@ async def create_new_client_event(
         requester: Optional[Requester] = None,
         prev_event_ids: Optional[List[str]] = None,
         auth_event_ids: Optional[List[str]] = None,
+        depth: Optional[int] = None,
     ) -> Tuple[EventBase, EventContext]:
         """Create a new event for a local client

@@ -828,6 +889,10 @@ async def create_new_client_event(
                 Should normally be left as None, which will cause them to be calculated
                 based on the room state at the prev_events.

+            depth: Override the depth used to order the event in the DAG.
+                Should normally be set to None, which will cause the depth to be calculated
+                based on the prev_events.
+
         Returns:
             Tuple of created event, context
         """
@@ -851,9 +916,24 @@ async def create_new_client_event(
         ), "Attempting to create an event with no prev_events"

         event = await builder.build(
-            prev_event_ids=prev_event_ids, auth_event_ids=auth_event_ids
+            prev_event_ids=prev_event_ids,
+            auth_event_ids=auth_event_ids,
+            depth=depth,
         )
-        context = await self.state.compute_event_context(event)
+
+        old_state = None
+
+        # Pass on the outlier property from the builder to the event
+        # after it is created
+        if builder.internal_metadata.outlier:
+            event.internal_metadata.outlier = builder.internal_metadata.outlier
+
+        # Calculate the state for outliers that pass in their own `auth_event_ids`
+        if auth_event_ids:
+            old_state = await self.store.get_events_as_list(auth_event_ids)
+
+        context = await self.state.compute_event_context(event, old_state=old_state)
+
         if requester:
             context.app_service = requester.app_service

@@ -1018,7 +1098,13 @@ async def _persist_event(
             the arguments.
         """

-        await self.action_generator.handle_push_actions_for_event(event, context)
+        # Skip push notification actions for historical messages
+        # because we don't want to notify people about old history back in time.
+        # The historical messages also do not have the proper `context.current_state_ids`
+        # and `state_groups` because they have `prev_events` that aren't persisted yet
+        # (historical messages persisted in reverse-chronological order).
+        if not event.internal_metadata.is_historical():
+            await self.action_generator.handle_push_actions_for_event(event, context)

         try:
             # If we're a worker we need to hit out to the master.
@@ -1317,13 +1403,21 @@ async def persist_and_notify_client_event(
             if prev_state_ids:
                 raise AuthError(403, "Changing the room create event is forbidden")

+        # Mark any `m.historical` messages as backfilled so they don't appear
+        # in `/sync` and have the proper decrementing `stream_ordering` as we import
+        backfilled = False
+        if event.internal_metadata.is_historical():
+            backfilled = True
+
         # Note that this returns the event that was persisted, which may not be
         # the same as we passed in if it was deduplicated due transaction IDs.
         (
             event,
             event_pos,
             max_stream_token,
-        ) = await self.storage.persistence.persist_event(event, context=context)
+        ) = await self.storage.persistence.persist_event(
+            event, context=context, backfilled=backfilled
+        )

         if self._ephemeral_events_enabled:
             # If there's an expiry timestamp on the event, schedule its expiry.
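Taken together, the historical flag threads through persistence as: historical events skip push-rule evaluation and are persisted as backfilled. A tiny sketch of that gating with stub objects (not Synapse's persistence API):

from dataclasses import dataclass, field


@dataclass
class StubInternalMetadata:
    _dict: dict = field(default_factory=dict)

    def is_historical(self) -> bool:
        return self._dict.get("historical", False)


@dataclass
class StubEvent:
    internal_metadata: StubInternalMetadata


def persist(event: StubEvent) -> dict:
    # Historical messages are marked backfilled so they don't show up in /sync
    # and get a decrementing stream_ordering; they also skip push actions.
    backfilled = event.internal_metadata.is_historical()
    run_push_actions = not backfilled
    return {"backfilled": backfilled, "push_actions": run_push_actions}


live = StubEvent(StubInternalMetadata({}))
imported = StubEvent(StubInternalMetadata({"historical": True}))
print(persist(live))      # {'backfilled': False, 'push_actions': True}
print(persist(imported))  # {'backfilled': True, 'push_actions': False}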
