Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/services/install-services.sh
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ SERVICES=(
ping
versionchooser
wifi
major_tom
)

# We need to install loguru, appdirs and pydantic since they may be used inside setup.py
Expand Down
85 changes: 85 additions & 0 deletions core/services/major_tom/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
#! /usr/bin/env python3
import copy
import datetime
import logging
import sys
import time
import uuid
from typing import Any, Dict
from zoneinfo import ZoneInfo

import loguru
from commonwealth.utils.general import (
local_hardware_identifier,
local_unique_identifier,
)
from commonwealth.utils.logs import init_logger

from src.core import TelemetryEngine, get_latency
from src.metrics import Metrics
from src.typedefs import AnonymousTelemetryRecord, DefaultPayload

# Random UUID generated once when this module is loaded; it is attached to every
# record built by compose_default_record and logged at startup.
LOG_SESSION_UUID = str(uuid.uuid4())

# Service name; passed to init_logger and used to build the buffer folder path below.
SERVICE_NAME = "major_tom"
# Folder handed to TelemetryEngine as `buffer_folder`.
LOG_FOLDER_PATH = f"/var/logs/blueos/services/{SERVICE_NAME}/buffer"

# URLs handed to TelemetryEngine as `endpoint` and `s3_endpoint` respectively.
TELEMETRY_ENDPOINT = "https://telemetry.blueos.cloud/api/v1/anonymous/"
S3_TELEMETRY_ENDPOINT = "https://telemetry.blueos.cloud/api/v1/anonymous/s3/"


def compose_default_record(order: int) -> Dict[str, Any]:
    """Build one anonymous telemetry payload as a plain dictionary.

    Args:
        order: Sequence number of this record; stored verbatim in the payload.

    Returns:
        The ``DefaultPayload`` (with the probed ``AnonymousTelemetryRecord`` nested under
        ``data``) converted through its ``json()`` method.
    """
    # Envelope values are captured before probing starts, so the timestamp marks the
    # beginning of the collection rather than its end.
    timestamp = datetime.datetime.now(ZoneInfo("UTC")).isoformat()
    hardware_id = local_hardware_identifier()
    blueos_id = local_unique_identifier()

    probe_started = time.time()
    metrics = Metrics()
    record = AnonymousTelemetryRecord(
        uptime=time.clock_gettime(time.CLOCK_BOOTTIME),
        latency=get_latency(),
        memory_size=metrics.memory.total,
        memory_usage=metrics.memory.used,
        disk_size=metrics.disk.total,
        disk_usage=metrics.disk.used,
        extensions=metrics.installed_extensions,
        blueos_version=metrics.installed_version,
        probe_time=0,  # placeholder, replaced right below once probing is done
    )
    # How long collecting all of the metrics above took, in seconds.
    record.probe_time = time.time() - probe_started

    payload = DefaultPayload(
        log_session_uuid=LOG_SESSION_UUID,
        order=order,
        timestamp=timestamp,
        hardware_id=hardware_id,
        blueos_id=blueos_id,
        data=record,
    )
    return payload.json()


if __name__ == "__main__":
    # Two independent loggers live in this process: the regular service logger and a
    # dedicated one used only as the on-disk telemetry buffer. All handlers are removed
    # first so the deep copy starts with none of its own.
    # see https://loguru.readthedocs.io/en/latest/resources/recipes.html#creating-independent-loggers-with-separate-set-of-handlers
    loguru.logger.remove()
    log_buffer = copy.deepcopy(loguru.logger)

    logging.basicConfig(level=logging.INFO)
    console_format = "<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>"
    loguru.logger.add(sys.stderr, format=console_format, level="INFO")
    init_logger(SERVICE_NAME)
    loguru.logger.info(f"Starting Major Tom, session UUID: {LOG_SESSION_UUID}")

    engine = TelemetryEngine(
        # label tags the telemetry type; non-anonymous telemetry may be added in the future
        label="anonymous",
        endpoint=TELEMETRY_ENDPOINT,
        s3_endpoint=S3_TELEMETRY_ENDPOINT,
        create_record=compose_default_record,
        interval=60 * 5,  # seconds (5 minutes)
        max_file_size=1024 * 1024,  # bytes (1 MiB)
        max_file_retention=10,
        buffer_folder=LOG_FOLDER_PATH,
        log_buffer=log_buffer,
    )
    # Calling the engine starts its endless collect/send loop.
    engine()
