from signal import SIGTERM, signal from subprocess import CalledProcessError, check_output from sys import exit from time import sleep INITIAL_DELAY_IN_SECONDS = 40 PING_INTERVAL_IN_SECONDS = 80 class WifiKeepalive: def __init__(self): self.defaultRoute = self.getDefaultWifiRoute() self.initialDelay = INITIAL_DELAY_IN_SECONDS self.pingInterval = PING_INTERVAL_IN_SECONDS self.isConnected = True self.pingCommand = 'ping -c 1 -W 2 -n' def run(self): sleep(self.initialDelay) while self.isRunning(): self.keepalive() sleep(self.pingInterval) def isRunning(self): return True def keepalive(self): if self.defaultRoute: self.pingDefaultRoute() else: self.attemptWifiConnection() def pingDefaultRoute(self): if not self.isConnected: print('Connected to Wifi', flush=True) self.isConnected = True isPingSuccess = self.ping() if not isPingSuccess: self.defaultRoute = self.getDefaultWifiRoute() def attemptWifiConnection(self): if self.isConnected: print('No Wifi Connection', flush=True) self.isConnected = False self.defaultRoute = self.getDefaultWifiRoute() def ping(self): try: check_output([*self.pingCommand.split(), self.defaultRoute]) except CalledProcessError as e: return False return True def getDefaultWifiRoute(self): defaults = [ r['gateway'] for r in self.getRoutes() if self.isDefaultWifiRoute(r) ] return defaults[0] if defaults else None def getRoutes(self): outputLines = check_output( ['route', '-n'] ).decode('utf-8').splitlines()[2:] routes = map(lambda r: self.parseRoute(r), outputLines) return routes def parseRoute(self, routeLine): destination, gateway, _, _, _, _, _, interface = routeLine.split() return {'destination': destination, 'gateway': gateway, 'interface': interface} def isDefaultWifiRoute(self, route): return route['destination'] == '0.0.0.0' and route['interface'].startswith('w') def main(): signal(SIGTERM, lambda s, f: exit(0)) WifiKeepalive().run() if __name__ == '__main__': main()