wifi_hotpot/wifi_manager.py

261 lines
8.1 KiB
Python

import subprocess
import re
import os
import psutil
import signal
import time
import datetime
from flask import Flask, render_template, request, redirect, url_for, make_response
# Optional LED support: the board helper is only present on the target
# device, so a failed import simply leaves the LED features switched off.
led_enabled = False
try:
    from takway.board import OrangePi
    orangepi = OrangePi()
    led_enabled = True
except ImportError:
    print("Error importing OrangePi")
# Flask application object; the route handlers in this module register on it.
app = Flask(__name__)
def close_app():
    """Terminate every other ``python3`` process, then this process.

    Switches the red LED off first when LED support is enabled.  SIGTERM is
    sent to each process whose name is exactly ``python3`` (other than this
    one), and finally to the current process.
    """
    if led_enabled:
        orangepi.set_led_off('red')

    # PID of the process running this Flask application.
    flask_pid = os.getpid()

    # Every other python3 process; our own PID is dealt with last.
    python_pids = [
        p.info['pid']
        for p in psutil.process_iter(attrs=['pid', 'name'])
        if p.info['name'] == 'python3' and p.info['pid'] != flask_pid
    ]

    # Signal the other processes first.  With the default SIGTERM disposition
    # this process ends as soon as it signals itself, so nothing placed after
    # the self-kill would ever run.
    for pid in python_pids:
        try:
            os.kill(pid, signal.SIGTERM)
        except (ProcessLookupError, PermissionError) as e:
            # The process may have exited already or belong to another user.
            print(f"{datetime.datetime.now()}: Could not terminate process {pid}: {e}")

    # Finally stop the Flask application itself.
    os.kill(flask_pid, signal.SIGTERM)
def start_hotspot():
    """Start ``hotspot.service`` and switch the blue LED on if that worked.

    Failures (non-zero exit status, missing executable, or a start that takes
    longer than 30 seconds) are logged and not raised; the LED is left
    untouched in that case.
    """
    try:
        # run(check=True) waits for systemctl and raises on a non-zero exit
        # status.  A fire-and-forget Popen never raises CalledProcessError,
        # so a failed start would go unnoticed.  The timeout keeps a unit
        # that never finishes starting from blocking the caller forever.
        subprocess.run(
            ['sudo', 'systemctl', 'start', 'hotspot.service'],
            check=True,
            timeout=30,
        )
    except (subprocess.CalledProcessError, subprocess.TimeoutExpired, OSError) as e:
        print(f"{datetime.datetime.now()}: Error starting create_ap service: {e}")
        return

    if led_enabled:
        orangepi.set_led_on('blue')
def close_hotspot():
    """Stop ``hotspot.service``.

    Failures (non-zero exit status, missing executable, or a stop that takes
    longer than 30 seconds) are logged and not raised.
    """
    print(f"{datetime.datetime.now()}: Stopping create_ap service")
    try:
        # run(check=True) waits for systemctl and raises on a non-zero exit
        # status.  A fire-and-forget Popen never raises CalledProcessError,
        # so a failed stop would go unnoticed.
        subprocess.run(
            ['sudo', 'systemctl', 'stop', 'hotspot.service'],
            check=True,
            timeout=30,
        )
    except (subprocess.CalledProcessError, subprocess.TimeoutExpired, OSError) as e:
        print(f"{datetime.datetime.now()}: Error stopping hotspot: {e}")
# Check the Wi-Fi connection state
def check_wifi_connection():
    """Check whether a Wi-Fi device reports an active connection.

    Queries ``nmcli dev status`` in terse mode and looks for the first Wi-Fi
    device whose state is exactly ``connected``.  When one is found the blue
    LED is switched off (if LED support is enabled).

    Returns:
        ``(True, name)`` where ``name`` is the CONNECTION value nmcli reports
        for that device, otherwise ``False``.  The caller in this module only
        relies on the truthiness of the result.
    """
    # Terse mode prints one "DEVICE:TYPE:STATE:CONNECTION" record per line,
    # so connection names containing spaces stay intact.
    cmd = ['nmcli', '-t', '-f', 'DEVICE,TYPE,STATE,CONNECTION', 'dev', 'status']
    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
    except OSError as e:
        # nmcli is not installed or cannot be executed.
        print(f"{datetime.datetime.now()}: Error running nmcli: {e}")
        return False

    # Short pause kept from the original flow; its purpose is not evident
    # from this module -- TODO confirm whether it is still needed.
    time.sleep(1)

    for line in result.stdout.splitlines():
        columns = line.split(':', 3)
        print(columns)
        if len(columns) < 4:
            continue
        _device, dev_type, state, connection = columns
        # Only Wi-Fi devices count: a wired link, or a loopback device shown
        # as "connected (externally)", must not be mistaken for Wi-Fi.
        # "802-11-wireless" is accepted as well -- presumably the type name
        # used by older NetworkManager releases; verify on the target.
        if dev_type in ('wifi', '802-11-wireless') and state == 'connected':
            # Terse mode backslash-escapes ':' and '\' inside values.
            wifi_ssid = re.sub(r'\\(.)', r'\1', connection)
            if led_enabled:
                orangepi.set_led_off('blue')
            return True, wifi_ssid
    return False
