Skip to content

Commit 89edc8a

Browse files
voorloopnul authored and Williangalvani committed
core: create Major Tom
1 parent afa82ab commit 89edc8a

File tree

8 files changed

+332
-0
lines changed

8 files changed

+332
-0
lines changed

core/services/install-services.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ SERVICES=(
3535
ping
3636
versionchooser
3737
wifi
38+
major_tom
3839
)
3940

4041
# We need to install loguru, appdirs and pydantic since they may be used inside setup.py

core/services/major_tom/main.py

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
#! /usr/bin/env python3
2+
import copy
3+
import datetime
4+
import logging
5+
import sys
6+
import time
7+
import uuid
8+
from dataclasses import asdict, dataclass
9+
from typing import Any, Dict, Optional
10+
from zoneinfo import ZoneInfo
11+
12+
import loguru
13+
from commonwealth.utils.general import (
14+
local_hardware_identifier,
15+
local_unique_identifier,
16+
)
17+
from commonwealth.utils.logs import init_logger
18+
19+
from src.core import DefaultPayload, TelemetryEngine, get_latency
20+
from src.metrics import Metrics
21+
from src.typedefs import ExtensionInfo, VersionInfo
22+
23+
LOG_SESSION_UUID = str(uuid.uuid4())
24+
25+
SERVICE_NAME = "major_tom"
26+
LOG_FOLDER_PATH = f"/var/logs/blueos/services/{SERVICE_NAME}/buffer"
27+
28+
TELEMETRY_ENDPOINT = "https://telemetry.blueos.cloud/api/v1/anonymous/"
29+
S3_TELEMETRY_ENDPOINT = "https://telemetry.blueos.cloud/api/v1/anonymous/s3/"
30+
31+
32+
# pylint: disable=too-many-instance-attributes
@dataclass
class AnonymousTelemetryRecord:
    """One anonymous telemetry sample describing the state of the running system.

    Instances are filled in by ``compose_default_record`` and embedded, as a
    plain dictionary, in the ``data`` field of the outgoing payload.
    """

    # Reading of the CLOCK_BOOTTIME clock at probe time.
    uptime: float
    # Result of get_latency(); -1.0 when the latency probe failed.
    latency: float
    # Total / used figures from psutil.virtual_memory().
    memory_size: int
    memory_usage: int
    # Total / used figures from psutil.disk_usage("/").
    disk_size: int
    disk_usage: int
    # Installed extensions; None when the lookup raised an error.
    extensions: Optional[list[ExtensionInfo]]
    # Currently running BlueOS version; None when it could not be retrieved.
    blueos_version: Optional[VersionInfo]
    # Wall-clock seconds spent gathering the values above.
    probe_time: float

    def json(self) -> dict[str, Any]:
        """Return the record (nested dataclasses included) as a plain dictionary."""
        return asdict(self)
47+
48+
49+
def compose_default_record(order: int) -> Dict[str, Any]:
    """Assemble the telemetry payload for one reporting cycle.

    Args:
        order: Sequence number of this record within the current log session.

    Returns:
        The payload envelope as a plain dictionary, with the probed system
        metrics stored under its ``data`` key.
    """
    # The timestamp marks the start of the cycle, before any probing happens.
    utc_timestamp = datetime.datetime.now(ZoneInfo("UTC")).isoformat()
    envelope = DefaultPayload(
        log_session_uuid=LOG_SESSION_UUID,
        order=order,
        timestamp=utc_timestamp,
        hardware_id=local_hardware_identifier(),
        blueos_id=local_unique_identifier(),
        data={},
    )

    probing_started_at = time.time()
    system_metrics = Metrics()
    sample = AnonymousTelemetryRecord(
        uptime=time.clock_gettime(time.CLOCK_BOOTTIME),
        latency=get_latency(),
        memory_size=system_metrics.memory.total,
        memory_usage=system_metrics.memory.used,
        disk_size=system_metrics.disk.total,
        disk_usage=system_metrics.disk.used,
        extensions=system_metrics.installed_extensions,
        blueos_version=system_metrics.installed_version,
        probe_time=0,  # placeholder, replaced once probing has finished
    )
    sample.probe_time = time.time() - probing_started_at

    envelope.data = sample.json()
    return envelope.json()
