chutney/chutney/runner.py

117 lines
3.0 KiB
Python
Raw Normal View History

2023-01-05 19:11:30 -05:00
import sys
from .exceptions import HaltException
from .garage import GarageDisplayer
from .garage import GarageListener
2023-01-06 11:14:00 -05:00
from .garage import GarageUpdater
from .symboldisplayer import SymbolDisplayer
from .time import TimeDisplayer
from .weather import WeatherDisplayer
2023-01-06 11:14:00 -05:00
from .weather import WeatherUpdater
2023-01-05 19:11:30 -05:00
from multiprocessing import Event
from multiprocessing import Process
from multiprocessing import Queue
from queue import Empty
from signal import signal
from signal import SIGTERM
from time import sleep
# Bind `unicorn` to whichever Unicorn HAT HD module can be imported and set
# the rotation that goes with it.
try:
    import unicornhathd as unicorn
    unicorn.rotation(90)
except ImportError:
    # unicornhathd is unavailable: fall back to the unicorn_hat_sim package
    # (presumably a desktop simulator -- confirm), which uses a different
    # rotation.
    from unicorn_hat_sim import unicornhathd as unicorn
    unicorn.rotation(180)
2023-01-07 09:38:31 -05:00
# Timeout, in seconds, that weatherLoop waits on the halt event between
# successive updateWeather() calls.
WEATHER_UPDATE_INTERVAL_IN_SECONDS = 60
2023-01-05 19:11:30 -05:00
# Longest time, in seconds, that garageLoop blocks on its queue before it
# refreshes the garage state again.
GARAGE_UPDATE_INTERVAL_IN_SECONDS = 30
# Time, in seconds, that garageNotifyLoop sleeps after the listener raised an
# exception.
GARAGE_LISTENER_RETRY_INTERVAL_IN_SECONDS = 120
2023-01-05 19:11:30 -05:00
# Time, in seconds, that main() sleeps between redraws of the display.
DISPLAY_UPDATE_INTERVAL_IN_SECONDS = 0.5
# Passed as configFile= to WeatherUpdater, GarageUpdater and GarageListener.
CONFIG_FILE = 'chutney.cfg'
def main():
    """Start the background workers and redraw the display forever.

    Spawns one process running weatherLoop and one running garageLoop,
    installs a SIGTERM handler built by cleanExit, and then repeatedly shows
    the time, weather and garage state, sleeping
    DISPLAY_UPDATE_INTERVAL_IN_SECONDS between redraws. The loop has no exit
    condition of its own; it ends when the SIGTERM handler calls sys.exit.
    """
    garageQueue = Queue()
    stopRequested = Event()

    # The workers are started before the SIGTERM handler is installed
    # (presumably so forked children do not inherit it -- confirm).
    weatherProcess = Process(target=weatherLoop, args=[stopRequested])
    weatherProcess.start()
    garageProcess = Process(target=garageLoop, args=[garageQueue])
    garageProcess.start()

    signal(SIGTERM, cleanExit(unicorn, stopRequested, garageQueue))

    unicorn.brightness(0.3)
    symbols = SymbolDisplayer(unicorn)
    clock = TimeDisplayer(symbols, topRow=15)
    weather = WeatherDisplayer(symbols, topRow=9)
    garage = GarageDisplayer(symbols, topRow=3)

    while True:
        clock.showTime()
        weather.showWeather()
        garage.showGarageState()
        sleep(DISPLAY_UPDATE_INTERVAL_IN_SECONDS)
def weatherLoop(haltEvent):
    """Refresh the weather periodically until haltEvent is set.

    Each pass calls updateWeather() and then waits on haltEvent for up to
    WEATHER_UPDATE_INTERVAL_IN_SECONDS, so a halt request cuts the wait
    short. Once the event is observed as set, clearWeather() is called and
    the function returns.
    """
    updater = WeatherUpdater(configFile=CONFIG_FILE)
    while True:
        if haltEvent.is_set():
            break
        updater.updateWeather()
        # Doubles as the inter-update delay and the wake-up on halt.
        haltEvent.wait(timeout=WEATHER_UPDATE_INTERVAL_IN_SECONDS)
    updater.clearWeather()
