wifi_hotpot/wifi_manager.py

510 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import subprocess
import re
import os
import psutil
import signal
import time
import datetime
import json
from flask import Flask, render_template, request, redirect, url_for, make_response
import socket
import logging
logging.basicConfig(
filename='app.log', # 日志文件的名称
filemode='a', # 'a'代表追加模式,如果文件存在则追加内容
level=logging.DEBUG, # 日志级别DEBUG是最低的级别会记录所有级别的日志
format='%(asctime)s - %(levelname)s - %(message)s' # 日志的格式
)
wifi_credential_file = '/home/orangepi/.config/wifi_credentials.json'
try:
from takway.board import OrangePi
led_enabled = True
orangepi = OrangePi()
except ImportError:
led_enabled = False
logging.info("Error importing OrangePi")
# blue: APP
# red: hotspot
def network_error_light_1():
error_time = datetime.datetime.now()
while led_enabled:
orangepi.set_led_off('blue')
orangepi.set_led_on('red')
time.sleep(0.25)
orangepi.set_led_on('blue')
orangepi.set_led_off('red')
time.sleep(0.25)
if error_time + datetime.timedelta(seconds=2) < datetime.datetime.now():
break
# close_app()
def network_error_light_2():
error_time = datetime.datetime.now()
while led_enabled:
orangepi.set_led_on('green')
time.sleep(0.25)
orangepi.set_led_off('green')
time.sleep(0.25)
if error_time + datetime.timedelta(seconds=2) < datetime.datetime.now():
break
# close_app()
def edit_line_in_file(file_path, search_string, replacement_string):
edit = False
try:
# 读取文件内容
with open(file_path, 'r') as file:
lines = file.readlines()
# 替换包含特定内容的行
with open(file_path, 'r+') as file:
file.seek(0) # 移动到文件开头
file.truncate() # 清空文件内容
for line in lines:
if search_string in line:
file.write(replacement_string + '\n')
edit = True
logging.info(f"The line has been edited. \noriginal line: {line}, \nnew line: {replacement_string}")
else:
file.write(line)
return edit
except FileNotFoundError:
logging.info(f"The file {file_path} does not exist.")
return edit
except IOError as e:
logging.info(f"An error occurred: {e}")
return edit
# 获取私有IP地址
def save_local_ip():
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
local_ip = s.getsockname()[0]
s.close()
# save local_ip to file
with open('local_ip.txt', 'w') as f:
f.write(local_ip)
logging.info(f"Local IP address: {local_ip}")
return local_ip
except Exception as e:
logging.info(f"Error getting private IP: {e}")
return None
def edit_dns():
# 编辑 /etc/resolv.conf 文件,添加 DNS 服务器
# nameserver 8.8.8.8
# nameserver 8.8.4.4
# 读取文件内容
dns_exist = False
with open('/etc/resolv.conf', 'r+') as file:
lines = file.readlines()
for line in lines:
if 'nameserver' in line:
dns_exist = True
if not dns_exist:
file.write('nameserver 8.8.8.8' + '\n')
file.write('nameserver 8.8.4.4' + '\n')
logging.info(f"DNS server added.")
else:
logging.info(f"DNS server already exist.")
app = Flask(__name__)
def close_app():
if led_enabled:
orangepi.set_led_off('blue')
# 保存Wi-Fi IP地址
save_local_ip()
logging.info(f"Exit Wi-Fi hotspot...")
# 获取当前Flask应用程序的进程ID
flask_pid = os.getpid()
# 获取所有名为'python3'的进程ID
python_pids = [p.info['pid'] for p in psutil.process_iter(attrs=['pid', 'name']) if p.info['name'] == 'python3']
# 关闭Flask应用程序进程
os.kill(flask_pid, signal.SIGTERM)
# 关闭Python进程
for pid in python_pids:
os.kill(pid, signal.SIGTERM)
def start_hotspot():
try:
subprocess.Popen('sudo systemctl start hotspot.service', shell=True)
if led_enabled:
orangepi.set_led_on('red')
logging.info("Start hotspot.service.")
except subprocess.CalledProcessError as e:
logging.info(f"Error starting create_ap service: {e}")
def close_hotspot():
# 关闭热点
try:
subprocess.Popen('sudo systemctl stop hotspot.service', shell=True)
logging.info(f"Stopping hotspot.service")
if led_enabled:
orangepi.set_led_off('red')
except subprocess.CalledProcessError as e:
logging.info(f"Error stopping hotspot.service: {e}")
def init_networkmanager_file():
file_path = '/etc/NetworkManager/NetworkManager.conf'
search_string = 'unmanaged-devices=interface-name:'
replacement_string = '' # 如果不需要替换为其他内容,可以设置为空字符串
if edit_line_in_file(file_path, search_string, replacement_string):
'''
# 从 /etc/NetworkManager/NetworkManager.conf 文件中移除或注释掉 [keyfile] 部分的 unmanaged-devices 条目,因为它可能阻止了 NetworkManager 管理 wlan0 接口。编辑文件如下:
```
[keyfile]
#unmanaged-devices=interface-name:ap0;interface-name:wlan0
```
sudo systemctl restart NetworkManager
'''
# network_error_light_1()
logging.info(f"Wi-Fi NetworkManager Error: restart NetworkManager.")
subprocess.Popen('sudo systemctl restart NetworkManager', shell=True)
time.sleep(2) # 不得修改该时间保留足够长时间给NetworkManager重启完成并连接以往WiFi。
# 检测 Wi-Fi 连接状态
def check_wifi_connection():
cmd = "nmcli dev status"
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
output = result.stdout.strip()
lines = output.split("\n")[1:] # Skip the header line
for line in lines:
columns = line.split()
logging.info(columns)
'''
['wlan0', 'wifi', 'disconnected', '--']
['p2p-dev-wlan0', 'wifi-p2p', 'disconnected', '--']
['eth0', 'ethernet', 'unavailable', '--']
['lo', 'loopback', 'unmanaged', '--']
'''
if len(columns) >= 4 and columns[2] == "connected":
wifi_ssid = columns[3]
if led_enabled:
orangepi.set_led_off('blue')
logging.info(f"Wi-Fi connected: {wifi_ssid}")
return True, wifi_ssid
elif len(columns) >= 4 and columns[2] == "connecting":
wifi_ssid = columns[4]
check_connecting_loop()
if led_enabled:
orangepi.set_led_on('blue')
logging.info(f"Wi-Fi connecting: {wifi_ssid}")
return True, wifi_ssid
return False, None
def check_connecting_loop():
connecting = True
while connecting:
cmd = "nmcli dev status"
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
output = result.stdout.strip()
lines = output.split("\n")[1:] # Skip the header line
for line in lines:
columns = line.split()
if len(columns) >= 4 and columns[2] == "connected":
wifi_ssid = columns[3]
connecting = False
break
def get_known_wifi():
# 定义一个空列表来存储解析后的连接信息
connections = []
cmd = "nmcli connection show"
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
output = result.stdout.strip()
lines = output.split("\n")[1:] # Skip the header line
# output = result.stdout.strip()
# 按行分割输出
# lines = output.strip().split('\n')[1:] # 跳过标题行
# 遍历每一行以解析连接信息
for line in lines:
if line.strip(): # 确保跳过空行
# 分割每一行以获取字段
fields = line.split()
if len(fields) >= 4:
# 创建一个字典来存储当前连接的信息
connection_info = {
'ssid': fields[0],
# 'uuid': fields[1],
'type': fields[2],
# 'device': fields[3] if fields[3] != '--' else None # 如果设备列为'--'则设为None
}
# 将连接信息添加到列表中
if connection_info['type'] == 'wifi': # 只处理wifi连接信息
connections.append(connection_info)
logging.info(f"Known Wi-Fi connections: {connections}")
return connections
def check_known_wifi(scan_wifi_list):
known_wifi_exist = False
known_wifi_list = get_known_wifi()
logging.info(f"Known Wi-Fi list: {known_wifi_list}")
logging.info(f"Scan Wi-Fi list: {scan_wifi_list}")
for known_wifi in known_wifi_list:
for wifi in scan_wifi_list:
if known_wifi['ssid'] == wifi['ssid']:
known_wifi_exist = True
logging.info(f"Known Wi-Fi exist: {known_wifi['ssid']}")
if not known_wifi_exist:
return False
while known_wifi_exist:
logging.info(f"Known Wi-Fi exist, wait for connecting...")
connected, _ = check_wifi_connection()
if led_enabled:
for i in range(2):
orangepi.set_led_off('blue')
time.sleep(0.25)
orangepi.set_led_on('blue')
time.sleep(0.25)
if connected:
orangepi.set_led_off('blue')
break
return True
def force_scan_wifi():
wifi_list = []
while True:
logging.info(f"wifi_list=={wifi_list}, 正在重新扫描Wi-Fi...")
init_networkmanager_file() # 初始化 NetworkManager 配置文件
# 扫描Wi-Fi
wifi_list = scan_wifi()
if wifi_list != []:
logging.info(f"Wi-Fi scan finished: {wifi_list}")
break
if led_enabled:
for i in range(4):
orangepi.set_led_on('red')
time.sleep(0.25)
orangepi.set_led_off('red')
time.sleep(0.25)
else:
time.sleep(2) # 等待2秒确保wifi scan完成
return wifi_list
def scan_wifi():
try:
# subprocess.run(['nmcli', 'dev', 'wifi', 'rescan'], check=True)
cmd = "nmcli dev wifi"
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
except subprocess.CalledProcessError as e:
logging.info(f"Error scanning Wi-Fi: {e}")
# network_error_light_1()
return []
logging.info(f"Wi-Fi scan complete...")
ssid_list = []
wifi_list = []
output = result.stdout.strip()
lines = output.split("\n")[1:] # Skip the header line
for line in lines:
columns = line.split()
'''
logging.info(columns)
for i, column in enumerate(columns):
logging.info(f"{i}: {column}")
'''
# ['94:14:57:15:13:50', 'Meeting', 'MG', 'Infra', '1', '130', 'Mbit/s', '100', '****', 'WPA1', 'WPA2']
# 提取MAC地址
mac_address = columns[0]
if columns[0] == '*': # 已连接的WiFi 第一位为*
mac_address = columns[1]
# 提取Wi-Fi名称
wifi_name = []
for i in range(1, len(columns)):
if columns[i] == 'Infra':
_wifi_name = ' '.join(columns[1:i])
wifi_name.append(_wifi_name)
columns = columns[i+1:]
break
ssid = wifi_name[0]
if ':' in ssid:
# 94:14:57:15:13:50 Meeting
# 去除mac地址
ssid = ssid.split(' ')[1:]
ssid = ''.join(ssid)
if ssid == '--':
continue
if ssid in ssid_list:
continue
ssid_list.append(ssid)
# 提取强度
strength = None
for i in range(len(columns)):
if columns[i] == 'Mbit/s':
strength = columns[i+1]
break
# logging.info("MAC地址:", mac_address)
# logging.info("Wi-Fi名称:", ssid)
# logging.info("强度:", strength)
wifi_list.append({'ssid': ssid, 'signal': strength, 'mac': mac_address})
if len(wifi_list) == 15:
break
with open('scaned_wifi_list.json', 'w') as f:
json.dump(wifi_list, f)
logging.info(f"Wi-Fi scaned list: {wifi_list}")
return wifi_list
# 连接 Wi-Fi
def connect_wifi(ssid, password):
# 连接到用户选择的 Wi-Fi 网络
try:
output = subprocess.check_output(['nmcli', 'dev', 'wifi', 'connect', ssid, 'password', password])
output_str = output.decode('utf-8') # 将输出转换为字符串
if "successfully" in output_str:
logging.info(f"Successfully connected to Wi-Fi: {ssid}")
save_wifi(ssid, password) # 保存连接成功的Wi-Fi信息
return True
else:
logging.info(f"Error connecting to Wi-Fi: {output_str}")
return False
except subprocess.CalledProcessError as e:
logging.info(f"Error connecting to Wi-Fi: {e}")
return False
# 关闭 Wi-Fi
def disconnect_wifi():
try:
output = subprocess.check_output(['nmcli', 'dev', 'disconnect', 'iface', 'wlan0'])
output_str = output.decode('utf-8') # 将输出转换为字符串
if "successfully disconnected" in output_str:
logging.info(f"Wi-Fi disconnected successfully")
else:
logging.info(f"Error disconnecting from Wi-Fi: {output_str}")
except Exception as e:
logging.info(f"Error disconnecting from Wi-Fi: {e}")
def connect_saved_wifi(scaned_wifi_list):
wifi_list = load_saved_wifi()
if wifi_list == []:
return False, None
for wifi in wifi_list:
if wifi['ssid'] in [item['ssid'] for item in scaned_wifi_list]:
if connect_wifi(wifi['ssid'], wifi['password']):
return True, wifi['ssid']
return check_wifi_connection()
def load_saved_wifi():
try:
with open(wifi_credential_file, 'r') as f:
wifi_list = json.load(f)
logging.info(f"Loaded saved Wi-Fi file `{wifi_credential_file}`: {wifi_list}")
return wifi_list
except:
logging.info(f"No saved Wi-Fi file `{wifi_credential_file}`found.")
return []
def save_wifi(ssid, password):
wifi_list = load_saved_wifi()
if ssid not in [item['ssid'] for item in wifi_list]:
wifi_list.append({'ssid': ssid, 'password': password})
with open(wifi_credential_file, 'w') as f:
json.dump(wifi_list, f)
logging.info(f"Saved Wi-Fi file `{wifi_credential_file}`: {wifi_list}")
# 主页
@app.route('/')
def index():
global wifi_list
response = make_response(render_template('index.html', wifi_list=wifi_list))
response.headers.set('Content-Type', 'text/html')
response.headers.set('Apple-Web-App-Capable', 'yes')
response.headers.set('Apple-Mobile-Web-App-Status-Bar-Style', 'black-translucent')
return response
# 提交 Wi-Fi 信息
@app.route('/submit', methods=['POST'])
def submit():
if led_enabled:
orangepi.set_led_on('red')
ssid = request.form['ssid']
password = request.form['password']
logging.info(f"Connecting to Wi-Fi: {ssid} with password {password}")
# 关闭热点
close_hotspot()
wifi_list = force_scan_wifi()
# 连接到用户选择的 Wi-Fi 网络
if connect_wifi(ssid, password):
close_app()
connected, wifi_ssid = check_wifi_connection()
if not connected:
logging.info(f"Wi-Fi连接失败。")
start_hotspot()
else:
save_wifi(ssid, password) # 保存连接成功的Wi-Fi信息
close_app()
return redirect(url_for('index'))
if __name__ == '__main__':
debug_mode = False # 设置为 True 以跳过 Wi-Fi 连接状态检测
if debug_mode:
disconnect_wifi()
if led_enabled:
orangepi.set_led_on('blue')
logging.info(f"Starting Wi-Fi Manager...")
edit_dns()
init_networkmanager_file() # 初始化 NetworkManager 配置文件
# 扫描Wi-Fi
wifi_list = force_scan_wifi()
# wifi_list = scan_wifi()
# time.sleep(2) # 等待2秒确保wifi scan完成
connected = check_known_wifi(wifi_list)
if connected:
logging.info(f"系统已自动连接到 Wi-Fi 网络,退出程序")
close_app()
connected, wifi_ssid = check_wifi_connection()
if connected:
logging.info(f"系统已自动连接到 Wi-Fi 网络,退出程序")
close_app()
'''
# 连接保存的Wi-Fi
connected, wifi_ssid = connect_saved_wifi(wifi_list)
if connected:
logging.info(f"系统已自动连接到 Wi-Fi 网络 {wifi_ssid},退出程序")
close_app()
'''
logging.info(f"未连接到 Wi-Fi 网络, 开始热点模式")
# force_scan_wifi()
start_hotspot()
app.run(host='0.0.0.0', port=80)