ground-control/ground-control.py

98 lines
2.3 KiB
Python
Raw Permalink Normal View History

2023-10-30 09:20:48 -04:00
import json
import requests
from configparser import ConfigParser
from signal import signal
from signal import SIGTERM
from time import sleep
# Load runtime settings from the INI file in the current working directory.
config = ConfigParser()
config.read('ground-control.cfg')

# [systems] section: the garage status host and the marshaller host, each
# followed by the URL derived from it.
garage_host = config['systems'].get('garage')
garage_uri = f'https://{garage_host}'
marshaller_host = config['systems'].get('marshaller')
marshaller_uri = f'https://{marshaller_host}/on'

# [ntfy] section: host, topic and auth value used to build the URL that
# listen_for_notifications() streams from.
ntfy = config['ntfy'].get('host')
topic = config['ntfy'].get('topic')
auth = config['ntfy'].get('auth')
ntfy_uri = f'https://{ntfy}/{topic}/json?auth={auth}'

# Last east-door state observed by is_east_door_newly_open(); starts out as
# "not open".
is_east_door_previously_open = False
class HaltException(Exception):
    """Raised by the SIGTERM handler to make the main loop stop."""
def main():
    """Run the notification listener until SIGTERM is received.

    Installs ``raise_halt_exception`` as the SIGTERM handler, then calls
    ``listen_for_notifications()`` in a loop. Whenever the listener returns
    or fails, the loop pauses 15 seconds and starts it again. A
    ``HaltException`` ends the loop; any other exception is printed and the
    loop carries on.
    """
    signal(SIGTERM, raise_halt_exception)
    pause_before_listening = False  # no pause ahead of the very first attempt
    while True:
        try:
            if pause_before_listening:
                print('waiting to restart main loop', flush=True)
                sleep(15)
            pause_before_listening = True
            print('listening for events', flush=True)
            listen_for_notifications()
        except HaltException:
            # Shutdown was requested via the signal handler.
            break
        except Exception as e:
            # Top-level boundary: report the failure and retry next pass.
            print(e, flush=True)
def listen_for_notifications():
    """Stream events from ntfy and activate the marshaller on a door opening.

    Opens a streaming GET request to ``ntfy_uri`` and reads it line by line.
    Each non-empty line is decoded as UTF-8 JSON. When its ``event`` field is
    ``'message'`` and ``is_east_door_newly_open()`` reports a fresh opening,
    ``activate_marshaller()`` is called.

    Returns when the stream ends.

    Raises:
        requests.exceptions.HTTPError: if the subscription request is
            answered with a 4xx/5xx status.
        Exception: network, JSON-decoding and lookup errors propagate to the
            caller unchanged.
    """
    with requests.get(ntfy_uri, stream=True, timeout=60) as response:
        # Fail fast with a descriptive error on an HTTP error status instead
        # of trying to parse the error body as if it were an event line.
        response.raise_for_status()
        for line in response.iter_lines():
            if not line:
                # Skip empty lines in the stream.
                continue
            data = json.loads(line.decode('utf-8'))
            # Only poll the garage for 'message' events; the `and`
            # short-circuits for every other event type.
            if data['event'] == 'message' and is_east_door_newly_open():
                activate_marshaller()
def is_east_door_newly_open():
    """Return True only when the east door has just changed to open.

    Compares the current door state with the module-level
    ``is_east_door_previously_open``. If the state is unchanged the result
    is False. If it changed, the new state is remembered and returned, so a
    change to open gives True and a change to closed gives False.
    """
    global is_east_door_previously_open
    open_now = is_east_door_currently_open()
    if open_now == is_east_door_previously_open:
        # No transition since the last check.
        return False
    is_east_door_previously_open = open_now
    return open_now
def is_east_door_currently_open():
    """Query the garage service and report whether the east door is open.

    Issues a GET to ``garage_uri`` with a 6 second timeout, parses the JSON
    body and returns True when its ``'east-door'`` entry equals ``'opened'``.
    """
    response = requests.get(garage_uri, timeout=6)
    door_states = response.json()
    return door_states['east-door'] == 'opened'
def activate_marshaller():
    """Send the "on" request to the marshaller, tolerating read timeouts.

    Makes two attempts that each swallow a ``ReadTimeout`` (logging it and
    pausing 2 seconds), followed by one final request whose errors
    propagate to the caller.
    """
    def send_on_request():
        # One GET to the marshaller's /on URL with a 1 second timeout.
        requests.get(marshaller_uri, timeout=1)

    print('activating marshaller', flush=True)
    # NOTE(review): the final request below is sent even when the tolerant
    # attempts succeed, so the marshaller can receive up to three requests
    # per activation - confirm this repetition is intended.
    for _attempt in range(2):
        try:
            send_on_request()
        except requests.exceptions.ReadTimeout:
            print('read timeout - retrying marshaller', flush=True)
            sleep(2)
    send_on_request()
def raise_halt_exception(signum, frame):
    """Signal handler that raises ``HaltException``.

    ``signum`` and ``frame`` are the arguments passed to a signal handler;
    neither is used.
    """
    raise HaltException
# Start the listener loop only when this file is executed as a script.
if __name__ == '__main__':
    main()