Skip to content
Snippets Groups Projects
Commit d2c09e04 authored by fxk8y's avatar fxk8y :spider:
Browse files

Moar MQTT stuff

parent 1f573c10
No related branches found
No related tags found
No related merge requests found
from MQTT import Topic
from typing import Any, Callable from typing import Any, Callable
...@@ -6,6 +7,8 @@ from typing import Any, Callable ...@@ -6,6 +7,8 @@ from typing import Any, Callable
# TODO: dev is stalled! # TODO: dev is stalled!
# at first, we'll design easier things to not loose focus on the project # at first, we'll design easier things to not loose focus on the project
# Just use simple properties and triggers to get the same (write-only) behavior # Just use simple properties and triggers to get the same (write-only) behavior
#
# TODO ^2: use functools.wraps to copy the annotations of decorated functions
class MQTTProperty: class MQTTProperty:
def __init__(self, name: str = None, initialValue: Any = None, getConvert: Callable[[str], Any] = None, setConvert: Callable[[Any], str] = None, readOnly = False): def __init__(self, name: str = None, initialValue: Any = None, getConvert: Callable[[str], Any] = None, setConvert: Callable[[Any], str] = None, readOnly = False):
...@@ -55,48 +58,47 @@ class MQTTProperty: ...@@ -55,48 +58,47 @@ class MQTTProperty:
class MQTTTrigger: class MQTTTrigger:
# TODO: should we check message's type instead to decide if it is binary? evaluate!
def __init__(self, topic: str = None, message: str = None, binaryMessage: bytes = None): def __init__(self, topic: str = None, message: str = None, binaryMessage: bytes = None):
if topic is not None: if topic is not None:
self.topic = topic self.__topic = Topic(topic)
else: else:
raise AttributeError('topic is mandatory') # TODO: correct exception…? raise AttributeError('topic is mandatory') # TODO: correct exception…?
if binaryMessage is not None: if binaryMessage is not None:
self.message = binaryMessage self.__message = binaryMessage
elif message is not None: elif message is not None:
self.message = bytes(message, 'UTF-8') self.__message = bytes(message, 'UTF-8')
else: else:
self.message = b'' self.__message = b''
# ###################
# ### TODO: !!! ###
# ###################
def __call__(self, func) -> Callable[[Any], Any]:
self.__function = func
return self
class Device: class Device:
def __init__(self, torch, name, online = True): def __init__(self, torch, name: str, online: bool = True):
self._torch = torch self._torch = torch
self._name = name self.__name = name
self.__online = online self.__online = online
@property @property
def name(self): def name(self):
return self._name return self.__name
@property @property
def online(self): def online(self):
return self.__online return self.__online
@MQTTTrigger(topic='{NameSpace}/{DeviceName}/command', message='restart')
def restart(self): pass
class Extension: class Extension:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment