# aquifer/main.py
#
# 115 lines
# 3.0 KiB
# Python

import json
import time
from machine import I2C
from net import AdafruitIO
from net import logging
from net import Ntfy
from net import ntp
from net import Server
from net import templates
from net import util
from net.config import config
from sensors import MCP9808
from sensors import SimulatedMCP9808
from sensors import WaterSensor
class SensorServer(Server):
    """Server exposing a temperature sensor and a water sensor.

    In addition to whatever the ``Server`` base class does in ``work()``,
    every work cycle:

    * re-syncs NTP once more than ``ntp_interval_in_seconds`` have elapsed,
    * passes a reading to ``self.aio.upload`` once more than
      ``aio_interval_in_seconds`` have elapsed,
    * polls the water sensor and caches its state.

    ``handlePath`` answers ``index.json`` with the current state as JSON,
    returns raw bytes for ``.txt`` / ``.ico`` files under ``www/``, and
    renders every other path through ``templates.render``.
    """

    def __init__(self):
        """Create sensors, network clients and timers, then sync NTP."""
        super().__init__()
        try:
            self.mcp = MCP9808(I2C(0))
        except Exception:
            # Fall back to a simulated sensor when the real one cannot be
            # constructed. Catching Exception instead of using a bare
            # ``except`` lets KeyboardInterrupt / SystemExit propagate.
            self.mcp = SimulatedMCP9808(temperature=37)
        self.waterSensor = WaterSensor(0)
        self.aio = AdafruitIO()
        self.ntfy = Ntfy()
        self.ntp_interval_in_seconds = 60 * 60  # one hour
        self.aio_interval_in_seconds = 60 * 5   # five minutes
        # Tick stamps of the last NTP sync / upload; both timers start now.
        self.ntp_ticks = time.ticks_ms()
        self.aio_ticks = time.ticks_ms()
        # Last water state seen by updateWater().
        self.isWaterPresent = False
        ntp.sync()

    def work(self):
        """Run one work cycle: base-class work, timed jobs, water poll."""
        super().work()
        ticks = time.ticks_ms()
        if util.secondsElapsed(ticks, self.ntp_ticks) > self.ntp_interval_in_seconds:
            self.ntp_ticks = ticks
            ntp.sync()
        if util.secondsElapsed(ticks, self.aio_ticks) > self.aio_interval_in_seconds:
            self.aio_ticks = ticks
            self.aio.upload(self.getReading())
        self.updateWater()

    def updateWater(self):
        """Poll the water sensor and store the state when it has changed."""
        isWaterPresent = self.waterSensor.isWaterPresent()
        if isWaterPresent != self.isWaterPresent:
            self.isWaterPresent = isWaterPresent
            # Notification on state change is currently disabled.
            #self.ntfy.message(f'Water is {self.waterTextValue()}')

    def handlePath(self, path):
        """Return the response body for ``path``.

        ``index.json`` yields the JSON state; anything else is delegated
        to ``getPathData``.
        """
        if path == 'index.json':
            return self.getJsonData()
        return self.getPathData(path)

    def getJsonData(self):
        """Return the current state (see ``getState``) as a JSON string."""
        return json.dumps(self.getState())

    def getPathData(self, path):
        """Return the content for a file or template below ``www/``.

        ``.txt`` and ``.ico`` paths are read in binary mode and returned
        as-is. Every other path (or ``self.default_path`` when ``path`` is
        falsy) is rendered through ``templates.render`` with the hostname,
        the current date/time, and the temperature and water values.
        """
        # NOTE(review): ``path`` is interpolated into a filesystem path
        # unchecked; confirm the caller rejects '..' segments if it comes
        # straight from a request.
        # ``path`` may be falsy (the template branch already falls back to
        # default_path), so guard before calling str methods on it.
        if path and (path.endswith('.txt') or path.endswith('.ico')):
            with open(f'www/{path}', 'rb') as f:
                return f.read()
        return templates.render(
            f'www/{path or self.default_path}',
            hostname=config['hostname'],
            datetime=util.datetime(),
            temperature=self.temperatureTextValue(),
            waterStatus=self.waterTextValue(),
            waterClass=self.waterCssClass()
        )

    def getReading(self):
        """Return a timestamped reading dict for upload.

        Same values as ``getState``, except ``is-water-present`` is
        converted to ``1`` / ``0``.
        """
        readings = self.getState()
        readings['is-water-present'] = 1 if readings['is-water-present'] else 0
        return {
            'timestamp': util.datetimeISO8601(),
            'readings': readings
        }

    def getState(self):
        """Return a new dict with the cached water state and a fresh temperature."""
        return {
            'is-water-present': self.isWaterPresent,
            'temperature': self.mcp.get_temp()
        }

    def temperatureTextValue(self):
        """Return the current temperature rounded to two decimals."""
        return round(self.mcp.get_temp(), 2)

    def waterTextValue(self):
        """Return ``'Present'`` or ``'Absent'`` for the cached water state."""
        return 'Present' if self.isWaterPresent else 'Absent'

    def waterCssClass(self):
        """Return the CSS class name matching the cached water state."""
        return 'water-present' if self.isWaterPresent else 'water-absent'
def main():
    """Set ``logging.log_file`` to ``www/log.txt`` and run a SensorServer."""
    logging.log_file = 'www/log.txt'
    server = SensorServer()
    server.run()


# Start the server only when this file is executed as a script.
if __name__ == '__main__':
    main()