diff --git a/takway/board/orangepi.py b/takway/board/orangepi.py index 038210a..ab0d46f 100644 --- a/takway/board/orangepi.py +++ b/takway/board/orangepi.py @@ -76,16 +76,14 @@ class OrangePi(BaseHardware): elif button_status_1 and not last_status_1: if press_time_1: press_duration = (datetime.now() - press_time_1).total_seconds() - print(f"{datetime.now()}: press_duration: {press_duration}") - if press_duration > 1: + if press_duration > 0.01: self.long_power_status = not self.long_power_status - # print(f"{datetime.now()}: long_power_status: {self.long_power_status} {self.short_power_status}") elif press_duration > 5: time.sleep(1) # 防止短按误触发 subprocess.Popen('sudo shutdown now', shell=True) else: self.short_power_status = True - # print(f"{datetime.now()}: short_power_status: {self.short_power_status} {self.long_power_status}") + print(f"{datetime.now()}: press_duration: {press_duration}") print(f"{datetime.now()}: button_status: {button_status_1}, last_button_status: {last_status_1}, power_status: {self.long_power_status}, interrupt_status: {self.short_power_status}") press_time_1 = None # TODO: 如果button_status_1和上一帧都是True or False,则判断为无效按键,忽略 diff --git a/takway/clients/web_socket_client_utils.py b/takway/clients/web_socket_client_utils.py index c80a1d8..bfa0ce7 100644 --- a/takway/clients/web_socket_client_utils.py +++ b/takway/clients/web_socket_client_utils.py @@ -62,7 +62,7 @@ class WebSocketClinet: self.sleep_event = manager.Event() # 打断事件 - self.interrupt_event = manager.Event() + # self.interrupt_event = manager.Event() # 监听事件 self.listening_event = manager.Event() @@ -147,9 +147,9 @@ class WebSocketClinet: self.wakeup_event.set() # 短时按键被按下,打断 if recorder.hardware.short_power_status and not last_short_power_status: - self.interrupt_event.set() + # self.interrupt_event.set() recorder.hardware.short_power_status = False - print("Interrupt conversation.") + # print("Interrupt conversation.") last_short_power_status = recorder.hardware.short_power_status else: self.wakeup_event.clear() @@ -273,9 +273,9 @@ class WebSocketClinet: # 睡眠 or 监听状态 if self.sleep_event.is_set() or self.listening_event.is_set(): break - if self.interrupt_event.is_set(): - print(f"{datetime.now()}: button interrupt (input).") - break + # if self.interrupt_event.is_set(): + # print(f"{datetime.now()}: button interrupt (input).") + # break if board == 'orangepi': recorder.hardware.set_led_off("red") # 重新计时 @@ -388,10 +388,10 @@ class WebSocketClinet: client.close_client() break - if self.interrupt_event.is_set(): - self.listening_event.set() - self.speaking_event.clear() - break + # if self.interrupt_event.is_set(): + # self.listening_event.set() + # self.speaking_event.clear() + # break response, data_type = client.receive_per_data() if data_type == dict: @@ -482,18 +482,18 @@ class WebSocketClinet: self.speaking_event.clear() self.sleep_event.set() # 沉默状态下,关闭唤醒状态 break - # 按键打断播放 - if self.interrupt_event.is_set(): - print(f"{datetime.now()}: button interrupt (output).") + # # 按键打断播放 + # if self.interrupt_event.is_set(): + # print(f"{datetime.now()}: button interrupt (output).") - time.sleep(0.3) # 虽然程序到这里,但是扬声器不一定播放完毕,延迟用来避免问题 + # time.sleep(0.3) # 虽然程序到这里,但是扬声器不一定播放完毕,延迟用来避免问题 - while not self.audio_play_queue.empty(): # 清空队列 - self.audio_play_queue.get() - self.listening_event.set() - self.speaking_event.clear() - self.interrupt_event.clear() - break + # while not self.audio_play_queue.empty(): # 清空队列 + # self.audio_play_queue.get() + # self.listening_event.set() + # self.speaking_event.clear() + # self.interrupt_event.clear() + # break if self.speaking_event.is_set(): # 播放最后一段音频 diff --git a/takway/clients/web_socket_client_utils_v2.py b/takway/clients/web_socket_client_utils_v2.py new file mode 100644 index 0000000..c233954 --- /dev/null +++ b/takway/clients/web_socket_client_utils_v2.py @@ -0,0 +1,557 @@ +# ############################################### # +# 具备短按打断,长按切换唤醒问题 +# ############################################### # +# basic +import io +import os +import sys +import time +import json +import random +from collections import deque +from datetime import datetime +# log +import logging +import warnings +# multiprocessing +import queue +import threading +import multiprocessing +# web request +import requests +import pyaudio +# hot words detection +import pvporcupine + +from takway.apps.data_struct import QueueIterator +from takway.common_utils import * +from takway.audio_utils import PicovoiceRecorder, HDRecorder +from takway.clients.client_utils import BaseWebSocketClient +from takway.audio_utils import AudioPlayer + + +class WebSocketClinet: + def __init__(self, + board, + server_args, + recorder_args, + player_args, + log_args, + excute_args=None, + ): + self.board = board + # server_args + self.server_args = server_args + # self.recorder_args + self.recorder_args = recorder_args + # player_args + self.player_args = player_args + # excute_args + self.excute_args = excute_args + # log_args + self.log_args = log_args + + + def process_init(self): + # multiprocessing + manager = multiprocessing.Manager() + self.trigger_queue = manager.Queue() + self.client_queue = manager.Queue() + self.audio_play_queue = manager.Queue() + self.excute_queue = manager.Queue() + + # 唤醒事件 + self.wakeup_event = manager.Event() + self.sleep_event = manager.Event() + + # 打断事件 + self.interrupt_event = manager.Event() + + # 监听事件 + self.listening_event = manager.Event() + # 播放事件 + self.speaking_event = manager.Event() + + processes = [ + multiprocessing.Process(target=self.audio_process), + multiprocessing.Process(target=self.web_socket_client_process), + multiprocessing.Process(target=self.audio_play_process), + ] + if self.excute_args.get('enable', False): + processes.append( + multiprocessing.Process(target=self.excute_process), + ) + + for process in processes: + time.sleep(0.5) + # time.sleep(2) + process.start() + for process in processes: + process.join() + + def audio_process(self): + """audio_process + + Args: + trigger_queue: multiprocessing.Queue, trigger queue + client_queue: multiprocessing.Queue, client queue + """ + min_stream_record_time = self.recorder_args.pop('min_stream_record_time') + voice_trigger = self.recorder_args.pop('voice_trigger') + max_slience_time = self.recorder_args.pop('max_slience_time') + if voice_trigger: + recorder = PicovoiceRecorder(**self.recorder_args) + else: + voice_keys = ['access_key', 'keywords', 'keyword_paths', 'model_path','sensitivities', 'library_path'] + for key in voice_keys: + self.recorder_args.pop(key) + recorder = HDRecorder(**self.recorder_args) + recorder.min_stream_record_time = min_stream_record_time + recorder.max_slience_time = max_slience_time + + # self.hardware = recorder.hardware + # self.long_power_status = recorder.hardware.long_power_status + + print(f"recorder.min_act_time: {recorder.min_act_time}") + print(f"recorder.max_slience_time: {recorder.max_slience_time}") + + print("Audio Process started.") + + # create threads + threads = [threading.Thread(target=self.hardware_trigger_thread, args=(recorder,))] + if voice_trigger: + vioce_threads = [ + threading.Thread(target=self.voice_trigger_thread, args=(recorder,)), + ] + threads.extend(vioce_threads) + for thread in threads: + thread.start() + print("Audio Process started.") + + while True: + for thread in threads: + thread.join() + print(f"audio process exit") ; exit() + + + def hardware_trigger_thread(self, recorder): + """hardware_trigger_thread + + Args: + recorder: takway.audio_utils.Recorder, recorder object + """ + print("Hardware trigger thread started.") + last_short_power_status = False + + board = self.board + while True: + # 开关按键被按下 + if recorder.hardware.long_power_status: + self.wakeup_event.set() + # 短时按键被按下,打断 + if recorder.hardware.short_power_status and not last_short_power_status: + self.interrupt_event.set() + recorder.hardware.short_power_status = False + print("Interrupt conversation.") + last_short_power_status = recorder.hardware.short_power_status + else: + self.wakeup_event.clear() + time.sleep(0.01) + + def voice_trigger_thread(self, recorder): + """voice_trigger_thread + + Args: + recorder: takway.audio_utils.Recorder, recorder object + """ + board = self.board + print("Waiting for wake up...") + + while True: + data = recorder.record_chunk_voice( + CHUNK=recorder.porcupine.frame_length, + return_type=None, + exception_on_overflow=False, + queue=None) + + record_chunk_size = recorder.vad_chunk_size + + # 开关按键被按下或被关键词唤醒 + if self.wakeup_event.is_set(): + print(f"{datetime.now()}: Wake up by button.") + else: + if self.sleep_event.is_set(): + print(f"{datetime.now()}: stay sleep mode.") + recorder.hardware.long_power_status = False + self.sleep_event.clear() + continue + if recorder.is_wakeup(data): + recorder.hardware.long_power_status = True + self.wakeup_event.set() + print(f"{datetime.now()}: wake up by voice.") + else: + recorder.hardware.long_power_status = False + continue + + # 设置为倾听模式 + self.listening_event.set() + self.speaking_event.clear() + + # wake up + is_bgn = True + is_end = False + frames = [] + # status buffer + single_chat_finish = False + slience_bgn_t = time.time() + slience_time = 0 + print("Start recording...") + # 准备对话状态 + while True: + if self.sleep_event.is_set(): + print(f"{datetime.now()}: sleep mode.") + recorder.hardware.long_power_status = False + self.sleep_event.clear() + break + + if not self.wakeup_event.is_set(): + break + + if self.listening_event.is_set(): + # 语音活动检测 + data = recorder.record_chunk_voice( + CHUNK=record_chunk_size, + return_type=None, + exception_on_overflow=False) + is_speech = recorder.is_speech(data) + + # 判断状态 + if is_speech: + # print(f"{datetime.now()}: valid voice") + slience_bgn_t = time.time() + frames.append(data) + + slience_time = time.time() - slience_bgn_t + + # 长时沉默关闭唤醒状态:如果唤醒后超过一定时间没有说话/关闭按键被按下,则认为是结束 + if slience_time > recorder.max_slience_time: + recorder.hardware.long_power_status = False + self.wakeup_event.clear() + break + + # 短时沉默结束单次对话:沉默时间超过一定时间段(0.5s左右),则发送数据 + if slience_time > recorder.min_act_time: + single_chat_finish = True + + if single_chat_finish: + is_end = True + is_bgn = False + + # print(f"{datetime.now()}: slience_time: {slience_time}") + + # 流式发送数据 + stream_reset_status = self.stream_record_process( + bytes_frames=recorder.write_wave_bytes(frames), + frames_size=len(frames), + record_chunk_size=record_chunk_size, + sample_rate=recorder.RATE, + min_stream_record_time=recorder.min_stream_record_time, + is_bgn=is_bgn, + is_end=is_end) + # print(f"stream_reset_status: {stream_reset_status}, is_bgn: {is_bgn}, is_end: {is_end}, frame: {len(frames)}") + if stream_reset_status: + frames.clear() + is_bgn = False + if single_chat_finish: + is_bgn = True + is_end = False + single_chat_finish = False + print(f"{datetime.now()}: single conversation finish, reset frames.") + + elif self.speaking_event.is_set(): + print(f"{datetime.now()}: wait for speaking close and listening start or sleep mode.") + if board == 'orangepi': + recorder.hardware.set_led_on("red") + while True: + # 睡眠 or 监听状态 + if self.sleep_event.is_set() or self.listening_event.is_set(): + break + if self.interrupt_event.is_set(): + print(f"{datetime.now()}: button interrupt (input).") + break + if board == 'orangepi': + recorder.hardware.set_led_off("red") + # 重新计时 + slience_bgn_t = time.time() + print(f"{datetime.now()}: restart listening.") + + def stream_record_process(self, + bytes_frames: bytes, + frames_size: int, + record_chunk_size: int, + sample_rate: int, + min_stream_record_time: int, + is_bgn: bool, + is_end: bool): + ''' + Args: + bytes_frames: bytes, audio data + frames_size: int, audio data size + record_chunk_size: int, audio data chunk size + is_bgn: bool, is begin of stream + is_end: bool, is end of stream + + Returns: + bool, if stream reset status + ''' + if len(bytes_frames) == 0: + return False + if frames_size*record_chunk_size >= min_stream_record_time*sample_rate or is_end: + if is_bgn and is_end: + return False + stream_data = dict( + frames=bytes_frames, + frames_size=frames_size, + chunk_size=record_chunk_size, + is_bgn=is_bgn, + is_end=is_end) + self.client_queue.put(('audio', stream_data)) + if is_end: + # print("put None to client queue.") + self.client_queue.put(None) + return True + else: + return False + + def web_socket_client_process(self): + + client = BaseWebSocketClient(self.server_args['server_url'], self.server_args['session_id']) + print("Web socket client process started.") + # print("Web socket client process started.") + + while True: + if self.client_queue.empty(): + continue + print(f"{datetime.now()}: start setup web socket connection.") + + # 第一级:唤醒状态下,连接服务器 + if self.wakeup_event.is_set(): + client.wakeup_client() + else: + print(f"not wake up, skip setup web socket connection.") + self.client_queue.get(block=False) + continue + + # 播放状态下,不连接服务器 + if self.speaking_event.is_set(): + print(f"speaking, skip setup web socket connection.") + self.client_queue.get(block=False) + continue + + # 发送数据 + for queue_data in QueueIterator(self.client_queue): + # 发送音频数据 + if queue_data[0] == 'audio': + + # 当唤醒状态被关闭时,退出循环 + if not self.wakeup_event.is_set(): + self.listening_event.clear() + self.speaking_event.clear() + client.close_client() + break + + # 播放时不得发送数据,默认废消息 + if self.speaking_event.is_set() and not self.listening_event.is_set(): + client.close_client() + break + + audio_dict = queue_data[1] + + client.send_per_data( + audio=audio_dict['frames'], + stream=True, + voice_synthesize=True, + is_end=audio_dict['is_end'], + encoding='base64', + ) + # is_end后切换播放模式 + if audio_dict['is_end']: + self.listening_event.clear() + self.speaking_event.set() + + if not self.wakeup_event.is_set(): + continue + + # 接收数据 + while True: + # 当唤醒状态被关闭时,退出循环 + if not self.wakeup_event.is_set(): + self.listening_event.clear() + self.speaking_event.clear() + client.close_client() + break + + if self.interrupt_event.is_set(): + self.listening_event.set() + self.speaking_event.clear() + break + + response, data_type = client.receive_per_data() + if data_type == dict: + print(f"{datetime.now()}: receive json data: {response}") # 打印接收到的消息 + # 5xx: 结束错误码处理 + if response['code'] in [501, 502, 503, 504, 505, 506, 507, 508, 509, 510, 511, 512, 513, 514, 515, 516, 517, 518, 519, 520, 521, 522, 523, 524, 525, 526, 527, 528, 529, 530, 531, 532, 533, 534, 535, 536, 537, 538, 539, 540, 541, 542, 543, 544, 545]: + # 恢复录音状态 + self.listening_event.set() + self.speaking_event.clear() + break + # 200: 正常结束 + elif response['code'] == 200: + self.audio_play_queue.put(('audio_json', response)) + if response['type'] == 'close': + break + # 201: 切换沉默模式 + elif response['code'] == 201: + self.listening_event.clear() + self.speaking_event.clear() + self.sleep_event.set() # 沉默状态下,关闭唤醒状态 + self.wakeup_event.clear() + elif data_type == list: + print(f"{datetime.now()}: receive text_audio_data") + # 切换播放模式 + self.listening_event.clear() + self.speaking_event.set() + self.audio_play_queue.put(('text_audio_data', response)) + + elif data_type == bytes: + # 开始播放 + print(f"{datetime.now()}: receive audio bytes") + self.audio_play_queue.put(('audio_bytes', response)) + elif data_type == None: + print(f"{datetime.now()}: receive None data, break loop.") + # 切换录音模式 + self.listening_event.set() + self.speaking_event.clear() + # print(f"listening_event: {self.listening_event.is_set()}, speaking_event: {self.speaking_event.is_set()}") + break # 如果没有接收到消息,则退出循环 + client.close_client() + print(f"{datetime.now()}: web socket client closed.") + + + def audio_play_process(self): + ''' + Args: + audio_play_queue: multiprocessing.Queue, audio play queue + share_time_dict: multiprocessing.Manager.dict, shared time dict + ''' + audio_player = AudioPlayer(**self.player_args) + print("Audio play process started.") + audio_list = [] + while True: + print("wait for audio data.") + item = self.audio_play_queue.get() + + # 唤醒状态 + if not self.wakeup_event.is_set(): + continue + + if item[0] == 'text_audio_data': + # TODO: 判断bytes是否是最后一个,如果是最后一个,则播放完毕,切换监听模式 + audio_text, audio_data = item[1] + + print(f"{datetime.now()}: start playing audio.") + print(f"{datetime.now()}: audio text: {audio_text}") + + if self.listening_event.is_set(): + continue + + if not self.speaking_event.is_set(): + continue + + audio_list.append(audio_data) + + # 播放音频 + try: + tts_audio = audio_list[0] # 取出第一个音频 + tts_audio = audio_player.check_audio_type(tts_audio, return_type=None) + for i in range(0, len(tts_audio), audio_player.CHUNK): + audio_player.stream.write(tts_audio[i:i+audio_player.CHUNK]) + # print("Playing {} data...{}/{}".format(item[0], i, len(tts_audio))) + + # 关闭状态 + if not self.wakeup_event.is_set(): + print(f"{datetime.now()}: microphone and speaker close.") + self.listening_event.clear() + self.speaking_event.clear() + self.sleep_event.set() # 沉默状态下,关闭唤醒状态 + break + # 按键打断播放 + if self.interrupt_event.is_set(): + print(f"{datetime.now()}: button interrupt (output).") + + time.sleep(0.3) # 虽然程序到这里,但是扬声器不一定播放完毕,延迟用来避免问题 + + while not self.audio_play_queue.empty(): # 清空队列 + self.audio_play_queue.get() + self.listening_event.set() + self.speaking_event.clear() + self.interrupt_event.clear() + break + + if self.speaking_event.is_set(): + # 播放最后一段音频 + audio_player.stream.write(tts_audio[i+audio_player.CHUNK:]) + audio_list = [] # 清空音频列表 + # audio_list.pop(0) # 弹出已播放音频 + print(f"{datetime.now()}: audio data played.") + else: + audio_list = [] # 清空音频列表 + print(f"{datetime.now()}: audio data clear.") + continue + except TypeError as e: + print(f"audio play error: {e}") + continue + + elif item[0] == 'audio_json': + data_type = item[1]['type'] + print(f"data_type: {data_type}") + if self.wakeup_event.is_set(): + if data_type == 'text': + if self.wakeup_event.is_set(): + # 切换播放模式 + self.listening_event.clear() + self.speaking_event.set() + if data_type in ['close', 'end']: + # 虽然程序到这里,但是扬声器不一定播放完毕,延迟用来避免问题 + time.sleep(0.8) + if self.wakeup_event.is_set(): + # 启动监听状态 + self.speaking_event.clear() + self.listening_event.set() + + + + + + def excute_process(self): + ''' + Args: + excute_queue: multiprocessing.Queue, excute display queue + ''' + print("Excute process started.") + + while True: + if self.excute_queue.empty(): + continue + + if self.speaker_active_set.is_set(): + instruct, content = self.excute_queue.get() + + print(f"Got speaker info: {instruct, content}") + + print(f"Playing {instruct} {content}...") + print(f"play {instruct} time: {datetime.now()}") + self.audio_play_queue.put((instruct, content)) + + self.speaker_active_set.clear() + \ No newline at end of file