-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Make inflight background metrics more efficient. #7597
Changes from 3 commits
0a7091e
5b5a6a5
60783b3
ae630f3
fa5d4bf
faee73c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| Fix metrics failing when there is a large number of active background processes. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,16 +17,18 @@ | |
| import threading | ||
| from asyncio import iscoroutine | ||
| from functools import wraps | ||
| from typing import Dict, Set | ||
| from typing import TYPE_CHECKING, Dict, Optional, Set | ||
|
|
||
| import six | ||
|
|
||
| from prometheus_client.core import REGISTRY, Counter, GaugeMetricFamily | ||
| from prometheus_client.core import REGISTRY, Counter, Gauge | ||
|
|
||
| from twisted.internet import defer | ||
|
|
||
| from synapse.logging.context import LoggingContext, PreserveLoggingContext | ||
|
|
||
| if TYPE_CHECKING: | ||
| import resource | ||
|
|
||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
|
|
@@ -36,6 +38,12 @@ | |
| ["name"], | ||
| ) | ||
|
|
||
| _background_process_in_flight_count = Gauge( | ||
| "synapse_background_process_in_flight_count", | ||
| "Number of background processes in flight", | ||
| labelnames=["name"], | ||
| ) | ||
|
|
||
| # we set registry=None in all of these to stop them getting registered with | ||
| # the default registry. Instead we collect them all via the CustomCollector, | ||
| # which ensures that we can update them before they are collected. | ||
|
|
@@ -83,11 +91,14 @@ | |
| # it's much simpler to do so than to try to combine them.) | ||
| _background_process_counts = {} # type: Dict[str, int] | ||
|
|
||
| # map from description to the currently running background processes. | ||
| # Set of all running background processes that have been active since the last | ||
| # time metrics were scraped. | ||
| # | ||
| # it's kept as a dict of sets rather than a big set so that we can keep track | ||
| # of process descriptions that no longer have any active processes. | ||
| _background_processes = {} # type: Dict[str, Set[_BackgroundProcess]] | ||
| # We do it like this to handle the case where we have a large number of | ||
| # background processes stacking up behind a lock or linearizer, where we then | ||
| # only need to iterate over and update metrics for the processes that have | ||
| # actually been active and can ignore the idle ones. | ||
| _background_processes_active_since_last_scrape = set() # type: Set[_BackgroundProcess] | ||
|
|
||
| # A lock that covers the above dicts | ||
| _bg_metrics_lock = threading.Lock() | ||
|
|
@@ -101,25 +112,16 @@ class _Collector(object): | |
| """ | ||
|
|
||
| def collect(self): | ||
| background_process_in_flight_count = GaugeMetricFamily( | ||
| "synapse_background_process_in_flight_count", | ||
| "Number of background processes in flight", | ||
| labels=["name"], | ||
| ) | ||
| global _background_processes_active_since_last_scrape | ||
|
|
||
| # We copy the dict so that it doesn't change from underneath us. | ||
| # We also copy the process lists as that can also change | ||
| # We swap out the _background_processes set with an empty one so that | ||
| # we can safely iterate over the set without holding the lock. | ||
| with _bg_metrics_lock: | ||
| _background_processes_copy = { | ||
| k: list(v) for k, v in six.iteritems(_background_processes) | ||
| } | ||
| _background_processes_copy = _background_processes_active_since_last_scrape | ||
| _background_processes_active_since_last_scrape = set() | ||
|
|
||
| for desc, processes in six.iteritems(_background_processes_copy): | ||
| background_process_in_flight_count.add_metric((desc,), len(processes)) | ||
| for process in processes: | ||
| process.update_metrics() | ||
|
|
||
| yield background_process_in_flight_count | ||
| for process in _background_processes_copy: | ||
| process.update_metrics() | ||
|
|
||
| # now we need to run collect() over each of the static Counters, and | ||
| # yield each metric they return. | ||
|
|
@@ -191,13 +193,10 @@ def run(): | |
| _background_process_counts[desc] = count + 1 | ||
|
|
||
| _background_process_start_count.labels(desc).inc() | ||
| _background_process_in_flight_count.labels(desc).inc() | ||
|
|
||
| with LoggingContext(desc) as context: | ||
| with BackgroundProcessLoggingContext(desc) as context: | ||
| context.request = "%s-%i" % (desc, count) | ||
| proc = _BackgroundProcess(desc, context) | ||
|
|
||
| with _bg_metrics_lock: | ||
| _background_processes.setdefault(desc, set()).add(proc) | ||
|
|
||
| try: | ||
| result = func(*args, **kwargs) | ||
|
|
@@ -214,10 +213,7 @@ def run(): | |
| except Exception: | ||
| logger.exception("Background process '%s' threw an exception", desc) | ||
| finally: | ||
| proc.update_metrics() | ||
|
|
||
| with _bg_metrics_lock: | ||
| _background_processes[desc].remove(proc) | ||
| _background_process_in_flight_count.labels(desc).dec() | ||
anoadragon453 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| with PreserveLoggingContext(): | ||
| return run() | ||
|
|
@@ -238,3 +234,42 @@ def wrap_as_background_process_inner_2(*args, **kwargs): | |
| return wrap_as_background_process_inner_2 | ||
|
|
||
| return wrap_as_background_process_inner | ||
|
|
||
|
|
||
| class BackgroundProcessLoggingContext(LoggingContext): | ||
| """A logging context that tracks in flight metrics for background | ||
| processes. | ||
| """ | ||
|
|
||
| __slots__ = ["_proc"] | ||
|
|
||
| def __init__(self, name: str): | ||
| super().__init__(name) | ||
|
|
||
| self._proc = _BackgroundProcess(name, self) | ||
|
|
||
| def start(self, rusage: "Optional[resource._RUsage]"): | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So does this run on every batch, or just when the process stops (runs out of things to process) and then starts up again?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The naming is a bit confusing here. Basically
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, and "entering the context" occurs each time a background processes starts processing. So
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yup, exactly
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you add a quick comment to where |
||
| """Log context has started running (again). | ||
| """ | ||
|
|
||
| super().start(rusage) | ||
|
|
||
| # We've become active again so we make sure we're in the list of active | ||
| # procs. (Note that "start" here means we've become active, as opposed | ||
| # to starting for the first time.) | ||
| with _bg_metrics_lock: | ||
| _background_processes_active_since_last_scrape.add(self._proc) | ||
|
|
||
| def __exit__(self, type, value, traceback) -> None: | ||
| """Log context has finished. | ||
| """ | ||
|
|
||
| super().__exit__(type, value, traceback) | ||
|
|
||
| # The background process has finished. We explicitly remove and manually | ||
| # update the metrics here so that if nothing is scraping metrics the set | ||
| # doesn't infinitely grow. | ||
| with _bg_metrics_lock: | ||
| _background_processes_active_since_last_scrape.discard(self._proc) | ||
|
|
||
| self._proc.update_metrics() | ||
Uh oh!
There was an error while loading. Please reload this page.