import subprocess import re import os import psutil import signal import time import datetime import json from flask import Flask, render_template, request, redirect, url_for, make_response try: from takway.board import OrangePi led_enabled = True orangepi = OrangePi() except ImportError: led_enabled = False print("Error importing OrangePi") # blue: APP # red: hotspot def network_error_light(): error_time = datetime.datetime.now() while led_enabled: orangepi.set_led_off('blue') orangepi.set_led_on('red') time.sleep(0.5) orangepi.set_led_on('blue') orangepi.set_led_off('red') time.sleep(0.5) if error_time + datetime.timedelta(seconds=5) < datetime.datetime.now(): break close_app() app = Flask(__name__) def close_app(): if led_enabled: orangepi.set_led_off('blue') # 获取当前Flask应用程序的进程ID flask_pid = os.getpid() # 获取所有名为'python3'的进程ID python_pids = [p.info['pid'] for p in psutil.process_iter(attrs=['pid', 'name']) if p.info['name'] == 'python3'] # 关闭Flask应用程序进程 os.kill(flask_pid, signal.SIGTERM) # 关闭Python进程 for pid in python_pids: os.kill(pid, signal.SIGTERM) def start_hotspot(): try: subprocess.Popen('sudo systemctl start hotspot.service', shell=True) if led_enabled: orangepi.set_led_on('red') except subprocess.CalledProcessError as e: print(f"{datetime.datetime.now()}: Error starting create_ap service: {e}") def close_hotspot(): # 关闭热点 try: subprocess.Popen('sudo systemctl stop hotspot.service', shell=True) print(f"{datetime.datetime.now()}: Stopping create_ap service") if led_enabled: orangepi.set_led_off('red') except subprocess.CalledProcessError as e: print(f"{datetime.datetime.now()}: Error stopping hotspot: {e}") # 检测 Wi-Fi 连接状态 def check_wifi_connection(): cmd = "nmcli dev status" result = subprocess.run(cmd, shell=True, capture_output=True, text=True) time.sleep(1) output = result.stdout.strip() lines = output.split("\n")[1:] # Skip the header line for line in lines: columns = line.split() print(columns) ''' ['wlan0', 'wifi', 'disconnected', '--'] ['p2p-dev-wlan0', 'wifi-p2p', 'disconnected', '--'] ['eth0', 'ethernet', 'unavailable', '--'] ['lo', 'loopback', 'unmanaged', '--'] ''' if len(columns) >= 4 and columns[2] == "connected": wifi_ssid = columns[3] if led_enabled: orangepi.set_led_off('blue') return True, wifi_ssid elif len(columns) >= 4 and columns[0] == "wlan0" and columns[2] == "unmanaged": network_error_light() return False, None def scan_wifi(): try: subprocess.run(['nmcli', 'dev', 'wifi', 'rescan'], check=True) cmd = "nmcli dev wifi" result = subprocess.run(cmd, shell=True, capture_output=True, text=True) except subprocess.CalledProcessError as e: print(f"{datetime.datetime.now()}: Error scanning Wi-Fi: {e}") network_error_light() ssid_list = [] wifi_list = [] output = result.stdout.strip() lines = output.split("\n")[1:] # Skip the header line for line in lines: columns = line.split() ''' print(columns) for i, column in enumerate(columns): print(f"{i}: {column}") ''' # ['94:14:57:15:13:50', 'Meeting', 'MG', 'Infra', '1', '130', 'Mbit/s', '100', '****', 'WPA1', 'WPA2'] # 提取MAC地址 mac_address = columns[0] # 提取Wi-Fi名称 wifi_name = [] for i in range(1, len(columns)): if columns[i] == 'Infra': _wifi_name = ' '.join(columns[1:i]) wifi_name.append(_wifi_name) columns = columns[i+1:] break ssid = wifi_name[0] if ':' in ssid: # 94:14:57:15:13:50 Meeting # 去除mac地址 ssid = ssid.split(' ')[1:] ssid = ''.join(ssid) if ssid in ssid_list: continue ssid_list.append(ssid) # 提取强度 strength = None for i in range(len(columns)): if columns[i] == 'Mbit/s': strength = columns[i+1] break # print("MAC地址:", mac_address) # print("Wi-Fi名称:", ssid) # print("强度:", strength) wifi_list.append({'ssid': ssid, 'signal': strength, 'mac': mac_address}) if len(wifi_list) == 15: break return wifi_list # 连接 Wi-Fi def connect_wifi(ssid, password): # 连接到用户选择的 Wi-Fi 网络 try: output = subprocess.check_output(['nmcli', 'dev', 'wifi', 'connect', ssid, 'password', password]) output_str = output.decode('utf-8') # 将输出转换为字符串 if "successfully" in output_str: print(f"{datetime.datetime.now()}: Successfully connected to Wi-Fi: {ssid}") save_wifi(ssid, password) # 保存连接成功的Wi-Fi信息 return True else: print(f"{datetime.datetime.now()}: Error connecting to Wi-Fi: {output_str}") return False except subprocess.CalledProcessError as e: print(f"{datetime.datetime.now()}: Error connecting to Wi-Fi: {e}") return False # 关闭 Wi-Fi def disconnect_wifi(): try: output = subprocess.check_output(['nmcli', 'dev', 'disconnect', 'iface', 'wlan0']) time.sleep(1) output_str = output.decode('utf-8') # 将输出转换为字符串 if "successfully disconnected" in output_str: print(f"{datetime.datetime.now()}: Wi-Fi disconnected successfully") else: print(f"{datetime.datetime.now()}: Error disconnecting from Wi-Fi: {output_str}") except Exception as e: print(f"{datetime.datetime.now()}: Error disconnecting from Wi-Fi: {e}") def connect_saved_wifi(scaned_wifi_list): wifi_list = load_saved_wifi() if not wifi_list: return False, None for wifi in wifi_list: if wifi['ssid'] in [item['ssid'] for item in scaned_wifi_list]: connect_wifi(wifi['ssid'], wifi['password']) return True, wifi['ssid'] return False, None def load_saved_wifi(): try: with open('wifi_list.json', 'r') as f: wifi_list = json.load(f) return wifi_list except FileNotFoundError: return [] def save_wifi(ssid, password): wifi_list = load_saved_wifi() if ssid not in [item['ssid'] for item in wifi_list]: wifi_list.append({'ssid': ssid, 'password': password}) with open('wifi_list.json', 'w') as f: json.dump(wifi_list, f) # 主页 @app.route('/') def index(): global wifi_list response = make_response(render_template('index.html', wifi_list=wifi_list)) response.headers.set('Content-Type', 'text/html') response.headers.set('Apple-Web-App-Capable', 'yes') response.headers.set('Apple-Mobile-Web-App-Status-Bar-Style', 'black-translucent') return response # 提交 Wi-Fi 信息 @app.route('/submit', methods=['POST']) def submit(): if led_enabled: orangepi.set_led_on('red') ssid = request.form['ssid'] password = request.form['password'] print(f"{datetime.datetime.now()}: Connecting to Wi-Fi: {ssid} with password {password}") # 关闭热点 close_hotspot() time.sleep(3) # 连接到用户选择的 Wi-Fi 网络 if connect_wifi(ssid, password): close_app() connected, wifi_ssid = check_wifi_connection() if not connected: print(f"{datetime.datetime.now()}: Wi-Fi连接失败。") wifi_list = scan_wifi() start_hotspot() else: save_wifi(wifi_ssid, password) # 保存连接成功的Wi-Fi信息 return redirect(url_for('index')) if __name__ == '__main__': debug_mode = False # 设置为 True 以跳过 Wi-Fi 连接状态检测 if debug_mode: disconnect_wifi() wifi_list = scan_wifi() print(wifi_list) start_hotspot() # app.run(host='0.0.0.0', port=80) if connect_wifi("Innoxsz-Public", "innox2023"): close_app() if led_enabled: orangepi.set_led_on('blue') connected, wifi_ssid = check_wifi_connection() if connected: print(f"{datetime.datetime.now()}: 系统已自动连接到 Wi-Fi 网络,退出程序") close_app() wifi_list = scan_wifi() connected, wifi_ssid = connect_saved_wifi(wifi_list) if connected: print(f"{datetime.datetime.now()}: 系统已自动连接到 Wi-Fi 网络 {wifi_ssid},退出程序") close_app() print(f"{datetime.datetime.now()}: 未连接到 Wi-Fi 网络, 开始热点模式") start_hotspot() app.run(host='0.0.0.0', port=80)