def _split_terse_fields(line):
    """Split one line of ``nmcli -t`` output on its unescaped ':' separators."""
    fields = []
    current = []
    chars = iter(line)
    for ch in chars:
        if ch == '\\':
            # A backslash escapes the next character (':' or '\').
            current.append(next(chars, ''))
        elif ch == ':':
            fields.append(''.join(current))
            current = []
        else:
            current.append(ch)
    fields.append(''.join(current))
    return fields


def scan_wifi():
    """Scan for nearby Wi-Fi networks and return at most 15 of them.

    Asks NetworkManager for a rescan, then reads BSSID, SSID and SIGNAL from
    ``nmcli dev wifi`` in terse mode.  The result is also written to
    ``wifi_list.txt`` in the current working directory, one ``ssid,signal``
    line per network.

    Returns:
        A list of dicts with the keys ``'ssid'`` (a list holding the SSID, or
        an empty list when the network has none), ``'signal'`` (the signal
        value as a string, or ``None``) and ``'mac'`` (the BSSID).
    """
    try:
        subprocess.run(['nmcli', 'dev', 'wifi', 'rescan'], check=True)
    except (subprocess.CalledProcessError, OSError) as e:
        # A refused rescan should not abort the scan; continue with whatever
        # list nmcli can still report.
        print(f"{datetime.datetime.now()}: Wi-Fi rescan failed: {e}")

    try:
        # Terse output is the machine-readable form.  Splitting the tabular
        # output on whitespace mis-parses the in-use '*' column and SSIDs
        # that contain several spaces or the tokens 'Infra' / 'Mbit/s'.
        result = subprocess.run(
            ['nmcli', '-t', '-f', 'BSSID,SSID,SIGNAL', 'dev', 'wifi'],
            capture_output=True,
            text=True,
        )
        output = result.stdout
    except OSError as e:
        print(f"{datetime.datetime.now()}: Error running nmcli: {e}")
        output = ''

    wifi_list = []
    for line in output.splitlines():
        fields = _split_terse_fields(line)
        if len(fields) < 3:
            continue
        mac_address, ssid, strength = fields[:3]
        # NOTE(review): 'ssid' stays a list to keep the shape this function
        # has always returned; confirm what the index.html template expects.
        wifi_names = [ssid] if ssid else []
        strength = strength or None
        print("MAC地址:", mac_address)
        print("Wi-Fi名称:", ' '.join(wifi_names))
        print("强度:", strength)
        wifi_list.append({'ssid': wifi_names, 'signal': strength, 'mac': mac_address})
        if len(wifi_list) == 15:
            break

    # Save wifi_list to a file; UTF-8 so non-ASCII SSIDs cannot fail to encode.
    with open('wifi_list.txt', 'w', encoding='utf-8') as f:
        for wifi in wifi_list:
            f.write(f"{wifi['ssid']},{wifi['signal']}\n")
    return wifi_list
'''
def scan_wifi():
subprocess.run(['nmcli', 'dev', 'wifi', 'rescan'], check=True)
output = subprocess.check_output(['nmcli', 'dev', 'wifi'])
wifi_list = []
lines = output.decode().splitlines()[1:]
for idx, line in enumerate(lines):
print(f"{line}")
for i, column in enumerate(line):
print(f"{i}: {column}")
close_app()
# 分割字符串以空格为分隔符
columns = line.split()
# 提取第一列和第三列的值
mac_address = columns[0]
ssid = " ".join(columns[1:3])
if "Infra" in ssid:
ssid = ssid.replace(" Infra", "")
if ":" in ssid:
# "72:A6:CC:8C:89:6C Takway-AI"
# 去除字符串中的MAC地址
ssid = re.sub(r'\b\w{2}(:\w{2}){5}\b', '', ssid)
# 提取第六列的强度值
last_info = columns[4:8]
# 去除所有的空格
columns = line.replace(" ", "")
# 提取"Mbit/s"后的三位以内数字
strength = int(columns[columns.index("Mbit/s")+6:columns.index("Mbit/s")+8])
# print("MAC地址:", mac_address)
# print("SSID:", ssid)
# print("强度:", strength)
wifi_list.append({'ssid': ssid, 'signal': strength})
if len(wifi_list) == 15:
break
# save wifi_list to file
with open('wifi_list.txt', 'w') as f:
for wifi in wifi_list:
f.write(f"{wifi['ssid']},{wifi['signal']}\n")
return wifi_list
'''
# Connect to Wi-Fi
def connect_wifi(ssid, password):
    """Connect to the Wi-Fi network ``ssid`` through nmcli.

    Args:
        ssid: Name of the network to join.
        password: Passphrase for the network; when empty, no ``password``
            argument is passed to nmcli (open network).

    Returns:
        ``True`` when nmcli exits with status 0, ``False`` otherwise
        (including when nmcli cannot be executed).
    """
    cmd = ['nmcli', 'dev', 'wifi', 'connect', ssid]
    if password:
        cmd += ['password', password]

    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            encoding='utf-8',
            errors='replace',
        )
    except OSError as e:
        # nmcli is not installed or cannot be executed.
        print(f"{datetime.datetime.now()}: Error connecting to Wi-Fi: {e}")
        return False

    # Judge success by the exit status; matching on the word "successfully"
    # only works while nmcli prints English messages.
    if result.returncode == 0:
        print(f"{datetime.datetime.now()}: Successfully connected to Wi-Fi: {ssid}")
        return True

    # Log nmcli's own message.  Printing a CalledProcessError would echo the
    # full command line, including the password.
    detail = (result.stderr or result.stdout).strip()
    print(f"{datetime.datetime.now()}: Error connecting to Wi-Fi: {detail}")
    return False
