wifi_hotpot/wifi_manager.py

208 lines
6.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 逻辑:
# 1. 检测 Wi-Fi 连接状态,若已连接,则退出程序
# 2. 扫描 Wi-Fi 网络并获取Wi-Fi列表
# 3. 关闭Wi-Fi并启动热点和web服务器
# 4. 等待用户选择 Wi-Fi 网络并输入密码
# 5. 关闭热点并重新扫描 Wi-Fi连接到用户选择的 Wi-Fi 网络
import subprocess
import re
import os
import psutil
import signal
import time
import datetime
from flask import Flask, render_template, request, redirect, url_for, make_response
# Optional board support: the LED helpers come from the project's OrangePi
# wrapper. When that package cannot be imported, a do-nothing stand-in is bound
# to ``orangepi`` so the unconditional ``orangepi.set_led_on`` /
# ``orangepi.set_led_off`` calls in this module do not raise NameError.
try:
    from takway.board import OrangePi
    led_enabled = True
    orangepi = OrangePi()
except ImportError:
    led_enabled = False
    print("Error importing OrangePi")

    class _NullBoard:
        """No-op replacement for the OrangePi board object."""

        def set_led_on(self, color):
            """Ignore the request; there is no LED to switch on."""

        def set_led_off(self, color):
            """Ignore the request; there is no LED to switch off."""

    orangepi = _NullBoard()
def close_app():
    """Send SIGTERM to this process and to every process named ``python3``.

    The list of ``python3`` PIDs is gathered before any signal is sent.
    """
    # PID of the process running the Flask application (this process).
    own_pid = os.getpid()

    # Collect the PID of each process whose name is exactly 'python3'.
    python_pids = []
    for proc in psutil.process_iter(attrs=['pid', 'name']):
        if proc.info['name'] == 'python3':
            python_pids.append(proc.info['pid'])

    # Terminate the Flask application process.
    # NOTE(review): unless a SIGTERM handler is installed, this call presumably
    # ends the process right here, so the loop below would never run — confirm
    # whether killing the other python3 processes is actually intended.
    os.kill(own_pid, signal.SIGTERM)

    # Terminate the collected python3 processes.
    for target_pid in python_pids:
        os.kill(target_pid, signal.SIGTERM)
# Flask application object; the route handlers in this file are registered on it.
app = Flask(__name__)
# Check the Wi-Fi connection state
def check_wifi_connection():
    """Report whether a Wi-Fi device is currently connected.

    Runs ``nmcli dev status`` and scans the device rows below the header.
    Each row is split on whitespace; the second field is taken as the device
    type and the third as its state. Only rows whose type is ``wifi`` count,
    so a connected ethernet port or a loopback device reported as
    ``connected (externally)`` no longer makes this function return True.

    Side effects:
        Prints the parsed columns of every row, sleeps for one second after
        the command, and switches the blue LED off when a connected Wi-Fi
        device is found.

    Returns:
        bool: True if a Wi-Fi device is in the ``connected`` state,
        otherwise False.
    """
    cmd = "nmcli dev status"
    result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
    time.sleep(1)
    output = result.stdout.strip()
    lines = output.split("\n")[1:]  # Skip the header line
    for line in lines:
        columns = line.split()
        print(columns)
        if len(columns) < 4:
            continue
        device_type, state = columns[1], columns[2]
        if device_type == "wifi" and state == "connected":
            orangepi.set_led_off('blue')
            return True
    return False
