Skip to content

Commit dd4ec55

Browse files
authored
Merge pull request #667 from onkelandy/sdp_resend
proposal: sdp resend feature
2 parents 1038d60 + ada53c3 commit dd4ec55

File tree

4 files changed

+214
-22
lines changed

4 files changed

+214
-22
lines changed

lib/model/sdp/connection.py

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ def close(self):
147147
self._close()
148148
self._is_connected = False
149149

150-
def send(self, data_dict):
150+
def send(self, data_dict, **kwargs):
151151
"""
152152
Send data, possibly return response
153153
@@ -175,7 +175,7 @@ def send(self, data_dict):
175175
self._send_lock.acquire()
176176

177177
if self._send_init_on_send():
178-
response = self._send(data_dict)
178+
response = self._send(data_dict, **kwargs)
179179
except Exception:
180180
raise
181181
finally:
@@ -210,6 +210,12 @@ def on_disconnect(self, by=None):
210210
if self._params[PLUGIN_ATTR_CB_ON_DISCONNECT]:
211211
self._params[PLUGIN_ATTR_CB_ON_DISCONNECT](by)
212212

213+
def check_reply(self, command, value):
214+
"""
215+
checking reply, e.g. for resend feature
216+
"""
217+
return self._check_reply(command, value)
218+
213219
#
214220
#
215221
# overwriting needed for at least some of the following methods...
@@ -232,14 +238,21 @@ def _close(self):
232238
"""
233239
self.logger.debug(f'simulating closing connection as {__name__} with params {self._params}')
234240

235-
def _send(self, data_dict):
241+
def _send(self, data_dict, **kwargs):
236242
"""
237243
overwrite with sending of data and - possibly - returning response data
238244
Return None if no response is received or expected.
239245
"""
240246
self.logger.debug(f'simulating to send data {data_dict}...')
241247
return self.dummy
242248

249+
def _check_reply(self, command, value):
250+
"""
251+
overwrite with checking of data
252+
Return False by default
253+
"""
254+
return False
255+
243256
def _send_init_on_open(self):
244257
"""
245258
This class can be overwritten if anything special is needed to make the
@@ -428,7 +441,7 @@ def _open(self):
428441
def _close(self):
429442
self.logger.debug(f'{self.__class__.__name__} closing connection as {__name__} with params {self._params}')
430443

431-
def _send(self, data_dict):
444+
def _send(self, data_dict, **kwargs):
432445
url = data_dict.get('payload', None)
433446
if not url:
434447
self.logger.error(f'can not send without url parameter from data_dict {data_dict}, aborting')
@@ -439,7 +452,7 @@ def _send(self, data_dict):
439452

440453
# check for additional data
441454
par = {}
442-
for arg in (REQUEST_DICT_ARGS):
455+
for arg in REQUEST_DICT_ARGS:
443456
par[arg] = data_dict.get(arg, {})
444457

445458
if request_method == 'get':
@@ -527,7 +540,7 @@ def _close(self):
527540
self.logger.debug(f'{self.__class__.__name__} closing connection')
528541
self._tcp.close()
529542

530-
def _send(self, data_dict):
543+
def _send(self, data_dict, **kwargs):
531544
self._tcp.send(data_dict['payload'])
532545

533546
# we receive only via callback, so we return "no reply".
@@ -539,6 +552,7 @@ def _on_abort(self):
539552
else:
540553
self.logger.warning('suspend callback wanted, but not set by plugin. Check plugin code...')
541554

555+
542556
class UDPServer(socket.socket):
543557
"""
544558
This class sets up a UDP unicast socket listener on local_port
@@ -737,7 +751,7 @@ def _close(self):
737751
if self._params[PLUGIN_ATTR_CB_ON_DISCONNECT]:
738752
self._params[PLUGIN_ATTR_CB_ON_DISCONNECT](self)
739753

