"""Wi-Fi provisioning script for an OrangePi-based device.

When run as a script it checks whether a network connection already exists,
then tries the networks stored in ``wifi_list.json``; if neither works it
starts ``hotspot.service`` and serves a small Flask form on port 80 through
which a user can submit an SSID and password.
"""

import datetime
import json
import logging
import os
import re
import signal
import socket
import subprocess
import time

import psutil
from flask import Flask, render_template, request, redirect, url_for, make_response

logging.basicConfig(
    filename='app.log',  # name of the log file
    filemode='a',        # append to the file if it already exists
    level=logging.DEBUG,  # lowest level, so every record is written
    format='%(asctime)s - %(levelname)s - %(message)s'  # record layout
)

try:
    from takway.board import OrangePi
    led_enabled = True
    orangepi = OrangePi()
except ImportError:
    led_enabled = False
    logging.info("Error importing OrangePi")

# LED meaning:
#   blue: APP
#   red:  hotspot

# Maximum number of networks returned by scan_wifi().
MAX_SCANNED_NETWORKS = 15


def network_error_light_1():
    """Alternate the blue and red LEDs for roughly two seconds.

    Does nothing when the LED board could not be imported.
    """
    error_time = datetime.datetime.now()
    while led_enabled:
        orangepi.set_led_off('blue')
        orangepi.set_led_on('red')
        time.sleep(0.25)
        orangepi.set_led_on('blue')
        orangepi.set_led_off('red')
        time.sleep(0.25)
        if error_time + datetime.timedelta(seconds=2) < datetime.datetime.now():
            break
    # close_app()


def network_error_light_2():
    """Blink the green LED for roughly two seconds.

    Does nothing when the LED board could not be imported.
    """
    error_time = datetime.datetime.now()
    while led_enabled:
        orangepi.set_led_on('green')
        time.sleep(0.25)
        orangepi.set_led_off('green')
        time.sleep(0.25)
        if error_time + datetime.timedelta(seconds=2) < datetime.datetime.now():
            break
    # close_app()


def edit_line_in_file(file_path, search_string, replacement_string):
    """Replace every line containing ``search_string`` in a text file.

    Each matching line is replaced by ``replacement_string`` followed by a
    newline. The file is rewritten only when at least one line matched, so
    an unaffected file is never truncated.

    Args:
        file_path: Path of the file to edit.
        search_string: Substring that marks a line for replacement.
        replacement_string: Text written in place of each matching line.

    Returns:
        True if at least one line was replaced, False otherwise (including
        when the file is missing or cannot be read or written).
    """
    edit = False
    try:
        # Read the whole file first so the rewrite works from memory.
        with open(file_path, 'r') as file:
            lines = file.readlines()

        new_lines = []
        for line in lines:
            if search_string in line:
                new_lines.append(replacement_string + '\n')
                edit = True
            else:
                new_lines.append(line)

        if edit:
            with open(file_path, 'w') as file:
                file.writelines(new_lines)
            logging.info(f"{datetime.datetime.now()}: The line has been edited.")
        return edit
    except FileNotFoundError:
        logging.error(f"The file {file_path} does not exist.")
        return edit
    except IOError as e:
        logging.error(f"An error occurred: {e}")
        return edit


def save_local_ip():
    """Determine the local IP address and store it in ``local_ip.txt``.

    The address is read from a UDP socket connected to 8.8.8.8:80.

    Returns:
        The local IP address as a string, or None if it could not be
        determined or written.
    """
    try:
        # The context manager closes the socket even when connect() fails.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(("8.8.8.8", 80))
            local_ip = s.getsockname()[0]

        # Persist the address for later use.
        with open('local_ip.txt', 'w') as f:
            f.write(local_ip)
        logging.info(f"{datetime.datetime.now()}: Local IP address: {local_ip}")
        return local_ip
    except OSError as e:
        logging.error(f"Error getting private IP: {e}")
        return None


app = Flask(__name__)

# Networks shown on the index page. Defined at import time so the page also
# renders when this module is not run as a script.
wifi_list = []


