-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Add type annotations to synapse.metrics
#10847
Changes from 15 commits
66d63c0
934fe41
f058e4f
dbe4345
949439c
d55eb93
ab66469
d1ee5da
6924ce8
7ab1153
77120c2
dba3f61
b483e31
dd276d1
055dbde
b7d099b
6c2f682
53ff8f5
550d1bc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Add type annotations to `synapse.metrics`. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,10 +20,24 @@ | |
import platform | ||
import threading | ||
import time | ||
from typing import Callable, Dict, Iterable, Optional, Tuple, Union | ||
from typing import ( | ||
Any, | ||
Callable, | ||
Dict, | ||
Generic, | ||
Iterable, | ||
Optional, | ||
Sequence, | ||
Set, | ||
Tuple, | ||
Type, | ||
TypeVar, | ||
Union, | ||
cast, | ||
) | ||
|
||
import attr | ||
from prometheus_client import Counter, Gauge, Histogram | ||
from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram, Metric | ||
from prometheus_client.core import ( | ||
REGISTRY, | ||
CounterMetricFamily, | ||
|
@@ -32,6 +46,7 @@ | |
) | ||
|
||
from twisted.internet import reactor | ||
from twisted.internet.base import ReactorBase | ||
|
||
import synapse | ||
from synapse.metrics._exposition import ( | ||
|
@@ -53,7 +68,7 @@ | |
|
||
class RegistryProxy: | ||
@staticmethod | ||
def collect(): | ||
def collect() -> Iterable[Metric]: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would think this needs to be a This is true for a variety of spots we use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (Or maybe I can significantly simplify type hints in other spots!) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I opted for using the most basic type that callers want, to minimize assumptions, rather than documenting the concrete return type. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. All Generators are Iterators (and all Iterators are Iterable), and in this case this seems cleaner than saying a |
||
for metric in REGISTRY.collect(): | ||
if not metric.name.startswith("__"): | ||
yield metric | ||
|
@@ -69,7 +84,7 @@ class LaterGauge: | |
# or dict mapping from a label tuple to a value | ||
caller = attr.ib(type=Callable[[], Union[Dict[Tuple[str, ...], float], float]]) | ||
|
||
def collect(self): | ||
def collect(self) -> Iterable[Metric]: | ||
|
||
g = GaugeMetricFamily(self.name, self.desc, labels=self.labels) | ||
|
||
|
@@ -88,10 +103,10 @@ def collect(self): | |
|
||
yield g | ||
|
||
def __attrs_post_init__(self): | ||
def __attrs_post_init__(self) -> None: | ||
self._register() | ||
|
||
def _register(self): | ||
def _register(self) -> None: | ||
if self.name in all_gauges.keys(): | ||
logger.warning("%s already registered, reregistering" % (self.name,)) | ||
REGISTRY.unregister(all_gauges.pop(self.name)) | ||
|
@@ -100,7 +115,12 @@ def _register(self): | |
all_gauges[self.name] = self | ||
|
||
|
||
class InFlightGauge: | ||
# `MetricsEntry` only makes sense when it is a `Protocol`, | ||
# but `Protocol` can't be used as a `TypeVar` bound. | ||
MetricsEntry = TypeVar("MetricsEntry") | ||
|
||
|
||
class InFlightGauge(Generic[MetricsEntry]): | ||
reivilibre marked this conversation as resolved.
Show resolved
Hide resolved
|
||
"""Tracks number of things (e.g. requests, Measure blocks, etc) in flight | ||
at any given time. | ||
|
||
|
@@ -110,34 +130,45 @@ class InFlightGauge: | |
callbacks. | ||
|
||
Args: | ||
name (str) | ||
desc (str) | ||
labels (list[str]) | ||
sub_metrics (list[str]): A list of sub metrics that the callbacks | ||
will update. | ||
name | ||
desc | ||
labels | ||
sub_metrics: A list of sub metrics that the callbacks will update. | ||
""" | ||
|
||
def __init__(self, name, desc, labels, sub_metrics): | ||
def __init__( | ||
self, | ||
name: str, | ||
desc: str, | ||
labels: Sequence[str], | ||
sub_metrics: Sequence[str], | ||
): | ||
self.name = name | ||
self.desc = desc | ||
self.labels = labels | ||
self.sub_metrics = sub_metrics | ||
|
||
# Create a class which have the sub_metrics values as attributes, which | ||
# default to 0 on initialization. Used to pass to registered callbacks. | ||
self._metrics_class = attr.make_class( | ||
self._metrics_class: Type[MetricsEntry] = attr.make_class( | ||
"_MetricsEntry", attrs={x: attr.ib(0) for x in sub_metrics}, slots=True | ||
) | ||
|
||
# Counts number of in flight blocks for a given set of label values | ||
self._registrations: Dict = {} | ||
self._registrations: Dict[ | ||
Tuple[str, ...], Set[Callable[[MetricsEntry], None]] | ||
] = {} | ||
|
||
# Protects access to _registrations | ||
self._lock = threading.Lock() | ||
|
||
self._register_with_collector() | ||
|
||
def register(self, key, callback): | ||
def register( | ||
self, | ||
key: Tuple[str, ...], | ||
callback: Callable[[MetricsEntry], None], | ||
) -> None: | ||
"""Registers that we've entered a new block with labels `key`. | ||
|
||
`callback` gets called each time the metrics are collected. The same | ||
|
@@ -153,13 +184,17 @@ def register(self, key, callback): | |
with self._lock: | ||
self._registrations.setdefault(key, set()).add(callback) | ||
|
||
def unregister(self, key, callback): | ||
def unregister( | ||
self, | ||
key: Tuple[str, ...], | ||
callback: Callable[[MetricsEntry], None], | ||
) -> None: | ||
"""Registers that we've exited a block with labels `key`.""" | ||
|
||
with self._lock: | ||
self._registrations.setdefault(key, set()).discard(callback) | ||
|
||
def collect(self): | ||
def collect(self) -> Iterable[Metric]: | ||
"""Called by prometheus client when it reads metrics. | ||
|
||
Note: may be called by a separate thread. | ||
|
@@ -195,7 +230,7 @@ def collect(self): | |
gauge.add_metric(key, getattr(metrics, name)) | ||
yield gauge | ||
|
||
def _register_with_collector(self): | ||
def _register_with_collector(self) -> None: | ||
if self.name in all_gauges.keys(): | ||
logger.warning("%s already registered, reregistering" % (self.name,)) | ||
REGISTRY.unregister(all_gauges.pop(self.name)) | ||
|
@@ -225,7 +260,7 @@ def __init__( | |
name: str, | ||
documentation: str, | ||
buckets: Iterable[float], | ||
registry=REGISTRY, | ||
registry: CollectorRegistry = REGISTRY, | ||
): | ||
""" | ||
Args: | ||
|
@@ -252,12 +287,12 @@ def __init__( | |
|
||
registry.register(self) | ||
|
||
def collect(self): | ||
def collect(self) -> Iterable[Metric]: | ||
# Don't report metrics unless we've already collected some data | ||
if self._metric is not None: | ||
yield self._metric | ||
|
||
def update_data(self, values: Iterable[float]): | ||
def update_data(self, values: Iterable[float]) -> None: | ||
"""Update the data to be reported by the metric | ||
|
||
The existing data is cleared, and each measurement in the input is assigned | ||
|
@@ -299,7 +334,7 @@ def _values_to_metric(self, values: Iterable[float]) -> GaugeHistogramMetricFami | |
|
||
|
||
class CPUMetrics: | ||
def __init__(self): | ||
def __init__(self) -> None: | ||
ticks_per_sec = 100 | ||
try: | ||
# Try and get the system config | ||
|
@@ -309,7 +344,7 @@ def __init__(self): | |
|
||
self.ticks_per_sec = ticks_per_sec | ||
|
||
def collect(self): | ||
def collect(self) -> Iterable[Metric]: | ||
if not HAVE_PROC_SELF_STAT: | ||
return | ||
DMRobertson marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
|
@@ -359,7 +394,7 @@ def collect(self): | |
|
||
|
||
class GCCounts: | ||
def collect(self): | ||
def collect(self) -> Iterable[Metric]: | ||
cm = GaugeMetricFamily("python_gc_counts", "GC object counts", labels=["gen"]) | ||
for n, m in enumerate(gc.get_count()): | ||
cm.add_metric([str(n)], m) | ||
|
@@ -377,7 +412,7 @@ def collect(self): | |
|
||
|
||
class PyPyGCStats: | ||
def collect(self): | ||
def collect(self) -> Iterable[Metric]: | ||
|
||
# @stats is a pretty-printer object with __str__() returning a nice table, | ||
# plus some fields that contain data from that table. | ||
|
@@ -524,7 +559,7 @@ def collect(self): | |
|
||
|
||
class ReactorLastSeenMetric: | ||
def collect(self): | ||
def collect(self) -> Iterable[Metric]: | ||
cm = GaugeMetricFamily( | ||
"python_twisted_reactor_last_seen", | ||
"Seconds since the Twisted reactor was last seen", | ||
|
@@ -543,9 +578,12 @@ def collect(self): | |
_last_gc = [0.0, 0.0, 0.0] | ||
|
||
|
||
def runUntilCurrentTimer(reactor, func): | ||
F = TypeVar("F", bound=Callable[..., Any]) | ||
|
||
|
||
def runUntilCurrentTimer(reactor: ReactorBase, func: F) -> F: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nice |
||
@functools.wraps(func) | ||
def f(*args, **kwargs): | ||
def f(*args: Any, **kwargs: Any) -> Any: | ||
now = reactor.seconds() | ||
num_pending = 0 | ||
|
||
|
@@ -608,7 +646,7 @@ def f(*args, **kwargs): | |
|
||
return ret | ||
|
||
return f | ||
return cast(F, f) | ||
|
||
|
||
try: | ||
|
@@ -636,5 +674,5 @@ def f(*args, **kwargs): | |
"start_http_server", | ||
"LaterGauge", | ||
"InFlightGauge", | ||
"BucketCollector", | ||
"GaugeBucketCollector", | ||
] |
Uh oh!
There was an error while loading. Please reload this page.