# TakwayBoard/tools/web_request_mp_manager_vad.py
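"""Multi-process manager for the Takway board client (VAD variant).

Spawns an audio process whose threads watch for a hardware trigger or a
voice trigger (VAD segmentation followed by Vosk keyword spotting) and
forward the recorded audio. Companion processes for camera capture, the
web client, audio playback, and emoji display exist below but are
currently commented out in process_init.
"""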
# basic
import time
import json
import random
from collections import deque
# log
import logging
import warnings
# multiprocessing
import queue
import threading
import multiprocessing
# web request
import requests
import pyaudio
class WebRequestMPManager:
def __init__(self,
server_args,
audio_args,
recorder_args,
asr_args,
video_args,
emo_args,
log_args):
# server_args
self.server_args = server_args
# audio_args
self.record_CHUNK_SIZE = audio_args['record_CHUNK_SIZE']
self.voice_trigger = audio_args['voice_trigger']
self.keywords = audio_args['keywords']
# recorder_args
self.recorder_args = recorder_args
# asr_args
self.asr_args = asr_args
# video_args
self.video_args = video_args
# emo_args
self.emo_args = emo_args
# log_args
self.log_args = log_args
# TODO: design a multiprocessing log queue
self.logger_init()
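# A minimal sketch for the TODO above, based on the standard library's
# QueueHandler/QueueListener; setup_mp_logging / worker_logger are
# illustrative names, and log_queue is assumed to be a
# multiprocessing.Manager().Queue() shared with the child processes:
'''
import logging.handlers

def setup_mp_logging(log_file, log_queue):
    # main process: a listener drains the queue into the real handler
    handler = logging.FileHandler(log_file)
    handler.setFormatter(logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
    listener = logging.handlers.QueueListener(log_queue, handler)
    listener.start()
    return listener

def worker_logger(log_queue):
    # child process: only a QueueHandler, so file writes never race
    logger = logging.getLogger('mylogger')
    logger.addHandler(logging.handlers.QueueHandler(log_queue))
    logger.setLevel(logging.INFO)
    return logger
'''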
def logger_init(self):
# log_args
log_level = self.log_args['log_level'].lower()
log_file = self.log_args['log_file']
# normalize the level name ('debug'/'info', case-insensitive) to a logging constant
log_level = logging.DEBUG if log_level == 'debug' else logging.INFO
# logger
self.logger = logging.getLogger('mylogger')
self.logger.setLevel(log_level)
# file handler: writes log records to the log file
handler = logging.FileHandler(log_file)
handler.setLevel(log_level)
# stream handler: echoes log records to the console
console = logging.StreamHandler()
console.setLevel(logging.INFO)
# formatter shared by both handlers
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
console.setFormatter(formatter)
# attach both handlers to the logger
self.logger.addHandler(handler)
self.logger.addHandler(console)
self.logger.info("Logger started.")
def process_init(self):
# multiprocessing
manager = multiprocessing.Manager()
self.trigger_queue = manager.Queue()
self.client_queue = manager.Queue()
self.audio_queue = manager.Queue()
self.audio_play_queue = manager.Queue()
self.emo_display_queue = manager.Queue()
processes = [
multiprocessing.Process(target=self.audio_process, args=(self.logger,self.voice_trigger,self.trigger_queue,self.client_queue)),
# multiprocessing.Process(target=self.camera_process, args=(self.trigger_queue,self.client_queue)),
# multiprocessing.Process(target=self.local_client_process, args=(self.logger,self.client_queue,self.audio_play_queue,self.emo_display_queue)),
# multiprocessing.Process(target=self.audio_play_process, args=(self.logger,self.audio_play_queue,)),
# multiprocessing.Process(target=self.emo_display_process, args=(self.logger,self.emo_display_queue,)),
]
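# NOTE: only the audio process is enabled; the camera, client, playback and
# emo display processes above are kept for reference but commented out.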
for process in processes:
process.start()
for process in processes:
process.join()
def audio_process(self, logger, voice_trigger, trigger_queue, client_queue):
"""audio_process
Args:
voice_trigger: bool, whether to use voice trigger
trigger_queue: multiprocessing.Queue, trigger queue
client_queue: multiprocessing.Queue, client queue
"""
# from takway.audio_utils import Recorder
from takway.audio_utils import VADRecorder
recorder = VADRecorder(
**self.recorder_args,
)
# two threads for hardware and voice trigger
# shared data struct:
self.shared_waiting = False
self.shared_hd_trigger = False
self.shared_kw_trigger = False
self.shared_lock = threading.Lock()
self.shared_data_lock = threading.Lock()
self.shared_audio_data = None
# vad
self.shared_vad_data = None
self.shared_vad_lock = threading.Lock()
# stt
# event
self.record_event = threading.Event()
self.vad_event = threading.Event()
self.stt_event = threading.Event()
self._debug_count = 0
'''
shared_waiting: gates all threads' standby state; True means standby, False means working
shared_hd_trigger: hardware trigger state; True means triggered, False means not
shared_kw_trigger: voice keyword trigger state; True means triggered, False means not
shared_audio_data: shared buffer for audio data captured from the microphone
'''
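# Event handshake: hardware_trigger_thread fills shared_audio_data and sets
# record_event; stt_thread sets record_event after a keyword hit (the spoken
# frames themselves live in shared_vad_data). The loop below wakes on the
# event and forwards shared_audio_data to the client queue.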
# create threads
threads = [threading.Thread(target=self.hardware_trigger_thread, args=(recorder,))]
if voice_trigger:
voice_threads = [
threading.Thread(target=self.voice_record_thread, args=(recorder,)),
# threading.Thread(target=self.vad_thread, args=(recorder,)),
threading.Thread(target=self.stt_thread, args=(recorder,)),
]
threads.extend(voice_threads)
for thread in threads:
thread.start()
# self.logger.info("Audio Process started.")
while True:
'''
# Warning: the short sleep is required here, otherwise there is a bug
time.sleep(0.001)
if (self.shared_hd_trigger or self.shared_kw_trigger):
# print(f"self.shared_hd_trigger: {self.shared_hd_trigger}, self.shared_kw_trigger: {self.shared_kw_trigger}")
audio_data = self.shared_audio_data
trigger_queue.put(('trigger_status', True))
client_queue.put(('audio', audio_data))
self.shared_lock.acquire() # acquire the lock
self.shared_hd_trigger = False
self.shared_kw_trigger = False
self.shared_audio_data = None
self.shared_waiting = False
self.shared_lock.release() # release the lock
'''
self.record_event.wait() # wait until a trigger thread sets the record event
self.record_event.clear() # reset so the next wait blocks until the next trigger
trigger_queue.put(('trigger_status', True))
client_queue.put(('audio', self.shared_audio_data))
# print(f"send audio data to client"); exit()
def hardware_trigger_thread(self, recorder):
"""hardware_trigger_thread
Args:
recorder: takway.audio_utils.Recorder, recorder object
"""
self.logger.info("Hardware trigger thread started.")
trigger_status = False
while True:
time.sleep(0.2)
if self.shared_waiting:
continue
trigger_status = recorder.get_hardware_trigger_status()
if trigger_status:
self.shared_lock.acquire()
self.shared_waiting = True # put every other thread on standby (True = standby, False = working)
self.shared_hd_trigger = True # hardware trigger state (True = triggered)
self.shared_lock.release()
# record microphone data
audio_data = recorder.record_hardware()
self.shared_data_lock.acquire()
self.shared_audio_data = audio_data # shared buffer for the microphone audio
self.shared_data_lock.release()
self.record_event.set() # wake the forwarding loop in audio_process
else:
self.shared_lock.acquire()
self.shared_waiting = False # release standby
self.shared_lock.release()
def voice_record_thread(self, recorder):
"""voice_record_thread
Args:
recorder: takway.audio_utils.Recorder, recorder object
"""
self.logger.info("voice record thread started.")
while True:
if self.shared_waiting:
time.sleep(0.01)
continue
frames = []
# status buffer
is_currently_speaking = False
buffer_size = recorder.vad_buffer_size
# buffer_size = 6
active_buffer = deque([False for i in range(buffer_size-1)]+[True], maxlen=buffer_size)
audio_buffer = deque(maxlen=buffer_size)
silence_buffer = deque([True for i in range(buffer_size)]+[False], maxlen=buffer_size)
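# The three deques implement a debounce window: speech "starts" only after a
# full window of consecutive voiced chunks (all(active_buffer)) and "ends"
# only after a full window of consecutive silent chunks (all(silence_buffer));
# audio_buffer keeps the voiced chunks seen just before the start so the
# beginning of the utterance is not lost.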
while True:
data = recorder.record_chunk_voice(
CHUNK=recorder.vad_chunk_size,
return_type=None,
exception_on_overflow=False)
if data is None:
continue
t1 = time.time()
# print(f"VAD is_speech: {recorder.is_speech(data)}")
# print(f"VAD cost: {(time.time() - t1)/1000} ms")
if recorder.is_speech(data):
# 标志位buffer
active_buffer.append(True); active_buffer.popleft()
silence_buffer.append(False); silence_buffer.popleft()
# 暂时增加到buffer中
audio_buffer.append(data)
# 如果满足检测要求
if all(active_buffer):
if not is_currently_speaking:
print("Speech start detected")
is_currently_speaking = True
frames.extend(audio_buffer) # 把说话的buffer也加上
if is_currently_speaking:
frames.append(data)
else:
# 标志位buffer
# active_buffer.append(False); active_buffer.popleft()
silence_buffer.append(True); silence_buffer.popleft()
if all(silence_buffer):
# 检测到人声并持续录音
if is_currently_speaking:
# 结束标志位
print("Speech end detected")
# print("frames length: ", len(frames))
self.shared_vad_lock.acquire()
self.shared_vad_data = frames
self.shared_vad_lock.release()
self.stt_event.set() # 唤醒stt线程
print("Wake stt thread")
break
else:
frames = []
'''
# print(f"audio_data: {len(audio_data)}")
self.shared_lock.acquire()
self.shared_audio_data = audio_data
self.shared_lock.release()
self.vad_event.set() # wake the VAD thread
'''
'''
def vad_thread(self, recorder):
self.logger.info("VAD thread started.")
while True:
frames = []
# status buffer
is_currently_speaking = False
buffer_size = recorder.vad_buffer_size
active_buffer = deque([False for i in range(buffer_size)], maxlen=buffer_size)
audio_buffer = deque(maxlen=buffer_size)
silence_buffer = deque([True for i in range(buffer_size)], maxlen=buffer_size)
while True:
self.vad_event.wait() # wait until the VAD event is set
data = self.shared_audio_data
if data is None:
continue
t1 = time.time()
print(f"VAD is_speech: {recorder.is_speech(data)}")
print(f"VAD cost: {(time.time() - t1)*1000} ms")
if recorder.is_speech(data):
# update the flag buffers
active_buffer.append(True); active_buffer.popleft()
silence_buffer.append(False); silence_buffer.popleft()
# stash the chunk in the pre-trigger buffer
audio_buffer.append(data)
# enough consecutive voiced chunks: speech confirmed
if all(active_buffer):
if not is_currently_speaking:
print("Speech start detected")
is_currently_speaking = True
frames.extend(audio_buffer) # include the buffered pre-trigger chunks
if is_currently_speaking:
frames.append(data)
else:
# update the flag buffers
active_buffer.append(False); active_buffer.popleft()
silence_buffer.append(True); silence_buffer.popleft()
# speech was detected and recording is in progress
if is_currently_speaking:
# end of the utterance
if all(silence_buffer):
print("Speech end detected")
# print("frames length: ", len(frames))
self.shared_vad_lock.acquire()
self.shared_vad_data = frames
self.shared_vad_lock.release()
self.stt_event.set() # wake the STT thread
break
'''
def stt_thread(self, recorder):
"""stt_thread
Args:
recorder: takway.audio_utils.Recorder, recorder object
"""
self.logger.info("STT thread started.")
from takway.vosk_utils import AutoSpeechRecognizer
asr = AutoSpeechRecognizer(**self.asr_args)
asr.add_keyword(self.keywords)
kw_trigger_status = False
while True:
self.stt_event.wait() # wait until voice_record_thread hands over an utterance
self.stt_event.clear() # reset so the next wait blocks until the next utterance
print("STT thread start")
data = self.shared_vad_data
if data is None:
continue
print("Start to recognize keywords")
kw_trigger_status = asr.recognize_keywords(data, partial_size=512)
print("Finished recognizing keywords")
if kw_trigger_status:
self.shared_lock.acquire()
self.shared_kw_trigger = True # keyword trigger state (True = triggered)
self.shared_lock.release()
self.record_event.set() # wake the forwarding loop in audio_process
kw_trigger_status = False
# print(f"Got keyword trigger"); exit()
def camera_process(self, logger, trigger_queue, client_queue):
from takway.cam_utils import Camera
cam = Camera(self.video_args)
while True:
if trigger_queue.empty():
time.sleep(0.5)
else:
item = trigger_queue.get()
if item[0] == 'trigger_status' and item[1]:
_, frame = cam.read() # assuming Camera exposes a cv2-style read()
client_queue.put(('image', frame))
def local_client_process(self, logger, client_queue,audio_play_queue,emo_display_queue):
from takway.client_utils import Client
client = Client(**self.server_args)
# print("Local client process started.")
self.logger.info("Local client process started.")
image = None; audio = None
chat_status = 'init'
while True:
if client_queue.empty():
time.sleep(0.2)
else:
item = client_queue.get()
# print(f"Get item: {item[0]}")
if item[0] == 'image':
# TODO: analyze the image and send text to the server
image = None
if item[0] == 'audio':
audio = item[1]
print("get audio data.")
emo_display_queue.put(('emo_data', 'happy'))
'''
# send the collected data to the server
response = client.send_data_to_server(
text=None, audio_data=audio, image_data=None, chat_status=chat_status)
print("get response from server.")
chat_status = 'chating'
print(f"response: {response}")
audio_play_queue.put(('audio', response))
'''
image = None; audio = None
def audio_play_process(self, logger, audio_play_queue):
from takway.audio_utils import AudioPlayer
audio_player = AudioPlayer()
self.logger.info("Audio play process started.")
while True:
if audio_play_queue.empty():
time.sleep(0.2)
else:
item = audio_play_queue.get()
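# NOTE: the disabled sender in local_client_process tags its payload 'audio',
# while this branch matches 'server_data'; align the tags when re-enabling it.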
if item[0] == 'server_data':
# play the audio returned by the server
print("Playing audio...")
server_data = item[1]
audio_player.play(server_data['audio_base64'], audio_type='base64')
def emo_display_process(self, logger, emo_display_queue):
from takway.emo_utils import EmoVideoPlayer
emo_player = EmoVideoPlayer(**self.emo_args)
self.logger.info("Emo display process started.")
# logger.info("Emo display process started.")
# print("Emo display process started.")
while True:
if emo_display_queue.empty():
time.sleep(0.2)
seed = random.randrange(0, 1000)
print(f"seed: {seed}")
if seed < 100:
# emo_player.display_emo_opencv(emo_name='静态', stage='seldom_wink')
emo_player.display_emo_maixsense(emo_name='静态', stage='seldom_wink')
else:
item = emo_display_queue.get()
print(f"Emo display process Get item: {item[0]}")
if item[0] == 'emo_data':
server_data = item[1]
print("Displaying emo...")
# emo_player.display_emo_opencv(emo_name='静态', stage='seldom_wink')
# emo_player.display_emo_opencv(emo_name='静态', stage='quick_wink')
emo_player.display_emo_maixsense(emo_name='静态', stage='seldom_wink')
emo_player.display_emo_maixsense(emo_name='静态', stage='quick_wink')
'''
def display_process(q):
print("Display process started.")
while True:
item = q.get()
if item[0] == 'server_data':
server_data = item[1]
# display the image and text
# print("Displaying image and text:", item[1]['image'], item[1]['text'])
print("Displaying image and text:")
# the actual image/text display code would go here
if item[0] == 'image':
# display the image
cv2.imshow('image', item[1])
cv2.waitKey(1)
'''
if __name__ == '__main__':
try:
# on the board, gpiod is importable and the assets sit beside the script
import gpiod as gpio
model_path = "vosk-model-small-cn-0.22"
emo_dir = "ResizedEmoji"
except ImportError:
# PC fallback paths for local development
model_path = r"G:\WorkSpace\CodeWorkspace\GPT_projects\vits_project\vits-uma-genshin-honkai\vosk-model-small-cn-0.22"
emo_dir = r"G:\WorkSpace\CodeWorkspace\GPT_projects\vits_project\vits-uma-genshin-honkai\ResizedEmoji"
import argparse
parser = argparse.ArgumentParser()
# server params
parser.add_argument('--server_url', type=str, default='http://127.0.0.1:5000/process_all', help='Server url')
# audio paramters
parser.add_argument('--voice_trigger', type=lambda s: str(s).lower() in ('true', '1', 'yes'), default=True, help='Voice trigger') # plain type=bool treats any non-empty string as True
parser.add_argument('--record_CHUNK_SIZE', type=int, default=8000, help='Record chunk size')
parser.add_argument('--keywords', type=str, nargs='+', default=['你好'], help='Voice trigger keywords') # type=list would split a word into characters
# recorder paramters
parser.add_argument('--hd_trigger', type=str, default='keyboard', help='Hardware trigger')
parser.add_argument('--keyboard_key', type=str, default='space', help='Keyboard key')
parser.add_argument('--CHUNK', type=int, default=2048, help='Record chunk size')
parser.add_argument('--RATE', type=int, default=8000, help='Audio rate')
parser.add_argument('--FORMAT', type=int, default=16, help='Audio format')
parser.add_argument('--CHANNELS', type=int, default=1, help='Audio channels')
parser.add_argument('--filename', type=str, default=None, help='Audio file name')
# ASR paramters
parser.add_argument('--model_path', type=str, default=model_path, help='Vosk model path')
# video paramters
parser.add_argument('--device', type=str, default='pc', help='Video device')
parser.add_argument('--width', type=int, default=1280, help='Video width')
parser.add_argument('--height', type=int, default=720, help='Video height')
# emo paramters
parser.add_argument('--emo_dir', type=str, default=emo_dir, help='Emo dir')
# log paramters
parser.add_argument('--log_file', type=str, default='my.log', help='Log file')
parser.add_argument('--log_level', type=str, default='INFO', help='Log level')
parser.add_argument('--debug', type=lambda s: str(s).lower() in ('true', '1', 'yes'), default=True, help='Debug mode')
args = parser.parse_args()
# group parsed args into per-component dicts
server_args = {
'server_url': args.server_url,
}
audio_args = {
'voice_trigger': args.voice_trigger,
'keywords': args.keywords,
'record_CHUNK_SIZE': args.record_CHUNK_SIZE,
}
recorder_args = {
'hd_trigger': args.hd_trigger,
'keyboard_key': args.keyboard_key,
'model_path': args.model_path,
'CHUNK': args.CHUNK,
'FORMAT': pyaudio.paInt16 if args.FORMAT == 16 else pyaudio.paInt32,
'CHANNELS': args.CHANNELS,
'RATE': args.RATE,
'filename': args.filename,
}
asr_args = {
'model_path': args.model_path,
'RATE': args.RATE,
'debug': args.debug,
}
video_args = {
'device': args.device,
'width': args.width,
'height': args.height,
}
emo_args = {
'emo_dir': args.emo_dir,
}
log_args = {
'log_file': args.log_file,
'log_level': args.log_level,
}
web_request_mp_manager = WebRequestMPManager(
server_args=server_args,
audio_args=audio_args,
recorder_args=recorder_args,
asr_args=asr_args,
video_args=video_args,
emo_args=emo_args,
log_args=log_args)
web_request_mp_manager.process_init()
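# Example invocation (paths are illustrative; point --model_path/--emo_dir at
# your local Vosk model and emoji assets):
#
#   python web_request_mp_manager_vad.py \
#       --server_url http://127.0.0.1:5000/process_all \
#       --model_path vosk-model-small-cn-0.22 \
#       --emo_dir ResizedEmoji \
#       --log_level debug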