def close_app():
    """Save the local IP, switch off the blue LED and terminate the process."""
    if led_enabled:
        orangepi.set_led_off('blue')

    # Store the Wi-Fi IP address before exiting.
    save_local_ip()

    # PID of the current Flask application process.
    flask_pid = os.getpid()

    # PIDs of every process named 'python3'.
    python_pids = [p.info['pid'] for p in psutil.process_iter(attrs=['pid', 'name'])
                   if p.info['name'] == 'python3']

    # Terminate the Flask application process.
    os.kill(flask_pid, signal.SIGTERM)

    # Terminate the remaining Python processes.
    # NOTE(review): with the default SIGTERM disposition this process normally
    # ends at the os.kill() above, so this loop is not expected to run.
    # Confirm whether other python3 processes are really meant to be stopped.
    for pid in python_pids:
        os.kill(pid, signal.SIGTERM)


def start_hotspot():
    """Start ``hotspot.service`` in the background and switch the red LED on."""
    try:
        subprocess.Popen(['sudo', 'systemctl', 'start', 'hotspot.service'])
        if led_enabled:
            orangepi.set_led_on('red')
    except OSError as e:
        # Popen reports a failure to launch the command as OSError.
        logging.error(f"{datetime.datetime.now()}: Error starting create_ap service: {e}")


def close_hotspot():
    """Stop ``hotspot.service`` in the background and switch the red LED off."""
    try:
        subprocess.Popen(['sudo', 'systemctl', 'stop', 'hotspot.service'])
        logging.info(f"{datetime.datetime.now()}: Stopping create_ap service")
        if led_enabled:
            orangepi.set_led_off('red')
    except OSError as e:
        # Popen reports a failure to launch the command as OSError.
        logging.error(f"{datetime.datetime.now()}: Error stopping hotspot: {e}")


def init_networkmanager_file():
    """Blank out ``unmanaged-devices`` lines and restart NetworkManager.

    An ``unmanaged-devices`` entry in the ``[keyfile]`` section of
    ``/etc/NetworkManager/NetworkManager.conf`` may stop NetworkManager from
    managing the wlan0 interface, e.g.::

        [keyfile]
        #unmanaged-devices=interface-name:ap0;interface-name:wlan0

    When such a line was found and replaced, NetworkManager is restarted.
    """
    file_path = '/etc/NetworkManager/NetworkManager.conf'
    search_string = 'unmanaged-devices=interface-name:'
    replacement_string = ''  # matching lines are replaced by an empty line

    if edit_line_in_file(file_path, search_string, replacement_string):
        # network_error_light_1()
        logging.info(f"{datetime.datetime.now()}: Wi-Fi NetworkManager Error: restart NetworkManager.")
        subprocess.Popen(['sudo', 'systemctl', 'restart', 'NetworkManager'])
        # Do not change this delay: NetworkManager needs enough time to
        # finish restarting and to reconnect to a previously used network.
        # NOTE(review): the original indentation was lost; the wait is
        # assumed to belong to this restart branch. Confirm.
        time.sleep(5)


def check_wifi_connection():
    """Report whether ``nmcli dev status`` lists a connected device.

    Example rows (after splitting on whitespace)::

        ['wlan0', 'wifi', 'disconnected', '--']
        ['p2p-dev-wlan0', 'wifi-p2p', 'disconnected', '--']
        ['eth0', 'ethernet', 'unavailable', '--']
        ['lo', 'loopback', 'unmanaged', '--']

    Returns:
        ``(True, name)`` for the first row whose state column is
        ``connected``, where ``name`` is that row's connection column;
        ``(False, None)`` otherwise. The device type is not checked.
    """
    try:
        result = subprocess.run(['nmcli', 'dev', 'status'],
                                capture_output=True, text=True)
    except OSError as e:
        # nmcli could not be launched: treat as "not connected".
        logging.error(f"{datetime.datetime.now()}: Error running nmcli: {e}")
        return False, None

    output = result.stdout.strip()
    lines = output.split("\n")[1:]  # skip the header line
    for line in lines:
        columns = line.split()
        logging.info(columns)
        if len(columns) >= 4 and columns[2] == "connected":
            # The connection name may contain spaces; keep all of it.
            wifi_ssid = ' '.join(columns[3:])
            if led_enabled:
                orangepi.set_led_off('blue')
            return True, wifi_ssid
    return False, None


