# Listing metadata left over from the web file viewer: 510 lines, 17 KiB, Python.
import subprocess
|
||
import re
|
||
import os
|
||
import psutil
|
||
import signal
|
||
import time
|
||
import datetime
|
||
import json
|
||
from flask import Flask, render_template, request, redirect, url_for, make_response
|
||
import socket
|
||
|
||
import logging
|
||
|
||
# Send every record (DEBUG and above) to app.log, appending across runs.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.DEBUG,
    filemode='a',
    filename='app.log',
)

# JSON file that stores the credentials of networks joined through this tool.
wifi_credential_file = '/home/orangepi/.config/wifi_credentials.json'

# The LED driver is optional. When the board package cannot be imported the
# rest of the module checks `led_enabled` before touching `orangepi`.
try:
    from takway.board import OrangePi
    led_enabled = True
    orangepi = OrangePi()
except ImportError:
    led_enabled = False
    logging.info("Error importing OrangePi")
|
||
|
||
# blue: APP
|
||
# red: hotspot
|
||
|
||
def network_error_light_1(duration=2.0, interval=0.25):
    """Alternate the red and blue LEDs to signal a network error.

    Does nothing when the LED driver is unavailable (`led_enabled` is False).

    Args:
        duration: Minimum blinking time in seconds. The pattern always
            finishes a full red/blue cycle, so the call may overrun slightly.
        interval: Seconds each colour stays lit.
    """
    if not led_enabled:
        return

    # Use the monotonic clock: the wall clock can jump (for example when NTP
    # syncs right after the network comes up), which would end the blinking
    # immediately or stretch it indefinitely.
    deadline = time.monotonic() + duration
    while True:
        orangepi.set_led_off('blue')
        orangepi.set_led_on('red')
        time.sleep(interval)
        orangepi.set_led_on('blue')
        orangepi.set_led_off('red')
        time.sleep(interval)
        if time.monotonic() > deadline:
            break
|
||
|
||
def network_error_light_2(duration=2.0, interval=0.25):
    """Blink the green LED to signal a network error.

    Does nothing when the LED driver is unavailable (`led_enabled` is False).

    Args:
        duration: Minimum blinking time in seconds. The pattern always
            finishes a full on/off cycle, so the call may overrun slightly.
        interval: Seconds the LED stays on, and then off, per cycle.
    """
    if not led_enabled:
        return

    # Use the monotonic clock: wall-clock adjustments (e.g. an NTP sync) must
    # not shorten or prolong the blink sequence.
    deadline = time.monotonic() + duration
    while True:
        orangepi.set_led_on('green')
        time.sleep(interval)
        orangepi.set_led_off('green')
        time.sleep(interval)
        if time.monotonic() > deadline:
            break
|
||
|
||
def edit_line_in_file(file_path, search_string, replacement_string):
    """Replace every line containing `search_string` with `replacement_string`.

    The file is only rewritten when at least one line actually matches, so a
    call that changes nothing never truncates the file.

    Args:
        file_path: Path of the text file to edit.
        search_string: Substring that marks a line for replacement.
        replacement_string: New content of a matching line (a newline is
            appended). An empty string leaves a blank line in its place.

    Returns:
        True if at least one line was replaced and the file was written back,
        False if nothing matched or the file could not be read or written.
    """
    try:
        with open(file_path, 'r') as file:
            lines = file.readlines()
    except FileNotFoundError:
        logging.info(f"The file {file_path} does not exist.")
        return False
    except OSError as e:
        logging.info(f"An error occurred: {e}")
        return False

    # Build the new content in memory first so the file is never left
    # half-processed by the matching logic itself.
    new_lines = []
    edited = False
    for line in lines:
        if search_string in line:
            new_lines.append(replacement_string + '\n')
            edited = True
            logging.info(f"The line has been edited. \noriginal line: {line}, \nnew line: {replacement_string}")
        else:
            new_lines.append(line)

    if not edited:
        return False

    try:
        with open(file_path, 'w') as file:
            file.writelines(new_lines)
    except OSError as e:
        logging.info(f"An error occurred: {e}")
        return False
    return True
