aquifer/net/server.py

100 lines
2.5 KiB
Python
Raw Permalink Normal View History

2023-02-07 08:49:20 -05:00
import io
import select
import socket
import sys
from . import logging
from . import wifi
from . import http
class Server:
    """Small polling HTTP server that runs on top of a Wi-Fi connection.

    The server binds to 0.0.0.0:80, waits for connections with
    ``select.poll`` and answers one request per connection.  Subclasses
    customise the response body by overriding :meth:`handlePath`.
    """

    def __init__(self):
        """Connect to Wi-Fi and create the (not yet bound) listening socket."""
        self.wlan = wifi.connect()
        self.socket = socket.socket()
        self.poller = select.poll()
        # Not read anywhere in this class.
        # NOTE(review): presumably consumed by subclasses' handlePath - confirm.
        self.default_path = 'index.html'

    def cleanup(self):
        """Close the listening socket and drop the Wi-Fi connection."""
        try:
            self.socket.close()
        finally:
            # Disconnect even when closing the socket fails.
            self.wlan.disconnect()

    def run(self):
        """Listen and serve forever; always call :meth:`cleanup` on exit.

        Each loop iteration handles pending connections (:meth:`serve`) and
        then performs housekeeping (:meth:`work`).  Any ``Exception`` raised
        by an iteration is logged and the loop continues.
        """
        try:
            addr = self.listen()
            self.poller.register(self.socket, select.POLLIN)
            logging.info(f'listening on {addr}')
            while True:
                try:
                    self.serve()
                    self.work()
                except Exception as e:
                    self.logException(e)
        finally:
            self.cleanup()

    def logException(self, e):
        """Write the traceback of exception *e* to the debug log."""
        buf = io.StringIO()
        # sys.print_exception is the MicroPython way to render a traceback
        # into a stream.
        sys.print_exception(e, buf)
        logging.debug('exception:', buf.getvalue())

    def listen(self):
        """Bind to 0.0.0.0:80 and start listening.

        Returns:
            The socket address the server was bound to.
        """
        addr = socket.getaddrinfo('0.0.0.0', 80)[0][-1]
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind(addr)
        self.socket.listen(1)
        return addr

    def serve(self):
        """Wait up to 5 seconds for connections and answer each one.

        Raises:
            Exception: whatever accepting or handling a connection raised.
                When a connection was accepted, a server-error response is
                attempted first and the connection is always closed.
        """
        for event in self.poller.poll(5000):
            # Poll tuples may carry more than two elements on some ports,
            # so pick the socket by index instead of unpacking.
            sock = event[0]
            # Accept outside the try block: if accept() fails there is no
            # connection to answer or close, and the original error must
            # propagate instead of being masked by an unbound `conn`.
            conn, addr = sock.accept()
            try:
                logging.info(f'client connected from {addr}')
                request = conn.recv(1024).decode('utf-8').strip()
                self.handleRequest(conn, request)
            except Exception:
                try:
                    conn.write(http.serverErrorResponse)
                except OSError:
                    # Best effort only: the client may already be gone, and
                    # the original exception is the one worth reporting.
                    pass
                raise
            finally:
                conn.close()

    def handleRequest(self, conn, request):
        """Parse the request line of *request* and write a response to *conn*.

        ``GET`` requests are answered with the header chosen by
        :meth:`getPathContentType` followed by the body returned by
        :meth:`handlePath`.  Any other method, or an ``OSError`` raised by
        :meth:`handlePath`, yields the not-found response.

        Raises:
            ValueError: if the first line of *request* does not consist of
                exactly three whitespace-separated fields.
        """
        method, path, _protocol = request.partition('\n')[0].split()
        logging.info(f'{method} {path}')
        if method != 'GET':
            conn.write(http.notFoundResponse)
            return
        try:
            response = self.handlePath(path.strip('/'))
        except OSError:
            # Only a failed lookup maps to "not found".  Write errors are
            # left to the caller so a 404 is never appended after the OK
            # headers have already been sent.
            conn.write(http.notFoundResponse)
            return
        conn.write(self.getPathContentType(path))
        conn.write(response)

    def getPathContentType(self, path):
        """Return the OK response header matching the extension of *path*."""
        if path.endswith('.txt'):
            return http.okTextResponse
        if path.endswith('.json'):
            return http.okJsonResponse
        if path.endswith('.ico'):
            return http.okIconResponse
        return http.okResponse

    def handlePath(self, _path):
        """Return the response body for *_path*.

        This base implementation returns an empty body for every path.
        """
        return ''

    def work(self):
        """Housekeeping between polls: reconnect Wi-Fi if the link dropped."""
        if not self.wlan.isconnected():
            self.wlan = wifi.connect()