|
12 | 12 | # <https://www.gnu.org/licenses/agpl-3.0.html>.
|
13 | 13 | #
|
14 | 14 |
|
| 15 | +import itertools |
15 | 16 | import logging
|
16 | 17 | from typing import TYPE_CHECKING, AbstractSet, Dict, Mapping, Optional, Sequence, Set
|
17 | 18 |
|
18 | 19 | from typing_extensions import assert_never
|
19 | 20 |
|
20 |
| -from synapse.api.constants import AccountDataTypes |
| 21 | +from synapse.api.constants import AccountDataTypes, EduTypes |
21 | 22 | from synapse.handlers.receipts import ReceiptEventSource
|
22 | 23 | from synapse.handlers.sliding_sync.types import (
|
23 | 24 | HaveSentRoomFlag,
|
24 | 25 | MutablePerConnectionState,
|
25 | 26 | PerConnectionState,
|
26 | 27 | )
|
27 | 28 | from synapse.logging.opentracing import trace
|
| 29 | +from synapse.storage.databases.main.receipts import ReceiptInRoom |
28 | 30 | from synapse.types import (
|
29 | 31 | DeviceListUpdates,
|
30 | 32 | JsonMapping,
|
@@ -485,15 +487,21 @@ async def get_receipts_extension_response(
|
485 | 487 | initial_rooms.add(room_id)
|
486 | 488 | continue
|
487 | 489 |
|
488 |
| - # If we're sending down the room from scratch again for some reason, we |
489 |
| - # should always resend the receipts as well (regardless of if |
490 |
| - # we've sent them down before). This is to mimic the behaviour |
491 |
| - # of what happens on initial sync, where you get a chunk of |
492 |
| - # timeline with all of the corresponding receipts for the events in the timeline. |
| 490 | + # If we're sending down the room from scratch again for some |
| 491 | + # reason, we should always resend the receipts as well |
| 492 | + # (regardless of if we've sent them down before). This is to |
| 493 | + # mimic the behaviour of what happens on initial sync, where you |
| 494 | + # get a chunk of timeline with all of the corresponding receipts |
| 495 | + # for the events in the timeline. |
| 496 | + # |
| 497 | + # We also resend down receipts when we "expand" the timeline, |
| 498 | + # (see the "XXX: Odd behavior" in |
| 499 | + # `synapse.handlers.sliding_sync`). |
493 | 500 | room_result = actual_room_response_map.get(room_id)
|
494 |
| - if room_result is not None and room_result.initial: |
495 |
| - initial_rooms.add(room_id) |
496 |
| - continue |
| 501 | + if room_result is not None: |
| 502 | + if room_result.initial or room_result.unstable_expanded_timeline: |
| 503 | + initial_rooms.add(room_id) |
| 504 | + continue |
497 | 505 |
|
498 | 506 | room_status = previous_connection_state.receipts.have_sent_room(room_id)
|
499 | 507 | if room_status.status == HaveSentRoomFlag.LIVE:
|
@@ -536,21 +544,49 @@ async def get_receipts_extension_response(
|
536 | 544 | )
|
537 | 545 | fetched_receipts.extend(previously_receipts)
|
538 | 546 |
|
539 |
| - # For rooms we haven't previously sent down, we could send all receipts |
540 |
| - # from that room but we only want to include receipts for events |
541 |
| - # in the timeline to avoid bloating and blowing up the sync response |
542 |
| - # as the number of users in the room increases. (this behavior is part of the spec) |
543 |
| - initial_rooms_and_event_ids = [ |
544 |
| - (room_id, event.event_id) |
545 |
| - for room_id in initial_rooms |
546 |
| - if room_id in actual_room_response_map |
547 |
| - for event in actual_room_response_map[room_id].timeline_events |
548 |
| - ] |
549 |
| - if initial_rooms_and_event_ids: |
| 547 | + if initial_rooms: |
| 548 | + # We also always send down receipts for the current user. |
| 549 | + user_receipts = ( |
| 550 | + await self.store.get_linearized_receipts_for_user_in_rooms( |
| 551 | + user_id=sync_config.user.to_string(), |
| 552 | + room_ids=initial_rooms, |
| 553 | + to_key=to_token.receipt_key, |
| 554 | + ) |
| 555 | + ) |
| 556 | + |
| 557 | + # For rooms we haven't previously sent down, we could send all receipts |
| 558 | + # from that room but we only want to include receipts for events |
| 559 | + # in the timeline to avoid bloating and blowing up the sync response |
| 560 | + # as the number of users in the room increases. (this behavior is part of the spec) |
| 561 | + initial_rooms_and_event_ids = [ |
| 562 | + (room_id, event.event_id) |
| 563 | + for room_id in initial_rooms |
| 564 | + if room_id in actual_room_response_map |
| 565 | + for event in actual_room_response_map[room_id].timeline_events |
| 566 | + ] |
550 | 567 | initial_receipts = await self.store.get_linearized_receipts_for_events(
|
551 | 568 | room_and_event_ids=initial_rooms_and_event_ids,
|
552 | 569 | )
|
553 |
| - fetched_receipts.extend(initial_receipts) |
| 570 | + |
| 571 | + # Combine the receipts for a room and add them to |
| 572 | + # `fetched_receipts` |
| 573 | + for room_id in initial_receipts.keys() | user_receipts.keys(): |
| 574 | + receipt_content = ReceiptInRoom.merge_to_content( |
| 575 | + list( |
| 576 | + itertools.chain( |
| 577 | + initial_receipts.get(room_id, []), |
| 578 | + user_receipts.get(room_id, []), |
| 579 | + ) |
| 580 | + ) |
| 581 | + ) |
| 582 | + |
| 583 | + fetched_receipts.append( |
| 584 | + { |
| 585 | + "room_id": room_id, |
| 586 | + "type": EduTypes.RECEIPT, |
| 587 | + "content": receipt_content, |
| 588 | + } |
| 589 | + ) |
554 | 590 |
|
555 | 591 | fetched_receipts = ReceiptEventSource.filter_out_private_receipts(
|
556 | 592 | fetched_receipts, sync_config.user.to_string()
|
|
0 commit comments