|
| 1 | +# This program is free software; you can redistribute it and/or modify |
| 2 | +# it under the terms of the GNU Affero General Public License as published by |
| 3 | +# the Free Software Foundation; either version 3 of the License, or |
| 4 | +# (at your option) any later version. |
| 5 | +# |
| 6 | +# This program is distributed in the hope that it will be useful, |
| 7 | +# but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 8 | +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. |
| 9 | +# |
| 10 | +# See LICENSE for more details. |
| 11 | +# |
| 12 | +# Copyright (c) 2025 ScyllaDB |
| 13 | + |
| 14 | +import time |
| 15 | +import logging |
| 16 | +import threading |
| 17 | +from typing import NewType, Dict, Any, Tuple, Optional, Callable, cast |
| 18 | +from functools import partial |
| 19 | +from collections import defaultdict |
| 20 | + |
| 21 | +from argus.common.sct_types import RawEventPayload |
| 22 | +from sdcm.sct_events.events_processes import \ |
| 23 | + EVENTS_ARGUS_ANNOTATOR_ID, EVENTS_ARGUS_AGGREGATOR_ID, EVENTS_ARGUS_POSTMAN_ID, \ |
| 24 | + EventsProcessesRegistry, BaseEventsProcess, EventsProcessPipe, \ |
| 25 | + start_events_process, get_events_process, verbose_suppress |
| 26 | +from sdcm.utils.argus import Argus |
| 27 | + |
| 28 | + |
# Length of one aggregation window; ArgusEventAggregator resets its per-key
# duplicate counters whenever the current window has expired.
ARGUS_EVENT_AGGREGATOR_TIME_WINDOW: float = 90  # seconds
# Maximum number of events with the same key forwarded within one time window;
# further duplicates are dropped by ArgusEventAggregator.
ARGUS_EVENT_AGGREGATOR_MAX_DUPLICATES: int = 5
# NOTE(review): not referenced in the visible part of this module -- presumably
# a queue polling timeout; confirm against its users.
ARGUS_EVENT_AGGREGATOR_QUEUE_WAIT_TIMEOUT: float = 1  # seconds

LOGGER = logging.getLogger(__name__)


# Event payload (a RawEventPayload) as it travels through the Argus pipeline.
SCTArgusEvent = NewType("SCTArgusEvent", RawEventPayload)
# Tuple of strings used to group events when counting duplicates
# (built by ArgusEventAggregator.unique_key).
SCTArgusEventKey = NewType("SCTArgusEventKey", Tuple[str, ...])
| 38 | + |
| 39 | + |
class ArgusEventCollector(EventsProcessPipe[Tuple[str, Any], SCTArgusEvent]):
    """First stage of the Argus pipeline.

    Turns ``(event_class, event)`` tuples coming from the inbound stream into
    ``SCTArgusEvent`` payloads and puts them on the outbound queue.
    """

    def run(self) -> None:
        """Consume inbound events and emit one Argus payload per publishable event.

        Events whose ``publish_to_argus`` flag is falsy are skipped, and nothing
        is emitted at all when no Argus client was available at start-up.
        Failures while handling a single event are handed to
        ``verbose_suppress`` together with the offending item.
        """
        # The client is looked up once, when the process starts running.
        argus_client = Argus.get().client
        # Attributes that not every event carries; a missing one is sent as None.
        optional_fields = (
            "known_issue",
            "nemesis_name",
            "nemesis_status",
            "node",
            "received_timestamp",
            "target_node",
        )

        for raw_item in self.inbound_events():
            with verbose_suppress("ArgusEventCollector failed to process %s", raw_item):
                event_type, sct_event = raw_item  # try to unpack event from EventsDevice
                if not sct_event.publish_to_argus or not argus_client:
                    continue

                payload = {
                    "run_id": argus_client.run_id,
                    "severity": sct_event.severity.name,
                    "ts": sct_event.timestamp,
                    "duration": getattr(sct_event, "duration", None),
                    "event_type": event_type,
                    "message": str(sct_event),
                }
                # Appended in this order so the key order of the payload stays stable.
                payload.update((name, getattr(sct_event, name, None)) for name in optional_fields)

                self.outbound_queue.put(SCTArgusEvent(payload))
