# basic
import io
import os
import sys
import time
import json
import random
from collections import deque
from datetime import datetime
# log
import logging
import warnings
# multiprocessing
import queue
import threading
import multiprocessing
# web request
import requests

import pyaudio
# hot words detection
import pvporcupine

from takway.apps.data_struct import QueueIterator
from takway.common_utils import *
from takway.audio_utils import PicovoiceRecorder, HDRecorder
from takway.clients.client_utils import BaseWebSocketClient
from takway.audio_utils import AudioPlayer


class WebSocketClinet:
    def __init__(self,
                 board,
                 server_args,
                 recorder_args,
                 player_args,
                 log_args,
                 excute_args=None,
                 ):
        self.board = board
        # server_args
        self.server_args = server_args
        # recorder_args
        self.recorder_args = recorder_args
        # player_args
        self.player_args = player_args
        # excute_args
        self.excute_args = excute_args
        # log_args
        self.log_args = log_args

    def process_init(self):
        # multiprocessing
        manager = multiprocessing.Manager()
        self.trigger_queue = manager.Queue()
        self.client_queue = manager.Queue()
        self.audio_play_queue = manager.Queue()
        self.excute_queue = manager.Queue()
        # wake-up events
        self.wakeup_event = manager.Event()
        self.sleep_event = manager.Event()
        # interrupt event
        # self.interrupt_event = manager.Event()
        # listening event
        self.listening_event = manager.Event()
        # speaking event
        self.speaking_event = manager.Event()

        processes = [
            multiprocessing.Process(target=self.audio_process),
            multiprocessing.Process(target=self.web_socket_client_process),
            multiprocessing.Process(target=self.audio_play_process),
        ]
        # NOTE: excute_process is commented out at the bottom of this file;
        # implement it before setting excute_args['enable'] to True.
        if self.excute_args.get('enable', False):
            processes.append(
                multiprocessing.Process(target=self.excute_process),
            )
        for process in processes:
            time.sleep(0.5)
            process.start()
        for process in processes:
            process.join()

    def audio_process(self):
        """audio_process

        Args:
            trigger_queue: multiprocessing.Queue, trigger queue
            client_queue: multiprocessing.Queue, client queue
        """
        min_stream_record_time = self.recorder_args.pop('min_stream_record_time')
        voice_trigger = self.recorder_args.pop('voice_trigger')
        max_slience_time = self.recorder_args.pop('max_slience_time')
        if voice_trigger:
            recorder = PicovoiceRecorder(**self.recorder_args)
        else:
            voice_keys = ['access_key', 'keywords', 'keyword_paths',
                          'model_path', 'sensitivities', 'library_path']
            for key in voice_keys:
                self.recorder_args.pop(key)
            recorder = HDRecorder(**self.recorder_args)
        recorder.min_stream_record_time = min_stream_record_time
        recorder.max_slience_time = max_slience_time
        # self.hardware = recorder.hardware
        # self.long_power_status = recorder.hardware.long_power_status

        print(f"recorder.min_act_time: {recorder.min_act_time}")
        print(f"recorder.max_slience_time: {recorder.max_slience_time}")
        print("Audio Process started.")

        # create threads
        threads = [threading.Thread(target=self.hardware_trigger_thread, args=(recorder,))]
        if voice_trigger:
            threads.append(
                threading.Thread(target=self.voice_trigger_thread, args=(recorder,))
            )
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()
        print("audio process exit")
        sys.exit()

    def hardware_trigger_thread(self, recorder):
        """hardware_trigger_thread

        Args:
            recorder: takway.audio_utils.Recorder, recorder object
        """
        print("Hardware trigger thread started.")
        # last_short_power_status = False
        while True:
            # power switch pressed: wake up
            if recorder.hardware.long_power_status:
                self.wakeup_event.set()
                # # short press: interrupt the conversation
                # if recorder.hardware.short_power_status and not last_short_power_status:
                #     # self.interrupt_event.set()
                #     recorder.hardware.short_power_status = False
                #     # print("Interrupt conversation.")
                # last_short_power_status = recorder.hardware.short_power_status
            else:
                self.wakeup_event.clear()
            time.sleep(0.01)

    def voice_trigger_thread(self, recorder):
        """voice_trigger_thread

        Args:
            recorder: takway.audio_utils.Recorder, recorder object
        """
        board = self.board
        print("Waiting for wake up...")
        while True:
            data = recorder.record_chunk_voice(
                CHUNK=recorder.porcupine.frame_length,
                return_type=None,
                exception_on_overflow=False,
                queue=None)
            record_chunk_size = recorder.vad_chunk_size

            # woken by the power switch or by a keyword
            if self.wakeup_event.is_set():
                print(f"{datetime.now()}: Wake up by button.")
            else:
                if self.sleep_event.is_set():
                    print(f"{datetime.now()}: stay in sleep mode.")
                    recorder.hardware.long_power_status = False
                    self.sleep_event.clear()
                    continue
                if recorder.is_wakeup(data):
                    recorder.hardware.long_power_status = True
                    self.wakeup_event.set()
                    print(f"{datetime.now()}: wake up by voice.")
                else:
                    recorder.hardware.long_power_status = False
                    continue

            # switch to listening mode
            self.listening_event.set()
            self.speaking_event.clear()

            # wake up
            is_bgn = True
            is_end = False
            frames = []
            # status buffer
            single_chat_finish = False
            slience_bgn_t = time.time()
            slience_time = 0
            print("Start recording...")

            # conversation loop
            while True:
                if self.sleep_event.is_set():
                    print(f"{datetime.now()}: sleep mode.")
                    recorder.hardware.long_power_status = False
                    self.sleep_event.clear()
                    break
                if not self.wakeup_event.is_set():
                    break
                if self.listening_event.is_set():
                    # voice activity detection
                    data = recorder.record_chunk_voice(
                        CHUNK=record_chunk_size,
                        return_type=None,
                        exception_on_overflow=False)
                    is_speech = recorder.is_speech(data)
                    # update silence state
                    if is_speech:
                        # print(f"{datetime.now()}: valid voice")
                        slience_bgn_t = time.time()
                        frames.append(data)
                    slience_time = time.time() - slience_bgn_t
                    # long silence clears the wake-up state: if nothing is said for
                    # too long after wake-up (or the power switch is turned off),
                    # treat the conversation as finished
                    if slience_time > recorder.max_slience_time:
                        recorder.hardware.long_power_status = False
                        self.wakeup_event.clear()
                        break
                    # short silence (about 0.5 s) ends a single exchange and sends the data
                    if slience_time > recorder.min_act_time:
                        single_chat_finish = True
                    if single_chat_finish:
                        is_end = True
                        is_bgn = False
                    # print(f"{datetime.now()}: slience_time: {slience_time}")
                    # stream the recorded data
                    stream_reset_status = self.stream_record_process(
                        bytes_frames=recorder.write_wave_bytes(frames),
                        frames_size=len(frames),
                        record_chunk_size=record_chunk_size,
                        sample_rate=recorder.RATE,
                        min_stream_record_time=recorder.min_stream_record_time,
                        is_bgn=is_bgn,
                        is_end=is_end)
                    # print(f"stream_reset_status: {stream_reset_status}, is_bgn: {is_bgn}, is_end: {is_end}, frame: {len(frames)}")
                    if stream_reset_status:
                        frames.clear()
                        is_bgn = False
                        if single_chat_finish:
                            is_bgn = True
                            is_end = False
                            single_chat_finish = False
                            print(f"{datetime.now()}: single conversation finished, reset frames.")
                elif self.speaking_event.is_set():
                    print(f"{datetime.now()}: wait for speaking to finish and listening to restart, or for sleep mode.")
                    if board == 'orangepi':
                        recorder.hardware.set_led_on("blue")
                    while self.wakeup_event.is_set():
                        # sleep or listening state
                        if self.sleep_event.is_set() or self.listening_event.is_set():
                            break
                        # if self.interrupt_event.is_set():
                        #     print(f"{datetime.now()}: button interrupt (input).")
                        #     break
                    if board == 'orangepi':
                        recorder.hardware.set_led_off("blue")
                    # restart the silence timer
                    slience_bgn_t = time.time()
                    print(f"{datetime.now()}: restart listening.")

    def stream_record_process(self,
                              bytes_frames: bytes,
                              frames_size: int,
                              record_chunk_size: int,
                              sample_rate: int,
                              min_stream_record_time: int,
                              is_bgn: bool,
                              is_end: bool):
        '''
        Args:
            bytes_frames: bytes, audio data
            frames_size: int, audio data size
            record_chunk_size: int, audio data chunk size
            sample_rate: int, audio sample rate
            min_stream_record_time: int, minimum buffered seconds per streamed chunk
            is_bgn: bool, is begin of stream
            is_end: bool, is end of stream
        Returns:
            bool, True if the frame buffer should be reset
        '''
        if len(bytes_frames) == 0:
            return False
        if frames_size * record_chunk_size >= min_stream_record_time * sample_rate or is_end:
            if is_bgn and is_end:
                return False
            stream_data = dict(
                frames=bytes_frames,
                frames_size=frames_size,
                chunk_size=record_chunk_size,
                is_bgn=is_bgn,
                is_end=is_end)
            self.client_queue.put(('audio', stream_data))
            if is_end:
                # print("put None to client queue.")
                self.client_queue.put(None)
            return True
        else:
            return False

    def web_socket_client_process(self):
        client = BaseWebSocketClient(self.server_args['server_url'],
                                     self.server_args['session_id'])
        print("Web socket client process started.")
        while True:
            if self.client_queue.empty():
                continue
            print(f"{datetime.now()}: start setting up web socket connection.")
            # stage 1: connect to the server only when awake
            if self.wakeup_event.is_set():
                client.wakeup_client()
            else:
                print("not awake, skip web socket connection setup.")
                self.client_queue.get(block=False)
                continue
            # do not connect while speaking
            if self.speaking_event.is_set():
                print("speaking, skip web socket connection setup.")
                self.client_queue.get(block=False)
                continue
            # send data
            for queue_data in QueueIterator(self.client_queue):
                # send audio data
                if queue_data[0] == 'audio':
                    # exit the loop once the wake-up state is cleared
                    if not self.wakeup_event.is_set():
                        self.listening_event.clear()
                        self.speaking_event.clear()
                        client.close_client()
                        break
                    # never send data while speaking; drop the message
                    if self.speaking_event.is_set() and not self.listening_event.is_set():
                        client.close_client()
                        break
                    audio_dict = queue_data[1]
                    try:
                        client.send_per_data(
                            audio=audio_dict['frames'],
                            stream=True,
                            voice_synthesize=True,
                            is_end=audio_dict['is_end'],
                            encoding='base64',
                        )
                    except BrokenPipeError:
                        print(f"{datetime.now()}: web socket connection broken, skip sending data.")
                        self.listening_event.set()
                        self.speaking_event.clear()
                        client.close_client()
                        break
                    # after is_end, switch to speaking mode
                    if audio_dict['is_end']:
                        self.listening_event.clear()
                        self.speaking_event.set()
                        print(f"{datetime.now()}: Audio data send finished, switch to speaking mode.")
            if not self.wakeup_event.is_set():
                continue
            if self.listening_event.is_set():
                print(f"{datetime.now()}: listening or speaking, skip web socket connection setup.")
                continue
            # receive data
            while True:
                # exit the loop once the wake-up state is cleared
                if not self.wakeup_event.is_set():
                    self.listening_event.clear()
                    self.speaking_event.clear()
                    client.close_client()
                    break
                response, data_type = client.receive_per_data()
                if data_type is dict:
                    # print the received message
                    print(f"{datetime.now()}: receive json data: {response}")
                    # 5xx: terminal error codes
                    if 501 <= response['code'] <= 545:
                        # resume recording
                        self.listening_event.set()
                        self.speaking_event.clear()
                        break
                    # 200: normal end
                    elif response['code'] == 200:
                        self.audio_play_queue.put(('audio_json', response))
                        if response['type'] == 'close':
                            break
                    # 201: switch to sleep mode
                    elif response['code'] == 201:
                        self.listening_event.clear()
                        self.speaking_event.clear()
                        self.sleep_event.set()
                        # sleep clears the wake-up state
                        self.wakeup_event.clear()
                elif data_type is list:
                    print(f"{datetime.now()}: receive text_audio_data")
                    # switch to speaking mode
                    self.listening_event.clear()
                    self.speaking_event.set()
                    self.audio_play_queue.put(('text_audio_data', response))
                elif data_type is None:
                    print(f"{datetime.now()}: receive None data, break loop.")
                    # switch back to recording mode
                    self.listening_event.set()
                    self.speaking_event.clear()
                    # print(f"listening_event: {self.listening_event.is_set()}, speaking_event: {self.speaking_event.is_set()}")
                    break
            # no more messages: close the connection
            client.close_client()
            print(f"{datetime.now()}: web socket client closed.")

    def audio_play_process(self):
        '''
        Args:
            audio_play_queue: multiprocessing.Queue, audio play queue
            share_time_dict: multiprocessing.Manager.dict, shared time dict
        '''
        audio_player = AudioPlayer(**self.player_args)
        print("Audio play process started.")
        audio_list = []
        while True:
            print("wait for audio data.")
            item = self.audio_play_queue.get()
            # ignore data received while not awake
            if not self.wakeup_event.is_set():
                continue
            if item[0] == 'text_audio_data':
                # TODO: detect whether this is the last bytes chunk; once playback
                # finishes, switch back to listening mode
                audio_text, audio_data = item[1]
                print(f"{datetime.now()}: start playing audio.")
                print(f"{datetime.now()}: audio text: {audio_text}")
                if self.listening_event.is_set():
                    continue
                if not self.speaking_event.is_set():
                    continue
                audio_list.append(audio_data)
                # play the audio
                try:
                    tts_audio = audio_list[0]  # take the first clip
                    tts_audio = audio_player.check_audio_type(tts_audio, return_type=None)
                    for i in range(0, len(tts_audio), audio_player.CHUNK):
                        audio_player.stream.write(tts_audio[i:i + audio_player.CHUNK])
                        # print("Playing {} data...{}/{}".format(item[0], i, len(tts_audio)))
                        # closed while playing
                        if not self.wakeup_event.is_set():
                            print(f"{datetime.now()}: microphone and speaker close.")
                            self.listening_event.clear()
                            self.speaking_event.clear()
                            self.sleep_event.set()  # sleep clears the wake-up state
                            break
                        # # button interrupt during playback
                        # if self.interrupt_event.is_set():
                        #     print(f"{datetime.now()}: button interrupt (output).")
                        #     time.sleep(0.3)  # the speaker may still be draining; the delay avoids cutting it off
                        #     while not self.audio_play_queue.empty():  # flush the queue
                        #         self.audio_play_queue.get()
                        #     self.listening_event.set()
                        #     self.speaking_event.clear()
                        #     self.interrupt_event.clear()
                        #     break
                    if self.speaking_event.is_set():
                        # play the remaining tail of the clip
                        audio_player.stream.write(tts_audio[i + audio_player.CHUNK:])
                        audio_list = []  # clear the audio list
                        # audio_list.pop(0)  # pop the played clip
                        print(f"{datetime.now()}: audio data played.")
                    else:
                        audio_list = []  # clear the audio list
                        print(f"{datetime.now()}: audio data cleared.")
                        continue
                except TypeError as e:
                    print(f"audio play error: {e}")
                    continue
            elif item[0] == 'audio_json':
                data_type = item[1]['type']
                print(f"data_type: {data_type}")
                if self.wakeup_event.is_set():
                    if data_type == 'text':
                        # switch to speaking mode
                        self.listening_event.clear()
                        self.speaking_event.set()
                if data_type in ['close', 'end']:
                    # the speaker may not have finished draining yet; the delay
                    # avoids cutting it off
                    time.sleep(0.8)
                    if self.wakeup_event.is_set():
                        # switch back to listening mode
                        self.speaking_event.clear()
                        self.listening_event.set()

    """
    def excute_process(self):
        '''
        Args:
            excute_queue: multiprocessing.Queue, excute display queue
        '''
        print("Excute process started.")
        while True:
            if self.excute_queue.empty():
                continue
            if self.speaker_active_set.is_set():
                instruct, content = self.excute_queue.get()
                print(f"Got speaker info: {instruct, content}")
                print(f"Playing {instruct} {content}...")
                print(f"play {instruct} time: {datetime.now()}")
                self.audio_play_queue.put((instruct, content))
                self.speaker_active_set.clear()
    """
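The flush rule in `stream_record_process` can be exercised in isolation: a chunk is sent once the buffered samples cover `min_stream_record_time` seconds of audio, or at the end of an utterance, except for an empty begin-and-end chunk. A minimal standalone sketch of that condition (the name `should_flush` and the sample numbers are illustrative, not part of the takway API):

```python
def should_flush(frames_size: int, chunk_size: int, sample_rate: int,
                 min_stream_record_time: float, is_bgn: bool, is_end: bool) -> bool:
    """Mirror of the flush condition in stream_record_process: flush once the
    buffered samples cover min_stream_record_time seconds, or at the end of an
    utterance -- but never for an empty chunk that both begins and ends the stream."""
    if frames_size == 0:
        return False
    if frames_size * chunk_size >= min_stream_record_time * sample_rate or is_end:
        return not (is_bgn and is_end)
    return False

# With 16 kHz audio, a 0.5 s minimum stream time and 512-sample chunks,
# at least 16 buffered frames are needed (16 * 512 = 8192 >= 0.5 * 16000).
print(should_flush(15, 512, 16000, 0.5, False, False))  # False: not enough audio yet
print(should_flush(16, 512, 16000, 0.5, False, False))  # True: enough audio buffered
print(should_flush(3, 512, 16000, 0.5, False, True))    # True: end of utterance flushes early
```

This mirrors why the voice thread can clear `frames` whenever `stream_record_process` returns True: either enough audio was shipped, or the utterance ended.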
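The send loop relies on `takway.apps.data_struct.QueueIterator` together with the `None` sentinel that `stream_record_process` enqueues after the final chunk. The real class is not shown here; the sketch below is a hedged stand-in for the sentinel-iterator pattern the client appears to depend on (the actual `QueueIterator` may differ, e.g. in timeout handling):

```python
import queue

class QueueIterator:
    """Illustrative sentinel iterator: yields queue items until it sees None.
    This is an assumption about takway.apps.data_struct.QueueIterator, not its code."""
    def __init__(self, q):
        self.q = q

    def __iter__(self):
        return self

    def __next__(self):
        item = self.q.get()
        if item is None:
            # the producer's end-of-stream marker terminates iteration
            raise StopIteration
        return item

# usage: the producer puts audio tuples, then None after the is_end chunk
q = queue.Queue()
for msg in [('audio', {'is_end': False}), ('audio', {'is_end': True}), None]:
    q.put(msg)
print([tag for tag, _ in QueueIterator(q)])  # → ['audio', 'audio']
```

Under this reading, the `for queue_data in QueueIterator(self.client_queue)` loop drains exactly one utterance per connection before the client switches to receive mode.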