diff --git a/.DS_Store b/.DS_Store index 5008ddf..ae33536 100644 Binary files a/.DS_Store and b/.DS_Store differ diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..f42e134 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2024 J0EK3R + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..b7f8182 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,21 @@ +[build-system] +requires = ["setuptools>=68", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "mkconnect" +version = "0.1.0" +description = "MouldKing Bluetooth hub connector" +readme = "README.md" +requires-python = ">=3.8" +license = {file = "LICENSE"} + +[tool.setuptools] +package-dir = {"" = "src"} +include-package-data = true + +[tool.setuptools.packages.find] +where = ["src"] + +[project.scripts] +mkconnect = "mkconnect.cli:main" \ No newline at end of file diff --git a/src/.DS_Store b/src/.DS_Store new file mode 100644 index 0000000..68eb586 Binary files /dev/null and b/src/.DS_Store differ diff --git a/src/mkconnect.egg-info/PKG-INFO b/src/mkconnect.egg-info/PKG-INFO new file mode 100644 index 0000000..846cd82 --- /dev/null +++ b/src/mkconnect.egg-info/PKG-INFO @@ -0,0 +1,149 @@ +Metadata-Version: 2.4 +Name: mkconnect +Version: 0.1.0 +Summary: MouldKing Bluetooth hub connector +License: MIT License + + Copyright (c) 2024 J0EK3R + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in all + copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + SOFTWARE. + +Requires-Python: >=3.8 +Description-Content-Type: text/markdown +License-File: LICENSE +Dynamic: license-file + +# mkconnect-python +...a bit of code to connect to MouldKing Bluetooth Hubs in python. + +# MouldKing Hubs +## MouldKing 6.0 Hub +The MouldKing 6.0 Hub has two modes: +* RC-Mode to be controlled with a MouldKing remotecontrol +* Bluetooth-Mode to be controlled with an app + +You can control a maximum of three MK6.0 Hubs at the same time with bluetooth. +> (Currently this project can send only one advertising telegram the same time - so only one hub can be controlled, all others will go in timeout-mode till next telegram with their device address is sent.) + +## Setting the address of the Hub +To switch the Hub's device address to the next one (device 0, device 1, device 2) just press the button on the hub. + +> i.E.: If the script runs with **mkcontrol(2,0,1)** (-> device2, channel0, full speed forward) and nothing is happening, you have to short-press the button, perhaps again...) + +# usage +Start the script [consoletest.py](https://github.com/J0EK3R/mkconnect-python/blob/main/consoletest.py) on your raspberry: +``` +pi@devpi:~/dev/mkconnect-python $ sudo python -i consoletest.py +``` + +## mkbtstop() - stop bluetooth advertising +``` +Ready to execute commands + +>>> mkbtstop() +``` + +## mkconnect() - switch hubs in bluetooth mode +If you power-on the hubs they will listen to telegrams to the **first device by default**. +Call mkconnect() to switch all hubs in Bluetooth mode. +By short-pressing the button on MK6.0 Hubs you can choose the hubId: +* hubId=0 - one Led flash +* hubId=1 - two Led flashs +* hubId=2 - three Led flashs +``` +Ready to execute commands + +>>> mkconnect() +``` + +## mkcontrol(deviceId, channel, power and powerAndDirection) +i.E.: mkcontrol(0, 0, 1) - on first device (deviceId=0) run channel A (channel=0) with fullspeed (powerAndDirection=1) +``` +Ready to execute commands + +>>> mkcontrol(0, 0, 1) +``` + +## mkstop(deviceId) +Set all channels of device to zero +``` +Ready to execute commands + +>>> mkstop(0) +``` + +--- +# old stuff + +There is a testscript [consoletest.py](https://github.com/J0EK3R/mkconnect-python/blob/main/consoletest.py) where (on raspberry pi) **hcitool** is used to advertise telegrams over bluetooth. + +Maybe you habe to **sudo** the command: +``` +pi@devpi:~/dev/mkconnect-python $ sudo python -i consoletest.py + +Ready to execute commands + +For connecting: mkconnect(hubId) ex: mkconnect(0) or mkconnect(1) for the second hub + + Available commands: mkconnect(hubId) + mkstop(hubId) + mkcontrol(deviceId, channel, powerAndDirection) + +ex: mkcontrol(0, 0, 0.5) ; mkcontrol(0, 'B', -1) + the minus sign - indicate reverse motor direction +``` + + + + + +Just look in [main.py](https://github.com/J0EK3R/mkconnect-python/blob/main/main.py) for current usage... + +Current output in [https://wokwi.com/projects/new/micropython-pi-pico](https://wokwi.com/projects/398314618803830785) +...looks very good! :) +``` +connect-telegram +rawdata: 6d 7b a7 80 80 80 80 92 +crypted: 6d b6 43 cf 7e 8f 47 11 88 66 59 38 d1 7a aa 26 49 5e 13 14 15 16 17 18 + +stop-telegram +rawdata: 61 7b a7 80 80 80 80 80 80 9e +crypted: 6d b6 43 cf 7e 8f 47 11 84 66 59 38 d1 7a aa 34 67 4a 55 bf 15 16 17 18 + +C1: fullspeed forwards +rawdata: 61 7b a7 ff 80 80 80 80 80 9e +crypted: 6d b6 43 cf 7e 8f 47 11 84 66 59 47 d1 7a aa 34 67 4a ed b7 15 16 17 18 + +C1: halfspeed forwards +rawdata: 61 7b a7 bf 80 80 80 80 80 9e +crypted: 6d b6 43 cf 7e 8f 47 11 84 66 59 07 d1 7a aa 34 67 4a eb 70 15 16 17 18 + +C1: halfspeed backwards +rawdata: 61 7b a7 40 80 80 80 80 80 9e +crypted: 6d b6 43 cf 7e 8f 47 11 84 66 59 f8 d1 7a aa 34 67 4a 4e fe 15 16 17 18 + +C2: halfspeed backwards +rawdata: 61 7b a7 40 40 80 80 80 80 9e +crypted: 6d b6 43 cf 7e 8f 47 11 84 66 59 f8 11 7a aa 34 67 4a 3d f9 15 16 17 18 +MicroPython v1.22.0 on 2023-12-27; Raspberry Pi Pico with RP2040 +Type "help()" for more information. +>>> +raw REPL; CTRL-B to exit +> +``` diff --git a/src/mkconnect.egg-info/SOURCES.txt b/src/mkconnect.egg-info/SOURCES.txt new file mode 100644 index 0000000..37eac76 --- /dev/null +++ b/src/mkconnect.egg-info/SOURCES.txt @@ -0,0 +1,41 @@ +LICENSE +README.md +pyproject.toml +src/mkconnect/__init__.py +src/mkconnect/__main__.py +src/mkconnect/cli.py +src/mkconnect.egg-info/PKG-INFO +src/mkconnect.egg-info/SOURCES.txt +src/mkconnect.egg-info/dependency_links.txt +src/mkconnect.egg-info/entry_points.txt +src/mkconnect.egg-info/top_level.txt +src/mkconnect/advertiser/Advertiser.py +src/mkconnect/advertiser/AdvertiserBTMgmt.py +src/mkconnect/advertiser/AdvertiserBTSocket.py +src/mkconnect/advertiser/AdvertiserDummy.py +src/mkconnect/advertiser/AdvertiserHCITool.py +src/mkconnect/advertiser/AdvertiserMicroPython.py +src/mkconnect/advertiser/AdvertisingDevice.py +src/mkconnect/advertiser/IAdvertiser.py +src/mkconnect/advertiser/IAdvertisingDevice.py +src/mkconnect/advertiser/__init__.py +src/mkconnect/btsocket/__init__.py +src/mkconnect/btsocket/btmgmt_callback.py +src/mkconnect/btsocket/btmgmt_protocol.py +src/mkconnect/btsocket/btmgmt_socket.py +src/mkconnect/btsocket/btmgmt_sync.py +src/mkconnect/btsocket/tools.py +src/mkconnect/examples/consoletest.py +src/mkconnect/examples/main.py +src/mkconnect/mouldking/MouldKing.py +src/mkconnect/mouldking/MouldKingCrypt.py +src/mkconnect/mouldking/MouldKingHub.py +src/mkconnect/mouldking/MouldKingHub_Byte.py +src/mkconnect/mouldking/MouldKingHub_Nibble.py +src/mkconnect/mouldking/MouldKing_Hub_4.py +src/mkconnect/mouldking/MouldKing_Hub_6.py +src/mkconnect/mouldking/MouldKing_Hubs_4_12Ch.py +src/mkconnect/mouldking/__init__.py +src/mkconnect/tracer/Tracer.py +src/mkconnect/tracer/TracerConsole.py +src/mkconnect/tracer/__init__.py \ No newline at end of file diff --git a/src/mkconnect.egg-info/dependency_links.txt b/src/mkconnect.egg-info/dependency_links.txt new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/src/mkconnect.egg-info/dependency_links.txt @@ -0,0 +1 @@ + diff --git a/src/mkconnect.egg-info/entry_points.txt b/src/mkconnect.egg-info/entry_points.txt new file mode 100644 index 0000000..9bb90d3 --- /dev/null +++ b/src/mkconnect.egg-info/entry_points.txt @@ -0,0 +1,2 @@ +[console_scripts] +mkconnect = mkconnect.cli:main diff --git a/src/mkconnect.egg-info/top_level.txt b/src/mkconnect.egg-info/top_level.txt new file mode 100644 index 0000000..8bef19a --- /dev/null +++ b/src/mkconnect.egg-info/top_level.txt @@ -0,0 +1 @@ +mkconnect diff --git a/src/mkconnect/__init__.py b/src/mkconnect/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/mkconnect/__main__.py b/src/mkconnect/__main__.py new file mode 100644 index 0000000..6b1df52 --- /dev/null +++ b/src/mkconnect/__main__.py @@ -0,0 +1,3 @@ +from .cli import main +if __name__ == "__main__": + raise SystemExit(main()) \ No newline at end of file diff --git a/src/mkconnect/__pycache__/__init__.cpython-313.pyc b/src/mkconnect/__pycache__/__init__.cpython-313.pyc new file mode 100644 index 0000000..356ef03 Binary files /dev/null and b/src/mkconnect/__pycache__/__init__.cpython-313.pyc differ diff --git a/src/mkconnect/__pycache__/cli.cpython-313.pyc b/src/mkconnect/__pycache__/cli.cpython-313.pyc new file mode 100644 index 0000000..84aa2fb Binary files /dev/null and b/src/mkconnect/__pycache__/cli.cpython-313.pyc differ diff --git a/src/mkconnect/advertiser/Advertiser.py b/src/mkconnect/advertiser/Advertiser.py new file mode 100644 index 0000000..7a31661 --- /dev/null +++ b/src/mkconnect/advertiser/Advertiser.py @@ -0,0 +1,108 @@ +__author__ = "J0EK3R" +__version__ = "0.1" + +import sys + +if (sys.platform == 'rp2'): + import _thread as thread +else: + import threading as thread + +from .IAdvertiser import IAdvertiser +from .IAdvertisingDevice import IAdvertisingDevice + +from ..tracer.Tracer import Tracer + +class Advertiser(IAdvertiser) : + """ + This is the BaseClass for all Advertiser classes. + It implements the interface IAdvertiser. + """ + + def __init__(self): + """ + initializes the object and defines the member fields + """ + self._tracer = None + + # dictionary to administer the registered AdvertisingDevices. + # * key is the instance of the AdvertisingDevice + # * value is the AdvertisementIdentifier of the AdvertisingDevice + self._registeredDeviceTable = dict() + + if (sys.platform == 'rp2'): + self._registeredDeviceTable_Lock = thread.allocate_lock() + else: + self._registeredDeviceTable_Lock = thread.Lock() + return + + + def SetTracer(self, tracer: Tracer) -> Tracer: + """ + set tracer object + """ + self._tracer = tracer + return tracer + + + def AdvertisementStop(self) -> None: + """ + stop bluetooth advertising for the Advertiser + """ + return + + + def TryRegisterAdvertisingDevice(self, advertisingDevice: IAdvertisingDevice) -> bool: + """ + try to register the given AdvertisingDevice + * returns True if the AdvertisingDevice was registered successfully + * returns False if the AdvertisingDevice wasn't registered successfully (because it still was registered) + """ + if(advertisingDevice is None): + return False + + try: + # acquire lock for table + #self._registeredDeviceTable_Lock.acquire(blocking=True) + self._registeredDeviceTable_Lock.acquire() + + if(advertisingDevice in self._registeredDeviceTable): + return False + else: + self._registeredDeviceTable[advertisingDevice] = advertisingDevice.GetAdvertisementIdentifier() + return True + finally: + # release lock for table + self._registeredDeviceTable_Lock.release() + + + def TryUnregisterAdvertisingDevice(self, advertisingDevice: IAdvertisingDevice) -> bool: + """ + try to unregister the given AdvertisingDevice + * returns True if the AdvertisingDevice was unregistered successfully + * returns False if the AdvertisingDevice wasn't unregistered successfully + """ + if(advertisingDevice is None): + return False + + try: + # acquire lock for table + #self._registeredDeviceTable_Lock.acquire(blocking=True) + self._registeredDeviceTable_Lock.acquire() + + if(advertisingDevice in self._registeredDeviceTable): + self._registeredDeviceTable.pop(advertisingDevice) + return True + else: + return False + finally: + # release lock for table + self._registeredDeviceTable_Lock.release() + + + def AdvertisementDataSet(self, advertisementIdentifier: str, manufacturerId: bytes, rawdata: bytes) -> None: + """ + Sets Advertisement-Data for a specific AdvertisementIdentifier + This Methode has to be overridden by the implementation of the AdvertisingDevice! + """ + return diff --git a/src/mkconnect/advertiser/AdvertiserBTMgmt.py b/src/mkconnect/advertiser/AdvertiserBTMgmt.py new file mode 100644 index 0000000..92c9a51 --- /dev/null +++ b/src/mkconnect/advertiser/AdvertiserBTMgmt.py @@ -0,0 +1,277 @@ +__author__ = "J0EK3R" +__version__ = "0.1" + +from ..tracer.Tracer import Tracer +from .IAdvertisingDevice import IAdvertisingDevice +from .Advertiser import Advertiser + +import subprocess +import threading +import time + +class AdvertiserBTMgmt(Advertiser) : + """ + baseclass + """ + + # protected static field + _BTMgmt_path = '/usr/bin/btmgmt' + + # Number of repetitions per second + _RepetitionsPerSecond = 4 + + def __init__(self): + """ + initializes the object and defines the member fields + """ + + super().__init__() # call baseclass + + self._advertisement_thread_Run = False + self._advertisement_thread = None + self._advertisement_thread_Lock = threading.Lock() + + # Table + # * key: AdvertisementIdentifier + # * value: advertisement-command for the call of btmgmt tool + self._advertisementTable_thread_Lock = threading.Lock() + self._advertisementTable = dict() + + self._lastSetAdvertisementCommand = None + + return + + + def TryRegisterAdvertisingDevice(self, advertisingDevice: IAdvertisingDevice) -> bool: + """ + try to register the given AdvertisingDevice + * returns True if the AdvertisingDevice was registered successfully + * returns False if the AdvertisingDevice wasn't registered successfully (because it still was registered) + """ + result = super().TryRegisterAdvertisingDevice(advertisingDevice) + + # AdvertisingDevice was registered successfully in baseclass + if(result): + # register AdvertisindIdentifier -> only registered AdvertisindIdentifier will be sent + advertisementIdentifier = advertisingDevice.GetAdvertisementIdentifier() + self._RegisterAdvertisementIdentifier(advertisementIdentifier) + + return result + + + def TryUnregisterAdvertisingDevice(self, advertisingDevice: IAdvertisingDevice) -> bool: + """ + try to unregister the given AdvertisingDevice + * returns True if the AdvertisingDevice was unregistered successfully + * returns False if the AdvertisingDevice wasn't unregistered successfully + """ + result = super().TryUnregisterAdvertisingDevice(advertisingDevice) + + # AdvertisingDevice was unregistered successfully in baseclass + if(result): + # unregister AdvertisementIdentifier to remove from publishing + advertisementIdentifier = advertisingDevice.GetAdvertisementIdentifier() + self._UnregisterAdvertisementIdentifier(advertisementIdentifier) + + return result + + + def AdvertisementStop(self) -> None: + """ + stop bluetooth advertising + """ + + # stop publishing thread + self._advertisement_thread_Run = False + if(self._advertisement_thread is not None): + self._advertisement_thread.join() + self._advertisement_thread = None + + #self._advertisementTable.clear() + + advertisementCommand = self._BTMgmt_path + ' rm-adv 1' + ' &> /dev/null' + subprocess.run(advertisementCommand, shell=True, executable="/bin/bash") + + if (self._tracer is not None): + self._tracer.TraceInfo('AdvertiserBTMgmnt.AdvertisementStop') + self._tracer.TraceInfo(advertisementCommand) + + return + + + def _RegisterAdvertisementIdentifier(self, advertisementIdentifier: str) -> None: + """ + Register AdvertisementIdentifier + """ + + try: + self._advertisementTable_thread_Lock.acquire(blocking=True) + + if(not advertisementIdentifier in self._advertisementTable): + self._advertisementTable[advertisementIdentifier] = None + finally: + self._advertisementTable_thread_Lock.release() + + return + + + def _UnregisterAdvertisementIdentifier(self, advertisementIdentifier: str) -> None: + """ + Unregister AdvertisementIdentifier + """ + try: + self._registeredDeviceTable_Lock.acquire(blocking=True) + + foundAdvertisementIdentifier = False + + # there are devices wich share the same AdvertisementIdentifier + # check if AdvertisementIdentifier is still present + for currentAdvertisementIdentifier in self._registeredDeviceTable.values(): + if(currentAdvertisementIdentifier == advertisementIdentifier): + foundAdvertisementIdentifier = True + break + + if(not foundAdvertisementIdentifier): + self._RemoveAdvertisementIdentifier(advertisementIdentifier) + + finally: + self._registeredDeviceTable_Lock.release() + return + + def _RemoveAdvertisementIdentifier(self, advertisementIdentifier: str) -> None: + """ + Remove AdvertisementIdentifier + """ + + try: + self._advertisementTable_thread_Lock.acquire(blocking=True) + + if(advertisementIdentifier in self._advertisementTable): + self._advertisementTable.pop(advertisementIdentifier) + + finally: + self._advertisementTable_thread_Lock.release() + + if(len(self._advertisementTable) == 0): + self.AdvertisementStop() + + return + + def AdvertisementDataSet(self, advertisementIdentifier: str, manufacturerId: bytes, rawdata: bytes) -> None: + """ + Set Advertisement data + """ + + try: + self._advertisementTable_thread_Lock.acquire(blocking=True) + + # only registered AdvertisementIdentifier are handled + if(advertisementIdentifier in self._advertisementTable): + advertisementCommand = self._BTMgmt_path + ' add-adv -d ' + self._CreateTelegramForBTMgmmt(manufacturerId, rawdata) + ' --general-discov 1' + ' &> /dev/null' + self._advertisementTable[advertisementIdentifier] = advertisementCommand + + # for quick change handle immediately + timeSlot = self._CalcTimeSlot() + self._Advertise(advertisementCommand, timeSlot) + finally: + self._advertisementTable_thread_Lock.release() + + # start publish thread if necessary + if(not self._advertisement_thread_Run): + self._advertisement_thread = threading.Thread(target=self._publish) + self._advertisement_thread.daemon = True + self._advertisement_thread.start() + self._advertisement_thread_Run = True + + if (self._tracer is not None): + self._tracer.TraceInfo('AdvertiserBTMgmnt.AdvertisementSet') + + return + + + def _publish(self) -> None: + """ + publishing loop + """ + + if (self._tracer is not None): + self._tracer.TraceInfo('AdvertiserBTMgmnt._publish') + + # loop while field is True + while(self._advertisement_thread_Run): + try: + try: + self._advertisementTable_thread_Lock.acquire(blocking=True) + + # make a copy of the table to release the lock as quick as possible + copy_of_advertisementTable = self._advertisementTable.copy() + + # calc time for one publishing slot + timeSlot = self._CalcTimeSlot() + finally: + self._advertisementTable_thread_Lock.release() + + if(len(copy_of_advertisementTable) == 0): + pass + else: + for key, advertisementCommand in copy_of_advertisementTable.items(): + # stop publishing? + if(not self._advertisement_thread_Run): + return + + self._Advertise(advertisementCommand, timeSlot) + except: + pass + + + def _CalcTimeSlot(self) -> float: + """ + Calculates the timespan in seconds for each timeslot + """ + + # timeSlot = 1 second / repetitionsPerSecond / len(self._advertisementTable) + timeSlot = 1 / self._RepetitionsPerSecond / max(1, len(self._advertisementTable)) + return timeSlot + + + def _Advertise(self, advertisementCommand: str, timeSlot: float) -> None: + """ + calls the btmgmt tool as subprocess + """ + + try: + self._advertisement_thread_Lock.acquire(blocking=True) + timeStart = time.time() + + if (self._lastSetAdvertisementCommand != advertisementCommand): + self._lastSetAdvertisementCommand = advertisementCommand + + subprocess.run(advertisementCommand, shell=True, executable="/bin/bash") + + timeEnd = time.time() + timeDelta = timeEnd - timeStart + timeSlotRemain = max(0.001, timeSlot - timeDelta) + + # stop publishing? + if(self._advertisement_thread_Run): + time.sleep(timeSlotRemain) + finally: + self._advertisement_thread_Lock.release() + + return + + def _CreateTelegramForBTMgmmt(self, manufacturerId: bytes, rawDataArray: bytes) -> str: + """ + Create input data for btmgmt + """ + rawDataArrayLen = len(rawDataArray) + + resultArray = bytearray(4 + rawDataArrayLen) + resultArray[0] = rawDataArrayLen + 3 # len + resultArray[1] = 0xFF # type manufacturer specific + resultArray[2] = manufacturerId[1] # companyId + resultArray[3] = manufacturerId[0] # companyId + for index in range(rawDataArrayLen): + resultArray[index + 4] = rawDataArray[index] + + return ''.join(f'{x:02x}' for x in resultArray) diff --git a/src/mkconnect/advertiser/AdvertiserBTSocket.py b/src/mkconnect/advertiser/AdvertiserBTSocket.py new file mode 100644 index 0000000..541c2a3 --- /dev/null +++ b/src/mkconnect/advertiser/AdvertiserBTSocket.py @@ -0,0 +1,500 @@ +__author__ = "J0EK3R" +__version__ = "0.1" + +import sys +import enum +import threading +import time + +from ..btsocket import btmgmt_socket +from ..btsocket import btmgmt_protocol + +from ..tracer.Tracer import Tracer + +from .Advertiser import Advertiser, IAdvertisingDevice + +class AdvertiserBTSocket(Advertiser) : + """ + baseclass + """ + + class Flags(enum.IntEnum): + CONNECTABLE = enum.auto() + GENERAL_DISCOVERABLE = enum.auto() + LIMITED_DISCOVERABLE = enum.auto() + FLAGS_IN_ADV_DATA = enum.auto() + TX_IN_ADV_DATA = enum.auto() + APPEARANCE_IN_ADV_DATA = enum.auto() + LOCAL_NAME_IN_ADV_DATA = enum.auto() + PHY_LE_1M = enum.auto() + PHY_LE_2M = enum.auto() + PHY_LE_CODED = enum.auto() + + # Number of repetitions per second + _RepetitionsPerSecond = 4 + + def __init__(self): + """ + initializes the object and defines the member fields + """ + super().__init__() # call baseclass + + self._advertisement_thread_Run = False + self._advertisement_thread = None + self._advertisement_thread_Lock = threading.Lock() + + # Table + # * key: AdvertisementIdentifier + # * value: advertisement-command for the call of btmgmt tool + self._advertisementTable_thread_Lock = threading.Lock() + self._advertisementTable = dict() + + self._lastSetAdvertisementCommand = None + + self.sock = btmgmt_socket.open() + + return + + + def TryRegisterAdvertisingDevice(self, advertisingDevice: IAdvertisingDevice) -> bool: + """ + try to register the given AdvertisingDevice + * returns True if the AdvertisingDevice was registered successfully + * returns False if the AdvertisingDevice wasn't registered successfully (because it still was registered) + """ + result = super().TryRegisterAdvertisingDevice(advertisingDevice) + + # AdvertisingDevice was registered successfully in baseclass + if(result): + # register AdvertisindIdentifier -> only registered AdvertisindIdentifier will be sent + advertisementIdentifier = advertisingDevice.GetAdvertisementIdentifier() + self._RegisterAdvertisementIdentifier(advertisementIdentifier) + + return result + + + def TryUnregisterAdvertisingDevice(self, advertisingDevice: IAdvertisingDevice) -> bool: + """ + try to unregister the given AdvertisingDevice + * returns True if the AdvertisingDevice was unregistered successfully + * returns False if the AdvertisingDevice wasn't unregistered successfully + """ + result = super().TryUnregisterAdvertisingDevice(advertisingDevice) + + # AdvertisingDevice was unregistered successfully in baseclass + if(result): + # unregister AdvertisementIdentifier to remove from publishing + advertisementIdentifier = advertisingDevice.GetAdvertisementIdentifier() + self._UnregisterAdvertisementIdentifier(advertisementIdentifier) + + return result + + + def AdvertisementStop(self) -> None: + """ + stop bluetooth advertising + """ + + # stop publishing thread + self._advertisement_thread_Run = False + if(self._advertisement_thread is not None): + self._advertisement_thread.join() + self._advertisement_thread = None + + advertisementCommand = AdvertiserBTSocket._create_rm_advert_command(1) + self.sock.send(advertisementCommand) + + # if (self._tracer is not None): + # self._tracer.TraceInfo('AdvertiserBTMgmnt.AdvertisementStop') + # self._tracer.TraceInfo(advertisementCommand) + + return + + + def _RegisterAdvertisementIdentifier(self, advertisementIdentifier: str) -> None: + """ + Register AdvertisementIdentifier + """ + + try: + self._advertisementTable_thread_Lock.acquire(blocking=True) + + if(not advertisementIdentifier in self._advertisementTable): + self._advertisementTable[advertisementIdentifier] = None + finally: + self._advertisementTable_thread_Lock.release() + + return + + + def _UnregisterAdvertisementIdentifier(self, advertisementIdentifier: str) -> None: + """ + Unregister AdvertisementIdentifier + """ + try: + self._registeredDeviceTable_Lock.acquire(blocking=True) + + foundAdvertisementIdentifier = False + + # there are devices wich share the same AdvertisementIdentifier + # check if AdvertisementIdentifier is still present + for currentAdvertisementIdentifier in self._registeredDeviceTable.values(): + if(currentAdvertisementIdentifier == advertisementIdentifier): + foundAdvertisementIdentifier = True + break + + if(not foundAdvertisementIdentifier): + self._RemoveAdvertisementIdentifier(advertisementIdentifier) + + finally: + self._registeredDeviceTable_Lock.release() + return + + def _RemoveAdvertisementIdentifier(self, advertisementIdentifier: str) -> None: + """ + Remove AdvertisementIdentifier + """ + + try: + self._advertisementTable_thread_Lock.acquire(blocking=True) + + if(advertisementIdentifier in self._advertisementTable): + self._advertisementTable.pop(advertisementIdentifier) + + finally: + self._advertisementTable_thread_Lock.release() + + if(len(self._advertisementTable) == 0): + self.AdvertisementStop() + + return + + def AdvertisementDataSet(self, advertisementIdentifier: str, manufacturerId: bytes, rawdata: bytes) -> None: + """ + Set Advertisement data + """ + + try: + self._advertisementTable_thread_Lock.acquire(blocking=True) + + # only registered AdvertisementIdentifier are handled + if(advertisementIdentifier in self._advertisementTable): + # advertisementCommand = self._BTMgmt_path + ' add-adv -d ' + self._CreateTelegramForBTMgmmt(manufacturerId, rawdata) + ' --general-discov 1' + ' &> /dev/null' + advertisingData = self._CreateAdvertisingDataString(manufacturerId, rawdata) + + advertisementCommand = AdvertiserBTSocket._create_add_advert_command( + instance_id=1, + flags=AdvertiserBTSocket.Flags.GENERAL_DISCOVERABLE, + duration=0x00, # zero means use default + timeout=0x00, # zero means use default + #adv_data='1bfff0ff6DB643CF7E8F471188665938D17AAA26495E131415161718', + adv_data=advertisingData, + scan_rsp='', + ) + self._advertisementTable[advertisementIdentifier] = advertisementCommand + + # for quick change handle immediately + timeSlot = self._CalcTimeSlot() + self._Advertise(advertisementCommand, timeSlot) + finally: + self._advertisementTable_thread_Lock.release() + + # start publish thread if necessary + if(not self._advertisement_thread_Run): + self._advertisement_thread = threading.Thread(target=self._publish) + self._advertisement_thread.daemon = True + self._advertisement_thread.start() + self._advertisement_thread_Run = True + + if (self._tracer is not None): + self._tracer.TraceInfo('AdvertiserBTMgmnt.AdvertisementSet') + + return + + + def _publish(self) -> None: + """ + publishing loop + """ + + if (self._tracer is not None): + self._tracer.TraceInfo('AdvertiserBTMgmnt._publish') + + # loop while field is True + while(self._advertisement_thread_Run): + try: + try: + self._advertisementTable_thread_Lock.acquire(blocking=True) + + # make a copy of the table to release the lock as quick as possible + copy_of_advertisementTable = self._advertisementTable.copy() + + # calc time for one publishing slot + timeSlot = self._CalcTimeSlot() + finally: + self._advertisementTable_thread_Lock.release() + + if(len(copy_of_advertisementTable) == 0): + pass + else: + for key, advertisementCommand in copy_of_advertisementTable.items(): + # stop publishing? + if(not self._advertisement_thread_Run): + return + + self._Advertise(advertisementCommand, timeSlot) + except: + pass + + + def _CalcTimeSlot(self) -> float: + """ + Calculates the timespan in seconds for each timeslot + """ + + # timeSlot = 1 second / repetitionsPerSecond / len(self._advertisementTable) + timeSlot = 1 / self._RepetitionsPerSecond / max(1, len(self._advertisementTable)) + return timeSlot + + + def _Advertise(self, advertisementCommand: str, timeSlot: float) -> None: + """ + calls the btmgmt tool as subprocess + """ + + try: + self._advertisement_thread_Lock.acquire(blocking=True) + timeStart = time.time() + + if (self._lastSetAdvertisementCommand != advertisementCommand): + self._lastSetAdvertisementCommand = advertisementCommand + + # self.loop.call_soon(self.sock.send, advertisementCommand) + self.sock.send(advertisementCommand) + + timeEnd = time.time() + timeDelta = timeEnd - timeStart + timeSlotRemain = max(0.001, timeSlot - timeDelta) + + # stop publishing? + if(self._advertisement_thread_Run): + time.sleep(timeSlotRemain) + finally: + self._advertisement_thread_Lock.release() + + return + + def _CreateAdvertisingDataString(self, manufacturerId: bytes, rawDataArray: bytes) -> str: + """ + Create input data for btmgmt + """ + rawDataArrayLen = len(rawDataArray) + + resultArray = bytearray(4 + rawDataArrayLen) + resultArray[0] = rawDataArrayLen + 3 # len + resultArray[1] = 0xFF # type manufacturer specific + resultArray[2] = manufacturerId[1] # companyId + resultArray[3] = manufacturerId[0] # companyId + for index in range(rawDataArrayLen): + resultArray[index + 4] = rawDataArray[index] + + return ''.join(f'{x:02x}' for x in resultArray) + + @staticmethod + def _little_bytes(value, size_of): + return int(value).to_bytes(size_of, byteorder='little') + + @staticmethod + def _create_add_advert_command(instance_id, flags, duration, timeout, adv_data, scan_rsp): + """ Add Advertising Command + + Command Code: 0x003e + Controller Index: + Command Parameters: Instance (1 Octet) + Flags (4 Octets) + Duration (2 Octets) + Timeout (2 Octets) + Adv_Data_Len (1 Octet) + Scan_Rsp_Len (1 Octet) + Adv_Data (0-255 Octets) + Scan_Rsp (0-255 Octets) + Return Parameters: Instance (1 Octet) + + This command is used to configure an advertising instance that + can be used to switch a Bluetooth Low Energy controller into + advertising mode. + + Added advertising information with this command will not be visible + immediately if advertising is enabled via the Set Advertising + command. The usage of the Set Advertising command takes precedence + over this command. Instance information is stored and will be + advertised once advertising via Set Advertising has been disabled. + + The Instance identifier is a value between 1 and the number of + supported instances. The value 0 is reserved. + + With the Flags value the type of advertising is controlled and + the following flags are defined: + + 0 Switch into Connectable mode + 1 Advertise as Discoverable + 2 Advertise as Limited Discoverable + 3 Add Flags field to Adv_Data + 4 Add TX Power field to Adv_Data + 5 Add Appearance field to Scan_Rsp + 6 Add Local Name in Scan_Rsp + 7 Secondary Channel with LE 1M + 8 Secondary Channel with LE 2M + 9 Secondary Channel with LE Coded + + When the connectable flag is set, then the controller will use + undirected connectable advertising. The value of the connectable + setting can be overwritten this way. This is useful to switch a + controller into connectable mode only for LE operation. This is + similar to the mode 0x02 from the Set Advertising command. + + When the connectable flag is not set, then the controller will + use advertising based on the connectable setting. When using + non-connectable or scannable advertising, the controller will + be programmed with a non-resolvable random address. When the + system is connectable, then the identity address or resolvable + private address will be used. + + Using the connectable flag is useful for peripheral mode support + where BR/EDR (and/or LE) is controlled by Add Device. This allows + making the peripheral connectable without having to interfere + with the global connectable setting. + + If Scan_Rsp_Len is zero and connectable flag is not set and + the global connectable setting is off, then non-connectable + advertising is used. If Scan_Rsp_Len is larger than zero and + connectable flag is not set and the global advertising is off, + then scannable advertising is used. This small difference is + supported to provide less air traffic for devices implementing + broadcaster role. + + Secondary channel flags can be used to advertise in secondary + channel with the corresponding PHYs. These flag bits are mutually + exclusive and setting multiple will result in Invalid Parameter + error. Choosing either LE 1M or LE 2M will result in using + extended advertising on the primary channel with LE 1M and the + respectively LE 1M or LE 2M on the secondary channel. Choosing + LE Coded will result in using extended advertising on the primary + and secondary channels with LE Coded. Choosing none of these flags + will result in legacy advertising. + + The Duration parameter configures the length of an Instance. The + value is in seconds. + + A value of 0 indicates a default value is chosen for the + Duration. The default is 2 seconds. + + If only one advertising Instance has been added, then the Duration + value will be ignored. It only applies for the case where multiple + Instances are configured. In that case every Instance will be + available for the Duration time and after that it switches to + the next one. This is a simple round-robin based approach. + + The Timeout parameter configures the life-time of an Instance. In + case the value 0 is used it indicates no expiration time. If a + timeout value is provided, then the advertising Instance will be + automatically removed when the timeout passes. The value for the + timeout is in seconds. Powering down a controller will invalidate + all advertising Instances and it is not possible to add a new + Instance with a timeout when the controller is powered down. + + When a Timeout is provided, then the Duration subtracts from + the actual Timeout value of that Instance. For example an Instance + with Timeout of 5 and Duration of 2 will be scheduled exactly 3 + times, twice with 2 seconds and once with one second. Other + Instances have no influence on the Timeout. + + Re-adding an already existing instance (i.e. issuing the Add + Advertising command with an Instance identifier of an existing + instance) will update that instance's configuration. + + An instance being added or changed while another instance is + being advertised will not be visible immediately but only when + the new/changed instance is being scheduled by the round robin + advertising algorithm. + + Changes to an instance that is currently being advertised will + cancel that instance and switch to the next instance. The changes + will be visible the next time the instance is scheduled for + advertising. In case a single instance is active, this means + that changes will be visible right away. + + A pre-requisite is that LE is already enabled, otherwise this + command will return a "rejected" response. + + This command can be used when the controller is not powered and + all settings will be programmed once powered. + + This command generates a Command Complete event on success or a + Command Status event on failure. + + Possible errors: Failed + Rejected + Not Supported + Invalid Parameters + Invalid Index + """ + cmd = AdvertiserBTSocket._little_bytes(0x003e, 2) + ctrl_idx = AdvertiserBTSocket._little_bytes(0x00, 2) + instance = AdvertiserBTSocket._little_bytes(instance_id, 1) # (1 Octet) + flags = AdvertiserBTSocket._little_bytes(flags, 4) # (4 Octets) + duration = AdvertiserBTSocket._little_bytes(duration, 2) # (2 Octets) + timeout = AdvertiserBTSocket._little_bytes(timeout, 2) # (2 Octets) + adv_data = bytes.fromhex(adv_data) # (0-255 Octets) + adv_data_len = AdvertiserBTSocket._little_bytes(len(adv_data), 1) # (1 Octet) + scan_rsp = bytes.fromhex(scan_rsp) # (0-255 Octets) + scan_rsp_len = AdvertiserBTSocket._little_bytes(len(scan_rsp), 1) # (1 Octet) + params = instance + flags + duration + timeout + adv_data_len + scan_rsp_len + adv_data + scan_rsp + param_len = AdvertiserBTSocket._little_bytes(len(params), 2) + + return cmd + ctrl_idx + param_len + params + + @staticmethod + def _create_rm_advert_command(instance_id): + """ Remove Advertising Command + + Command Code: 0x003f + Controller Index: + Command Parameters: Instance (1 Octet) + Return Parameters: Instance (1 Octet) + + This command is used to remove an advertising instance that + can be used to switch a Bluetooth Low Energy controller into + advertising mode. + + When the Instance parameter is zero, then all previously added + advertising Instances will be removed. + + Removing advertising information with this command will not be + visible as long as advertising is enabled via the Set Advertising + command. The usage of the Set Advertising command takes precedence + over this command. Changes to Instance information are stored and + will be advertised once advertising via Set Advertising has been + disabled. + + Removing an instance while it is being advertised will immediately + cancel the instance, even when it has been advertised less then its + configured Timeout or Duration. + + This command can be used when the controller is not powered and + all settings will be programmed once powered. + + This command generates a Command Complete event on success or + a Command Status event on failure. + + Possible errors: Invalid Parameters + Invalid Index + """ + + cmd = AdvertiserBTSocket._little_bytes(0x003f, 2) + ctrl_idx = AdvertiserBTSocket._little_bytes(0x00, 2) + instance = AdvertiserBTSocket._little_bytes(instance_id, 1) # (1 Octet) + params = instance + param_len = AdvertiserBTSocket._little_bytes(len(params), 2) + + return cmd + ctrl_idx + param_len + params \ No newline at end of file diff --git a/src/mkconnect/advertiser/AdvertiserDummy.py b/src/mkconnect/advertiser/AdvertiserDummy.py new file mode 100644 index 0000000..e87206c --- /dev/null +++ b/src/mkconnect/advertiser/AdvertiserDummy.py @@ -0,0 +1,35 @@ +__author__ = "J0EK3R" +__version__ = "0.1" + +from ..tracer.Tracer import Tracer + +from .Advertiser import Advertiser + + +class AdvertiserDummy(Advertiser) : + """ + Dummy Advertiser + """ + + def __init__(self): + """ + initializes the object and defines the fields + """ + + super().__init__() + + return + + def AdvertisementStop(self) -> None: + """ + stop bluetooth advertising + + """ + + return + + def AdvertisementDataSet(self, identifier: str, manufacturerId: bytes, rawdata: bytes) -> None: + """ + Set Advertisement data + """ + return diff --git a/src/mkconnect/advertiser/AdvertiserHCITool.py b/src/mkconnect/advertiser/AdvertiserHCITool.py new file mode 100644 index 0000000..3b0b719 --- /dev/null +++ b/src/mkconnect/advertiser/AdvertiserHCITool.py @@ -0,0 +1,145 @@ +__author__ = "J0EK3R" +__version__ = "0.1" + +from ..tracer.Tracer import Tracer + +from .Advertiser import Advertiser + +import subprocess +import threading +import time + +class AdvertiserHCITool(Advertiser) : + """ + baseclass + """ + + HCITool_path = '/usr/bin/hcitool' + + def __init__(self): + """ + initializes the object and defines the fields + """ + + super().__init__() + + self._isInitialized = False + self._ad_thread_Run = False + self._ad_thread = None + self._ad_thread_Lock = threading.Lock() + self._advertisementTable = dict() + + return + + def AdvertisementStop(self) -> None: + """ + stop bluetooth advertising + """ + + self._ad_thread_Run = False + if(self._ad_thread is not None): + self._ad_thread.join() + self._ad_thread = None + self._isInitialized = False + + hcitool_args_0x08_0x000a = self.HCITool_path + ' -i hci0 cmd 0x08 0x000a 00' + + subprocess.run(hcitool_args_0x08_0x000a + ' &> /dev/null', shell=True, executable="/bin/bash") + + if (self._tracer is not None): + self._tracer.TraceInfo('AdvertiserHCITool.AdvertisementStop') + self._tracer.TraceInfo(hcitool_args_0x08_0x000a) + + return + + def AdvertisementDataSet(self, identifier: str, manufacturerId: bytes, rawdata: bytes) -> None: + """ + Set Advertisement data + """ + + advertisementCommand = self.HCITool_path + ' -i hci0 cmd 0x08 0x0008 ' + self._CreateTelegramForHCITool(manufacturerId, rawdata) + self._ad_thread_Lock.acquire(blocking=True) + self._advertisementTable[identifier] = advertisementCommand + self._ad_thread_Lock.release() + + if(not self._ad_thread_Run): + self._ad_thread = threading.Thread(target=self._publish) + self._ad_thread.daemon = True + self._ad_thread.start() + self._ad_thread_Run = True + + if (self._tracer is not None): + self._tracer.TraceInfo('AdvertiserHCITool.AdvertisementSet') + + return + + def _publish(self) -> None: + if (self._tracer is not None): + self._tracer.TraceInfo('AdvertiserHCITool._publish') + + while(self._ad_thread_Run): + try: + self._ad_thread_Lock.acquire(blocking=True) + copy_of_advertisementTable = self._advertisementTable.copy() + self._ad_thread_Lock.release() + + # We want to repeat each command + repetitionsPerSecond = 4 + # timeSlot = 1 second / repetitionsPerSecond / len(copy_of_advertisementTable) + timeSlot = 1 / repetitionsPerSecond / max(1, len(copy_of_advertisementTable)) + + for key, advertisementCommand in copy_of_advertisementTable.items(): + # stopp publishing? + if(not self._ad_thread_Run): + return + + timeStart = time.time() + subprocess.run(advertisementCommand + ' &> /dev/null', shell=True, executable="/bin/bash") + + if(not self._isInitialized): + hcitool_args_0x08_0x0006 = self.HCITool_path + ' -i hci0 cmd 0x08 0x0006 A0 00 A0 00 03 00 00 00 00 00 00 00 00 07 00' + hcitool_args_0x08_0x000a = self.HCITool_path + ' -i hci0 cmd 0x08 0x000a 01' + + subprocess.run(hcitool_args_0x08_0x0006 + ' &> /dev/null', shell=True, executable="/bin/bash") + subprocess.run(hcitool_args_0x08_0x000a + ' &> /dev/null', shell=True, executable="/bin/bash") + + if (self._tracer is not None): + self._tracer.TraceInfo(str(hcitool_args_0x08_0x0006)) + self._tracer.TraceInfo(str(hcitool_args_0x08_0x000a)) + self._tracer.TraceInfo() + + self._isInitialized = True + + timeEnd = time.time() + timeDelta = timeEnd - timeStart + timeSlotRemain = max(0.001, timeSlot - timeDelta) + + # if (self._tracer is not None): + # self._tracer.TraceInfo(str(timeSlotRemain) + " " + str(key) + ": " + str(advertisement)) + + # if (self._tracer is not None): + # self._tracer.TraceInfo(str(timeSlotRemain)) + + time.sleep(timeSlotRemain) + except: + pass + + def _CreateTelegramForHCITool(self, manufacturerId: bytes, rawDataArray: bytes) -> str: + """ + Create input data for hcitool + """ + rawDataArrayLen = len(rawDataArray) + + resultArray = bytearray(8 + rawDataArrayLen) + resultArray[0] = rawDataArrayLen + 7 # len + resultArray[1] = 0x02 # flags + resultArray[2] = 0x01 + resultArray[3] = 0x02 + resultArray[4] = rawDataArrayLen + 3 # len + resultArray[5] = 0xFF # type manufacturer specific + resultArray[6] = manufacturerId[1] # companyId + resultArray[7] = manufacturerId[0] # companyId + for index in range(rawDataArrayLen): + resultArray[index + 8] = rawDataArray[index] + + return ' '.join(f'{x:02x}' for x in resultArray) diff --git a/src/mkconnect/advertiser/AdvertiserMicroPython.py b/src/mkconnect/advertiser/AdvertiserMicroPython.py new file mode 100644 index 0000000..2ef153b --- /dev/null +++ b/src/mkconnect/advertiser/AdvertiserMicroPython.py @@ -0,0 +1,90 @@ +__author__ = "J0EK3R" +__version__ = "0.1" + +from ..tracer.Tracer import Tracer + +from .Advertiser import Advertiser + +try: + import bluetooth +except ImportError as err: + print("AdvertiserMicroPython: " + str(err)) + +class AdvertiserMicroPython(Advertiser) : + """ + Advertiser using bluetooth library from MicroPython + """ + + def __init__(self): + """ + initializes the object and defines the fields + """ + + super().__init__() + + # Activate bluetooth + try: + self.ble = bluetooth.BLE() + self.ble.active(True) + except Exception as exception: + self.ble = None + print("AdvertiserMicroPython.init: " + str(exception)) + + return + + + def AdvertisementStop(self) -> None: + """ + stop bluetooth advertising + + """ + + if(self.ble is not None): + self.ble.gap_advertise(None) + + if (self._tracer is not None): + self._tracer.TraceInfo("AdvertisementSet") + else: + if (self._tracer is not None): + self._tracer.TraceInfo("self.ble is None") + + if (self._tracer is not None): + pass + + return + + + def AdvertisementDataSet(self, identifier: str, manufacturerId: bytes, rawdata: bytes) -> None: + """ + Set Advertisement data + """ + data = self._CreateTelegramForPicoW(manufacturerId, rawdata) + + if(self.ble is not None): + self.ble.gap_advertise(100, data) + + if (self._tracer is not None): + self._tracer.TraceInfo("AdvertisementSet") + else: + if (self._tracer is not None): + self._tracer.TraceInfo("self.ble is None") + + return + + + def _CreateTelegramForPicoW(self, manufacturerId: bytes, rawDataArray: bytes) -> bytes: + """ + Create input data for bluetooth lib for Pico W + """ + rawDataArrayLen = len(rawDataArray) + + btdata = bytearray(2 + rawDataArrayLen) + btdata[0] = 0x00 + btdata[1] = 0xFF + for index in range(rawDataArrayLen): + btdata[index + 2] = rawDataArray[index] + + btcrypteddata = bytearray(b'\x02\x01\x02') + bytearray((len(btdata) + 1, 0xFF)) + btdata + + return bytes(btcrypteddata) + diff --git a/src/mkconnect/advertiser/AdvertisingDevice.py b/src/mkconnect/advertiser/AdvertisingDevice.py new file mode 100644 index 0000000..3d9284e --- /dev/null +++ b/src/mkconnect/advertiser/AdvertisingDevice.py @@ -0,0 +1,99 @@ +__author__ = "J0EK3R" +__version__ = "0.1" + +from ..tracer.Tracer import Tracer +from .IAdvertisingDevice import IAdvertisingDevice +from .Advertiser import Advertiser + +class AdvertisingDevice(IAdvertisingDevice) : + """ + baseclass + """ + + def __init__(self, identifier: str): + """ + initializes the object and defines the fields + """ + + self._connected = False + self._advertiser = None + self._advertiser_registered = False + self._tracer = None + self._identifier = identifier + + + def SetAdvertiser(self, advertiser: Advertiser) -> Advertiser: + """ + set advertiser object + """ + if(self._advertiser == advertiser): + return advertiser + + reconnect = self._connected + + # unregister + if(self._advertiser is not None and self._advertiser_registered): + self._advertiser_registered = not self._advertiser.TryUnregisterAdvertisingDevice(self) + self._connected = False + + self._advertiser = advertiser + + # register + if(self._advertiser is not None and reconnect): + self._advertiser_registered = self._advertiser.TryRegisterAdvertisingDevice(self) + + return advertiser + + + def SetTracer(self, tracer: Tracer) -> Tracer: + """ + set tracer object + """ + self._tracer = tracer + + return tracer + + + def Connect(self): + """ + connects the device to the advertiser + """ + + if(self._advertiser is not None and not self._advertiser_registered): + self._advertiser_registered = self._advertiser.TryRegisterAdvertisingDevice(self) + + self._connected = True + + return + + + def Disconnect(self) -> None: + """ + disconnects the device from the advertiser + """ + + if(self._advertiser is not None and self._advertiser_registered): + self._advertiser_registered = not self._advertiser.TryUnregisterAdvertisingDevice(self) + + self._connected = False + + return + + + def Stop(self) -> bytes: + """ + stops the device + """ + + raise NotImplementedError # override this methode + + + def AdvertisementSet(self, manufacturerId: bytes, rawdata: bytes) -> None: + """ + Set Advertisement data + """ + pass + + + def GetAdvertisementIdentifier(self) -> str: + return self._identifier diff --git a/src/mkconnect/advertiser/IAdvertiser.py b/src/mkconnect/advertiser/IAdvertiser.py new file mode 100644 index 0000000..1edca46 --- /dev/null +++ b/src/mkconnect/advertiser/IAdvertiser.py @@ -0,0 +1,9 @@ +__author__ = "J0EK3R" +__version__ = "0.1" + +class IAdvertiser : + """ + (kind of) interface for Advertiser + This Type mustn't import any AdvertisingDevice stuff! + To prevent cyclic imports caused by imports of Advertiser <--> AdvertisingDevice. + """ \ No newline at end of file diff --git a/src/mkconnect/advertiser/IAdvertisingDevice.py b/src/mkconnect/advertiser/IAdvertisingDevice.py new file mode 100644 index 0000000..a816a68 --- /dev/null +++ b/src/mkconnect/advertiser/IAdvertisingDevice.py @@ -0,0 +1,48 @@ +__author__ = "J0EK3R" +__version__ = "0.1" + +class IAdvertisingDevice : + """ + (kind of interface) for AdvertisingDevice + This Type mustn't import any Advertiser stuff! + To prevent cyclic imports caused by imports of Advertiser <--> AdvertisingDevice. + """ + + def GetAdvertisementIdentifier(self) -> str: + """ + Returns the AdvertisementIdentifier to differentiate the Advertising-Data. + The AdvertisementIdentifier is used to register the Advertising-Data object. + This Methode has to be overridden by the implementation of the AdvertisingDevice! + + Some AdvertisingDevices like MouldKing 4.0 Hub use only one Advertising telegram for all + (three possible) devices. So each AdvertisingDevices returns the same AdvertisementIdentifier + """ + raise NotImplementedError # override this methode + + + def Connect(self) -> None: + """ + connects the device to the advertiser + """ + raise NotImplementedError # override this methode + + + def Disconnect(self) -> None: + """ + disconnects the device from the advertiser + """ + raise NotImplementedError # override this methode + + + def Stop(self) -> bytes: + """ + stops the device + """ + raise NotImplementedError # override this methode + + + def SetChannel(self, channelId: int, value: float) -> bytes: + """ + set internal stored value of channel with channelId to value and return the telegram + """ + raise NotImplementedError # override this methode diff --git a/src/mkconnect/advertiser/__init__.py b/src/mkconnect/advertiser/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/mkconnect/advertiser/__pycache__/Advertiser.cpython-313.pyc b/src/mkconnect/advertiser/__pycache__/Advertiser.cpython-313.pyc new file mode 100644 index 0000000..e28fc7b Binary files /dev/null and b/src/mkconnect/advertiser/__pycache__/Advertiser.cpython-313.pyc differ diff --git a/src/mkconnect/advertiser/__pycache__/AdvertisingDevice.cpython-313.pyc b/src/mkconnect/advertiser/__pycache__/AdvertisingDevice.cpython-313.pyc new file mode 100644 index 0000000..e20f650 Binary files /dev/null and b/src/mkconnect/advertiser/__pycache__/AdvertisingDevice.cpython-313.pyc differ diff --git a/src/mkconnect/advertiser/__pycache__/IAdvertiser.cpython-313.pyc b/src/mkconnect/advertiser/__pycache__/IAdvertiser.cpython-313.pyc new file mode 100644 index 0000000..2f325da Binary files /dev/null and b/src/mkconnect/advertiser/__pycache__/IAdvertiser.cpython-313.pyc differ diff --git a/src/mkconnect/advertiser/__pycache__/IAdvertisingDevice.cpython-313.pyc b/src/mkconnect/advertiser/__pycache__/IAdvertisingDevice.cpython-313.pyc new file mode 100644 index 0000000..f2271f9 Binary files /dev/null and b/src/mkconnect/advertiser/__pycache__/IAdvertisingDevice.cpython-313.pyc differ diff --git a/src/mkconnect/advertiser/__pycache__/__init__.cpython-313.pyc b/src/mkconnect/advertiser/__pycache__/__init__.cpython-313.pyc new file mode 100644 index 0000000..9a6a72d Binary files /dev/null and b/src/mkconnect/advertiser/__pycache__/__init__.cpython-313.pyc differ diff --git a/src/mkconnect/btsocket/__init__.py b/src/mkconnect/btsocket/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/mkconnect/btsocket/btmgmt_callback.py b/src/mkconnect/btsocket/btmgmt_callback.py new file mode 100644 index 0000000..aecde67 --- /dev/null +++ b/src/mkconnect/btsocket/btmgmt_callback.py @@ -0,0 +1,123 @@ +""" +Use callback-based programming style to read and write to BlueZ Management API +""" +import asyncio +from collections import deque + +from . import btmgmt_socket +from . import btmgmt_protocol +from . import tools + +logger = tools.create_module_logger(__name__) + + +class Mgmt: + + def __init__(self): + # Setup read and write sockets + self.sock = btmgmt_socket.open() + self.loop = asyncio.get_event_loop() + # Store for event callbacks + self._event_callbacks = dict() + # Queue for commands to be written to BlueZ socket + self.cmd_queue = deque() + self.running = False + + def add_event_callback(self, event, callback): + """ + Assign a callback to be called when a specific event happens. + The callback should take two arguments. + 1) The response packet + 2) the AsyncMgmt() class instance object + + :param event: An entry from the enum btmgmt.Events + :param callback: A callback function + """ + self._event_callbacks[event] = callback + + def reader(self): + """ + Read callback is called when data available on Bluetooth socket. + Processes packet and hands-off to event callbacks that have subscribed + to events. + """ + logger.debug('Reader callback') + data = self.sock.recv(100) + pkt = btmgmt_protocol.reader(data) + logger.info('pkt: [%s]', pkt) + if pkt.header.event_code in self._event_callbacks: + self._event_callbacks[pkt.header.event_code](pkt, self) + if not self.running: + self.stop() + + def writer(self): + """ + Write callback when Bluetooth socket is available for writing. + Takes commands that are on the cmd_queue and sends. + """ + logger.debug('Writer callback') + if len(self.cmd_queue) > 0: + this_cmd = self.cmd_queue.popleft() + logger.info('sending pkt [%s]', tools.format_pkt(this_cmd)) + self.sock.send(this_cmd) + if not self.running and len(self.cmd_queue) == 0: + self.loop.stop() + # Do one more read to get the response from the last command + self.reader() + + @staticmethod + def _as_packet(pkt_objs): + """Pack bytes together for sending""" + full_pkt = b'' + for frame in pkt_objs: + if frame: + full_pkt += frame.octets + return full_pkt + + def send(self, cmd, ctrl_idx, *params): + """ + Add commands onto the queue ready to be sent. + Basic structure of the command + send(, , ) + + :param cmd: A value from btmgmt.Commands + :param ctrl_idx: The index of the controller [0xFFFF is non-controller] + :param params: 0 or more input parameters for command + """ + pkt_objs = btmgmt_protocol.command(cmd, ctrl_idx, *params) + cmd_pkt = self._as_packet(pkt_objs) + logger.debug('Queue command: %s', tools.format_pkt(cmd_pkt)) + self.cmd_queue.append(cmd_pkt) + + def stop(self): + """ + Once all commands have been sent, exit the event loop + """ + self.running = False + + self.loop.remove_writer(self.sock) + self.loop.remove_reader(self.sock) + self.loop.stop() + + def close(self): + """ + Stop the event loop and close sockets etc. + """ + btmgmt_socket.close(self.sock) + # Stop the event loop + self.loop.close() + + def start(self): + self.running = True + # Setup reader and writer for socket streams + self.loop.add_reader(self.sock, self.reader) + self.loop.add_writer(self.sock, self.writer) + logger.debug('Starting event loop...') + try: + # Run the event loop + self.loop.run_forever() + except KeyboardInterrupt: + self.loop.stop() + finally: + # We are done. Close sockets and the event loop. + self.close() diff --git a/src/mkconnect/btsocket/btmgmt_protocol.py b/src/mkconnect/btsocket/btmgmt_protocol.py new file mode 100644 index 0000000..41ff3c6 --- /dev/null +++ b/src/mkconnect/btsocket/btmgmt_protocol.py @@ -0,0 +1,1075 @@ +""" +Hand the BlueZ Bluetooth Management (mgmt) API +""" +import abc +from collections import namedtuple +from enum import Enum +import sys +from . import tools + +logger = tools.create_module_logger(__name__) +current_module = sys.modules[__name__] + +Parameter = namedtuple('Parameter', + ('name', 'width', 'repeat', 'bt_type'), + defaults=(1, 'IntUL')) +Response = namedtuple('Response', + ('header', 'event_frame', 'cmd_response_frame'), + defaults=(None,)) +Command = namedtuple('Command', ('header', 'cmd_params_frame'), + defaults=(None,)) + + +class DataField(metaclass=abc.ABCMeta): + def __init__(self): + self.octets = b'' + self.value = None + + def __repr__(self): + return f'{self.value}' + + @abc.abstractmethod + def decode(self, data): + pass + + @abc.abstractmethod + def encode(self, value, width): + pass + + +class Address(DataField): + def decode(self, data): + self.value = (f'{data[5]:02X}:{data[4]:02X}:{data[3]:02X}:' + f'{data[2]:02X}:{data[1]:02X}:{data[0]:02X}') + self.octets = data + + def encode(self, value, width): + parts = value.split(':') + for idx in range(5, -1, -1): + self.octets += int(parts[idx], 16).to_bytes(1, byteorder='little') + self.value = value + + +class AddressTypeField(DataField): + def decode(self, data): + addr_types = [] + as_int = int.from_bytes(data, byteorder='little', signed=False) + for i in range(len(AddressType)): + if (as_int >> i) & 1: + addr_types.append(AddressType(i)) + self.value = addr_types + self.octets = data + + def encode(self, value, width): + self.value = value + bits = 0 + for i in value: + bits = bits | (1 << i.value) + self.octets = int(bits).to_bytes(1, byteorder='little', signed=False) + + +class IntUL(DataField): + + def decode(self, data): + self.value = int.from_bytes(data, byteorder='little', signed=False) + self.octets = data + + def encode(self, value, width): + self.octets = int(value).to_bytes(width, byteorder='little', + signed=False) + self.value = value + + +class HexStr(DataField): + + def decode(self, data): + self.value = data.hex() + self.octets = data + + def encode(self, value, width): + self.octets = bytes.fromhex(value) + self.value = value + + +class CmdCode(DataField): + + def decode(self, data): + self.value = Commands(int.from_bytes(data, byteorder='little')) + self.octets = data + + def encode(self, value, width): + cmd_code = Commands[value] + self.octets = int(cmd_code.value).to_bytes(width, byteorder='little', + signed=False) + self.value = cmd_code + + +class EvtCode(DataField): + + def decode(self, data): + self.value = Events(int.from_bytes(data, byteorder='little')) + self.octets = data + + def encode(self, value, width): + evt_code = Events[value] + self.octets = int(evt_code.value).to_bytes(width, byteorder='little', + signed=False) + self.value = evt_code + + +class Status(DataField): + + def decode(self, data): + self.value = ErrorCodes(int.from_bytes(data, byteorder='little')) + self.octets = data + + def encode(self, value, width): + status_code = ErrorCodes[value] + self.octets = int(status_code.value).to_bytes(width, + byteorder='little', + signed=False) + self.value = status_code + + +class Controller(DataField): + + def decode(self, data): + self.value = int.from_bytes(data, byteorder='little', signed=False) + self.octets = data + + def encode(self, value, width): + if value is None: + value = 0xffff + self.octets = int(value).to_bytes(width, byteorder='little', + signed=False) + self.value = value + + +class ParamLen(DataField): + + def decode(self, data): + self.value = int.from_bytes(data, byteorder='little', signed=False) + self.octets = data + + def encode(self, value, width): + self.value = value + try: + len_bytes = len(value) + except IndexError: + len_bytes = 0 + self.octets = len_bytes.to_bytes(width, byteorder='little', + signed=False) + + +class Name(DataField): + + def decode(self, data): + self.value = data.rstrip(b'\x00') + self.octets = data + + def encode(self, value, width): + self.value = value + self.octets = value.ljust(width, b'\x00') + + +class CurrentSettings(DataField): + + def decode(self, data): + self.value = dict() + as_int = int.from_bytes(data, byteorder='little', signed=False) + for i in range(len(SupportedSettings)): + self.value[SupportedSettings(i)] = bool((as_int >> i) & 1) + self.octets = data + + def encode(self, value, width): + raise NotImplementedError + + +class EIRData(DataField): + def decode(self, data): + self.value = dict() + pointer = 0 + while pointer < len(data): + len_data = data[pointer] + data_type = data[pointer + 1] + data_start = pointer + 2 + data_end = data_start + len_data - 1 + self.value[ADType(data_type)] = data[data_start:data_end] + pointer += data[pointer] + 1 + + def encode(self, value, width): + raise NotImplementedError + + +class Packet: + def __init__(self, shape): + # e.g. + # shape = (Parameter('opcode', 2), Parameter('status', 1)) + self.shape = shape + for param in shape: + self.__setattr__(param.name, None) + # params = b'\x01\x00\x00\x01\x0e\x00' + self.octets = b'' + + def __repr__(self): + key_values = ', '.join([f'{x.name}={self.__getattribute__(x.name)}' + for x in self.shape]) + return f'<{key_values}>' + + def _add_to_value(self, param, p_value): + if param.repeat != 1: + self.__getattribute__(param.name).append(p_value) + else: + self.__setattr__(param.name, p_value) + + def decode(self, pkt): + self.octets = pkt + pointer = 0 + for param in self.shape: + logger.debug('Decoding %s as type %s', param.name, param.bt_type) + if param.repeat != 1: + repeated = self.__getattribute__(param.repeat) + self.__setattr__(param.name, list()) + else: + repeated = param.repeat + for index in range(repeated): + class_ = getattr(current_module, param.bt_type) + data_type = class_() + data_type.decode(pkt[pointer:pointer + param.width]) + self._add_to_value(param, data_type.value) + pointer += param.width + if pointer < len(pkt): + # self.value['parameters'] = pkt[pointer:] + return pkt[pointer:] + return None + + def encode(self, *args): + self.octets = b'' + cmd_args = args[0] + for entry in range(len(self.shape)): + param = self.shape[entry] + logger.debug('Encoding %s as type %s', param.name, param.bt_type) + class_ = getattr(current_module, param.bt_type) + data_type = class_() + if param.bt_type == 'ParamLen': + try: + data_type.encode(cmd_args[entry], param.width) + except IndexError: + data_type.encode(b'', param.width) + else: + data_type.encode(cmd_args[entry], param.width) + self.octets += data_type.octets + self._add_to_value(param, data_type.value) + return cmd_args[2:] + + +class EventHeader(Packet): + def __init__(self): + super().__init__([Parameter(name='event_code', width=2, + bt_type='EvtCode'), + Parameter(name='controller_idx', width=2, + bt_type='Controller'), + Parameter(name='param_len', width=2, + bt_type='ParamLen')]) + + +class CmdHeader(Packet): + def __init__(self): + super().__init__([Parameter(name='cmd_code', width=2, + bt_type='CmdCode'), + Parameter(name='controller_idx', width=2, + bt_type='Controller'), + Parameter(name='param_len', width=2, + bt_type='ParamLen')]) + + +class AddressType(Enum): + def __str__(self): + return str(self.name) + + def __repr__(self): + return str(self.name) + + # Possible values for the Address_Type parameter are a bit-wise OR of + # the following bits + BREDR = 0x00 + LEPublic = 0x01 + LERandom = 0x02 + + +class SupportedSettings(Enum): + """ + 0 Powered + 1 Connectable + 2 Fast Connectable + 3 Discoverable + 4 Bondable + 5 Link Level Security (Sec. mode 3) + 6 Secure Simple Pairing + 7 Basic Rate/Enhanced Data Rate + 8 High Speed + 9 Low Energy + 10 Advertising + 11 Secure Connections + 12 Debug Keys + 13 Privacy + 14 Controller Configuration + 15 Static Address + 16 PHY Configuration + 17 Wideband Speech + + """ + def __str__(self): + return str(self.name) + + def __repr__(self): + return str(self.name) + + Powered = 0x00 + Connectable = 0x01 + FastConnectable = 0x02 + Discoverable = 0x03 + Bondable = 0x04 + LinkLevelSecurity = 0x05 + SecureSimplePairing = 0x06 + BREDR = 0x07 + HighSpeed = 0x08 + LowEnergy = 0x09 + Advertising = 0x0A + SecureConnections = 0x0B + DebugKeys = 0x0C + Privacy = 0x0D + ControllerConfiguration = 0x0E + StaticAddress = 0x0F + PHYConfiguration = 0x10 + WidebandSpeech = 0x11 + + +class ADType(Enum): + def __str__(self): + return str(self.name) + + def __repr__(self): + return str(self.name) + + Flags = 0x01 + IncompleteUUID16ServiceList = 0x02 + CompleteUUID16ServiceList = 0x03 + CompleteUUID32ServiceList = 0x04 + IncompleteUUID32ServiceList = 0x05 + IncompleteUUID128ServiceList = 0x06 + CompleteUUID128ServiceList = 0x07 + ShortName = 0x08 + CompleteName = 0x09 + TXPower = 0x0a + DeviceClass = 0x0d + SimplePairingHashC192 = 0x0e + SimplePairingRandomizer192 = 0x0f + SecurityManagerTKValue = 0x10 + SecurityManagerOOBFlags = 0x11 + ConnectionIntervalRange = 0x12 + SolicitUUID16ServiceList = 0x14 + SolicitUUID128ServiceList = 0x15 + ServiceDataUUID16 = 0x16 + PublicTargetAddress = 0x17 + RandomTargetAddress = 0x18 + Appearance = 0x19 + AdvertisingInterval = 0x1a + LEDeviceAddress = 0x1b + LERole = 0x1c + SimplePairingHashC256 = 0x1d + SimplePairingRandomizer256 = 0x1e + SolicitUUID32ServiceList = 0x1f + ServiceDataUUID32 = 0x20 + ServiceDataUUID128 = 0x21 + LESecureConnectionsConfirmationValue = 0x22 + LESecureConnectionsRandomValue = 0x23 + URI = 0x24 + IndoorPositioning = 0x25 + TransportDiscoverData = 0x26 + LESupportedFeatures = 0x27 + ChannelMapUpdateIndication = 0x28 + PBADV = 0x29 + MeshMessage = 0x2a + MeshBeacon = 0x2b + BIGInfo = 0x2c + BroadcastCode = 0x2d + InformationData3d = 0x3d + ManufacturerData = 0xff + + +class ErrorCodes(Enum): + def __str__(self): + return str(self.name) + + def __repr__(self): + return str(self.name) + + Success = 0x00 + UnknownCommand = 0x01 + NotConnected = 0x02 + Failed = 0x03 + ConnectFailed = 0x04 + AuthenticationFailed = 0x05 + NotPaired = 0x06 + NoResources = 0x07 + Timeout = 0x08 + AlreadyConnected = 0x09 + Busy = 0x0A + Rejected = 0x0B + NotSupported = 0x0C + InvalidParameters = 0x0D + Disconnected = 0x0E + NotPowered = 0x0F + Cancelled = 0x10 + InvalidIndex = 0x11 + RFKilled = 0x12 + AlreadyPaired = 0x13 + PermissionDenied = 0x14 + + +class Commands(Enum): + def __str__(self): + return str(self.name) + + def __repr__(self): + return str(self.name) + + ReadManagementVersionInformation = 0x0001 + ReadManagementSupportedCommands = 0x0002 + ReadControllerIndexList = 0x0003 + ReadControllerInformation = 0x0004 + SetPowered = 0x0005 + SetDiscoverable = 0x0006 + SetConnectable = 0x0007 + SetFastConnectable = 0x0008 + SetBondable = 0x0009 + SetLinkSecurity = 0x000A + SetSecureSimplePairing = 0x000B + SetHighSpeed = 0x000C + SetLowEnergy = 0x000D + SetDeviceClass = 0x000E + SetLocalName = 0x000F + AddUUID = 0x0010 + RemoveUUID = 0x0011 + LoadLinkKeys = 0x0012 + LoadLongTermKeys = 0x0013 + Disconnect = 0x0014 + GetConnections = 0x0015 + PINCodeReply = 0x0016 + PINCodeNegativeReply = 0x0017 + SetIOCapability = 0x0018 + PairDevice = 0x0019 + CancelPairDevice = 0x001A + UnpairDevice = 0x001B + UserConfirmationReply = 0x001C + UserConfirmationNegativeReply = 0x001D + UserPasskeyReply = 0x001E + UserPasskeyNegativeReply = 0x001F + ReadLocalOutOfBandData = 0x0020 + AddRemoteOutOfBandData = 0x0021 + RemoveRemoteOutOfBandData = 0x0022 + StartDiscovery = 0x0023 + StopDiscovery = 0x0024 + ConfirmName = 0x0025 + BlockDevice = 0x0026 + UnblockDevice = 0x0027 + SetDeviceID = 0x0028 + SetAdvertising = 0x0029 + SetBREDR = 0x002A + SetStaticAddress = 0x002B + SetScanParameters = 0x002C + SetSecureConnections = 0x002D + SetDebugKeys = 0x002E + SetPrivacy = 0x002F + LoadIdentityResolvingKeys = 0x0030 + GetConnectionInformation = 0x0031 + GetClockInformation = 0x0032 + AddDevice = 0x0033 + RemoveDevice = 0x0034 + LoadConnectionParameters = 0x0035 + ReadUnconfiguredControllerIndexList = 0x0036 + ReadControllerConfigurationInformation = 0x0037 + SetExternalConfiguration = 0x0038 + SetPublicAddress = 0x0039 + StartServiceDiscovery = 0x003a + ReadLocalOutOfBandExtendedData = 0x003b + ReadExtendedControllerIndexList = 0x003c + ReadAdvertisingFeatures = 0x003d + AddAdvertising = 0x003e + RemoveAdvertising = 0x003f + GetAdvertisingSizeInformation = 0x0040 + StartLimitedDiscovery = 0x0041 + ReadExtendedControllerInformation = 0x0042 + SetAppearance = 0x0043 + GetPHYConfiguration = 0x0044 + SetPHYConfiguration = 0x0045 + LoadBlockedKeys = 0x0046 + SetWidebandSpeech = 0x0047 + + +class Events(Enum): + def __str__(self): + return str(self.name) + + def __repr__(self): + return str(self.name) + + CommandCompleteEvent = 0x0001 + CommandStatusEvent = 0x0002 + ControllerErrorEvent = 0x0003 + IndexAddedEvent = 0x0004 + IndexRemovedEvent = 0x0005 + NewSettingsEvent = 0x0006 + ClassOfDeviceChangedEvent = 0x0007 + LocalNameChangedEvent = 0x0008 + NewLinkKeyEvent = 0x0009 + NewLongTermKeyEvent = 0x000A + DeviceConnectedEvent = 0x000B + DeviceDisconnectedEvent = 0x000C + ConnectFailedEvent = 0x000D + PINCodeRequestEvent = 0x000E + UserConfirmationRequestEvent = 0x000F + UserPasskeyRequestEvent = 0x0010 + AuthenticationFailedEvent = 0x0011 + DeviceFoundEvent = 0x0012 + DiscoveringEvent = 0x0013 + DeviceBlockedEvent = 0x0014 + DeviceUnblockedEvent = 0x0015 + DeviceUnpairedEvent = 0x0016 + PasskeyNotifyEvent = 0x0017 + NewIdentityResolvingKeyEvent = 0x0018 + NewSignatureResolvingKeyEvent = 0x0019 + DeviceAddedEvent = 0x001a + DeviceRemovedEvent = 0x001b + NewConnectionParameterEvent = 0x001c + UnconfiguredIndexAddedEvent = 0x001d + UnconfiguredIndexRemovedEvent = 0x001e + NewConfigurationOptionsEvent = 0x001f + ExtendedIndexAddedEvent = 0x0020 + ExtendedIndexRemovedEvent = 0x0021 + LocalOutOfBandExtendedDataUpdatedEvent = 0x0022 + AdvertisingAddedEvent = 0x0023 + AdvertisingRemovedEvent = 0x0024 + ExtendedControllerInformationChangedEvent = 0x0025 + PHYConfigurationChangedEvent = 0x0026 + + +cmds = { + 0x0005: Packet([Parameter(name='powered', width=1)]), + 0x0006: Packet([Parameter(name='discoverable', width=1), + Parameter(name='timeout', width=2)]), + 0x0007: Packet([Parameter(name='connectable', width=1)]), + 0x0008: Packet([Parameter(name='enable', width=1)]), + 0x0009: Packet([Parameter(name='bondable', width=1)]), + 0x000A: Packet([Parameter(name='link_security', width=1)]), + 0x000B: Packet([Parameter(name='secure_simple_pairing', width=1)]), + 0x000C: Packet([Parameter(name='high_speed', width=1)]), + 0x000D: Packet([Parameter(name='low_energy', width=1)]), + 0x000E: Packet([Parameter(name='major_class', width=1), + Parameter(name='minor_class', width=1)]), + 0x000F: Packet([Parameter(name='name', width=249, bt_type='Name'), + Parameter(name='short_name', width=11)]), + 0x0010: Packet([Parameter(name='uuid', width=16), + Parameter(name='svc_hint', width=1)]), + 0x0011: Packet([Parameter(name='uuid', width=16)]), + 0x0012: Packet([Parameter(name='debug_keys', width=1), + Parameter(name='key_count', width=2), + Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField'), + Parameter(name='key_type', width=1), + Parameter(name='value', width=16), + Parameter(name='pin_length', width=1)]), + 0x0013: Packet([Parameter(name='key_count', width=2), + Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField'), + Parameter(name='key_type', width=1), + Parameter(name='master', width=1), + Parameter(name='encryption_size', width=1), + Parameter(name='encryption_diversifier', width=2), + Parameter(name='random_number', width=8), + Parameter(name='value', width=16)]), + 0x0014: Packet([Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField')]), + 0x0016: Packet([Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField'), + Parameter(name='pin_length', width=1), + Parameter(name='pin_code', width=16)]), + 0x0017: Packet([Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField')]), + 0x0018: Packet([Parameter(name='io_capability', width=1)]), + 0x0019: Packet([Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField'), + Parameter(name='io_capability', width=1)]), + 0x001A: Packet([Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField')]), + 0x001B: Packet([Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField'), + Parameter(name='disconnect', width=1)]), + 0x001C: Packet([Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField')]), + 0x001D: Packet([Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField')]), + 0x001E: Packet([Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField'), + Parameter(name='passkey', width=4)]), + 0x001F: Packet([Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField')]), + 0x0021: Packet([Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField'), + Parameter(name='hash_192', width=16), + Parameter(name='randomizer_192', width=16), + Parameter(name='hash_256', width=16), + Parameter(name='randomizer_256', width=16)]), + 0x0022: Packet([Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField')]), + 0x0023: Packet([Parameter(name='address_type', width=1, + bt_type='AddressTypeField')]), + 0x0024: Packet([Parameter(name='address_type', width=1, + bt_type='AddressTypeField')]), + 0x0025: Packet([Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField'), + Parameter(name='name_known', width=1)]), + 0x0026: Packet([Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField')]), + 0x0027: Packet([Parameter(name='address', width=6), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField')]), + 0x0028: Packet([Parameter(name='source', width=2), + Parameter(name='vendor', width=2), + Parameter(name='product', width=2), + Parameter(name='version', width=2)]), + 0x0029: Packet([Parameter(name='advertising', width=1)]), + 0x002A: Packet([Parameter(name='br/edr', width=1)]), + 0x002B: Packet([Parameter(name='address', width=6, bt_type='Address')]), + 0x002C: Packet([Parameter(name='interval', width=2), + Parameter(name='window', width=2)]), + 0x002D: Packet([Parameter(name='secure_connections', width=1)]), + 0x002E: Packet([Parameter(name='debug_keys', width=1)]), + 0x002F: Packet([Parameter(name='privacy', width=1), + Parameter(name='identity_resolving_key', width=16)]), + 0x0030: Packet([Parameter(name='key_count', width=2), + Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField'), + Parameter(name='value', width=16)]), + 0x0031: Packet([Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField')]), + 0x0032: Packet([Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField')]), + 0x0033: Packet([Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField'), + Parameter(name='action', width=1)]), + 0x0034: Packet([Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField')]), + 0x0035: Packet([Parameter(name='param_count', width=2), + Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField'), + Parameter(name='min_connection_interval', width=2), + Parameter(name='max_connection_interval', width=2), + Parameter(name='connection_latency', width=2), + Parameter(name='supervision_timeout', width=2)]), + 0x0038: Packet([Parameter(name='configuration', width=1)]), + 0x0039: Packet([Parameter(name='address', width=6, bt_type='Address')]), + 0x003a: Packet([Parameter(name='address_type', width=1, + bt_type='AddressTypeField'), + Parameter(name='rssi_threshold', width=1), + Parameter(name='uuid_count', width=2), + Parameter(name='uuid[i]', width=16, repeat='uuid_count')]), + 0x003b: Packet([Parameter(name='address_type', width=1, + bt_type='AddressTypeField')]), + 0x003e: Packet( + [ + Parameter(name='instance', width=1), + Parameter(name='flags', width=4), + Parameter(name='duration', width=2), + Parameter(name='timeout', width=2), + Parameter(name='adv_data_len', width=1), + Parameter(name='scan_rsp_len', width=1), + Parameter( + name='adv_data', width=None, repeat=1, bt_type='HexStr'), + Parameter( + name='scan_rsp', width=None, repeat=1, bt_type='HexStr') + ] + ), + 0x003f: Packet([Parameter(name='instance', width=1)]), + 0x0040: Packet([Parameter(name='instance', width=1), + Parameter(name='flags', width=4)]), + 0x0041: Packet([Parameter(name='address_type', width=1, + bt_type='AddressTypeField')]), + 0x0043: Packet([Parameter(name='appearance', width=2)]), + 0x0045: Packet([Parameter(name='selected_phys', width=4)]), + 0x0046: Packet([Parameter(name='key_count', width=2), + Parameter(name='key_type', width=1), + Parameter(name='value', width=16)]), + 0x0047: Packet([Parameter(name='wideband_speech', width=1)]), + +} + +events = { + 0x0001: Packet([Parameter(name='command_opcode', width=2, + bt_type='CmdCode'), + Parameter(name='status', width=1, bt_type='Status')]), + 0x0002: Packet([Parameter(name='command_opcode', width=2, + bt_type='CmdCode'), + Parameter(name='status', width=1, bt_type='Status')]), + 0x0003: Packet([Parameter(name='error_code', width=1)]), + 0x0006: Packet([Parameter(name='current_settings', width=4, + bt_type='CurrentSettings')]), + 0x0007: Packet([Parameter(name='class_of_device', width=3)]), + 0x0008: Packet([Parameter(name='name', width=249, bt_type='Name'), + Parameter(name='short_name', width=11)]), + 0x0009: Packet([Parameter(name='store_hint', width=1), + Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField'), + Parameter(name='key_type', width=1), + Parameter(name='value', width=16), + Parameter(name='pin_length', width=1)]), + 0x000A: Packet([Parameter(name='store_hint', width=1), + Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField'), + Parameter(name='key_type', width=1), + Parameter(name='master', width=1), + Parameter(name='size', width=1), + Parameter(name='diversifier', width=2), + Parameter(name='number', width=8), + Parameter(name='value', width=16)]), + 0x000B: Packet([Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField'), + Parameter(name='flags', width=4), + Parameter(name='eir_data_length', width=2), + Parameter(name='eir_data', width=65535, + bt_type='EIRData')]), + 0x000C: Packet([Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField'), + Parameter(name='reason', width=1)]), + 0x000D: Packet([Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField'), + Parameter(name='status', width=1, bt_type='Status')]), + 0x000E: Packet([Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField'), + Parameter(name='secure', width=1)]), + 0x000F: Packet([Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField'), + Parameter(name='confirm_hint', width=1), + Parameter(name='value', width=4)]), + 0x0010: Packet([Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField')]), + 0x0011: Packet([Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField'), + Parameter(name='status', width=1, bt_type='Status')]), + 0x0012: Packet([Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField'), + Parameter(name='rssi', width=1), + Parameter(name='flags', width=4), + Parameter(name='eir_data_length', width=2), + Parameter(name='eir_data', width=65535, + bt_type='EIRData')]), + 0x0013: Packet([Parameter(name='address_type', width=1, + bt_type='AddressTypeField'), + Parameter(name='discovering', width=1)]), + 0x0014: Packet([Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField')]), + 0x0015: Packet([Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField')]), + 0x0016: Packet([Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField')]), + 0x0017: Packet([Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField'), + Parameter(name='passkey', width=4), + Parameter(name='entered', width=1)]), + 0x0018: Packet([Parameter(name='store_hint', width=1), + Parameter(name='random_address', width=6), + Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField'), + Parameter(name='value', width=16)]), + 0x0019: Packet([Parameter(name='store_hint', width=1), + Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField'), + Parameter(name='type', width=1), + Parameter(name='value', width=16)]), + 0x001a: Packet([Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField'), + Parameter(name='action', width=1)]), + 0x001b: Packet([Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField')]), + 0x001c: Packet([Parameter(name='store_hint', width=1), + Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField'), + Parameter(name='min_connection_interval', width=2), + Parameter(name='max_connection_interval', width=2), + Parameter(name='connection_latency', width=2), + Parameter(name='supervision_timeout', width=2)]), + 0x001f: Packet([Parameter(name='missing_options', width=4)]), + 0x0020: Packet([Parameter(name='controller_type', width=1), + Parameter(name='controller_bus', width=1)]), + 0x0021: Packet([Parameter(name='controller_type', width=1), + Parameter(name='controller_bus', width=1)]), + 0x0022: Packet([Parameter(name='address_type', width=1, + bt_type='AddressTypeField'), + Parameter(name='eir_data_length', width=2), + Parameter(name='eir_data', width=65535, + bt_type='EIRData')]), + 0x0023: Packet([Parameter(name='instance', width=1)]), + 0x0024: Packet([Parameter(name='instance', width=1)]), + 0x0025: Packet([Parameter(name='eir_data_length', width=2), + Parameter(name='eir_data', width=65535, + bt_type='EIRData')]), + 0x0026: Packet([Parameter(name='selected_phys', width=4)]), +} + +cmd_response = { + 0x0001: Packet([Parameter(name='version', width=1), + Parameter(name='revision', width=2)]), + 0x0002: Packet([Parameter(name='num_of_commands', width=2), + Parameter(name='num_of_events', width=2), + Parameter(name='command', width=2, + repeat='num_of_commands'), + Parameter(name='event', width=2, repeat='num_of_events')]), + 0x0003: Packet([Parameter(name='num_controllers', width=2), + Parameter(name='controller_index[i]', width=2, + repeat='num_controllers')]), + 0x0004: Packet([Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='bluetooth_version', width=1), + Parameter(name='manufacturer', width=2), + Parameter(name='supported_settings', width=4), + Parameter(name='current_settings', width=4, + bt_type='CurrentSettings'), + Parameter(name='class_of_device', width=3), + Parameter(name='name', width=249, bt_type='Name'), + Parameter(name='short_name', width=11)]), + 0x0005: Packet([Parameter(name='current_settings', width=4, + bt_type='CurrentSettings')]), + 0x0006: Packet([Parameter(name='current_settings', width=4, + bt_type='CurrentSettings')]), + 0x0007: Packet([Parameter(name='current_settings', width=4, + bt_type='CurrentSettings')]), + 0x0008: Packet([Parameter(name='current_settings', width=4, + bt_type='CurrentSettings')]), + 0x0009: Packet([Parameter(name='current_settings', width=4, + bt_type='CurrentSettings')]), + 0x000A: Packet([Parameter(name='current_settings', width=4, + bt_type='CurrentSettings')]), + 0x000B: Packet([Parameter(name='current_settings', width=4, + bt_type='CurrentSettings')]), + 0x000C: Packet([Parameter(name='current_settings', width=4, + bt_type='CurrentSettings')]), + 0x000D: Packet([Parameter(name='current_settings', width=4, + bt_type='CurrentSettings')]), + 0x000E: Packet([Parameter(name='class_of_device', width=3)]), + 0x000F: Packet([Parameter(name='name', width=249, bt_type='Name'), + Parameter(name='short_name', width=11)]), + 0x0010: Packet([Parameter(name='class_of_device', width=3)]), + 0x0011: Packet([Parameter(name='class_of_device', width=3)]), + 0x0014: Packet([Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField')]), + 0x0015: Packet([Parameter(name='connection_count', width=2), + Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField')]), + 0x0016: Packet([Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField')]), + 0x0017: Packet([Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField')]), + 0x0019: Packet([Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField')]), + 0x001A: Packet([Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField')]), + 0x001B: Packet([Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField')]), + 0x001C: Packet([Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField')]), + 0x001D: Packet([Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField')]), + 0x001E: Packet([Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField')]), + 0x001F: Packet([Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField')]), + 0x0020: Packet([Parameter(name='hash_192', width=16), + Parameter(name='randomizer_192', width=16), + Parameter(name='hash_256', width=16), + Parameter(name='randomizer_256', width=16)]), + 0x0021: Packet([Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField')]), + 0x0022: Packet([Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField')]), + 0x0023: Packet([Parameter(name='address_type', width=1, + bt_type='AddressTypeField')]), + 0x0024: Packet([Parameter(name='address_type', width=1, + bt_type='AddressTypeField')]), + 0x0025: Packet([Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField')]), + 0x0026: Packet([Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField')]), + 0x0027: Packet([Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField')]), + 0x0029: Packet([Parameter(name='current_settings', width=4, + bt_type='CurrentSettings')]), + 0x002A: Packet([Parameter(name='current_settings', width=4, + bt_type='CurrentSettings')]), + 0x002B: Packet([Parameter(name='current_settings', width=4, + bt_type='CurrentSettings')]), + 0x002D: Packet([Parameter(name='current_settings', width=4, + bt_type='CurrentSettings')]), + 0x002E: Packet([Parameter(name='current_settings', width=4, + bt_type='CurrentSettings')]), + 0x002F: Packet([Parameter(name='current_settings', width=4, + bt_type='CurrentSettings')]), + 0x0031: Packet([Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField'), + Parameter(name='rssi', width=1), + Parameter(name='tx_power', width=1), + Parameter(name='max_tx_power', width=1)]), + 0x0032: Packet([Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField'), + Parameter(name='local_clock', width=4), + Parameter(name='piconet_clock', width=4), + Parameter(name='accuracy', width=2)]), + 0x0033: Packet([Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField')]), + 0x0034: Packet([Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='address_type', width=1, + bt_type='AddressTypeField')]), + 0x0036: Packet([Parameter(name='num_controllers', width=2), + Parameter(name='controller_index[i]', width=2)]), + 0x0037: Packet([Parameter(name='manufacturer', width=2), + Parameter(name='supported_options', width=4), + Parameter(name='missing_options', width=4)]), + 0x0038: Packet([Parameter(name='missing_options', width=4)]), + 0x0039: Packet([Parameter(name='missing_options', width=4)]), + 0x003a: Packet([Parameter(name='address_type', width=1, + bt_type='AddressTypeField')]), + 0x003b: Packet([Parameter(name='address_type', width=1, + bt_type='AddressTypeField'), + Parameter(name='eir_data_length', width=2), + Parameter(name='eir_data', width=65535, + bt_type='EIRData')]), + 0x003c: Packet([Parameter(name='num_controllers', width=2), + Parameter(name='controller_index', width=2, + repeat='num_controllers'), + Parameter(name='controller_type', width=1, + repeat='num_controllers'), + Parameter(name='controller_bus', width=1, + repeat='num_controllers')]), + 0x003d: Packet([Parameter(name='supported_flags', width=4), + Parameter(name='max_adv_data_len', width=1), + Parameter(name='max_scan_rsp_len', width=1), + Parameter(name='max_instances', width=1), + Parameter(name='num_instances', width=1), + Parameter(name='instance[i]', width=1, + repeat='num_instances')]), + 0x003e: Packet([Parameter(name='instance', width=1)]), + 0x003f: Packet([Parameter(name='instance', width=1)]), + 0x0040: Packet([Parameter(name='instance', width=1), + Parameter(name='flags', width=4), + Parameter(name='max_adv_data_len', width=1), + Parameter(name='max_scan_rsp_len', width=1)]), + 0x0041: Packet([Parameter(name='address_type', width=1, + bt_type='AddressTypeField')]), + 0x0042: Packet([Parameter(name='address', width=6, bt_type='Address'), + Parameter(name='bluetooth_version', width=1), + Parameter(name='manufacturer', width=2), + Parameter(name='supported_settings', width=4), + Parameter(name='current_settings', width=4, + bt_type='CurrentSettings'), + Parameter(name='eir_data_length', width=2), + Parameter(name='eir_data', width=65535, + bt_type='EIRData')]), + 0x0044: Packet([Parameter(name='supported_phys', width=4), + Parameter(name='configurable_phys', width=4), + Parameter(name='selected_phys', width=4)]), + 0x0047: Packet([Parameter(name='current_settings', width=4, + bt_type='CurrentSettings')]), +} + + +def reader(pckt): + # <- Response packet -> + # <- event_header ->|<- event_frame -> + # <- event_header ->|<- event_frame ->|<- cmd_response_frame -> + # + # <- Command Packet -> + # <- cmd_header ->|<- cmd_frame -> + header = EventHeader() + evt_params = header.decode(pckt) + event_frame = events.get(header.event_code.value) + + cmd_params = event_frame.decode(evt_params) + if cmd_params: + cmd_response_frame = cmd_response.get(event_frame.command_opcode.value) + cmd_response_frame.decode(cmd_params) + logger.debug('Socket Read: %s %s %s', + header, event_frame, cmd_response_frame) + return Response(header, event_frame, cmd_response_frame) + logger.debug('Socket read %s %s', header, event_frame) + return Response(header, event_frame) + + +def command(*args): + header = CmdHeader() + if len(args) == 2: + header.encode(args) + return Command(header) + + cmd_frame = cmds.get(Commands[args[0]].value) + cmd_frame.encode(args[2:]) + header.encode((args[0], args[1], cmd_frame.octets)) + return Command(header, cmd_frame) diff --git a/src/mkconnect/btsocket/btmgmt_socket.py b/src/mkconnect/btsocket/btmgmt_socket.py new file mode 100644 index 0000000..f027354 --- /dev/null +++ b/src/mkconnect/btsocket/btmgmt_socket.py @@ -0,0 +1,120 @@ +import asyncio +import ctypes +import socket +import sys + + +AF_BLUETOOTH = 31 +PF_BLUETOOTH = AF_BLUETOOTH +SOCK_RAW = 3 +BTPROTO_HCI = 1 +SOCK_CLOEXEC = 524288 +SOCK_NONBLOCK = 2048 +HCI_CHANNEL_CONTROL = 3 +HCI_DEV_NONE = 0xffff + + +class BluetoothSocketError(BaseException): + pass + + +class BluetoothCommandError(BaseException): + pass + + +class SocketAddr(ctypes.Structure): + _fields_ = [ + ("hci_family", ctypes.c_ushort), + ("hci_dev", ctypes.c_ushort), + ("hci_channel", ctypes.c_ushort), + ] + + +def open(): + """ + Because of the following issue with Python the Bluetooth User socket + on linux needs to be done with lower level calls. + https://bugs.python.org/issue36132 + Based on mgmt socket at: + https://git.kernel.org/pub/scm/bluetooth/bluez.git/tree/doc/mgmt-api.txt + """ + + sockaddr_hcip = ctypes.POINTER(SocketAddr) + ctypes.cdll.LoadLibrary("libc.so.6") + libc = ctypes.CDLL("libc.so.6") + + libc_socket = libc.socket + libc_socket.argtypes = (ctypes.c_int, ctypes.c_int, ctypes.c_int) + libc_socket.restype = ctypes.c_int + + bind = libc.bind + bind.argtypes = (ctypes.c_int, ctypes.POINTER(SocketAddr), ctypes.c_int) + bind.restype = ctypes.c_int + + # fd = libc_socket(PF_BLUETOOTH, SOCK_RAW | SOCK_CLOEXEC | SOCK_NONBLOCK, + # BTPROTO_HCI) + fd = libc_socket(PF_BLUETOOTH, SOCK_RAW, BTPROTO_HCI) + + if fd < 0: + raise BluetoothSocketError("Unable to open PF_BLUETOOTH socket") + + addr = SocketAddr() + addr.hci_family = AF_BLUETOOTH # AF_BLUETOOTH + addr.hci_dev = HCI_DEV_NONE # adapter index + addr.hci_channel = HCI_CHANNEL_CONTROL # HCI_USER_CHANNEL + r = bind(fd, sockaddr_hcip(addr), ctypes.sizeof(addr)) + if r < 0: + raise BluetoothSocketError("Unable to bind %s", r) + + sock_fd = socket.socket(AF_BLUETOOTH, SOCK_RAW, BTPROTO_HCI, fileno=fd) + return sock_fd + + +def close(bt_socket): + """Close the open socket""" + fd = bt_socket.detach() + socket.close(fd) + + +def test_asyncio_usage(): + sock = open() + + if sys.version_info < (3, 10): + loop = asyncio.get_event_loop() + else: + try: + loop = asyncio.get_running_loop() + except RuntimeError: + loop = asyncio.new_event_loop() + + asyncio.set_event_loop(loop) + + def reader(): + data = sock.recv(100) + print("Received:", data) + + # We are done: unregister the file descriptor + loop.remove_reader(sock) + + # Stop the event loop + loop.stop() + + # Register the file descriptor for read event + loop.add_reader(sock, reader) + + # Write a command to the socket + # Read Management Version Information Command + # b'\x01\x00\xff\xff\x00\x00' + loop.call_soon(sock.send, b'\x01\x00\xff\xff\x00\x00') + + try: + # Run the event loop + loop.run_forever() + finally: + # We are done. Close sockets and the event loop. + close(sock) + loop.close() + + +if __name__ == '__main__': + test_asyncio_usage() diff --git a/src/mkconnect/btsocket/btmgmt_sync.py b/src/mkconnect/btsocket/btmgmt_sync.py new file mode 100644 index 0000000..bbecfef --- /dev/null +++ b/src/mkconnect/btsocket/btmgmt_sync.py @@ -0,0 +1,37 @@ +from . import btmgmt_protocol +from . import btmgmt_socket +from . import tools + +logger = tools.create_module_logger(__name__) + +def _as_packet(pkt_objs): + full_pkt = b'' + for frame in pkt_objs: + if frame: + full_pkt += frame.octets + return full_pkt + + +def send(*args): + response_recvd = False + pkt_objs = btmgmt_protocol.command(*args) + logger.debug('Sending btmgmt frames %s', pkt_objs) + cmd_pkt = _as_packet(pkt_objs) + # print('cmd pkt', [f'{octets:x}' for octets in cmd_pkt]) + sock = btmgmt_socket.open() + logger.debug('Sending bytes: %s', cmd_pkt) + sock.send(cmd_pkt) + while not response_recvd: + raw_data = sock.recv(100) + logger.debug('Received: %s', raw_data) + data = btmgmt_protocol.reader(raw_data) + logger.debug('Received btmgmt frames: %s', data) + if data.cmd_response_frame: + response_recvd = True + + btmgmt_socket.close(sock) + if data.event_frame.status != btmgmt_protocol.ErrorCodes.Success: + raise NameError(f'btmgmt Error: ' + f'{data.event_frame.command_opcode} ' + f'{data.event_frame.status.name}') + return data diff --git a/src/mkconnect/btsocket/readme.md b/src/mkconnect/btsocket/readme.md new file mode 100644 index 0000000..d96144c --- /dev/null +++ b/src/mkconnect/btsocket/readme.md @@ -0,0 +1 @@ +The files are copied from https://github.com/ukBaz/python-btsocket diff --git a/src/mkconnect/btsocket/tools.py b/src/mkconnect/btsocket/tools.py new file mode 100644 index 0000000..b8ad861 --- /dev/null +++ b/src/mkconnect/btsocket/tools.py @@ -0,0 +1,18 @@ +"""Location for tools/functions to be used across various files""" +import logging + + +def create_module_logger(module_name): + """helper function to create logger in modules""" + logger = logging.getLogger(module_name) + strm_hndlr = logging.StreamHandler() + formatter = logging.Formatter( + '%(asctime)s - %(name)s - %(levelname)s - %(message)s') + strm_hndlr.setFormatter(formatter) + logger.addHandler(strm_hndlr) + return logger + + +def format_pkt(data): + """Put data packets in a human readable format""" + return ', '.join([f'{bite:#04x}' for bite in data]) diff --git a/src/mkconnect/cli.py b/src/mkconnect/cli.py new file mode 100644 index 0000000..2e3796d --- /dev/null +++ b/src/mkconnect/cli.py @@ -0,0 +1,24 @@ +import sys +from .tracer.TracerConsole import TracerConsole +from .mouldking.MouldKing import MouldKing + +def main() -> int: + tracer = TracerConsole() + + if sys.platform == "linux": + from .advertiser.AdvertiserBTSocket import AdvertiserBTSocket as Advertiser + elif sys.platform == "rp2": + from .advertiser.AdvertiserMicroPython import AdvertiserMicroPython as Advertiser + elif sys.platform == "win32": + from .advertiser.AdvertiserDummy import AdvertiserDummy as Advertiser + else: + raise SystemExit("unsupported platform") + + advertiser = Advertiser() + advertiser.SetTracer(tracer) + + MouldKing.SetTracer(tracer) + MouldKing.SetAdvertiser(advertiser) + + tracer.TraceInfo("mkconnect CLI initialized") + return 0 \ No newline at end of file diff --git a/src/mkconnect/examples/consoletest.py b/src/mkconnect/examples/consoletest.py new file mode 100644 index 0000000..a6f67f9 --- /dev/null +++ b/src/mkconnect/examples/consoletest.py @@ -0,0 +1,179 @@ +#!/usr/bin/python + +# to run: sudo python -i consoletest.py + +import sys +import time + +print('Script: consoletest.py') +print('Platform: ' + sys.platform) + +from mkconnect.tracer.Tracer import Tracer +from mkconnect.tracer.TracerConsole import TracerConsole + +# uncomment to choose advertiser +if (sys.platform == 'linux'): + #from Advertiser.AdvertiserHCITool import AdvertiserHCITool as Advertiser + #from Advertiser.AdvertiserBTMgmt import AdvertiserBTMgmt as Advertiser + from mkconnect.advertiser.AdvertiserBTSocket import AdvertiserBTSocket as Advertiser + pass +elif (sys.platform == 'rp2'): + from mkconnect.advertiser.AdvertiserMicroPython import AdvertiserMicroPython as Advertiser + pass +elif (sys.platform == 'win32'): + from mkconnect.advertiser.AdvertiserDummy import AdvertiserDummy as Advertiser +else: + raise Exception('unsupported platform') + +from mkconnect.mouldking.MouldKing import MouldKing + +# instantiate Tracer +tracer = TracerConsole() + +# instantiate Advertiser +advertiser = Advertiser() +advertiser.SetTracer(tracer) + +# Set Tracer for all MouldKing Hubs +MouldKing.SetTracer(tracer) +MouldKing.SetAdvertiser(advertiser) + +# save pre-instantiated objects in local variables +hub0 = MouldKing.Module6_0.Device0 +hub1 = MouldKing.Module6_0.Device1 +hub2 = MouldKing.Module6_0.Device2 + +hub3 = MouldKing.Module4_0.Device0 +hub4 = MouldKing.Module4_0.Device1 +hub5 = MouldKing.Module4_0.Device2 + +def _getChannelId(channel): + switch={ + 'A': 0, + 'B': 1, + 'C': 2, + 'D': 3, + 'E': 4, + 'F': 5, + } + return switch.get(channel,"") + +def _getHubId(deviceId): + # MK6 + if deviceId == 0: + return hub0 + elif deviceId == 1: + return hub1 + elif deviceId == 2: + return hub2 + # MK4 + elif deviceId == 3: + return hub3 + elif deviceId == 4: + return hub4 + elif deviceId == 5: + return hub5 + else: + raise Exception("deviceId 0..5") + +def _automate(deviceId: int, channel: int): + userinput = input("\nDo you want to test channel "+ str(channel) +" ? enter y/n\n") + + if (userinput != str("y")): + return + + tracer.TraceInfo("HUB: "+ str(deviceId) +", FORWARD : Power ramp up from 0 to 100% on channel :" + str(channel)) + for percent in range(0, 110, 10): + tracer.TraceInfo("Power : " + str(percent) + "%") + mkcontrol(deviceId,channel, percent/100) + time.sleep(1) + + mkstop(deviceId) + + tracer.TraceInfo("HUB: "+ str(deviceId) +", REVERSE: Power ramp up from 0 to 100% on channel :" + str(channel)) + for percent in range(-0, -110, -10): + tracer.TraceInfo("Power : " + str(percent) + "%") + mkcontrol(deviceId,channel,percent/100) + time.sleep(1) + + mkstop(deviceId) + +def mkbtstop(): + """ + stop bluetooth advertising + """ + advertiser.AdvertisementStop() + # hcitool_args1 = hcitool_path + ' -i hci0 cmd 0x08 0x000a 00' + ' &> /dev/null' + + # if platform.system() == 'Linux': + # subprocess.run(hcitool_args1, shell=True, executable="/bin/bash") + # elif platform.system() == 'Windows': + # print('Connect command :') + # print(hcitool_args1) + # else: + # print('Unsupported OS') + + return + +def mkconnect(deviceId: int=0): + """ + send the bluetooth connect telegram to switch the MouldKing hubs in bluetooth mode + press the button on the hub(s) and the flashing of status led should switch from blue-green to blue + """ + hub = _getHubId(deviceId) + rawdata = hub.Connect() + return + +def mkstop(deviceId: int=0): + hub = _getHubId(deviceId) + rawdata = hub.Stop() + return + +def mkcontrol(deviceId: int=0, channel: int=0, powerAndDirection: float=0): + hub = _getHubId(deviceId) + rawdata = hub.SetChannel(channel, powerAndDirection) + return + +def test_hub(hubId: int=0): + tracer.TraceInfo("HUB "+ str(hubId) +" connecting") + mkconnect() + time.sleep(1) + + for index in range(6): + _automate(hubId, index) # start channel 0 = A + tracer.TraceInfo("Channel change requested") + time.sleep(1) + +def help(): + tracer.TraceInfo("Available commands:") + tracer.TraceInfo(" help() : print available commands") + tracer.TraceInfo(" hints() : print hints and examples") + tracer.TraceInfo(" mkconnect() : Initiate hub control by sending bluetooth connect telegram") + tracer.TraceInfo(" mkstop(hubId) : Stop ALL motors") + tracer.TraceInfo(" mkcontrol(hubId, channel, powerAndDirection) : Control a specific hub, channel, power and motor direction") + tracer.TraceInfo(" test_hub(hubId) : run automated tests on each channels") + tracer.TraceInfo(" mkbtstop() : stop bluetooth advertising") + +def hints(): + tracer.TraceInfo("HINTS:") + tracer.TraceInfo("If run on windows, commands are shown but not executed (hcitool dependency)") + tracer.TraceInfo() + tracer.TraceInfo("For connecting:") + tracer.TraceInfo(" Switch MK6.0 Hubs on - led is flashing green/blue") + tracer.TraceInfo(" mkconnect() to send the bluetooth connect telegram. All hubs switch to bluetooth mode") + tracer.TraceInfo(" by short-pressing the button on MK6.0 Hubs you can choose the hubId") + tracer.TraceInfo(" hubId=0 - one Led flash") + tracer.TraceInfo(" hubId=1 - two Led flashs") + tracer.TraceInfo(" hubId=2 - three Led flashs") + tracer.TraceInfo() + tracer.TraceInfo("ex: test_hub(0), mkcontrol(0, 0, 0.5); mkcontrol(0, 1, -1, True)") + tracer.TraceInfo(" the minus sign - indicate reverse motor direction") + +################################################################## +# Entry point when script is started by python -i consoletest.py +help() +tracer.TraceInfo() +hints() + +tracer.TraceInfo() +tracer.TraceInfo("Ready to execute commands\n") diff --git a/src/mkconnect/examples/main.py b/src/mkconnect/examples/main.py new file mode 100644 index 0000000..2fb8108 --- /dev/null +++ b/src/mkconnect/examples/main.py @@ -0,0 +1,157 @@ +#!/usr/bin/python + +import sys +import time + +print('Script: main.py') +print('Platform: ' + sys.platform) + +from mkconnect.tracer.Tracer import Tracer +from mkconnect.tracer.TracerConsole import TracerConsole + +# uncomment to choose advertiser +if (sys.platform == 'linux'): + #from Advertiser.AdvertiserHCITool import AdvertiserHCITool as Advertiser + #from Advertiser.AdvertiserBTMgmt import AdvertiserBTMgmt as Advertiser + from mkconnect.advertiser.AdvertiserBTSocket import AdvertiserBTSocket as Advertiser + pass +elif (sys.platform == 'rp2'): + from mkconnect.advertiser.AdvertiserMicroPython import AdvertiserMicroPython as Advertiser + pass +elif (sys.platform == 'win32'): + from mkconnect.advertiser.AdvertiserDummy import AdvertiserDummy as Advertiser +else: + raise Exception('unsupported platform') + +from mkconnect.mouldking.MouldKing import MouldKing + +# instantiate Tracer +tracer = TracerConsole() + +# instantiate Advertiser +advertiser = Advertiser() +advertiser.SetTracer(tracer) + +# Set Tracer for all MouldKing Hubs +MouldKing.SetTracer(tracer) +MouldKing.SetAdvertiser(advertiser) + +# save pre-instantiated objects in local variables +hub0 = MouldKing.Module6_0.Device0 +hub1 = MouldKing.Module6_0.Device1 +#hub2 = MouldKing.Module6_0.Device2 + +#hub0 = MouldKing.Module4_0.Device0 +#hub1 = MouldKing.Module4_0.Device1 +hub2 = MouldKing.Module4_0.Device2 + +############################################################################ +# get uncrypted connect-telegram as bytearray +title = "connect-telegram" +tracer.TraceInfo("\n" + title) + +hub0.Connect() +hub1.Connect() +hub2.Connect() +time.sleep(5) + +#tracer.TraceInfo("rawdata: " + ' '.join(f'{x:02x}' for x in rawdata)) + +#crypted = MouldKingCrypt.Crypt(rawdata) # get crypted data from rawdata +#tracer.TraceInfo("crypted: " + ' '.join(f'{x:02x}' for x in crypted)) + +############################################################################ +# get uncrypted stop-telegram as bytearray +title = "stop-telegram" +tracer.TraceInfo("\n" + title) + +rawdata = hub0.Stop() +rawdata = hub1.Stop() +time.sleep(1) + +#tracer.TraceInfo("rawdata: " + ' '.join(f'{x:02x}' for x in rawdata)) + +#crypted = MouldKingCrypt.Crypt(rawdata) # get crypted data from rawdata +#tracer.TraceInfo("crypted: " + ' '.join(f'{x:02x}' for x in crypted)) + +############################################################################ +# get uncrypted telegram with channel 1 (indexer 0) fullspeed forwards +title = "C1: fullspeed forwards" +tracer.TraceInfo("\n" + title) + +rawdata = hub0.SetChannel(0, 1) +time.sleep(1) + +#tracer.TraceInfo("rawdata: " + ' '.join(f'{x:02x}' for x in rawdata)) + +#crypted = MouldKingCrypt.Crypt(rawdata) # get crypted data from rawdata +#tracer.TraceInfo("crypted: " + ' '.join(f'{x:02x}' for x in crypted)) + +############################################################################ +# get uncrypted telegram with channel 1 (indexer 0) halfspeed forwards +title = "C1: halfspeed forwards" +tracer.TraceInfo("\n" + title) + +rawdata = hub0.SetChannel(0, 0.5) +time.sleep(1) + +#tracer.TraceInfo("rawdata: " + ' '.join(f'{x:02x}' for x in rawdata)) + +#crypted = MouldKingCrypt.Crypt(rawdata) # get crypted data from rawdata +#tracer.TraceInfo("crypted: " + ' '.join(f'{x:02x}' for x in crypted)) + +############################################################################ +# get uncrypted telegram with channel 1 (indexer 0) fullspeed forwards +title = "C1: fullspeed forwards" +tracer.TraceInfo("\n" + title) + +rawdata = hub1.SetChannel(0, 1) +time.sleep(2) + +#tracer.TraceInfo("rawdata: " + ' '.join(f'{x:02x}' for x in rawdata)) + +#crypted = MouldKingCrypt.Crypt(rawdata) # get crypted data from rawdata +#tracer.TraceInfo("crypted: " + ' '.join(f'{x:02x}' for x in crypted)) + +############################################################################ +# get uncrypted telegram with channel 1 (indexer 0) halfspeed backwards +title = "C1: halfspeed backwards" +tracer.TraceInfo("\n" + title) + +rawdata = hub0.SetChannel(0, -0.5) +time.sleep(1) + +#tracer.TraceInfo("rawdata: " + ' '.join(f'{x:02x}' for x in rawdata)) + +#crypted = MouldKingCrypt.Crypt(rawdata) # get crypted data from rawdata +#tracer.TraceInfo("crypted: " + ' '.join(f'{x:02x}' for x in crypted)) + +############################################################################ +# get uncrypted telegram with channel 1 (indexer 0) halfspeed backwards +title = "C2: halfspeed backwards" +tracer.TraceInfo("\n" + title) + +rawdata = hub0.SetChannel(1, -0.5) +time.sleep(1) + +#tracer.TraceInfo("rawdata: " + ' '.join(f'{x:02x}' for x in rawdata)) + +#crypted = MouldKingCrypt.Crypt(rawdata) # get crypted data from rawdata +#tracer.TraceInfo("crypted: " + ' '.join(f'{x:02x}' for x in crypted)) + +############################################################################ +# disconnect from advertiser +title = "Disconnect from advertiser" +tracer.TraceInfo("\n" + title) + +hub0.Disconnect() +hub1.Disconnect() +hub2.Disconnect() +time.sleep(1) + +############################################################################ +# stop Advertisement +title = "Advertisement stop" +tracer.TraceInfo("\n" + title) +advertiser.AdvertisementStop() +time.sleep(1) diff --git a/src/mkconnect/mouldking/MouldKing.py b/src/mkconnect/mouldking/MouldKing.py new file mode 100644 index 0000000..3ae8a16 --- /dev/null +++ b/src/mkconnect/mouldking/MouldKing.py @@ -0,0 +1,119 @@ +__author__ = "J0EK3R" +__version__ = "0.1" + +from ..tracer.Tracer import Tracer +from ..advertiser.Advertiser import Advertiser + +from .MouldKing_Hub_4 import MouldKing_Hub_4 +from .MouldKing_Hub_6 import MouldKing_Hub_6 + +class MouldKing : + """ + abstract class to store the static objects + """ + + class Module4_0 : + """ + abstract class to store the static objects + """ + + Device0 = MouldKing_Hub_4(0) + """ + MouldKing Hub 4.0 with address 1 + """ + + Device1 = MouldKing_Hub_4(1) + """ + MouldKing Hub 4.0 with address 2 + """ + + Device2 = MouldKing_Hub_4(2) + """ + MouldKing Hub 4.0 with address 3 + """ + + @staticmethod + def SetAdvertiser(advertiser: Advertiser) -> Advertiser: + """ + Set Advertiser for all MouldKing 4.0 Hubs + """ + + # MouldKing_4_Hubs is the same instance for all MouldKing_4_Hub-Instances + MouldKing.Module4_0.Device0._MouldKing_4_Hubs.SetAdvertiser(advertiser) + # MouldKing.Module4_0.Device1.MouldKing_4_Hubs.SetAdvertiser(advertiser) + # MouldKing.Module4_0.Device2.MouldKing_4_Hubs.SetAdvertiser(advertiser) + return advertiser + + @staticmethod + def SetTracer(tracer: Tracer) -> Tracer: + """ + Set Tracer for all MouldKing 4.0 Hubs + """ + + MouldKing.Module4_0.Device0.SetTracer(tracer) + MouldKing.Module4_0.Device1.SetTracer(tracer) + MouldKing.Module4_0.Device2.SetTracer(tracer) + return tracer + + class Module6_0 : + """ + abstract class to store the static objects + """ + + Device0 = MouldKing_Hub_6(0) + """ + MouldKing Hub 6.0 with address 1 + """ + + Device1 = MouldKing_Hub_6(1) + """ + MouldKing Hub 6.0 with address 2 + """ + + Device2 = MouldKing_Hub_6(2) + """ + MouldKing Hub 6.0 with address 3 + """ + + @staticmethod + def SetAdvertiser(advertiser: Advertiser) -> Advertiser: + """ + Set Advertiser for all MouldKing 6.0 Hubs + """ + + MouldKing.Module6_0.Device0.SetAdvertiser(advertiser) + MouldKing.Module6_0.Device1.SetAdvertiser(advertiser) + MouldKing.Module6_0.Device2.SetAdvertiser(advertiser) + return advertiser + + @staticmethod + def SetTracer(tracer: Tracer) -> Tracer: + """ + Set Tracer for all MouldKing 6.0 Hubs + """ + + MouldKing.Module6_0.Device0.SetTracer(tracer) + MouldKing.Module6_0.Device1.SetTracer(tracer) + MouldKing.Module6_0.Device2.SetTracer(tracer) + return tracer + + @staticmethod + def SetAdvertiser(advertiser: Advertiser) -> Advertiser: + """ + Set Advertiser for all MouldKing 4.0 Hubs + """ + + # MouldKing_4_Hubs is the same instance for all MouldKing_4_Hub-Instances + MouldKing.Module4_0.SetAdvertiser(advertiser) + MouldKing.Module6_0.SetAdvertiser(advertiser) + return advertiser + + @staticmethod + def SetTracer(tracer: Tracer) -> Tracer: + """ + Set Tracer for all MouldKing 4.0 Hubs + """ + + MouldKing.Module4_0.SetTracer(tracer) + MouldKing.Module6_0.SetTracer(tracer) + return tracer diff --git a/src/mkconnect/mouldking/MouldKingCrypt.py b/src/mkconnect/mouldking/MouldKingCrypt.py new file mode 100644 index 0000000..a258914 --- /dev/null +++ b/src/mkconnect/mouldking/MouldKingCrypt.py @@ -0,0 +1,159 @@ +__author__ = "J0EK3R" +__version__ = "0.1" + +class MouldKingCrypt : + """ + class with static methods to do MouldKing encryption + """ + + # static class variables + __Array_C1C2C3C4C5 = bytes([0xC1, 0xC2, 0xC3, 0xC4, 0xC5]) + + @staticmethod + def CreateTelegramForHCITool(manufacturerId: bytes, rawDataArray: bytes) -> bytes: + """ + Create input data for hcitool + """ + cryptedArray = MouldKingCrypt.Crypt(rawDataArray) + cryptedArrayLen = len(cryptedArray) + + resultArray = bytearray(8 + cryptedArrayLen) + resultArray[0] = cryptedArrayLen + 7 # len + resultArray[1] = 0x02 # flags + resultArray[2] = 0x01 + resultArray[3] = 0x02 + resultArray[4] = cryptedArrayLen + 3 # len + resultArray[5] = 0xFF # type manufacturer specific + resultArray[6] = manufacturerId[1] # companyId + resultArray[7] = manufacturerId[0] # companyId + for index in range(cryptedArrayLen): + resultArray[index + 8] = cryptedArray[index] + + return ' '.join(f'{x:02x}' for x in resultArray) + + @staticmethod + def Crypt(rawDataArray: bytes) -> bytes: + """ + do the MouldKing encryption for the given byte-array and return the resulting byte-array + """ + + targetArrayLength = len(MouldKingCrypt.__Array_C1C2C3C4C5) + len(rawDataArray) + 20 + + targetArray = bytearray(targetArrayLength) + targetArray[15] = 113 # 0x71 + targetArray[16] = 15 # 0x0f + targetArray[17] = 85 # 0x55 + + # copy firstDataArray reverse into targetArray with offset 18 + for index in range(len(MouldKingCrypt.__Array_C1C2C3C4C5)): + targetArray[index + 18] = MouldKingCrypt.__Array_C1C2C3C4C5[(len(MouldKingCrypt.__Array_C1C2C3C4C5) - index) - 1] + + # copy rawDataArray into targetArray with offset 18 + len(MouldKingCrypt.__Array_C1C2C3C4C5) + for index in range(len(rawDataArray)): + targetArray[18 + len(MouldKingCrypt.__Array_C1C2C3C4C5) + index] = rawDataArray[index] + + # crypt bytes from position 15 to 22 + for index in range(15, len(MouldKingCrypt.__Array_C1C2C3C4C5) + 18): + targetArray[index] = MouldKingCrypt.__revert_bits_byte(targetArray[index]) + + # calc checksum und copy to array + checksum = MouldKingCrypt.__calc_checksum_from_arrays(MouldKingCrypt.__Array_C1C2C3C4C5, rawDataArray) + targetArray[len(MouldKingCrypt.__Array_C1C2C3C4C5) + 18 + len(rawDataArray) + 0] = (checksum & 255) + targetArray[len(MouldKingCrypt.__Array_C1C2C3C4C5) + 18 + len(rawDataArray) + 1] = ((checksum >> 8) & 255) + + # crypt bytes from offset 18 to the end with magicNumberArray_63 + magicNumberArray_63 = MouldKingCrypt.__create_magic_array(63, 7) + tempArray = bytearray(targetArrayLength - 18) + for index in range(len(tempArray)): + tempArray[index] = targetArray[index + 18] + + MouldKingCrypt.__crypt_array(tempArray, magicNumberArray_63) + targetArray[18:] = tempArray + + # crypt complete array with magicNumberArray_37 + magicNumberArray_37 = MouldKingCrypt.__create_magic_array(37, 7) + MouldKingCrypt.__crypt_array(targetArray, magicNumberArray_37) + + # resulting advertisement array has a length of constant 24 bytes + telegramArray = bytearray(24) + + lengthResultArray = len(MouldKingCrypt.__Array_C1C2C3C4C5) + len(rawDataArray) + 5 + telegramArray[:lengthResultArray] = targetArray[15:15 + lengthResultArray] + + # fill rest of array + for index in range(lengthResultArray, len(telegramArray)): + telegramArray[index] = index + 1 + + return telegramArray + + @staticmethod + def __create_magic_array(magic_number: int, size: int) -> bytes: + magic_array = [0] * size + magic_array[0] = 1 + + for index in range(1, 7): + magic_array[index] = (magic_number >> (6 - index)) & 1 + + return magic_array + + @staticmethod + def __revert_bits_byte(value: int) -> int: + result = 0 + for index_bit in range(8): + if ((1 << index_bit) & value) != 0: + result = result | (1 << (7 - index_bit)) + return result + + @staticmethod + def __revert_bits_int(value: int) -> int: + result = 0 + for index_bit in range(16): + if ((1 << index_bit) & value) != 0: + result |= 1 << (15 - index_bit) + return 65535 & result + + @staticmethod + def __crypt_array(byte_array: bytes, magic_number_array: bytes) -> bytes: + # foreach byte of array + for index_byte in range(len(byte_array)): + current_byte = byte_array[index_byte] + current_result = 0 + # foreach bit in byte + for index_bit in range(8): + current_result += (((current_byte >> index_bit) & 1) ^ MouldKingCrypt.__shift_magic_array(magic_number_array)) << index_bit + byte_array[index_byte] = current_result & 255 + return byte_array + + @staticmethod + def __calc_checksum_from_arrays(first_array: bytes, second_array: bytes) -> int: + result = 65535 + for first_array_index in range(len(first_array)): + result = (result ^ (first_array[(len(first_array) - 1) - first_array_index] << 8)) & 65535 + for index_bit in range(8): + current_result = result & 32768 + result <<= 1 + if current_result != 0: + result ^= 4129 + + for current_byte in second_array: + result = ((MouldKingCrypt.__revert_bits_byte(current_byte) << 8) ^ result) & 65535 + for index_bit in range(8): + current_result = result & 32768 + result <<= 1 + if current_result != 0: + result ^= 4129 + + return MouldKingCrypt.__revert_bits_int(result) ^ 65535 + + @staticmethod + def __shift_magic_array(i_arr: bytes) -> bytes: + r1 = i_arr[3] ^ i_arr[6] + i_arr[3] = i_arr[2] + i_arr[2] = i_arr[1] + i_arr[1] = i_arr[0] + i_arr[0] = i_arr[6] + i_arr[6] = i_arr[5] + i_arr[5] = i_arr[4] + i_arr[4] = r1 + return i_arr[0] + diff --git a/src/mkconnect/mouldking/MouldKingHub.py b/src/mkconnect/mouldking/MouldKingHub.py new file mode 100644 index 0000000..66e751f --- /dev/null +++ b/src/mkconnect/mouldking/MouldKingHub.py @@ -0,0 +1,117 @@ +__author__ = "J0EK3R" +__version__ = "0.1" + +from ..tracer.Tracer import Tracer +from ..advertiser.AdvertisingDevice import AdvertisingDevice + +from .MouldKingCrypt import MouldKingCrypt + +class MouldKingHub(AdvertisingDevice) : + """ + baseclass + """ + + ManufacturerID = bytes([0xFF, 0xF0]) + + + def __init__(self, identifier: str, numberOfChannels: int, channelStartOffset: int, channelEndOffset: int, telegram_connect: bytes, basetelegram: bytes): + """ + initializes the object and defines the fields + """ + + super().__init__(identifier) + + if telegram_connect is not None: + maxArrayOffset = len(telegram_connect) - 1 + + if channelStartOffset > maxArrayOffset: + raise Exception("max channelStartOffset:" + maxArrayOffset) + + if channelEndOffset > (maxArrayOffset - 1): + raise Exception("max channelEndOffset:" + maxArrayOffset) + + self._NumberOfChannels = numberOfChannels + self._ChannelStartOffset = channelStartOffset + self._ChannelEndOffset = channelEndOffset + + self._Telegram_connect = telegram_connect + self._Basetelegram = basetelegram + + # create array + self._ChannelValueList = [float(0)] * self._NumberOfChannels + + return + + + def Connect(self) -> None: + """ + returns the telegram to switch the MouldKing Hubs in bluetooth mode + """ + + # call baseClass to register at Advertiser + super().Connect() + + self._Advertise(self._Telegram_connect) + + return + + + def Disconnect(self) -> None: + """ + disconnects the device from the advertiser + """ + + self.Stop() + + super().Disconnect() + + return + + + def Stop(self) -> bytes: + """ + set internal stored value of all channels to zero and return the telegram + """ + + # init channels + for channelId in range(0, self._NumberOfChannels): + self._ChannelValueList[channelId] = float(0) + + return self.CreateTelegram() + + + def SetChannel(self, channelId: int, value: float) -> bytes: + """ + set internal stored value of channel with channelId to value and return the telegram + """ + + if channelId > self._NumberOfChannels - 1: + raise Exception("only channelId 0.." + int(self._NumberOfChannels - 1) + "are allowed") + + self._ChannelValueList[channelId] = value + + return self.CreateTelegram() + + + def CreateTelegram(self) -> bytes: + """ + returns a telegram including the internal stored value from all channels + """ + + raise NotImplementedError # override this methode + + + def _Advertise(self, rawdata: bytes) -> bytes: + """ + sends the data to the advertiser + """ + + if(self._tracer is not None): + self._tracer.TraceInfo("Advertise") + + if(self._advertiser is not None): + cryptedData = MouldKingCrypt.Crypt(rawdata) + self._advertiser.AdvertisementDataSet(self._identifier, self.ManufacturerID, cryptedData) + + return self._Telegram_connect + diff --git a/src/mkconnect/mouldking/MouldKingHub_Byte.py b/src/mkconnect/mouldking/MouldKingHub_Byte.py new file mode 100644 index 0000000..b27de4d --- /dev/null +++ b/src/mkconnect/mouldking/MouldKingHub_Byte.py @@ -0,0 +1,50 @@ +__author__ = "J0EK3R" +__version__ = "0.1" + +from .MouldKingHub import MouldKingHub + +class MouldKingHub_Byte(MouldKingHub) : + """ + baseclass handling with byte channels + """ + + + def __init__(self, identifier: str, numberOfChannels, channelStartOffset, channelEndOffset, telegram_connect, basetelegram): + """ + initializes the object and defines the fields + """ + + # call baseclass init and set number of channels + super().__init__(identifier, numberOfChannels, channelStartOffset, channelEndOffset, telegram_connect, basetelegram) + + + def CreateTelegram(self) -> bytes: + """ + returns the telegram including the internal stored value from all channels + """ + + # make a copy of the basetelegram + currentTelegramData = bytearray(self._Basetelegram) + + # calc the length to be used for channels + channelDataLength = len(currentTelegramData) - self._ChannelEndOffset + + # iterate channels + for channelId in range(0, self._NumberOfChannels): + currentChannelStartOffset = self._ChannelStartOffset + channelId + + if self._NumberOfChannels >= (channelId + 1) and channelDataLength >= currentChannelStartOffset: + channelValue = self._ChannelValueList[channelId] + + if channelValue < 0: + # Range [-1..0] -> 0x80 - [0x7F .. 0x00] = [0x01 .. 0x80] + currentTelegramData[currentChannelStartOffset] = int(0x80 - min(-channelValue * 0x80, 0x80)) + elif channelValue > 0: + # Range [0..1] -> 0x80 + [0x00 .. 0x7F] = [0x80 .. 0xFF] + currentTelegramData[currentChannelStartOffset] = int(0x80 + min(channelValue * 0x7F, 0x7F)) + else: + currentTelegramData[currentChannelStartOffset] = 0x80 + + self._Advertise(currentTelegramData) + + return currentTelegramData \ No newline at end of file diff --git a/src/mkconnect/mouldking/MouldKingHub_Nibble.py b/src/mkconnect/mouldking/MouldKingHub_Nibble.py new file mode 100644 index 0000000..6932c06 --- /dev/null +++ b/src/mkconnect/mouldking/MouldKingHub_Nibble.py @@ -0,0 +1,71 @@ +__author__ = "J0EK3R" +__version__ = "0.1" + +from .MouldKingHub import MouldKingHub + +class MouldKingHub_Nibble(MouldKingHub) : + """ + baseclass handling with nibble channels + """ + + + def __init__(self, identifier: str, numberOfChannels, channelStartOffset, channelEndOffset, telegram_connect, basetelegram): + """ + initializes the object and defines the fields + """ + + # call baseclass init and set number of channels + super().__init__(identifier, numberOfChannels, channelStartOffset, channelEndOffset, telegram_connect, basetelegram) + + + def CreateTelegram(self) -> bytes: + """ + returns the telegram including the internal stored value from all channels + """ + + # make a copy of the basetelegram + currentTelegramData = bytearray(self._Basetelegram) + + # calc the length to be used for channels + channelDataLength = len(currentTelegramData) - self._ChannelEndOffset + + # iterate channels + byteOffset = 0 + for channelId in range(0, self._NumberOfChannels, 2): + currentChannelStartOffset = self._ChannelStartOffset + byteOffset + + highNibble = 0 + lowNibble = 0 + + if self._NumberOfChannels >= (channelId + 1) and channelDataLength >= currentChannelStartOffset: + evenChannelValue = self._ChannelValueList[channelId] + oddChannelValue2 = self._ChannelValueList[channelId + 1] + + # even Channel -> highNibble + if evenChannelValue < 0: + # Range [-1..0] -> [0x07 .. 0x00] = [0x07 .. 0x00] + highNibble = int(min(-evenChannelValue * 0x07, 0x07)) + elif evenChannelValue > 0: + # Range [0..1] -> 0x80 + [0x00 .. 0x07] = [0x80 .. 0x0F] + highNibble = int(0x08 + min(evenChannelValue * 0x07, 0x07)) + else: + highNibble = 0x08 + + # odd Channel -> lowNibble + if oddChannelValue2 < 0: + # Range [-1..0] -> [0x07 .. 0x00] = [0x07 .. 0x00] + lowNibble = int(min(-oddChannelValue2 * 0x07, 0x07)) + elif oddChannelValue2 > 0: + # Range [0..1] -> 0x80 + [0x00 .. 0x07] = [0x80 .. 0x0F] + lowNibble = int(0x08 + min(oddChannelValue2 * 0x07, 0x07)) + else: + lowNibble = 0x08 + + currentTelegramData[currentChannelStartOffset] = (int)((highNibble << 4) + lowNibble) + + # next byte + byteOffset = byteOffset + 1 + + self._Advertise(currentTelegramData) + + return currentTelegramData \ No newline at end of file diff --git a/src/mkconnect/mouldking/MouldKing_Hub_4.py b/src/mkconnect/mouldking/MouldKing_Hub_4.py new file mode 100644 index 0000000..3c9f2c5 --- /dev/null +++ b/src/mkconnect/mouldking/MouldKing_Hub_4.py @@ -0,0 +1,77 @@ +__author__ = "J0EK3R" +__version__ = "0.1" + +from ..tracer.Tracer import Tracer + +from ..advertiser.IAdvertisingDevice import IAdvertisingDevice + +#from .MouldKingDevice import MouldKingDevice +from .MouldKing_Hubs_4_12Ch import MouldKing_Hubs_4_12Ch + +class MouldKing_Hub_4(IAdvertisingDevice) : + """ + class handling a MouldKing 4.0 Hub + """ + + # static fields/constants + _MouldKing_4_Hubs = MouldKing_Hubs_4_12Ch() + + + def __init__(self, deviceId: int): + """ + initializes the object and defines the fields + """ + + if deviceId > 2: + raise Exception('only deviceId 0..2 are allowed') + + self._deviceId = deviceId + self._NumberOfChannels = 4 + self._tracer = None + + + def SetTracer(self, tracer: Tracer) -> Tracer: + """ + set tracer object + """ + self._tracer = tracer + + return tracer + + + def Connect(self) -> None: + """ + returns the telegram to switch the MouldKing Hubs in bluetooth mode + """ + MouldKing_Hub_4._MouldKing_4_Hubs.SubDevice_Register(self) + + return + + + def Disconnect(self) -> None: + """ + disconnects the device from the advertiser + """ + MouldKing_Hub_4._MouldKing_4_Hubs.SubDevice_Unregister(self) + + return + + + def Stop(self) -> bytes: + """ + set internal stored value of all channels to zero and return the telegram + """ + + return MouldKing_Hub_4._MouldKing_4_Hubs.SubDevice_Stop(self._deviceId, self._NumberOfChannels) + + + def SetChannel(self, channelId: int, value: float) -> bytes: + """ + set internal stored value of channel with channelId to value and return the telegram + """ + + if channelId > self._NumberOfChannels - 1: + raise Exception("only channelId 0.." + int(self._NumberOfChannels - 1) + "are allowed") + + return MouldKing_Hub_4._MouldKing_4_Hubs.SubDevice_SetChannel(self._deviceId, self._NumberOfChannels, channelId, value) + diff --git a/src/mkconnect/mouldking/MouldKing_Hub_6.py b/src/mkconnect/mouldking/MouldKing_Hub_6.py new file mode 100644 index 0000000..83f60a8 --- /dev/null +++ b/src/mkconnect/mouldking/MouldKing_Hub_6.py @@ -0,0 +1,38 @@ +__author__ = "J0EK3R" +__version__ = "0.1" + +from .MouldKingHub_Byte import MouldKingHub_Byte + +class MouldKing_Hub_6(MouldKingHub_Byte) : + """ + class handling the MouldKing 6.0 Hub + """ + + # static fields/constants + __telegram_connect = bytes([0x6D, 0x7B, 0xA7, 0x80, 0x80, 0x80, 0x80, 0x92]) # Define a byte array for Telegram Connect + + __telegram_base_device_a = bytes([0x61, 0x7B, 0xA7, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x9E]) # byte array for base Telegram + __telegram_base_device_b = bytes([0x62, 0x7B, 0xA7, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x9D]) # byte array for base Telegram + __telegram_base_device_c = bytes([0x63, 0x7B, 0xA7, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x9C]) # byte array for base Telegram + + + def __init__(self, deviceId: int): + """ + initializes the object and defines the fields + """ + + if deviceId == 0: + basetelegram = MouldKing_Hub_6.__telegram_base_device_a + elif deviceId == 1: + basetelegram = MouldKing_Hub_6.__telegram_base_device_b + elif deviceId == 2: + basetelegram = MouldKing_Hub_6.__telegram_base_device_c + else: + raise Exception('only deviceId 0..2 are allowed') + + # call baseclass init and set number of channels + super().__init__("MK6_" + str(deviceId), 6, 3, 1, MouldKing_Hub_6.__telegram_connect, basetelegram) + + + + diff --git a/src/mkconnect/mouldking/MouldKing_Hubs_4_12Ch.py b/src/mkconnect/mouldking/MouldKing_Hubs_4_12Ch.py new file mode 100644 index 0000000..0749662 --- /dev/null +++ b/src/mkconnect/mouldking/MouldKing_Hubs_4_12Ch.py @@ -0,0 +1,99 @@ +__author__ = "J0EK3R" +__version__ = "0.1" + +from ..tracer.Tracer import Tracer +from ..advertiser.IAdvertisingDevice import IAdvertisingDevice + +from .MouldKingHub_Nibble import MouldKingHub_Nibble + +class MouldKing_Hubs_4_12Ch(MouldKingHub_Nibble) : + """ + class handling 3 x MouldKing 4.0 Hubs + Only one telegram addresses all possible 3 x MK4 hubs the same time + """ + + # static fields/constants + __telegram_connect = bytes([0xAD, 0x7B, 0xA7, 0x80, 0x80, 0x80, 0x4F, 0x52]) # Define a byte array for Telegram Connect + + __telegram_base = bytes([0x7D, 0x7B, 0xA7, 0x88, 0x88, 0x88, 0x88, 0x88, 0x88, 0x82]) # byte array for base Telegram + + + def __init__(self): + """ + initializes the object and defines the fields + """ + # call baseclass init and set number of channels + super().__init__("MK4", 12, 3, 1, MouldKing_Hubs_4_12Ch.__telegram_connect, MouldKing_Hubs_4_12Ch.__telegram_base) + + self._connectedSubDevices = list() + + + def SubDevice_Register(self, subDevice: IAdvertisingDevice) -> None: + """ + returns the telegram to switch the MouldKing Hubs in bluetooth mode + """ + connectedSubDevicesLen = len(self._connectedSubDevices) + + if(not subDevice is None and not subDevice in self._connectedSubDevices): + self._connectedSubDevices.append(subDevice) + + # first subDevice was added + if(connectedSubDevicesLen == 0): + self.Connect() + + return + + + def SubDevice_Unregister(self, subDevice: IAdvertisingDevice) -> None: + """ + disconnects the device from the advertiser + """ + if(not subDevice is None and subDevice in self._connectedSubDevices): + self._connectedSubDevices.remove(subDevice) + + # last subDevice was removed + if(len(self._connectedSubDevices) == 0): + self.Disconnect() + + return + + + def SubDevice_Stop(self, hubDeviceId: int, hubNumberOfChannels: int) -> bytes: + """ + set internal stored value of all channels to zero and return the telegram + """ + # deviceId = 0 + # -> channelId 0..4 + # deviceId = 2 + # -> channelId 5..8 + # deviceId = 3 + # -> channelId 9..12 + channelIdHubs = hubDeviceId * hubNumberOfChannels + + # init channels + for channelId in range(channelIdHubs, hubNumberOfChannels): + if channelId < self._NumberOfChannels: + self._ChannelValueList[channelId] = float(0) + + return self.CreateTelegram() + + + def SubDevice_SetChannel(self, hubDeviceId: int, hubNumberOfChannels: int, hubChannelId: int, value: float) -> bytes: + """ + set internal stored value of channel with channelId to value and return the telegram + """ + + # deviceId = 0 + # -> channelId 0..4 + # deviceId = 2 + # -> channelId 5..8 + # deviceId = 3 + # -> channelId 9..12 + channelIdHubs = hubDeviceId * hubNumberOfChannels + hubChannelId + + if channelIdHubs > self._NumberOfChannels - 1: + raise Exception("only channelId 0.." + int(self._NumberOfChannels - 1) + "are allowed") + + self._ChannelValueList[channelIdHubs] = value + + return self.CreateTelegram() diff --git a/src/mkconnect/mouldking/__init__.py b/src/mkconnect/mouldking/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/mkconnect/mouldking/__pycache__/MouldKing.cpython-313.pyc b/src/mkconnect/mouldking/__pycache__/MouldKing.cpython-313.pyc new file mode 100644 index 0000000..114ac2e Binary files /dev/null and b/src/mkconnect/mouldking/__pycache__/MouldKing.cpython-313.pyc differ diff --git a/src/mkconnect/mouldking/__pycache__/MouldKingCrypt.cpython-313.pyc b/src/mkconnect/mouldking/__pycache__/MouldKingCrypt.cpython-313.pyc new file mode 100644 index 0000000..13985a2 Binary files /dev/null and b/src/mkconnect/mouldking/__pycache__/MouldKingCrypt.cpython-313.pyc differ diff --git a/src/mkconnect/mouldking/__pycache__/MouldKingHub.cpython-313.pyc b/src/mkconnect/mouldking/__pycache__/MouldKingHub.cpython-313.pyc new file mode 100644 index 0000000..cecb814 Binary files /dev/null and b/src/mkconnect/mouldking/__pycache__/MouldKingHub.cpython-313.pyc differ diff --git a/src/mkconnect/mouldking/__pycache__/MouldKingHub_Byte.cpython-313.pyc b/src/mkconnect/mouldking/__pycache__/MouldKingHub_Byte.cpython-313.pyc new file mode 100644 index 0000000..a276143 Binary files /dev/null and b/src/mkconnect/mouldking/__pycache__/MouldKingHub_Byte.cpython-313.pyc differ diff --git a/src/mkconnect/mouldking/__pycache__/MouldKingHub_Nibble.cpython-313.pyc b/src/mkconnect/mouldking/__pycache__/MouldKingHub_Nibble.cpython-313.pyc new file mode 100644 index 0000000..d683364 Binary files /dev/null and b/src/mkconnect/mouldking/__pycache__/MouldKingHub_Nibble.cpython-313.pyc differ diff --git a/src/mkconnect/mouldking/__pycache__/MouldKing_Hub_4.cpython-313.pyc b/src/mkconnect/mouldking/__pycache__/MouldKing_Hub_4.cpython-313.pyc new file mode 100644 index 0000000..f3683a6 Binary files /dev/null and b/src/mkconnect/mouldking/__pycache__/MouldKing_Hub_4.cpython-313.pyc differ diff --git a/src/mkconnect/mouldking/__pycache__/MouldKing_Hub_6.cpython-313.pyc b/src/mkconnect/mouldking/__pycache__/MouldKing_Hub_6.cpython-313.pyc new file mode 100644 index 0000000..1783748 Binary files /dev/null and b/src/mkconnect/mouldking/__pycache__/MouldKing_Hub_6.cpython-313.pyc differ diff --git a/src/mkconnect/mouldking/__pycache__/MouldKing_Hubs_4_12Ch.cpython-313.pyc b/src/mkconnect/mouldking/__pycache__/MouldKing_Hubs_4_12Ch.cpython-313.pyc new file mode 100644 index 0000000..d2139eb Binary files /dev/null and b/src/mkconnect/mouldking/__pycache__/MouldKing_Hubs_4_12Ch.cpython-313.pyc differ diff --git a/src/mkconnect/mouldking/__pycache__/__init__.cpython-313.pyc b/src/mkconnect/mouldking/__pycache__/__init__.cpython-313.pyc new file mode 100644 index 0000000..89b22a2 Binary files /dev/null and b/src/mkconnect/mouldking/__pycache__/__init__.cpython-313.pyc differ diff --git a/src/mkconnect/tracer/Tracer.py b/src/mkconnect/tracer/Tracer.py new file mode 100644 index 0000000..bf279d8 --- /dev/null +++ b/src/mkconnect/tracer/Tracer.py @@ -0,0 +1,18 @@ +__author__ = "J0EK3R" +__version__ = "0.1" + +class Tracer : + """ + baseclass + """ + def __init__(self): + """ + initializes the object and defines the fields + """ + pass + + def TraceInfo(self, value: str): + """ + prints out + """ + pass diff --git a/src/mkconnect/tracer/TracerConsole.py b/src/mkconnect/tracer/TracerConsole.py new file mode 100644 index 0000000..7e5bf0f --- /dev/null +++ b/src/mkconnect/tracer/TracerConsole.py @@ -0,0 +1,24 @@ +__author__ = "J0EK3R" +__version__ = "0.1" + +from .Tracer import Tracer + +class TracerConsole(Tracer) : + """ + baseclass + """ + + def __init__(self): + """ + initializes the object and defines the fields + """ + + super().__init__() + + + def TraceInfo(self, value: str=""): + """ + prints out + """ + print(value) + diff --git a/src/mkconnect/tracer/__init__.py b/src/mkconnect/tracer/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/mkconnect/tracer/__pycache__/Tracer.cpython-313.pyc b/src/mkconnect/tracer/__pycache__/Tracer.cpython-313.pyc new file mode 100644 index 0000000..496881b Binary files /dev/null and b/src/mkconnect/tracer/__pycache__/Tracer.cpython-313.pyc differ diff --git a/src/mkconnect/tracer/__pycache__/TracerConsole.cpython-313.pyc b/src/mkconnect/tracer/__pycache__/TracerConsole.cpython-313.pyc new file mode 100644 index 0000000..5e06626 Binary files /dev/null and b/src/mkconnect/tracer/__pycache__/TracerConsole.cpython-313.pyc differ diff --git a/src/mkconnect/tracer/__pycache__/__init__.cpython-313.pyc b/src/mkconnect/tracer/__pycache__/__init__.cpython-313.pyc new file mode 100644 index 0000000..ff7c6b4 Binary files /dev/null and b/src/mkconnect/tracer/__pycache__/__init__.cpython-313.pyc differ