Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/10133.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix bug when using workers where pagination requests failed if a remote server returned zero events from `/backfill`.
38 changes: 25 additions & 13 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from http import HTTPStatus
from typing import (
TYPE_CHECKING,
Collection,
Dict,
Iterable,
List,
Expand Down Expand Up @@ -1364,11 +1365,12 @@ async def get_event(event_id: str):

event_infos.append(_NewEventInfo(event, None, auth))

await self._auth_and_persist_events(
destination,
room_id,
event_infos,
)
if event_infos:
await self._auth_and_persist_events(
destination,
room_id,
event_infos,
)

def _sanity_check_event(self, ev: EventBase) -> None:
"""
Expand Down Expand Up @@ -2077,7 +2079,7 @@ async def _auth_and_persist_events(
self,
origin: str,
room_id: str,
event_infos: Iterable[_NewEventInfo],
event_infos: Collection[_NewEventInfo],
backfilled: bool = False,
) -> None:
"""Creates the appropriate contexts and persists events. The events
Expand All @@ -2088,6 +2090,9 @@ async def _auth_and_persist_events(
Notifies about the events where appropriate.
"""

if not event_infos:
return

async def prep(ev_info: _NewEventInfo):
event = ev_info.event
with nested_logging_context(suffix=event.event_id):
Expand Down Expand Up @@ -2216,13 +2221,14 @@ async def _persist_auth_tree(
raise
events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR

await self.persist_events_and_notify(
room_id,
[
(e, events_to_context[e.event_id])
for e in itertools.chain(auth_events, state)
],
)
if auth_events or state:
await self.persist_events_and_notify(
room_id,
[
(e, events_to_context[e.event_id])
for e in itertools.chain(auth_events, state)
],
)

new_event_context = await self.state_handler.compute_event_context(
event, old_state=state
Expand Down Expand Up @@ -3061,7 +3067,13 @@ async def persist_events_and_notify(
the same room.
backfilled: Whether these events are a result of
backfilling or not

Returns:
The stream ID after which all events have been persisted.
"""
if not event_and_contexts:
return self.store.get_current_events_token()

instance = self.config.worker.events_shard_config.get_instance(room_id)
if instance != self._instance_name:
# Limit the number of events sent over replication. We choose 200
Expand Down
68 changes: 68 additions & 0 deletions synapse/util/linked_list.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# Copyright 2021 The Matrix.org Foundation C.I.C.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this included on purpose?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤦

#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import logging
from typing import Generic, Optional, Protocol, TypeVar

import attr

logger = logging.getLogger(__name__)


class ListValue(Protocol):
def drop(self) -> None:
...


V = TypeVar("V", bound=ListValue)


@attr.s(slots=True, auto_attribs=True)
class _ListNode(Generic[V]):
value: Optional[V] = None
prev_node: "_ListNode" = attr.Factory(lambda self: self, takes_self=True)
next_node: "_ListNode" = attr.Factory(lambda self: self, takes_self=True)

def delete_from_cache(self) -> None:
if self.value is None:
logger.warning("Tried delete list node from cache twice.")
return

self.value.drop()

def remove_from_list(self) -> None:
prev_node = self.prev_node
next_node = self.next_node
prev_node.next_node = next_node
next_node.prev_node = prev_node

self.value = None

def move_node_to_front(self, list_root: "_ListNode") -> None:
self.remove_from_list()

prev_node = list_root
next_node = prev_node.next_node

self.prev_node = prev_node
self.next_node = next_node

prev_node.next_node = self
next_node.prev_node = self


@attr.s(slots=True)
class LinkedList(Generic[V]):
root: _ListNode[V] = attr.Factory(_ListNode)