def restart_hotspot():
    """Restart the ``Takway-Toys`` hotspot on ``wlan0``.

    Equivalent to running, in order:

        sudo systemctl stop dnsmasq
        sudo systemctl restart NetworkManager
        sudo nmcli radio wifi off
        sudo nmcli radio wifi on
        sudo nohup create_ap wlan0 eth0 Takway-Toys --no-virt > create_ap.log 2>&1 &

    The first four commands are run with ``check=True``; if any of them exits
    non-zero the error is printed and the remaining steps are skipped.
    ``create_ap`` is launched in the background through the shell, so its own
    failures are not detected here (its output goes to ``create_ap.log``).
    The blue LED is switched on once ``create_ap`` has been launched.
    """
    try:
        subprocess.run(['sudo', 'systemctl', 'stop', 'dnsmasq'], check=True)
        subprocess.run(['sudo', 'systemctl', 'restart', 'NetworkManager'], check=True)
        subprocess.run(['sudo', 'nmcli', 'radio', 'wifi', 'off'], check=True)
        subprocess.run(['sudo', 'nmcli', 'radio', 'wifi', 'on'], check=True)
        subprocess.Popen('sudo nohup create_ap wlan0 eth0 Takway-Toys --no-virt > create_ap.log 2>&1 &', shell=True)
        orangepi.set_led_on('blue')
    except subprocess.CalledProcessError as e:
        # The previous message claimed the hotspot was being *stopped*, which
        # is misleading when reading the log of a failed restart.
        print(f"{datetime.datetime.now()}: Error restarting hotspot: {e}")
def _parse_terse_wifi_line(line):
    """Convert one ``SSID:SIGNAL`` line of terse nmcli output to a dict.

    Returns:
        dict | None: ``{'ssid': str, 'signal': int}``, or None when the line
        has no separator, the signal is not an integer, or the SSID is empty
        (hidden network, which cannot be picked by name).
    """
    # SIGNAL is the last field and is numeric, so the final ':' is always the
    # field separator even when the SSID itself contains colons.
    raw_ssid, sep, raw_signal = line.rpartition(':')
    if not sep:
        return None
    try:
        strength = int(raw_signal)
    except ValueError:
        return None
    # Terse mode backslash-escapes ':' and '\' inside values; undo that.
    ssid = re.sub(r'\\(.)', r'\1', raw_ssid)
    if not ssid:
        return None
    return {'ssid': ssid, 'signal': strength}


def scan_wifi():
    """Scan for Wi-Fi networks and return up to 15 of them.

    Requests a rescan, waits one second, then reads the network list with
    ``nmcli -t -f SSID,SIGNAL dev wifi list``. The terse, field-selected
    output is used instead of the human-readable table because splitting the
    table on whitespace truncated SSIDs containing spaces, left a leading
    space on the in-use row, read a signal of 100 as 10 and raised
    ``ValueError`` on single-digit signals.

    Side effects:
        Prints every raw output line with its index and writes the result to
        ``wifi_list.txt`` as ``ssid,signal`` lines (UTF-8).

    Returns:
        list[dict]: Entries of the form ``{'ssid': str, 'signal': int}`` in
        the order nmcli reported them; an empty list if listing fails.
    """
    try:
        subprocess.run(['nmcli', 'dev', 'wifi', 'rescan'], check=True)
    except subprocess.CalledProcessError as e:
        # A refused rescan should not discard the results NetworkManager
        # already has, so carry on and list what is available.
        print(f"{datetime.datetime.now()}: Wi-Fi rescan failed, listing known networks: {e}")
    time.sleep(1)

    try:
        output = subprocess.check_output(
            ['nmcli', '-t', '-f', 'SSID,SIGNAL', 'dev', 'wifi', 'list']
        )
    except subprocess.CalledProcessError as e:
        print(f"{datetime.datetime.now()}: Error scanning for Wi-Fi networks: {e}")
        return []

    wifi_list = []
    # SSIDs are arbitrary bytes; never let an undecodable one abort the scan.
    lines = output.decode('utf-8', errors='replace').splitlines()
    for idx, line in enumerate(lines):
        print(f"{idx}: {line}")
        entry = _parse_terse_wifi_line(line)
        if entry is None:
            continue
        wifi_list.append(entry)
        if len(wifi_list) == 15:
            break

    # Save the list to a file.
    with open('wifi_list.txt', 'w', encoding='utf-8') as f:
        for wifi in wifi_list:
            f.write(f"{wifi['ssid']},{wifi['signal']}\n")
    return wifi_list
