|
| 1 | +import gzip |
| 2 | +import json |
| 3 | +import os |
| 4 | +import shutil |
| 5 | +import time |
| 6 | +from dataclasses import asdict, dataclass |
| 7 | +from typing import Any, Callable, Dict, List |
| 8 | + |
| 9 | +import loguru |
| 10 | +import requests |
| 11 | +import speedtest |
| 12 | +from loguru import logger |
| 13 | + |
| 14 | + |
| 15 | +def formatter(record: "loguru.Record") -> str: |
| 16 | + # Note this function returns the string to be formatted, not the actual message to be logged |
| 17 | + record["extra"]["serialized"] = json.dumps(record["message"]) |
| 18 | + return "{extra[serialized]}\n" |
| 19 | + |
| 20 | + |
| 21 | +def is_online() -> bool: |
| 22 | + return get_latency() > 0 |
| 23 | + |
| 24 | + |
| 25 | +def get_latency() -> float: |
| 26 | + try: |
| 27 | + servers: List[str] = [] |
| 28 | + st = speedtest.Speedtest() |
| 29 | + st.get_servers(servers) |
| 30 | + best_server = st.get_best_server() |
| 31 | + ping = best_server["latency"] |
| 32 | + return float(ping) |
| 33 | + except Exception: |
| 34 | + return -1.0 |
| 35 | + |
| 36 | + |
| 37 | +@dataclass |
| 38 | +class DefaultPayload: |
| 39 | + log_session_uuid: str |
| 40 | + order: int |
| 41 | + timestamp: str |
| 42 | + hardware_id: str |
| 43 | + blueos_id: str |
| 44 | + data: dict[str, Any] |
| 45 | + |
| 46 | + def json(self) -> dict[str, Any]: |
| 47 | + return asdict(self) |
| 48 | + |
| 49 | + |
| 50 | +class TelemetryEngine: |
| 51 | + # pylint: disable=too-many-arguments |
| 52 | + def __init__( |
| 53 | + self, |
| 54 | + label: str, |
| 55 | + endpoint: str, |
| 56 | + s3_endpoint: str, |
| 57 | + create_record: Callable[[Any], Any], |
| 58 | + interval: float, |
| 59 | + max_file_size: int, |
| 60 | + max_file_retention: int, |
| 61 | + buffer_folder: str, |
| 62 | + log_buffer: loguru._logger.Logger, # type: ignore |
| 63 | + ): |
| 64 | + self.buffer_file = f"{buffer_folder}/{label}_usage.log" |
| 65 | + self.buffer_folder = buffer_folder |
| 66 | + |
| 67 | + self.telemetry_endpoint = endpoint |
| 68 | + self.telemetry_s3_endpoint = s3_endpoint |
| 69 | + self.create_record = create_record |
| 70 | + self.interval = interval |
| 71 | + |
| 72 | + self.log_buffer = log_buffer |
| 73 | + self.log_buffer.add( |
| 74 | + self.buffer_file, |
| 75 | + rotation=max_file_size, |
| 76 | + retention=max_file_retention, |
| 77 | + format=formatter, |
| 78 | + compression="gz", |
| 79 | + ) |
| 80 | + |
| 81 | + def __call__(self) -> None: |
| 82 | + order = 0 |
| 83 | + while True: |
| 84 | + order += 1 |
| 85 | + record = self.create_record(order) |
| 86 | + if self.save(record) == "online": |
| 87 | + self.process_buffered_records() |
| 88 | + time.sleep(self.interval) |
| 89 | + |
| 90 | + def upload_file(self, file: str) -> bool: |
| 91 | + """ |
| 92 | + This method requests to telemetry API a presigned url and upload the local archived files. |
| 93 | + """ |
| 94 | + logger.info(f"uploading file... {file}") |
| 95 | + try: |
| 96 | + response = requests.get(self.telemetry_s3_endpoint, timeout=5).json() |
| 97 | + with open(file, "rb") as fh: |
| 98 | + files = {"file": (file, fh)} |
| 99 | + r = requests.post(response["url"], data=response["fields"], files=files, timeout=300) |
| 100 | + if r.status_code == 204: |
| 101 | + logger.info("[Success!]") |
| 102 | + return True |
| 103 | + except Exception as error: |
| 104 | + logger.info("Ground Control to Major Tom. Your circuit's dead, there's something wrong.") |
| 105 | + logger.error(f"error upload log file: {error}") |
| 106 | + |
| 107 | + return False |
| 108 | + |
| 109 | + def process_buffered_records(self) -> None: |
| 110 | + """ |
| 111 | + Check in the buffered folder if there are archived logs to upload. If the agent connects before an archive |
| 112 | + is created it will also archive the current buffer file and upload it. |
| 113 | + """ |
| 114 | + for file in os.listdir(self.buffer_folder): |
| 115 | + file_path = os.path.join(self.buffer_folder, file) |
| 116 | + |
| 117 | + # Upload regular archive |
| 118 | + if file_path.endswith(".log.gz"): |
| 119 | + if self.upload_file(file_path): |
| 120 | + os.remove(file_path) |
| 121 | + |
| 122 | + # Archive current buffer and upload it |
| 123 | + if file_path.endswith(".log") and os.path.getsize(file_path): |
| 124 | + timestamp = int(time.time()) |
| 125 | + tmp_name = self.buffer_file.replace(".log", f".{timestamp}.log.gz") |
| 126 | + with open(self.buffer_file, "rb") as f_in: |
| 127 | + with gzip.open(tmp_name, "wb") as f_out: |
| 128 | + shutil.copyfileobj(f_in, f_out) |
| 129 | + if self.upload_file(tmp_name): |
| 130 | + os.remove(tmp_name) |
| 131 | + with open(self.buffer_file, "w", encoding="utf-8"): |
| 132 | + # create new empty file if not there |
| 133 | + pass |
| 134 | + |
| 135 | + def save(self, record: Dict[str, Any]) -> str: |
| 136 | + """ |
| 137 | + Try to POST the telemetry payload, if it fails for any reason, we buffer it locally. |
| 138 | + """ |
| 139 | + try: |
| 140 | + r = requests.post(self.telemetry_endpoint, json=record, timeout=5) |
| 141 | + if r.status_code == 201: |
| 142 | + return "online" |
| 143 | + except Exception as error: |
| 144 | + logger.info("Ground Control to Major Tom. Your circuit's dead, there's something wrong.") |
| 145 | + logger.error(f"error posting telemetry to Ground Control: {error}") |
| 146 | + |
| 147 | + self.log_buffer.info(record) |
| 148 | + return "offline" |
0 commit comments