Skip to content

Commit 8a4566b

Browse files
voorloopnulWilliangalvani
authored andcommitted
core: create Major Tom
1 parent bbb9c13 commit 8a4566b

File tree

9 files changed

+311
-0
lines changed

9 files changed

+311
-0
lines changed

core/services/install-services.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ SERVICES=(
3535
ping
3636
versionchooser
3737
wifi
38+
major_tom
3839
)
3940

4041
# We need to install loguru and appdirs since they may be used inside setup.py

core/services/major_tom/major_tom.py

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
#! /usr/bin/env python3
2+
import datetime
3+
import time
4+
import uuid
5+
from dataclasses import asdict, dataclass
6+
from typing import Any, Dict, Optional
7+
from zoneinfo import ZoneInfo
8+
9+
from src.core import DefaultPayload, TelemetryEngine, get_latency
10+
from src.metrics import Metrics
11+
from src.typedefs import ExtensionInfo, VersionInfo
12+
13+
LOG_SESSION_UUID = str(uuid.uuid4())
14+
15+
SERVICE_NAME = "major_tom"
16+
LOG_FOLDER_PATH = f"/var/logs/blueos/services/{SERVICE_NAME}/"
17+
18+
TELEMETRY_ENDPOINT = "https://telemetry.blueos.cloud/api/v1/anonymous/"
19+
S3_TELEMETRY_ENDPOINT = "https://telemetry.blueos.cloud/api/v1/anonymous/s3/"
20+
21+
22+
def get_hardware_uuid() -> str:
23+
try:
24+
with open("/etc/blueos/hardware-uuid", "r", encoding="utf-8") as f:
25+
return f.read().strip()
26+
except Exception as error:
27+
print(f"unable to read machine id: {error}")
28+
return ""
29+
30+
31+
def get_blueos_uuid() -> str:
32+
try:
33+
with open("/etc/blueos/uuid", "r", encoding="utf-8") as f:
34+
return f.read().strip()
35+
except Exception as error:
36+
print(f"unable to read blueos id: {error}")
37+
return ""
38+
39+
40+
# pylint: disable=too-many-instance-attributes
41+
@dataclass
42+
class AnonymousTelemetryRecord:
43+
uptime: float
44+
latency: float
45+
memory_size: int
46+
memory_usage: int
47+
disk_size: int
48+
disk_usage: int
49+
extensions: Optional[list[ExtensionInfo]]
50+
blueos_version: Optional[VersionInfo]
51+
probe_time: float
52+
53+
def json(self) -> dict[str, Any]:
54+
return asdict(self)
55+
56+
57+
def compose_default_record(order: int) -> Dict[str, Any]:
58+
date_time_utc = datetime.datetime.now(ZoneInfo("UTC")).isoformat()
59+
payload = DefaultPayload(
60+
log_session_uuid=LOG_SESSION_UUID,
61+
order=order,
62+
timestamp=date_time_utc,
63+
hardware_id=get_hardware_uuid(),
64+
blueos_id=get_blueos_uuid(),
65+
data={},
66+
)
67+
68+
start_probing = time.time()
69+
metrics = Metrics()
70+
record = AnonymousTelemetryRecord(
71+
time.clock_gettime(time.CLOCK_BOOTTIME),
72+
get_latency(),
73+
metrics.memory.total,
74+
metrics.memory.used,
75+
metrics.disk.total,
76+
metrics.disk.used,
77+
metrics.installed_extensions,
78+
metrics.installed_version,
79+
0,
80+
)
81+
record.probe_time = time.time() - start_probing
82+
payload.data = record.json()
83+
return payload.json()
84+
85+
86+
if __name__ == "__main__":
87+
TelemetryEngine(
88+
label="anonymous", # used to tag telemetry type. we may have non-anonymous telemetry in the future
89+
endpoint=TELEMETRY_ENDPOINT,
90+
s3_endpoint=S3_TELEMETRY_ENDPOINT,
91+
create_record=compose_default_record,
92+
interval=60 * 5, # 5 minutes
93+
max_file_size=1024 * 1024, # 1Mb
94+
max_file_retention=10,
95+
buffer_folder=LOG_FOLDER_PATH,
96+
)()
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
psutil==5.9.5
2+
requests==2.31.0
3+
speedtest-cli==2.1.3
4+
Flask==2.3.2
5+
py-machineid==0.4.3
6+
loguru==0.7.0