# Disconnect Wi-Fi
def disconnect_wifi(iface='wlan0'):
    """Disconnect a network interface through nmcli.

    Args:
        iface: Name of the interface to disconnect; defaults to ``wlan0``.

    The outcome is only logged: nothing is returned and failures to run
    nmcli are not raised.
    """
    try:
        output = subprocess.check_output(['nmcli', 'dev', 'disconnect', 'iface', iface])
    except (subprocess.CalledProcessError, OSError) as e:
        print(f"{datetime.datetime.now()}: Error disconnecting from Wi-Fi: {e}")
        return

    # Short pause after the disconnect, kept from the original flow.
    time.sleep(1)
    # Decode leniently so unexpected bytes in the output cannot raise.
    output_str = output.decode('utf-8', errors='replace')
    if "successfully disconnected" in output_str:
        print(f"{datetime.datetime.now()}: Wi-Fi disconnected successfully")
    else:
        print(f"{datetime.datetime.now()}: Error disconnecting from Wi-Fi: {output_str}")
# Home page
@app.route('/')
def index():
    """Render ``index.html`` with the list of scanned Wi-Fi networks."""
    # The module-level wifi_list is only assigned by the startup code, so it
    # may not exist yet when a request arrives; fall back to an empty list
    # rather than failing with NameError.
    networks = globals().get('wifi_list', [])
    response = make_response(render_template('index.html', wifi_list=networks))
    response.headers.set('Content-Type', 'text/html')
    response.headers.set('Apple-Web-App-Capable', 'yes')
    response.headers.set('Apple-Mobile-Web-App-Status-Bar-Style', 'black-translucent')
    return response
# Submit Wi-Fi credentials
@app.route('/submit', methods=['POST'])
def submit():
    """Try to join the network named in the submitted form.

    Reads ``ssid`` and ``password`` from the form and attempts the
    connection.  On success the application is shut down via ``close_app``;
    on failure the hotspot service is stopped and started again and the
    client is redirected to the home page.
    """
    if led_enabled:
        orangepi.set_led_on('red')
    ssid = request.form['ssid']
    password = request.form['password']
    # Log the target network only; the password must never reach the log.
    print(f"{datetime.datetime.now()}: Connecting to Wi-Fi: {ssid}")
    connected = connect_wifi(ssid, password)
    # Wait before checking the state, presumably to let the connection
    # settle -- kept from the original flow.
    time.sleep(5)
    # Check whether the Wi-Fi connection succeeded.
    if check_wifi_connection() and connected:
        # Connected successfully: close Flask.
        close_app()
    else:
        print(f"{datetime.datetime.now()}: Wi-Fi连接失败。")
        # restart_hotspot() is not defined anywhere in the visible module,
        # so calling it would raise NameError; stop and start the service
        # explicitly instead.
        close_hotspot()
        start_hotspot()
    return redirect(url_for('index'))
if __name__ == '__main__':
    # Set to True to skip the Wi-Fi connection status check.
    debug_mode = True
    if debug_mode:
        disconnect_wifi()
        found_networks = scan_wifi()
        print(found_networks)

# Disabled startup flow, preserved verbatim as an inert string literal.
'''
if led_enabled:
orangepi.set_led_on('blue')
if check_wifi_connection():
print(f"{datetime.datetime.now()}: 系统已自动连接到 Wi-Fi 网络,退出程序")
close_app()
else:
print(f"{datetime.datetime.now()}: 未连接到 Wi-Fi 网络")
print(f"{datetime.datetime.now()}: Starting Flask server")
print("----------------------------")
wifi_list = scan_wifi()
print("----------------------------")
restart_hotspot()
# 启动 Flask 服务器
app.run(host='0.0.0.0', port=80)
'''