def _parse_wifi_row(line):
    """Parse one data row of ``nmcli dev wifi`` into a network dict.

    Example row (after splitting on whitespace)::

        ['94:14:57:15:13:50', 'Meeting', 'MG', 'Infra', '1', '130',
         'Mbit/s', '100', '****', 'WPA1', 'WPA2']

    Returns:
        ``{'ssid': ..., 'signal': ..., 'mac': ...}``, or None when the row is
        blank or has no ``Infra`` mode column.
    """
    columns = line.split()
    # The network currently in use is flagged with a leading '*'.
    if columns and columns[0] == '*':
        columns = columns[1:]
    if len(columns) < 2:
        return None

    # Everything between the MAC address and the mode column is the SSID.
    try:
        mode_index = columns.index('Infra', 1)
    except ValueError:
        return None

    mac_address = columns[0]
    ssid = ' '.join(columns[1:mode_index])
    remainder = columns[mode_index + 1:]

    # The signal strength is the column right after the rate unit.
    strength = None
    if 'Mbit/s' in remainder:
        rate_index = remainder.index('Mbit/s')
        if rate_index + 1 < len(remainder):
            strength = remainder[rate_index + 1]

    return {'ssid': ssid, 'signal': strength, 'mac': mac_address}


def scan_wifi():
    """Rescan for Wi-Fi networks and return the parsed result.

    The list is de-duplicated by SSID, capped at ``MAX_SCANNED_NETWORKS``
    entries and also written to ``scaned_wifi_list.json``.

    Returns:
        A list of ``{'ssid', 'signal', 'mac'}`` dicts, or an empty list when
        nmcli fails (the error LED pattern is shown in that case).
    """
    try:
        subprocess.run(['nmcli', 'dev', 'wifi', 'rescan'], check=True)
        result = subprocess.run(['nmcli', 'dev', 'wifi'],
                                capture_output=True, text=True)
    except (subprocess.CalledProcessError, OSError) as e:
        logging.error(f"{datetime.datetime.now()}: Error scanning Wi-Fi: {e}")
        network_error_light_1()
        return []
    logging.info(f"{datetime.datetime.now()}: Wi-Fi scan complete...")

    seen_ssids = set()
    wifi_list = []
    output = result.stdout.strip()
    for line in output.split("\n")[1:]:  # skip the header line
        entry = _parse_wifi_row(line)
        if entry is None or entry['ssid'] in seen_ssids:
            continue
        seen_ssids.add(entry['ssid'])
        wifi_list.append(entry)
        if len(wifi_list) == MAX_SCANNED_NETWORKS:
            break

    with open('scaned_wifi_list.json', 'w') as f:
        json.dump(wifi_list, f)
    logging.info(f"{datetime.datetime.now()}: Wi-Fi scaned list: {wifi_list}")
    return wifi_list


def connect_wifi(ssid, password):
    """Connect to the given Wi-Fi network through nmcli.

    On success the credentials are stored with ``save_wifi``.

    Args:
        ssid: Name of the network to join.
        password: Network password; when empty, no password is passed to
            nmcli (open network).

    Returns:
        True if nmcli reported a successful connection, False otherwise.
    """
    command = ['nmcli', 'dev', 'wifi', 'connect', ssid]
    if password:
        command += ['password', password]
    command += ['autoconnect', 'yes']

    try:
        output = subprocess.check_output(command)
        output_str = output.decode('utf-8')  # bytes -> str
        if "successfully" in output_str:
            logging.info(f"{datetime.datetime.now()}: Successfully connected to Wi-Fi: {ssid}")
            save_wifi(ssid, password)  # remember the working credentials
            return True
        else:
            logging.error(f"{datetime.datetime.now()}: Error connecting to Wi-Fi: {output_str}")
            return False
    except (subprocess.CalledProcessError, OSError) as e:
        logging.error(f"{datetime.datetime.now()}: Error connecting to Wi-Fi: {e}")
        return False


def disconnect_wifi():
    """Disconnect the wlan0 interface through nmcli and log the outcome."""
    try:
        output = subprocess.check_output(['nmcli', 'dev', 'disconnect', 'iface', 'wlan0'])
        output_str = output.decode('utf-8')  # bytes -> str
        if "successfully disconnected" in output_str:
            logging.info(f"{datetime.datetime.now()}: Wi-Fi disconnected successfully")
        else:
            logging.error(f"{datetime.datetime.now()}: Error disconnecting from Wi-Fi: {output_str}")
    except (subprocess.CalledProcessError, OSError) as e:
        logging.error(f"{datetime.datetime.now()}: Error disconnecting from Wi-Fi: {e}")


