14
14
15
15
from __future__ import annotations
16
16
17
- from typing import TYPE_CHECKING , cast
17
+ from typing import TYPE_CHECKING , Final
18
18
19
- from homeassistant .components .bluetooth import (
20
- MONOTONIC_TIME ,
21
- BluetoothScannerDevice ,
22
- )
19
+ from bluetooth_data_tools import monotonic_time_coarse
23
20
24
21
from .const import (
25
22
_LOGGER ,
34
31
)
35
32
36
33
# from .const import _LOGGER_SPAM_LESS
37
- from .util import rssi_to_metres
34
+ from .util import clean_charbuf , rssi_to_metres
38
35
39
36
if TYPE_CHECKING :
37
+ from bleak .backends .scanner import AdvertisementData
38
+
40
39
from .bermuda_device import BermudaDevice
41
40
42
41
# The if instead of min/max triggers PLR1730, but when
@@ -58,35 +57,35 @@ class BermudaDeviceScanner(dict):
58
57
This is created (and updated) by the receipt of an advertisement, which represents
59
58
a BermudaDevice hearing an advert from another BermudaDevice, if that makes sense!
60
59
61
- A BermudaDevice's "scanners " property will contain one of these for each
60
+ A BermudaDevice's "adverts " property will contain one of these for each
62
61
scanner that has "seen" it.
63
62
64
63
"""
65
64
65
+ def __hash__ (self ) -> int :
66
+ """The device-mac / scanner mac uniquely identifies a received advertisement pair."""
67
+ return hash ((self .device_address , self .scanner_address ))
68
+
66
69
def __init__ (
67
70
self ,
68
71
parent_device : BermudaDevice , # The device being tracked
69
- scandata : BluetoothScannerDevice , # The advertisement info from the device, received by the scanner
72
+ advertisementdata : AdvertisementData , # The advertisement info from the device, received by the scanner
70
73
options ,
71
74
scanner_device : BermudaDevice , # The scanner device that "saw" it.
72
75
) -> None :
73
76
# I am declaring these just to control their order in the dump,
74
77
# which is a bit silly, I suspect.
75
78
self .name : str = scanner_device .name # or scandata.scanner.name
76
79
self .scanner_device = scanner_device # links to the source device
77
- self .adapter : str = scandata .scanner .adapter # a helpful name, like hci0 or prox-test
78
- self .scanner_address = scanner_device .address
79
- self .source : str = scandata .scanner .source
80
+ self .scanner_address : Final [str ] = scanner_device .address
80
81
self .area_id : str | None = scanner_device .area_id
81
82
self .area_name : str | None = scanner_device .area_name
82
83
self ._device = parent_device
83
- self .device_address = parent_device .address
84
+ self .device_address : Final [ str ] = parent_device .address
84
85
self .options = options
85
86
self .stamp : float = 0
86
87
# Only remote scanners log timestamps, local usb adaptors do not.
87
- self .scanner_sends_stamps = hasattr (scandata .scanner , "discovered_device_timestamps" ) or hasattr (
88
- scandata .scanner , "_discovered_device_timestamps"
89
- )
88
+ self .scanner_sends_stamps = scanner_device .is_remote_scanner
90
89
self .new_stamp : float | None = None # Set when a new advert is loaded from update
91
90
self .rssi : float | None = None
92
91
self .tx_power : float | None = None
@@ -105,17 +104,15 @@ def __init__(
105
104
self .conf_attenuation = self .options .get (CONF_ATTENUATION )
106
105
self .conf_max_velocity = self .options .get (CONF_MAX_VELOCITY )
107
106
self .conf_smoothing_samples = self .options .get (CONF_SMOOTHING_SAMPLES )
108
- self .adverts : dict [str , list ] = {
109
- "manufacturer_data" : [],
110
- "service_data" : [],
111
- "service_uuids" : [],
112
- "platform_data" : [],
113
- }
107
+ self .local_name : list [tuple [str , bytes ]] = []
108
+ self .manufacturer_data : list [dict [int , bytes ]] = []
109
+ self .service_data : list [dict [str , bytes ]] = []
110
+ self .service_uuids : list [str ] = []
114
111
115
112
# Just pass the rest on to update...
116
- self .update_advertisement (scandata )
113
+ self .update_advertisement (advertisementdata )
117
114
118
- def update_advertisement (self , scandata : BluetoothScannerDevice ):
115
+ def update_advertisement (self , advertisementdata : AdvertisementData ):
119
116
"""
120
117
Update gets called every time we see a new packet or
121
118
every time we do a polled update.
@@ -124,57 +121,55 @@ def update_advertisement(self, scandata: BluetoothScannerDevice):
124
121
device+scanner combination. This method only gets called when a given scanner
125
122
claims to have data.
126
123
"""
127
- # In case the scanner has changed it's details since startup:
128
- # FIXME: This should probably be a separate function that the refresh_scanners
129
- # calls if necessary, rather than re-doing it every cycle.
130
- scanner = scandata .scanner
131
- # self.name = scanner.name
132
- # self.area_id = self.scanner_device.area_id
133
- # self.area_name = self.scanner_device.area_name
124
+ #
125
+ # We might get called without there being a new advert to process, so
126
+ # exit quickly if that's the case (ideally we will catch it earlier in future)
127
+ #
128
+ scanner = self .scanner_device
134
129
new_stamp : float | None = None
135
130
136
131
if self .scanner_sends_stamps :
137
- # Found a remote scanner which has timestamp history...
138
- # Check can be removed when we require 2025.4+
139
- if hasattr (scanner , "discovered_device_timestamps" ):
140
- stamps = scanner .discovered_device_timestamps # type: ignore
141
- else :
142
- stamps = scanner ._discovered_device_timestamps # type: ignore #noqa
132
+ new_stamp = scanner .async_as_scanner_get_stamp (self .device_address )
143
133
144
- # In this dict all MAC address keys are upper-cased
145
- uppermac = self .device_address .upper ()
146
- if uppermac in stamps :
147
- if self .stamp is None or (stamps [uppermac ] is not None and stamps [uppermac ] > self .stamp ):
148
- new_stamp = stamps [uppermac ]
149
- else :
150
- # We have no updated advert in this run.
151
- new_stamp = None
152
- self .stale_update_count += 1
153
- else :
154
- # This shouldn't happen, as we shouldn't have got a record
155
- # of this scanner if it hadn't seen this device.
156
- _LOGGER .error (
157
- "Scanner %s has no stamp for %s - very odd" ,
158
- scanner .source ,
159
- self .device_address ,
160
- )
161
- new_stamp = None
162
- elif self .rssi != scandata .advertisement .rssi :
134
+ if new_stamp is None :
135
+ self .stale_update_count += 1
136
+ _LOGGER .warning ("Advert from %s for %s lacks stamp, unexpected." , scanner .name , self ._device .name )
137
+ return
138
+
139
+ if self .stamp > new_stamp :
140
+ # The existing stamp is NEWER, bail but complain on the way.
141
+ self .stale_update_count += 1
142
+ _LOGGER .warning ("Advert from %s for %s is OLDER than last recorded" , scanner .name , self ._device .name )
143
+ return
144
+
145
+ if self .stamp == new_stamp :
146
+ # We've seen this stamp before. Bail.
147
+ self .stale_update_count += 1
148
+ return
149
+
150
+ elif self .rssi != advertisementdata .rssi :
163
151
# If the rssi has changed from last time, consider it "new". Since this scanner does
164
152
# not send stamps, this is probably a USB bluetooth adaptor.
165
- new_stamp = MONOTONIC_TIME ()
153
+ new_stamp = monotonic_time_coarse () - 3.0 # age usb adaptors slightly, since they are not "fresh"
166
154
else :
167
- new_stamp = None
155
+ # USB Adaptor has nothing new for us, bail.
156
+ return
168
157
169
158
# Update our parent scanner's last_seen if we have a new stamp.
170
- if new_stamp is not None and new_stamp > self .scanner_device .last_seen :
159
+ if new_stamp > self .scanner_device .last_seen + 0.01 : # some slight warp seems common.
160
+ _LOGGER .info (
161
+ "Advert from %s for %s is %.6fs NEWER than scanner's last_seen, odd" ,
162
+ self .scanner_device .name ,
163
+ self ._device .name ,
164
+ new_stamp - self .scanner_device .last_seen ,
165
+ )
171
166
self .scanner_device .last_seen = new_stamp
172
167
173
168
if len (self .hist_stamp ) == 0 or new_stamp is not None :
174
169
# this is the first entry or a new one, bring in the new reading
175
170
# and calculate the distance.
176
171
177
- self .rssi = scandata . advertisement .rssi
172
+ self .rssi = advertisementdata .rssi
178
173
self .hist_rssi .insert (0 , self .rssi )
179
174
180
175
self ._update_raw_distance (reading_is_new = True )
@@ -210,21 +205,51 @@ def update_advertisement(self, scandata: BluetoothScannerDevice):
210
205
# self.parent_device_address,
211
206
# scandata.advertisement.tx_power,
212
207
# )
213
- self .tx_power = scandata .advertisement .tx_power
214
-
215
- # Track each advertisement element as or if they change.
216
- for key , data in self .adverts .items ():
217
- if key == "platform_data" :
218
- # This duplicates the other keys and doesn't encode to JSON without
219
- # extra work.
220
- continue
221
- new_data = getattr (scandata .advertisement , key , {})
222
- if len (new_data ) > 0 :
223
- if len (data ) == 0 or data [0 ] != new_data :
224
- data .insert (0 , new_data )
225
- # trim to keep size in check
226
- del data [HIST_KEEP_COUNT :]
227
-
208
+ self .tx_power = advertisementdata .tx_power
209
+
210
+ # Store each of the extra advertisement fields in historical lists.
211
+ # Track if we should tell the parent device to update its name
212
+ _want_name_update = False
213
+ if advertisementdata .local_name is not None :
214
+ # It's not uncommon to find BT devices with nonascii junk in their
215
+ # local_name (like nulls, \n, etc). Store a cleaned version as str
216
+ # and the original as bytes.
217
+ # Devices may also advert multiple names over time.
218
+ nametuplet = (clean_charbuf (advertisementdata .local_name ), advertisementdata .local_name .encode ())
219
+ if len (self .local_name ) == 0 or self .local_name [0 ] != nametuplet :
220
+ self .local_name .insert (0 , nametuplet )
221
+ del self .local_name [HIST_KEEP_COUNT :]
222
+ # Lets see if we should pass the new name up to the parent device.
223
+ if self ._device .name_bt_local_name is None or len (self ._device .name_bt_local_name ) < len (nametuplet [0 ]):
224
+ self ._device .name_bt_local_name = nametuplet [0 ]
225
+ _want_name_update = True
226
+
227
+ if len (self .manufacturer_data ) == 0 or self .manufacturer_data [0 ] != advertisementdata .manufacturer_data :
228
+ self .manufacturer_data .insert (0 , advertisementdata .manufacturer_data )
229
+
230
+ if advertisementdata .manufacturer_data not in self .manufacturer_data [1 :]:
231
+ # We just stored a manu_data that wasn't in the previous history,
232
+ # so tell our parent device about it.
233
+ self ._device .process_manufacturer_data (self )
234
+ _want_name_update = True
235
+ del self .manufacturer_data [HIST_KEEP_COUNT :]
236
+
237
+ if len (self .service_data ) == 0 or self .service_data [0 ] != advertisementdata .service_data :
238
+ self .service_data .insert (0 , advertisementdata .service_data )
239
+ if advertisementdata .service_data not in self .manufacturer_data [1 :]:
240
+ _want_name_update = True
241
+ del self .service_data [HIST_KEEP_COUNT :]
242
+
243
+ for service_uuid in advertisementdata .service_uuids :
244
+ if service_uuid not in self .service_uuids :
245
+ self .service_uuids .insert (0 , service_uuid )
246
+ _want_name_update = True
247
+ del self .service_uuids [HIST_KEEP_COUNT :]
248
+
249
+ if _want_name_update :
250
+ self ._device .make_name ()
251
+
252
+ # Finally, save the new advert timestamp.
228
253
self .new_stamp = new_stamp
229
254
230
255
def _update_raw_distance (self , reading_is_new = True ) -> float :
@@ -346,7 +371,7 @@ def calculate_data(self):
346
371
self .hist_distance_by_interval .clear ()
347
372
self .hist_distance_by_interval .append (self .rssi_distance_raw )
348
373
349
- elif new_stamp is None and (self .stamp is None or self .stamp < MONOTONIC_TIME () - DISTANCE_TIMEOUT ):
374
+ elif new_stamp is None and (self .stamp is None or self .stamp < monotonic_time_coarse () - DISTANCE_TIMEOUT ):
350
375
# DEVICE IS AWAY!
351
376
# Last distance reading is stale, mark device distance as unknown.
352
377
self .rssi_distance = None
@@ -459,27 +484,51 @@ def calculate_data(self):
459
484
460
485
def to_dict (self ):
461
486
"""Convert class to serialisable dict for dump_devices."""
487
+ # using "is" comparisons instead of string matching means
488
+ # linting and typing can catch errors.
462
489
out = {}
463
490
for var , val in vars (self ).items ():
464
- if var in [" options" , "parent_device" , "scanner_device" ]:
491
+ if val in [self . options ]:
465
492
# skip certain vars that we don't want in the dump output.
466
493
continue
467
- if var == "adverts" :
468
- adout = {}
469
- for adtype , adarray in val .items ():
470
- out_adarray = []
471
- for ad_data in adarray :
472
- if adtype in ["manufacturer_data" , "service_data" ]:
473
- for ad_key , ad_value in ad_data .items ():
474
- out_adarray .append ({ad_key : cast ("bytes" , ad_value ).hex ()})
475
- else :
476
- out_adarray .append (ad_data )
477
- adout [adtype ] = out_adarray
478
- out [var ] = adout
494
+ if val in [self .options , self ._device , self .scanner_device ]:
495
+ # objects we might want to represent but not fully iterate etc.
496
+ out [var ] = val .__repr__ ()
497
+ continue
498
+ if val is self .local_name :
499
+ out [var ] = {}
500
+ for namestr , namebytes in self .local_name :
501
+ out [var ][namestr ] = namebytes .hex ()
502
+ continue
503
+ if val is self .manufacturer_data :
504
+ out [var ] = {}
505
+ for manrow in self .manufacturer_data :
506
+ for manid , manbytes in manrow .items ():
507
+ out [var ][manid ] = manbytes .hex ()
508
+ continue
509
+ if val is self .service_data :
510
+ out [var ] = {}
511
+ for svrow in self .service_data :
512
+ for svid , svbytes in svrow .items ():
513
+ out [var ][svid ] = svbytes .hex ()
514
+ continue
515
+ if isinstance (val , str | int ):
516
+ out [var ] = val
517
+ continue
518
+ if isinstance (val , float ):
519
+ out [var ] = round (val , 4 )
520
+ continue
521
+ if isinstance (val , list ):
522
+ out [var ] = []
523
+ for row in val :
524
+ if isinstance (row , float ):
525
+ out [var ].append (round (row , 4 ))
526
+ else :
527
+ out [var ].append (row )
479
528
continue
480
- out [var ] = val
529
+ out [var ] = val . __repr__ ()
481
530
return out
482
531
483
532
def __repr__ (self ) -> str :
484
533
"""Help debugging by giving it a clear name instead of empty dict."""
485
- return f"{ self .device_address } __{ self .scanner_address } "
534
+ return f"{ self .device_address } __{ self .scanner_device . name } "
0 commit comments