76+
77+
78+
if __name__ == "__main__":
    # Entry point: configure logging, then run the telemetry loop forever.

    # this is required to have two loggers in the same process
    # see https://loguru.readthedocs.io/en/latest/resources/recipes.html#creating-independent-loggers-with-separate-set-of-handlers
    # The handler-less copy is handed to the engine, which attaches its own
    # file sink to it for buffering undelivered records.
    loguru.logger.remove()
    log_buffer = copy.deepcopy(loguru.logger)

    logging.basicConfig(level=logging.INFO)
    loguru.logger.add(
        sys.stderr,
        format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
        level="INFO",
    )
    init_logger(SERVICE_NAME)
    # f-string: without the prefix the placeholder text itself was logged
    # instead of the session UUID.
    loguru.logger.info(f"Starting Major Tom, session UUID: {LOG_SESSION_UUID}")
    TelemetryEngine(
        label="anonymous",  # used to tag telemetry type. we may have non-anonymous telemetry in the future
        endpoint=TELEMETRY_ENDPOINT,
        s3_endpoint=S3_TELEMETRY_ENDPOINT,
        create_record=compose_default_record,
        interval=60 * 5,  # 5 minutes
        max_file_size=1024 * 1024,  # 1Mb
        max_file_retention=10,
        buffer_folder=LOG_FOLDER_PATH,
        log_buffer=log_buffer,
    )()

core/services/major_tom/setup.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
#!/usr/bin/env python3
2+
3+
import setuptools
4+
5+
# Runtime dependencies declared by this package, pinned to exact versions.
INSTALL_REQUIRES = [
    "requests==2.31.0",
    "loguru==0.5.3",
]

setuptools.setup(
    name="Major Tom",
    version="0.1.0",
    description="Sends telemetry back to Ground Control",
    license="MIT",
    install_requires=INSTALL_REQUIRES,
)

core/services/major_tom/src/__init__.py

Whitespace-only changes.

core/services/major_tom/src/core.py

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
import gzip
2+
import http
3+
import json
4+
import os
5+
import shutil
6+
import time
7+
from dataclasses import asdict, dataclass
8+
from typing import Any, Callable, Dict, List
9+
10+
import loguru
11+
import requests
12+
import speedtest
13+
from loguru import logger
14+
15+
16+
def formatter(record: "loguru.Record") -> str:
    """Format callback that emits each log message as one JSON-encoded line.

    The JSON encoding of ``record["message"]`` is stored under
    ``record["extra"]["serialized"]``; the returned value is the *template*
    that is subsequently formatted against the record, not the final text.
    """
    serialized_message = json.dumps(record["message"])
    record["extra"]["serialized"] = serialized_message
    return "{extra[serialized]}\n"
20+
21+
22+
def is_online() -> bool:
    """Return True when the latency probe yields a positive value.

    ``get_latency`` reports -1.0 on failure, so a failed probe counts as offline.
    """
    measured_latency = get_latency()
    return measured_latency > 0
24+
25+
26+
def get_latency() -> float:
    """Return the latency reported for the best speedtest server.

    Any failure while probing is swallowed on purpose and reported as -1.0,
    so callers can treat a negative value as "no measurement available".
    """
    try:
        client = speedtest.Speedtest()
        # An empty list means no specific servers are requested.
        client.get_servers([])
        return float(client.get_best_server()["latency"])
    except Exception:
        return -1.0
36+
37+
38+
@dataclass
class DefaultPayload:
    """Envelope wrapped around every telemetry record sent to the API."""

    # Identifier shared by all records emitted by one run of the service.
    log_session_uuid: str
    # Sequence number of the record within the session.
    order: int
    # Creation time of the record, as a string.
    timestamp: str
    # Identifiers of the hardware and of the BlueOS installation.
    hardware_id: str
    blueos_id: str
    # The actual telemetry content.
    data: dict[str, Any]

    def json(self) -> dict[str, Any]:
        """Return the payload as a plain dictionary (a deep copy of its fields)."""
        return asdict(self)
