16
16
from queue import Empty , PriorityQueue
17
17
from typing import Collection , Dict , Iterable , List , Optional , Set , Tuple
18
18
19
+ from prometheus_client import Gauge
20
+
19
21
from synapse .api .constants import MAX_DEPTH
20
22
from synapse .api .errors import StoreError
21
23
from synapse .api .room_versions import RoomVersion
32
34
from synapse .util .caches .lrucache import LruCache
33
35
from synapse .util .iterutils import batch_iter
34
36
37
+ oldest_pdu_in_federation_staging = Gauge (
38
+ "synapse_federation_server_oldest_inbound_pdu_in_staging" ,
39
+ "The age in seconds since we received the oldest pdu in the federation staging area" ,
40
+ )
41
+
42
+ number_pdus_in_federation_queue = Gauge (
43
+ "synapse_federation_server_number_inbound_pdu_in_staging" ,
44
+ "The total number of events in the inbound federation staging" ,
45
+ )
46
+
35
47
logger = logging .getLogger (__name__ )
36
48
37
49
@@ -54,6 +66,8 @@ def __init__(self, database: DatabasePool, db_conn, hs):
54
66
500000 , "_event_auth_cache" , size_callback = len
55
67
) # type: LruCache[str, List[Tuple[str, int]]]
56
68
69
+ self ._clock .looping_call (self ._get_stats_for_federation_staging , 30 * 1000 )
70
+
57
71
async def get_auth_chain (
58
72
self , room_id : str , event_ids : Collection [str ], include_given : bool = False
59
73
) -> List [EventBase ]:
@@ -1193,6 +1207,31 @@ def _get_next_staged_event_for_room_txn(txn):
1193
1207
1194
1208
return origin , event
1195
1209
1210
+ @wrap_as_background_process ("_get_stats_for_federation_staging" )
1211
+ async def _get_stats_for_federation_staging (self ):
1212
+ """Update the prometheus metrics for the inbound federation staging area."""
1213
+
1214
+ def _get_stats_for_federation_staging_txn (txn ):
1215
+ txn .execute (
1216
+ "SELECT coalesce(count(*), 0) FROM federation_inbound_events_staging"
1217
+ )
1218
+ (count ,) = txn .fetchone ()
1219
+
1220
+ txn .execute (
1221
+ "SELECT coalesce(min(received_ts), 0) FROM federation_inbound_events_staging"
1222
+ )
1223
+
1224
+ (age ,) = txn .fetchone ()
1225
+
1226
+ return count , age
1227
+
1228
+ count , age = await self .db_pool .runInteraction (
1229
+ "_get_stats_for_federation_staging" , _get_stats_for_federation_staging_txn
1230
+ )
1231
+
1232
+ number_pdus_in_federation_queue .set (count )
1233
+ oldest_pdu_in_federation_staging .set (age )
1234
+
1196
1235
1197
1236
class EventFederationStore (EventFederationWorkerStore ):
1198
1237
"""Responsible for storing and serving up the various graphs associated
0 commit comments