43 lines
1.3 KiB
Python
43 lines
1.3 KiB
Python
from time import sleep
|
|
import logging
|
|
from collections.abc import Callable
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
class Utils:
    """Static helpers: a reconnect-with-backoff factory and a hex decoder."""

    @staticmethod
    def _nextReconnectDelay(currentDelay, multiplier, maxDelay):
        """Return ``currentDelay * multiplier``, capped at ``maxDelay``."""
        return min(maxDelay, currentDelay * multiplier)

    @staticmethod
    def getReconnector(
        reconnect: Callable[[], None],
        description: str,
        minReconnectDelay: int,
        maxReconnectDelay: int,
        reconnectAttempts: int,
        reconnectDelayMultiplier: int,
        disconnectCallback: Callable[[], None],
    ) -> Callable[[], None]:
        """Build a zero-argument handler that retries ``reconnect`` with backoff.

        Args:
            reconnect: Called once per attempt; an attempt counts as failed
                when it raises an ``Exception``.
            description: Name of the remote end, used only in log messages.
            minReconnectDelay: Value passed to ``sleep`` before the first
                attempt.
            maxReconnectDelay: Upper bound for the delay after it has been
                multiplied.
            reconnectAttempts: Maximum number of attempts; a negative value
                retries without limit.
            reconnectDelayMultiplier: Factor applied to the delay after each
                failed attempt.
            disconnectCallback: Called once when every attempt has failed.

        Returns:
            A callable that blocks while retrying and returns ``None`` either
            after a successful reconnect or after ``disconnectCallback`` ran.
        """

        def fn() -> None:
            logger.warning(
                f"Disconnected from {description}, trying to reconnect"
            )
            reconnectCount = 0
            reconnectDelay = minReconnectDelay
            while reconnectAttempts < 0 or reconnectCount < reconnectAttempts:
                # The wait happens before every attempt, including the first.
                sleep(reconnectDelay)

                try:
                    reconnect()
                except Exception as exc:
                    # Only Exception is caught so that KeyboardInterrupt and
                    # SystemExit can still stop an unlimited retry loop.
                    reconnectCount += 1
                    logger.warning(
                        f"Reconnect attempt {reconnectCount} to {description}"
                        f" failed: {exc}"
                    )
                    # Grow the delay, but never beyond maxReconnectDelay.
                    reconnectDelay = Utils._nextReconnectDelay(
                        reconnectDelay,
                        reconnectDelayMultiplier,
                        maxReconnectDelay,
                    )
                else:
                    logger.info(f"Reconnected to {description}")
                    return

            logger.error(
                f"Unable to reconnect to {description} after {reconnectCount}"
                " attempts, exiting"
            )
            disconnectCallback()

        return fn

    @staticmethod
    def hexStrToInt(s: str) -> int:
        """Parse a hex string and map 0x80-0xFF onto -128..-1.

        The string is parsed with ``int(s, 16)``. Any parsed value of 128 or
        more has 256 subtracted, so a two-digit hex byte is read as a signed
        8-bit number. Values above 0xFF are not wrapped to a wider width;
        they are simply reduced by 256.

        Raises:
            ValueError: If ``s`` is not a valid base-16 literal.
        """
        i = int(s, 16)
        if i >= 2**7:
            i -= 2**8
        return i