|
||
|
||
# 获取私有IP地址
|
||
def save_local_ip():
    """Determine the local IP address and store it in `local_ip.txt`.

    The address is read from a UDP socket "connected" to 8.8.8.8; connecting
    a datagram socket only selects the outgoing interface.

    Returns:
        The local IP address as a string, or None when it could not be
        determined or the file could not be written.
    """
    try:
        # The context manager closes the socket even when connect() fails.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(("8.8.8.8", 80))
            local_ip = s.getsockname()[0]
        with open('local_ip.txt', 'w') as f:
            f.write(local_ip)
    except OSError as e:
        logging.info(f"Error getting private IP: {e}")
        return None
    logging.info(f"Local IP address: {local_ip}")
    return local_ip
|
||
|
||
def edit_dns(resolv_conf_path='/etc/resolv.conf', nameservers=('8.8.8.8', '8.8.4.4')):
    """Append fallback DNS servers when the resolver file configures none.

    Only active `nameserver` entries count; commented-out lines are ignored.
    Failing to read or write the file is logged and otherwise ignored, so a
    DNS problem does not stop the provisioning flow.

    Args:
        resolv_conf_path: Resolver configuration file to inspect and extend.
        nameservers: Addresses appended (one `nameserver` line each) when no
            active entry exists.
    """
    try:
        with open(resolv_conf_path, 'r+') as file:
            content = file.read()
            dns_exist = any(
                line.split()[:1] == ['nameserver']
                for line in content.splitlines()
            )
            if dns_exist:
                logging.info(f"DNS server already exist.")
                return
            # After read() the position is at end of file, so writes append.
            # Terminate a dangling last line first, otherwise the new entry
            # would be glued onto it.
            if content and not content.endswith('\n'):
                file.write('\n')
            for server in nameservers:
                file.write(f'nameserver {server}\n')
            logging.info(f"DNS server added.")
    except OSError as e:
        logging.error(f"Could not update {resolv_conf_path}: {e}")
|
||
|
||
# Flask application object; the route handlers further down register on it.
app = Flask(__name__)
|
||
|
||
def close_app():
    """Record the local IP address and terminate this process with SIGTERM.

    Turns the blue LED off (when LEDs are available), saves the current local
    IP through `save_local_ip()` and then signals the current process.

    NOTE(review): the current process is signalled first, exactly as before.
    With the default SIGTERM disposition it terminates right there, so the
    loop over other `python3` processes below is normally never reached.
    Confirm whether terminating every other python3 process on the board is
    really intended before reordering these steps.
    """
    if led_enabled:
        orangepi.set_led_off('blue')

    # Persist the Wi-Fi IP address before exiting.
    save_local_ip()

    logging.info(f"Exit Wi-Fi hotspot...")

    flask_pid = os.getpid()

    # Every other process named 'python3'; the own pid is excluded because it
    # is signalled explicitly below.
    python_pids = [
        p.info['pid']
        for p in psutil.process_iter(attrs=['pid', 'name'])
        if p.info['name'] == 'python3' and p.info['pid'] != flask_pid
    ]

    # Terminate this (Flask) process.
    os.kill(flask_pid, signal.SIGTERM)

    # Only reached when SIGTERM did not end this process (e.g. a handler is
    # installed). Processes may have exited or belong to another user by now.
    for pid in python_pids:
        try:
            os.kill(pid, signal.SIGTERM)
        except (ProcessLookupError, PermissionError) as e:
            logging.info(f"Could not terminate python3 process {pid}: {e}")
|
||
|
||
def start_hotspot():
    """Start `hotspot.service` and light the red LED once it has started.

    The command is now run to completion and its exit status is checked. The
    previous `Popen` call never raised `CalledProcessError`, so failures went
    unnoticed and the LED and log claimed success unconditionally.
    """
    try:
        subprocess.run(['sudo', 'systemctl', 'start', 'hotspot.service'], check=True)
    except (subprocess.CalledProcessError, OSError) as e:
        logging.info(f"Error starting create_ap service: {e}")
        return
    if led_enabled:
        orangepi.set_led_on('red')
    logging.info("Start hotspot.service.")
