# basic
import time
import json
import random
from collections import deque
# log
import logging
import warnings
# multiprocessing
import queue
import threading
import multiprocessing
# web request
import requests

import pyaudio


class WebRequestMPManager:
    def __init__(self, server_args, audio_args, recorder_args, asr_args, video_args, emo_args, log_args):
        # server_args
        self.server_args = server_args
        # audio_args
        self.record_CHUNK_SIZE = audio_args['record_CHUNK_SIZE']
        self.voice_trigger = audio_args['voice_trigger']
        self.keywords = audio_args['keywords']
        # recorder_args
        self.recorder_args = recorder_args
        # asr_args
        self.asr_args = asr_args
        # video_args
        self.video_args = video_args
        # emo_args
        self.emo_args = emo_args
        # log_args
        self.log_args = log_args
        # TODO: design a multiprocessing-safe log queue
        self.logger_init()

    def logger_init(self):
        # log_args (lower-cased so 'INFO' and 'info' both match)
        log_level = self.log_args['log_level'].lower()
        log_file = self.log_args['log_file']
        if log_level == 'debug':
            log_level = logging.DEBUG
        elif log_level == 'info':
            log_level = logging.INFO
        # logger
        self.logger = logging.getLogger('mylogger')
        self.logger.setLevel(log_level)
        # file handler: writes log records to the log file
        handler = logging.FileHandler(log_file)
        handler.setLevel(log_level)
        # stream handler: echoes log records to the console
        console = logging.StreamHandler()
        console.setLevel(logging.INFO)
        # output format shared by both handlers
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        handler.setFormatter(formatter)
        console.setFormatter(formatter)
        # attach handlers
        self.logger.addHandler(handler)
        self.logger.addHandler(console)
        self.logger.info("Logger started.")

    def process_init(self):
        # multiprocessing queues shared by all worker processes
        manager = multiprocessing.Manager()
        self.trigger_queue = manager.Queue()
        self.client_queue = manager.Queue()
        self.audio_queue = manager.Queue()
        self.audio_play_queue = manager.Queue()
        self.emo_display_queue = manager.Queue()

        processes = [
            multiprocessing.Process(target=self.audio_process, args=(self.logger, self.voice_trigger, self.trigger_queue, self.client_queue)),
            # multiprocessing.Process(target=self.camera_process, args=(self.trigger_queue, self.client_queue)),
            # multiprocessing.Process(target=self.local_client_process, args=(self.logger, self.client_queue, self.audio_play_queue, self.emo_display_queue)),
            # multiprocessing.Process(target=self.audio_play_process, args=(self.logger, self.audio_play_queue)),
            # multiprocessing.Process(target=self.emo_display_process, args=(self.logger, self.emo_display_queue)),
        ]
        for process in processes:
            process.start()
        for process in processes:
            process.join()

    def audio_process(self, logger, voice_trigger, trigger_queue, client_queue):
        """audio_process

        Args:
            logger: logging.Logger, shared logger
            voice_trigger: bool, whether to use voice trigger
            trigger_queue: multiprocessing.Queue, trigger queue
            client_queue: multiprocessing.Queue, client queue
        """
        # from takway.audio_utils import Recorder
        from takway.audio_utils import VADRecorder
        recorder = VADRecorder(**self.recorder_args)

        # Shared state for the hardware-trigger and voice-trigger threads:
        self.shared_waiting = False
        self.shared_hd_trigger = False
        self.shared_kw_trigger = False
        self.shared_lock = threading.Lock()
        self.shared_data_lock = threading.Lock()
        self.shared_audio_data = None
        # vad
        self.shared_vad_data = None
        self.shared_vad_lock = threading.Lock()
        # events
        self.record_event = threading.Event()
        self.vad_event = threading.Event()
        self.stt_event = threading.Event()
        self._debug_count = 0
        '''
        shared_waiting: standby flag for all threads; True means standby, False means working
        shared_hd_trigger: hardware trigger state; True means triggered, False means not triggered
        shared_kw_trigger: voice (keyword) trigger state; True means triggered, False means not triggered
        shared_audio_data: audio data captured from the microphone, shared between threads
        '''
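        # The main loop below blocks on record_event instead of polling the shared
        # flags (the old polling loop is kept, commented out, for reference). A
        # minimal sketch of that producer/consumer handoff, with hypothetical
        # names, assuming only the stdlib threading module:
        '''
        import threading

        event = threading.Event()
        shared = {'audio': None}

        def producer():
            shared['audio'] = b'\x00' * 1024  # pretend-recorded audio
            event.set()                       # wake the consumer

        def consumer():
            event.wait()    # block until the producer hands over data
            event.clear()   # re-arm the event for the next trigger
            print(len(shared['audio']))

        t = threading.Thread(target=consumer)
        t.start()
        producer()
        t.join()
        '''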
        # create threads
        threads = [threading.Thread(target=self.hardware_trigger_thread, args=(recorder,))]
        if self.voice_trigger:
            voice_threads = [
                threading.Thread(target=self.voice_record_thread, args=(recorder,)),
                # threading.Thread(target=self.vad_thread, args=(recorder,)),
                threading.Thread(target=self.stt_thread, args=(recorder,)),
            ]
            threads.extend(voice_threads)
        for thread in threads:
            thread.start()
        # self.logger.info("Audio Process started.")

        while True:
            '''
            # Warning: the delay is required, otherwise this polling loop misbehaves!
            time.sleep(0.001)
            if (self.shared_hd_trigger or self.shared_kw_trigger):
                # print(f"self.shared_hd_trigger: {self.shared_hd_trigger}, self.shared_kw_trigger: {self.shared_kw_trigger}")
                audio_data = self.shared_audio_data
                trigger_queue.put(('trigger_status', True))
                client_queue.put(('audio', audio_data))
                self.shared_lock.acquire()  # lock
                self.shared_hd_trigger = False
                self.shared_kw_trigger = False
                self.shared_audio_data = None
                self.shared_waiting = False
                self.shared_lock.release()  # unlock
            '''
            self.record_event.wait()   # block until a trigger thread hands over audio
            self.record_event.clear()  # re-arm the event for the next trigger
            trigger_queue.put(('trigger_status', True))
            client_queue.put(('audio', self.shared_audio_data))
            # reset the shared trigger state so the trigger threads resume working
            self.shared_lock.acquire()
            self.shared_hd_trigger = False
            self.shared_kw_trigger = False
            self.shared_waiting = False
            self.shared_lock.release()
            # print(f"send audio data to client"); exit()

    def hardware_trigger_thread(self, recorder):
        """hardware_trigger_thread

        Args:
            recorder: takway.audio_utils.Recorder, recorder object
        """
        self.logger.info("Hardware trigger thread started.")
        trigger_status = False
        while True:
            time.sleep(0.2)
            if self.shared_waiting:
                continue
            trigger_status = recorder.get_hardware_trigger_status()
            if trigger_status:
                self.shared_lock.acquire()
                self.shared_waiting = True     # put the other threads on standby
                self.shared_hd_trigger = True  # hardware trigger state: True = triggered
                self.shared_lock.release()
                # record microphone data
                audio_data = recorder.record_hardware()
                self.shared_data_lock.acquire()
                self.shared_audio_data = audio_data  # hand the recorded audio to the main loop
                self.shared_data_lock.release()
                self.record_event.set()  # wake the main audio_process loop
            else:
                self.shared_lock.acquire()
                self.shared_waiting = False  # release standby
                self.shared_lock.release()

    def voice_record_thread(self, recorder):
        """voice_record_thread

        Args:
            recorder: takway.audio_utils.Recorder, recorder object
        """
        self.logger.info("Voice record thread started.")
        while True:
            if self.shared_waiting:
                time.sleep(0.01)
                continue
            frames = []
            # status buffers
            is_currently_speaking = False
            buffer_size = recorder.vad_buffer_size
            # buffer_size = 6
            active_buffer = deque([False for i in range(buffer_size - 1)] + [True], maxlen=buffer_size)
            audio_buffer = deque(maxlen=buffer_size)
            silence_buffer = deque([True for i in range(buffer_size)] + [False], maxlen=buffer_size)
            while True:
                data = recorder.record_chunk_voice(
                    CHUNK=recorder.vad_chunk_size,
                    return_type=None,
                    exception_on_overflow=False)
                if data is None:
                    continue
                t1 = time.time()
                # print(f"VAD is_speech: {recorder.is_speech(data)}")
                # print(f"VAD cost: {(time.time() - t1)/1000} ms")
                if recorder.is_speech(data):
                    # update the status buffers
                    active_buffer.append(True); active_buffer.popleft()
                    silence_buffer.append(False); silence_buffer.popleft()
                    # stash the chunk in the rolling buffer
                    audio_buffer.append(data)
                    # enough consecutive voiced chunks: speech has started
                    if all(active_buffer):
                        if not is_currently_speaking:
                            print("Speech start detected")
                            is_currently_speaking = True
                            frames.extend(audio_buffer)  # include the buffered leading chunks
                    if is_currently_speaking:
                        frames.append(data)
                else:
                    # update the status buffers
                    # active_buffer.append(False); active_buffer.popleft()
                    silence_buffer.append(True); silence_buffer.popleft()
                    # enough consecutive silent chunks: speech (if any) has ended
                    if all(silence_buffer):
                        if is_currently_speaking:
                            # end-of-speech flag
                            print("Speech end detected")
                            # print("frames length: ", len(frames))
                            self.shared_vad_lock.acquire()
                            self.shared_vad_data = frames
                            self.shared_vad_lock.release()
                            self.stt_event.set()  # wake the STT thread
                            print("Wake stt thread")
                            break
                        else:
                            frames = []
                '''
                # print(f"audio_data: {len(audio_data)}")
                self.shared_lock.acquire()
                self.shared_audio_data = audio_data
                self.shared_lock.release()
                self.vad_event.set()  # wake the VAD thread
                '''
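    # The two deques above act as debouncers: speech "starts" only after
    # buffer_size consecutive voiced chunks, and "ends" only after roughly
    # buffer_size consecutive silent ones. A self-contained sketch of the same
    # technique on a fake boolean VAD stream (hypothetical values, stdlib only):
    '''
    from collections import deque

    buffer_size = 4
    vad_stream = [False, True, True, True, True, True, False, False, False, False]

    active = deque([False] * buffer_size, maxlen=buffer_size)
    silent = deque([True] * buffer_size, maxlen=buffer_size)
    speaking = False
    for voiced in vad_stream:
        active.append(voiced)       # maxlen discards the oldest flag automatically
        silent.append(not voiced)
        if all(active) and not speaking:
            speaking = True
            print("start")
        elif all(silent) and speaking:
            speaking = False
            print("end")
    '''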
    '''
    def vad_thread(self, recorder):
        self.logger.info("VAD thread started.")
        while True:
            frames = []
            # status buffers
            is_currently_speaking = False
            buffer_size = recorder.vad_buffer_size
            active_buffer = deque([False for i in range(buffer_size)], maxlen=buffer_size)
            audio_buffer = deque(maxlen=buffer_size)
            silence_buffer = deque([True for i in range(buffer_size)], maxlen=buffer_size)
            while True:
                self.vad_event.wait()  # wait until the VAD thread is woken
                data = self.shared_audio_data
                if data is None:
                    continue
                t1 = time.time()
                print(f"VAD is_speech: {recorder.is_speech(data)}")
                print(f"VAD cost: {(time.time() - t1)/1000} ms")
                if recorder.is_speech(data):
                    # update the status buffers
                    active_buffer.append(True); active_buffer.popleft()
                    silence_buffer.append(False); silence_buffer.popleft()
                    # stash the chunk in the rolling buffer
                    audio_buffer.append(data)
                    # enough consecutive voiced chunks: speech has started
                    if all(active_buffer):
                        if not is_currently_speaking:
                            print("Speech start detected")
                            is_currently_speaking = True
                            frames.extend(audio_buffer)  # include the buffered leading chunks
                    if is_currently_speaking:
                        frames.append(data)
                else:
                    # update the status buffers
                    active_buffer.append(False); active_buffer.popleft()
                    silence_buffer.append(True); silence_buffer.popleft()
                    # voice was detected earlier and recording is in progress
                    if is_currently_speaking:
                        # end-of-speech flag
                        if all(silence_buffer):
                            print("Speech end detected")
                            # print("frames length: ", len(frames))
                            self.shared_vad_lock.acquire()
                            self.shared_vad_data = frames
                            self.shared_vad_lock.release()
                            self.stt_event.set()  # wake the STT thread
                            break
    '''

    def stt_thread(self, recorder):
        """stt_thread

        Args:
            recorder: takway.audio_utils.Recorder, recorder object
        """
        self.logger.info("STT thread started.")
        from takway.vosk_utils import AutoSpeechRecognizer
        asr = AutoSpeechRecognizer(**self.asr_args)
        asr.add_keyword(self.keywords)

        kw_trigger_status = False
        while True:
            self.stt_event.wait()   # block until the voice record thread hands over frames
            self.stt_event.clear()  # re-arm the event for the next utterance
            print("STT thread start")
            data = self.shared_vad_data
            if data is None:
                continue
            print("Start to recognize keywords")
            kw_trigger_status = asr.recognize_keywords(data, partial_size=512)
            print("Finish recognizing keywords")
            if kw_trigger_status:
                self.shared_lock.acquire()
                self.shared_kw_trigger = True  # keyword trigger state: True = triggered
                self.shared_lock.release()
                # hand the recognized utterance to the main loop, mirroring the hardware path
                self.shared_data_lock.acquire()
                self.shared_audio_data = data
                self.shared_data_lock.release()
                self.record_event.set()  # wake the main audio_process loop
                kw_trigger_status = False
                # print(f"Got keyword trigger"); exit()

    def camera_process(self, logger, trigger_queue, client_queue):
        from takway.cam_utils import Camera
        cam = Camera(self.video_args)
        while True:
            if trigger_queue.empty():
                time.sleep(0.5)
            else:
                item = trigger_queue.get()
                if item[0] == 'trigger_status' and item[1]:
                    _, frame = cam.read()  # was `cap.read()`, which is undefined here
                    client_queue.put(('image', frame))
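    # __init__ carries a TODO for a multiprocessing log queue: the logger built in
    # logger_init is not safe to share across processes, since each child writes to
    # the same file handler independently. A minimal sketch of the stdlib
    # QueueHandler/QueueListener pattern that could replace it (hypothetical
    # setup, not wired into this class):
    '''
    import logging
    import logging.handlers
    import multiprocessing

    def worker(log_queue):
        logger = logging.getLogger('mylogger')
        logger.addHandler(logging.handlers.QueueHandler(log_queue))  # only enqueues records
        logger.setLevel(logging.INFO)
        logger.info("hello from a child process")

    if __name__ == '__main__':
        log_queue = multiprocessing.Queue()
        # the listener owns the real handlers and runs in the parent process
        listener = logging.handlers.QueueListener(log_queue, logging.StreamHandler())
        listener.start()
        p = multiprocessing.Process(target=worker, args=(log_queue,))
        p.start(); p.join()
        listener.stop()
    '''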
    def local_client_process(self, logger, client_queue, audio_play_queue, emo_display_queue):
        from takway.client_utils import Client
        client = Client(**self.server_args)
        # print("Local client process started.")
        self.logger.info("Local client process started.")

        image = None; audio = None
        chat_status = 'init'
        while True:
            if client_queue.empty():
                time.sleep(0.2)
            else:
                item = client_queue.get()
                # print(f"Get item: {item[0]}")
                if item[0] == 'image':
                    # TODO: analyse the image and send text to the server
                    image = None
                if item[0] == 'audio':
                    audio = item[1]
                    print("get audio data.")
                    emo_display_queue.put(('emo_data', 'happy'))
                    '''
                    # send the data to the server
                    response = client.send_data_to_server(
                        text=None,
                        audio_data=audio,
                        image_data=None,
                        chat_status=chat_status)
                    print("get response from server.")
                    chat_status = 'chating'
                    print(f"response: {response}")
                    audio_play_queue.put(('server_data', response))  # key must match audio_play_process
                    '''
                    image = None; audio = None

    def audio_play_process(self, logger, audio_play_queue):
        from takway.audio_utils import AudioPlayer
        audio_player = AudioPlayer()
        self.logger.info("Audio play process started.")
        while True:
            if audio_play_queue.empty():
                time.sleep(0.2)
            else:
                item = audio_play_queue.get()
                if item[0] == 'server_data':
                    # play the audio returned by the server
                    print("Playing audio...")
                    server_data = item[1]
                    audio_player.play(server_data['audio_base64'], audio_type='base64')

    def emo_display_process(self, logger, emo_display_queue):
        from takway.emo_utils import EmoVideoPlayer
        emo_player = EmoVideoPlayer(**self.emo_args)
        self.logger.info("Emo display process started.")
        # logger.info("Emo display process started.")
        # print("Emo display process started.")
        while True:
            if emo_display_queue.empty():
                time.sleep(0.2)
                # idle: occasionally blink
                seed = random.randrange(0, 1000)
                print(f"seed: {seed}")
                if seed < 100:
                    # emo_player.display_emo_opencv(emo_name='静态', stage='seldom_wink')
                    emo_player.display_emo_maixsense(emo_name='静态', stage='seldom_wink')
            else:
                item = emo_display_queue.get()
                print(f"Emo display process Get item: {item[0]}")
                if item[0] == 'emo_data':
                    server_data = item[1]
                    print("Displaying emo...")
                    # emo_player.display_emo_opencv(emo_name='静态', stage='seldom_wink')
                    # emo_player.display_emo_opencv(emo_name='静态', stage='quick_wink')
                    emo_player.display_emo_maixsense(emo_name='静态', stage='seldom_wink')
                    emo_player.display_emo_maixsense(emo_name='静态', stage='quick_wink')

'''
def display_process(q):
    print("Display process started.")
    while True:
        item = q.get()
        if item[0] == 'server_data':
            server_data = item[1]
            # display image and text
            # print("Displaying image and text:", item[1]['image'], item[1]['text'])
            print("Displaying image and text:")
            # actual image/text display code goes here
        if item[0] == 'image':
            # display the image
            cv2.imshow('image', item[1])
            cv2.waitKey(1)
'''


if __name__ == '__main__':
    try:
        # on-device: gpiod is available, use local relative paths
        import gpiod as gpio
        model_path = "vosk-model-small-cn-0.22"
        emo_dir = "ResizedEmoji"
    except ImportError:
        # development machine: fall back to absolute Windows paths
        model_path = r"G:\WorkSpace\CodeWorkspace\GPT_projects\vits_project\vits-uma-genshin-honkai\vosk-model-small-cn-0.22"
        emo_dir = r"G:\WorkSpace\CodeWorkspace\GPT_projects\vits_project\vits-uma-genshin-honkai\ResizedEmoji"
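    # argparse's `type=bool` treats any non-empty string as True ("--debug False"
    # would still yield True), so boolean flags are parsed explicitly below. The
    # helper name is ours, not part of the original CLI:
    def str2bool(v):
        return str(v).strip().lower() in ('true', '1', 'yes')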
    import argparse
    parser = argparse.ArgumentParser()
    # server parameters
    parser.add_argument('--server_url', type=str, default='http://127.0.0.1:5000/process_all', help='Server url')
    # audio parameters
    parser.add_argument('--voice_trigger', type=str2bool, default=True, help='Voice trigger')
    parser.add_argument('--record_CHUNK_SIZE', type=int, default=8000, help='Record chunk size')
    # nargs='+' so "--keywords 你好 再见" parses as a list (type=list would split a word into characters)
    parser.add_argument('--keywords', nargs='+', default=['你好'], help='Voice trigger keywords')
    # recorder parameters
    parser.add_argument('--hd_trigger', type=str, default='keyboard', help='Hardware trigger')
    parser.add_argument('--keyboard_key', type=str, default='space', help='Keyboard key')
    parser.add_argument('--CHUNK', type=int, default=2048, help='Record chunk size')
    parser.add_argument('--RATE', type=int, default=8000, help='Audio rate')
    parser.add_argument('--FORMAT', type=int, default=16, help='Audio format')
    parser.add_argument('--CHANNELS', type=int, default=1, help='Audio channels')
    parser.add_argument('--filename', type=str, default=None, help='Audio file name')
    # ASR parameters
    # model_path="vosk-model-small-cn-0.22"
    # model_path=r"G:\WorkSpace\CodeWorkspace\GPT_projects\vits_project\vits-uma-genshin-honkai\vosk-model-small-cn-0.22"
    parser.add_argument('--model_path', type=str, default=model_path, help='Vosk model path')
    # video parameters
    parser.add_argument('--device', type=str, default='pc', help='Video device')
    parser.add_argument('--width', type=int, default=1280, help='Video width')
    parser.add_argument('--height', type=int, default=720, help='Video height')
    # emo parameters
    # emo_dir="ResizedEmoji"
    # emo_dir=r"G:\WorkSpace\CodeWorkspace\GPT_projects\vits_project\vits-uma-genshin-honkai\ResizedEmoji"
    parser.add_argument('--emo_dir', type=str, default=emo_dir, help='Emo dir')
    # log parameters
    parser.add_argument('--log_file', type=str, default='my.log', help='Log file')
    parser.add_argument('--log_level', type=str, default='INFO', help='Log level')
    parser.add_argument('--debug', type=str2bool, default=True, help='Debug mode')
    args = parser.parse_args()

    # sort the parsed args into per-module dicts
    server_args = {
        'server_url': args.server_url,
    }
    audio_args = {
        'voice_trigger': args.voice_trigger,
        'keywords': args.keywords,
        'record_CHUNK_SIZE': args.record_CHUNK_SIZE,
    }
    recorder_args = {
        'hd_trigger': args.hd_trigger,
        'keyboard_key': args.keyboard_key,
        'model_path': args.model_path,
        'CHUNK': args.CHUNK,
        'FORMAT': pyaudio.paInt16 if args.FORMAT == 16 else pyaudio.paInt32,
        'CHANNELS': args.CHANNELS,
        'RATE': args.RATE,
        'filename': args.filename,
    }
    asr_args = {
        'model_path': args.model_path,
        'RATE': args.RATE,
        'debug': args.debug,
    }
    video_args = {
        'device': args.device,
        'width': args.width,
        'height': args.height,
    }
    emo_args = {
        'emo_dir': args.emo_dir,
    }
    log_args = {
        'log_file': args.log_file,
        'log_level': args.log_level,
    }

    web_request_mp_manager = WebRequestMPManager(
        server_args=server_args,
        audio_args=audio_args,
        recorder_args=recorder_args,
        asr_args=asr_args,
        video_args=video_args,
        emo_args=emo_args,
        log_args=log_args)
    web_request_mp_manager.process_init()
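# Example invocation (script name and paths are illustrative, not from the source):
#   python web_request_mp_manager.py \
#       --server_url http://127.0.0.1:5000/process_all \
#       --model_path vosk-model-small-cn-0.22 \
#       --emo_dir ResizedEmoji \
#       --log_level info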