import io import select import socket import sys from . import logging from . import wifi from . import http class Server: def __init__(self): self.wlan = wifi.connect() self.socket = socket.socket() self.poller = select.poll() self.default_path = 'index.html' def cleanup(self): self.socket.close() self.wlan.disconnect() def run(self): try: addr = self.listen() self.poller.register(self.socket, select.POLLIN) logging.info(f'listening on {addr}') while True: try: self.serve() self.work() except Exception as e: self.logException(e) finally: self.cleanup() def logException(self, e): buf = io.StringIO() sys.print_exception(e, buf) logging.debug(f'exception:', buf.getvalue()) def listen(self): addr = socket.getaddrinfo('0.0.0.0', 80)[0][-1] self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.socket.bind(addr) self.socket.listen(1) return addr def serve(self): evts = self.poller.poll(10) for sock, _evt in evts: try: conn, addr = sock.accept() logging.info(f'client connected from {addr}') request = conn.recv(1024).decode('utf-8').strip() self.handleRequest(conn, request) except: conn.write(http.serverErrorResponse) raise finally: conn.close() def handleRequest(self, conn, request): [method, path, _protocol] = request.partition('\n')[0].split() logging.info(f'{method} {path}') try: if method == 'GET': response = self.handlePath(path.strip('/')) conn.write(self.getContentType(path, response)) conn.write(response) else: conn.write(http.notFoundResponse) except OSError: conn.write(http.notFoundResponse) def getContentType(self, path, response): if path.endswith('.txt'): return http.okTextResponse elif path.endswith('.json'): return http.okJsonResponse elif path.endswith('.ico'): return http.okIconResponse elif response == '': return http.noContentResponse return http.okResponse def handlePath(self, _path): return '' def work(self): if not self.wlan.isconnected(): self.wlan = wifi.connect()