|
||
|
||
def close_hotspot():
    """Stop `hotspot.service` and switch the red LED off once it has stopped.

    The command is now run to completion and its exit status is checked. The
    previous `Popen` call never raised `CalledProcessError`, so a failed stop
    was neither detected nor logged.
    """
    logging.info(f"Stopping hotspot.service")
    try:
        subprocess.run(['sudo', 'systemctl', 'stop', 'hotspot.service'], check=True)
    except (subprocess.CalledProcessError, OSError) as e:
        logging.info(f"Error stopping hotspot.service: {e}")
        return
    if led_enabled:
        orangepi.set_led_off('red')
|
||
|
||
|
||
def init_networkmanager_file():
    """Blank the `unmanaged-devices` line in NetworkManager.conf if present.

    An entry such as

        [keyfile]
        unmanaged-devices=interface-name:ap0;interface-name:wlan0

    may stop NetworkManager from managing wlan0. When the line is found it is
    replaced by an empty line and NetworkManager is restarted.
    """
    was_edited = edit_line_in_file(
        '/etc/NetworkManager/NetworkManager.conf',
        'unmanaged-devices=interface-name:',
        '',  # replace the matching line with an empty one
    )
    if not was_edited:
        return

    logging.info(f"Wi-Fi NetworkManager Error: restart NetworkManager.")
    subprocess.Popen('sudo systemctl restart NetworkManager', shell=True)
    # Do not change this delay: it leaves NetworkManager enough time to
    # restart and rejoin a previously used Wi-Fi network.
    time.sleep(2)
|
||
|
||
# 检测 Wi-Fi 连接状态
|
||
# One row of `nmcli dev status` whose STATE starts with "connected" or
# "connecting". The state may carry a parenthesised detail such as
# "(getting IP configuration)", and the connection name may contain spaces.
_NMCLI_ACTIVE_DEVICE_ROW = re.compile(
    r'^(?P<device>\S+)\s+(?P<type>\S+)\s+'
    r'(?P<state>connected|connecting)(?:\s+\([^)]*\))?'
    r'(?:\s+(?P<connection>.*))?$'
)


def check_wifi_connection():
    """Report whether a Wi-Fi device is connected (or about to be).

    Parses the table printed by `nmcli dev status`, for example:

        wlan0          wifi      disconnected  --
        p2p-dev-wlan0  wifi-p2p  disconnected  --
        eth0           ethernet  unavailable   --
        lo             loopback  unmanaged     --

    Only rows of type `wifi` are considered, so a wired or loopback link is
    not reported as a Wi-Fi connection. For a device that is still connecting
    the call waits in `check_connecting_loop()` first.

    Returns:
        `(True, connection_name)` when a Wi-Fi device is connected, otherwise
        `(False, None)`.
    """
    cmd = "nmcli dev status"
    result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
    rows = result.stdout.strip().split("\n")[1:]  # skip the header line

    for line in rows:
        logging.info(line.split())
        row = _NMCLI_ACTIVE_DEVICE_ROW.match(line.strip())
        if row is None or row.group('type') != 'wifi':
            continue
        wifi_ssid = (row.group('connection') or '').strip()
        if not wifi_ssid:
            continue

        if row.group('state') == 'connected':
            if led_enabled:
                orangepi.set_led_off('blue')
            logging.info(f"Wi-Fi connected: {wifi_ssid}")
            return True, wifi_ssid

        # State is "connecting": wait for the attempt to finish. An explicit
        # False means the wait gave up; a None result means it returned only
        # after a connection was seen.
        if check_connecting_loop() is False:
            continue
        if led_enabled:
            orangepi.set_led_on('blue')
        logging.info(f"Wi-Fi connecting: {wifi_ssid}")
        return True, wifi_ssid

    return False, None