49+
50+
51+
class TelemetryEngine:
    """Periodically create telemetry records and deliver them to the telemetry API.

    A record that cannot be delivered is appended to a local buffer file through
    ``log_buffer``. As soon as a delivery succeeds, every buffered archive (and
    the current buffer file, if non-empty) is uploaded and removed locally.
    """

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        label: str,
        endpoint: str,
        s3_endpoint: str,
        create_record: Callable[[Any], Any],
        interval: float,
        max_file_size: int,
        max_file_retention: int,
        buffer_folder: str,
        log_buffer: "loguru._logger.Logger",  # type: ignore
    ):
        """
        Args:
            label: Tag for the telemetry type; used as prefix of the buffer file name.
            endpoint: URL that receives each record via POST.
            s3_endpoint: URL that hands out the presigned upload target for archives.
            create_record: Called with the record sequence number; returns the record.
            interval: Seconds to sleep between two records.
            max_file_size: Rotation threshold passed to the buffer file sink.
            max_file_retention: Retention setting passed to the buffer file sink.
            buffer_folder: Directory holding the buffer file and its archives.
            log_buffer: Logger that receives the file sink used for buffering.
        """
        # Built with os.path.join so it compares equal to the paths produced
        # in process_buffered_records even if buffer_folder ends with a slash.
        self.buffer_file = os.path.join(buffer_folder, f"{label}_usage.log")
        self.buffer_folder = buffer_folder

        self.telemetry_endpoint = endpoint
        self.telemetry_s3_endpoint = s3_endpoint
        self.create_record = create_record
        self.interval = interval

        self.log_buffer = log_buffer
        self.log_buffer.add(
            self.buffer_file,
            rotation=max_file_size,
            retention=max_file_retention,
            format=formatter,
            compression="gz",
        )

    def __call__(self) -> None:
        """Run the telemetry loop forever: create, deliver (or buffer), sleep."""
        order = 0
        while True:
            order += 1
            record = self.create_record(order)
            # A successful delivery means we are online: flush what was buffered.
            if self.save(record) == "online":
                self.process_buffered_records()
            time.sleep(self.interval)

    def upload_file(self, file: str) -> bool:
        """
        This method requests to telemetry API a presigned url and upload the local archived files.

        Returns:
            True when the upload target answered 204 No Content, False on any
            other status or on any error (errors are logged, never raised).
        """
        logger.info(f"uploading file... {file}")
        try:
            response = requests.get(self.telemetry_s3_endpoint, timeout=5).json()
            with open(file, "rb") as fh:
                files = {"file": (file, fh)}
                r = requests.post(response["url"], data=response["fields"], files=files, timeout=300)
                if r.status_code == http.HTTPStatus.NO_CONTENT:
                    logger.info("[Success!]")
                    return True
        except Exception as error:
            logger.info("Ground Control to Major Tom. Your circuit's dead, there's something wrong.")
            logger.error(f"error upload log file: {error}")

        return False

    def process_buffered_records(self) -> None:
        """
        Check in the buffered folder if there are archived logs to upload. If the agent connects before an archive
        is created it will also archive the current buffer file and upload it.
        """
        for file in os.listdir(self.buffer_folder):
            file_path = os.path.join(self.buffer_folder, file)

            # Upload regular archive; a failed upload leaves it for the next round.
            if file_path.endswith(".log.gz"):
                if self.upload_file(file_path):
                    os.remove(file_path)
                continue

            # Archive current buffer and upload it
            if file_path == self.buffer_file and os.path.getsize(file_path):
                timestamp = int(time.time())
                # Only the trailing ".log" is replaced; a ".log" elsewhere in
                # the path (e.g. in a directory name) must stay untouched.
                archive_path = file_path.removesuffix(".log") + f".{timestamp}.log.gz"
                with open(file_path, "rb") as f_in, gzip.open(archive_path, "wb") as f_out:
                    shutil.copyfileobj(f_in, f_out)
                # Empty the buffer in place instead of deleting and recreating
                # it: the buffering sink keeps its file handle open, and records
                # written to an unlinked file would never be seen again.
                os.truncate(file_path, 0)
                if self.upload_file(archive_path):
                    os.remove(archive_path)

    def save(self, record: Dict[str, Any]) -> str:
        """
        Try to POST the telemetry payload, if it fails for any reason, we buffer it locally.

        Returns:
            "online" when the API answered 201 Created, otherwise "offline"
            (after the record has been written to the local buffer).
        """
        try:
            r = requests.post(self.telemetry_endpoint, json=record, timeout=5)
            if r.status_code == http.HTTPStatus.CREATED:
                return "online"
        except Exception as error:
            logger.info("Ground Control to Major Tom. Your circuit's dead, there's something wrong.")
            logger.error(f"error posting telemetry to Ground Control: {error}")

        self.log_buffer.info(record)
        return "offline"
