import json
import time

from machine import I2C

from net import AdafruitIO
from net import logging
from net import Ntfy
from net import ntp
from net import Server
from net import templates
from net import util
from net.config import config

from sensors import MCP9808
from sensors import WaterSensor


class SensorServer(Server):
    """Server that publishes water-presence and temperature readings.

    On top of whatever the ``Server`` base class does, each ``work()`` pass:

    * re-syncs the clock via ``ntp.sync()`` once the NTP interval has elapsed,
    * uploads a reading through ``AdafruitIO`` once the upload interval has
      elapsed,
    * polls the water sensor and sends an ``Ntfy`` message when its state
      changes.

    Requests are answered by ``handlePath``: ``index.json`` yields the current
    state as JSON, ``.txt``/``.ico`` files are returned raw, and everything
    else is rendered as a template from the ``www/`` directory.
    """

    def __init__(self):
        """Create the sensors and clients, start the timers, sync the clock."""
        super().__init__()

        # Hardware first, then the network clients (same order as before so
        # any side effects of construction happen in the same sequence).
        self.waterSensor = WaterSensor(0)
        self.mcp = MCP9808(I2C(0))
        self.aio = AdafruitIO()
        self.ntfy = Ntfy()

        # Intervals are expressed in seconds: one hour and five minutes.
        self.ntp_interval_in_seconds = 60 * 60
        self.aio_interval_in_seconds = 60 * 5

        # Millisecond tick stamps of the last NTP sync / last upload.
        self.ntp_ticks = time.ticks_ms()
        self.aio_ticks = time.ticks_ms()

        # Last water state that was observed (and notified about).
        self.isWaterPresent = False

        ntp.sync()

    def work(self):
        """Run one pass of periodic work after the base class's own work."""
        super().work()

        now = time.ticks_ms()

        ntp_elapsed = util.secondsElapsed(now, self.ntp_ticks)
        if ntp_elapsed > self.ntp_interval_in_seconds:
            self.ntp_ticks = now
            ntp.sync()

        aio_elapsed = util.secondsElapsed(now, self.aio_ticks)
        if aio_elapsed > self.aio_interval_in_seconds:
            self.aio_ticks = now
            reading = self.getReading()
            self.aio.upload(reading)

        self.updateWater()

    def updateWater(self):
        """Poll the water sensor and notify only when its state changes."""
        detected = self.waterSensor.isWaterPresent()
        if detected == self.isWaterPresent:
            # Nothing changed since the last poll: no notification.
            return

        # Store the new state first so the message text reflects it.
        self.isWaterPresent = detected
        self.ntfy.message(f'Water is {self.waterTextValue()}')

    def handlePath(self, path):
        """Return the response body for ``path``.

        ``index.json`` is answered with the JSON state; any other path is
        delegated to ``getPathData``.
        """
        if path != 'index.json':
            return self.getPathData(path)
        return self.getJsonData()

    def getJsonData(self):
        """Return the current sensor state serialized as a JSON string."""
        state = self.getState()
        return json.dumps(state)

    def getPathData(self, path):
        """Return the content for a path under ``www/``.

        Paths ending in ``.txt`` or ``.ico`` are read and returned as bytes.
        Any other path (or ``self.default_path`` when ``path`` is falsy) is
        rendered through ``templates.render`` with the current readings.
        """
        # NOTE(review): ``path`` is interpolated into the file name unchecked;
        # confirm that the Server base class rejects '..' segments before a
        # request path reaches this method.
        is_raw_file = path.endswith('.txt') or path.endswith('.ico')
        if is_raw_file:
            with open(f'www/{path}', 'rb') as raw:
                return raw.read()

        template_name = f'www/{path or self.default_path}'
        context = {
            'hostname': config['hostname'],
            'datetime': util.datetime(),
            'temperature': self.temperatureTextValue(),
            'waterStatus': self.waterTextValue(),
            'waterClass': self.waterCssClass(),
        }
        return templates.render(template_name, **context)

    def getReading(self):
        """Return a timestamped reading: ``timestamp`` plus ``readings``."""
        stamp = util.datetimeISO8601()
        state = self.getState()
        return {
            'timestamp': stamp,
            'readings': state,
        }

    def getState(self):
        """Return the last known water state and a fresh temperature value."""
        state = {}
        state['is-water-present'] = self.isWaterPresent
        state['temperature'] = self.mcp.get_temp()
        return state

    def temperatureTextValue(self):
        """Return the current temperature rounded to two decimal places."""
        reading = self.mcp.get_temp()
        return round(reading, 2)

    def waterTextValue(self):
        """Return ``'Present'`` or ``'Absent'`` for the stored water state."""
        if self.isWaterPresent:
            return 'Present'
        return 'Absent'

    def waterCssClass(self):
        """Return the CSS class name matching the stored water state."""
        if self.isWaterPresent:
            return 'water-present'
        return 'water-absent'


def main():
    """Point logging at ``www/log.txt`` and run the sensor server."""
    logging.log_file = 'www/log.txt'
    server = SensorServer()
    server.run()


if __name__ == '__main__':
    main()