124 lines
3.2 KiB
Python
124 lines
3.2 KiB
Python
import sys
|
|
|
|
from .exceptions import HaltException
|
|
from .garage import GarageDisplayer
|
|
from .garage import GarageListener
|
|
from .garage import GarageUpdater
|
|
from .symboldisplayer import SymbolDisplayer
|
|
from .time import TimeDisplayer
|
|
from .weather import WeatherDisplayer
|
|
from .weather import WeatherUpdater
|
|
from multiprocessing import Event
|
|
from multiprocessing import Process
|
|
from multiprocessing import Queue
|
|
from queue import Empty
|
|
from signal import signal
|
|
from signal import SIGTERM
|
|
from time import sleep
|
|
|
|
try:
|
|
import unicornhathd as unicorn
|
|
unicorn.rotation(90)
|
|
except ImportError:
|
|
from unicorn_hat_sim import unicornhathd as unicorn
|
|
unicorn.rotation(180)
|
|
|
|
|
|
WEATHER_UPDATE_INTERVAL_IN_SECONDS = 60
|
|
GARAGE_UPDATE_INTERVAL_IN_SECONDS = 30
|
|
GARAGE_LISTENER_RETRY_INTERVAL_IN_SECONDS = 120
|
|
DISPLAY_UPDATE_INTERVAL_IN_SECONDS = 0.5
|
|
CONFIG_FILE = 'chutney.cfg'
|
|
|
|
|
|
def main():
|
|
queue = Queue()
|
|
haltEvent = Event()
|
|
Process(target=weatherLoop, args=[haltEvent]).start()
|
|
Process(target=garageLoop, args=[queue]).start()
|
|
signal(SIGTERM, cleanExit(unicorn, haltEvent, queue))
|
|
|
|
unicorn.brightness(0.3)
|
|
|
|
symbolDisplayer = SymbolDisplayer(unicorn)
|
|
timeDisplayer = TimeDisplayer(symbolDisplayer, topRow=15)
|
|
weatherDisplayer = WeatherDisplayer(symbolDisplayer, topRow=9)
|
|
garageDisplayer = GarageDisplayer(symbolDisplayer, topRow=3)
|
|
|
|
while True:
|
|
timeDisplayer.showTime()
|
|
weatherDisplayer.showWeather()
|
|
garageDisplayer.showGarageState()
|
|
sleep(DISPLAY_UPDATE_INTERVAL_IN_SECONDS)
|
|
|
|
|
|
def weatherLoop(haltEvent):
|
|
weatherUpdater = WeatherUpdater(configFile=CONFIG_FILE)
|
|
|
|
while not haltEvent.is_set():
|
|
weatherUpdater.updateWeather()
|
|
haltEvent.wait(timeout=WEATHER_UPDATE_INTERVAL_IN_SECONDS)
|
|
|
|
weatherUpdater.clearWeather()
|
|
|
|
|
|
def garageLoop(queue):
|
|
garageNotify = Process(target=garageNotifyLoop, args=[queue])
|
|
garageNotify.start()
|
|
|
|
garageUpdater = GarageUpdater(configFile=CONFIG_FILE)
|
|
isRunning = True
|
|
|
|
while isRunning:
|
|
garageUpdater.updateGarageState()
|
|
|
|
try:
|
|
item = queue.get(timeout=GARAGE_UPDATE_INTERVAL_IN_SECONDS)
|
|
isRunning = item != 'halt'
|
|
except Empty:
|
|
pass
|
|
|
|
garageUpdater.clearGarageState()
|
|
garageNotify.terminate()
|
|
|
|
|
|
def garageNotifyLoop(queue):
|
|
signal(SIGTERM, raiseHaltException())
|
|
|
|
garageListener = GarageListener(configFile=CONFIG_FILE)
|
|
isRunning = True
|
|
isFirstRun = True
|
|
|
|
while isRunning:
|
|
try:
|
|
if isFirstRun:
|
|
isFirstRun = False
|
|
else:
|
|
print('waiting to restart garage notify loop', flush=True)
|
|
sleep(GARAGE_LISTENER_RETRY_INTERVAL_IN_SECONDS)
|
|
|
|
print('listening for garage updates', flush=True)
|
|
garageListener.listenForNotifications(queue)
|
|
except HaltException:
|
|
isRunning = False
|
|
except Exception as e:
|
|
print(e, flush=True)
|
|
|
|
|
|
def cleanExit(unicorn, haltEvent, queue):
|
|
def _exit_(signum, frame):
|
|
unicorn.off()
|
|
haltEvent.set()
|
|
queue.put('halt')
|
|
queue.close()
|
|
sys.exit(0)
|
|
|
|
return _exit_
|
|
|
|
|
|
def raiseHaltException():
|
|
def _exit_(signum, frame):
|
|
raise HaltException()
|
|
|
|
return _exit_
|