def connect_saved_wifi(scaned_wifi_list):
    """Try to join a saved network that appears in the scan result.

    Args:
        scaned_wifi_list: Networks as returned by ``scan_wifi``.

    Returns:
        ``(True, ssid)`` when a saved network was joined, ``(False, None)``
        when nothing is saved, otherwise the result of
        ``check_wifi_connection()``.
    """
    saved_networks = load_saved_wifi()
    if not saved_networks:
        return False, None

    visible_ssids = {item['ssid'] for item in scaned_wifi_list}
    for wifi in saved_networks:
        if wifi['ssid'] in visible_ssids:
            if connect_wifi(wifi['ssid'], wifi['password']):
                return True, wifi['ssid']
    return check_wifi_connection()


def load_saved_wifi():
    """Return the saved networks from ``wifi_list.json``.

    Returns:
        The decoded JSON content, or an empty list when the file is missing,
        unreadable or not valid JSON.
    """
    try:
        with open('wifi_list.json', 'r') as f:
            return json.load(f)
    except (OSError, ValueError):
        # json.JSONDecodeError is a ValueError subclass.
        return []


def save_wifi(ssid, password):
    """Store the credentials for ``ssid`` in ``wifi_list.json``.

    A new entry is appended for an unknown SSID; for a known SSID the stored
    password is updated when it differs. The file is written only when
    something changed.
    """
    saved_networks = load_saved_wifi()
    for item in saved_networks:
        if item['ssid'] == ssid:
            if item.get('password') == password:
                return  # nothing to update
            item['password'] = password
            break
    else:
        saved_networks.append({'ssid': ssid, 'password': password})

    with open('wifi_list.json', 'w') as f:
        json.dump(saved_networks, f)


# Home page
@app.route('/')
def index():
    """Render the network selection page from the current ``wifi_list``."""
    global wifi_list
    response = make_response(render_template('index.html', wifi_list=wifi_list))
    response.headers.set('Content-Type', 'text/html')
    response.headers.set('Apple-Web-App-Capable', 'yes')
    response.headers.set('Apple-Mobile-Web-App-Status-Bar-Style', 'black-translucent')
    return response


# Submit Wi-Fi credentials
@app.route('/submit', methods=['POST'])
def submit():
    """Handle the submitted SSID/password and try to join that network.

    The hotspot is stopped, the network is joined and, on success, the
    application terminates. On failure the network list is refreshed and the
    hotspot is started again before redirecting to the index page.
    """
    global wifi_list

    if led_enabled:
        orangepi.set_led_on('red')

    ssid = request.form['ssid']
    password = request.form['password']
    # The password is deliberately kept out of the log file.
    logging.info(f"{datetime.datetime.now()}: Connecting to Wi-Fi: {ssid}")

    # Stop the hotspot before using the interface as a client.
    close_hotspot()
    time.sleep(0.5)

    scan_wifi()
    time.sleep(0.5)

    # Join the network selected by the user.
    if connect_wifi(ssid, password):
        close_app()

    connected, wifi_ssid = check_wifi_connection()
    if not connected:
        logging.info(f"{datetime.datetime.now()}: Wi-Fi连接失败。")
        # Refresh the list shown by index() and go back to hotspot mode.
        wifi_list = scan_wifi()
        start_hotspot()
    elif wifi_ssid == ssid:
        # Only remember credentials for the network that is actually active.
        save_wifi(ssid, password)

    return redirect(url_for('index'))


if __name__ == '__main__':
    # Set to True to drop the current Wi-Fi link first, so the
    # connection-state check below does not end the program immediately.
    debug_mode = False
    if debug_mode:
        disconnect_wifi()

    if led_enabled:
        orangepi.set_led_on('blue')

    init_networkmanager_file()  # clean up the NetworkManager configuration

    connected, wifi_ssid = check_wifi_connection()
    if connected:
        logging.info(f"{datetime.datetime.now()}: 系统已自动连接到 Wi-Fi 网络,退出程序")
        close_app()

    # Scan for Wi-Fi networks.
    wifi_list = scan_wifi()

    # Try the saved Wi-Fi networks.
    connected, wifi_ssid = connect_saved_wifi(wifi_list)
    if connected:
        logging.info(f"{datetime.datetime.now()}: 系统已自动连接到 Wi-Fi 网络 {wifi_ssid},退出程序")
        close_app()

    logging.info(f"{datetime.datetime.now()}: 未连接到 Wi-Fi 网络, 开始热点模式")
    start_hotspot()

    app.run(host='0.0.0.0', port=80)