chutney/chutney.py

134 lines
3.2 KiB
Python

import json
import requests
import sys
from character_display import CharacterDisplay
from configparser import ConfigParser
from garage_display import GarageDisplay
from garage_updater import GarageUpdater
from multiprocessing import Event
from multiprocessing import Process
from multiprocessing import Queue
from queue import Empty
from signal import signal
from signal import SIGTERM
from time import sleep
from time_display import TimeDisplay
from weather_display import WeatherDisplay
from weather_updater import WeatherUpdater
try:
import unicornhathd as unicorn
unicorn.rotation(90)
except ImportError:
from unicorn_hat_sim import unicornhathd as unicorn
unicorn.rotation(180)
WEATHER_UPDATE_INTERVAL_IN_SECONDS = 10
GARAGE_UPDATE_INTERVAL_IN_SECONDS = 30
DISPLAY_UPDATE_INTERVAL_IN_SECONDS = 0.5
CONFIG_FILE = 'chutney.cfg'
class HaltException(Exception):
pass
def main():
queue = Queue()
haltEvent = Event()
Process(target=weatherLoop, args=[haltEvent]).start()
Process(target=garageLoop, args=[queue]).start()
signal(SIGTERM, cleanExit(unicorn, haltEvent, queue))
unicorn.brightness(0.3)
characterDisplay = CharacterDisplay(unicorn)
timeDisplay = TimeDisplay(characterDisplay, topRow=15)
weatherDisplay = WeatherDisplay(characterDisplay, topRow=9)
garageDisplay = GarageDisplay(characterDisplay, topRow=3)
while True:
timeDisplay.showTime()
weatherDisplay.showWeather()
garageDisplay.showGarageState()
sleep(DISPLAY_UPDATE_INTERVAL_IN_SECONDS)
def weatherLoop(haltEvent):
weatherUpdater = WeatherUpdater(configFile=CONFIG_FILE)
while not haltEvent.is_set():
weatherUpdater.updateWeather()
haltEvent.wait(timeout=WEATHER_UPDATE_INTERVAL_IN_SECONDS)
weatherUpdater.clearWeather()
def garageLoop(queue):
garageNotify = Process(target=garageNotifyLoop, args=[queue])
garageNotify.start()
garageUpdater = GarageUpdater(configFile=CONFIG_FILE)
isRunning = True
while isRunning:
garageUpdater.updateGarageState()
try:
item = queue.get(timeout=GARAGE_UPDATE_INTERVAL_IN_SECONDS)
isRunning = item != 'halt'
except Empty:
pass
garageUpdater.clearGarageState()
garageNotify.terminate()
def garageNotifyLoop(queue):
signal(SIGTERM, raiseHaltException())
config = ConfigParser()
config.read(CONFIG_FILE)
ntfy = config['garage'].get('ntfy')
topic = config['garage'].get('topic')
try:
resp = requests.get(f'https://{ntfy}/{topic}/json', stream=True)
for line in resp.iter_lines():
if line:
data = json.loads(line.decode('utf-8'))
if (data['event'] == 'message'):
queue.put('update')
except HaltException:
pass
except Exception as e:
print(e, flush=True)
finally:
resp.close()
def cleanExit(unicorn, haltEvent, queue):
def _exit_(signum, frame):
unicorn.off()
haltEvent.set()
queue.put('halt')
queue.close()
sys.exit(0)
return _exit_
def raiseHaltException():
def _exit_(signum, frame):
raise HaltException()
return _exit_
if __name__ == '__main__':
main()