chutney/chutney.py

134 lines
3.2 KiB
Python
Raw Normal View History

import json
import requests
import sys
from character_display import CharacterDisplay
from configparser import ConfigParser
2022-12-26 19:37:50 -05:00
from garage_display import GarageDisplay
from garage_updater import GarageUpdater
2022-12-25 13:55:08 -05:00
from multiprocessing import Event
from multiprocessing import Process
from multiprocessing import Queue
from queue import Empty
2022-12-25 12:33:06 -05:00
from signal import signal
from signal import SIGTERM
from time import sleep
from time_display import TimeDisplay
2022-12-24 10:59:04 -05:00
from weather_display import WeatherDisplay
from weather_updater import WeatherUpdater
# Pick the display backend: try the `unicornhathd` module first and fall back
# to the `unicorn_hat_sim` package when that import fails. Each backend is
# given its own rotation (90 for `unicornhathd`, 180 for the simulator).
try:
    import unicornhathd as unicorn
    unicorn.rotation(90)
except ImportError:
    from unicorn_hat_sim import unicornhathd as unicorn
    unicorn.rotation(180)
2022-12-26 19:37:50 -05:00
# Timeout weatherLoop waits on the halt event between weather refreshes.
WEATHER_UPDATE_INTERVAL_IN_SECONDS = 10
# Timeout garageLoop waits on its queue before refreshing the garage state.
GARAGE_UPDATE_INTERVAL_IN_SECONDS = 30
# Pause between two passes of the redraw loop in main().
DISPLAY_UPDATE_INTERVAL_IN_SECONDS = 0.5
# Config file handed to the updaters and read directly by garageNotifyLoop.
CONFIG_FILE = 'chutney.cfg'
class HaltException(Exception):
    """Raised by the handler from raiseHaltException() to interrupt a loop."""
def main():
    """Launch the background workers, then redraw the display forever.

    The weather and garage workers are started as separate processes before
    this process registers its SIGTERM handler. The handler built by
    cleanExit() receives the display, the halt event and the queue so it can
    signal both workers. After that the time, weather and garage rows are
    redrawn in that order every DISPLAY_UPDATE_INTERVAL_IN_SECONDS.
    """
    messages = Queue()
    stop_requested = Event()

    # Start-up order is kept exactly: weather worker, garage worker, then the
    # SIGTERM handler for this process.
    weather_worker = Process(target=weatherLoop, args=(stop_requested,))
    weather_worker.start()
    garage_worker = Process(target=garageLoop, args=(messages,))
    garage_worker.start()

    signal(SIGTERM, cleanExit(unicorn, stop_requested, messages))
    unicorn.brightness(0.3)

    glyphs = CharacterDisplay(unicorn)
    # One redraw callable per display row; the tuple order is the draw order.
    redraw_steps = (
        TimeDisplay(glyphs, topRow=15).showTime,
        WeatherDisplay(glyphs, topRow=9).showWeather,
        GarageDisplay(glyphs, topRow=3).showGarageState,
    )

    while True:
        for redraw in redraw_steps:
            redraw()
        sleep(DISPLAY_UPDATE_INTERVAL_IN_SECONDS)
2022-12-25 12:33:06 -05:00
def weatherLoop(haltEvent):
    """Refresh the weather until *haltEvent* is set, then clear it.

    Args:
        haltEvent: event object polled with ``is_set()`` and waited on with a
            timeout of WEATHER_UPDATE_INTERVAL_IN_SECONDS between refreshes,
            so setting it wakes the loop without waiting out the interval.
    """
    updater = WeatherUpdater(configFile=CONFIG_FILE)
    while True:
        if haltEvent.is_set():
            break
        updater.updateWeather()
        # Doubles as the sleep between refreshes and the wake-up on halt.
        haltEvent.wait(timeout=WEATHER_UPDATE_INTERVAL_IN_SECONDS)
    updater.clearWeather()
def garageLoop(queue):
    """Keep the garage state fresh until a ``'halt'`` item arrives on *queue*.

    A child process running garageNotifyLoop() is started on the same queue.
    The garage state is then refreshed once per iteration; each iteration
    blocks on the queue for up to GARAGE_UPDATE_INTERVAL_IN_SECONDS, so any
    queued item triggers an immediate refresh and ``'halt'`` ends the loop.
    On a normal exit the garage state is cleared.

    The notify child is terminated in a ``finally`` block so it is not left
    running when the updater raises.

    Args:
        queue: queue shared with garageNotifyLoop() and the SIGTERM handler;
            items are compared against the string ``'halt'``.
    """
    garageNotify = Process(target=garageNotifyLoop, args=[queue])
    garageNotify.start()

    try:
        garageUpdater = GarageUpdater(configFile=CONFIG_FILE)
        isRunning = True

        while isRunning:
            garageUpdater.updateGarageState()
            try:
                item = queue.get(timeout=GARAGE_UPDATE_INTERVAL_IN_SECONDS)
                isRunning = item != 'halt'
            except Empty:
                # Timed out with nothing queued: fall through to a refresh.
                pass

        garageUpdater.clearGarageState()
    finally:
        # Always stop the listener, including when the updater raised above.
        garageNotify.terminate()
def garageNotifyLoop(queue):
    """Stream notifications over HTTP and enqueue ``'update'`` for each one.

    Reads the ``ntfy`` and ``topic`` options from the ``garage`` section of
    CONFIG_FILE, opens a streaming GET on ``https://{ntfy}/{topic}/json`` and
    parses every non-empty line as JSON. Lines whose ``event`` field equals
    ``'message'`` put ``'update'`` on *queue*.

    SIGTERM is mapped to HaltException, which ends the stream silently. Any
    other exception is printed and also ends the function.

    Args:
        queue: queue that receives the ``'update'`` items.
    """
    signal(SIGTERM, raiseHaltException())
    config = ConfigParser()
    config.read(CONFIG_FILE)

    ntfy = config['garage'].get('ntfy')
    topic = config['garage'].get('topic')

    # Bound before the try so the finally block can tell whether the request
    # was ever opened; requests.get() itself may raise (or be interrupted by
    # the signal) before the assignment happens.
    resp = None
    try:
        resp = requests.get(f'https://{ntfy}/{topic}/json', stream=True)
        for line in resp.iter_lines():
            if not line:
                continue
            data = json.loads(line.decode('utf-8'))
            if data['event'] == 'message':
                queue.put('update')
    except HaltException:
        pass
    except Exception as e:
        print(e, flush=True)
    finally:
        if resp is not None:
            resp.close()
def cleanExit(unicorn, haltEvent, queue):
    """Build a signal handler that shuts the application down.

    Args:
        unicorn: display object; its ``off()`` method is called first.
        haltEvent: event that is set by the handler.
        queue: queue that receives ``'halt'`` and is then closed.

    Returns:
        A ``(signum, frame)`` callable that performs the steps above in order
        and then exits the process with status 0.
    """
    def _shutdown(_signum, _frame):
        unicorn.off()
        haltEvent.set()
        queue.put('halt')
        queue.close()
        # Same effect as sys.exit(0), which raises SystemExit(0).
        raise SystemExit(0)

    return _shutdown
def raiseHaltException():
    """Build a signal handler that raises HaltException when invoked.

    Returns:
        A ``(signum, frame)`` callable that ignores both arguments and raises
        HaltException.
    """
    def _abort(_signum, _frame):
        raise HaltException()

    return _abort
# Run main() only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    main()