104 lines
2.7 KiB
Python
104 lines
2.7 KiB
Python
import json
|
|
import time
|
|
|
|
from machine import I2C
|
|
from net import AdafruitIO
|
|
from net import logging
|
|
from net import Ntfy
|
|
from net import ntp
|
|
from net import Server
|
|
from net import templates
|
|
from net import util
|
|
from sensors import MCP9808
|
|
from sensors import WaterSensor
|
|
|
|
|
|
class SensorServer(Server):
|
|
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.waterSensor = WaterSensor(0)
|
|
self.mcp = MCP9808(I2C(0))
|
|
self.aio = AdafruitIO()
|
|
self.ntfy = Ntfy()
|
|
self.ntp_interval_in_seconds = 60 * 60
|
|
self.aio_interval_in_seconds = 60 * 5
|
|
self.ntp_ticks = time.ticks_ms()
|
|
self.aio_ticks = time.ticks_ms()
|
|
self.isWaterPresent = False
|
|
|
|
ntp.sync()
|
|
|
|
def work(self):
|
|
super().work()
|
|
ticks = time.ticks_ms()
|
|
|
|
if util.secondsElapsed(ticks, self.ntp_ticks) > self.ntp_interval_in_seconds:
|
|
self.ntp_ticks = ticks
|
|
ntp.sync()
|
|
|
|
if util.secondsElapsed(ticks, self.aio_ticks) > self.aio_interval_in_seconds:
|
|
self.aio_ticks = ticks
|
|
self.aio.upload(self.getReading())
|
|
|
|
self.updateWater()
|
|
|
|
def updateWater(self):
|
|
isWaterPresent = self.waterSensor.isWaterPresent()
|
|
|
|
if isWaterPresent != self.isWaterPresent:
|
|
self.isWaterPresent = isWaterPresent
|
|
self.ntfy.message(f'Water is {self.waterTextValue()}')
|
|
|
|
def handlePath(self, path):
|
|
if path == 'index.json':
|
|
return self.getJsonData()
|
|
|
|
return self.getPathData(path)
|
|
|
|
def getJsonData(self):
|
|
return json.dumps(self.getState())
|
|
|
|
def getPathData(self, path):
|
|
if path.endswith('.txt') or path.endswith('.ico'):
|
|
with open(f'www/{path}', 'rb') as f:
|
|
return f.read()
|
|
|
|
return templates.render(
|
|
f'www/{path or self.default_path}',
|
|
datetime=util.datetime(),
|
|
temperature=self.temperatureTextValue(),
|
|
waterStatus=self.waterTextValue(),
|
|
waterClass=self.waterCssClass()
|
|
)
|
|
|
|
def getReading(self):
|
|
return {
|
|
'timestamp': util.datetimeISO8601(),
|
|
'readings': self.getState()
|
|
}
|
|
|
|
def getState(self):
|
|
return {
|
|
'is-water-present': self.isWaterPresent,
|
|
'temperature': self.mcp.get_temp()
|
|
}
|
|
|
|
def temperatureTextValue(self):
|
|
return round(self.mcp.get_temp(), 2)
|
|
|
|
def waterTextValue(self):
|
|
return 'Present' if self.isWaterPresent else 'Absent'
|
|
|
|
def waterCssClass(self):
|
|
return 'water-present' if self.isWaterPresent else 'water-absent'
|
|
|
|
|
|
def main():
|
|
logging.log_file = 'www/log.txt'
|
|
SensorServer().run()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|