740-
def _send(self, data_dict):
754+
def _send(self, data_dict, **kwargs):
741755
"""
742756
send data. data_dict needs to contain the following information:
743757

lib/model/sdp/globals.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@
6262
PLUGIN_ATTR_CONN_RETRY_CYCLE = 'retry_cycle' # if autoreconnect: how many seconds to wait between retry rounds
6363
PLUGIN_ATTR_CONN_RETRY_SUSPD = 'retry_suspend' # after this number of failed connect cycles, activate suspend mode (if enabled)
6464

65+
PLUGIN_ATTR_SEND_RETRIES = 'send_retries' # how often should a command be resent (when not receiving expected answer)
66+
PLUGIN_ATTR_SEND_RETRIES_CYCLE= 'send_retries_cycle' # if using resend protocol: how many seconds to wait between resend rounds
67+
6568
# network attributes
6669
PLUGIN_ATTR_NET_HOST = 'host' # hostname / IP for network connection
6770
PLUGIN_ATTR_NET_PORT = 'port' # port for network connection
@@ -90,7 +93,8 @@
9093
PLUGIN_ATTR_CONN_RETRY_CYCLE, PLUGIN_ATTR_CONN_RETRY_SUSPD, PLUGIN_ATTR_NET_HOST, PLUGIN_ATTR_NET_PORT,
9194
PLUGIN_ATTR_SERIAL_PORT, PLUGIN_ATTR_SERIAL_BAUD, PLUGIN_ATTR_SERIAL_BSIZE, PLUGIN_ATTR_SERIAL_PARITY,
9295
PLUGIN_ATTR_SERIAL_STOP, PLUGIN_ATTR_PROTOCOL, PLUGIN_ATTR_MSG_TIMEOUT, PLUGIN_ATTR_MSG_REPEAT,
93-
PLUGIN_ATTR_CB_ON_CONNECT, PLUGIN_ATTR_CB_ON_DISCONNECT, PLUGIN_ATTR_CB_SUSPEND)
96+
PLUGIN_ATTR_CB_ON_CONNECT, PLUGIN_ATTR_CB_ON_DISCONNECT, PLUGIN_ATTR_CB_SUSPEND,
97+
PLUGIN_ATTR_SEND_RETRIES, PLUGIN_ATTR_SEND_RETRIES_CYCLE)
9498

9599
# connection types for PLUGIN_ATTR_CONNECTION
96100
CONN_NULL = '' # use base connection class without real connection functionality, for testing
@@ -107,8 +111,9 @@
107111
PROTO_NULL = '' # use base protocol class without added functionality (why??)
108112
PROTO_JSONRPC = 'jsonrpc' # JSON-RPC 2.0 support with send queue, msgid and resend of unanswered commands
109113
PROTO_VIESSMANN = 'viessmann' # Viessmann P300 / KW
114+
PROTO_RESEND = 'resend'
110115

111-
PROTOCOL_TYPES = (PROTO_NULL, PROTO_JSONRPC, PROTO_VIESSMANN)
116+
PROTOCOL_TYPES = (PROTO_NULL, PROTO_JSONRPC, PROTO_VIESSMANN, PROTO_RESEND)
112117

113118
# item attribute suffixes (as defined with individual prefix in plugin.yaml)
114119
ITEM_ATTR_COMMAND = '_command' # command to issue/read for the item

lib/model/sdp/protocol.py

Lines changed: 151 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
PLUGIN_ATTR_CB_ON_DISCONNECT, PLUGIN_ATTR_CONNECTION,
3232
PLUGIN_ATTR_CONN_AUTO_CONN, PLUGIN_ATTR_CONN_CYCLE, PLUGIN_ATTR_CONN_RETRIES,
3333
PLUGIN_ATTR_CONN_TIMEOUT, PLUGIN_ATTR_MSG_REPEAT, PLUGIN_ATTR_MSG_TIMEOUT,
34-
PLUGIN_ATTR_NET_HOST, PLUGIN_ATTR_NET_PORT)
34+
PLUGIN_ATTR_NET_HOST, PLUGIN_ATTR_NET_PORT, PLUGIN_ATTR_SEND_RETRIES, PLUGIN_ATTR_SEND_RETRIES_CYCLE)
3535
from lib.model.sdp.connection import SDPConnection
3636

3737
from collections import OrderedDict
@@ -97,9 +97,12 @@ def _close(self):
9797
self._connection.close()
9898
self._is_connected = False
9999

100-
def _send(self, data_dict):
100+
def _send(self, data_dict, **kwargs):
101101
self.logger.debug(f'{self.__class__.__name__} _send called with {data_dict}')
102-
return self._connection.send(data_dict)
102+
return self._connection.send(data_dict, **kwargs)
103+
104+
def _check_reply(self, command, value):
105+
return False
103106

104107
def _get_connection(self, use_callbacks=False, name=None):
105108
conn_params = self._params.copy()
@@ -192,7 +195,7 @@ def on_data_received(self, connection, response):
192195
"""
193196
Handle received data
194197
195-
Data is handed over as byte/bytearray and needs to be converted to
198+
Data is handed over as byte/bytearray and needs to be converted to
196199
utf8 strings. As packets can be fragmented, all data is written into
197200
a buffer and then checked for complete json expressions. Those are
198201
separated, converted to dict and processed with respect to saved
@@ -335,7 +338,7 @@ def check_chunk(data):
335338
else:
336339
self.logger.debug(f'Skipping stale check {time() - self._last_stale_check} seconds after last check')
337340

