added project files

This commit is contained in:
oberon 2026-02-03 11:48:43 +01:00
parent 74da7ee616
commit 76853cd66f
62 changed files with 4056 additions and 0 deletions

BIN
.DS_Store vendored

Binary file not shown.

21
LICENSE Normal file
View File

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2024 J0EK3R
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

21
pyproject.toml Normal file
View File

@ -0,0 +1,21 @@
[build-system]
requires = ["setuptools>=68", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "mkconnect"
version = "0.1.0"
description = "MouldKing Bluetooth hub connector"
readme = "README.md"
requires-python = ">=3.8"
license = {file = "LICENSE"}
[tool.setuptools]
package-dir = {"" = "src"}
include-package-data = true
[tool.setuptools.packages.find]
where = ["src"]
[project.scripts]
mkconnect = "mkconnect.cli:main"

BIN
src/.DS_Store vendored Normal file

Binary file not shown.

View File

@ -0,0 +1,149 @@
Metadata-Version: 2.4
Name: mkconnect
Version: 0.1.0
Summary: MouldKing Bluetooth hub connector
License: MIT License
Copyright (c) 2024 J0EK3R
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
Requires-Python: >=3.8
Description-Content-Type: text/markdown
License-File: LICENSE
Dynamic: license-file
# mkconnect-python
...a bit of code to connect to MouldKing Bluetooth Hubs in python.
# MouldKing Hubs
## MouldKing 6.0 Hub
The MouldKing 6.0 Hub has two modes:
* RC-Mode to be controlled with a MouldKing remotecontrol
* Bluetooth-Mode to be controlled with an app
You can control a maximum of three MK6.0 Hubs at the same time with bluetooth.
> (Currently this project can send only one advertising telegram the same time - so only one hub can be controlled, all others will go in timeout-mode till next telegram with their device address is sent.)
## Setting the address of the Hub
To switch the Hub's device address to the next one (device 0, device 1, device 2) just press the button on the hub.
> i.E.: If the script runs with **mkcontrol(2,0,1)** (-> device2, channel0, full speed forward) and nothing is happening, you have to short-press the button, perhaps again...)
# usage
Start the script [consoletest.py](https://github.com/J0EK3R/mkconnect-python/blob/main/consoletest.py) on your raspberry:
```
pi@devpi:~/dev/mkconnect-python $ sudo python -i consoletest.py
```
## mkbtstop() - stop bluetooth advertising
```
Ready to execute commands
>>> mkbtstop()
```
## mkconnect() - switch hubs in bluetooth mode
If you power-on the hubs they will listen to telegrams to the **first device by default**.
Call mkconnect() to switch all hubs in Bluetooth mode.
By short-pressing the button on MK6.0 Hubs you can choose the hubId:
* hubId=0 - one Led flash
* hubId=1 - two Led flashs
* hubId=2 - three Led flashs
```
Ready to execute commands
>>> mkconnect()
```
## mkcontrol(deviceId, channel, power and powerAndDirection)
i.E.: mkcontrol(0, 0, 1) - on first device (deviceId=0) run channel A (channel=0) with fullspeed (powerAndDirection=1)
```
Ready to execute commands
>>> mkcontrol(0, 0, 1)
```
## mkstop(deviceId)
Set all channels of device to zero
```
Ready to execute commands
>>> mkstop(0)
```
---
# old stuff
There is a testscript [consoletest.py](https://github.com/J0EK3R/mkconnect-python/blob/main/consoletest.py) where (on raspberry pi) **hcitool** is used to advertise telegrams over bluetooth.
Maybe you habe to **sudo** the command:
```
pi@devpi:~/dev/mkconnect-python $ sudo python -i consoletest.py
Ready to execute commands
For connecting: mkconnect(hubId) ex: mkconnect(0) or mkconnect(1) for the second hub
Available commands: mkconnect(hubId)
mkstop(hubId)
mkcontrol(deviceId, channel, powerAndDirection)
ex: mkcontrol(0, 0, 0.5) ; mkcontrol(0, 'B', -1)
the minus sign - indicate reverse motor direction
```
Just look in [main.py](https://github.com/J0EK3R/mkconnect-python/blob/main/main.py) for current usage...
Current output in [https://wokwi.com/projects/new/micropython-pi-pico](https://wokwi.com/projects/398314618803830785)
...looks very good! :)
```
connect-telegram
rawdata: 6d 7b a7 80 80 80 80 92
crypted: 6d b6 43 cf 7e 8f 47 11 88 66 59 38 d1 7a aa 26 49 5e 13 14 15 16 17 18
stop-telegram
rawdata: 61 7b a7 80 80 80 80 80 80 9e
crypted: 6d b6 43 cf 7e 8f 47 11 84 66 59 38 d1 7a aa 34 67 4a 55 bf 15 16 17 18
C1: fullspeed forwards
rawdata: 61 7b a7 ff 80 80 80 80 80 9e
crypted: 6d b6 43 cf 7e 8f 47 11 84 66 59 47 d1 7a aa 34 67 4a ed b7 15 16 17 18
C1: halfspeed forwards
rawdata: 61 7b a7 bf 80 80 80 80 80 9e
crypted: 6d b6 43 cf 7e 8f 47 11 84 66 59 07 d1 7a aa 34 67 4a eb 70 15 16 17 18
C1: halfspeed backwards
rawdata: 61 7b a7 40 80 80 80 80 80 9e
crypted: 6d b6 43 cf 7e 8f 47 11 84 66 59 f8 d1 7a aa 34 67 4a 4e fe 15 16 17 18
C2: halfspeed backwards
rawdata: 61 7b a7 40 40 80 80 80 80 9e
crypted: 6d b6 43 cf 7e 8f 47 11 84 66 59 f8 11 7a aa 34 67 4a 3d f9 15 16 17 18
MicroPython v1.22.0 on 2023-12-27; Raspberry Pi Pico with RP2040
Type "help()" for more information.
>>>
raw REPL; CTRL-B to exit
>
```

View File

@ -0,0 +1,41 @@
LICENSE
README.md
pyproject.toml
src/mkconnect/__init__.py
src/mkconnect/__main__.py
src/mkconnect/cli.py
src/mkconnect.egg-info/PKG-INFO
src/mkconnect.egg-info/SOURCES.txt
src/mkconnect.egg-info/dependency_links.txt
src/mkconnect.egg-info/entry_points.txt
src/mkconnect.egg-info/top_level.txt
src/mkconnect/advertiser/Advertiser.py
src/mkconnect/advertiser/AdvertiserBTMgmt.py
src/mkconnect/advertiser/AdvertiserBTSocket.py
src/mkconnect/advertiser/AdvertiserDummy.py
src/mkconnect/advertiser/AdvertiserHCITool.py
src/mkconnect/advertiser/AdvertiserMicroPython.py
src/mkconnect/advertiser/AdvertisingDevice.py
src/mkconnect/advertiser/IAdvertiser.py
src/mkconnect/advertiser/IAdvertisingDevice.py
src/mkconnect/advertiser/__init__.py
src/mkconnect/btsocket/__init__.py
src/mkconnect/btsocket/btmgmt_callback.py
src/mkconnect/btsocket/btmgmt_protocol.py
src/mkconnect/btsocket/btmgmt_socket.py
src/mkconnect/btsocket/btmgmt_sync.py
src/mkconnect/btsocket/tools.py
src/mkconnect/examples/consoletest.py
src/mkconnect/examples/main.py
src/mkconnect/mouldking/MouldKing.py
src/mkconnect/mouldking/MouldKingCrypt.py
src/mkconnect/mouldking/MouldKingHub.py
src/mkconnect/mouldking/MouldKingHub_Byte.py
src/mkconnect/mouldking/MouldKingHub_Nibble.py
src/mkconnect/mouldking/MouldKing_Hub_4.py
src/mkconnect/mouldking/MouldKing_Hub_6.py
src/mkconnect/mouldking/MouldKing_Hubs_4_12Ch.py
src/mkconnect/mouldking/__init__.py
src/mkconnect/tracer/Tracer.py
src/mkconnect/tracer/TracerConsole.py
src/mkconnect/tracer/__init__.py

View File

@ -0,0 +1 @@

View File

@ -0,0 +1,2 @@
[console_scripts]
mkconnect = mkconnect.cli:main

View File

@ -0,0 +1 @@
mkconnect

View File

View File

@ -0,0 +1,3 @@
from .cli import main
if __name__ == "__main__":
raise SystemExit(main())

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1,108 @@
__author__ = "J0EK3R"
__version__ = "0.1"
import sys
if (sys.platform == 'rp2'):
import _thread as thread
else:
import threading as thread
from .IAdvertiser import IAdvertiser
from .IAdvertisingDevice import IAdvertisingDevice
from ..tracer.Tracer import Tracer
class Advertiser(IAdvertiser) :
"""
This is the BaseClass for all Advertiser classes.
It implements the interface IAdvertiser.
"""
def __init__(self):
"""
initializes the object and defines the member fields
"""
self._tracer = None
# dictionary to administer the registered AdvertisingDevices.
# * key is the instance of the AdvertisingDevice
# * value is the AdvertisementIdentifier of the AdvertisingDevice
self._registeredDeviceTable = dict()
if (sys.platform == 'rp2'):
self._registeredDeviceTable_Lock = thread.allocate_lock()
else:
self._registeredDeviceTable_Lock = thread.Lock()
return
def SetTracer(self, tracer: Tracer) -> Tracer:
"""
set tracer object
"""
self._tracer = tracer
return tracer
def AdvertisementStop(self) -> None:
"""
stop bluetooth advertising for the Advertiser
"""
return
def TryRegisterAdvertisingDevice(self, advertisingDevice: IAdvertisingDevice) -> bool:
"""
try to register the given AdvertisingDevice
* returns True if the AdvertisingDevice was registered successfully
* returns False if the AdvertisingDevice wasn't registered successfully (because it still was registered)
"""
if(advertisingDevice is None):
return False
try:
# acquire lock for table
#self._registeredDeviceTable_Lock.acquire(blocking=True)
self._registeredDeviceTable_Lock.acquire()
if(advertisingDevice in self._registeredDeviceTable):
return False
else:
self._registeredDeviceTable[advertisingDevice] = advertisingDevice.GetAdvertisementIdentifier()
return True
finally:
# release lock for table
self._registeredDeviceTable_Lock.release()
def TryUnregisterAdvertisingDevice(self, advertisingDevice: IAdvertisingDevice) -> bool:
"""
try to unregister the given AdvertisingDevice
* returns True if the AdvertisingDevice was unregistered successfully
* returns False if the AdvertisingDevice wasn't unregistered successfully
"""
if(advertisingDevice is None):
return False
try:
# acquire lock for table
#self._registeredDeviceTable_Lock.acquire(blocking=True)
self._registeredDeviceTable_Lock.acquire()
if(advertisingDevice in self._registeredDeviceTable):
self._registeredDeviceTable.pop(advertisingDevice)
return True
else:
return False
finally:
# release lock for table
self._registeredDeviceTable_Lock.release()
def AdvertisementDataSet(self, advertisementIdentifier: str, manufacturerId: bytes, rawdata: bytes) -> None:
"""
Sets Advertisement-Data for a specific AdvertisementIdentifier
This Methode has to be overridden by the implementation of the AdvertisingDevice!
"""
return

View File

@ -0,0 +1,277 @@
__author__ = "J0EK3R"
__version__ = "0.1"
from ..tracer.Tracer import Tracer
from .IAdvertisingDevice import IAdvertisingDevice
from .Advertiser import Advertiser
import subprocess
import threading
import time
class AdvertiserBTMgmt(Advertiser) :
"""
baseclass
"""
# protected static field
_BTMgmt_path = '/usr/bin/btmgmt'
# Number of repetitions per second
_RepetitionsPerSecond = 4
def __init__(self):
"""
initializes the object and defines the member fields
"""
super().__init__() # call baseclass
self._advertisement_thread_Run = False
self._advertisement_thread = None
self._advertisement_thread_Lock = threading.Lock()
# Table
# * key: AdvertisementIdentifier
# * value: advertisement-command for the call of btmgmt tool
self._advertisementTable_thread_Lock = threading.Lock()
self._advertisementTable = dict()
self._lastSetAdvertisementCommand = None
return
def TryRegisterAdvertisingDevice(self, advertisingDevice: IAdvertisingDevice) -> bool:
"""
try to register the given AdvertisingDevice
* returns True if the AdvertisingDevice was registered successfully
* returns False if the AdvertisingDevice wasn't registered successfully (because it still was registered)
"""
result = super().TryRegisterAdvertisingDevice(advertisingDevice)
# AdvertisingDevice was registered successfully in baseclass
if(result):
# register AdvertisindIdentifier -> only registered AdvertisindIdentifier will be sent
advertisementIdentifier = advertisingDevice.GetAdvertisementIdentifier()
self._RegisterAdvertisementIdentifier(advertisementIdentifier)
return result
def TryUnregisterAdvertisingDevice(self, advertisingDevice: IAdvertisingDevice) -> bool:
"""
try to unregister the given AdvertisingDevice
* returns True if the AdvertisingDevice was unregistered successfully
* returns False if the AdvertisingDevice wasn't unregistered successfully
"""
result = super().TryUnregisterAdvertisingDevice(advertisingDevice)
# AdvertisingDevice was unregistered successfully in baseclass
if(result):
# unregister AdvertisementIdentifier to remove from publishing
advertisementIdentifier = advertisingDevice.GetAdvertisementIdentifier()
self._UnregisterAdvertisementIdentifier(advertisementIdentifier)
return result
def AdvertisementStop(self) -> None:
"""
stop bluetooth advertising
"""
# stop publishing thread
self._advertisement_thread_Run = False
if(self._advertisement_thread is not None):
self._advertisement_thread.join()
self._advertisement_thread = None
#self._advertisementTable.clear()
advertisementCommand = self._BTMgmt_path + ' rm-adv 1' + ' &> /dev/null'
subprocess.run(advertisementCommand, shell=True, executable="/bin/bash")
if (self._tracer is not None):
self._tracer.TraceInfo('AdvertiserBTMgmnt.AdvertisementStop')
self._tracer.TraceInfo(advertisementCommand)
return
def _RegisterAdvertisementIdentifier(self, advertisementIdentifier: str) -> None:
"""
Register AdvertisementIdentifier
"""
try:
self._advertisementTable_thread_Lock.acquire(blocking=True)
if(not advertisementIdentifier in self._advertisementTable):
self._advertisementTable[advertisementIdentifier] = None
finally:
self._advertisementTable_thread_Lock.release()
return
def _UnregisterAdvertisementIdentifier(self, advertisementIdentifier: str) -> None:
"""
Unregister AdvertisementIdentifier
"""
try:
self._registeredDeviceTable_Lock.acquire(blocking=True)
foundAdvertisementIdentifier = False
# there are devices wich share the same AdvertisementIdentifier
# check if AdvertisementIdentifier is still present
for currentAdvertisementIdentifier in self._registeredDeviceTable.values():
if(currentAdvertisementIdentifier == advertisementIdentifier):
foundAdvertisementIdentifier = True
break
if(not foundAdvertisementIdentifier):
self._RemoveAdvertisementIdentifier(advertisementIdentifier)
finally:
self._registeredDeviceTable_Lock.release()
return
def _RemoveAdvertisementIdentifier(self, advertisementIdentifier: str) -> None:
"""
Remove AdvertisementIdentifier
"""
try:
self._advertisementTable_thread_Lock.acquire(blocking=True)
if(advertisementIdentifier in self._advertisementTable):
self._advertisementTable.pop(advertisementIdentifier)
finally:
self._advertisementTable_thread_Lock.release()
if(len(self._advertisementTable) == 0):
self.AdvertisementStop()
return
def AdvertisementDataSet(self, advertisementIdentifier: str, manufacturerId: bytes, rawdata: bytes) -> None:
"""
Set Advertisement data
"""
try:
self._advertisementTable_thread_Lock.acquire(blocking=True)
# only registered AdvertisementIdentifier are handled
if(advertisementIdentifier in self._advertisementTable):
advertisementCommand = self._BTMgmt_path + ' add-adv -d ' + self._CreateTelegramForBTMgmmt(manufacturerId, rawdata) + ' --general-discov 1' + ' &> /dev/null'
self._advertisementTable[advertisementIdentifier] = advertisementCommand
# for quick change handle immediately
timeSlot = self._CalcTimeSlot()
self._Advertise(advertisementCommand, timeSlot)
finally:
self._advertisementTable_thread_Lock.release()
# start publish thread if necessary
if(not self._advertisement_thread_Run):
self._advertisement_thread = threading.Thread(target=self._publish)
self._advertisement_thread.daemon = True
self._advertisement_thread.start()
self._advertisement_thread_Run = True
if (self._tracer is not None):
self._tracer.TraceInfo('AdvertiserBTMgmnt.AdvertisementSet')
return
def _publish(self) -> None:
"""
publishing loop
"""
if (self._tracer is not None):
self._tracer.TraceInfo('AdvertiserBTMgmnt._publish')
# loop while field is True
while(self._advertisement_thread_Run):
try:
try:
self._advertisementTable_thread_Lock.acquire(blocking=True)
# make a copy of the table to release the lock as quick as possible
copy_of_advertisementTable = self._advertisementTable.copy()
# calc time for one publishing slot
timeSlot = self._CalcTimeSlot()
finally:
self._advertisementTable_thread_Lock.release()
if(len(copy_of_advertisementTable) == 0):
pass
else:
for key, advertisementCommand in copy_of_advertisementTable.items():
# stop publishing?
if(not self._advertisement_thread_Run):
return
self._Advertise(advertisementCommand, timeSlot)
except:
pass
def _CalcTimeSlot(self) -> float:
"""
Calculates the timespan in seconds for each timeslot
"""
# timeSlot = 1 second / repetitionsPerSecond / len(self._advertisementTable)
timeSlot = 1 / self._RepetitionsPerSecond / max(1, len(self._advertisementTable))
return timeSlot
def _Advertise(self, advertisementCommand: str, timeSlot: float) -> None:
"""
calls the btmgmt tool as subprocess
"""
try:
self._advertisement_thread_Lock.acquire(blocking=True)
timeStart = time.time()
if (self._lastSetAdvertisementCommand != advertisementCommand):
self._lastSetAdvertisementCommand = advertisementCommand
subprocess.run(advertisementCommand, shell=True, executable="/bin/bash")
timeEnd = time.time()
timeDelta = timeEnd - timeStart
timeSlotRemain = max(0.001, timeSlot - timeDelta)
# stop publishing?
if(self._advertisement_thread_Run):
time.sleep(timeSlotRemain)
finally:
self._advertisement_thread_Lock.release()
return
def _CreateTelegramForBTMgmmt(self, manufacturerId: bytes, rawDataArray: bytes) -> str:
"""
Create input data for btmgmt
"""
rawDataArrayLen = len(rawDataArray)
resultArray = bytearray(4 + rawDataArrayLen)
resultArray[0] = rawDataArrayLen + 3 # len
resultArray[1] = 0xFF # type manufacturer specific
resultArray[2] = manufacturerId[1] # companyId
resultArray[3] = manufacturerId[0] # companyId
for index in range(rawDataArrayLen):
resultArray[index + 4] = rawDataArray[index]
return ''.join(f'{x:02x}' for x in resultArray)

View File

@ -0,0 +1,500 @@
__author__ = "J0EK3R"
__version__ = "0.1"
import sys
import enum
import threading
import time
from ..btsocket import btmgmt_socket
from ..btsocket import btmgmt_protocol
from ..tracer.Tracer import Tracer
from .Advertiser import Advertiser, IAdvertisingDevice
class AdvertiserBTSocket(Advertiser) :
"""
baseclass
"""
class Flags(enum.IntEnum):
CONNECTABLE = enum.auto()
GENERAL_DISCOVERABLE = enum.auto()
LIMITED_DISCOVERABLE = enum.auto()
FLAGS_IN_ADV_DATA = enum.auto()
TX_IN_ADV_DATA = enum.auto()
APPEARANCE_IN_ADV_DATA = enum.auto()
LOCAL_NAME_IN_ADV_DATA = enum.auto()
PHY_LE_1M = enum.auto()
PHY_LE_2M = enum.auto()
PHY_LE_CODED = enum.auto()
# Number of repetitions per second
_RepetitionsPerSecond = 4
def __init__(self):
"""
initializes the object and defines the member fields
"""
super().__init__() # call baseclass
self._advertisement_thread_Run = False
self._advertisement_thread = None
self._advertisement_thread_Lock = threading.Lock()
# Table
# * key: AdvertisementIdentifier
# * value: advertisement-command for the call of btmgmt tool
self._advertisementTable_thread_Lock = threading.Lock()
self._advertisementTable = dict()
self._lastSetAdvertisementCommand = None
self.sock = btmgmt_socket.open()
return
def TryRegisterAdvertisingDevice(self, advertisingDevice: IAdvertisingDevice) -> bool:
"""
try to register the given AdvertisingDevice
* returns True if the AdvertisingDevice was registered successfully
* returns False if the AdvertisingDevice wasn't registered successfully (because it still was registered)
"""
result = super().TryRegisterAdvertisingDevice(advertisingDevice)
# AdvertisingDevice was registered successfully in baseclass
if(result):
# register AdvertisindIdentifier -> only registered AdvertisindIdentifier will be sent
advertisementIdentifier = advertisingDevice.GetAdvertisementIdentifier()
self._RegisterAdvertisementIdentifier(advertisementIdentifier)
return result
def TryUnregisterAdvertisingDevice(self, advertisingDevice: IAdvertisingDevice) -> bool:
"""
try to unregister the given AdvertisingDevice
* returns True if the AdvertisingDevice was unregistered successfully
* returns False if the AdvertisingDevice wasn't unregistered successfully
"""
result = super().TryUnregisterAdvertisingDevice(advertisingDevice)
# AdvertisingDevice was unregistered successfully in baseclass
if(result):
# unregister AdvertisementIdentifier to remove from publishing
advertisementIdentifier = advertisingDevice.GetAdvertisementIdentifier()
self._UnregisterAdvertisementIdentifier(advertisementIdentifier)
return result
def AdvertisementStop(self) -> None:
"""
stop bluetooth advertising
"""
# stop publishing thread
self._advertisement_thread_Run = False
if(self._advertisement_thread is not None):
self._advertisement_thread.join()
self._advertisement_thread = None
advertisementCommand = AdvertiserBTSocket._create_rm_advert_command(1)
self.sock.send(advertisementCommand)
# if (self._tracer is not None):
# self._tracer.TraceInfo('AdvertiserBTMgmnt.AdvertisementStop')
# self._tracer.TraceInfo(advertisementCommand)
return
def _RegisterAdvertisementIdentifier(self, advertisementIdentifier: str) -> None:
"""
Register AdvertisementIdentifier
"""
try:
self._advertisementTable_thread_Lock.acquire(blocking=True)
if(not advertisementIdentifier in self._advertisementTable):
self._advertisementTable[advertisementIdentifier] = None
finally:
self._advertisementTable_thread_Lock.release()
return
def _UnregisterAdvertisementIdentifier(self, advertisementIdentifier: str) -> None:
"""
Unregister AdvertisementIdentifier
"""
try:
self._registeredDeviceTable_Lock.acquire(blocking=True)
foundAdvertisementIdentifier = False
# there are devices wich share the same AdvertisementIdentifier
# check if AdvertisementIdentifier is still present
for currentAdvertisementIdentifier in self._registeredDeviceTable.values():
if(currentAdvertisementIdentifier == advertisementIdentifier):
foundAdvertisementIdentifier = True
break
if(not foundAdvertisementIdentifier):
self._RemoveAdvertisementIdentifier(advertisementIdentifier)
finally:
self._registeredDeviceTable_Lock.release()
return
def _RemoveAdvertisementIdentifier(self, advertisementIdentifier: str) -> None:
"""
Remove AdvertisementIdentifier
"""
try:
self._advertisementTable_thread_Lock.acquire(blocking=True)
if(advertisementIdentifier in self._advertisementTable):
self._advertisementTable.pop(advertisementIdentifier)
finally:
self._advertisementTable_thread_Lock.release()
if(len(self._advertisementTable) == 0):
self.AdvertisementStop()
return
def AdvertisementDataSet(self, advertisementIdentifier: str, manufacturerId: bytes, rawdata: bytes) -> None:
"""
Set Advertisement data
"""
try:
self._advertisementTable_thread_Lock.acquire(blocking=True)
# only registered AdvertisementIdentifier are handled
if(advertisementIdentifier in self._advertisementTable):
# advertisementCommand = self._BTMgmt_path + ' add-adv -d ' + self._CreateTelegramForBTMgmmt(manufacturerId, rawdata) + ' --general-discov 1' + ' &> /dev/null'
advertisingData = self._CreateAdvertisingDataString(manufacturerId, rawdata)
advertisementCommand = AdvertiserBTSocket._create_add_advert_command(
instance_id=1,
flags=AdvertiserBTSocket.Flags.GENERAL_DISCOVERABLE,
duration=0x00, # zero means use default
timeout=0x00, # zero means use default
#adv_data='1bfff0ff6DB643CF7E8F471188665938D17AAA26495E131415161718',
adv_data=advertisingData,
scan_rsp='',
)
self._advertisementTable[advertisementIdentifier] = advertisementCommand
# for quick change handle immediately
timeSlot = self._CalcTimeSlot()
self._Advertise(advertisementCommand, timeSlot)
finally:
self._advertisementTable_thread_Lock.release()
# start publish thread if necessary
if(not self._advertisement_thread_Run):
self._advertisement_thread = threading.Thread(target=self._publish)
self._advertisement_thread.daemon = True
self._advertisement_thread.start()
self._advertisement_thread_Run = True
if (self._tracer is not None):
self._tracer.TraceInfo('AdvertiserBTMgmnt.AdvertisementSet')
return
def _publish(self) -> None:
"""
publishing loop
"""
if (self._tracer is not None):
self._tracer.TraceInfo('AdvertiserBTMgmnt._publish')
# loop while field is True
while(self._advertisement_thread_Run):
try:
try:
self._advertisementTable_thread_Lock.acquire(blocking=True)
# make a copy of the table to release the lock as quick as possible
copy_of_advertisementTable = self._advertisementTable.copy()
# calc time for one publishing slot
timeSlot = self._CalcTimeSlot()
finally:
self._advertisementTable_thread_Lock.release()
if(len(copy_of_advertisementTable) == 0):
pass
else:
for key, advertisementCommand in copy_of_advertisementTable.items():
# stop publishing?
if(not self._advertisement_thread_Run):
return
self._Advertise(advertisementCommand, timeSlot)
except:
pass
def _CalcTimeSlot(self) -> float:
"""
Calculates the timespan in seconds for each timeslot
"""
# timeSlot = 1 second / repetitionsPerSecond / len(self._advertisementTable)
timeSlot = 1 / self._RepetitionsPerSecond / max(1, len(self._advertisementTable))
return timeSlot
def _Advertise(self, advertisementCommand: str, timeSlot: float) -> None:
"""
calls the btmgmt tool as subprocess
"""
try:
self._advertisement_thread_Lock.acquire(blocking=True)
timeStart = time.time()
if (self._lastSetAdvertisementCommand != advertisementCommand):
self._lastSetAdvertisementCommand = advertisementCommand
# self.loop.call_soon(self.sock.send, advertisementCommand)
self.sock.send(advertisementCommand)
timeEnd = time.time()
timeDelta = timeEnd - timeStart
timeSlotRemain = max(0.001, timeSlot - timeDelta)
# stop publishing?
if(self._advertisement_thread_Run):
time.sleep(timeSlotRemain)
finally:
self._advertisement_thread_Lock.release()
return
def _CreateAdvertisingDataString(self, manufacturerId: bytes, rawDataArray: bytes) -> str:
"""
Create input data for btmgmt
"""
rawDataArrayLen = len(rawDataArray)
resultArray = bytearray(4 + rawDataArrayLen)
resultArray[0] = rawDataArrayLen + 3 # len
resultArray[1] = 0xFF # type manufacturer specific
resultArray[2] = manufacturerId[1] # companyId
resultArray[3] = manufacturerId[0] # companyId
for index in range(rawDataArrayLen):
resultArray[index + 4] = rawDataArray[index]
return ''.join(f'{x:02x}' for x in resultArray)
@staticmethod
def _little_bytes(value, size_of):
return int(value).to_bytes(size_of, byteorder='little')
@staticmethod
def _create_add_advert_command(instance_id, flags, duration, timeout, adv_data, scan_rsp):
""" Add Advertising Command
Command Code: 0x003e
Controller Index: <controller id>
Command Parameters: Instance (1 Octet)
Flags (4 Octets)
Duration (2 Octets)
Timeout (2 Octets)
Adv_Data_Len (1 Octet)
Scan_Rsp_Len (1 Octet)
Adv_Data (0-255 Octets)
Scan_Rsp (0-255 Octets)
Return Parameters: Instance (1 Octet)
This command is used to configure an advertising instance that
can be used to switch a Bluetooth Low Energy controller into
advertising mode.
Added advertising information with this command will not be visible
immediately if advertising is enabled via the Set Advertising
command. The usage of the Set Advertising command takes precedence
over this command. Instance information is stored and will be
advertised once advertising via Set Advertising has been disabled.
The Instance identifier is a value between 1 and the number of
supported instances. The value 0 is reserved.
With the Flags value the type of advertising is controlled and
the following flags are defined:
0 Switch into Connectable mode
1 Advertise as Discoverable
2 Advertise as Limited Discoverable
3 Add Flags field to Adv_Data
4 Add TX Power field to Adv_Data
5 Add Appearance field to Scan_Rsp
6 Add Local Name in Scan_Rsp
7 Secondary Channel with LE 1M
8 Secondary Channel with LE 2M
9 Secondary Channel with LE Coded
When the connectable flag is set, then the controller will use
undirected connectable advertising. The value of the connectable
setting can be overwritten this way. This is useful to switch a
controller into connectable mode only for LE operation. This is
similar to the mode 0x02 from the Set Advertising command.
When the connectable flag is not set, then the controller will
use advertising based on the connectable setting. When using
non-connectable or scannable advertising, the controller will
be programmed with a non-resolvable random address. When the
system is connectable, then the identity address or resolvable
private address will be used.
Using the connectable flag is useful for peripheral mode support
where BR/EDR (and/or LE) is controlled by Add Device. This allows
making the peripheral connectable without having to interfere
with the global connectable setting.
If Scan_Rsp_Len is zero and connectable flag is not set and
the global connectable setting is off, then non-connectable
advertising is used. If Scan_Rsp_Len is larger than zero and
connectable flag is not set and the global advertising is off,
then scannable advertising is used. This small difference is
supported to provide less air traffic for devices implementing
broadcaster role.
Secondary channel flags can be used to advertise in secondary
channel with the corresponding PHYs. These flag bits are mutually
exclusive and setting multiple will result in Invalid Parameter
error. Choosing either LE 1M or LE 2M will result in using
extended advertising on the primary channel with LE 1M and the
respectively LE 1M or LE 2M on the secondary channel. Choosing
LE Coded will result in using extended advertising on the primary
and secondary channels with LE Coded. Choosing none of these flags
will result in legacy advertising.
The Duration parameter configures the length of an Instance. The
value is in seconds.
A value of 0 indicates a default value is chosen for the
Duration. The default is 2 seconds.
If only one advertising Instance has been added, then the Duration
value will be ignored. It only applies for the case where multiple
Instances are configured. In that case every Instance will be
available for the Duration time and after that it switches to
the next one. This is a simple round-robin based approach.
The Timeout parameter configures the life-time of an Instance. In
case the value 0 is used it indicates no expiration time. If a
timeout value is provided, then the advertising Instance will be
automatically removed when the timeout passes. The value for the
timeout is in seconds. Powering down a controller will invalidate
all advertising Instances and it is not possible to add a new
Instance with a timeout when the controller is powered down.
When a Timeout is provided, then the Duration subtracts from
the actual Timeout value of that Instance. For example an Instance
with Timeout of 5 and Duration of 2 will be scheduled exactly 3
times, twice with 2 seconds and once with one second. Other
Instances have no influence on the Timeout.
Re-adding an already existing instance (i.e. issuing the Add
Advertising command with an Instance identifier of an existing
instance) will update that instance's configuration.
An instance being added or changed while another instance is
being advertised will not be visible immediately but only when
the new/changed instance is being scheduled by the round robin
advertising algorithm.
Changes to an instance that is currently being advertised will
cancel that instance and switch to the next instance. The changes
will be visible the next time the instance is scheduled for
advertising. In case a single instance is active, this means
that changes will be visible right away.
A pre-requisite is that LE is already enabled, otherwise this
command will return a "rejected" response.
This command can be used when the controller is not powered and
all settings will be programmed once powered.
This command generates a Command Complete event on success or a
Command Status event on failure.
Possible errors: Failed
Rejected
Not Supported
Invalid Parameters
Invalid Index
"""
cmd = AdvertiserBTSocket._little_bytes(0x003e, 2)
ctrl_idx = AdvertiserBTSocket._little_bytes(0x00, 2)
instance = AdvertiserBTSocket._little_bytes(instance_id, 1) # (1 Octet)
flags = AdvertiserBTSocket._little_bytes(flags, 4) # (4 Octets)
duration = AdvertiserBTSocket._little_bytes(duration, 2) # (2 Octets)
timeout = AdvertiserBTSocket._little_bytes(timeout, 2) # (2 Octets)
adv_data = bytes.fromhex(adv_data) # (0-255 Octets)
adv_data_len = AdvertiserBTSocket._little_bytes(len(adv_data), 1) # (1 Octet)
scan_rsp = bytes.fromhex(scan_rsp) # (0-255 Octets)
scan_rsp_len = AdvertiserBTSocket._little_bytes(len(scan_rsp), 1) # (1 Octet)
params = instance + flags + duration + timeout + adv_data_len + scan_rsp_len + adv_data + scan_rsp
param_len = AdvertiserBTSocket._little_bytes(len(params), 2)
return cmd + ctrl_idx + param_len + params
@staticmethod
def _create_rm_advert_command(instance_id):
""" Remove Advertising Command
Command Code: 0x003f
Controller Index: <controller id>
Command Parameters: Instance (1 Octet)
Return Parameters: Instance (1 Octet)
This command is used to remove an advertising instance that
can be used to switch a Bluetooth Low Energy controller into
advertising mode.
When the Instance parameter is zero, then all previously added
advertising Instances will be removed.
Removing advertising information with this command will not be
visible as long as advertising is enabled via the Set Advertising
command. The usage of the Set Advertising command takes precedence
over this command. Changes to Instance information are stored and
will be advertised once advertising via Set Advertising has been
disabled.
Removing an instance while it is being advertised will immediately
cancel the instance, even when it has been advertised less then its
configured Timeout or Duration.
This command can be used when the controller is not powered and
all settings will be programmed once powered.
This command generates a Command Complete event on success or
a Command Status event on failure.
Possible errors: Invalid Parameters
Invalid Index
"""
cmd = AdvertiserBTSocket._little_bytes(0x003f, 2)
ctrl_idx = AdvertiserBTSocket._little_bytes(0x00, 2)
instance = AdvertiserBTSocket._little_bytes(instance_id, 1) # (1 Octet)
params = instance
param_len = AdvertiserBTSocket._little_bytes(len(params), 2)
return cmd + ctrl_idx + param_len + params

View File

@ -0,0 +1,35 @@
__author__ = "J0EK3R"
__version__ = "0.1"
from ..tracer.Tracer import Tracer
from .Advertiser import Advertiser
class AdvertiserDummy(Advertiser) :
"""
Dummy Advertiser
"""
def __init__(self):
"""
initializes the object and defines the fields
"""
super().__init__()
return
def AdvertisementStop(self) -> None:
"""
stop bluetooth advertising
"""
return
def AdvertisementDataSet(self, identifier: str, manufacturerId: bytes, rawdata: bytes) -> None:
"""
Set Advertisement data
"""
return

View File

@ -0,0 +1,145 @@
__author__ = "J0EK3R"
__version__ = "0.1"
from ..tracer.Tracer import Tracer
from .Advertiser import Advertiser
import subprocess
import threading
import time
class AdvertiserHCITool(Advertiser) :
"""
baseclass
"""
HCITool_path = '/usr/bin/hcitool'
def __init__(self):
"""
initializes the object and defines the fields
"""
super().__init__()
self._isInitialized = False
self._ad_thread_Run = False
self._ad_thread = None
self._ad_thread_Lock = threading.Lock()
self._advertisementTable = dict()
return
def AdvertisementStop(self) -> None:
"""
stop bluetooth advertising
"""
self._ad_thread_Run = False
if(self._ad_thread is not None):
self._ad_thread.join()
self._ad_thread = None
self._isInitialized = False
hcitool_args_0x08_0x000a = self.HCITool_path + ' -i hci0 cmd 0x08 0x000a 00'
subprocess.run(hcitool_args_0x08_0x000a + ' &> /dev/null', shell=True, executable="/bin/bash")
if (self._tracer is not None):
self._tracer.TraceInfo('AdvertiserHCITool.AdvertisementStop')
self._tracer.TraceInfo(hcitool_args_0x08_0x000a)
return
def AdvertisementDataSet(self, identifier: str, manufacturerId: bytes, rawdata: bytes) -> None:
"""
Set Advertisement data
"""
advertisementCommand = self.HCITool_path + ' -i hci0 cmd 0x08 0x0008 ' + self._CreateTelegramForHCITool(manufacturerId, rawdata)
self._ad_thread_Lock.acquire(blocking=True)
self._advertisementTable[identifier] = advertisementCommand
self._ad_thread_Lock.release()
if(not self._ad_thread_Run):
self._ad_thread = threading.Thread(target=self._publish)
self._ad_thread.daemon = True
self._ad_thread.start()
self._ad_thread_Run = True
if (self._tracer is not None):
self._tracer.TraceInfo('AdvertiserHCITool.AdvertisementSet')
return
def _publish(self) -> None:
if (self._tracer is not None):
self._tracer.TraceInfo('AdvertiserHCITool._publish')
while(self._ad_thread_Run):
try:
self._ad_thread_Lock.acquire(blocking=True)
copy_of_advertisementTable = self._advertisementTable.copy()
self._ad_thread_Lock.release()
# We want to repeat each command
repetitionsPerSecond = 4
# timeSlot = 1 second / repetitionsPerSecond / len(copy_of_advertisementTable)
timeSlot = 1 / repetitionsPerSecond / max(1, len(copy_of_advertisementTable))
for key, advertisementCommand in copy_of_advertisementTable.items():
# stopp publishing?
if(not self._ad_thread_Run):
return
timeStart = time.time()
subprocess.run(advertisementCommand + ' &> /dev/null', shell=True, executable="/bin/bash")
if(not self._isInitialized):
hcitool_args_0x08_0x0006 = self.HCITool_path + ' -i hci0 cmd 0x08 0x0006 A0 00 A0 00 03 00 00 00 00 00 00 00 00 07 00'
hcitool_args_0x08_0x000a = self.HCITool_path + ' -i hci0 cmd 0x08 0x000a 01'
subprocess.run(hcitool_args_0x08_0x0006 + ' &> /dev/null', shell=True, executable="/bin/bash")
subprocess.run(hcitool_args_0x08_0x000a + ' &> /dev/null', shell=True, executable="/bin/bash")
if (self._tracer is not None):
self._tracer.TraceInfo(str(hcitool_args_0x08_0x0006))
self._tracer.TraceInfo(str(hcitool_args_0x08_0x000a))
self._tracer.TraceInfo()
self._isInitialized = True
timeEnd = time.time()
timeDelta = timeEnd - timeStart
timeSlotRemain = max(0.001, timeSlot - timeDelta)
# if (self._tracer is not None):
# self._tracer.TraceInfo(str(timeSlotRemain) + " " + str(key) + ": " + str(advertisement))
# if (self._tracer is not None):
# self._tracer.TraceInfo(str(timeSlotRemain))
time.sleep(timeSlotRemain)
except:
pass
def _CreateTelegramForHCITool(self, manufacturerId: bytes, rawDataArray: bytes) -> str:
"""
Create input data for hcitool
"""
rawDataArrayLen = len(rawDataArray)
resultArray = bytearray(8 + rawDataArrayLen)
resultArray[0] = rawDataArrayLen + 7 # len
resultArray[1] = 0x02 # flags
resultArray[2] = 0x01
resultArray[3] = 0x02
resultArray[4] = rawDataArrayLen + 3 # len
resultArray[5] = 0xFF # type manufacturer specific
resultArray[6] = manufacturerId[1] # companyId
resultArray[7] = manufacturerId[0] # companyId
for index in range(rawDataArrayLen):
resultArray[index + 8] = rawDataArray[index]
return ' '.join(f'{x:02x}' for x in resultArray)

View File

@ -0,0 +1,90 @@
__author__ = "J0EK3R"
__version__ = "0.1"
from ..tracer.Tracer import Tracer
from .Advertiser import Advertiser
try:
import bluetooth
except ImportError as err:
print("AdvertiserMicroPython: " + str(err))
class AdvertiserMicroPython(Advertiser) :
"""
Advertiser using bluetooth library from MicroPython
"""
def __init__(self):
"""
initializes the object and defines the fields
"""
super().__init__()
# Activate bluetooth
try:
self.ble = bluetooth.BLE()
self.ble.active(True)
except Exception as exception:
self.ble = None
print("AdvertiserMicroPython.init: " + str(exception))
return
def AdvertisementStop(self) -> None:
"""
stop bluetooth advertising
"""
if(self.ble is not None):
self.ble.gap_advertise(None)
if (self._tracer is not None):
self._tracer.TraceInfo("AdvertisementSet")
else:
if (self._tracer is not None):
self._tracer.TraceInfo("self.ble is None")
if (self._tracer is not None):
pass
return
def AdvertisementDataSet(self, identifier: str, manufacturerId: bytes, rawdata: bytes) -> None:
"""
Set Advertisement data
"""
data = self._CreateTelegramForPicoW(manufacturerId, rawdata)
if(self.ble is not None):
self.ble.gap_advertise(100, data)
if (self._tracer is not None):
self._tracer.TraceInfo("AdvertisementSet")
else:
if (self._tracer is not None):
self._tracer.TraceInfo("self.ble is None")
return
def _CreateTelegramForPicoW(self, manufacturerId: bytes, rawDataArray: bytes) -> bytes:
"""
Create input data for bluetooth lib for Pico W
"""
rawDataArrayLen = len(rawDataArray)
btdata = bytearray(2 + rawDataArrayLen)
btdata[0] = 0x00
btdata[1] = 0xFF
for index in range(rawDataArrayLen):
btdata[index + 2] = rawDataArray[index]
btcrypteddata = bytearray(b'\x02\x01\x02') + bytearray((len(btdata) + 1, 0xFF)) + btdata
return bytes(btcrypteddata)

View File

@ -0,0 +1,99 @@
__author__ = "J0EK3R"
__version__ = "0.1"
from ..tracer.Tracer import Tracer
from .IAdvertisingDevice import IAdvertisingDevice
from .Advertiser import Advertiser
class AdvertisingDevice(IAdvertisingDevice) :
"""
baseclass
"""
def __init__(self, identifier: str):
"""
initializes the object and defines the fields
"""
self._connected = False
self._advertiser = None
self._advertiser_registered = False
self._tracer = None
self._identifier = identifier
def SetAdvertiser(self, advertiser: Advertiser) -> Advertiser:
"""
set advertiser object
"""
if(self._advertiser == advertiser):
return advertiser
reconnect = self._connected
# unregister
if(self._advertiser is not None and self._advertiser_registered):
self._advertiser_registered = not self._advertiser.TryUnregisterAdvertisingDevice(self)
self._connected = False
self._advertiser = advertiser
# register
if(self._advertiser is not None and reconnect):
self._advertiser_registered = self._advertiser.TryRegisterAdvertisingDevice(self)
return advertiser
def SetTracer(self, tracer: Tracer) -> Tracer:
"""
set tracer object
"""
self._tracer = tracer
return tracer
def Connect(self):
"""
connects the device to the advertiser
"""
if(self._advertiser is not None and not self._advertiser_registered):
self._advertiser_registered = self._advertiser.TryRegisterAdvertisingDevice(self)
self._connected = True
return
def Disconnect(self) -> None:
"""
disconnects the device from the advertiser
"""
if(self._advertiser is not None and self._advertiser_registered):
self._advertiser_registered = not self._advertiser.TryUnregisterAdvertisingDevice(self)
self._connected = False
return
def Stop(self) -> bytes:
"""
stops the device
"""
raise NotImplementedError # override this methode
def AdvertisementSet(self, manufacturerId: bytes, rawdata: bytes) -> None:
"""
Set Advertisement data
"""
pass
def GetAdvertisementIdentifier(self) -> str:
return self._identifier

View File

@ -0,0 +1,9 @@
__author__ = "J0EK3R"
__version__ = "0.1"
class IAdvertiser :
"""
(kind of) interface for Advertiser
This Type mustn't import any AdvertisingDevice stuff!
To prevent cyclic imports caused by imports of Advertiser <--> AdvertisingDevice.
"""

View File

@ -0,0 +1,48 @@
__author__ = "J0EK3R"
__version__ = "0.1"
class IAdvertisingDevice :
"""
(kind of interface) for AdvertisingDevice
This Type mustn't import any Advertiser stuff!
To prevent cyclic imports caused by imports of Advertiser <--> AdvertisingDevice.
"""
def GetAdvertisementIdentifier(self) -> str:
"""
Returns the AdvertisementIdentifier to differentiate the Advertising-Data.
The AdvertisementIdentifier is used to register the Advertising-Data object.
This Methode has to be overridden by the implementation of the AdvertisingDevice!
Some AdvertisingDevices like MouldKing 4.0 Hub use only one Advertising telegram for all
(three possible) devices. So each AdvertisingDevices returns the same AdvertisementIdentifier
"""
raise NotImplementedError # override this methode
def Connect(self) -> None:
"""
connects the device to the advertiser
"""
raise NotImplementedError # override this methode
def Disconnect(self) -> None:
"""
disconnects the device from the advertiser
"""
raise NotImplementedError # override this methode
def Stop(self) -> bytes:
"""
stops the device
"""
raise NotImplementedError # override this methode
def SetChannel(self, channelId: int, value: float) -> bytes:
"""
set internal stored value of channel with channelId to value and return the telegram
"""
raise NotImplementedError # override this methode

View File

View File

View File

@ -0,0 +1,123 @@
"""
Use callback-based programming style to read and write to BlueZ Management API
"""
import asyncio
from collections import deque
from . import btmgmt_socket
from . import btmgmt_protocol
from . import tools
logger = tools.create_module_logger(__name__)
class Mgmt:
def __init__(self):
# Setup read and write sockets
self.sock = btmgmt_socket.open()
self.loop = asyncio.get_event_loop()
# Store for event callbacks
self._event_callbacks = dict()
# Queue for commands to be written to BlueZ socket
self.cmd_queue = deque()
self.running = False
def add_event_callback(self, event, callback):
"""
Assign a callback to be called when a specific event happens.
The callback should take two arguments.
1) The response packet
2) the AsyncMgmt() class instance object
:param event: An entry from the enum btmgmt.Events
:param callback: A callback function
"""
self._event_callbacks[event] = callback
def reader(self):
"""
Read callback is called when data available on Bluetooth socket.
Processes packet and hands-off to event callbacks that have subscribed
to events.
"""
logger.debug('Reader callback')
data = self.sock.recv(100)
pkt = btmgmt_protocol.reader(data)
logger.info('pkt: [%s]', pkt)
if pkt.header.event_code in self._event_callbacks:
self._event_callbacks[pkt.header.event_code](pkt, self)
if not self.running:
self.stop()
def writer(self):
"""
Write callback when Bluetooth socket is available for writing.
Takes commands that are on the cmd_queue and sends.
"""
logger.debug('Writer callback')
if len(self.cmd_queue) > 0:
this_cmd = self.cmd_queue.popleft()
logger.info('sending pkt [%s]', tools.format_pkt(this_cmd))
self.sock.send(this_cmd)
if not self.running and len(self.cmd_queue) == 0:
self.loop.stop()
# Do one more read to get the response from the last command
self.reader()
@staticmethod
def _as_packet(pkt_objs):
"""Pack bytes together for sending"""
full_pkt = b''
for frame in pkt_objs:
if frame:
full_pkt += frame.octets
return full_pkt
def send(self, cmd, ctrl_idx, *params):
"""
Add commands onto the queue ready to be sent.
Basic structure of the command
send(<command_name>, <adapter index>, <positional paramters>)
:param cmd: A value from btmgmt.Commands
:param ctrl_idx: The index of the controller [0xFFFF is non-controller]
:param params: 0 or more input parameters for command
"""
pkt_objs = btmgmt_protocol.command(cmd, ctrl_idx, *params)
cmd_pkt = self._as_packet(pkt_objs)
logger.debug('Queue command: %s', tools.format_pkt(cmd_pkt))
self.cmd_queue.append(cmd_pkt)
def stop(self):
"""
Once all commands have been sent, exit the event loop
"""
self.running = False
self.loop.remove_writer(self.sock)
self.loop.remove_reader(self.sock)
self.loop.stop()
def close(self):
"""
Stop the event loop and close sockets etc.
"""
btmgmt_socket.close(self.sock)
# Stop the event loop
self.loop.close()
def start(self):
self.running = True
# Setup reader and writer for socket streams
self.loop.add_reader(self.sock, self.reader)
self.loop.add_writer(self.sock, self.writer)
logger.debug('Starting event loop...')
try:
# Run the event loop
self.loop.run_forever()
except KeyboardInterrupt:
self.loop.stop()
finally:
# We are done. Close sockets and the event loop.
self.close()

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,120 @@
import asyncio
import ctypes
import socket
import sys
AF_BLUETOOTH = 31
PF_BLUETOOTH = AF_BLUETOOTH
SOCK_RAW = 3
BTPROTO_HCI = 1
SOCK_CLOEXEC = 524288
SOCK_NONBLOCK = 2048
HCI_CHANNEL_CONTROL = 3
HCI_DEV_NONE = 0xffff
class BluetoothSocketError(BaseException):
pass
class BluetoothCommandError(BaseException):
pass
class SocketAddr(ctypes.Structure):
_fields_ = [
("hci_family", ctypes.c_ushort),
("hci_dev", ctypes.c_ushort),
("hci_channel", ctypes.c_ushort),
]
def open():
"""
Because of the following issue with Python the Bluetooth User socket
on linux needs to be done with lower level calls.
https://bugs.python.org/issue36132
Based on mgmt socket at:
https://git.kernel.org/pub/scm/bluetooth/bluez.git/tree/doc/mgmt-api.txt
"""
sockaddr_hcip = ctypes.POINTER(SocketAddr)
ctypes.cdll.LoadLibrary("libc.so.6")
libc = ctypes.CDLL("libc.so.6")
libc_socket = libc.socket
libc_socket.argtypes = (ctypes.c_int, ctypes.c_int, ctypes.c_int)
libc_socket.restype = ctypes.c_int
bind = libc.bind
bind.argtypes = (ctypes.c_int, ctypes.POINTER(SocketAddr), ctypes.c_int)
bind.restype = ctypes.c_int
# fd = libc_socket(PF_BLUETOOTH, SOCK_RAW | SOCK_CLOEXEC | SOCK_NONBLOCK,
# BTPROTO_HCI)
fd = libc_socket(PF_BLUETOOTH, SOCK_RAW, BTPROTO_HCI)
if fd < 0:
raise BluetoothSocketError("Unable to open PF_BLUETOOTH socket")
addr = SocketAddr()
addr.hci_family = AF_BLUETOOTH # AF_BLUETOOTH
addr.hci_dev = HCI_DEV_NONE # adapter index
addr.hci_channel = HCI_CHANNEL_CONTROL # HCI_USER_CHANNEL
r = bind(fd, sockaddr_hcip(addr), ctypes.sizeof(addr))
if r < 0:
raise BluetoothSocketError("Unable to bind %s", r)
sock_fd = socket.socket(AF_BLUETOOTH, SOCK_RAW, BTPROTO_HCI, fileno=fd)
return sock_fd
def close(bt_socket):
"""Close the open socket"""
fd = bt_socket.detach()
socket.close(fd)
def test_asyncio_usage():
sock = open()
if sys.version_info < (3, 10):
loop = asyncio.get_event_loop()
else:
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
def reader():
data = sock.recv(100)
print("Received:", data)
# We are done: unregister the file descriptor
loop.remove_reader(sock)
# Stop the event loop
loop.stop()
# Register the file descriptor for read event
loop.add_reader(sock, reader)
# Write a command to the socket
# Read Management Version Information Command
# b'\x01\x00\xff\xff\x00\x00'
loop.call_soon(sock.send, b'\x01\x00\xff\xff\x00\x00')
try:
# Run the event loop
loop.run_forever()
finally:
# We are done. Close sockets and the event loop.
close(sock)
loop.close()
if __name__ == '__main__':
test_asyncio_usage()

View File

@ -0,0 +1,37 @@
from . import btmgmt_protocol
from . import btmgmt_socket
from . import tools
logger = tools.create_module_logger(__name__)
def _as_packet(pkt_objs):
full_pkt = b''
for frame in pkt_objs:
if frame:
full_pkt += frame.octets
return full_pkt
def send(*args):
response_recvd = False
pkt_objs = btmgmt_protocol.command(*args)
logger.debug('Sending btmgmt frames %s', pkt_objs)
cmd_pkt = _as_packet(pkt_objs)
# print('cmd pkt', [f'{octets:x}' for octets in cmd_pkt])
sock = btmgmt_socket.open()
logger.debug('Sending bytes: %s', cmd_pkt)
sock.send(cmd_pkt)
while not response_recvd:
raw_data = sock.recv(100)
logger.debug('Received: %s', raw_data)
data = btmgmt_protocol.reader(raw_data)
logger.debug('Received btmgmt frames: %s', data)
if data.cmd_response_frame:
response_recvd = True
btmgmt_socket.close(sock)
if data.event_frame.status != btmgmt_protocol.ErrorCodes.Success:
raise NameError(f'btmgmt Error: '
f'{data.event_frame.command_opcode} '
f'{data.event_frame.status.name}')
return data

View File

@ -0,0 +1 @@
The files are copied from https://github.com/ukBaz/python-btsocket

View File

@ -0,0 +1,18 @@
"""Location for tools/functions to be used across various files"""
import logging
def create_module_logger(module_name):
"""helper function to create logger in modules"""
logger = logging.getLogger(module_name)
strm_hndlr = logging.StreamHandler()
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s')
strm_hndlr.setFormatter(formatter)
logger.addHandler(strm_hndlr)
return logger
def format_pkt(data):
"""Put data packets in a human readable format"""
return ', '.join([f'{bite:#04x}' for bite in data])

24
src/mkconnect/cli.py Normal file
View File

@ -0,0 +1,24 @@
import sys
from .tracer.TracerConsole import TracerConsole
from .mouldking.MouldKing import MouldKing
def main() -> int:
tracer = TracerConsole()
if sys.platform == "linux":
from .advertiser.AdvertiserBTSocket import AdvertiserBTSocket as Advertiser
elif sys.platform == "rp2":
from .advertiser.AdvertiserMicroPython import AdvertiserMicroPython as Advertiser
elif sys.platform == "win32":
from .advertiser.AdvertiserDummy import AdvertiserDummy as Advertiser
else:
raise SystemExit("unsupported platform")
advertiser = Advertiser()
advertiser.SetTracer(tracer)
MouldKing.SetTracer(tracer)
MouldKing.SetAdvertiser(advertiser)
tracer.TraceInfo("mkconnect CLI initialized")
return 0

View File

@ -0,0 +1,179 @@
#!/usr/bin/python
# to run: sudo python -i consoletest.py
import sys
import time
print('Script: consoletest.py')
print('Platform: ' + sys.platform)
from mkconnect.tracer.Tracer import Tracer
from mkconnect.tracer.TracerConsole import TracerConsole
# uncomment to choose advertiser
if (sys.platform == 'linux'):
#from Advertiser.AdvertiserHCITool import AdvertiserHCITool as Advertiser
#from Advertiser.AdvertiserBTMgmt import AdvertiserBTMgmt as Advertiser
from mkconnect.advertiser.AdvertiserBTSocket import AdvertiserBTSocket as Advertiser
pass
elif (sys.platform == 'rp2'):
from mkconnect.advertiser.AdvertiserMicroPython import AdvertiserMicroPython as Advertiser
pass
elif (sys.platform == 'win32'):
from mkconnect.advertiser.AdvertiserDummy import AdvertiserDummy as Advertiser
else:
raise Exception('unsupported platform')
from mkconnect.mouldking.MouldKing import MouldKing
# instantiate Tracer
tracer = TracerConsole()
# instantiate Advertiser
advertiser = Advertiser()
advertiser.SetTracer(tracer)
# Set Tracer for all MouldKing Hubs
MouldKing.SetTracer(tracer)
MouldKing.SetAdvertiser(advertiser)
# save pre-instantiated objects in local variables
hub0 = MouldKing.Module6_0.Device0
hub1 = MouldKing.Module6_0.Device1
hub2 = MouldKing.Module6_0.Device2
hub3 = MouldKing.Module4_0.Device0
hub4 = MouldKing.Module4_0.Device1
hub5 = MouldKing.Module4_0.Device2
def _getChannelId(channel):
switch={
'A': 0,
'B': 1,
'C': 2,
'D': 3,
'E': 4,
'F': 5,
}
return switch.get(channel,"")
def _getHubId(deviceId):
# MK6
if deviceId == 0:
return hub0
elif deviceId == 1:
return hub1
elif deviceId == 2:
return hub2
# MK4
elif deviceId == 3:
return hub3
elif deviceId == 4:
return hub4
elif deviceId == 5:
return hub5
else:
raise Exception("deviceId 0..5")
def _automate(deviceId: int, channel: int):
userinput = input("\nDo you want to test channel "+ str(channel) +" ? enter y/n\n")
if (userinput != str("y")):
return
tracer.TraceInfo("HUB: "+ str(deviceId) +", FORWARD : Power ramp up from 0 to 100% on channel :" + str(channel))
for percent in range(0, 110, 10):
tracer.TraceInfo("Power : " + str(percent) + "%")
mkcontrol(deviceId,channel, percent/100)
time.sleep(1)
mkstop(deviceId)
tracer.TraceInfo("HUB: "+ str(deviceId) +", REVERSE: Power ramp up from 0 to 100% on channel :" + str(channel))
for percent in range(-0, -110, -10):
tracer.TraceInfo("Power : " + str(percent) + "%")
mkcontrol(deviceId,channel,percent/100)
time.sleep(1)
mkstop(deviceId)
def mkbtstop():
"""
stop bluetooth advertising
"""
advertiser.AdvertisementStop()
# hcitool_args1 = hcitool_path + ' -i hci0 cmd 0x08 0x000a 00' + ' &> /dev/null'
# if platform.system() == 'Linux':
# subprocess.run(hcitool_args1, shell=True, executable="/bin/bash")
# elif platform.system() == 'Windows':
# print('Connect command :')
# print(hcitool_args1)
# else:
# print('Unsupported OS')
return
def mkconnect(deviceId: int=0):
"""
send the bluetooth connect telegram to switch the MouldKing hubs in bluetooth mode
press the button on the hub(s) and the flashing of status led should switch from blue-green to blue
"""
hub = _getHubId(deviceId)
rawdata = hub.Connect()
return
def mkstop(deviceId: int=0):
hub = _getHubId(deviceId)
rawdata = hub.Stop()
return
def mkcontrol(deviceId: int=0, channel: int=0, powerAndDirection: float=0):
hub = _getHubId(deviceId)
rawdata = hub.SetChannel(channel, powerAndDirection)
return
def test_hub(hubId: int=0):
tracer.TraceInfo("HUB "+ str(hubId) +" connecting")
mkconnect()
time.sleep(1)
for index in range(6):
_automate(hubId, index) # start channel 0 = A
tracer.TraceInfo("Channel change requested")
time.sleep(1)
def help():
tracer.TraceInfo("Available commands:")
tracer.TraceInfo(" help() : print available commands")
tracer.TraceInfo(" hints() : print hints and examples")
tracer.TraceInfo(" mkconnect() : Initiate hub control by sending bluetooth connect telegram")
tracer.TraceInfo(" mkstop(hubId) : Stop ALL motors")
tracer.TraceInfo(" mkcontrol(hubId, channel, powerAndDirection) : Control a specific hub, channel, power and motor direction")
tracer.TraceInfo(" test_hub(hubId) : run automated tests on each channels")
tracer.TraceInfo(" mkbtstop() : stop bluetooth advertising")
def hints():
tracer.TraceInfo("HINTS:")
tracer.TraceInfo("If run on windows, commands are shown but not executed (hcitool dependency)")
tracer.TraceInfo()
tracer.TraceInfo("For connecting:")
tracer.TraceInfo(" Switch MK6.0 Hubs on - led is flashing green/blue")
tracer.TraceInfo(" mkconnect() to send the bluetooth connect telegram. All hubs switch to bluetooth mode")
tracer.TraceInfo(" by short-pressing the button on MK6.0 Hubs you can choose the hubId")
tracer.TraceInfo(" hubId=0 - one Led flash")
tracer.TraceInfo(" hubId=1 - two Led flashs")
tracer.TraceInfo(" hubId=2 - three Led flashs")
tracer.TraceInfo()
tracer.TraceInfo("ex: test_hub(0), mkcontrol(0, 0, 0.5); mkcontrol(0, 1, -1, True)")
tracer.TraceInfo(" the minus sign - indicate reverse motor direction")
##################################################################
# Entry point when script is started by python -i consoletest.py
help()
tracer.TraceInfo()
hints()
tracer.TraceInfo()
tracer.TraceInfo("Ready to execute commands\n")

View File

@ -0,0 +1,157 @@
#!/usr/bin/python
import sys
import time
print('Script: main.py')
print('Platform: ' + sys.platform)
from mkconnect.tracer.Tracer import Tracer
from mkconnect.tracer.TracerConsole import TracerConsole
# uncomment to choose advertiser
if (sys.platform == 'linux'):
#from Advertiser.AdvertiserHCITool import AdvertiserHCITool as Advertiser
#from Advertiser.AdvertiserBTMgmt import AdvertiserBTMgmt as Advertiser
from mkconnect.advertiser.AdvertiserBTSocket import AdvertiserBTSocket as Advertiser
pass
elif (sys.platform == 'rp2'):
from mkconnect.advertiser.AdvertiserMicroPython import AdvertiserMicroPython as Advertiser
pass
elif (sys.platform == 'win32'):
from mkconnect.advertiser.AdvertiserDummy import AdvertiserDummy as Advertiser
else:
raise Exception('unsupported platform')
from mkconnect.mouldking.MouldKing import MouldKing
# instantiate Tracer
tracer = TracerConsole()
# instantiate Advertiser
advertiser = Advertiser()
advertiser.SetTracer(tracer)
# Set Tracer for all MouldKing Hubs
MouldKing.SetTracer(tracer)
MouldKing.SetAdvertiser(advertiser)
# save pre-instantiated objects in local variables
hub0 = MouldKing.Module6_0.Device0
hub1 = MouldKing.Module6_0.Device1
#hub2 = MouldKing.Module6_0.Device2
#hub0 = MouldKing.Module4_0.Device0
#hub1 = MouldKing.Module4_0.Device1
hub2 = MouldKing.Module4_0.Device2
############################################################################
# get uncrypted connect-telegram as bytearray
title = "connect-telegram"
tracer.TraceInfo("\n" + title)
hub0.Connect()
hub1.Connect()
hub2.Connect()
time.sleep(5)
#tracer.TraceInfo("rawdata: " + ' '.join(f'{x:02x}' for x in rawdata))
#crypted = MouldKingCrypt.Crypt(rawdata) # get crypted data from rawdata
#tracer.TraceInfo("crypted: " + ' '.join(f'{x:02x}' for x in crypted))
############################################################################
# get uncrypted stop-telegram as bytearray
title = "stop-telegram"
tracer.TraceInfo("\n" + title)
rawdata = hub0.Stop()
rawdata = hub1.Stop()
time.sleep(1)
#tracer.TraceInfo("rawdata: " + ' '.join(f'{x:02x}' for x in rawdata))
#crypted = MouldKingCrypt.Crypt(rawdata) # get crypted data from rawdata
#tracer.TraceInfo("crypted: " + ' '.join(f'{x:02x}' for x in crypted))
############################################################################
# get uncrypted telegram with channel 1 (indexer 0) fullspeed forwards
title = "C1: fullspeed forwards"
tracer.TraceInfo("\n" + title)
rawdata = hub0.SetChannel(0, 1)
time.sleep(1)
#tracer.TraceInfo("rawdata: " + ' '.join(f'{x:02x}' for x in rawdata))
#crypted = MouldKingCrypt.Crypt(rawdata) # get crypted data from rawdata
#tracer.TraceInfo("crypted: " + ' '.join(f'{x:02x}' for x in crypted))
############################################################################
# get uncrypted telegram with channel 1 (indexer 0) halfspeed forwards
title = "C1: halfspeed forwards"
tracer.TraceInfo("\n" + title)
rawdata = hub0.SetChannel(0, 0.5)
time.sleep(1)
#tracer.TraceInfo("rawdata: " + ' '.join(f'{x:02x}' for x in rawdata))
#crypted = MouldKingCrypt.Crypt(rawdata) # get crypted data from rawdata
#tracer.TraceInfo("crypted: " + ' '.join(f'{x:02x}' for x in crypted))
############################################################################
# get uncrypted telegram with channel 1 (indexer 0) fullspeed forwards
title = "C1: fullspeed forwards"
tracer.TraceInfo("\n" + title)
rawdata = hub1.SetChannel(0, 1)
time.sleep(2)
#tracer.TraceInfo("rawdata: " + ' '.join(f'{x:02x}' for x in rawdata))
#crypted = MouldKingCrypt.Crypt(rawdata) # get crypted data from rawdata
#tracer.TraceInfo("crypted: " + ' '.join(f'{x:02x}' for x in crypted))
############################################################################
# get uncrypted telegram with channel 1 (indexer 0) halfspeed backwards
title = "C1: halfspeed backwards"
tracer.TraceInfo("\n" + title)
rawdata = hub0.SetChannel(0, -0.5)
time.sleep(1)
#tracer.TraceInfo("rawdata: " + ' '.join(f'{x:02x}' for x in rawdata))
#crypted = MouldKingCrypt.Crypt(rawdata) # get crypted data from rawdata
#tracer.TraceInfo("crypted: " + ' '.join(f'{x:02x}' for x in crypted))
############################################################################
# get uncrypted telegram with channel 1 (indexer 0) halfspeed backwards
title = "C2: halfspeed backwards"
tracer.TraceInfo("\n" + title)
rawdata = hub0.SetChannel(1, -0.5)
time.sleep(1)
#tracer.TraceInfo("rawdata: " + ' '.join(f'{x:02x}' for x in rawdata))
#crypted = MouldKingCrypt.Crypt(rawdata) # get crypted data from rawdata
#tracer.TraceInfo("crypted: " + ' '.join(f'{x:02x}' for x in crypted))
############################################################################
# disconnect from advertiser
title = "Disconnect from advertiser"
tracer.TraceInfo("\n" + title)
hub0.Disconnect()
hub1.Disconnect()
hub2.Disconnect()
time.sleep(1)
############################################################################
# stop Advertisement
title = "Advertisement stop"
tracer.TraceInfo("\n" + title)
advertiser.AdvertisementStop()
time.sleep(1)

View File

@ -0,0 +1,119 @@
__author__ = "J0EK3R"
__version__ = "0.1"
from ..tracer.Tracer import Tracer
from ..advertiser.Advertiser import Advertiser
from .MouldKing_Hub_4 import MouldKing_Hub_4
from .MouldKing_Hub_6 import MouldKing_Hub_6
class MouldKing :
"""
abstract class to store the static objects
"""
class Module4_0 :
"""
abstract class to store the static objects
"""
Device0 = MouldKing_Hub_4(0)
"""
MouldKing Hub 4.0 with address 1
"""
Device1 = MouldKing_Hub_4(1)
"""
MouldKing Hub 4.0 with address 2
"""
Device2 = MouldKing_Hub_4(2)
"""
MouldKing Hub 4.0 with address 3
"""
@staticmethod
def SetAdvertiser(advertiser: Advertiser) -> Advertiser:
"""
Set Advertiser for all MouldKing 4.0 Hubs
"""
# MouldKing_4_Hubs is the same instance for all MouldKing_4_Hub-Instances
MouldKing.Module4_0.Device0._MouldKing_4_Hubs.SetAdvertiser(advertiser)
# MouldKing.Module4_0.Device1.MouldKing_4_Hubs.SetAdvertiser(advertiser)
# MouldKing.Module4_0.Device2.MouldKing_4_Hubs.SetAdvertiser(advertiser)
return advertiser
@staticmethod
def SetTracer(tracer: Tracer) -> Tracer:
"""
Set Tracer for all MouldKing 4.0 Hubs
"""
MouldKing.Module4_0.Device0.SetTracer(tracer)
MouldKing.Module4_0.Device1.SetTracer(tracer)
MouldKing.Module4_0.Device2.SetTracer(tracer)
return tracer
class Module6_0 :
"""
abstract class to store the static objects
"""
Device0 = MouldKing_Hub_6(0)
"""
MouldKing Hub 6.0 with address 1
"""
Device1 = MouldKing_Hub_6(1)
"""
MouldKing Hub 6.0 with address 2
"""
Device2 = MouldKing_Hub_6(2)
"""
MouldKing Hub 6.0 with address 3
"""
@staticmethod
def SetAdvertiser(advertiser: Advertiser) -> Advertiser:
"""
Set Advertiser for all MouldKing 6.0 Hubs
"""
MouldKing.Module6_0.Device0.SetAdvertiser(advertiser)
MouldKing.Module6_0.Device1.SetAdvertiser(advertiser)
MouldKing.Module6_0.Device2.SetAdvertiser(advertiser)
return advertiser
@staticmethod
def SetTracer(tracer: Tracer) -> Tracer:
"""
Set Tracer for all MouldKing 6.0 Hubs
"""
MouldKing.Module6_0.Device0.SetTracer(tracer)
MouldKing.Module6_0.Device1.SetTracer(tracer)
MouldKing.Module6_0.Device2.SetTracer(tracer)
return tracer
@staticmethod
def SetAdvertiser(advertiser: Advertiser) -> Advertiser:
"""
Set Advertiser for all MouldKing 4.0 Hubs
"""
# MouldKing_4_Hubs is the same instance for all MouldKing_4_Hub-Instances
MouldKing.Module4_0.SetAdvertiser(advertiser)
MouldKing.Module6_0.SetAdvertiser(advertiser)
return advertiser
@staticmethod
def SetTracer(tracer: Tracer) -> Tracer:
"""
Set Tracer for all MouldKing 4.0 Hubs
"""
MouldKing.Module4_0.SetTracer(tracer)
MouldKing.Module6_0.SetTracer(tracer)
return tracer

View File

@ -0,0 +1,159 @@
__author__ = "J0EK3R"
__version__ = "0.1"
class MouldKingCrypt :
"""
class with static methods to do MouldKing encryption
"""
# static class variables
__Array_C1C2C3C4C5 = bytes([0xC1, 0xC2, 0xC3, 0xC4, 0xC5])
@staticmethod
def CreateTelegramForHCITool(manufacturerId: bytes, rawDataArray: bytes) -> bytes:
"""
Create input data for hcitool
"""
cryptedArray = MouldKingCrypt.Crypt(rawDataArray)
cryptedArrayLen = len(cryptedArray)
resultArray = bytearray(8 + cryptedArrayLen)
resultArray[0] = cryptedArrayLen + 7 # len
resultArray[1] = 0x02 # flags
resultArray[2] = 0x01
resultArray[3] = 0x02
resultArray[4] = cryptedArrayLen + 3 # len
resultArray[5] = 0xFF # type manufacturer specific
resultArray[6] = manufacturerId[1] # companyId
resultArray[7] = manufacturerId[0] # companyId
for index in range(cryptedArrayLen):
resultArray[index + 8] = cryptedArray[index]
return ' '.join(f'{x:02x}' for x in resultArray)
@staticmethod
def Crypt(rawDataArray: bytes) -> bytes:
"""
do the MouldKing encryption for the given byte-array and return the resulting byte-array
"""
targetArrayLength = len(MouldKingCrypt.__Array_C1C2C3C4C5) + len(rawDataArray) + 20
targetArray = bytearray(targetArrayLength)
targetArray[15] = 113 # 0x71
targetArray[16] = 15 # 0x0f
targetArray[17] = 85 # 0x55
# copy firstDataArray reverse into targetArray with offset 18
for index in range(len(MouldKingCrypt.__Array_C1C2C3C4C5)):
targetArray[index + 18] = MouldKingCrypt.__Array_C1C2C3C4C5[(len(MouldKingCrypt.__Array_C1C2C3C4C5) - index) - 1]
# copy rawDataArray into targetArray with offset 18 + len(MouldKingCrypt.__Array_C1C2C3C4C5)
for index in range(len(rawDataArray)):
targetArray[18 + len(MouldKingCrypt.__Array_C1C2C3C4C5) + index] = rawDataArray[index]
# crypt bytes from position 15 to 22
for index in range(15, len(MouldKingCrypt.__Array_C1C2C3C4C5) + 18):
targetArray[index] = MouldKingCrypt.__revert_bits_byte(targetArray[index])
# calc checksum und copy to array
checksum = MouldKingCrypt.__calc_checksum_from_arrays(MouldKingCrypt.__Array_C1C2C3C4C5, rawDataArray)
targetArray[len(MouldKingCrypt.__Array_C1C2C3C4C5) + 18 + len(rawDataArray) + 0] = (checksum & 255)
targetArray[len(MouldKingCrypt.__Array_C1C2C3C4C5) + 18 + len(rawDataArray) + 1] = ((checksum >> 8) & 255)
# crypt bytes from offset 18 to the end with magicNumberArray_63
magicNumberArray_63 = MouldKingCrypt.__create_magic_array(63, 7)
tempArray = bytearray(targetArrayLength - 18)
for index in range(len(tempArray)):
tempArray[index] = targetArray[index + 18]
MouldKingCrypt.__crypt_array(tempArray, magicNumberArray_63)
targetArray[18:] = tempArray
# crypt complete array with magicNumberArray_37
magicNumberArray_37 = MouldKingCrypt.__create_magic_array(37, 7)
MouldKingCrypt.__crypt_array(targetArray, magicNumberArray_37)
# resulting advertisement array has a length of constant 24 bytes
telegramArray = bytearray(24)
lengthResultArray = len(MouldKingCrypt.__Array_C1C2C3C4C5) + len(rawDataArray) + 5
telegramArray[:lengthResultArray] = targetArray[15:15 + lengthResultArray]
# fill rest of array
for index in range(lengthResultArray, len(telegramArray)):
telegramArray[index] = index + 1
return telegramArray
@staticmethod
def __create_magic_array(magic_number: int, size: int) -> bytes:
magic_array = [0] * size
magic_array[0] = 1
for index in range(1, 7):
magic_array[index] = (magic_number >> (6 - index)) & 1
return magic_array
@staticmethod
def __revert_bits_byte(value: int) -> int:
result = 0
for index_bit in range(8):
if ((1 << index_bit) & value) != 0:
result = result | (1 << (7 - index_bit))
return result
@staticmethod
def __revert_bits_int(value: int) -> int:
result = 0
for index_bit in range(16):
if ((1 << index_bit) & value) != 0:
result |= 1 << (15 - index_bit)
return 65535 & result
@staticmethod
def __crypt_array(byte_array: bytes, magic_number_array: bytes) -> bytes:
# foreach byte of array
for index_byte in range(len(byte_array)):
current_byte = byte_array[index_byte]
current_result = 0
# foreach bit in byte
for index_bit in range(8):
current_result += (((current_byte >> index_bit) & 1) ^ MouldKingCrypt.__shift_magic_array(magic_number_array)) << index_bit
byte_array[index_byte] = current_result & 255
return byte_array
@staticmethod
def __calc_checksum_from_arrays(first_array: bytes, second_array: bytes) -> int:
result = 65535
for first_array_index in range(len(first_array)):
result = (result ^ (first_array[(len(first_array) - 1) - first_array_index] << 8)) & 65535
for index_bit in range(8):
current_result = result & 32768
result <<= 1
if current_result != 0:
result ^= 4129
for current_byte in second_array:
result = ((MouldKingCrypt.__revert_bits_byte(current_byte) << 8) ^ result) & 65535
for index_bit in range(8):
current_result = result & 32768
result <<= 1
if current_result != 0:
result ^= 4129
return MouldKingCrypt.__revert_bits_int(result) ^ 65535
@staticmethod
def __shift_magic_array(i_arr: bytes) -> bytes:
r1 = i_arr[3] ^ i_arr[6]
i_arr[3] = i_arr[2]
i_arr[2] = i_arr[1]
i_arr[1] = i_arr[0]
i_arr[0] = i_arr[6]
i_arr[6] = i_arr[5]
i_arr[5] = i_arr[4]
i_arr[4] = r1
return i_arr[0]

View File

@ -0,0 +1,117 @@
__author__ = "J0EK3R"
__version__ = "0.1"
from ..tracer.Tracer import Tracer
from ..advertiser.AdvertisingDevice import AdvertisingDevice
from .MouldKingCrypt import MouldKingCrypt
class MouldKingHub(AdvertisingDevice) :
"""
baseclass
"""
ManufacturerID = bytes([0xFF, 0xF0])
def __init__(self, identifier: str, numberOfChannels: int, channelStartOffset: int, channelEndOffset: int, telegram_connect: bytes, basetelegram: bytes):
"""
initializes the object and defines the fields
"""
super().__init__(identifier)
if telegram_connect is not None:
maxArrayOffset = len(telegram_connect) - 1
if channelStartOffset > maxArrayOffset:
raise Exception("max channelStartOffset:" + maxArrayOffset)
if channelEndOffset > (maxArrayOffset - 1):
raise Exception("max channelEndOffset:" + maxArrayOffset)
self._NumberOfChannels = numberOfChannels
self._ChannelStartOffset = channelStartOffset
self._ChannelEndOffset = channelEndOffset
self._Telegram_connect = telegram_connect
self._Basetelegram = basetelegram
# create array
self._ChannelValueList = [float(0)] * self._NumberOfChannels
return
def Connect(self) -> None:
"""
returns the telegram to switch the MouldKing Hubs in bluetooth mode
"""
# call baseClass to register at Advertiser
super().Connect()
self._Advertise(self._Telegram_connect)
return
def Disconnect(self) -> None:
"""
disconnects the device from the advertiser
"""
self.Stop()
super().Disconnect()
return
def Stop(self) -> bytes:
"""
set internal stored value of all channels to zero and return the telegram
"""
# init channels
for channelId in range(0, self._NumberOfChannels):
self._ChannelValueList[channelId] = float(0)
return self.CreateTelegram()
def SetChannel(self, channelId: int, value: float) -> bytes:
"""
set internal stored value of channel with channelId to value and return the telegram
"""
if channelId > self._NumberOfChannels - 1:
raise Exception("only channelId 0.." + int(self._NumberOfChannels - 1) + "are allowed")
self._ChannelValueList[channelId] = value
return self.CreateTelegram()
def CreateTelegram(self) -> bytes:
"""
returns a telegram including the internal stored value from all channels
"""
raise NotImplementedError # override this methode
def _Advertise(self, rawdata: bytes) -> bytes:
"""
sends the data to the advertiser
"""
if(self._tracer is not None):
self._tracer.TraceInfo("Advertise")
if(self._advertiser is not None):
cryptedData = MouldKingCrypt.Crypt(rawdata)
self._advertiser.AdvertisementDataSet(self._identifier, self.ManufacturerID, cryptedData)
return self._Telegram_connect

View File

@ -0,0 +1,50 @@
__author__ = "J0EK3R"
__version__ = "0.1"
from .MouldKingHub import MouldKingHub
class MouldKingHub_Byte(MouldKingHub) :
"""
baseclass handling with byte channels
"""
def __init__(self, identifier: str, numberOfChannels, channelStartOffset, channelEndOffset, telegram_connect, basetelegram):
"""
initializes the object and defines the fields
"""
# call baseclass init and set number of channels
super().__init__(identifier, numberOfChannels, channelStartOffset, channelEndOffset, telegram_connect, basetelegram)
def CreateTelegram(self) -> bytes:
"""
returns the telegram including the internal stored value from all channels
"""
# make a copy of the basetelegram
currentTelegramData = bytearray(self._Basetelegram)
# calc the length to be used for channels
channelDataLength = len(currentTelegramData) - self._ChannelEndOffset
# iterate channels
for channelId in range(0, self._NumberOfChannels):
currentChannelStartOffset = self._ChannelStartOffset + channelId
if self._NumberOfChannels >= (channelId + 1) and channelDataLength >= currentChannelStartOffset:
channelValue = self._ChannelValueList[channelId]
if channelValue < 0:
# Range [-1..0] -> 0x80 - [0x7F .. 0x00] = [0x01 .. 0x80]
currentTelegramData[currentChannelStartOffset] = int(0x80 - min(-channelValue * 0x80, 0x80))
elif channelValue > 0:
# Range [0..1] -> 0x80 + [0x00 .. 0x7F] = [0x80 .. 0xFF]
currentTelegramData[currentChannelStartOffset] = int(0x80 + min(channelValue * 0x7F, 0x7F))
else:
currentTelegramData[currentChannelStartOffset] = 0x80
self._Advertise(currentTelegramData)
return currentTelegramData

View File

@ -0,0 +1,71 @@
__author__ = "J0EK3R"
__version__ = "0.1"
from .MouldKingHub import MouldKingHub
class MouldKingHub_Nibble(MouldKingHub) :
"""
baseclass handling with nibble channels
"""
def __init__(self, identifier: str, numberOfChannels, channelStartOffset, channelEndOffset, telegram_connect, basetelegram):
"""
initializes the object and defines the fields
"""
# call baseclass init and set number of channels
super().__init__(identifier, numberOfChannels, channelStartOffset, channelEndOffset, telegram_connect, basetelegram)
def CreateTelegram(self) -> bytes:
"""
returns the telegram including the internal stored value from all channels
"""
# make a copy of the basetelegram
currentTelegramData = bytearray(self._Basetelegram)
# calc the length to be used for channels
channelDataLength = len(currentTelegramData) - self._ChannelEndOffset
# iterate channels
byteOffset = 0
for channelId in range(0, self._NumberOfChannels, 2):
currentChannelStartOffset = self._ChannelStartOffset + byteOffset
highNibble = 0
lowNibble = 0
if self._NumberOfChannels >= (channelId + 1) and channelDataLength >= currentChannelStartOffset:
evenChannelValue = self._ChannelValueList[channelId]
oddChannelValue2 = self._ChannelValueList[channelId + 1]
# even Channel -> highNibble
if evenChannelValue < 0:
# Range [-1..0] -> [0x07 .. 0x00] = [0x07 .. 0x00]
highNibble = int(min(-evenChannelValue * 0x07, 0x07))
elif evenChannelValue > 0:
# Range [0..1] -> 0x80 + [0x00 .. 0x07] = [0x80 .. 0x0F]
highNibble = int(0x08 + min(evenChannelValue * 0x07, 0x07))
else:
highNibble = 0x08
# odd Channel -> lowNibble
if oddChannelValue2 < 0:
# Range [-1..0] -> [0x07 .. 0x00] = [0x07 .. 0x00]
lowNibble = int(min(-oddChannelValue2 * 0x07, 0x07))
elif oddChannelValue2 > 0:
# Range [0..1] -> 0x80 + [0x00 .. 0x07] = [0x80 .. 0x0F]
lowNibble = int(0x08 + min(oddChannelValue2 * 0x07, 0x07))
else:
lowNibble = 0x08
currentTelegramData[currentChannelStartOffset] = (int)((highNibble << 4) + lowNibble)
# next byte
byteOffset = byteOffset + 1
self._Advertise(currentTelegramData)
return currentTelegramData

View File

@ -0,0 +1,77 @@
__author__ = "J0EK3R"
__version__ = "0.1"
from ..tracer.Tracer import Tracer
from ..advertiser.IAdvertisingDevice import IAdvertisingDevice
#from .MouldKingDevice import MouldKingDevice
from .MouldKing_Hubs_4_12Ch import MouldKing_Hubs_4_12Ch
class MouldKing_Hub_4(IAdvertisingDevice) :
"""
class handling a MouldKing 4.0 Hub
"""
# static fields/constants
_MouldKing_4_Hubs = MouldKing_Hubs_4_12Ch()
def __init__(self, deviceId: int):
"""
initializes the object and defines the fields
"""
if deviceId > 2:
raise Exception('only deviceId 0..2 are allowed')
self._deviceId = deviceId
self._NumberOfChannels = 4
self._tracer = None
def SetTracer(self, tracer: Tracer) -> Tracer:
"""
set tracer object
"""
self._tracer = tracer
return tracer
def Connect(self) -> None:
"""
returns the telegram to switch the MouldKing Hubs in bluetooth mode
"""
MouldKing_Hub_4._MouldKing_4_Hubs.SubDevice_Register(self)
return
def Disconnect(self) -> None:
"""
disconnects the device from the advertiser
"""
MouldKing_Hub_4._MouldKing_4_Hubs.SubDevice_Unregister(self)
return
def Stop(self) -> bytes:
"""
set internal stored value of all channels to zero and return the telegram
"""
return MouldKing_Hub_4._MouldKing_4_Hubs.SubDevice_Stop(self._deviceId, self._NumberOfChannels)
def SetChannel(self, channelId: int, value: float) -> bytes:
"""
set internal stored value of channel with channelId to value and return the telegram
"""
if channelId > self._NumberOfChannels - 1:
raise Exception("only channelId 0.." + int(self._NumberOfChannels - 1) + "are allowed")
return MouldKing_Hub_4._MouldKing_4_Hubs.SubDevice_SetChannel(self._deviceId, self._NumberOfChannels, channelId, value)

View File

@ -0,0 +1,38 @@
__author__ = "J0EK3R"
__version__ = "0.1"
from .MouldKingHub_Byte import MouldKingHub_Byte
class MouldKing_Hub_6(MouldKingHub_Byte) :
"""
class handling the MouldKing 6.0 Hub
"""
# static fields/constants
__telegram_connect = bytes([0x6D, 0x7B, 0xA7, 0x80, 0x80, 0x80, 0x80, 0x92]) # Define a byte array for Telegram Connect
__telegram_base_device_a = bytes([0x61, 0x7B, 0xA7, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x9E]) # byte array for base Telegram
__telegram_base_device_b = bytes([0x62, 0x7B, 0xA7, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x9D]) # byte array for base Telegram
__telegram_base_device_c = bytes([0x63, 0x7B, 0xA7, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x9C]) # byte array for base Telegram
def __init__(self, deviceId: int):
"""
initializes the object and defines the fields
"""
if deviceId == 0:
basetelegram = MouldKing_Hub_6.__telegram_base_device_a
elif deviceId == 1:
basetelegram = MouldKing_Hub_6.__telegram_base_device_b
elif deviceId == 2:
basetelegram = MouldKing_Hub_6.__telegram_base_device_c
else:
raise Exception('only deviceId 0..2 are allowed')
# call baseclass init and set number of channels
super().__init__("MK6_" + str(deviceId), 6, 3, 1, MouldKing_Hub_6.__telegram_connect, basetelegram)

View File

@ -0,0 +1,99 @@
__author__ = "J0EK3R"
__version__ = "0.1"
from ..tracer.Tracer import Tracer
from ..advertiser.IAdvertisingDevice import IAdvertisingDevice
from .MouldKingHub_Nibble import MouldKingHub_Nibble
class MouldKing_Hubs_4_12Ch(MouldKingHub_Nibble) :
"""
class handling 3 x MouldKing 4.0 Hubs
Only one telegram addresses all possible 3 x MK4 hubs the same time
"""
# static fields/constants
__telegram_connect = bytes([0xAD, 0x7B, 0xA7, 0x80, 0x80, 0x80, 0x4F, 0x52]) # Define a byte array for Telegram Connect
__telegram_base = bytes([0x7D, 0x7B, 0xA7, 0x88, 0x88, 0x88, 0x88, 0x88, 0x88, 0x82]) # byte array for base Telegram
def __init__(self):
"""
initializes the object and defines the fields
"""
# call baseclass init and set number of channels
super().__init__("MK4", 12, 3, 1, MouldKing_Hubs_4_12Ch.__telegram_connect, MouldKing_Hubs_4_12Ch.__telegram_base)
self._connectedSubDevices = list()
def SubDevice_Register(self, subDevice: IAdvertisingDevice) -> None:
"""
returns the telegram to switch the MouldKing Hubs in bluetooth mode
"""
connectedSubDevicesLen = len(self._connectedSubDevices)
if(not subDevice is None and not subDevice in self._connectedSubDevices):
self._connectedSubDevices.append(subDevice)
# first subDevice was added
if(connectedSubDevicesLen == 0):
self.Connect()
return
def SubDevice_Unregister(self, subDevice: IAdvertisingDevice) -> None:
"""
disconnects the device from the advertiser
"""
if(not subDevice is None and subDevice in self._connectedSubDevices):
self._connectedSubDevices.remove(subDevice)
# last subDevice was removed
if(len(self._connectedSubDevices) == 0):
self.Disconnect()
return
def SubDevice_Stop(self, hubDeviceId: int, hubNumberOfChannels: int) -> bytes:
"""
set internal stored value of all channels to zero and return the telegram
"""
# deviceId = 0
# -> channelId 0..4
# deviceId = 2
# -> channelId 5..8
# deviceId = 3
# -> channelId 9..12
channelIdHubs = hubDeviceId * hubNumberOfChannels
# init channels
for channelId in range(channelIdHubs, hubNumberOfChannels):
if channelId < self._NumberOfChannels:
self._ChannelValueList[channelId] = float(0)
return self.CreateTelegram()
def SubDevice_SetChannel(self, hubDeviceId: int, hubNumberOfChannels: int, hubChannelId: int, value: float) -> bytes:
"""
set internal stored value of channel with channelId to value and return the telegram
"""
# deviceId = 0
# -> channelId 0..4
# deviceId = 2
# -> channelId 5..8
# deviceId = 3
# -> channelId 9..12
channelIdHubs = hubDeviceId * hubNumberOfChannels + hubChannelId
if channelIdHubs > self._NumberOfChannels - 1:
raise Exception("only channelId 0.." + int(self._NumberOfChannels - 1) + "are allowed")
self._ChannelValueList[channelIdHubs] = value
return self.CreateTelegram()

View File

View File

@ -0,0 +1,18 @@
__author__ = "J0EK3R"
__version__ = "0.1"
class Tracer :
"""
baseclass
"""
def __init__(self):
"""
initializes the object and defines the fields
"""
pass
def TraceInfo(self, value: str):
"""
prints out
"""
pass

View File

@ -0,0 +1,24 @@
__author__ = "J0EK3R"
__version__ = "0.1"
from .Tracer import Tracer
class TracerConsole(Tracer) :
"""
baseclass
"""
def __init__(self):
"""
initializes the object and defines the fields
"""
super().__init__()
def TraceInfo(self, value: str=""):
"""
prints out
"""
print(value)

View File