|
||
|
||
def check_connecting_loop(poll_interval=0.5, timeout=None):
    """Block until `nmcli dev status` lists a device in state "connected".

    Args:
        poll_interval: Seconds to sleep between two polls. The previous
            version polled back-to-back, spawning `nmcli` without pause.
        timeout: Maximum number of seconds to wait. None (the default) keeps
            the original behaviour of waiting indefinitely.

    Returns:
        True once a connected device is seen, False if `timeout` expired.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        result = subprocess.run("nmcli dev status", shell=True, capture_output=True, text=True)
        rows = result.stdout.strip().split("\n")[1:]  # skip the header line
        for line in rows:
            columns = line.split()
            if len(columns) >= 4 and columns[2] == "connected":
                return True
        if deadline is not None and time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval)
|
||
|
||
def get_known_wifi():
    """Return the Wi-Fi connection profiles known to NetworkManager.

    Parses the table printed by `nmcli connection show`, whose columns are
    NAME, UUID, TYPE and DEVICE. Only NAME can contain spaces, so the three
    trailing columns are taken from the right and everything before them is
    the profile name. Splitting from the left dropped every profile whose
    name contained a space.

    Returns:
        A list of `{'ssid': <profile name>, 'type': 'wifi'}` dictionaries,
        one per Wi-Fi profile, in nmcli's output order.
    """
    cmd = "nmcli connection show"
    result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
    rows = result.stdout.strip().split("\n")[1:]  # skip the header line

    connections = []
    for line in rows:
        fields = line.split()
        if len(fields) < 4:  # blank or malformed row
            continue
        *name_parts, _uuid, connection_type, _device = fields
        if connection_type == 'wifi':  # only Wi-Fi profiles are of interest
            connections.append({'ssid': ' '.join(name_parts), 'type': connection_type})

    logging.info(f"Known Wi-Fi connections: {connections}")
    return connections
|
||
|
||
def check_known_wifi(scan_wifi_list, timeout=60.0):
    """Wait for a connection when a known network is currently in range.

    Args:
        scan_wifi_list: Scan results, each a dict with at least an `'ssid'`
            key.
        timeout: Seconds to wait for the connection before giving up, or None
            to wait indefinitely. Without a limit, a known network that can
            no longer be joined would block the caller forever.

    Returns:
        True once `check_wifi_connection()` reports a connection, False when
        no known network is in range or the wait timed out.
    """
    known_wifi_list = get_known_wifi()
    logging.info(f"Known Wi-Fi list: {known_wifi_list}")
    logging.info(f"Scan Wi-Fi list: {scan_wifi_list}")

    scanned_ssids = {wifi['ssid'] for wifi in scan_wifi_list}
    visible_known = [
        known['ssid'] for known in known_wifi_list if known['ssid'] in scanned_ssids
    ]
    for ssid in visible_known:
        logging.info(f"Known Wi-Fi exist: {ssid}")
    if not visible_known:
        return False

    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        logging.info(f"Known Wi-Fi exist, wait for connecting...")
        connected, _ = check_wifi_connection()
        if led_enabled:
            # Two blue blinks (about one second) pace the polling.
            for _blink in range(2):
                orangepi.set_led_off('blue')
                time.sleep(0.25)
                orangepi.set_led_on('blue')
                time.sleep(0.25)
        else:
            # Same pacing without LEDs, so nmcli is not polled back-to-back.
            time.sleep(1)
        if connected:
            if led_enabled:  # `orangepi` only exists when LEDs are enabled
                orangepi.set_led_off('blue')
            return True
        if deadline is not None and time.monotonic() >= deadline:
            logging.info(f"Known Wi-Fi did not connect within {timeout} seconds.")
            return False
|
||
|
||
def force_scan_wifi():
    """Scan repeatedly until at least one Wi-Fi network is found.

    Each round re-checks the NetworkManager configuration and calls
    `scan_wifi()`. After an empty scan it waits about two seconds, blinking
    the red LED when LEDs are available, before trying again.

    Returns:
        The first non-empty result of `scan_wifi()`.
    """
    wifi_list = []
    while wifi_list == []:
        logging.info(f"wifi_list=={wifi_list}, 正在重新扫描Wi-Fi...")
        init_networkmanager_file()  # make sure NetworkManager manages wlan0
        wifi_list = scan_wifi()
        if wifi_list != []:
            logging.info(f"Wi-Fi scan finished: {wifi_list}")
        elif led_enabled:
            # Four red blinks, two seconds in total.
            for _ in range(4):
                orangepi.set_led_on('red')
                time.sleep(0.25)
                orangepi.set_led_off('red')
                time.sleep(0.25)
        else:
            time.sleep(2)  # give the next scan time to complete
    return wifi_list
|
||
|
||
|
||
def scan_wifi(max_networks=15):
    """List nearby Wi-Fi networks as reported by `nmcli dev wifi`.

    Each row of the nmcli table looks like

        [*] BSSID  SSID...  Infra  CHAN  RATE Mbit/s  SIGNAL  BARS  SECURITY

    where the leading `*` marks the network in use and the SSID may contain
    spaces. Hidden networks (SSID `--`), duplicate SSIDs and rows without an
    `Infra` mode column are skipped. The result is also written to
    `scaned_wifi_list.json` in the current directory.

    Args:
        max_networks: Maximum number of networks to return.

    Returns:
        A list of `{'ssid', 'signal', 'mac'}` dictionaries in nmcli's output
        order. `signal` is the text of the SIGNAL column, or None when the
        row has no `Mbit/s` marker. The list is empty when the command could
        not be run or produced no rows.
    """
    cmd = "nmcli dev wifi"
    try:
        result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
    except OSError as e:
        logging.info(f"Error scanning Wi-Fi: {e}")
        return []
    logging.info(f"Wi-Fi scan complete...")

    wifi_list = []
    seen_ssids = set()
    rows = result.stdout.strip().split("\n")[1:]  # skip the header line
    for line in rows:
        columns = line.split()
        # Drop the in-use marker so both row shapes start with the BSSID.
        if columns and columns[0] == '*':
            columns = columns[1:]
        if 'Infra' not in columns[1:]:
            continue  # blank row or a mode other than infrastructure
        mode_index = columns.index('Infra', 1)

        mac_address = columns[0]
        # Everything between the BSSID and the mode column is the SSID.
        ssid = ' '.join(columns[1:mode_index])
        if not ssid or ssid == '--' or ssid in seen_ssids:
            continue
        seen_ssids.add(ssid)

        # The signal strength is the column right after the rate unit.
        details = columns[mode_index + 1:]
        strength = None
        if 'Mbit/s' in details:
            unit_index = details.index('Mbit/s')
            if unit_index + 1 < len(details):
                strength = details[unit_index + 1]

        wifi_list.append({'ssid': ssid, 'signal': strength, 'mac': mac_address})
        if len(wifi_list) >= max_networks:
            break

    with open('scaned_wifi_list.json', 'w') as f:
        json.dump(wifi_list, f)
    logging.info(f"Wi-Fi scaned list: {wifi_list}")

    return wifi_list
|
||
|
||
# 连接 Wi-Fi
|
||
def connect_wifi(ssid, password):
    """Connect to a Wi-Fi network through `nmcli dev wifi connect`.

    On success the credentials are stored with `save_wifi()`.

    Args:
        ssid: Name of the network to join.
        password: Passphrase of the network. An empty value joins an open
            network (the `password` argument is then not passed to nmcli).

    Returns:
        True when nmcli reports a successful activation, otherwise False.
    """
    command = ['nmcli', 'dev', 'wifi', 'connect', ssid]
    if password:
        command += ['password', password]

    try:
        completed = subprocess.run(command, capture_output=True, text=True, check=True)
    except subprocess.CalledProcessError as e:
        # Do not log `e` itself: its text contains the whole command line,
        # password included.
        reason = (e.stderr or e.stdout or '').strip()
        logging.info(f"Error connecting to Wi-Fi: nmcli exited with status {e.returncode}: {reason}")
        return False
    except OSError as e:
        logging.info(f"Error connecting to Wi-Fi: {e}")
        return False

    output_str = completed.stdout
    if "successfully" in output_str:
        logging.info(f"Successfully connected to Wi-Fi: {ssid}")
        save_wifi(ssid, password)  # remember credentials that worked
        return True
    logging.info(f"Error connecting to Wi-Fi: {output_str}")
    return False
|
||
|
||
# 关闭 Wi-Fi
|
||
def disconnect_wifi(iface='wlan0'):
    """Disconnect a network interface through `nmcli dev disconnect`.

    Failures are logged, never raised.

    Args:
        iface: Name of the interface to disconnect.
    """
    command = ['nmcli', 'dev', 'disconnect', 'iface', iface]
    try:
        completed = subprocess.run(command, capture_output=True, text=True, check=True)
    except subprocess.CalledProcessError as e:
        # Include nmcli's own message; the exception text only carries the
        # exit status.
        reason = (e.stderr or e.stdout or '').strip()
        logging.info(f"Error disconnecting from Wi-Fi: {e}: {reason}")
        return
    except OSError as e:
        logging.info(f"Error disconnecting from Wi-Fi: {e}")
        return

    output_str = completed.stdout
    if "successfully disconnected" in output_str:
        logging.info(f"Wi-Fi disconnected successfully")
    else:
        logging.info(f"Error disconnecting from Wi-Fi: {output_str}")
|
||
|
||
def connect_saved_wifi(scaned_wifi_list):
    """Try the stored credentials of every saved network that is in range.

    Args:
        scaned_wifi_list: Scan results, each a dict with an `'ssid'` key.

    Returns:
        `(False, None)` when nothing is stored, `(True, ssid)` for the first
        saved network that connects, otherwise whatever
        `check_wifi_connection()` reports.
    """
    saved_networks = load_saved_wifi()
    if saved_networks == []:
        return False, None

    ssids_in_range = [entry['ssid'] for entry in scaned_wifi_list]
    for saved in saved_networks:
        if saved['ssid'] not in ssids_in_range:
            continue
        if connect_wifi(saved['ssid'], saved['password']):
            return True, saved['ssid']

    return check_wifi_connection()
|
||
|
||
|
||
def load_saved_wifi(credential_file=None):
    """Load the stored Wi-Fi credentials.

    Args:
        credential_file: JSON file to read. Defaults to the module-level
            `wifi_credential_file`.

    Returns:
        A list of `{'ssid': ..., 'password': ...}` dictionaries. The list is
        empty when the file is missing, unreadable, not valid JSON or does
        not contain a list. Entries lacking either key are dropped.
    """
    if credential_file is None:
        credential_file = wifi_credential_file

    try:
        with open(credential_file, 'r') as f:
            data = json.load(f)
    except FileNotFoundError:
        logging.info(f"No saved Wi-Fi file `{credential_file}`found.")
        return []
    except (OSError, ValueError) as e:  # unreadable file or malformed JSON
        logging.info(f"Could not read saved Wi-Fi file `{credential_file}`: {e}")
        return []

    if not isinstance(data, list):
        logging.info(f"Saved Wi-Fi file `{credential_file}` does not contain a list.")
        return []

    wifi_list = [
        entry for entry in data
        if isinstance(entry, dict) and 'ssid' in entry and 'password' in entry
    ]
    # Log the network names only; passwords must not end up in app.log.
    logging.info(f"Loaded saved Wi-Fi file `{credential_file}`: {[entry['ssid'] for entry in wifi_list]}")
    return wifi_list
|
||
|
||
def save_wifi(ssid, password):
    """Store or refresh the credentials of a network in the credential file.

    A new SSID is appended. For an SSID that is already stored the password
    is replaced when it differs; previously a changed password was never
    written. The file is only rewritten when something changed.

    Args:
        ssid: Network name.
        password: Passphrase that was used to connect.
    """
    wifi_list = load_saved_wifi()
    if not isinstance(wifi_list, list):
        wifi_list = []

    existing = next(
        (entry for entry in wifi_list
         if isinstance(entry, dict) and entry.get('ssid') == ssid),
        None,
    )
    if existing is None:
        wifi_list.append({'ssid': ssid, 'password': password})
    elif existing.get('password') != password:
        existing['password'] = password
    else:
        return  # already stored with the same password

    with open(wifi_credential_file, 'w') as f:
        json.dump(wifi_list, f)
    # Log the network names only; passwords must not end up in app.log.
    saved_ssids = [entry.get('ssid') for entry in wifi_list if isinstance(entry, dict)]
    logging.info(f"Saved Wi-Fi file `{wifi_credential_file}`: {saved_ssids}")
|
||
|
||
|
||
# 主页
|
||
@app.route('/')
def index():
    """Render the network selection page from the module-level `wifi_list`."""
    # `wifi_list` is only read here, so the module-level name resolves
    # without a `global` declaration.
    page = render_template('index.html', wifi_list=wifi_list)
    response = make_response(page)
    extra_headers = (
        ('Content-Type', 'text/html'),
        ('Apple-Web-App-Capable', 'yes'),
        ('Apple-Mobile-Web-App-Status-Bar-Style', 'black-translucent'),
    )
    for header_name, header_value in extra_headers:
        response.headers.set(header_name, header_value)
    return response
|
||
|
||
# 提交 Wi-Fi 信息
|
||
@app.route('/submit', methods=['POST'])
def submit():
    """Handle the credentials form: leave hotspot mode and join the network.

    Reads `ssid` and `password` from the posted form, stops the hotspot,
    rescans and tries to connect. On success the application is shut down
    via `close_app()`; otherwise the hotspot is started again and the client
    is redirected to the index page.
    """
    global wifi_list

    if led_enabled:
        orangepi.set_led_on('red')
    ssid = request.form['ssid']
    password = request.form['password']
    # The password is deliberately kept out of the log file.
    logging.info(f"Connecting to Wi-Fi: {ssid}")

    # Stop the hotspot before scanning and connecting.
    close_hotspot()

    # Store the rescan in the module-level list so the index page shows
    # current results if the connection attempt fails.
    wifi_list = force_scan_wifi()

    # Join the network the user selected.
    if connect_wifi(ssid, password):
        close_app()

    connected, wifi_ssid = check_wifi_connection()
    if not connected:
        logging.info(f"Wi-Fi连接失败。")
        start_hotspot()
    else:
        # connect_wifi() reported a failure but a connection exists. Only
        # keep the submitted credentials when that connection is the
        # requested network; otherwise they are unverified.
        if wifi_ssid == ssid:
            save_wifi(ssid, password)
        close_app()

    return redirect(url_for('index'))
|
||
|
||
|
||
if __name__ == '__main__':
    # Set to True to drop the current Wi-Fi connection first, which forces
    # the provisioning flow to run.
    debug_mode = False
    if debug_mode:
        disconnect_wifi()

    if led_enabled:
        orangepi.set_led_on('blue')

    logging.info(f"Starting Wi-Fi Manager...")

    edit_dns()

    # Make sure NetworkManager is allowed to manage the Wi-Fi interface.
    init_networkmanager_file()

    # Keep scanning until at least one network is visible.
    wifi_list = force_scan_wifi()

    # A known network is in range: wait for NetworkManager to join it.
    connected = check_known_wifi(wifi_list)
    if connected:
        logging.info(f"系统已自动连接到 Wi-Fi 网络,退出程序")
        close_app()

    connected, wifi_ssid = check_wifi_connection()
    if connected:
        logging.info(f"系统已自动连接到 Wi-Fi 网络,退出程序")
        close_app()

    # Connecting with the credentials stored by this tool
    # (connect_saved_wifi) is currently disabled.

    # No connection: fall back to hotspot mode and serve the setup page.
    logging.info(f"未连接到 Wi-Fi 网络, 开始热点模式")

    start_hotspot()
    app.run(host='0.0.0.0', port=80)