core/services/major_tom/setup.py

Whitespace-only changes.

core/services/major_tom/src/__init__.py

Whitespace-only changes.

core/services/major_tom/src/core.py

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
import copy
2+
import gzip
3+
import json
4+
import os
5+
import shutil
6+
import time
7+
from dataclasses import asdict, dataclass
8+
from typing import Any, Callable, Dict, List
9+
10+
import loguru
11+
import requests
12+
import speedtest
13+
14+
loguru.logger.remove()
15+
16+
17+
def formatter(record: "loguru.Record") -> str:
18+
# Note this function returns the string to be formatted, not the actual message to be logged
19+
record["extra"]["serialized"] = json.dumps(record["message"])
20+
return "{extra[serialized]}\n"
21+
22+
23+
def is_online() -> bool:
24+
return get_latency() > 0
25+
26+
27+
def get_latency() -> float:
28+
try:
29+
servers: List[str] = []
30+
st = speedtest.Speedtest()
31+
st.get_servers(servers)
32+
best_server = st.get_best_server()
33+
ping = best_server["latency"]
34+
return float(ping)
35+
except Exception:
36+
return -1.0
37+
38+
39+
@dataclass
40+
class DefaultPayload:
41+
log_session_uuid: str
42+
order: int
43+
timestamp: str
44+
hardware_id: str
45+
blueos_id: str
46+
data: dict[str, Any]
47+
48+
def json(self) -> dict[str, Any]:
49+
return asdict(self)
50+
51+
52+
class TelemetryEngine:
53+
# pylint: disable=too-many-arguments
54+
def __init__(
55+
self,
56+
label: str,
57+
endpoint: str,
58+
s3_endpoint: str,
59+
create_record: Callable[[Any], Any],
60+
interval: float,
61+
max_file_size: int,
62+
max_file_retention: int,
63+
buffer_folder: str,
64+
):
65+
self.buffer_file = f"{buffer_folder}/{label}_usage.log"
66+
self.buffer_folder = buffer_folder
67+
68+
self.telemetry_endpoint = endpoint
69+
self.telemetry_s3_endpoint = s3_endpoint
70+
self.create_record = create_record
71+
self.interval = interval
72+
73+
self.buffer = copy.deepcopy(loguru.logger)
74+
self.buffer.add(
75+
self.buffer_file,
76+
rotation=max_file_size,
77+
retention=max_file_retention,
78+
format=formatter,
79+
compression="gz",
80+
)
81+
82+
def __call__(self) -> None:
83+
order = 0
84+
while True:
85+
order += 1
86+
record = self.create_record(order)
87+
if self.save(record) == "online":
88+
self.process_buffered_records()
89+
time.sleep(self.interval)
90+
91+
def upload_file(self, file: str) -> bool:
92+
"""
93+
This method requests to telemetry API a presigned url and upload the local archived files.
94+
"""
95+
print(f"uploading file... {file}")
96+
try:
97+
response = requests.get(self.telemetry_s3_endpoint, timeout=5).json()
98+
with open(file, "rb") as fh:
99+
files = {"file": (file, fh)}
100+
r = requests.post(response["url"], data=response["fields"], files=files, timeout=300)
101+
if r.status_code == 204:
102+
print("[Success!]")
103+
return True
104+
except Exception:
105+
pass
106+
107+
return False
108+
109+
def process_buffered_records(self) -> None:
110+
"""
111+
Check in the buffered folder if there are archived logs to upload. If the agent connects before an archive
112+
is created it will also archive the current buffer file and upload it.
113+
"""
114+
for file in os.listdir(self.buffer_folder):
115+
file_path = os.path.join(self.buffer_folder, file)
116+
117+
# Upload regular archive
118+
if file_path.endswith(".log.gz"):
119+
if self.upload_file(file_path):
120+
os.remove(file_path)
121+
122+
# Archive current buffer and upload it
123+
if file_path.endswith(".log") and os.path.getsize(file_path):
124+
timestamp = int(time.time())
125+
tmp_name = self.buffer_file.replace(".log", f".{timestamp}.log.gz")
126+
with open(self.buffer_file, "rb") as f_in:
127+
with gzip.open(tmp_name, "wb") as f_out:
128+
shutil.copyfileobj(f_in, f_out)
129+
if self.upload_file(tmp_name):
130+
os.remove(tmp_name)
131+
with open(self.buffer_file, "w", encoding="utf-8"):
132+
# create new empty file if not there
133+
pass
134+
135+
def save(self, record: Dict[str, Any]) -> str:
136+
"""
137+
Try to POST the telemetry payload, if it fails for any reason, we buffer it locally.
138+
"""
139+
try:
140+
r = requests.post(self.telemetry_endpoint, json=record, timeout=5)
141+
if r.status_code == 201:
142+
return "online"
143+
except Exception as err:
144+
print(err)
145+
146+
self.buffer.info(record)
147+
return "offline"
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
from functools import cached_property
2+
from typing import List, Optional
3+
4+
import psutil
5+
import requests
6+
7+
from src.typedefs import ExtensionInfo, VersionInfo
8+
9+
10+
class Metrics:
11+
@cached_property
12+
def installed_extensions(self) -> Optional[List[ExtensionInfo]]:
13+
try:
14+
req = requests.get("http://localhost/kraken/v1.0/installed_extensions", timeout=3)
15+
if req.status_code == 200:
16+
return [ExtensionInfo(identifier=rec["identifier"], tag=rec["tag"]) for rec in req.json()]
17+
except Exception:
18+
return None
19+
return []
20+
21+
@cached_property
22+
def disk(self) -> psutil._common.sdiskusage:
23+
return psutil.disk_usage("/")
24+
25+
@cached_property
26+
def memory(self) -> psutil._pslinux.svmem:
27+
return psutil.virtual_memory()
28+
29+
@cached_property
30+
def installed_version(self) -> Optional[VersionInfo]:
31+
try:
32+
req = requests.get("http://localhost/version-chooser/v1.0/version/current", timeout=3)
33+
if req.status_code == 200:
34+
data = req.json()
35+
return VersionInfo(
36+
repository=data["repository"],
37+
tag=data["tag"],
38+
last_modified=data["last_modified"],
39+
sha=data["sha"],
40+
architecture=data["architecture"],
41+
)
42+
43+
except Exception:
44+
return None
45+
return None
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
from dataclasses import dataclass
2+
3+
4+
@dataclass
5+
class ExtensionInfo:
6+
identifier: str
7+
tag: str
8+
9+
10+
class VersionInfo:
11+
repository: str
12+
tag: str
13+
last_modified: str
14+
sha: str
15+
architecture: str

core/start-blueos-core

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ SERVICES=(
9393
'nginx',"nice -18 nginx -g \"daemon off;\" -c $TOOLS_PATH/nginx/nginx.conf"
9494
'log_zipper',"nice -20 $SERVICES_PATH/log_zipper/main.py '/shortcuts/system_logs/**/*.log' --max-age-minutes 60"
9595
'bag_of_holding',"$SERVICES_PATH/bag_of_holding/main.py"
96+
'major_tom',"$SERVICES_PATH/major_tom/major_tom.py"
9697
)
9798

9899
tmux -f /etc/tmux.conf start-server

0 commit comments

Comments
 (0)