Commit 0034b2e

Merge pull request #241 from mila-iqia/refactor

Plugin refactor for diskusage.

2 parents: a579275 + ef16fec

41 files changed: +734 / -677 lines (large commit; only some of the changed files are shown below)

config/sarc-dev.yaml

Lines changed: 11 additions & 3 deletions
@@ -27,8 +27,15 @@ sarc:
     sacct_bin: "/opt/slurm/bin/sacct"
     duc_inodes_command:
     duc_storage_command:
-    diskusage_report_command: beegfs-ctl --cfgFile=/etc/beegfs/home.d/beegfs-client.conf
-      --getquota --uid $USER --csv
+    diskusage:
+      - name: beegfs
+        params:
+          config_files:
+            default: /etc/beegfs/scratch.d/beegfs-client.conf
+      - name: beegfs
+        params:
+          config_files:
+            default: /etc/beegfs/projects.d/beegfs-client.conf
     prometheus_url: http://prometheus01.server.mila.quebec:9090/
     start_date: '2022-04-01'
     billing_is_gpu: true

@@ -81,7 +88,8 @@ sarc:
       --count /project/rrg-bengioy-ad
     duc_storage_command: duc ls -d /project/.duc_databases/rrg-bengioy-ad.sqlite
       /project/rrg-bengioy-ad
-    diskusage_report_command: diskusage_report --project --all_users
+    diskusage:
+      - name: drac
     prometheus_url: https://mila-thanos.calculquebec.ca
     prometheus_headers_file: ../../SARC_secrets/secrets/drac_prometheus/headers.json
     start_date: '2022-04-01'

sarc/cli/acquire/storages.py

Lines changed: 28 additions & 19 deletions
@@ -1,21 +1,14 @@
 import logging
 from dataclasses import dataclass
-from typing import Callable
 
 from simple_parsing import field
 
-from sarc.config import ClusterConfig, config
-from sarc.storage.diskusage import DiskUsage, get_diskusage_collection
-from sarc.storage.drac import fetch_diskusage_report as fetch_dirac_diskusage
-from sarc.storage.mila import fetch_diskusage_report as fetch_mila_diskusage
+from sarc.config import config
+from sarc.core.scraping.diskusage import get_diskusage_scraper
+from sarc.storage.diskusage import get_diskusage_collection
 
 logger = logging.getLogger(__name__)
 
-methods: dict[str, Callable[[ClusterConfig], DiskUsage]] = {
-    "default": fetch_dirac_diskusage,
-    "mila": fetch_mila_diskusage,
-}
-
 
 @dataclass
 class AcquireStorages:

@@ -29,14 +22,30 @@ def execute(self) -> int:
             logger.info(f"Acquiring {cluster_name} storages report...")
 
             cluster = cfg.clusters[cluster_name]
-
-            fetch_diskusage = methods.get(cluster_name, methods["default"])
-            du = fetch_diskusage(cluster)
-
-            if not self.dry:
-                collection = get_diskusage_collection()
-                collection.add(du)
-            else:
-                logger.info("Document:\n" + du.model_dump_json(indent=2))
+            diskusage_configs = cluster.diskusage
+            if diskusage_configs is None:
+                continue
+
+            # Process each diskusage configuration
+            for diskusage_config in diskusage_configs:
+                try:
+                    scraper = get_diskusage_scraper(diskusage_config.name)
+                except KeyError as ke:
+                    logger.exception(
+                        "Invalid or absent diskusage scraper name: %s",
+                        diskusage_config.name,
+                        exc_info=ke,
+                    )
+                    continue
+
+                disk_config = scraper.validate_config(diskusage_config.params)
+                data = scraper.get_diskusage_report(cluster.ssh, disk_config)
+                du = scraper.parse_diskusage_report(disk_config, cluster_name, data)
+
+                if not self.dry:
+                    collection = get_diskusage_collection()
+                    collection.add(du)
+                else:
+                    logger.info("Document:\n" + du.model_dump_json(indent=2))
 
         return 0
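
Note on the new flow above: the scraper API deliberately splits fetching from parsing. get_diskusage_report returns raw bytes that can be cached or archived as-is, and parse_diskusage_report turns those bytes into a DiskUsage document later. A minimal sketch of that round trip for a single entry, assuming `cluster` is a ClusterConfig as in the loop above and "mila" is an illustrative cluster name:

    from sarc.core.scraping.diskusage import get_diskusage_scraper

    diskusage_config = cluster.diskusage[0]  # a DiskUsageConfig from the YAML

    scraper = get_diskusage_scraper(diskusage_config.name)  # KeyError if unknown
    disk_config = scraper.validate_config(diskusage_config.params)

    # Phase 1: fetch raw bytes over SSH; safe to cache or archive verbatim.
    data = scraper.get_diskusage_report(cluster.ssh, disk_config)

    # Phase 2: parse the (possibly cached) bytes into a DiskUsage document.
    du = scraper.parse_diskusage_report(disk_config, "mila", data)
    print(du.model_dump_json(indent=2))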

sarc/config.py

Lines changed: 9 additions & 1 deletion
@@ -17,6 +17,8 @@
 
 from .alerts.common import HealthMonitorConfig
 
+type JSON = list[JSON] | dict[str, JSON] | int | str | float | bool | None
+
 if TYPE_CHECKING:
     from fabric import Connection
     from prometheus_api_client import PrometheusConnect

@@ -37,6 +39,12 @@ class ConfigurationError(Exception):
     pass
 
 
+@dataclass
+class DiskUsageConfig:
+    name: str
+    params: JSON = field(default_factory=dict)
+
+
 @dataclass
 class ClusterConfig:
     # pylint: disable=too-many-instance-attributes

@@ -51,7 +59,7 @@ class ClusterConfig:
     sshconfig: Path | None = None
     duc_inodes_command: str | None = None
     duc_storage_command: str | None = None
-    diskusage_report_command: str | None = None
+    diskusage: list[DiskUsageConfig] | None = None
     start_date: str = "2022-04-01"
     slurm_conf_host_path: Path = Path("/etc/slurm/slurm.conf")
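
For reference, each entry under a cluster's diskusage key deserializes into one DiskUsageConfig. A sketch of what the sarc-dev.yaml hunks above amount to, with values copied from the diff (constructing these by hand is for illustration only):

    from sarc.config import DiskUsageConfig

    # The two beegfs entries from the first sarc-dev.yaml hunk.
    mila_diskusage = [
        DiskUsageConfig(
            name="beegfs",
            params={"config_files": {"default": "/etc/beegfs/scratch.d/beegfs-client.conf"}},
        ),
        DiskUsageConfig(
            name="beegfs",
            params={"config_files": {"default": "/etc/beegfs/projects.d/beegfs-client.conf"}},
        ),
    ]

    # The DRAC entry carries no params, so params keeps its default empty dict.
    drac_diskusage = [DiskUsageConfig(name="drac")]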

sarc/core/models/diskusage.py

Lines changed: 24 additions & 0 deletions
@@ -0,0 +1,24 @@
+from pydantic import BaseModel, ByteSize
+
+from .validators import datetime_utc
+
+
+class DiskUsageUser(BaseModel):
+    user: str
+    nbr_files: int
+    size: ByteSize
+
+
+class DiskUsageGroup(BaseModel):
+    group_name: str
+    users: list[DiskUsageUser]
+
+
+class DiskUsage(BaseModel):
+    """
+    Disk usage on a given cluster
+    """
+
+    cluster_name: str
+    groups: list[DiskUsageGroup]
+    timestamp: datetime_utc
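
Pydantic's ByteSize accepts both raw integers and human-readable strings, and timestamp must be a UTC-aware datetime (enforced by the validator below). A small usage sketch with made-up values:

    from datetime import UTC, datetime

    from sarc.core.models.diskusage import DiskUsage, DiskUsageGroup, DiskUsageUser

    du = DiskUsage(
        cluster_name="mila",  # illustrative
        groups=[
            DiskUsageGroup(
                group_name="default",
                # ByteSize coerces "1.5GiB" to 1610612736 bytes; plain ints work too.
                users=[DiskUsageUser(user="jdoe", nbr_files=4242, size="1.5GiB")],
            )
        ],
        timestamp=datetime.now(UTC),  # naive or non-UTC datetimes fail validation
    )
    print(du.model_dump_json(indent=2))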

sarc/core/models/validators.py

Lines changed: 24 additions & 0 deletions
@@ -0,0 +1,24 @@
+from dataclasses import dataclass
+from datetime import UTC, datetime
+from typing import Annotated, Any, Callable
+
+from pydantic import GetCoreSchemaHandler
+from pydantic_core import CoreSchema, core_schema
+
+
+@dataclass(frozen=True)
+class DatetimeUTCValidator:
+    def validate_tz_utc(self, value: datetime, handler: Callable):
+        assert value.tzinfo == UTC, "date is not in UTC time"
+
+        return handler(value)
+
+    def __get_pydantic_core_schema__(
+        self, source_type: Any, handler: GetCoreSchemaHandler
+    ) -> CoreSchema:
+        return core_schema.no_info_wrap_validator_function(
+            self.validate_tz_utc, handler(source_type)
+        )
+
+
+type datetime_utc = Annotated[datetime, DatetimeUTCValidator()]
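
The wrap validator runs before the inner datetime schema, so a naive or non-UTC value trips the assertion and pydantic reports it as a validation error. A sketch, assuming a hypothetical Event model that is not part of this commit:

    from datetime import UTC, datetime

    from pydantic import BaseModel, ValidationError

    from sarc.core.models.validators import datetime_utc

    class Event(BaseModel):  # hypothetical model for illustration
        ts: datetime_utc

    Event(ts=datetime.now(UTC))  # passes: tzinfo is datetime.UTC

    try:
        Event(ts=datetime.now())  # naive datetime, tzinfo is None
    except ValidationError as err:
        print(err)  # message includes "date is not in UTC time"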

sarc/core/scraping/diskusage.py

Lines changed: 39 additions & 0 deletions
@@ -0,0 +1,39 @@
+from importlib.metadata import entry_points
+from typing import Any, Protocol, Type
+
+from fabric import Connection
+from serieux import deserialize
+
+from sarc.core.models.diskusage import DiskUsage
+
+
+class DiskUsageScraper[T](Protocol):
+    config_type: Type[T]
+
+    def validate_config(self, config_data: Any) -> T:
+        """Validate the configuration"""
+        return deserialize(self.config_type, config_data)
+
+    def get_diskusage_report(self, ssh: Connection, config: T) -> bytes:
+        """Get the raw disk usage report for caching and archiving purposes"""
+        ...
+
+    def parse_diskusage_report(
+        self, config: T, cluster_name: str, data: bytes
+    ) -> DiskUsage:
+        """Parse previously fetched report into a DiskUsage"""
+        ...
+
+
+_builtin_scrapers: dict[str, DiskUsageScraper] = dict()
+_diskusage_scrapers = entry_points(group="sarc.diskusage_scraper")
+
+
+def get_diskusage_scraper(name: str) -> DiskUsageScraper:
+    """Raises KeyError if the name is not found"""
+    try:
+        return _builtin_scrapers[name]
+    except KeyError:
+        pass
+    val = _diskusage_scrapers[name]
+    return val.load()
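
get_diskusage_scraper checks the built-in registry first and only falls back to the sarc.diskusage_scraper entry-point group, so external packages can ship their own scrapers. A sketch of both paths (the lustre names are hypothetical):

    from sarc.core.scraping.diskusage import get_diskusage_scraper

    # Built-in path: a module registers an instance at import time, as
    # sarc/storage/beegfs.py does below via _builtin_scrapers["beegfs"].
    beegfs_scraper = get_diskusage_scraper("beegfs")

    # Plugin path: a third-party package would instead declare an entry point
    # in its pyproject.toml pointing at a module-level DiskUsageScraper
    # instance, e.g. (hypothetical package and attribute):
    #
    #   [project.entry-points."sarc.diskusage_scraper"]
    #   lustre = "my_pkg.lustre:scraper"
    #
    # get_diskusage_scraper("lustre") would then load() and return it.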

sarc/core/utils.py

Lines changed: 18 additions & 0 deletions
@@ -0,0 +1,18 @@
+from fabric import Connection
+
+
+def run_command(
+    connection: Connection, command: str, retries: int
+) -> tuple[str | None, list[Exception]]:
+    errors: list[Exception] = []
+
+    for _ in range(retries):
+        try:
+            result = connection.run(command, hide=True)
+            return result.stdout, errors
+
+        # pylint: disable=broad-exception-caught
+        except Exception as err:
+            errors.append(err)
+
+    return None, errors
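
run_command returns the first successful stdout along with any exceptions collected on earlier attempts, or (None, errors) once all attempts fail. A usage sketch; the host and command are illustrative:

    from fabric import Connection

    from sarc.core.utils import run_command

    with Connection("login.example.org") as ssh:  # hypothetical host
        stdout, errors = run_command(ssh, "df -h /scratch", retries=3)

    if stdout is None:
        # All three attempts raised; one exception was recorded per attempt.
        for err in errors:
            print(f"attempt failed: {err}")
    else:
        print(stdout)  # errors may still hold exceptions from earlier attempts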

sarc/storage/__init__.py

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+from . import beegfs, drac  # noqa: F401 (these imports are to register the modules)

sarc/storage/beegfs.py

Lines changed: 107 additions & 0 deletions
@@ -0,0 +1,107 @@
+"""
+Fetching and parsing code specific to the mila cluster
+"""
+
+import csv
+import json
+from dataclasses import dataclass
+from datetime import UTC, datetime
+from io import StringIO
+from typing import cast
+
+from fabric import Connection
+from pydantic import ByteSize
+
+from sarc.client.users.api import User, get_users
+from sarc.core.models.diskusage import DiskUsage, DiskUsageGroup, DiskUsageUser
+from sarc.core.scraping.diskusage import DiskUsageScraper, _builtin_scrapers
+from sarc.core.utils import run_command
+
+beegfs_header = "name,id,size,hard,files,hard"
+
+
+@dataclass
+class BeeGFSDiskUsageConfig:
+    config_files: dict[str, str]
+    retries: int = 3
+    beegfs_ctl_path: str = "beegfs-ctl"
+
+
+class BeeGFSDiskUsage(DiskUsageScraper[BeeGFSDiskUsageConfig]):
+    config_type = BeeGFSDiskUsageConfig
+
+    def get_diskusage_report(
+        self, ssh: Connection, config: BeeGFSDiskUsageConfig
+    ) -> bytes:
+        users = get_users()
+        assert len(users) > 0
+
+        errors: list[Exception] = []
+        failures: list[User] = []
+        output: dict[str, list[str]] = {}
+
+        for name, file in config.config_files.items():
+            usage: list[str] = []
+
+            for user in users:
+                if not user.mila.active:
+                    continue
+
+                cmd = f"{config.beegfs_ctl_path} --cfgFile={file} --getquota --uid {user.mila.username} --csv"
+                result, err = run_command(ssh, cmd, config.retries)
+
+                if err:
+                    errors.extend(err)
+
+                if result is None:
+                    failures.append(user)
+                else:
+                    usage.append(_trim_beegfs_output(result))
+
+            output[name] = usage
+
+        return json.dumps(output).encode()
+
+    def parse_diskusage_report(
+        self, config: BeeGFSDiskUsageConfig, cluster_name: str, data_raw: bytes
+    ) -> DiskUsage:
+        groups: list[DiskUsageGroup] = []
+        data: dict[str, list[str]] = json.loads(data_raw.decode())
+
+        groups = [
+            DiskUsageGroup(
+                group_name=name,
+                users=[
+                    _parse_line(line)
+                    for line in csv.reader(StringIO("\n".join(data[name])))
+                ],
+            )
+            for name in config.config_files.keys()
+        ]
+
+        return DiskUsage(
+            cluster_name=cluster_name,
+            groups=groups,
+            timestamp=datetime.now(UTC),
+        )
+
+
+# Register the scraper to make it available
+_builtin_scrapers["beegfs"] = BeeGFSDiskUsage()
+
+
+def _trim_beegfs_output(output: str) -> str:
+    splitted = output.splitlines()
+    header_index = splitted.index(beegfs_header)
+    return "\n".join(splitted[header_index + 1 :])
+
+
+def _parse_line(line: list[str]) -> DiskUsageUser:
+    columns = {
+        key.strip(): value.strip() for key, value in zip(beegfs_header.split(","), line)
+    }
+    return DiskUsageUser(
+        user=columns["name"],
+        nbr_files=int(columns["files"]),
+        size=cast(ByteSize, columns["size"]),
+    )
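
To make the parsing path concrete: _trim_beegfs_output drops everything up to and including the CSV header that beegfs-ctl prints, and _parse_line maps one CSV row onto a DiskUsageUser. A sketch with made-up quota values; only the header line comes from the code above:

    from sarc.storage.beegfs import _parse_line, _trim_beegfs_output

    # Hypothetical beegfs-ctl --getquota --csv output for one user.
    raw = "\n".join(
        [
            "Quota information for storage pool Default (ID: 1):",  # made up
            "name,id,size,hard,files,hard",
            "jdoe,1001,16106127360,107374182400,4242,1000000",  # made up
        ]
    )

    body = _trim_beegfs_output(raw)
    # body == "jdoe,1001,16106127360,107374182400,4242,1000000"

    user = _parse_line(body.split(","))
    # DiskUsageUser(user='jdoe', nbr_files=4242, size=16106127360)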
