Skip to content

Commit 476ecc5

Browse files
committed
support for JSON Digest
1 parent 6368463 commit 476ecc5

File tree

1 file changed

+67
-29
lines changed

1 file changed

+67
-29
lines changed

object-ledger/object_ledger_pub_sub.py

Lines changed: 67 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ def __init__(
2929
self,
3030
hostname: str,
3131
ads_b_json_topic: str,
32+
ads_b_json_digest_topic: str,
3233
ais_json_topic: str,
3334
ledger_topic: str,
3435
max_aircraft_entry_age: float = 60.0,
@@ -82,6 +83,7 @@ def __init__(
8283
super().__init__(**kwargs)
8384
self.hostname = hostname
8485
self.ads_b_json_topic = ads_b_json_topic
86+
self.ads_b_json_digest_topic = ads_b_json_digest_topic
8587
self.ais_json_topic = ais_json_topic
8688
self.ledger_topic = ledger_topic
8789
self.max_aircraft_entry_age = max_aircraft_entry_age
@@ -135,6 +137,7 @@ def __init__(
135137
f"""ObjectLedgerPubSub initialized with parameters:
136138
hostname = {hostname}
137139
ads_b_json_topic = {ads_b_json_topic}
140+
ads_b_json_digest_topic = {ads_b_json_digest_topic}
138141
ais_json_topic = {ais_json_topic}
139142
ledger_topic = {ledger_topic}
140143
max_aircraft_entry_age = {max_aircraft_entry_age}
@@ -199,6 +202,54 @@ def _get_max_entry_age(self, object_type: str) -> float:
199202
"""
200203
return self.max_entry_age[object_type]
201204

205+
def _update_state(self, state: Dict[Any, Any]) -> None:
206+
"""Update the ledger with a state message.
207+
208+
Parameters
209+
----------
210+
state: Dict[Any, Any]
211+
The state message
212+
213+
Returns
214+
-------
215+
None
216+
"""
217+
try:
218+
# Pop keys that are not required columns
219+
[state.pop(key) for key in set(state.keys()) - set(self.required_columns)]
220+
221+
# Acquire, then release a lock on the callback thread to
222+
# protect Pandas operations
223+
with self.state_lock:
224+
# Add or update the entry in the ledger
225+
entry = pd.DataFrame(state, index=[state["object_id"]])
226+
entry.set_index("object_id", inplace=True)
227+
if entry.notna().all(axis=1).bool():
228+
if not entry.index.isin(self.ledger.index):
229+
logging.debug(
230+
f"Adding entry state data for object id: {entry.index}"
231+
)
232+
self.ledger = pd.concat(
233+
[self.ledger, entry], ignore_index=False
234+
)
235+
236+
else:
237+
if (entry['timestamp'] - self.ledger.loc[entry.index, 'timestamp']).iloc[0] > 0:
238+
self.ledger.update(entry)
239+
logging.debug(f"Updating entry for {entry.index[0]} is {(entry['timestamp'] - self.ledger.loc[entry.index, 'timestamp']).iloc[0]} newer than ledger!")
240+
#logging.info(
241+
# f"Index: {entry.index[0]} | Time Delta (seconds): {(entry['timestamp'] - self.ledger.loc[entry.index, 'timestamp']).iloc[0]} | "
242+
# f"New Time: {entry['timestamp'].iloc[0]} | Ledger Time: {self.ledger.loc[entry.index, 'timestamp'].iloc[0]}"
243+
#)
244+
else:
245+
logging.debug(f"Invalid entry: {entry}")
246+
247+
except Exception as exception:
248+
# Set exception
249+
self.exception = exception # type: ignore
250+
251+
252+
202253
def _state_callback(
203254
self,
204255
_client: Union[mqtt.Client, None],
@@ -226,58 +277,42 @@ def _state_callback(
226277

227278
# Populate required state based on message type
228279
data = self._decode_payload(msg)
280+
281+
282+
if "ADS-B Digest" in data:
283+
logging.debug(f"Processing ADS-B state message data: {data}")
284+
aircrafts = json.loads(data["ADS-B Digest"])
285+
for state in aircrafts:
286+
state["object_id"] = state["icao_hex"]
287+
state["object_type"] = "aircraft"
288+
self._update_state(state)
289+
229290
if "ADS-B" in data:
230291
logging.debug(f"Processing ADS-B state message data: {data}")
231292
state = json.loads(data["ADS-B"])
232293
state["object_id"] = state["icao_hex"]
233294
state["object_type"] = "aircraft"
295+
self._update_state(state)
234296

235297
elif "Decoded AIS" in data:
236298
logging.debug(f"Processing AIS state message data: {data}")
237299
state = json.loads(data["Decoded AIS"])
238300
state["object_id"] = state["mmsi"]
239301
state["object_type"] = "ship"
240302
state["track"] = state["course"]
303+
self._update_state(state)
241304

242305
elif "Radiosonde" in data:
243306
logging.debug(f"Processing Radiosonde state message data: {data}")
244307
state = json.loads(data["Radiosonde"])
245308
state["object_id"] = state["sonde_serial"]
246309
state["object_type"] = "balloon"
310+
self._update_state(state)
247311

248312
else:
249313
logging.debug(f"Skipping state message data: {data}")
250314
return
251315

252-
# Pop keys that are not required columns
253-
[state.pop(key) for key in set(state.keys()) - set(self.required_columns)]
254-
255-
# Acquire, then release a lock on the callback thread to
256-
# protect Pandas operations
257-
with self.state_lock:
258-
# Add or update the entry in the ledger
259-
entry = pd.DataFrame(state, index=[state["object_id"]])
260-
entry.set_index("object_id", inplace=True)
261-
if entry.notna().all(axis=1).bool():
262-
if not entry.index.isin(self.ledger.index):
263-
logging.debug(
264-
f"Adding entry state data for object id: {entry.index}"
265-
)
266-
self.ledger = pd.concat(
267-
[self.ledger, entry], ignore_index=False
268-
)
269-
270-
else:
271-
if (entry['timestamp'] - self.ledger.loc[entry.index, 'timestamp']).iloc[0] > 0:
272-
self.ledger.update(entry)
273-
logging.debug(f"Updating entry for {entry.index[0]} is {(entry['timestamp'] - self.ledger.loc[entry.index, 'timestamp']).iloc[0]} newer than ledger!")
274-
#logging.info(
275-
# f"Index: {entry.index[0]} | Time Delta (seconds): {(entry['timestamp'] - self.ledger.loc[entry.index, 'timestamp']).iloc[0]} | "
276-
# f"New Time: {entry['timestamp'].iloc[0]} | Ledger Time: {self.ledger.loc[entry.index, 'timestamp'].iloc[0]}"
277-
#)
278-
else:
279-
logging.debug(f"Invalid entry: {entry}")
280-
281316
except Exception as exception:
282317
# Set exception
283318
self.exception = exception # type: ignore
@@ -378,6 +413,8 @@ def main(self) -> None:
378413
# Subscribe to required topics
379414
if not self.ads_b_json_topic == "":
380415
self.add_subscribe_topic(self.ads_b_json_topic, self._state_callback)
416+
if not self.ads_b_json_digest_topic == "":
417+
self.add_subscribe_topic(self.ads_b_json_digest_topic, self._state_callback)
381418
if not self.ais_json_topic == "":
382419
self.add_subscribe_topic(self.ais_json_topic, self._state_callback)
383420

@@ -416,6 +453,7 @@ def main(self) -> None:
416453
mqtt_ip=os.getenv("MQTT_IP", "mqtt"),
417454
hostname=os.environ.get("HOSTNAME", ""),
418455
ads_b_json_topic=os.getenv("ADS_B_JSON_TOPIC", ""),
456+
ads_b_json_digest_topic=os.getenv("ADS_B_JSON_DIGEST_TOPIC", ""),
419457
ais_json_topic=os.getenv("AIS_JSON_TOPIC", ""),
420458
ledger_topic=os.getenv("LEDGER_TOPIC", ""),
421459
max_aircraft_entry_age=float(os.getenv("MAX_AIRCRAFT_ENTRY_AGE", 60.0)),

0 commit comments

Comments
 (0)