wifi_hotpot/wifi_manager.py

369 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import subprocess
import re
import os
import psutil
import signal
import time
import datetime
import json
from flask import Flask, render_template, request, redirect, url_for, make_response
import socket
import logging
logging.basicConfig(
filename='app.log', # 日志文件的名称
filemode='a', # 'a'代表追加模式,如果文件存在则追加内容
level=logging.DEBUG, # 日志级别DEBUG是最低的级别会记录所有级别的日志
format='%(asctime)s - %(levelname)s - %(message)s' # 日志的格式
)
try:
from takway.board import OrangePi
led_enabled = True
orangepi = OrangePi()
except ImportError:
led_enabled = False
logging.info("Error importing OrangePi")
# blue: APP
# red: hotspot
def network_error_light_1():
error_time = datetime.datetime.now()
while led_enabled:
orangepi.set_led_off('blue')
orangepi.set_led_on('red')
time.sleep(0.25)
orangepi.set_led_on('blue')
orangepi.set_led_off('red')
time.sleep(0.25)
if error_time + datetime.timedelta(seconds=2) < datetime.datetime.now():
break
# close_app()
def network_error_light_2():
error_time = datetime.datetime.now()
while led_enabled:
orangepi.set_led_on('green')
time.sleep(0.25)
orangepi.set_led_off('green')
time.sleep(0.25)
if error_time + datetime.timedelta(seconds=2) < datetime.datetime.now():
break
# close_app()
def edit_line_in_file(file_path, search_string, replacement_string):
edit = False
try:
# 读取文件内容
with open(file_path, 'r') as file:
lines = file.readlines()
# 替换包含特定内容的行
with open(file_path, 'r+') as file:
file.seek(0) # 移动到文件开头
file.truncate() # 清空文件内容
for line in lines:
if search_string in line:
file.write(replacement_string + '\n')
edit = True
logging.info(f"{datetime.datetime.now()}: The line has been edited. \noriginal line: {line}, \nnew line: {replacement_string}")
else:
file.write(line)
return edit
except FileNotFoundError:
logging.info(f"The file {file_path} does not exist.")
return edit
except IOError as e:
logging.info(f"An error occurred: {e}")
return edit
# 获取私有IP地址
def save_local_ip():
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
local_ip = s.getsockname()[0]
s.close()
# save local_ip to file
with open('local_ip.txt', 'w') as f:
f.write(local_ip)
logging.info(f"{datetime.datetime.now()}: Local IP address: {local_ip}")
return local_ip
except Exception as e:
logging.info(f"Error getting private IP: {e}")
return None
app = Flask(__name__)
def close_app():
if led_enabled:
orangepi.set_led_off('blue')
# 保存Wi-Fi IP地址
save_local_ip()
logging.info(f"{datetime.datetime.now()}: Exit Wi-Fi hotspot...")
# 获取当前Flask应用程序的进程ID
flask_pid = os.getpid()
# 获取所有名为'python3'的进程ID
python_pids = [p.info['pid'] for p in psutil.process_iter(attrs=['pid', 'name']) if p.info['name'] == 'python3']
# 关闭Flask应用程序进程
os.kill(flask_pid, signal.SIGTERM)
# 关闭Python进程
for pid in python_pids:
os.kill(pid, signal.SIGTERM)
def start_hotspot():
try:
subprocess.Popen('sudo systemctl start hotspot.service', shell=True)
if led_enabled:
orangepi.set_led_on('red')
except subprocess.CalledProcessError as e:
logging.info(f"{datetime.datetime.now()}: Error starting create_ap service: {e}")
def close_hotspot():
# 关闭热点
try:
subprocess.Popen('sudo systemctl stop hotspot.service', shell=True)
logging.info(f"{datetime.datetime.now()}: Stopping create_ap service")
if led_enabled:
orangepi.set_led_off('red')
except subprocess.CalledProcessError as e:
logging.info(f"{datetime.datetime.now()}: Error stopping hotspot: {e}")
def init_networkmanager_file():
file_path = '/etc/NetworkManager/NetworkManager.conf'
search_string = 'unmanaged-devices=interface-name:'
replacement_string = '' # 如果不需要替换为其他内容,可以设置为空字符串
if edit_line_in_file(file_path, search_string, replacement_string):
'''
# 从 /etc/NetworkManager/NetworkManager.conf 文件中移除或注释掉 [keyfile] 部分的 unmanaged-devices 条目,因为它可能阻止了 NetworkManager 管理 wlan0 接口。编辑文件如下:
```
[keyfile]
#unmanaged-devices=interface-name:ap0;interface-name:wlan0
```
sudo systemctl restart NetworkManager
'''
# network_error_light_1()
logging.info(f"{datetime.datetime.now()}: Wi-Fi NetworkManager Error: restart NetworkManager.")
subprocess.Popen('sudo systemctl restart NetworkManager', shell=True)
time.sleep(5) # 不得修改该时间保留足够长时间给NetworkManager重启完成并连接以往WiFi。
# 检测 Wi-Fi 连接状态
def check_wifi_connection():
cmd = "nmcli dev status"
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
output = result.stdout.strip()
lines = output.split("\n")[1:] # Skip the header line
for line in lines:
columns = line.split()
logging.info(columns)
'''
['wlan0', 'wifi', 'disconnected', '--']
['p2p-dev-wlan0', 'wifi-p2p', 'disconnected', '--']
['eth0', 'ethernet', 'unavailable', '--']
['lo', 'loopback', 'unmanaged', '--']
'''
if len(columns) >= 4 and columns[2] == "connected":
wifi_ssid = columns[3]
if led_enabled:
orangepi.set_led_off('blue')
return True, wifi_ssid
return False, None
def scan_wifi():
try:
subprocess.run(['nmcli', 'dev', 'wifi', 'rescan'], check=True)
cmd = "nmcli dev wifi"
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
except subprocess.CalledProcessError as e:
logging.info(f"{datetime.datetime.now()}: Error scanning Wi-Fi: {e}")
network_error_light_1()
return []
logging.info(f"{datetime.datetime.now()}: Wi-Fi scan complete...")
ssid_list = []
wifi_list = []
output = result.stdout.strip()
lines = output.split("\n")[1:] # Skip the header line
for line in lines:
columns = line.split()
'''
logging.info(columns)
for i, column in enumerate(columns):
logging.info(f"{i}: {column}")
'''
# ['94:14:57:15:13:50', 'Meeting', 'MG', 'Infra', '1', '130', 'Mbit/s', '100', '****', 'WPA1', 'WPA2']
# 提取MAC地址
mac_address = columns[0]
# 提取Wi-Fi名称
wifi_name = []
for i in range(1, len(columns)):
if columns[i] == 'Infra':
_wifi_name = ' '.join(columns[1:i])
wifi_name.append(_wifi_name)
columns = columns[i+1:]
break
ssid = wifi_name[0]
if ':' in ssid:
# 94:14:57:15:13:50 Meeting
# 去除mac地址
ssid = ssid.split(' ')[1:]
ssid = ''.join(ssid)
if ssid in ssid_list:
continue
ssid_list.append(ssid)
# 提取强度
strength = None
for i in range(len(columns)):
if columns[i] == 'Mbit/s':
strength = columns[i+1]
break
# logging.info("MAC地址:", mac_address)
# logging.info("Wi-Fi名称:", ssid)
# logging.info("强度:", strength)
wifi_list.append({'ssid': ssid, 'signal': strength, 'mac': mac_address})
if len(wifi_list) == 15:
break
with open('scaned_wifi_list.json', 'w') as f:
json.dump(wifi_list, f)
logging.info(f"{datetime.datetime.now()}: Wi-Fi scaned list: {wifi_list}")
return wifi_list
# 连接 Wi-Fi
def connect_wifi(ssid, password):
# 连接到用户选择的 Wi-Fi 网络
try:
output = subprocess.check_output(['nmcli', 'dev', 'wifi', 'connect', ssid, 'password', password, 'autoconnect', 'yes'])
output_str = output.decode('utf-8') # 将输出转换为字符串
if "successfully" in output_str:
logging.info(f"{datetime.datetime.now()}: Successfully connected to Wi-Fi: {ssid}")
save_wifi(ssid, password) # 保存连接成功的Wi-Fi信息
return True
else:
logging.info(f"{datetime.datetime.now()}: Error connecting to Wi-Fi: {output_str}")
return False
except subprocess.CalledProcessError as e:
logging.info(f"{datetime.datetime.now()}: Error connecting to Wi-Fi: {e}")
return False
# 关闭 Wi-Fi
def disconnect_wifi():
try:
output = subprocess.check_output(['nmcli', 'dev', 'disconnect', 'iface', 'wlan0'])
output_str = output.decode('utf-8') # 将输出转换为字符串
if "successfully disconnected" in output_str:
logging.info(f"{datetime.datetime.now()}: Wi-Fi disconnected successfully")
else:
logging.info(f"{datetime.datetime.now()}: Error disconnecting from Wi-Fi: {output_str}")
except Exception as e:
logging.info(f"{datetime.datetime.now()}: Error disconnecting from Wi-Fi: {e}")
def connect_saved_wifi(scaned_wifi_list):
wifi_list = load_saved_wifi()
if wifi_list == []:
return False, None
for wifi in wifi_list:
if wifi['ssid'] in [item['ssid'] for item in scaned_wifi_list]:
if connect_wifi(wifi['ssid'], wifi['password']):
return True, wifi['ssid']
return check_wifi_connection()
def load_saved_wifi():
try:
with open('wifi_list.json', 'r') as f:
wifi_list = json.load(f)
return wifi_list
except:
return []
def save_wifi(ssid, password):
wifi_list = load_saved_wifi()
if ssid not in [item['ssid'] for item in wifi_list]:
wifi_list.append({'ssid': ssid, 'password': password})
with open('wifi_list.json', 'w') as f:
json.dump(wifi_list, f)
# 主页
@app.route('/')
def index():
global wifi_list
response = make_response(render_template('index.html', wifi_list=wifi_list))
response.headers.set('Content-Type', 'text/html')
response.headers.set('Apple-Web-App-Capable', 'yes')
response.headers.set('Apple-Mobile-Web-App-Status-Bar-Style', 'black-translucent')
return response
# 提交 Wi-Fi 信息
@app.route('/submit', methods=['POST'])
def submit():
if led_enabled:
orangepi.set_led_on('red')
ssid = request.form['ssid']
password = request.form['password']
logging.info(f"{datetime.datetime.now()}: Connecting to Wi-Fi: {ssid} with password {password}")
# 关闭热点
close_hotspot()
time.sleep(0.5)
scan_wifi()
time.sleep(0.5)
# 连接到用户选择的 Wi-Fi 网络
if connect_wifi(ssid, password):
close_app()
connected, wifi_ssid = check_wifi_connection()
if not connected:
logging.info(f"{datetime.datetime.now()}: Wi-Fi连接失败。")
wifi_list = scan_wifi()
start_hotspot()
else:
save_wifi(ssid, password) # 保存连接成功的Wi-Fi信息
return redirect(url_for('index'))
if __name__ == '__main__':
debug_mode = False # 设置为 True 以跳过 Wi-Fi 连接状态检测
if debug_mode:
disconnect_wifi()
if led_enabled:
orangepi.set_led_on('blue')
init_networkmanager_file() # 初始化 NetworkManager 配置文件
connected, wifi_ssid = check_wifi_connection()
if connected:
logging.info(f"{datetime.datetime.now()}: 系统已自动连接到 Wi-Fi 网络,退出程序")
close_app()
# 扫描Wi-Fi
wifi_list = scan_wifi()
# 连接保存的Wi-Fi
connected, wifi_ssid = connect_saved_wifi(wifi_list)
if connected:
logging.info(f"{datetime.datetime.now()}: 系统已自动连接到 Wi-Fi 网络 {wifi_ssid},退出程序")
close_app()
logging.info(f"{datetime.datetime.now()}: 未连接到 Wi-Fi 网络, 开始热点模式")
start_hotspot()
app.run(host='0.0.0.0', port=80)