Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit dd2112e

Browse files
committed
Generate historic pagination token for /messages when no from token provided
Part of #12281 Context: #12319 (comment)
1 parent 9633eb2 commit dd2112e

File tree

6 files changed

+98
-11
lines changed

6 files changed

+98
-11
lines changed

scripts-dev/complement.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,4 +71,4 @@ fi
7171

7272
# Run the tests!
7373
echo "Images built; running complement"
74-
go test -v -tags synapse_blacklist,msc2716,msc3030 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests/...
74+
go test -v -tags synapse_blacklist,msc2716,msc3030 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests/

synapse/handlers/pagination.py

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
from synapse.metrics.background_process_metrics import run_as_background_process
2828
from synapse.storage.state import StateFilter
2929
from synapse.streams.config import PaginationConfig
30-
from synapse.types import JsonDict, Requester
30+
from synapse.types import JsonDict, Requester, RoomStreamToken
3131
from synapse.util.async_helpers import ReadWriteLock
3232
from synapse.util.stringutils import random_string
3333
from synapse.visibility import filter_events_for_client
@@ -441,7 +441,54 @@ async def get_messages(
441441
if pagin_config.from_token:
442442
from_token = pagin_config.from_token
443443
else:
444-
from_token = self.hs.get_event_sources().get_current_token_for_pagination()
444+
from_token = (
445+
await self.hs.get_event_sources().get_current_token_for_pagination(
446+
room_id
447+
)
448+
)
449+
assert from_token.room_key.topological
450+
# from_live_token = (
451+
# self.hs.get_event_sources().get_current_token_for_pagination()
452+
# )
453+
# # Convert the live token (sXXX) into a historic token (tXXX-XXX)
454+
# # which is more suitable for /messages.
455+
# current_stream_ordering = from_live_token.room_key.stream
456+
# current_topographical_ordering = (
457+
# await self.store.get_current_topological_token(
458+
# room_id, current_stream_ordering
459+
# )
460+
# )
461+
# from_token = from_live_token.copy_and_replace(
462+
# "room_key",
463+
# RoomStreamToken(
464+
# current_topographical_ordering, current_stream_ordering
465+
# ),
466+
# )
467+
# logger.info(
468+
# "get_messages(room_id=%s)\n\tfrom_token=%s\n\tcurrent_stream_ordering=%s\n\tcurrent_topographical_ordering=%s\n\tfrom_live_token=%s",
469+
# room_id,
470+
# await from_token.to_string(self.store),
471+
# current_stream_ordering,
472+
# current_topographical_ordering,
473+
# await from_live_token.to_string(self.store),
474+
# )
475+
logger.info(
476+
"get_messages(room_id=%s)\n\tfrom_token=%s",
477+
room_id,
478+
await from_token.to_string(self.store),
479+
)
480+
logger.info(
481+
"asdf_get_debug_events_in_room_ordered_by_depth %s",
482+
await self.store.asdf_get_debug_events_in_room_ordered_by_depth(
483+
room_id
484+
),
485+
)
486+
logger.info(
487+
"asdf_get_debug_events_in_room_ordered_by_stream_ordering %s",
488+
await self.store.asdf_get_debug_events_in_room_ordered_by_stream_ordering(
489+
room_id
490+
),
491+
)
445492

446493
if pagin_config.limit is None:
447494
# This shouldn't happen as we've set a default limit before this

synapse/handlers/room.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1444,7 +1444,7 @@ async def get_new_events(
14441444
def get_current_key(self) -> RoomStreamToken:
14451445
return self.store.get_room_max_token()
14461446

1447-
def get_current_key_for_room(self, room_id: str) -> Awaitable[str]:
1447+
def get_current_key_for_room(self, room_id: str) -> Awaitable[RoomStreamToken]:
14481448
return self.store.get_room_events_max_id(room_id)
14491449

14501450

synapse/python_dependencies.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@
8989
"matrix-common~=1.1.0",
9090
# We need packaging.requirements.Requirement, added in 16.1.
9191
"packaging>=16.1",
92+
"tabulate>=0.8.9",
9293
]
9394

9495
CONDITIONAL_REQUIREMENTS = {

synapse/storage/databases/main/stream.py

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
from frozendict import frozendict
4343

4444
from twisted.internet import defer
45+
from tabulate import tabulate
4546

4647
from synapse.api.filtering import Filter
4748
from synapse.events import EventBase
@@ -748,21 +749,21 @@ def _f(txn):
748749
"get_room_event_before_stream_ordering", _f
749750
)
750751

751-
async def get_room_events_max_id(self, room_id: Optional[str] = None) -> str:
752+
async def get_room_events_max_id(self, room_id: Optional[str] = None) -> RoomStreamToken:
752753
"""Returns the current token for rooms stream.
753754
754755
By default, it returns the current global stream token. Specifying a
755756
`room_id` causes it to return the current room specific topological
756757
token.
757758
"""
758-
token = self.get_room_max_stream_ordering()
759+
stream_ordering = self.get_room_max_stream_ordering()
759760
if room_id is None:
760-
return "s%d" % (token,)
761+
return RoomStreamToken(None, stream_ordering)
761762
else:
762763
topo = await self.db_pool.runInteraction(
763764
"_get_max_topological_txn", self._get_max_topological_txn, room_id
764765
)
765-
return "t%d-%d" % (topo, token)
766+
return RoomStreamToken(topo, stream_ordering)
766767

767768
def get_stream_id_for_event_txn(
768769
self,
@@ -808,6 +809,44 @@ async def get_topological_token_for_event(self, event_id: str) -> RoomStreamToke
808809
)
809810
return RoomStreamToken(row["topological_ordering"], row["stream_ordering"])
810811

812+
async def asdf_get_debug_events_in_room_ordered_by_depth(self, room_id: str) -> Any:
813+
"""Gets the topological token in a room after or at the given stream
814+
ordering.
815+
816+
Args:
817+
room_id
818+
"""
819+
sql = (
820+
"SELECT depth, stream_ordering, type, state_key, event_id FROM events"
821+
" WHERE events.room_id = ?"
822+
" ORDER BY depth DESC, stream_ordering DESC;"
823+
)
824+
rows = await self.db_pool.execute(
825+
"asdf_get_debug_events_in_room_ordered_by_depth", None, sql, room_id
826+
)
827+
828+
headers = ["depth", "stream_ordering", "type", "state_key", "event_id"]
829+
return tabulate(rows, headers=headers)
830+
831+
async def asdf_get_debug_events_in_room_ordered_by_stream_ordering(self, room_id: str) -> Any:
832+
"""Gets the topological token in a room after or at the given stream
833+
ordering.
834+
835+
Args:
836+
room_id
837+
"""
838+
sql = (
839+
"SELECT depth, stream_ordering, type, state_key, event_id FROM events"
840+
" WHERE events.room_id = ?"
841+
" ORDER BY stream_ordering DESC, depth DESC;"
842+
)
843+
rows = await self.db_pool.execute(
844+
"asdf_get_debug_events_in_room_ordered_by_depth", None, sql, room_id
845+
)
846+
847+
headers = ["depth", "stream_ordering", "type", "state_key", "event_id"]
848+
return tabulate(rows, headers=headers)
849+
811850
async def get_current_topological_token(self, room_id: str, stream_key: int) -> int:
812851
"""Gets the topological token in a room after or at the given stream
813852
ordering.

synapse/streams/events.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
from typing import TYPE_CHECKING, Iterator, Tuple
15+
from typing import TYPE_CHECKING, Iterator, Optional, Tuple
1616

1717
import attr
1818

@@ -69,7 +69,7 @@ def get_current_token(self) -> StreamToken:
6969
)
7070
return token
7171

72-
def get_current_token_for_pagination(self) -> StreamToken:
72+
async def get_current_token_for_pagination(self, room_id: str) -> StreamToken:
7373
"""Get the current token for a given room to be used to paginate
7474
events.
7575
@@ -80,7 +80,7 @@ def get_current_token_for_pagination(self) -> StreamToken:
8080
The current token for pagination.
8181
"""
8282
token = StreamToken(
83-
room_key=self.sources.room.get_current_key(),
83+
room_key=await self.sources.room.get_current_key_for_room(room_id),
8484
presence_key=0,
8585
typing_key=0,
8686
receipt_key=0,

0 commit comments

Comments
 (0)