def garageLoop(queue):
    """Refresh the garage state until a 'halt' item arrives on queue.

    A child process running garageNotifyLoop is started with the same queue.
    The loop then calls updateGarageState(), blocks on the queue for up to
    GARAGE_UPDATE_INTERVAL_IN_SECONDS, and repeats. Receiving 'halt' ends the
    loop; any other item, or a timeout, simply triggers the next refresh.
    On a normal stop the garage state is cleared before the child is
    terminated.

    The child process is terminated even when the updater raises, so a
    failure here does not leave the listener process running.
    """
    garageNotify = Process(target=garageNotifyLoop, args=[queue])
    garageNotify.start()
    try:
        garageUpdater = GarageUpdater(configFile=CONFIG_FILE)
        isRunning = True
        while isRunning:
            garageUpdater.updateGarageState()
            try:
                item = queue.get(timeout=GARAGE_UPDATE_INTERVAL_IN_SECONDS)
            except Empty:
                # Nothing arrived within the interval: time for a refresh.
                continue
            isRunning = item != 'halt'
        garageUpdater.clearGarageState()
    finally:
        # Always stop the listener child, including on an unexpected error.
        garageNotify.terminate()
def garageNotifyLoop(queue):
    """Run the garage listener until SIGTERM asks this process to stop.

    A SIGTERM handler is installed that raises HaltException at whatever
    statement is executing when the signal arrives. The listener is then run
    in a loop: any other exception it raises is printed, and the loop sleeps
    GARAGE_LISTENER_RETRY_INTERVAL_IN_SECONDS before listening again.

    HaltException is handled for the whole lifetime of the function, so a
    SIGTERM that arrives while the listener is being constructed, or during
    the retry sleep, stops the loop cleanly instead of escaping as an
    uncaught exception.
    """
    signal(SIGTERM, raiseHaltException())
    try:
        garageListener = GarageListener(configFile=CONFIG_FILE)
        while True:
            try:
                garageListener.listenForNotifications(queue)
            except HaltException:
                # Let the outer handler end the loop; must precede the
                # generic handler in case HaltException derives from
                # Exception.
                raise
            except Exception as e:
                print(e, flush=True)
                sleep(GARAGE_LISTENER_RETRY_INTERVAL_IN_SECONDS)
    except HaltException:
        # Requested shutdown: return normally.
        pass
2023-01-05 19:11:30 -05:00
def cleanExit(unicorn, haltEvent, queue):
    """Build a signal handler that shuts everything down and exits.

    The returned handler turns the display off, sets haltEvent, puts 'halt'
    on queue, closes the queue, and exits the process with status 0.

    The handler is safe to invoke more than once: the 'halt' message is sent
    and the queue closed only on the first invocation, because putting to an
    already-closed queue raises. Later invocations still turn the display
    off, set the event, and exit with status 0.

    Args:
        unicorn: display object providing off().
        haltEvent: event object providing set().
        queue: queue object providing put() and close().

    Returns:
        A callable taking (signum, frame), suitable for signal.signal().
    """
    haltSent = False

    def _exit_(signum, frame):
        nonlocal haltSent
        unicorn.off()
        haltEvent.set()
        if not haltSent:
            # Flag first, so a signal arriving mid-shutdown cannot retry.
            haltSent = True
            queue.put('halt')
            queue.close()
        sys.exit(0)

    return _exit_
def raiseHaltException():
    """Build a signal handler that raises HaltException when invoked.

    Returns:
        A callable taking (signum, frame), suitable for signal.signal().
    """
    def _haltHandler(signum, frame):
        # Both arguments are supplied by the signal machinery and unused.
        raise HaltException()

    return _haltHandler