14 changes: 14 additions & 0 deletions core/services/major_tom/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#!/usr/bin/env python3

import setuptools

# Package metadata and pinned runtime dependencies for the Major Tom telemetry service.
setuptools.setup(
name="Major Tom",
version="0.1.0",
description="Sends telemetry back to Ground Control",
license="MIT",
# NOTE(review): src/core.py imports `speedtest` and src/metrics.py imports `psutil`,
# but neither is listed below - confirm they are provided by the environment or add them.
install_requires=[
"requests==2.31.0",
"loguru==0.5.3",
],
)
Empty file.
136 changes: 136 additions & 0 deletions core/services/major_tom/src/core.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import gzip
import http
import json
import os
import shutil
import time
from typing import Any, Callable, Dict, List

import loguru
import requests
import speedtest
from loguru import logger
from typedefs import OnlineStatus


def formatter(record: "loguru.Record") -> str:
    """Produce the loguru format template used for buffered telemetry lines.

    The record's message is JSON-encoded and stored under ``record["extra"]["serialized"]``.
    The returned value is the *template* that gets formatted afterwards, not the
    final text that ends up in the log.
    """
    serialized_message = json.dumps(record["message"])
    record["extra"]["serialized"] = serialized_message
    return "{extra[serialized]}\n"


def is_online() -> bool:
    """Report whether a latency probe succeeded, i.e. returned a value above zero."""
    measured_latency = get_latency()
    return measured_latency > 0


def get_latency() -> float:
    """Return the latency reported for the best speedtest server.

    Any failure while probing is swallowed and reported as ``-1.0``, so callers can
    treat a non-positive value as "could not measure".
    """
    try:
        client = speedtest.Speedtest()
        client.get_servers([])
        return float(client.get_best_server()["latency"])
    except Exception:
        return -1.0