338-
def _send(self, data_dict):
341+
def _send(self, data_dict, **kwargs):
339342
"""
340343
wrapper to prepare json rpc message to send. extracts command, id, repeat and
341344
params (data) from data_dict and call send_rpc_message(command, params, id, repeat)
@@ -425,3 +428,146 @@ def _send_rpc_message(self, command, ddict=None, message_id=None, repeat=0):
425428
response = self._connection.send(ddict)
426429
if response:
427430
self.on_data_received('request', response)
431+
432+
433+
class SDPProtocolResend(SDPProtocol):
434+
""" Protocol supporting resend of command and checking reply_pattern
435+
436+
This class implements a protocol to resend commands if reply does not align with reply_pattern
437+
438+
"""
439+
440+
def __init__(self, data_received_callback, name=None, **kwargs):
441+
442+
# init super, get logger
443+
super().__init__(data_received_callback, name, **kwargs)
444+
# get relevant plugin parameters
445+
self._send_retries = int(self._params.get(PLUGIN_ATTR_SEND_RETRIES) or 0)
446+
self._send_retries_cycle = int(self._params.get(PLUGIN_ATTR_SEND_RETRIES_CYCLE) or 1)
447+
self._sending = {}
448+
self._sending_retries = {}
449+
self._sending_lock = threading.Lock()
450+
451+
# tell someone about our actual class
452+
self.logger.debug(f'protocol initialized from {self.__class__.__name__}')
453+
454+
def on_connect(self, by=None):
455+
"""
456+
When connecting, remove resend scheduler first. If send_retries is set > 0, add new scheduler with given cycle
457+
"""
458+
super().on_connect(by)
459+
self.logger.info(f'connect called, resending queue is {self._sending}')
460+
if self._plugin.scheduler_get('resend'):
461+
self._plugin.scheduler_remove('resend')
462+
self._sending = {}
463+
if self._send_retries >= 1:
464+
self._plugin.scheduler_add('resend', self.resend, cycle=self._send_retries_cycle)
465+
self.logger.dbghigh(
466+
f"Adding resend scheduler with cycle {self._send_retries_cycle}.")
467+
468+
def on_disconnect(self, by=None):
469+
"""
470+
Remove resend scheduler on disconnect
471+
"""
472+
if self._plugin.scheduler_get('resend'):
473+
self._plugin.scheduler_remove('resend')
474+
self._sending = {}
475+
self.logger.info(f'disconnect called.')
476+
super().on_disconnect(by)
477+
478+
def _send(self, data_dict, **kwargs):
479+
"""
480+
Send data, possibly return response
481+
482+
:param data_dict: dict with raw data and possible additional parameters to send
483+
:type data_dict: dict
484+
:param kwargs: additional information needed for checking the reply_pattern
485+
:return: raw response data if applicable, None otherwise.
486+
"""
487+
self._store_commands(kwargs.get('resend_info'), data_dict)
488+
self.logger.debug(f'Sending {data_dict}, kwargs {kwargs}')
489+
return self._connection.send(data_dict, **kwargs)
490+
491+
def _store_commands(self, resend_info, data_dict):
492+
"""
493+
Store the command in _sending dict and the number of retries is _sending_retries dict
494+
495+
:param resend_info: dict with command, returnvalue and read_command
496+
:type resend_info: dict
497+
:param data_dict: dict with raw data and possible additional parameters to send
498+
:type data_dict: dict
499+
:param kwargs: additional information needed for checking the reply_pattern
500+
:return: False by default, True if returnvalue is given in resend_info
501+
:rtype: bool
502+
"""
503+
if resend_info is None:
504+
resend_info = {}
505+
else:
506+
resend_info['data_dict'] = data_dict
507+
if resend_info.get('returnvalue') is not None:
508+
self._sending.update({resend_info.get('command'): resend_info})
509+
if resend_info.get('command') not in self._sending_retries:
510+
self._sending_retries.update({resend_info.get('command'): 0})
511+
self.logger.debug(f'Saving {resend_info}, resending queue is {self._sending}')
512+
return True
513+
return False
514+
515+
def _check_reply(self, command, value):
516+
"""
517+
Check if the command is in _sending dict and if response is same as expected or not
518+
519+
:param command: name of command
520+
:type command: str
521+
:param value: value the command (item) should be set to
522+
:type value: str
523+
:return: False by default, True if received expected response
524+
:rtype: bool
525+
"""
526+
returnvalue = False
527+
if command in self._sending:
528+
with self._sending_lock:
529+
# getting current retries for current command
530+
retry = self._sending_retries.get(command)
531+
# compare the expected returnvalue with the received value after aligning the type of both values
532+
compare = self._sending[command].get('returnvalue')
533+
if type(compare)(value) == compare:
534+
# if received value equals expexted value, remove command from _sending dict
535+
self._sending.pop(command)
536+
self._sending_retries.pop(command)
537+
self.logger.debug(f'Got correct response for {command}, '
538+
f'removing from send. Resending queue is {self._sending}')
539+
returnvalue = True
540+
elif retry is not None and retry <= self._send_retries:
541+
# return False and log info if response is not the same as the expected response
542+
self.logger.debug(f'Should send again {self._sending}...')
543+
return returnvalue
544+
545+
def resend(self):
546+
"""
547+
Resend function that is scheduled with a given cycle.
548+
Send command again if response is not as expected and retries are < given retry parameter
549+
If expected response is not received after given retries, give up sending and query value by sending read_command
550+
"""
551+
if self._sending:
552+
self.logger.debug(f"Resending queue is {self._sending}, retries {self._sending_retries}")
553+
with self._sending_lock:
554+
remove_commands = []
555+
# Iterate through resend queue
556+
for command in list(self._sending.keys()):
557+
retry = self._sending_retries.get(command, 0)
558+
sent = True
559+
if retry < self._send_retries:
560+
self.logger.debug(f'Resending {command}, retries {retry}.')
561+
sent = self._send(self._sending[command].get("data_dict"))
562+
self._sending_retries[command] = retry + 1
563+
elif retry >= self._send_retries:
564+
sent = False
565+
if sent is False:
566+
remove_commands.append(command)
567+
self.logger.info(f"Giving up re-sending {command} after {retry} retries.")
568+
if self._sending[command].get("read_cmd") is not None:
569+
self.logger.info(f"Querying current value.")
570+
self._send(self._sending[command].get("read_cmd"))
571+
for command in remove_commands:
572+
self._sending.pop(command)
573+
self._sending_retries.pop(command)

0 commit comments

Comments
 (0)