362 lines
14 KiB
Python
362 lines
14 KiB
Python
from bluepy import btle, thingy52
|
|
import logging
|
|
from collections.abc import Callable
|
|
from time import sleep, time
|
|
from utils import Utils
|
|
import binascii
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class Device:
|
|
def __init__(self,
|
|
mac: str | None,
|
|
updateFrequency: int,
|
|
minReconnectDelay: int,
|
|
maxReconnectDelay: int,
|
|
reconnectAttempts: int,
|
|
reconnectDelayMultiplier: int,
|
|
disconnectCallback: Callable[[], None],
|
|
sensors: list[str],
|
|
temperatureOffset: int
|
|
):
|
|
|
|
self.__enabledSensors = sensors
|
|
self.__data = {
|
|
"battery": [None, []],
|
|
"temperature": [None, []],
|
|
"pressure": [None, []],
|
|
"humidity": [None, []],
|
|
"eco2": [None, []],
|
|
"tvoc": [None, []],
|
|
"button": [None, []],
|
|
"taps": [None, []],
|
|
"tapDirection": [None, []],
|
|
"orient": [None, []],
|
|
"stepCount": [None, []]
|
|
}
|
|
|
|
self.__reconnect = Utils.getReconnector(
|
|
self.connect,
|
|
"Thingy",
|
|
minReconnectDelay,
|
|
maxReconnectDelay,
|
|
reconnectAttempts,
|
|
reconnectDelayMultiplier,
|
|
disconnectCallback
|
|
)
|
|
self.__run = True
|
|
self.__temperatureOffset = temperatureOffset
|
|
|
|
logger.info("Initialising device")
|
|
self.__updateFrequency = updateFrequency
|
|
self.__disconnectCallback = disconnectCallback
|
|
self.__delegate = NotificationDelegate(self.__setData, self.__notifySubscribers)
|
|
|
|
while True:
|
|
try:
|
|
self.__mac = mac or Device.findMac()
|
|
self.connect()
|
|
break
|
|
except btle.BTLEDisconnectError or BTLEInternalError:
|
|
logging.warn("Unable to connect to Thingy, trying again")
|
|
|
|
logger.info("Finished device setup")
|
|
|
|
def start(self):
|
|
logging.debug("Starting reporting loop")
|
|
nextReport = time() + 2
|
|
batteryReportInterval = max(60, self.__updateFrequency)
|
|
nextBatteryReport = nextReport
|
|
connected = True
|
|
while self.__run:
|
|
try:
|
|
if (not connected):
|
|
self.__reconnect()
|
|
nextReport = time() + 2
|
|
connected = True
|
|
logging.debug("New reporting loop iteration")
|
|
|
|
if ("battery" in self.__enabledSensors and time() >= nextBatteryReport):
|
|
self.__setData("battery", self.__dev.battery.read())
|
|
nextBatteryReport = time() + max(60, self.__updateFrequency)
|
|
self.__dev.waitForNotifications(timeout=self.__updateFrequency)
|
|
|
|
if (time() >= nextReport):
|
|
logging.debug("Reporting sensor data")
|
|
for sensor in self.__data.keys():
|
|
if (sensor in self.__enabledSensors): self.__notifySubscribers(sensor)
|
|
nextReport = time() + self.__updateFrequency
|
|
except btle.BTLEDisconnectError or BTLEInternalError:
|
|
logging.warning("Thingy disconnected, reconnecting")
|
|
connected = False
|
|
|
|
def connect(self):
|
|
self.__dev = thingy52.Thingy52(self.__mac)
|
|
logging.info("Connected to device")
|
|
self.__dev.setDelegate(self.__delegate)
|
|
logging.debug("Configured notification handler")
|
|
|
|
self.__dev.ui.enable()
|
|
self.__dev.ui.set_led_mode_constant(1,0,1)
|
|
|
|
self.__enableSensors()
|
|
self.__enableNotifications()
|
|
|
|
|
|
def disconnect(self):
|
|
self.__run = False
|
|
self.__dev.disconnect()
|
|
|
|
def subscribe(self, sensor: str, callback: Callable[[int], None]):
|
|
if (sensor in self.__data.keys() and sensor in self.__enabledSensors):
|
|
self.__data[sensor][1].append(callback)
|
|
logging.debug(f"Current subscriptions for {sensor}: {repr(self.__data[sensor][1])}")
|
|
|
|
def unsubscribe(self, sensor: str, callback: Callable[[int], None]):
|
|
if (sensor in self.__data.keys() and sensor in self.__enabledSensors):
|
|
self.__data[sensor][1].remove(callback)
|
|
|
|
def __notifySubscribers(self, sensor: str):
|
|
if (sensor not in self.__data.keys()): return
|
|
data = self.__data[sensor][0]
|
|
logging.debug(f"Current {sensor}: {data}")
|
|
if (data != None):
|
|
for callback in self.__data[sensor][1]:
|
|
callback(data)
|
|
self.__data[sensor][0] = None
|
|
|
|
def __setData(self, sensor: str, val: str):
|
|
if (sensor in self.__data.keys()):
|
|
if (sensor == "temperature"):
|
|
self.__data[sensor][0] = float(val) + self.__temperatureOffset
|
|
else:
|
|
self.__data[sensor][0] = val
|
|
|
|
def __enableSensors(self):
|
|
logging.info("Enabling sensors")
|
|
# environment
|
|
if ("temperature" in self.__enabledSensors):
|
|
self.__dev.environment.enable()
|
|
self.__dev.environment.configure(temp_int=self.__updateFrequency * 1000)
|
|
logging.debug("Enabled temperature sensor")
|
|
if ("pressure" in self.__enabledSensors):
|
|
self.__dev.environment.enable()
|
|
self.__dev.environment.configure(press_int=self.__updateFrequency * 1000)
|
|
logging.debug("Enabled pressure sensor")
|
|
if ("humidity" in self.__enabledSensors):
|
|
self.__dev.environment.enable()
|
|
self.__dev.environment.configure(humid_int=self.__updateFrequency * 1000)
|
|
logging.debug("Enabled humidity sensor")
|
|
if ("eco2" in self.__enabledSensors or "tvoc" in self.__enabledSensors):
|
|
self.__dev.environment.enable()
|
|
self.__dev.environment.configure(gas_mode_int=1)
|
|
logging.debug("Enabled gas sensors")
|
|
if ("colour" in self.__enabledSensors):
|
|
self.__dev.environment.enable()
|
|
self.__dev.environment.configure(color_int=self.__updateFrequency * 1000)
|
|
self.__dev.environment.configure(color_sens_calib=[0,0,0])
|
|
logging.debug("Enabled colour sensor")
|
|
# UI
|
|
if ("button" in self.__enabledSensors):
|
|
self.__dev.ui.enable()
|
|
logging.debug("Enabled button sensor")
|
|
if ("battery" in self.__enabledSensors):
|
|
self.__dev.battery.enable()
|
|
logging.debug("Enabled battery sensor")
|
|
# motion
|
|
if ("taps" in self.__enabledSensors):
|
|
self.__dev.motion.enable()
|
|
self.__dev.motion.configure(motion_freq=200)
|
|
logging.debug("Enabled taps sensor")
|
|
if ("orient" in self.__enabledSensors):
|
|
self.__dev.motion.enable()
|
|
logging.debug("Enabled orient sensor")
|
|
if ("stepCount" in self.__enabledSensors):
|
|
self.__dev.motion.enable()
|
|
self.__dev.motion.configure(step_int=self.__updateFrequency * 1000)
|
|
logging.debug("Enabled stepCount sensor")
|
|
logging.info("Enabled sensors")
|
|
|
|
def __enableNotifications(self):
|
|
logging.info("Enabling notifications")
|
|
# environment
|
|
if ("temperature" in self.__enabledSensors):
|
|
self.__dev.environment.set_temperature_notification(True)
|
|
logging.debug("Enabled temperature sensor")
|
|
if ("pressure" in self.__enabledSensors):
|
|
self.__dev.environment.set_pressure_notification(True)
|
|
logging.debug("Enabled pressure sensor")
|
|
if ("humidity" in self.__enabledSensors):
|
|
self.__dev.environment.set_humidity_notification(True)
|
|
logging.debug("Enabled humidity sensor")
|
|
if ("eco2" in self.__enabledSensors or "tvoc" in self.__enabledSensors):
|
|
self.__dev.environment.set_gas_notification(True)
|
|
logging.debug("Enabled gas sensors")
|
|
if ("colour" in self.__enabledSensors):
|
|
self.__dev.environment.set_color_notification(True)
|
|
logging.debug("Enabled colour sensor")
|
|
# UI
|
|
if ("button" in self.__enabledSensors):
|
|
self.__dev.ui.set_btn_notification(True)
|
|
logging.debug("Enabled button sensor")
|
|
# motion
|
|
if ("taps" in self.__enabledSensors):
|
|
self.__dev.motion.set_tap_notification(True)
|
|
logging.debug("Enabled taps sensor")
|
|
if ("orient" in self.__enabledSensors):
|
|
self.__dev.motion.set_orient_notification(True)
|
|
logging.debug("Enabled orient sensor")
|
|
if ("stepCount" in self.__enabledSensors):
|
|
self.__dev.motion.set_stepcnt_notification(True)
|
|
logging.debug("Enabled stepCount sensor")
|
|
logging.info("Enabled notifications")
|
|
|
|
@staticmethod
|
|
def findMac():
|
|
while True:
|
|
logger.info("Scanning for BLE devices...")
|
|
for device in btle.Scanner().scan(timeout=5):
|
|
logger.info(f"Found device with MAC {device.addr} with RSSI ${device.rssi}. Checking if it's a Thingy...")
|
|
for (_, _, value) in device.getScanData():
|
|
if (value == "Thingy"):
|
|
logger.info("... this device is a Thingy")
|
|
return device.addr
|
|
|
|
class NotificationDelegate(btle.DefaultDelegate):
|
|
|
|
def __init__(self, setData: Callable[[str, str], None], notifySubscribers: Callable[[str], None]):
|
|
self.__setData = setData
|
|
self.__notifySubscribers = notifySubscribers
|
|
|
|
def handleNotification(self, hnd, data):
|
|
|
|
logging.debug(f"Handling notification: {repr(hnd)}")
|
|
|
|
if (hnd == thingy52.e_temperature_handle):
|
|
teptep = binascii.b2a_hex(data)
|
|
self.__setData("temperature", Utils.hexStrToInt(teptep[:-2]) + int(teptep[-2:], 16) / 100.0)
|
|
|
|
elif (hnd == thingy52.e_pressure_handle):
|
|
pressure_int, pressure_dec = self.__extractPressureData(data)
|
|
self.__setData("pressure", pressure_int + pressure_dec / 100.0)
|
|
|
|
elif (hnd == thingy52.e_humidity_handle):
|
|
teptep = binascii.b2a_hex(data)
|
|
self.__setData("humidity", Utils.hexStrToInt(teptep))
|
|
|
|
elif (hnd == thingy52.e_gas_handle):
|
|
eco2, tvoc = self.__extractGasData(data)
|
|
logging.debug(f"Handling gas sensor update: {eco2}, {tvoc}")
|
|
self.__setData("eco2", eco2)
|
|
self.__setData("tvoc", tvoc)
|
|
|
|
elif (hnd == thingy52.e_color_handle):
|
|
teptep = binascii.b2a_hex(data)
|
|
red, green, blue, clear = self.__extractColourData(data)
|
|
self.__setData("colour", "0x%0.2X%0.2X%0.2X" %(red, green, blue))
|
|
|
|
elif (hnd == thingy52.ui_button_handle):
|
|
teptep = binascii.b2a_hex(data)
|
|
self.__setData("button", "on" if Utils.hexStrToInt(teptep) == 1 else "off") # 1 = pressed, 0 = released
|
|
self.__notifySubscribers("button")
|
|
|
|
elif (hnd == thingy52.m_tap_handle):
|
|
direction, count = self.__extractTapData(data)
|
|
match direction:
|
|
case 1:
|
|
tapDirection = "Bottom"
|
|
case 2:
|
|
tapDirection = "Left"
|
|
case 3:
|
|
tapDirection = "Top"
|
|
case 4:
|
|
tapDirection = "Right"
|
|
case 5:
|
|
tapDirection = "Front"
|
|
case 6:
|
|
tapDirection = "Back"
|
|
case _:
|
|
tapDirection = f"Unknown, {value}"
|
|
self.__setData("tapDirection", tapDirection)
|
|
self.__setData("taps", count)
|
|
|
|
|
|
elif (hnd == thingy52.m_orient_handle):
|
|
teptep = binascii.b2a_hex(data)
|
|
value = Utils.hexStrToInt(teptep)
|
|
# 1 = led top left
|
|
# 2 = led top right / left side up
|
|
# 3 = led bottom right/bottom up
|
|
# 0 = led bottom left/ right side up
|
|
match value:
|
|
case 0:
|
|
orientation = "LED Bottom Left"
|
|
case 1:
|
|
orientation = "LED Top Left"
|
|
case 2:
|
|
orientation = "LED Top Right"
|
|
case 3:
|
|
orientation = "LED Bottom Right"
|
|
case _:
|
|
orientation = f"Unknown, {value}"
|
|
self.__setData("orient", orientation)
|
|
|
|
elif (hnd == thingy52.m_stepcnt_handle):
|
|
logging.info("Handling step count update")
|
|
teptep = binascii.b2a_hex(data)
|
|
self.__setData("stepCount", Utils.hexStrToInt(teptep))
|
|
|
|
elif (hnd == thingy52.m_rotation_handle):
|
|
teptep = binascii.b2a_hex(data)
|
|
self.__setData("rotation", Utils.hexStrToInt(teptep))
|
|
|
|
elif (hnd == thingy52.m_heading_handle):
|
|
teptep = binascii.b2a_hex(data)
|
|
self.__setData("heading", Utils.hexStrToInt(teptep))
|
|
|
|
elif (hnd == thingy52.m_gravity_handle):
|
|
teptep = binascii.b2a_hex(data)
|
|
self.__setData("gravity", Utils.hexStrToInt(teptep))
|
|
|
|
elif (hnd == thingy52.s_speaker_status_handle):
|
|
teptep = binascii.b2a_hex(data)
|
|
self.__setData("speaker", Utils.hexStrToInt(teptep))
|
|
|
|
elif (hnd == thingy52.s_microphone_handle):
|
|
teptep = binascii.b2a_hex(data)
|
|
|
|
else:
|
|
logging.warn(f"Unknown notification: {hnd}, {data}")
|
|
|
|
|
|
def __extractPressureData(self, data):
|
|
""" Extract pressure data from data string. """
|
|
teptep = binascii.b2a_hex(data)
|
|
pressure_int = 0
|
|
for i in range(0, 4):
|
|
pressure_int += (int(teptep[i*2:(i*2)+2], 16) << 8*i)
|
|
pressure_dec = int(teptep[-2:], 16)
|
|
return (pressure_int, pressure_dec)
|
|
|
|
def __extractGasData(self, data):
|
|
""" Extract gas data from data string. """
|
|
teptep = binascii.b2a_hex(data)
|
|
eco2 = int(teptep[:2], 16) + (int(teptep[2:4], 16) << 8)
|
|
tvoc = int(teptep[4:6], 16) + (int(teptep[6:8], 16) << 8)
|
|
return eco2, tvoc
|
|
|
|
def __extractColourData(self, data):
|
|
""" Extract color data from data string. """
|
|
teptep = binascii.b2a_hex(data)
|
|
red = int(teptep[:2], 16)
|
|
green = int(teptep[2:4], 16)
|
|
blue = int(teptep[4:6], 16)
|
|
clear = int(teptep[6:8], 16)
|
|
return red, green, blue, clear
|
|
|
|
def __extractTapData(self, data):
|
|
""" Extract tap data from data string. """
|
|
teptep = binascii.b2a_hex(data)
|
|
direction = int(teptep[0:2])
|
|
count = int(teptep[2:4])
|
|
return (direction, count) |