Initial commit
This commit is contained in:
362
device.py
Normal file
362
device.py
Normal file
@@ -0,0 +1,362 @@
|
||||
from bluepy import btle, thingy52
|
||||
import logging
|
||||
from collections.abc import Callable
|
||||
from time import sleep, time
|
||||
from utils import Utils
|
||||
import binascii
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class Device:
|
||||
def __init__(self,
|
||||
mac: str | None,
|
||||
updateFrequency: int,
|
||||
minReconnectDelay: int,
|
||||
maxReconnectDelay: int,
|
||||
reconnectAttempts: int,
|
||||
reconnectDelayMultiplier: int,
|
||||
disconnectCallback: Callable[[], None],
|
||||
sensors: list[str],
|
||||
temperatureOffset: int
|
||||
):
|
||||
|
||||
self.__enabledSensors = sensors
|
||||
self.__data = {
|
||||
"battery": [None, []],
|
||||
"temperature": [None, []],
|
||||
"pressure": [None, []],
|
||||
"humidity": [None, []],
|
||||
"eco2": [None, []],
|
||||
"tvoc": [None, []],
|
||||
"button": [None, []],
|
||||
"taps": [None, []],
|
||||
"tapDirection": [None, []],
|
||||
"orient": [None, []],
|
||||
"stepCount": [None, []]
|
||||
}
|
||||
|
||||
self.__reconnect = Utils.getReconnector(
|
||||
self.connect,
|
||||
"Thingy",
|
||||
minReconnectDelay,
|
||||
maxReconnectDelay,
|
||||
reconnectAttempts,
|
||||
reconnectDelayMultiplier,
|
||||
disconnectCallback
|
||||
)
|
||||
self.__run = True
|
||||
self.__temperatureOffset = temperatureOffset
|
||||
|
||||
logger.info("Initialising device")
|
||||
self.__updateFrequency = updateFrequency
|
||||
self.__disconnectCallback = disconnectCallback
|
||||
self.__delegate = NotificationDelegate(self.__setData, self.__notifySubscribers)
|
||||
|
||||
while True:
|
||||
try:
|
||||
self.__mac = mac or Device.findMac()
|
||||
self.connect()
|
||||
break
|
||||
except btle.BTLEDisconnectError or BTLEInternalError:
|
||||
logging.warn("Unable to connect to Thingy, trying again")
|
||||
|
||||
logger.info("Finished device setup")
|
||||
|
||||
def start(self):
|
||||
logging.debug("Starting reporting loop")
|
||||
nextReport = time() + 2
|
||||
batteryReportInterval = max(60, self.__updateFrequency)
|
||||
nextBatteryReport = nextReport
|
||||
connected = True
|
||||
while self.__run:
|
||||
try:
|
||||
if (not connected):
|
||||
self.__reconnect()
|
||||
nextReport = time() + 2
|
||||
connected = True
|
||||
logging.debug("New reporting loop iteration")
|
||||
|
||||
if ("battery" in self.__enabledSensors and time() >= nextBatteryReport):
|
||||
self.__setData("battery", self.__dev.battery.read())
|
||||
nextBatteryReport = time() + max(60, self.__updateFrequency)
|
||||
self.__dev.waitForNotifications(timeout=self.__updateFrequency)
|
||||
|
||||
if (time() >= nextReport):
|
||||
logging.debug("Reporting sensor data")
|
||||
for sensor in self.__data.keys():
|
||||
if (sensor in self.__enabledSensors): self.__notifySubscribers(sensor)
|
||||
nextReport = time() + self.__updateFrequency
|
||||
except btle.BTLEDisconnectError or BTLEInternalError:
|
||||
logging.warning("Thingy disconnected, reconnecting")
|
||||
connected = False
|
||||
|
||||
def connect(self):
|
||||
self.__dev = thingy52.Thingy52(self.__mac)
|
||||
logging.info("Connected to device")
|
||||
self.__dev.setDelegate(self.__delegate)
|
||||
logging.debug("Configured notification handler")
|
||||
|
||||
self.__dev.ui.enable()
|
||||
self.__dev.ui.set_led_mode_constant(1,0,1)
|
||||
|
||||
self.__enableSensors()
|
||||
self.__enableNotifications()
|
||||
|
||||
|
||||
def disconnect(self):
|
||||
self.__run = False
|
||||
self.__dev.disconnect()
|
||||
|
||||
def subscribe(self, sensor: str, callback: Callable[[int], None]):
|
||||
if (sensor in self.__data.keys() and sensor in self.__enabledSensors):
|
||||
self.__data[sensor][1].append(callback)
|
||||
logging.debug(f"Current subscriptions for {sensor}: {repr(self.__data[sensor][1])}")
|
||||
|
||||
def unsubscribe(self, sensor: str, callback: Callable[[int], None]):
|
||||
if (sensor in self.__data.keys() and sensor in self.__enabledSensors):
|
||||
self.__data[sensor][1].remove(callback)
|
||||
|
||||
def __notifySubscribers(self, sensor: str):
|
||||
if (sensor not in self.__data.keys()): return
|
||||
data = self.__data[sensor][0]
|
||||
logging.debug(f"Current {sensor}: {data}")
|
||||
if (data != None):
|
||||
for callback in self.__data[sensor][1]:
|
||||
callback(data)
|
||||
self.__data[sensor][0] = None
|
||||
|
||||
def __setData(self, sensor: str, val: str):
|
||||
if (sensor in self.__data.keys()):
|
||||
if (sensor == "temperature"):
|
||||
self.__data[sensor][0] = float(val) + self.__temperatureOffset
|
||||
else:
|
||||
self.__data[sensor][0] = val
|
||||
|
||||
def __enableSensors(self):
|
||||
logging.info("Enabling sensors")
|
||||
# environment
|
||||
if ("temperature" in self.__enabledSensors):
|
||||
self.__dev.environment.enable()
|
||||
self.__dev.environment.configure(temp_int=self.__updateFrequency * 1000)
|
||||
logging.debug("Enabled temperature sensor")
|
||||
if ("pressure" in self.__enabledSensors):
|
||||
self.__dev.environment.enable()
|
||||
self.__dev.environment.configure(press_int=self.__updateFrequency * 1000)
|
||||
logging.debug("Enabled pressure sensor")
|
||||
if ("humidity" in self.__enabledSensors):
|
||||
self.__dev.environment.enable()
|
||||
self.__dev.environment.configure(humid_int=self.__updateFrequency * 1000)
|
||||
logging.debug("Enabled humidity sensor")
|
||||
if ("eco2" in self.__enabledSensors or "tvoc" in self.__enabledSensors):
|
||||
self.__dev.environment.enable()
|
||||
self.__dev.environment.configure(gas_mode_int=1)
|
||||
logging.debug("Enabled gas sensors")
|
||||
if ("colour" in self.__enabledSensors):
|
||||
self.__dev.environment.enable()
|
||||
self.__dev.environment.configure(color_int=self.__updateFrequency * 1000)
|
||||
self.__dev.environment.configure(color_sens_calib=[0,0,0])
|
||||
logging.debug("Enabled colour sensor")
|
||||
# UI
|
||||
if ("button" in self.__enabledSensors):
|
||||
self.__dev.ui.enable()
|
||||
logging.debug("Enabled button sensor")
|
||||
if ("battery" in self.__enabledSensors):
|
||||
self.__dev.battery.enable()
|
||||
logging.debug("Enabled battery sensor")
|
||||
# motion
|
||||
if ("taps" in self.__enabledSensors):
|
||||
self.__dev.motion.enable()
|
||||
self.__dev.motion.configure(motion_freq=200)
|
||||
logging.debug("Enabled taps sensor")
|
||||
if ("orient" in self.__enabledSensors):
|
||||
self.__dev.motion.enable()
|
||||
logging.debug("Enabled orient sensor")
|
||||
if ("stepCount" in self.__enabledSensors):
|
||||
self.__dev.motion.enable()
|
||||
self.__dev.motion.configure(step_int=self.__updateFrequency * 1000)
|
||||
logging.debug("Enabled stepCount sensor")
|
||||
logging.info("Enabled sensors")
|
||||
|
||||
def __enableNotifications(self):
|
||||
logging.info("Enabling notifications")
|
||||
# environment
|
||||
if ("temperature" in self.__enabledSensors):
|
||||
self.__dev.environment.set_temperature_notification(True)
|
||||
logging.debug("Enabled temperature sensor")
|
||||
if ("pressure" in self.__enabledSensors):
|
||||
self.__dev.environment.set_pressure_notification(True)
|
||||
logging.debug("Enabled pressure sensor")
|
||||
if ("humidity" in self.__enabledSensors):
|
||||
self.__dev.environment.set_humidity_notification(True)
|
||||
logging.debug("Enabled humidity sensor")
|
||||
if ("eco2" in self.__enabledSensors or "tvoc" in self.__enabledSensors):
|
||||
self.__dev.environment.set_gas_notification(True)
|
||||
logging.debug("Enabled gas sensors")
|
||||
if ("colour" in self.__enabledSensors):
|
||||
self.__dev.environment.set_color_notification(True)
|
||||
logging.debug("Enabled colour sensor")
|
||||
# UI
|
||||
if ("button" in self.__enabledSensors):
|
||||
self.__dev.ui.set_btn_notification(True)
|
||||
logging.debug("Enabled button sensor")
|
||||
# motion
|
||||
if ("taps" in self.__enabledSensors):
|
||||
self.__dev.motion.set_tap_notification(True)
|
||||
logging.debug("Enabled taps sensor")
|
||||
if ("orient" in self.__enabledSensors):
|
||||
self.__dev.motion.set_orient_notification(True)
|
||||
logging.debug("Enabled orient sensor")
|
||||
if ("stepCount" in self.__enabledSensors):
|
||||
self.__dev.motion.set_stepcnt_notification(True)
|
||||
logging.debug("Enabled stepCount sensor")
|
||||
logging.info("Enabled notifications")
|
||||
|
||||
@staticmethod
|
||||
def findMac():
|
||||
while True:
|
||||
logger.info("Scanning for BLE devices...")
|
||||
for device in btle.Scanner().scan(timeout=5):
|
||||
logger.info(f"Found device with MAC {device.addr} with RSSI ${device.rssi}. Checking if it's a Thingy...")
|
||||
for (_, _, value) in device.getScanData():
|
||||
if (value == "Thingy"):
|
||||
logger.info("... this device is a Thingy")
|
||||
return device.addr
|
||||
|
||||
class NotificationDelegate(btle.DefaultDelegate):
|
||||
|
||||
def __init__(self, setData: Callable[[str, str], None], notifySubscribers: Callable[[str], None]):
|
||||
self.__setData = setData
|
||||
self.__notifySubscribers = notifySubscribers
|
||||
|
||||
def handleNotification(self, hnd, data):
|
||||
|
||||
logging.debug(f"Handling notification: {repr(hnd)}")
|
||||
|
||||
if (hnd == thingy52.e_temperature_handle):
|
||||
teptep = binascii.b2a_hex(data)
|
||||
self.__setData("temperature", Utils.hexStrToInt(teptep[:-2]) + int(teptep[-2:], 16) / 100.0)
|
||||
|
||||
elif (hnd == thingy52.e_pressure_handle):
|
||||
pressure_int, pressure_dec = self.__extractPressureData(data)
|
||||
self.__setData("pressure", pressure_int + pressure_dec / 100.0)
|
||||
|
||||
elif (hnd == thingy52.e_humidity_handle):
|
||||
teptep = binascii.b2a_hex(data)
|
||||
self.__setData("humidity", Utils.hexStrToInt(teptep))
|
||||
|
||||
elif (hnd == thingy52.e_gas_handle):
|
||||
eco2, tvoc = self.__extractGasData(data)
|
||||
logging.debug(f"Handling gas sensor update: {eco2}, {tvoc}")
|
||||
self.__setData("eco2", eco2)
|
||||
self.__setData("tvoc", tvoc)
|
||||
|
||||
elif (hnd == thingy52.e_color_handle):
|
||||
teptep = binascii.b2a_hex(data)
|
||||
red, green, blue, clear = self.__extractColourData(data)
|
||||
self.__setData("colour", "0x%0.2X%0.2X%0.2X" %(red, green, blue))
|
||||
|
||||
elif (hnd == thingy52.ui_button_handle):
|
||||
teptep = binascii.b2a_hex(data)
|
||||
self.__setData("button", "on" if Utils.hexStrToInt(teptep) == 1 else "off") # 1 = pressed, 0 = released
|
||||
self.__notifySubscribers("button")
|
||||
|
||||
elif (hnd == thingy52.m_tap_handle):
|
||||
direction, count = self.__extractTapData(data)
|
||||
match direction:
|
||||
case 1:
|
||||
tapDirection = "Bottom"
|
||||
case 2:
|
||||
tapDirection = "Left"
|
||||
case 3:
|
||||
tapDirection = "Top"
|
||||
case 4:
|
||||
tapDirection = "Right"
|
||||
case 5:
|
||||
tapDirection = "Front"
|
||||
case 6:
|
||||
tapDirection = "Back"
|
||||
case _:
|
||||
tapDirection = f"Unknown, {value}"
|
||||
self.__setData("tapDirection", tapDirection)
|
||||
self.__setData("taps", count)
|
||||
|
||||
|
||||
elif (hnd == thingy52.m_orient_handle):
|
||||
teptep = binascii.b2a_hex(data)
|
||||
value = Utils.hexStrToInt(teptep)
|
||||
# 1 = led top left
|
||||
# 2 = led top right / left side up
|
||||
# 3 = led bottom right/bottom up
|
||||
# 0 = led bottom left/ right side up
|
||||
match value:
|
||||
case 0:
|
||||
orientation = "LED Bottom Left"
|
||||
case 1:
|
||||
orientation = "LED Top Left"
|
||||
case 2:
|
||||
orientation = "LED Top Right"
|
||||
case 3:
|
||||
orientation = "LED Bottom Right"
|
||||
case _:
|
||||
orientation = f"Unknown, {value}"
|
||||
self.__setData("orient", orientation)
|
||||
|
||||
elif (hnd == thingy52.m_stepcnt_handle):
|
||||
logging.info("Handling step count update")
|
||||
teptep = binascii.b2a_hex(data)
|
||||
self.__setData("stepCount", Utils.hexStrToInt(teptep))
|
||||
|
||||
elif (hnd == thingy52.m_rotation_handle):
|
||||
teptep = binascii.b2a_hex(data)
|
||||
self.__setData("rotation", Utils.hexStrToInt(teptep))
|
||||
|
||||
elif (hnd == thingy52.m_heading_handle):
|
||||
teptep = binascii.b2a_hex(data)
|
||||
self.__setData("heading", Utils.hexStrToInt(teptep))
|
||||
|
||||
elif (hnd == thingy52.m_gravity_handle):
|
||||
teptep = binascii.b2a_hex(data)
|
||||
self.__setData("gravity", Utils.hexStrToInt(teptep))
|
||||
|
||||
elif (hnd == thingy52.s_speaker_status_handle):
|
||||
teptep = binascii.b2a_hex(data)
|
||||
self.__setData("speaker", Utils.hexStrToInt(teptep))
|
||||
|
||||
elif (hnd == thingy52.s_microphone_handle):
|
||||
teptep = binascii.b2a_hex(data)
|
||||
|
||||
else:
|
||||
logging.warn(f"Unknown notification: {hnd}, {data}")
|
||||
|
||||
|
||||
def __extractPressureData(self, data):
|
||||
""" Extract pressure data from data string. """
|
||||
teptep = binascii.b2a_hex(data)
|
||||
pressure_int = 0
|
||||
for i in range(0, 4):
|
||||
pressure_int += (int(teptep[i*2:(i*2)+2], 16) << 8*i)
|
||||
pressure_dec = int(teptep[-2:], 16)
|
||||
return (pressure_int, pressure_dec)
|
||||
|
||||
def __extractGasData(self, data):
|
||||
""" Extract gas data from data string. """
|
||||
teptep = binascii.b2a_hex(data)
|
||||
eco2 = int(teptep[:2], 16) + (int(teptep[2:4], 16) << 8)
|
||||
tvoc = int(teptep[4:6], 16) + (int(teptep[6:8], 16) << 8)
|
||||
return eco2, tvoc
|
||||
|
||||
def __extractColourData(self, data):
|
||||
""" Extract color data from data string. """
|
||||
teptep = binascii.b2a_hex(data)
|
||||
red = int(teptep[:2], 16)
|
||||
green = int(teptep[2:4], 16)
|
||||
blue = int(teptep[4:6], 16)
|
||||
clear = int(teptep[6:8], 16)
|
||||
return red, green, blue, clear
|
||||
|
||||
def __extractTapData(self, data):
|
||||
""" Extract tap data from data string. """
|
||||
teptep = binascii.b2a_hex(data)
|
||||
direction = int(teptep[0:2])
|
||||
count = int(teptep[2:4])
|
||||
return (direction, count)
|
||||
Reference in New Issue
Block a user