class TelemetryEngine:
    """Collect telemetry records on a fixed interval, post them, and buffer them on failure.

    Records that cannot be delivered are written through ``log_buffer`` to a rotating,
    gzip-compressed file inside ``buffer_folder``. Whenever a record is delivered
    successfully, the buffered files are uploaded as well.
    """

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        label: str,
        endpoint: str,
        s3_endpoint: str,
        create_record: Callable[[Any], Any],
        interval: float,
        max_file_size: int,
        max_file_retention: int,
        buffer_folder: str,
        log_buffer: loguru._logger.Logger,  # type: ignore
    ):
        """Configure the engine and attach the buffer file as a sink of ``log_buffer``.

        Args:
            label: Tag for the telemetry type; used as prefix of the buffer file name.
            endpoint: URL each record is POSTed to.
            s3_endpoint: URL queried for the upload target of archived buffer files.
            create_record: Callable receiving the record sequence number and returning the record.
            interval: Seconds to sleep between two records.
            max_file_size: Passed to the sink as ``rotation``.
            max_file_retention: Passed to the sink as ``retention``.
            buffer_folder: Folder holding the buffer file and its archives.
            log_buffer: Logger used exclusively to write buffered records.
        """
        # Built with os.path.join so it compares equal to the paths produced in
        # process_buffered_records even when buffer_folder ends with a separator.
        self.buffer_file = os.path.join(buffer_folder, f"{label}_usage.log")
        self.buffer_folder = buffer_folder

        self.telemetry_endpoint = endpoint
        self.telemetry_s3_endpoint = s3_endpoint
        self.create_record = create_record
        self.interval = interval

        self.log_buffer = log_buffer
        self.log_buffer.add(
            self.buffer_file,
            rotation=max_file_size,
            retention=max_file_retention,
            format=formatter,
            compression="gz",
        )

    def __call__(self) -> None:
        """Run forever: create a record, try to send it, flush the buffer if online, sleep."""
        order = 0
        while True:
            order += 1
            record = self.create_record(order)
            if self.save(record) == OnlineStatus.ONLINE:
                self.process_buffered_records()
            time.sleep(self.interval)

    def upload_file(self, file: str) -> bool:
        """
        This method requests to telemetry API a presigned url and upload the local archived files.

        Returns True only when the upload is answered with 204 No Content; any other
        status or any exception is logged and reported as False.
        """
        logger.info(f"uploading file... {file}")
        try:
            response = requests.get(self.telemetry_s3_endpoint, timeout=5).json()
            with open(file, "rb") as fh:
                files = {"file": (file, fh)}
                r = requests.post(response["url"], data=response["fields"], files=files, timeout=300)
            # http.HTTPStatus is available from a plain `import http`, unlike http.client.
            if r.status_code == http.HTTPStatus.NO_CONTENT:
                logger.info("[Success!]")
                return True
            logger.warning(f"upload of {file} was answered with unexpected status {r.status_code}")
        except Exception as error:
            logger.info("Ground Control to Major Tom. Your circuit's dead, there's something wrong.")
            logger.error(f"error upload log file: {error}")

        return False

    def process_buffered_records(self) -> None:
        """
        Check in the buffered folder if there are archived logs to upload. If the agent connects before an archive
        is created it will also archive the current buffer file and upload it.
        """
        for file in os.listdir(self.buffer_folder):
            file_path = os.path.join(self.buffer_folder, file)

            # Upload regular archive; a failed upload leaves the archive in place for the next round
            if file_path.endswith(".log.gz") and self.upload_file(file_path):
                os.remove(file_path)
                continue

            # Archive current buffer and upload it
            if file_path == self.buffer_file and os.path.getsize(file_path):
                timestamp = int(time.time())
                # Swap only the trailing ".log"; str.replace would also rewrite any
                # ".log" appearing earlier in the folder path.
                stem = file_path[: -len(".log")]
                tmp_name = f"{stem}.{timestamp}.log.gz"
                with open(file_path, "rb") as f_in, gzip.open(tmp_name, "wb") as f_out:
                    shutil.copyfileobj(f_in, f_out)
                # NOTE(review): this file is also the sink added to log_buffer in __init__;
                # presumably the sink keeps its own handle open - confirm that records
                # written after this removal still reach the recreated file.
                os.remove(file_path)
                if self.upload_file(tmp_name):
                    os.remove(tmp_name)
                with open(self.buffer_file, "w", encoding="utf-8"):
                    # create new empty file if not there
                    pass

    def save(self, record: Dict[str, Any]) -> OnlineStatus:
        """
        Try to POST the telemetry payload, if it fails for any reason, we buffer it locally.

        Returns OnlineStatus.ONLINE when the endpoint answers 201 Created, otherwise the
        record is written to the buffer and OnlineStatus.OFFLINE is returned.
        """
        try:
            r = requests.post(self.telemetry_endpoint, json=record, timeout=5)
            if r.status_code == http.HTTPStatus.CREATED:
                return OnlineStatus.ONLINE
            logger.warning(f"telemetry endpoint answered with unexpected status {r.status_code}")
        except Exception as error:
            logger.info("Ground Control to Major Tom. Your circuit's dead, there's something wrong.")
            logger.error(f"error posting telemetry to Ground Control: {error}")

        self.log_buffer.info(record)
        return OnlineStatus.OFFLINE
48 changes: 48 additions & 0 deletions core/services/major_tom/src/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import http
from functools import cached_property
from typing import List, Optional

import psutil
import requests
from loguru import logger

from src.typedefs import ExtensionInfo, VersionInfo


