diff --git a/auth_manager.py b/auth_manager.py new file mode 100644 index 0000000..87c5c28 --- /dev/null +++ b/auth_manager.py @@ -0,0 +1,126 @@ +from time import sleep +from umachine import RTC +import system_manager +import struct +import ntptime +import urequests as requests +import ujson +import utime + +def get_jwt(): + data =ujson.dumps({'opt_code': totp(ntptime.time(), system_manager.read_systemdata()['TS']), 'id': system_manager.read_systemdata()['ID']}) + response = requests.post("https://tcs-auth.bogner.systems/telegraph/login", headers = {'content-type': 'application/json'}, data=data) + if response.status_code != 200: + return None + return response.text + +def totp(time, key, step_secs=30, digits=6): + hmac = hmac_sha1(base32_decode(key), struct.pack(">Q", time // step_secs)) + offset = hmac[-1] & 0xF + code = ((hmac[offset] & 0x7F) << 24 | + (hmac[offset + 1] & 0xFF) << 16 | + (hmac[offset + 2] & 0xFF) << 8 | + (hmac[offset + 3] & 0xFF)) + code = str(code % 10 ** digits) + return code + +def base32_decode(message): + padded_message = message + '=' * (8 - len(message) % 8) + chunks = [padded_message[i:i+8] for i in range(0, len(padded_message), 8)] + + decoded = [] + + for chunk in chunks: + bits = 0 + bitbuff = 0 + + for c in chunk: + if 'A' <= c <= 'Z': + n = ord(c) - ord('A') + elif '2' <= c <= '7': + n = ord(c) - ord('2') + 26 + elif c == '=': + continue + else: + raise ValueError("Not Base32") + + bits += 5 + bitbuff <<= 5 + bitbuff |= n + + if bits >= 8: + bits -= 8 + byte = bitbuff >> bits + bitbuff &= ~(0xFF << bits) + decoded.append(byte) + + return bytes(decoded) + + + + +HASH_CONSTANTS = [0x67452301, 0xEFCDAB89, 0x98BADCFE, 0x10325476, 0xC3D2E1F0] + + +def left_rotate(n, b): + return ((n << b) | (n >> (32 - b))) & 0xFFFFFFFF + + +def expand_chunk(chunk): + w = list(struct.unpack(">16L", chunk)) + [0] * 64 + for i in range(16, 80): + w[i] = left_rotate((w[i - 3] ^ w[i - 8] ^ w[i - 14] ^ w[i - 16]), 1) + return w + + +def sha1(message): + h = HASH_CONSTANTS + padded_message = message + b"\x80" + \ + (b"\x00" * (63 - (len(message) + 8) % 64)) + \ + struct.pack(">Q", 8 * len(message)) + chunks = [padded_message[i:i+64] + for i in range(0, len(padded_message), 64)] + + for chunk in chunks: + expanded_chunk = expand_chunk(chunk) + a, b, c, d, e = h + for i in range(0, 80): + if 0 <= i < 20: + f = (b & c) | ((~b) & d) + k = 0x5A827999 + elif 20 <= i < 40: + f = b ^ c ^ d + k = 0x6ED9EBA1 + elif 40 <= i < 60: + f = (b & c) | (b & d) | (c & d) + k = 0x8F1BBCDC + elif 60 <= i < 80: + f = b ^ c ^ d + k = 0xCA62C1D6 + a, b, c, d, e = ( + left_rotate(a, 5) + f + e + k + expanded_chunk[i] & 0xFFFFFFFF, + a, + left_rotate(b, 30), + c, + d, + ) + h = ( + h[0] + a & 0xFFFFFFFF, + h[1] + b & 0xFFFFFFFF, + h[2] + c & 0xFFFFFFFF, + h[3] + d & 0xFFFFFFFF, + h[4] + e & 0xFFFFFFFF, + ) + + return struct.pack(">5I", *h) + + +def hmac_sha1(key, message): + key_block = key + (b'\0' * (64 - len(key))) + key_inner = bytes((x ^ 0x36) for x in key_block) + key_outer = bytes((x ^ 0x5C) for x in key_block) + + inner_message = key_inner + message + outer_message = key_outer + sha1(inner_message) + + return sha1(outer_message) \ No newline at end of file diff --git a/system_manager.py b/system_manager.py new file mode 100644 index 0000000..ff2e97c --- /dev/null +++ b/system_manager.py @@ -0,0 +1,13 @@ + + +SYSTEMDATA = "SYSTEMDATA.DAT" + +def read_systemdata(): + with open(SYSTEMDATA) as f: + lines = f.readlines() + data = {} + for line in lines: + id, totp_secret = line.strip("\n").split(";") + data['ID'] = id + data['TS'] = totp_secret + return data \ No newline at end of file diff --git a/wifi_manager.py b/wifi_manager.py new file mode 100644 index 0000000..f73e4a5 --- /dev/null +++ b/wifi_manager.py @@ -0,0 +1,344 @@ +import network +import socket +import ure +import time +import machine + +ap_ssid = "WifiManager" +ap_password = "tayfunulu" +ap_authmode = 3 # WPA2 + +NETWORK_PROFILES = 'wifi.dat' + +wlan_ap = network.WLAN(network.AP_IF) +wlan_sta = network.WLAN(network.STA_IF) + +server_socket = None + + +def get_connection(): + """return a working WLAN(STA_IF) instance or None""" + + # First check if there already is any connection: + if wlan_sta.isconnected(): + return wlan_sta + + connected = False + try: + # ESP connecting to WiFi takes time, wait a bit and try again: + time.sleep(3) + if wlan_sta.isconnected(): + return wlan_sta + + # Read known network profiles from file + profiles = read_profiles() + + # Search WiFis in range + wlan_sta.active(True) + networks = wlan_sta.scan() + + AUTHMODE = {0: "open", 1: "WEP", 2: "WPA-PSK", 3: "WPA2-PSK", 4: "WPA/WPA2-PSK"} + for ssid, bssid, channel, rssi, authmode, hidden in sorted(networks, key=lambda x: x[3], reverse=True): + ssid = ssid.decode('utf-8') + encrypted = authmode > 0 + print("ssid: %s chan: %d rssi: %d authmode: %s" % (ssid, channel, rssi, AUTHMODE.get(authmode, '?'))) + if encrypted: + if ssid in profiles: + password = profiles[ssid] + connected = do_connect(ssid, password) + else: + print("skipping unknown encrypted network") + if connected: + break + + except OSError as e: + print("exception", str(e)) + + # start web server for connection manager: + if not connected: + connected = start() + machine.reset() + + return wlan_sta if connected else None + + +def read_profiles(): + with open(NETWORK_PROFILES) as f: + lines = f.readlines() + profiles = {} + for line in lines: + ssid, password = line.strip("\n").split(";") + profiles[ssid] = password + return profiles + + +def write_profiles(profiles): + lines = [] + for ssid, password in profiles.items(): + lines.append("%s;%s\n" % (ssid, password)) + with open(NETWORK_PROFILES, "w") as f: + f.write(''.join(lines)) + + +def do_connect(ssid, password): + wlan_sta.active(True) + if wlan_sta.isconnected(): + return None + print('Trying to connect to %s...' % ssid) + wlan_sta.connect(ssid, password) + for retry in range(200): + connected = wlan_sta.isconnected() + if connected: + break + time.sleep(0.1) + print('.', end='') + if connected: + print('\nConnected. Network config: ', wlan_sta.ifconfig()) + + else: + print('\nFailed. Not Connected to: ' + ssid) + return connected + + +def send_header(client, status_code=200, content_length=None): + client.sendall("HTTP/1.0 {} OK\r\n".format(status_code)) + client.sendall("Content-Type: text/html\r\n") + if content_length is not None: + client.sendall("Content-Length: {}\r\n".format(content_length)) + client.sendall("\r\n") + + +def send_response(client, payload, status_code=200): + content_length = len(payload) + send_header(client, status_code, content_length) + if content_length > 0: + client.sendall(payload) + client.close() + + +def handle_root(client): + wlan_sta.active(True) + ssids = sorted(ssid.decode('utf-8') for ssid, *_ in wlan_sta.scan()) + send_header(client) + client.sendall("""\ + +

+ + Wi-Fi Client Setup + +

+
+ + + """) + while len(ssids): + ssid = ssids.pop(0) + client.sendall("""\ + + + + """.format(ssid)) + client.sendall("""\ + + + + + +
+ {0} +
Password:
+

+ +

+
+

 

+
+
+ + Your ssid and password information will be saved into the + "%(filename)s" file in your ESP module for future usage. + Be careful about security! + +
+
+

+ Some useful infos: +

+ + + """ % dict(filename=NETWORK_PROFILES)) + client.close() + +def custom_url_decode(s): + # Erst %XX durch das entsprechende Zeichen ersetzen + i = 0 + result = "" + while i < len(s): + if s[i] == '%': + hex_value = s[i+1:i+3] + result += chr(int(hex_value, 16)) + i += 3 + elif s[i] == '+': + result += ' ' + i += 1 + else: + result += s[i] + i += 1 + return result + +def handle_configure(client, request): + match = ure.search("ssid=([^&]*)&password=(.*)", request) + + if match is None: + send_response(client, "Parameters not found", status_code=400) + return False + # version 1.9 compatibility + try: + ssid = match.group(1).decode("utf-8").replace("%3F", "?").replace("%21", "!") + password = match.group(2).decode("utf-8").replace("%3F", "?").replace("%21", "!") + except Exception: + ssid = match.group(1).replace("%3F", "?").replace("%21", "!") + password = match.group(2).replace("%3F", "?").replace("%21", "!") + + if len(ssid) == 0: + send_response(client, "SSID must be provided", status_code=400) + return False + + ssid = custom_url_decode(ssid) + + if do_connect(ssid, password): + response = """\ + +
+

+

+ + ESP successfully connected to WiFi network %(ssid)s. + +

+

+
+ + """ % dict(ssid=ssid) + send_response(client, response) + time.sleep(1) + wlan_ap.active(False) + try: + profiles = read_profiles() + except OSError: + profiles = {} + profiles[ssid] = password + write_profiles(profiles) + + time.sleep(5) + + return True + else: + response = """\ + +
+

+ + ESP could not connect to WiFi network %(ssid)s. + +

+

+
+ +
+
+ + """ % dict(ssid=ssid) + send_response(client, response) + return False + + +def handle_not_found(client, url): + send_response(client, "Path not found: {}".format(url), status_code=404) + + +def stop(): + global server_socket + + if server_socket: + server_socket.close() + server_socket = None + + +def start(port=80): + global server_socket + + addr = socket.getaddrinfo('0.0.0.0', port)[0][-1] + + stop() + + wlan_sta.active(True) + wlan_ap.active(True) + + wlan_ap.config(essid=ap_ssid, password=ap_password) + + server_socket = socket.socket() + server_socket.bind(addr) + server_socket.listen(1) + + print('Connect to WiFi ssid ' + ap_ssid + ', default password: ' + ap_password) + print('and access the ESP via your favorite web browser at 192.168.4.1.') + print('Listening on:', addr) + + while True: + if wlan_sta.isconnected(): + wlan_ap.active(False) + server_socket.close() + wlan_sta.disconnect() + wlan_ap.disconnect() + return True + + client, addr = server_socket.accept() + print('client connected from', addr) + try: + client.settimeout(5.0) + + request = b"" + try: + while "\r\n\r\n" not in request: + request += client.recv(512) + except OSError: + pass + + # Handle form data from Safari on macOS and iOS; it sends \r\n\r\nssid=&password= + try: + request += client.recv(1024) + print("Received form data after \\r\\n\\r\\n(i.e. from Safari on macOS or iOS)") + except OSError: + pass + + print("Request is: {}".format(request)) + if "HTTP" not in request: # skip invalid requests + continue + + # version 1.9 compatibility + try: + url = ure.search("(?:GET|POST) /(.*?)(?:\\?.*?)? HTTP", request).group(1).decode("utf-8").rstrip("/") + except Exception: + url = ure.search("(?:GET|POST) /(.*?)(?:\\?.*?)? HTTP", request).group(1).rstrip("/") + print("URL is {}".format(url)) + + if url == "": + handle_root(client) + elif url == "configure": + handle_configure(client, request) + else: + handle_not_found(client, url) + + finally: + client.close() +