# chutney/chutney/runner.py
#
# 124 lines
# 3.2 KiB
# Python

import sys
from .exceptions import HaltException
from .garage import GarageDisplayer
from .garage import GarageListener
from .garage import GarageUpdater
from .symboldisplayer import SymbolDisplayer
from .time import TimeDisplayer
from .weather import WeatherDisplayer
from .weather import WeatherUpdater
from multiprocessing import Event
from multiprocessing import Process
from multiprocessing import Queue
from queue import Empty
from signal import signal
from signal import SIGTERM
from time import sleep
# Prefer the real Unicorn HAT HD driver; if it cannot be imported, fall back
# to the simulator package. Each backend is given its own rotation.
try:
    import unicornhathd as unicorn
    unicorn.rotation(90)
except ImportError:
    from unicorn_hat_sim import unicornhathd as unicorn
    unicorn.rotation(180)

# Timeout between weather refreshes in weatherLoop.
WEATHER_UPDATE_INTERVAL_IN_SECONDS = 60
# Longest time garageLoop waits on its queue before refreshing again.
GARAGE_UPDATE_INTERVAL_IN_SECONDS = 30
# Pause before garageNotifyLoop starts listening again after the listener
# returned or failed.
GARAGE_LISTENER_RETRY_INTERVAL_IN_SECONDS = 120
# Pause between display refreshes in main.
DISPLAY_UPDATE_INTERVAL_IN_SECONDS = 0.5
# Configuration file handed to the updater and listener classes.
CONFIG_FILE = 'chutney.cfg'
def main():
    """Start the background workers and refresh the display forever.

    Two child processes are launched (weather and garage), then a SIGTERM
    handler built by cleanExit is installed in this process. After that the
    time, weather and garage rows are redrawn in an endless loop, pausing
    DISPLAY_UPDATE_INTERVAL_IN_SECONDS between passes. The function never
    returns normally; the SIGTERM handler ends the process.
    """
    queue = Queue()
    haltEvent = Event()

    weatherWorker = Process(target=weatherLoop, args=[haltEvent])
    weatherWorker.start()
    garageWorker = Process(target=garageLoop, args=[queue])
    garageWorker.start()

    # The handler is registered only after both workers have been started.
    shutdownHandler = cleanExit(unicorn, haltEvent, queue)
    signal(SIGTERM, shutdownHandler)

    unicorn.brightness(0.3)
    symbols = SymbolDisplayer(unicorn)
    # Bound refresh methods, in the order they are drawn on every pass.
    refreshers = (
        TimeDisplayer(symbols, topRow=15).showTime,
        WeatherDisplayer(symbols, topRow=9).showWeather,
        GarageDisplayer(symbols, topRow=3).showGarageState,
    )

    while True:
        for refresh in refreshers:
            refresh()
        sleep(DISPLAY_UPDATE_INTERVAL_IN_SECONDS)
def weatherLoop(haltEvent):
    """Refresh the weather periodically until haltEvent is set.

    Args:
        haltEvent: event checked before every refresh and waited on between
            refreshes, so setting it also cuts the current wait short.

    Once the event is observed as set, the weather is cleared via
    WeatherUpdater.clearWeather() before returning.
    """
    updater = WeatherUpdater(configFile=CONFIG_FILE)
    while True:
        if haltEvent.is_set():
            break
        updater.updateWeather()
        # Doubles as the sleep between refreshes; returns early on halt.
        haltEvent.wait(timeout=WEATHER_UPDATE_INTERVAL_IN_SECONDS)
    updater.clearWeather()
def garageLoop(queue):
    """Keep the garage state fresh until a 'halt' item arrives on the queue.

    A child process running garageNotifyLoop is started with the same queue.
    The loop then refreshes the garage state, waits up to
    GARAGE_UPDATE_INTERVAL_IN_SECONDS for a queue item, and repeats. A
    timeout or any item other than 'halt' triggers another refresh; the
    item 'halt' ends the loop, after which the garage state is cleared.

    Args:
        queue: queue shared with the notify child process and with whoever
            sends the 'halt' item.

    The notify child process is terminated on every exit path, including
    when building the updater or refreshing the state raises; the exception
    itself still propagates to the caller.
    """
    garageNotify = Process(target=garageNotifyLoop, args=[queue])
    garageNotify.start()
    try:
        garageUpdater = GarageUpdater(configFile=CONFIG_FILE)
        isRunning = True
        while isRunning:
            garageUpdater.updateGarageState()
            try:
                item = queue.get(timeout=GARAGE_UPDATE_INTERVAL_IN_SECONDS)
                isRunning = item != 'halt'
            except Empty:
                # Nothing arrived within the interval: just refresh again.
                pass
        garageUpdater.clearGarageState()
    finally:
        # Without this, a failure above would leave the non-daemonic child
        # running and this process would wait for it when exiting.
        garageNotify.terminate()
def garageNotifyLoop(queue):
    """Run the garage listener, restarting it until a halt is requested.

    A SIGTERM handler that raises HaltException is installed first. The
    listener is then started with the given queue; whenever it returns or
    fails with an ordinary exception, the error (if any) is printed and the
    listener is started again after
    GARAGE_LISTENER_RETRY_INTERVAL_IN_SECONDS. A HaltException raised
    inside the guarded section ends the function.

    Args:
        queue: passed straight through to
            GarageListener.listenForNotifications.
    """
    signal(SIGTERM, raiseHaltException())
    listener = GarageListener(configFile=CONFIG_FILE)
    alreadyAttempted = False
    while True:
        try:
            if alreadyAttempted:
                # Every attempt after the first one waits before retrying.
                print('waiting to restart garage notify loop', flush=True)
                sleep(GARAGE_LISTENER_RETRY_INTERVAL_IN_SECONDS)
            alreadyAttempted = True
            print('listening for garage updates', flush=True)
            listener.listenForNotifications(queue)
        except HaltException:
            return
        except Exception as error:
            print(error, flush=True)
def cleanExit(unicorn, haltEvent, queue):
    """Build a signal handler that shuts everything down and exits.

    Args:
        unicorn: display object; its off() method is called first.
        haltEvent: event that is set to signal a halt.
        queue: queue that receives the item 'halt' and is then closed.

    Returns:
        A handler taking (signum, frame), suitable for signal.signal().
        When invoked it calls unicorn.off(), sets haltEvent, puts 'halt' on
        the queue, closes the queue and raises SystemExit with code 0.

        If unicorn.off() raises, the event is still set and 'halt' is still
        queued before that exception propagates; sys.exit(0) is not reached
        in that case.
    """
    def _exit_(signum, frame):
        try:
            unicorn.off()
        finally:
            # The halt signalling must not depend on the display call
            # succeeding, otherwise the consumers of the event and the
            # queue would never be told to stop.
            haltEvent.set()
            queue.put('halt')
            queue.close()
        sys.exit(0)
    return _exit_
def raiseHaltException():
    """Build a signal handler that raises HaltException.

    Returns:
        A handler taking (signum, frame), suitable for signal.signal().
        Both arguments are ignored; every invocation raises a new
        HaltException in the code that was interrupted by the signal.
    """
    def _raise_halt(_signum, _frame):
        raise HaltException()
    return _raise_halt