@@ -31,7 +31,14 @@
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.databases import Databases
 from synapse.storage.databases.main.events import DeltaState
-from synapse.types import Collection, PersistedEventPosition, RoomStreamToken, StateMap
+from synapse.storage.databases.main.events_worker import EventRedactBehaviour
+from synapse.types import (
+    Collection,
+    PersistedEventPosition,
+    RoomStreamToken,
+    StateMap,
+    get_domain_from_id,
+)
 from synapse.util.async_helpers import ObservableDeferred
 from synapse.util.metrics import Measure
 
@@ -68,6 +75,21 @@
     buckets=(0, 1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"),
 )
 
+state_resolutions_during_persistence = Counter(
+    "synapse_storage_events_state_resolutions_during_persistence",
+    "Number of times we had to do state res to calculate new current state",
+)
+
+potential_times_prune_extremities = Counter(
+    "synapse_storage_events_potential_times_prune_extremities",
+    "Number of times we might be able to prune extremities",
+)
+
+times_pruned_extremities = Counter(
+    "synapse_storage_events_times_pruned_extremities",
+    "Number of times we were actually able to prune extremities",
+)
+
 
 class _EventPeristenceQueue:
     """Queues up events so that they can be persisted in bulk with only one
@@ -454,7 +476,15 @@ async def _persist_events(
                             latest_event_ids,
                             new_latest_event_ids,
                         )
-                        current_state, delta_ids = res
+                        current_state, delta_ids, new_latest_event_ids = res
+
+                        # there should always be at least one forward extremity.
+                        # (except during the initial persistence of the send_join
+                        # results, in which case there will be no existing
+                        # extremities, so we'll `continue` above and skip this bit.)
+                        assert new_latest_event_ids, "No forward extremities left!"
+
+                        new_forward_extremeties[room_id] = new_latest_event_ids
 
                         # If either are not None then there has been a change,
                         # and we need to work out the delta (or use that
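For reference, `new_forward_extremeties` (the pre-existing spelling of the variable in this file) accumulates the per-room extremity sets that are later written to the database. A toy sketch of the invariant the new assertion protects, with hypothetical helper names:

```python
from typing import Dict, Set

# Hypothetical accumulator mirroring the shape used in _persist_events:
# one set of forward-extremity event IDs per room.
new_forward_extremeties: Dict[str, Set[str]] = {}

def record_extremities(room_id: str, new_latest_event_ids: Set[str]) -> None:
    # A room must always keep at least one forward extremity, otherwise
    # the next event sent in the room would have no prev_events to cite.
    assert new_latest_event_ids, "No forward extremities left!"
    new_forward_extremeties[room_id] = new_latest_event_ids
```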
@@ -573,29 +603,35 @@ async def _get_new_state_after_events(
         self,
         room_id: str,
         events_context: List[Tuple[EventBase, EventContext]],
-        old_latest_event_ids: Iterable[str],
-        new_latest_event_ids: Iterable[str],
-    ) -> Tuple[Optional[StateMap[str]], Optional[StateMap[str]]]:
+        old_latest_event_ids: Set[str],
+        new_latest_event_ids: Set[str],
+    ) -> Tuple[Optional[StateMap[str]], Optional[StateMap[str]], Set[str]]:
         """Calculate the current state dict after adding some new events to
         a room
 
         Args:
-            room_id (str):
+            room_id:
                 room to which the events are being added. Used for logging etc
 
-            events_context (list[(EventBase, EventContext)]):
+            events_context:
                 events and contexts which are being added to the room
 
-            old_latest_event_ids (iterable[str]):
+            old_latest_event_ids:
                 the old forward extremities for the room.
 
-            new_latest_event_ids (iterable[str]):
+            new_latest_event_ids:
                 the new forward extremities for the room.
 
         Returns:
-            Returns a tuple of two state maps, the first being the full new current
-            state and the second being the delta to the existing current state.
-            If both are None then there has been no change.
+            Returns a tuple of two state maps and a set of new forward
+            extremities.
+
+            The first state map is the full new current state and the second
+            is the delta to the existing current state. If both are None then
+            there has been no change.
+
+            The function may prune some old entries from the set of new
+            forward extremities if it's safe to do so.
 
         If there has been a change then we only return the delta if it's
         already been calculated. Conversely if we do know the delta then
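A minimal sketch of how a caller might interpret the three documented return cases (the helper and the simplified `StateMap` stand-in below are illustrative, not part of the diff):

```python
from typing import Optional, Set, Tuple

# Simplified stand-in: synapse.types.StateMap[str] maps
# (event type, state key) pairs to event IDs.
StateMap = dict

def describe_state_change(
    res: Tuple[Optional[StateMap], Optional[StateMap], Set[str]]
) -> str:
    current_state, delta_ids, new_latest_event_ids = res
    if current_state is None and delta_ids is None:
        return "current state unchanged"
    if delta_ids is not None:
        # Cheaper path: apply only the delta to the stored current state.
        return f"apply a delta of {len(delta_ids)} state entries"
    return f"replace current state with {len(current_state)} entries"
```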
@@ -672,7 +708,7 @@ async def _get_new_state_after_events(
         # If the old and new groups are the same then we don't need to do
         # anything.
         if old_state_groups == new_state_groups:
-            return None, None
+            return None, None, new_latest_event_ids
 
         if len(new_state_groups) == 1 and len(old_state_groups) == 1:
             # If we're going from one state group to another, let's check if
@@ -689,7 +725,7 @@ async def _get_new_state_after_events(
             # the current state in memory then let's also return that,
             # but it doesn't matter if we don't.
             new_state = state_groups_map.get(new_state_group)
-            return new_state, delta_ids
+            return new_state, delta_ids, new_latest_event_ids
 
         # Now that we have calculated new_state_groups we need to get
         # their state IDs so we can resolve to a single state set.
@@ -701,7 +737,7 @@ async def _get_new_state_after_events(
         if len(new_state_groups) == 1:
             # If there is only one state group, then we know what the current
             # state is.
-            return state_groups_map[new_state_groups.pop()], None
+            return state_groups_map[new_state_groups.pop()], None, new_latest_event_ids
 
         # Ok, we need to defer to the state handler to resolve our state sets.
 
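Taken together, the early returns above form a small decision ladder before the code falls back to full state resolution; a schematic sketch of that control flow (simplified, with plain sets of state-group IDs standing in for the real structures):

```python
def shortcut_decision(old_state_groups: set, new_state_groups: set) -> str:
    if old_state_groups == new_state_groups:
        return "no change to current state"
    if len(new_state_groups) == 1 and len(old_state_groups) == 1:
        return "maybe reuse a cached delta between the two groups"
    if len(new_state_groups) == 1:
        return "single new group: its state is the current state"
    return "multiple candidate groups: run full state resolution"
```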
@@ -734,7 +770,139 @@ async def _get_new_state_after_events(
             state_res_store=StateResolutionStore(self.main_store),
         )
 
-        return res.state, None
+        state_resolutions_during_persistence.inc()
+
+        # If the returned state matches the state group of one of the new
+        # forward extremities then we check if we are able to prune some state
+        # extremities.
+        if res.state_group and res.state_group in new_state_groups:
+            new_latest_event_ids = await self._prune_extremities(
+                room_id,
+                new_latest_event_ids,
+                res.state_group,
+                event_id_to_state_group,
+                events_context,
+            )
+
+        return res.state, None, new_latest_event_ids
+
+    async def _prune_extremities(
+        self,
+        room_id: str,
+        new_latest_event_ids: Set[str],
+        resolved_state_group: int,
+        event_id_to_state_group: Dict[str, int],
+        events_context: List[Tuple[EventBase, EventContext]],
+    ) -> Set[str]:
+        """See if we can prune any of the extremities after calculating the
+        resolved state.
+        """
+        potential_times_prune_extremities.inc()
+
+        # We keep all the extremities that have the same state group, and
+        # see if we can drop the others.
+        new_new_extrems = {
+            e
+            for e in new_latest_event_ids
+            if event_id_to_state_group[e] == resolved_state_group
+        }
+
+        dropped_extrems = set(new_latest_event_ids) - new_new_extrems
+
+        logger.debug("Might drop extremities: %s", dropped_extrems)
+
+        # We only drop events from the extremities list if:
+        #   1. we're not currently persisting them;
+        #   2. they're not our own events (or are dummy events); and
+        #   3. they're either:
+        #       1. over N hours old and more than N events ago (we use depth to
+        #          calculate); or
+        #       2. we are persisting an event from the same domain and more than
+        #          M events ago.
+        #
+        # The idea is that we don't want to drop events that are "legitimate"
+        # extremities (that we would want to include as prev events), only
+        # "stuck" extremities that are e.g. due to a gap in the graph.
+        #
+        # Note that we either drop all of them or none of them. If we only drop
+        # some of the events we don't know if state res would come to the same
+        # conclusion.
+
+        for ev, _ in events_context:
+            if ev.event_id in dropped_extrems:
+                logger.debug(
+                    "Not dropping extremities: %s is being persisted", ev.event_id
+                )
+                return new_latest_event_ids
+
+        dropped_events = await self.main_store.get_events(
+            dropped_extrems,
+            allow_rejected=True,
+            redact_behaviour=EventRedactBehaviour.AS_IS,
+        )
+
+        new_senders = {get_domain_from_id(e.sender) for e, _ in events_context}
+
+        one_day_ago = self._clock.time_msec() - 24 * 60 * 60 * 1000
+        current_depth = max(e.depth for e, _ in events_context)
+        for event in dropped_events.values():
+            # If the event is a local dummy event then we should check it
+            # doesn't reference any local events, as we want to reference those
+            # if we send any new events.
+            #
+            # Note we do this recursively to handle the case where a dummy event
+            # references a dummy event that only references remote events.
+            #
+            # Ideally we'd figure out a way of still being able to drop old
+            # dummy events that reference local events, but this is good enough
+            # as a first cut.
+            events_to_check = [event]
+            while events_to_check:
+                new_events = set()
+                for event_to_check in events_to_check:
+                    if self.is_mine_id(event_to_check.sender):
+                        if event_to_check.type != EventTypes.Dummy:
+                            logger.debug("Not dropping own event")
+                            return new_latest_event_ids
+                        new_events.update(event_to_check.prev_event_ids())
+
+                prev_events = await self.main_store.get_events(
+                    new_events,
+                    allow_rejected=True,
+                    redact_behaviour=EventRedactBehaviour.AS_IS,
+                )
+                events_to_check = prev_events.values()
+
+            if (
+                event.origin_server_ts < one_day_ago
+                and event.depth < current_depth - 100
+            ):
+                continue
+
+            # We can be less conservative about dropping extremities from the
+            # same domain, though we do want to wait a little bit (otherwise
+            # we'll immediately remove all extremities from a given server).
+            if (
+                get_domain_from_id(event.sender) in new_senders
+                and event.depth < current_depth - 20
+            ):
+                continue
+
+            logger.debug(
+                "Not dropping as too new and not in new_senders: %s", new_senders,
+            )
+
+            return new_latest_event_ids
+
+        times_pruned_extremities.inc()
+
+        logger.info(
+            "Pruning forward extremities in room %s: from %s -> %s",
+            room_id,
+            new_latest_event_ids,
+            new_new_extrems,
+        )
+        return new_new_extrems
 
     async def _calculate_state_delta(
         self, room_id: str, current_state: StateMap[str]
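The per-event test at the heart of `_prune_extremities` can be read as a pure predicate; below is a simplified, self-contained sketch using the thresholds from the diff (one day, depth margins of 100 and 20), with an `Event` dataclass standing in for Synapse's `EventBase`:

```python
import time
from dataclasses import dataclass
from typing import Set


@dataclass
class Event:
    sender: str            # e.g. "@alice:example.com"
    origin_server_ts: int  # milliseconds since the epoch
    depth: int


def get_domain_from_id(user_id: str) -> str:
    # Simplified version of synapse.types.get_domain_from_id.
    return user_id.split(":", 1)[1]


def may_drop(event: Event, current_depth: int, new_senders: Set[str]) -> bool:
    """Return True if this extremity looks 'stuck' enough to drop."""
    one_day_ago = int(time.time() * 1000) - 24 * 60 * 60 * 1000

    # Old and far behind the current depth: likely stuck behind a gap
    # in the event graph rather than a legitimate extremity.
    if event.origin_server_ts < one_day_ago and event.depth < current_depth - 100:
        return True

    # Same domain as an event being persisted right now and a bit behind:
    # that server clearly has newer events, so the old extremity is stale.
    if get_domain_from_id(event.sender) in new_senders and event.depth < current_depth - 20:
        return True

    return False
```

Note that the real method applies this test only after the earlier vetoes (the candidate is not currently being persisted, and is not a non-dummy local event), and it drops either all candidate extremities or none, since a partial drop could change the outcome of state resolution.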