| 65 | + |
| 66 | + |
class ArgusEventAggregator(EventsProcessPipe[SCTArgusEvent, SCTArgusEvent]):
    """Rate-limit duplicate events before they are posted to Argus.

    Within each time window of ``time_window`` seconds at most
    ``max_duplicates`` events sharing the same ``unique_key`` are forwarded to
    the outbound queue; the rest are dropped.
    """

    # Read events from the process registered under this ID.
    inbound_events_process = EVENTS_ARGUS_ANNOTATOR_ID
    time_window = ARGUS_EVENT_AGGREGATOR_TIME_WINDOW
    max_duplicates = ARGUS_EVENT_AGGREGATOR_MAX_DUPLICATES

    def run(self) -> None:
        """Forward inbound events, dropping duplicates above the per-window limit."""
        seen_in_window: Dict[SCTArgusEventKey, int] = defaultdict(int)
        window_deadline = time.perf_counter()

        for event in self.inbound_events():
            with verbose_suppress("ArgusEventAggregator failed to process an event %s", event):
                key = self.unique_key(event)
                overdue = time.perf_counter() - window_deadline

                if overdue > 0:
                    # The current window is over: start counting from scratch.
                    seen_in_window.clear()

                    # Several whole windows may have elapsed since the last
                    # event, so advance the deadline past all of them at once.
                    elapsed_windows = overdue // self.time_window + 1
                    window_deadline += elapsed_windows * self.time_window

                seen_in_window[key] += 1
                if seen_in_window[key] <= self.max_duplicates:
                    # Still within the limit: hand the event over for posting.
                    LOGGER.debug("Event moving to posting queue: %s", event)
                    self.outbound_queue.put(event)

    @staticmethod
    def unique_key(event: SCTArgusEvent) -> SCTArgusEventKey:
        """Return the grouping key of *event*: its run ID, severity and event type."""
        return SCTArgusEventKey((event["run_id"], event["severity"], event["event_type"]))
| 99 | + |
| 100 | + |
class ArgusEventPostman(BaseEventsProcess[SCTArgusEvent, None], threading.Thread):
    """Last stage of the Argus pipeline: submit events to Argus.

    The thread stays idle until ``start_posting_argus_events()`` (or
    ``terminate()``) sets the ``enabled`` flag.  Events are only submitted when
    an Argus client has been attached with ``enable_argus_posting()``; without
    a client they are consumed and discarded.
    """

    # Read events from the process registered under this ID.
    inbound_events_process = EVENTS_ARGUS_AGGREGATOR_ID

    def __init__(self, _registry: EventsProcessesRegistry):
        # Gate that keeps run() waiting until posting is started or the
        # process is terminated.
        self.enabled = threading.Event()
        # Set by enable_argus_posting(); None means "do not post".
        self._argus_client = None
        super().__init__(_registry=_registry)

    def run(self) -> None:
        """Wait for the ``enabled`` flag, then submit every inbound event to Argus."""
        self.enabled.wait()

        for event in self.inbound_events():  # events from ArgusAggregator
            # The client must be checked *before* the context manager is built:
            # its arguments are evaluated eagerly, so dereferencing a missing
            # client there would raise outside of verbose_suppress and kill
            # the thread (e.g. when terminate() releases the gate before
            # enable_argus_posting() was ever called).
            client = self._argus_client
            if not client:
                continue

            with verbose_suppress("ArgusEventPostman failed to post an event to '%s' "
                                  "endpoint.\nEvent: %s", client.Routes.SUBMIT_EVENT, event):
                client.submit_event(event)

    def enable_argus_posting(self) -> None:
        """Attach the current Argus client so that events can be submitted."""
        self._argus_client = Argus.get().client

    def start_posting_argus_events(self):
        """Release the gate run() is waiting on."""
        self.enabled.set()

    def terminate(self) -> None:
        """Terminate the process and release run() if it is still waiting on the gate."""
        super().terminate()
        self.enabled.set()
| 127 | + |
| 128 | + |
# Factories that start each pipeline stage under its registry ID.  Each one is
# start_events_process() with the ID and the process class already bound.
start_argus_event_collector = partial(start_events_process, EVENTS_ARGUS_ANNOTATOR_ID, ArgusEventCollector)
start_argus_aggregator = partial(start_events_process, EVENTS_ARGUS_AGGREGATOR_ID, ArgusEventAggregator)
start_argus_postman = partial(start_events_process, EVENTS_ARGUS_POSTMAN_ID, ArgusEventPostman)
# Lookup of the postman process by its ID; the cast only narrows the static
# return type to ArgusEventPostman.
get_argus_postman = cast(Callable[..., ArgusEventPostman], partial(get_events_process, EVENTS_ARGUS_POSTMAN_ID))
| 133 | + |
| 134 | + |
def start_argus_pipeline(_registry: Optional[EventsProcessesRegistry] = None) -> None:
    """Start the collector, the aggregator and the postman, in that order.

    :param _registry: events processes registry passed through to every starter.
    """
    stage_starters = (
        start_argus_event_collector,
        start_argus_aggregator,
        start_argus_postman,
    )
    for start_stage in stage_starters:
        start_stage(_registry=_registry)
| 139 | + |
| 140 | + |
def enable_argus_posting(_registry: Optional[EventsProcessesRegistry] = None) -> None:
    """Tell the postman process to pick up the Argus client.

    :param _registry: events processes registry used to look up the postman.
    """
    postman = get_argus_postman(_registry=_registry)
    postman.enable_argus_posting()
| 143 | + |
| 144 | + |
def start_posting_argus_events(_registry: Optional[EventsProcessesRegistry] = None) -> None:
    """Let the postman process begin consuming its inbound events.

    :param _registry: events processes registry used to look up the postman.
    """
    postman = get_argus_postman(_registry=_registry)
    postman.start_posting_argus_events()
| 147 | + |
| 148 | + |
# Names exported by `from <this module> import *`.
__all__ = ("start_argus_pipeline", "enable_argus_posting", "start_posting_argus_events")
0 commit comments