class Metrics:
    """System metrics probed lazily and cached for the lifetime of one instance.

    Each property performs its probe on first access only; create a new ``Metrics``
    object to take a fresh reading.
    """

    @cached_property
    def installed_extensions(self) -> Optional[List[ExtensionInfo]]:
        """Installed extensions as reported by the local installed_extensions endpoint.

        Returns:
            The list of extensions on HTTP 200, an empty list on any other status,
            or None when the request or the parsing of its answer raised.
        """
        try:
            req = requests.get("http://localhost/kraken/v1.0/installed_extensions", timeout=3)
            # http.HTTPStatus is available from a plain `import http`; http.client is a
            # submodule that is only reachable if something else imported it first.
            if req.status_code == http.HTTPStatus.OK:
                return [ExtensionInfo(identifier=rec["identifier"], tag=rec["tag"]) for rec in req.json()]
        except Exception as error:
            logger.error(f"Error getting installed extensions: {error}")
            return None
        return []

    @cached_property
    def disk(self) -> psutil._common.sdiskusage:
        """Disk usage of the root filesystem ("/") as returned by psutil."""
        return psutil.disk_usage("/")

    @cached_property
    def memory(self) -> psutil._pslinux.svmem:
        """Virtual memory statistics as returned by psutil."""
        return psutil.virtual_memory()

    @cached_property
    def installed_version(self) -> Optional[VersionInfo]:
        """Currently running version as reported by the local version/current endpoint.

        Returns:
            A VersionInfo on HTTP 200, otherwise None (including when the request or
            the parsing of its answer raised).
        """
        try:
            req = requests.get("http://localhost/version-chooser/v1.0/version/current", timeout=3)
            if req.status_code == http.HTTPStatus.OK:
                data = req.json()
                return VersionInfo(
                    repository=data["repository"],
                    tag=data["tag"],
                    last_modified=data["last_modified"],
                    sha=data["sha"],
                    architecture=data["architecture"],
                )

        except Exception as error:
            logger.error(f"Error getting version info: {error}")
        return None
59 changes: 59 additions & 0 deletions core/services/major_tom/src/typedefs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from dataclasses import asdict, dataclass
from enum import Enum
from typing import Any, Optional


@dataclass
class ExtensionInfo:
    """Identifier and tag describing one installed extension."""

    identifier: str
    tag: str


@dataclass
class VersionInfo:
    """Description of a version: where it comes from and which build it is."""

    repository: str
    tag: str
    last_modified: str
    sha: str
    architecture: str


class OnlineStatus(str, Enum):
    """Connectivity outcome; members are plain strings, so they compare equal to their values."""

    ONLINE = "online"
    OFFLINE = "offline"
    UNKNOWN = "unknown"


@dataclass
class TelemetryRecord:
    """Field-less base class shared by the concrete telemetry record types."""


@dataclass
# pylint: disable=too-many-instance-attributes
class AnonymousTelemetryRecord(TelemetryRecord):
    """Anonymous snapshot of the system: uptime, latency, memory, disk, extensions and version."""

    uptime: float
    latency: float
    memory_size: int
    memory_usage: int
    disk_size: int
    disk_usage: int
    extensions: Optional[list[ExtensionInfo]]
    blueos_version: Optional[VersionInfo]
    probe_time: float

    def json(self) -> dict[str, Any]:
        """Return this record, nested dataclasses included, as plain dictionaries."""
        as_mapping = asdict(self)
        return as_mapping


@dataclass
class DefaultPayload:
    """Envelope around a telemetry record: session, ordering, timestamp and identifiers."""

    log_session_uuid: str
    order: int
    timestamp: str
    hardware_id: str
    blueos_id: str
    data: Optional[TelemetryRecord]

    def json(self) -> dict[str, Any]:
        """Return the payload, with the nested record if any, as plain dictionaries."""
        as_mapping = asdict(self)
        return as_mapping
1 change: 1 addition & 0 deletions core/start-blueos-core
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ SERVICES=(
'nginx',"nice -18 nginx -g \"daemon off;\" -c $TOOLS_PATH/nginx/nginx.conf"
'log_zipper',"nice -20 $SERVICES_PATH/log_zipper/main.py '/shortcuts/system_logs/**/*.log' --max-age-minutes 60"
'bag_of_holding',"$SERVICES_PATH/bag_of_holding/main.py"
'major_tom',"$SERVICES_PATH/major_tom/main.py"
)

tmux -f /etc/tmux.conf start-server
Expand Down