core/services/major_tom/src/metrics.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
import http
2+
from functools import cached_property
3+
from typing import List, Optional
4+
5+
import psutil
6+
import requests
7+
from loguru import logger
8+
9+
from src.typedefs import ExtensionInfo, VersionInfo
10+
11+
12+
class Metrics:
    """System metrics gathered lazily; each value is computed once per instance."""

    @cached_property
    def installed_extensions(self) -> Optional[List[ExtensionInfo]]:
        """Extensions reported by the local ``installed_extensions`` endpoint.

        Returns:
            The list of extensions on a 200 response, an empty list on any
            other status, and None when the request or its parsing raised.
        """
        try:
            req = requests.get("http://localhost/kraken/v1.0/installed_extensions", timeout=3)
            if req.status_code == http.HTTPStatus.OK:
                return [ExtensionInfo(identifier=rec["identifier"], tag=rec["tag"]) for rec in req.json()]
        except Exception as error:
            logger.error(f"Error getting installed extensions: {error}")
            return None
        return []

    @cached_property
    def disk(self) -> psutil._common.sdiskusage:
        """Usage statistics of the root filesystem ("/")."""
        return psutil.disk_usage("/")

    @cached_property
    def memory(self) -> psutil._pslinux.svmem:
        """Virtual memory statistics of the system."""
        return psutil.virtual_memory()

    @cached_property
    def installed_version(self) -> Optional[VersionInfo]:
        """Version reported by the local ``version/current`` endpoint.

        Returns:
            The version information on a 200 response; None on any other
            status or when the request or its parsing raised.
        """
        try:
            req = requests.get("http://localhost/version-chooser/v1.0/version/current", timeout=3)
            if req.status_code == http.HTTPStatus.OK:
                data = req.json()
                return VersionInfo(
                    repository=data["repository"],
                    tag=data["tag"],
                    last_modified=data["last_modified"],
                    sha=data["sha"],
                    architecture=data["architecture"],
                )

        except Exception as error:
            logger.error(f"Error getting version info: {error}")
            return None
        # Non-200 answer: no version information available.
        return None
core/services/major_tom/src/typedefs.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
from dataclasses import dataclass
2+
3+
4+
@dataclass
class ExtensionInfo:
    """Identity of one installed extension, as included in telemetry records."""

    # Value of the "identifier" key of an installed-extension entry.
    identifier: str
    # Value of the "tag" key of an installed-extension entry.
    tag: str
8+
9+
10+
@dataclass
class VersionInfo:
    """Description of the currently running version, as included in telemetry records.

    Every field mirrors the key of the same name in the version lookup response.
    """

    repository: str
    tag: str
    last_modified: str
    sha: str
    architecture: str

core/start-blueos-core

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ SERVICES=(
9393
'nginx',"nice -18 nginx -g \"daemon off;\" -c $TOOLS_PATH/nginx/nginx.conf"
9494
'log_zipper',"nice -20 $SERVICES_PATH/log_zipper/main.py '/shortcuts/system_logs/**/*.log' --max-age-minutes 60"
9595
'bag_of_holding',"$SERVICES_PATH/bag_of_holding/main.py"
96+
'major_tom',"$SERVICES_PATH/major_tom/main.py"
9697
)
9798

9899
tmux -f /etc/tmux.conf start-server

0 commit comments

Comments
 (0)