# Connect to Wi-Fi
def connect_wifi(ssid, password):
    """Stop the hotspot and connect to the given Wi-Fi network.

    Args:
        ssid: Name of the network to join.
        password: Passphrase for the network. When empty (or None) no
            ``password`` argument is passed to nmcli, which is how an open
            network is joined.

    Returns:
        bool: True if ``nmcli dev wifi connect`` exited successfully,
        False otherwise. A failure to stop the hotspot is only logged.
    """
    # Stop the hotspot.
    try:
        subprocess.run(['sudo', 'create_ap', '--stop', 'wlan0'], check=True)
        print(f"{datetime.datetime.now()}: Stopping create_ap service")
    except subprocess.CalledProcessError as e:
        print(f"{datetime.datetime.now()}: Error stopping hotspot: {e}")

    # Same total delay as before (two one-second pauses) before connecting.
    time.sleep(2)

    # Connect to the network the user selected.
    command = ['nmcli', 'dev', 'wifi', 'connect', ssid]
    if password:
        command += ['password', password]
    try:
        subprocess.run(command, check=True)
        print(f"{datetime.datetime.now()}: Successfully connected to Wi-Fi: {ssid}")
        return True
    except subprocess.CalledProcessError as e:
        print(f"{datetime.datetime.now()}: Error connecting to Wi-Fi: {e}")
        return False
# Home page
@app.route('/')
def index():
    """Render ``index.html`` with the scanned Wi-Fi list.

    Reads the module-level ``wifi_list`` (assigned in the ``__main__``
    section of this file), prints it, and returns the rendered page with
    three extra response headers set.

    Returns:
        flask.Response: The rendered page.
    """
    # Reading a module-level name needs no ``global`` declaration.
    print(wifi_list)
    response = make_response(render_template('index.html', wifi_list=wifi_list))
    # Keep the charset: replacing the header with bare 'text/html' drops the
    # 'charset=utf-8' Flask sets by default, leaving browsers to guess the
    # encoding of non-ASCII SSIDs unless the template declares it itself.
    response.headers.set('Content-Type', 'text/html; charset=utf-8')
    response.headers.set('Apple-Web-App-Capable', 'yes')
    response.headers.set('Apple-Mobile-Web-App-Status-Bar-Style', 'black-translucent')
    return response
# Submit Wi-Fi credentials
@app.route('/submit', methods=['POST'])
def submit():
    """Handle the posted Wi-Fi form and try to join the chosen network.

    Reads ``ssid`` and ``password`` from the form, switches the red LED on,
    calls ``connect_wifi`` and waits five seconds before checking the
    connection state. On success the red LED is switched off and
    ``close_app`` is called; on failure the hotspot is restarted so the user
    can try again.

    Returns:
        A redirect to the index page.
    """
    orangepi.set_led_on('red')
    ssid = request.form['ssid']
    password = request.form['password']
    # The password is deliberately left out of the log line: it is a secret
    # and must not end up in plain text in stdout or log files.
    print(f"{datetime.datetime.now()}: Connecting to Wi-Fi: {ssid}")
    connected = connect_wifi(ssid, password)
    time.sleep(5)
    # Check whether the Wi-Fi connection succeeded.
    if check_wifi_connection() and connected:
        orangepi.set_led_off('red')
        # Connected successfully, close Flask.
        close_app()
    else:
        print(f"{datetime.datetime.now()}: Wi-Fi连接失败。")
        restart_hotspot()
    return redirect(url_for('index'))
if __name__ == '__main__':
    # Script entry point: exit if Wi-Fi is already connected; otherwise scan,
    # bring up the hotspot and serve the configuration pages.
    debug_mode = False  # Set to True to skip the Wi-Fi connection check
    orangepi.set_led_on('blue')

    if not debug_mode:
        time.sleep(5)  # Wait for the Wi-Fi connection
        if not check_wifi_connection():
            print(f"{datetime.datetime.now()}: 未连接到 Wi-Fi 网络")
        else:
            print(f"{datetime.datetime.now()}: 系统已自动连接到 Wi-Fi 网络,退出程序")
            close_app()

    print(f"{datetime.datetime.now()}: Starting Flask server")

    divider = "----------------------------"
    print(divider)
    # Module-level list read by the index() route handler.
    wifi_list = scan_wifi()
    print(divider)

    restart_hotspot()

    # Start the Flask server.
    app.run(host='0.0.0.0', port=80)