import io
import select
import socket
import sys

from . import http
from . import logging
from . import wifi


class Server:
    """Minimal polling HTTP server bound to port 80 on all interfaces.

    The server accepts one connection at a time, reads a single request of up
    to 1024 bytes, answers it and closes the connection.  Subclasses provide
    content by overriding :meth:`handle_path` and may run periodic background
    work by overriding :meth:`work`.
    """

    def __init__(self):
        # NOTE(review): wifi.connect() is assumed to return a WLAN-like object;
        # this class only relies on its isconnected() and disconnect() methods.
        self.wlan = wifi.connect()
        self.socket = socket.socket()
        self.poller = select.poll()
        # Not read anywhere in this class; presumably consumed by subclasses
        # when the request path is empty -- confirm against callers.
        self.default_path = 'index.html'

    def cleanup(self):
        """Close the listening socket and disconnect from the network."""
        try:
            self.socket.close()
        finally:
            # Disconnect even if closing the socket failed.
            self.wlan.disconnect()

    def run(self):
        """Listen and serve forever; always clean up on the way out.

        Exceptions raised while serving or working are logged and the loop
        continues.  Anything that is not an ``Exception`` subclass (or a
        failure while setting up the listener) leaves the loop and triggers
        :meth:`cleanup`.
        """
        try:
            addr = self.listen()
            self.poller.register(self.socket, select.POLLIN)
            logging.info(f'listening on {addr}')
            while True:
                try:
                    self.serve()
                    self.work()
                except Exception as e:
                    self.log_exception(e)
        finally:
            self.cleanup()

    def log_exception(self, e):
        """Log the traceback of ``e`` at debug level."""
        buf = io.StringIO()
        # sys.print_exception is the MicroPython traceback printer.
        sys.print_exception(e, buf)
        logging.debug('exception:', buf.getvalue())

    def listen(self):
        """Bind the server socket to 0.0.0.0:80 and start listening.

        Returns:
            The address object the socket was bound to.
        """
        addr = socket.getaddrinfo('0.0.0.0', 80)[0][-1]
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind(addr)
        self.socket.listen(1)
        return addr

    def serve(self):
        """Poll for up to 50 ms and handle every pending connection.

        For each ready socket a connection is accepted, one request is read
        and passed to :meth:`handle_request`.  If handling fails, a server
        error response is sent on a best-effort basis and the exception is
        re-raised to the caller.  An accepted connection is always closed.
        """
        evts = self.poller.poll(50)
        for sock, _evt in evts:
            # Stays None when accept() itself fails, so the error and cleanup
            # paths below never touch an unbound name.
            conn = None
            try:
                conn, addr = sock.accept()
                logging.info(f'client connected from {addr}')
                request = conn.recv(1024).decode('utf-8').strip()
                self.handle_request(conn, request)
            except Exception:
                if conn is not None:
                    self._send_server_error(conn)
                raise
            finally:
                if conn is not None:
                    conn.close()

    def _send_server_error(self, conn):
        """Best-effort write of the server error response to ``conn``."""
        try:
            conn.write(http.server_error_response)
        except OSError:
            # The connection is likely already broken; the caller re-raises
            # the original failure, which is the one worth reporting.
            pass

    def handle_request(self, conn, request):
        """Parse the request line and write the response to ``conn``.

        Args:
            conn: Connected client socket; the response is written to it.
            request: Decoded request text; only its first line is inspected.

        Raises:
            ValueError: If the first line does not consist of exactly three
                whitespace-separated fields (method, path, protocol).
        """
        [method, path, _protocol] = request.partition('\n')[0].split()
        logging.info(f'{method} {path}')

        if method != 'GET':
            # Non-GET methods are answered with the not-found response.
            conn.write(http.not_found_response)
            return

        # Only the content lookup is treated as "not found" on OSError.  A
        # failed socket write must not trigger a second response after the
        # header has already been sent.
        try:
            response = self.handle_path(path.strip('/'))
        except OSError:
            conn.write(http.not_found_response)
            return

        conn.write(self.get_content_type(path, response))
        conn.write(response)

    def get_content_type(self, path, response):
        """Pick the response header for ``path`` and ``response``.

        The file extension of ``path`` takes precedence; otherwise an empty
        ``response`` selects the no-content header and anything else the
        generic OK header.
        """
        if path.endswith('.txt'):
            return http.ok_text_response
        elif path.endswith('.json'):
            return http.ok_json_response
        elif path.endswith('.ico'):
            return http.ok_icon_response
        elif response == '':
            return http.no_content_response
        return http.ok_response

    def handle_path(self, _path):
        """Return the response body for ``_path``.

        This base implementation returns an empty string for every path.
        An ``OSError`` raised here is answered with the not-found response
        by :meth:`handle_request`.
        """
        return ''

    def work(self):
        """Run once per loop iteration; reconnect the network if it dropped."""
        if not self.wlan.isconnected():
            self.wlan = wifi.connect()