139 changes: 139 additions & 0 deletions sdcm/sct_events/argus.py
@@ -0,0 +1,139 @@
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See LICENSE for more details.
#
# Copyright (c) 2025 ScyllaDB

import time
import logging
import threading
from typing import NewType, Dict, Any, Tuple, Optional, Callable, cast
from functools import partial
from collections import defaultdict

from argus.common.sct_types import RawEventPayload
from sdcm.sct_events.events_processes import \
EVENTS_ARGUS_ANNOTATOR_ID, EVENTS_ARGUS_AGGREGATOR_ID, EVENTS_ARGUS_POSTMAN_ID, \
EventsProcessesRegistry, BaseEventsProcess, EventsProcessPipe, \
start_events_process, get_events_process, verbose_suppress
from sdcm.utils.argus import Argus


ARGUS_EVENT_AGGREGATOR_TIME_WINDOW: float = 90 # seconds
LOGGER = logging.getLogger(__name__)


SCTArgusEvent = NewType("SCTArgusEvent", RawEventPayload)
SCTArgusEventKey = NewType("SCTArgusEventKey", Tuple[str, ...])


class ArgusEventCollector(EventsProcessPipe[Tuple[str, Any], SCTArgusEvent]):
    def run(self) -> None:
        if Argus.get() and (client := Argus.get().client):
            run_id = client.run_id
        else:
            run_id = None
        for event_tuple in self.inbound_events():
            with verbose_suppress("ArgusEventCollector failed to process %s", event_tuple):
                event_class, event = event_tuple  # try to unpack event from EventsDevice
                if not event.publish_to_argus:
                    continue
                evt = SCTArgusEvent({
                    "run_id": run_id,
                    "severity": event.severity.name,
                    "ts": event.timestamp,
                    "duration": getattr(event, "duration", None),
                    "event_type": event_class,
                    "message": str(event),
                    "known_issue": getattr(event, "known_issue", None),
                    "nemesis_name": getattr(event, "nemesis_name", None),
                    "nemesis_status": getattr(event, "nemesis_status", None),
                    "node": getattr(event, "node", None),
                    "received_timestamp": getattr(event, "received_timestamp", None),
                    "target_node": getattr(event, "target_node", None),
                })
                self.outbound_queue.put(evt)


class ArgusEventAggregator(EventsProcessPipe[SCTArgusEvent, SCTArgusEvent]):
Contributor:
Generally, I dislike this algorithm: it delays events by time_window, and in case of a burst of 1000 events it will still send 200 of them (with max duplicates set to 5).

I think it would be better to fire off the event straight away and, for some time period, silence the other events with the same event_key.
Something like this (needs testing, e.g. with a unit test):

event_key = self.unique_key(event)
if time.perf_counter() - time_window_counters.get(event_key, 0) > TIME_WINDOW:  # time window, e.g. 30 seconds
    # event not seen recently (or ever)
    time_window_counters[event_key] = time.perf_counter()
else:
    # recently seen
    continue

This solution adds some memory overhead (proportional to the number of distinct event_keys), but even with 1M events it should fit in memory.

@k0machi can you try this approach?
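
A minimal, self-contained sketch of how this "fire first, then silence duplicates" idea could be unit-tested; TIME_WINDOW, dedup() and the fake clock are illustrative assumptions for the sketch, not code from this PR:

# Standalone illustration of the time-window dedup; names here are hypothetical.
import itertools

TIME_WINDOW = 30  # seconds

def dedup(events, key, now, counters):
    """Yield only events whose key has not been seen within TIME_WINDOW."""
    for event in events:
        event_key = key(event)
        if now() - counters.get(event_key, 0) > TIME_WINDOW:
            counters[event_key] = now()
            yield event  # first occurrence (or window expired) passes through
        # otherwise: recently seen, silently dropped

def test_burst_is_collapsed_to_a_single_event():
    clock = itertools.count(100)  # fake monotonic clock, ticks one "second" per call
    counters = {}
    burst = [{"event_type": "DatabaseLogEvent"}] * 10
    passed = list(dedup(burst, key=lambda e: e["event_type"], now=lambda: next(clock), counters=counters))
    assert len(passed) == 1  # only the first event of the burst gets through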

Contributor Author:
Hmm, sounds better, I'll try it (probably should also figure out a way to add argus events to unit_tests)

Contributor:
see how other events devices are tested

    inbound_events_process = EVENTS_ARGUS_ANNOTATOR_ID
    time_window = ARGUS_EVENT_AGGREGATOR_TIME_WINDOW

    def run(self) -> None:
        time_window_counters: Dict[SCTArgusEventKey, float] = defaultdict(float)

        for event in self.inbound_events():
            with verbose_suppress("ArgusEventAggregator failed to process an event %s", event):
                event_key = self.unique_key(event)
                if time.perf_counter() - time_window_counters.get(event_key, 0) > ARGUS_EVENT_AGGREGATOR_TIME_WINDOW:
                    # event not seen recently (or ever)
                    time_window_counters[event_key] = time.perf_counter()
                else:
                    # recently seen
                    continue

                # Put the event to the posting queue.
                LOGGER.debug("Event moving to posting queue: %s", event)
                self.outbound_queue.put(event)

    @staticmethod
    def unique_key(event: SCTArgusEvent) -> SCTArgusEventKey:
        return SCTArgusEventKey(tuple([event["run_id"], event["severity"], event["event_type"]]))


class ArgusEventPostman(BaseEventsProcess[SCTArgusEvent, None], threading.Thread):
    inbound_events_process = EVENTS_ARGUS_AGGREGATOR_ID

    def __init__(self, _registry: EventsProcessesRegistry):
        self.enabled = threading.Event()
        self._argus_client = None
        super().__init__(_registry=_registry)

    def run(self) -> None:
        self.enabled.wait()

        for event in self.inbound_events():  # events from ArgusAggregator
            with verbose_suppress("ArgusEventPostman failed to post an event to '%s' "
                                  "endpoint.\nEvent: %s", self._argus_client.Routes.SUBMIT_EVENT, event):
                if self._argus_client:
                    self._argus_client.submit_event(event)

    def enable_argus_posting(self) -> None:
        self._argus_client = Argus.get().client

    def start_posting_argus_events(self):
        self.enabled.set()

    def terminate(self) -> None:
        super().terminate()
        self.enabled.set()


start_argus_event_collector = partial(start_events_process, EVENTS_ARGUS_ANNOTATOR_ID, ArgusEventCollector)
start_argus_aggregator = partial(start_events_process, EVENTS_ARGUS_AGGREGATOR_ID, ArgusEventAggregator)
start_argus_postman = partial(start_events_process, EVENTS_ARGUS_POSTMAN_ID, ArgusEventPostman)
get_argus_postman = cast(Callable[..., ArgusEventPostman], partial(get_events_process, EVENTS_ARGUS_POSTMAN_ID))


def start_argus_pipeline(_registry: Optional[EventsProcessesRegistry] = None) -> None:
    start_argus_event_collector(_registry=_registry)
    start_argus_aggregator(_registry=_registry)
    start_argus_postman(_registry=_registry)


def enable_argus_posting(_registry: Optional[EventsProcessesRegistry] = None) -> None:
    get_argus_postman(_registry=_registry).enable_argus_posting()


def start_posting_argus_events(_registry: Optional[EventsProcessesRegistry] = None) -> None:
    get_argus_postman(_registry=_registry).start_posting_argus_events()


__all__ = ("start_argus_pipeline", "enable_argus_posting", "start_posting_argus_events")
1 change: 1 addition & 0 deletions sdcm/sct_events/base.py
@@ -94,6 +94,7 @@ class SctEvent:

    _ready_to_publish: bool = False  # set it to True in __init__() and to False in publish() to prevent double-publish
    publish_to_grafana: bool = True
    publish_to_argus: bool = True
    save_to_files: bool = True

    def __init_subclass__(cls, abstract: bool = False):
3 changes: 3 additions & 0 deletions sdcm/sct_events/events_processes.py
@@ -31,6 +31,9 @@
EVENTS_GRAFANA_ANNOTATOR_ID = "EVENTS_GRAFANA_ANNOTATOR"
EVENTS_GRAFANA_AGGREGATOR_ID = "EVENTS_GRAFANA_AGGREGATOR"
EVENTS_GRAFANA_POSTMAN_ID = "EVENTS_GRAFANA_POSTMAN"
EVENTS_ARGUS_ANNOTATOR_ID = "EVENTS_ARGUS_ANNOTATOR"
EVENTS_ARGUS_AGGREGATOR_ID = "EVENTS_ARGUS_AGGREGATOR"
EVENTS_ARGUS_POSTMAN_ID = "EVENTS_ARGUS_POSTMAN"
EVENTS_ANALYZER_ID = "EVENTS_ANALYZER"
EVENTS_HANDLER_ID = "EVENTS_HANDLER"
EVENTS_COUNTER_ID = "EVENTS_COUNTER_ID"
7 changes: 6 additions & 1 deletion sdcm/sct_events/setup.py
@@ -19,6 +19,7 @@

from sdcm.sct_config import SCTConfiguration
from sdcm.sct_events import Severity
from sdcm.sct_events.argus import start_argus_pipeline
from sdcm.sct_events.event_handler import start_events_handler
from sdcm.sct_events.grafana import start_grafana_pipeline
from sdcm.sct_events.filters import DbEventsFilter, EventsSeverityChangerFilter
@@ -29,7 +30,7 @@
from sdcm.sct_events.events_analyzer import start_events_analyzer
from sdcm.sct_events.event_counter import start_events_counter
from sdcm.sct_events.events_processes import \
EVENTS_MAIN_DEVICE_ID, EVENTS_FILE_LOGGER_ID, EVENTS_ANALYZER_ID, \
EVENTS_ARGUS_AGGREGATOR_ID, EVENTS_ARGUS_ANNOTATOR_ID, EVENTS_ARGUS_POSTMAN_ID, EVENTS_MAIN_DEVICE_ID, EVENTS_FILE_LOGGER_ID, EVENTS_ANALYZER_ID, \
EVENTS_GRAFANA_ANNOTATOR_ID, EVENTS_GRAFANA_AGGREGATOR_ID, EVENTS_GRAFANA_POSTMAN_ID, \
EventsProcessesRegistry, create_default_events_process_registry, get_events_process, EVENTS_HANDLER_ID, EVENTS_COUNTER_ID
from sdcm.utils.issues import SkipPerIssues
@@ -55,6 +56,7 @@ def start_events_device(log_dir: Optional[Union[str, Path]] = None,

    start_events_logger(_registry=_registry)
    start_grafana_pipeline(_registry=_registry)
    start_argus_pipeline(_registry=_registry)
    start_events_analyzer(_registry=_registry)
    start_events_handler(_registry=_registry)
    start_events_counter(_registry=_registry)
@@ -69,6 +71,9 @@ def stop_events_device(_registry: Optional[EventsProcessesRegistry] = None) -> N
EVENTS_GRAFANA_ANNOTATOR_ID,
EVENTS_GRAFANA_AGGREGATOR_ID,
EVENTS_GRAFANA_POSTMAN_ID,
EVENTS_ARGUS_ANNOTATOR_ID,
EVENTS_ARGUS_AGGREGATOR_ID,
EVENTS_ARGUS_POSTMAN_ID,
EVENTS_COUNTER_ID,
EVENTS_HANDLER_ID,
EVENTS_ANALYZER_ID,
7 changes: 7 additions & 0 deletions sdcm/test_config.py
@@ -12,6 +12,7 @@
from sdcm.keystore import KeyStore
from sdcm.provision.common.configuration_script import ConfigurationScriptBuilder
from sdcm.sct_events import Severity
from sdcm.sct_events.argus import enable_argus_posting, start_posting_argus_events
from sdcm.sct_events.system import TestFrameworkEvent
from sdcm.utils.argus import ArgusError, get_argus_client
from sdcm.utils.ci_tools import get_job_name
@@ -294,9 +295,15 @@ def init_argus_client(cls, params: dict, test_id: str | None = None):
        LOGGER.info("Initializing Argus connection...")
        try:
            cls._argus_client = get_argus_client(run_id=cls.test_id() if not test_id else test_id)
            enable_argus_posting()
Contributor:
I think we should start it along with the other devices in sdcm.sct_events.setup.start_events_device,
so we're not starting it before the main events device is there.

Contributor Author:
We aren't: this is to release the lock on the poster, which may or may not already have been started by the main events device. The reason this exception handler is there is to guard against situations where the events device isn't initialized, for example inside the create_argus_run CLI command.

Contributor:
start_events_device also is not run when create_argus_run is fired, is it?
We start the other devices there, so this place looks natural.
Also, this enable_argus_posting would be redundant then.

Contributor Author:
There are currently 3 steps that need to be taken to enable argus events:

  • Events device, done in start_events_device, which initializes the argus events pipeline
  • enable_argus_posting, which sets the client for the postman
  • start_posting_argus_events, which releases the lock on the postman so it starts submitting the events

The latter two can be combined into one, but the split here is warranted: the test_config logic decides whether or not Argus is enabled and does not enable argus events if it isn't; in that case the pipeline just idles, same as Grafana.
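
For context, a rough sketch of that sequence using the functions added in this PR; the log_dir value is a placeholder and error handling is omitted:

# Sketch only: the call order described above, with placeholder arguments.
from sdcm.sct_events.setup import start_events_device
from sdcm.sct_events.argus import enable_argus_posting, start_posting_argus_events

start_events_device(log_dir="/tmp/sct-events")  # step 1: starts the (idle) Argus pipeline with the other devices
# ...later, once the test_config logic has decided Argus is enabled and the client exists:
enable_argus_posting()        # step 2: hand the Argus client to the postman
start_posting_argus_events()  # step 3: release the lock so the postman starts submitting events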

            start_posting_argus_events()
            return
        except ArgusError as exc:
            LOGGER.warning("Failed to initialize argus client: %s", exc.message)
        except RuntimeError as exc:
            LOGGER.warning("Skipping setting up argus events: %s", exc)
            return

        TestFrameworkEvent(
            source=cls.__name__,
            source_method='init_argus_client',
32 changes: 31 additions & 1 deletion sdcm/utils/argus.py
@@ -2,6 +2,8 @@
import re
import os
from pathlib import Path
import threading
from typing import Optional
from uuid import UUID

from argus.client.sct.client import ArgusSCTClient
@@ -15,6 +17,31 @@
LOGGER = logging.getLogger(__name__)


class Argus:
Contributor:
why do we need that? can we create argus client when enabling posting in ArgusEventPostman?

Contributor Author:
why do we need that? can we create argus client when enabling posting in ArgusEventPostman?

This means having to instantiate SCTConfiguration (circular dependency) or read env vars, and then also having to manage the logic for when Argus isn't available (which, again, necessitates importing SCTConfiguration). This singleton class lets other parts of SCT get the already-initialized argus client without either resolving that dependency chain or having to initialize it manually.

Contributor:
We have the TestConfig class, which should have been initialized by the time we try to send an event to Argus.
Can we use sdcm.test_config.TestConfig.argus_client?

Contributor Author:
Circular dependency here as well: importing TestConfig into argus.py means we can't import the enablement functions into the test config (NOT the initialization function; those are imported into the events setup.py).
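
To illustrate the intended usage pattern, a sketch mirroring how the collector and postman use the singleton; it assumes the singleton was already populated via get_argus_client(), and the payload dict is illustrative only:

# Hypothetical consumer elsewhere in SCT.
from sdcm.utils.argus import Argus

argus = Argus.get()  # returns None if Argus was never initialized for this run
if argus and argus.client:
    argus.client.submit_event({"run_id": str(argus.client.run_id), "message": "example"})  # payload shape illustrative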

    INSTANCE: Optional['Argus'] = None
    INIT_DONE = threading.Event()

    def __init__(self, client: ArgusSCTClient):
        self._client = client

    @classmethod
    def init_global(cls, client: ArgusSCTClient):
        if cls.INIT_DONE.is_set():
            return
        cls.INSTANCE = cls(client)
        cls.INIT_DONE.set()

    @classmethod
    def get(cls, init_default=False) -> Optional['Argus']:
        if init_default and not cls.INIT_DONE.is_set():
            cls.init_global(get_argus_client(run_id=os.environ.get("SCT_TEST_ID"), init_global=False))
        return cls.INSTANCE

    @property
    def client(self) -> ArgusSCTClient:
        return self._client


class ArgusError(Exception):

    def __init__(self, message: str, *args: list) -> None:
@@ -37,13 +64,16 @@ def is_uuid(uuid) -> bool:
return False


def get_argus_client(run_id: UUID | str) -> ArgusSCTClient:
def get_argus_client(run_id: UUID | str, init_global=True) -> ArgusSCTClient:
    if not is_uuid(run_id):
        raise ArgusError("Malformed UUID provided")
    creds = KeyStore().get_argus_rest_credentials()
    argus_client = ArgusSCTClient(
        run_id=run_id, auth_token=creds["token"], base_url=creds["baseUrl"], extra_headers=creds.get("extra_headers"))

    if init_global:
        Argus.init_global(argus_client)

    return argus_client

