Skip to content
Snippets Groups Projects
Commit 2c238a0d authored by fxk8y's avatar fxk8y :spider:
Browse files

Merge branch 'master' of git.services.c3pb.de:fxk8y/CyanLight

parents 531175a8 9569116a
No related branches found
No related tags found
No related merge requests found
class MyDec:
def __init__(*args, **kwargs):
print(f'__init__ed with: {args} aaand: {kwargs}')
def __call__(self, *args, **kwargs):
print(f'called with: {args} aaand: {kwargs}')
if callable(args[0]):
print('used as decorated with arguments')
return self.execute
else:
print('used as decorator WITHOUT arguments (a long time ago ;) )')
self.execute()
def execute(*args, **kwargs):
print(f'executed with: {args} aaand: {kwargs}')
class C:
@MyDec
def decorated0(self, *args):
print(f'decorated0 called with: {args}')
return 'decorated0'
print('\n' + '-'*120 + '\n')
class D:
@MyDec(name='ctor-test')
def decorated42(self, *args):
print(f'decorated42 called with: {args}')
return 'decorated42'
c = C()
d = D()
from MQTT import Topic
from typing import Any, Callable
# TODO: dev is stalled!
# at first, we'll design easier things to not loose focus on the project
# Just use simple properties and triggers to get the same (write-only) behavior
#
# TODO ^2: use functools.wraps to copy the annotations of decorated functions
class MQTTProperty:
def __init__(self, name: str = None, initialValue: Any = None, getConvert: Callable[[str], Any] = None, setConvert: Callable[[Any], str] = None, readOnly = False):
self.__ro = readOnly
self.__value = initialValue
self.__mqtt = None
if getConvert is not None:
self.__name = getConvert.__name__
self.__getConvert = getConvert
else:
self.__getConvert = lambda x: x
if setConvert is not None:
self.__setConvert = setConvert
else:
self.__setConvert = lambda x: x
if name is not None:
self.__name = name
def __call__(self, getConvert: Callable[[str], Any]):
self.__getConvert = getConvert
if self.__name is not None:
self.__name = getConvert.__name__
def __get__(self, obj, cls):
if self.__value is not None:
return self.__value
else:
raise AttributeError('not in sync')
def __set__(self, obj, value):
if self.__ro: raise AttributeError(f'MQTTProperty[ {self.__name} ] is read-only')
# TODO: publish value!
def __del__(*ignored):
raise AttributeError('deletion not supported')
from inspect import signature
class MQTTTrigger:
# TODO: should we check message's type instead to decide if it is binary? evaluate!
def __init__(self, topic: str = None, message: str = None, binaryMessage: bytes = None):
if topic is not None:
self.__topic = Topic(topic)
else:
raise AttributeError('topic is mandatory') # TODO: correct exception…?
if binaryMessage is not None:
self.__message = binaryMessage
elif message is not None:
self.__message = bytes(message, 'UTF-8')
else:
self.__message = b''
# ###################
# ### TODO: !!! ###
# ###################
def __call__(self, func) -> Callable[[Any], Any]:
print(f'__call__ with {signature(func)}')
self.__function = func
return self
class Device:
def __init__(self, torch, name, online = True):
def __init__(self, torch, name: str, online: bool = True):
self._torch = torch
self._name = name
self.__name = name
self.__online = online
@property
def name(self):
return self._name
return self.__name
@property
def online(self):
return self.__online
@MQTTTrigger(topic='{NameSpace}/{DeviceName}/command', message='restart')
def restart(self): pass
class Extension:
......
from typing import Callable
import yaml
import json
import random
import logging
import paho.mqtt.client as mqtt
########################################
#### ####
#### Code from fxk8y/SpiderMQTT ####
#### ####
#### Will be replaced with import ####
#### when SpiderMQTT matures! ####
#### ####
########################################
# - Topic was taken as is
# - Message was taken as is (and will be debugged here!)
# - Executor was taken as is, but won't be used for threaded execution
# - SpiderMQTT will lose its request-feature, other bugs will be fixed here
class Topic(str):
"""A str-type for interaction with MQTT topics.
Behaves like a normal str object except:
- can't have leading or trailing slashes
- can't contain double slashes
- addition is only defined for adding other Topic() objects"""
def __new__(cls, data=''):
data = str(data)
if len(data) < 1:
raise ValueError('Topic cannot be constructed from empty str')
while '//' in data:
data = data.replace('//', '/')
if data.startswith('/'):
data = data[1:]
if data.endswith('/'):
data = data[:-1]
return super().__new__(cls, data)
def __add__(self, other):
if isinstance(other, Topic):
return Topic(str(self) + '/' + str(other))
else:
return NotImplemented
def containsWildcard(self):
return self.count('+') > 0 or self.count('#') > 0
def compare(self, other):
"""Compares two topics according to the MQTT specification
One argument may contain topic wildcards.
Can be called both as a class method or instance method.
Arguments:
self -- The Topic object itself when called as an instance method, or any Topic/str object in case of a class method call
other -- The other Topic or string to compare to"""
if (self.count('+') > 0 or self.count('#') > 0) and (other.count('+') > 0 or other.count('#') > 0):
raise ValueError('Only one topic may contain wildcards')
x_parts = self.split('/')
y_parts = other.split('/')
lx = len(x_parts)
ly = len(y_parts)
result = True
for i in range(min(lx, ly)):
x = x_parts[i]
y = y_parts[i]
if x == y:
continue
elif x == '+' or y == '+':
continue
elif x == '#' or y == '#':
return True
elif x != y:
return False
else:
if lx == ly:
return True
elif lx < ly and x_parts[-1] == '#':
return True
elif ly < lx and y_parts[-1] == '#':
return True
else:
return False
class Message:
"""Fancy MQTT message container
Implements various conversions to python-native types.
The conversion results are cached internally.
Every conversion method takes a parameter called default whose value is returned as-is.
Default values are not cached in case of conversion failure.
"""
class DEFAULT:
"""Marker object for caching failed conversions"""
pass
def __init__(self, topic: str, payload: bytearray):
self.cache = {}
self.topic = topic
self.payload = payload
def raw(self):
"""Get the raw payload as bytearray"""
return self.payload
def bool(self, default=False):
"""Coerce payload to bool
'true', 'yes', 'ja', and '1' are treated as True.
'false', 'no', 'nope', 'nein' and '0' are treated as False.
The conversion is case-insensitive."""
def convert():
payload = self.payload.lower()
if payload in ['true', 'yes', 'ja', '1']:
return True
elif payload in ['false', 'no', 'nope', 'nein', '0']:
return False
else:
return Message.DEFAULT
return self._getOrElseUpdate(bool, convert, default)
def int(self, default=0):
"""Coerce payload to int"""
def convert():
try:
return int(self.payload)
except:
return Message.DEFAULT
return self._getOrElseUpdate(int, convert, default)
def float(self, default=0.0):
"""Coerce payload to float"""
def convert():
try:
return float(self.payload)
except:
return Message.DEFAULT
return self._getOrElseUpdate(float, convert, default)
def complex(self, default=0j):
"""Coerce payload to complex"""
def convert():
try:
return complex(self.payload)
except:
return Message.DEFAULT
return self._getOrElseUpdate(complex, convert, default)
def str(self, default=''):
"""Decodes the payload as UTF-8"""
def convert():
try:
return self.payload.decode('utf-8')
except:
return Message.DEFAULT
return self._getOrElseUpdate(str, convert, default)
def json(self, default={}):
"""Parses the payload as a JSON object"""
def convert():
try:
return json.loads(self.payload.decode('utf-8'))
except:
return Message.DEFAULT
return self._getOrElseUpdate("json", convert, default)
def yaml(self, default={}):
"""Parses the payload as a YAML document"""
def convert():
try:
return yaml.safe_load(self.payload.decode('utf-8'))
except:
return Message.DEFAULT
return self._getOrElseUpdate("yaml", convert, default)
def _getOrElseUpdate(self, key, f, default):
if key in self.cache:
out = self.cache[key]
if out is Message.DEFAULT:
return default
else:
return out
else:
out = f()
self.cache[key] = out
if out is Message.DEFAULT:
return default
else:
return out
def __str__(self):
return 'Message[topic=\'{}\' payload=\'{}\']'.format(self.topic, self.str(default='<binary garbage>'))
class Executor:
__instance = None
def __new__(cls, *args, **kwargs):
if cls.__instance is None:
cls.__instance = super(Executor, cls).__new__(cls)
return cls.__instance
def __init__(self, callback, *args, **kwargs):
if isinstance(callback, (list, set)):
cbs = callback
else:
cbs = [callback]
for cb in cbs:
try:
cb(*args, **kwargs)
except Exception as ex:
pass # TODO: logging!!!
# TODO: Move into SpiderMQTT class!
class Subscription:
class ACTIVE: pass
class PENDING: pass
class CLEANUP: pass
def __init__(self, spider, topic: str, callback=None, subscribeCallback=None):
self.spider = spider
self.topic = Topic(topic)
self.state = self.PENDING
self.callbacks = set()
self.subscribeCallbacks = set()
if callback is not None:
self.addCallback(callback)
if subscribeCallback is not None:
self.addSubscribeCallback(subscribeCallback)
self.__sub()
def __sub(self):
if self.state is self.PENDING and self.spider.isConnected():
self.sub_mid = self.spider.mqtt.subscribe(self.topic, self.spider.sub_qos)[1]
def __unsub(self):
pass # TODO: implement!
def onSubscribe(self, mid):
print('onSubscribe[mid={}]: self.mid={} matching={}'.format(mid, self.sub_mid, self.sub_mid == mid))
if hasattr(self, 'sub_mid') and self.sub_mid == mid:
self.state = self.ACTIVE
del self.sub_mid
Executor(self.subscribeCallbacks)
def onMessage(self, msg: Message):
if self.topic.compare(msg.topic):
Executor(self.callbacks, msg)
def onConnect(self):
self.__sub()
def onDisconnect(self):
self.state = self.PENDING
def addCallback(self, callback: Callable[[Message], None]):
self.callbacks.add(callback)
def removeCallback(self, callback: Callable[[Message], None]):
if callback in self.callbacks:
self.callbacks.remove(callback)
if self.callbacks == set():
self.state = self.CLEANUP
self.__unsub()
def addSubscribeCallbacks(self, callback: Callable[[Message], None]):
self.subscribeCallbacks.add(callback)
if self.state is self.ACTIVE:
Executor(callback)
def removeSubscribeCallbacks(self, callback: Callable[[Message], None]):
if callback in self.subscribeCallbacks:
self.subscribeCallbacks.remove(callback)
class SpiderMQTT:
def __init__(self, broker: str = 'localhost', port: int = 1883, user: str = None, password: str = None, sub_qos: int = 0,
will_topic: str = None, will_payload = None, will_qos: int = 0, will_retain: bool = False, backgroundTask: bool = True) -> None:
"""backgroundTask: True for run in Task, False for run blocking. TODO: write proper docstring!!!!"""
self.sub_qos = sub_qos
self.connected = False
# self.requests = set()
self.subscriptions = {}
self.pendingMessages = []
logging.basicConfig(format='[{asctime:s}][{levelname:s}] {name:s} in {funcName:s}(): {message:s}', datefmt='%H:%M:%S %d.%m.%Y', style='{', level=logging.DEBUG)
self.log = logging.getLogger(__name__)
client_id = 'SpiderMQTT[{:X}]'.format(random.randint(0x100000000000, 0xFFFFFFFFFFFF))
self.mqtt = mqtt.Client(client_id=client_id, clean_session=True)
self.mqtt.enable_logger(self.log)
self.mqtt.reconnect_delay_set(1, 1)
if user is not None:
self.mqtt.username_pw_set(user, password)
if will_topic is not None:
self.mqtt.will_set(will_topic, will_payload, will_qos, will_retain)
# self.mqtt.on_log = self.__onLog
self.mqtt.on_message = self.__onMessage
self.mqtt.on_publish = self.__onPublish
self.mqtt.on_connect = self.__onConnect
self.mqtt.on_subscribe = self.__onSubscribe
self.mqtt.on_disconnect = self.__onDisconnect
self.mqtt.on_unsubscribe = self.__onUnSubscribe
self.mqtt.connect(broker, port)
if backgroundTask:
def _shutdown():
self.mqtt.loop_stop()
self.__shutdownFunc = _shutdown
self.mqtt.loop_start()
else:
def _shutdown():
self.running = False
self.__shutdownFunc = _shutdown
self.running = True
while self.running:
self.mqtt.loop()
def isConnected(self) -> bool:
return self.connected
def publish(self, topic, payload=None, qos=0, retain=False, prettyPrintYAML=False) -> None:
if isinstance(payload, str):
pl = payload.encode('utf-8')
elif isinstance(payload, (bool, int, float, complex)):
pl = str(payload).encode('utf-8')
elif isinstance(payload, (list, set, dict)):
pl = yaml.dump(payload, default_flow_style=not prettyPrintYAML).encode('utf-8')
else:
pl = payload
if self.isConnected():
self.mqtt.publish(topic, pl, qos, retain)
else:
msg = Message(topic, pl)
msg.qos = qos
msg.retain = retain
self.pendingMessages += [msg]
def __onMessage(self, _cl, _ud, msg):
message = Message(msg.topic, msg.payload)
for sub in self.subscriptions.values():
sub.onMessage(message)
def __onConnect(self, *ignored):
self.connected = True
for sub in self.subscriptions.values():
sub.onConnect()
for msg in self.pendingMessages:
msg.mid = self.mqtt.publish(msg.topic, msg.payload, msg.qos, msg.retain).mid
def __onDisconnect(self, *ignored):
self.connected = False
for sub in self.subscriptions.values():
sub.onDisconnect()
def __onSubscribe(self, _cl, _ud, mid, _gq):
for sub in self.subscriptions.values():
sub.onSubscribe(mid)
def __onUnSubscribe(self, _cl, _ud, mid):
for sub in self.subscriptions.values():
sub.onUnSubscribe(mid)
def __onPublish(self, _cl, _ud, mid):
for msg in self.pendingMessages:
if hasattr(msg, 'mid') and msg.mid == mid:
self.pendingMessages.remove(msg)
def __onLog():
pass
def addCallback(self, topic, callback, subscribeCallback=None):
if topic in self.subscriptions:
self.subscriptions[topic].addCallback(callback)
self.subscriptions[topic].addSubscribeCallback(callback)
else:
self.subscriptions[topic] = Subscription(self, topic, callback, subscribeCallback)
def subscribe(self, topic, callback):
self.addCallback(topic, callback)
def removeCallback(self, callback, subscribeCallback=None):
for sub in self.subscriptions:
sub.removeCallback(callback)
sub.removeSubscribeCallback(subscribeCallback)
if sub.callbacks == {}:
self.mqtt.unsubscribe(sub.topic)
del self.subscriptions[sub.topic]
def shutdown(self):
self.mqtt.disconnect()
self.__shutdownFunc()
mq = SpiderMQTT()
def callback(name):
def _cb(msg):
print(f'callback[ {name} ] called on topic[ {msg.topic} ] with message[ {msg.str()} ]')
return _cb
mq.addCallback('#', callback('#firehose#'))
mq.addCallback('+/+/+', callback('~TRIPLE~TOPIC~'))
mq.publish('1', 'EINS!')
mq.publish('1/2', 'eins… zwei… BÄM!')
mq.publish('1/2/3', 'uno, dos tres BUHH JA!')
from inspect import signature
class MQTTTrigger:
def __init__(self, message: str = None, binaryMessage: bytes = None):
self.__function = None
if binaryMessage is not None:
self.__message = binaryMessage
elif message is not None:
self.__message = bytes(message, 'UTF-8')
else:
self.__message = b''
# ###################
# ### TODO: !!! ###
# ###################
def __call__(self, func=None):
print(f'__CALL__: self is of type {type(self)} and evaluates to str[ {self} ]')
if func is not None:
print(f'storing function[ {signature(func)} ]')
self.__function = func
return self.__execute
else:
print(f'__call__: executing business logic :)')
print(f'__call__: self is of type {type(self)} and evaluates to str[ {self} ]')
if self.__function is not None:
return self.__function(self) # TODO: is this the right self we supply?!
def __execute(self):
print(f'__execute: executing business logic :)')
print(f'__execute: self is of type {type(self)} and evaluates to str[ {self} ]')
if self.__function is not None:
return self.__function(self) # TODO: is this the right self we supply?!
class TestDevice:
@MQTTTrigger(message='restart')
def restart(self): print(f'self[ {self} ] RESTART!!!')
OTA = MQTTTrigger(message='otää otää')
dev = TestDevice()
from __future__ import annotations
import struct
from threading import Thread
from inspect import signature
from socket import socket, AF_INET, SOCK_DGRAM
class DEFAULT:
class fxCyanDefaults:
port = 4213
header = b'fxCyanF'
@staticmethod
def header(suffix: str): return bytes(f'fxCyan{suffix}', 'ASCII')
listenAddr = '0.0.0.0'
......@@ -15,7 +20,7 @@ class Sender:
_socket = socket(AF_INET, SOCK_DGRAM)
def __init__(self, host: str, port: int = DEFAULT.port):
def __init__(self, host: str = 'localhost', port: int = fxCyanDefaults.port):
self._host = host
self._port = port
......@@ -34,7 +39,7 @@ class Sender:
return struct.pack('f' * len(channels), *channels)
@classmethod
def _sendUDP(cls, data: bytearray, host: str, port: int = DEFAULT.port):
def _sendUDP(cls, data: bytearray, host: str, port: int = fxCyanDefaults.port):
try:
cls._socket.sendto(data, (host, port))
except:
......@@ -42,8 +47,8 @@ class Sender:
pass
@classmethod
def sendTo(cls, channels: list, host: str, port: int = DEFAULT.port):
data = DEFAULT.header
def sendTo(cls, channels: list, host: str, port: int = fxCyanDefaults.port):
data = fxCyanDefaults.header('F')
data += Sender._convertData(channels)
cls._sendUDP(data, host, port)
......@@ -54,24 +59,56 @@ class Sender:
class Receiver:
def __init__(self, host: str = DEFAULT.listenAddr, port: int = DEFAULT.port):
def __init__(self, host: str = fxCyanDefaults.listenAddr, port: int = fxCyanDefaults.port):
self._socket = socket(AF_INET, SOCK_DGRAM)
self._socket.bind((host, port))
self._callbacks = Set()
self._callbacks = {}
t = Thread(target=self._recvLoop)
t.daemon = True
t.start()
def addCallback(self, callback) -> Receiver:
"""Register a new callback
The callback may take 2 parameters, but must take at least 1.
If it takes more than one it will get the sender's address too.
Signature should look like:
`def callback(channels, address): pass`
where channels takes a list of floats and adress will be the senders address.
The address parameter may be ommitted."""
params = len(signature(callback).parameters)
if params < 1:
raise AttributeError(f'callback must take at least param#[ 1 ], takes param#[ {params} ]')
else:
self._callbacks[callback] = params
return self
# TODO: write a decorator which copies se docstring from the other function
def __iadd__(self, callback) -> Receiver:
return self.addCallback(callback)
def removeCallback(self, callback) -> Receiver:
if callback in self._callbacks:
del self._callbacks[callback]
return self
# TODO: write a decorator which copies se docstring from the other function
def __isubb__(self, callback) -> Receiver:
return self.removeCallback(callback)
def _recvLoop(self):
header = DEFAULT.header
header = fxCyanDefaults.header('F')
headerLen = len(header)
while True:
data = self._socket.recv(2048)
data, addr = self._socket.recvfrom(2048)
if data.startswith(header):
data = data[ headerLen : ]
......@@ -82,7 +119,8 @@ class Receiver:
# TODO: logging…?
continue
_chs = struct.unpack('f' * len(data), data)
chCnt = int(len(data) / 4)
_chs = struct.unpack('f' * chCnt, data)
channels = []
for ch in _chs:
......@@ -90,10 +128,22 @@ class Receiver:
if ch < 0.0: ch = 0.0
channels += [ch]
for callback in self._callbacks:
for callback, params in self._callbacks.items():
try:
callback(channels)
if params == 1:
callback(channels)
elif params >= 2:
callback(channels, addr)
except:
# TODO: logging…?
pass
tx = Sender()
rx = Receiver()
def _rcv(ch, addr, something=None):
print(f'Got ch#[ {len(ch)} ] from host[ {addr} ] with data[ {ch} ]')
rx.addCallback(_rcv)
This diff is collapsed.
#!/usr/bin/env nix-shell
#!nix-shell -i sh -p mosquitto
mosquitto -c mosquitto.conf
......@@ -7,7 +7,7 @@ import random
from socket import *
fps = 3
fps = 4
pixels = 50
port = 4213
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment