# TakwayBoard/takway/clients/web_socket_client_utils.py

# basic
import io
import os
import sys
import time
import json
import random
from collections import deque
from datetime import datetime
# log
import logging
import warnings
# multiprocessing
import queue
import threading
import multiprocessing
# web request
import requests
# audio I/O
import pyaudio
# hot-word detection
import pvporcupine
from takway.apps.data_struct import QueueIterator
from takway.common_utils import *
from takway.audio_utils import PicovoiceRecorder, HDRecorder
from takway.clients.client_utils import BaseWebSocketClient
from takway.audio_utils import AudioPlayer


class WebSocketClinet:
    """Multiprocess WebSocket voice client.

    Coordinates three processes: audio recording with wake-up detection,
    the WebSocket connection to the server, and audio playback.
    """

    def __init__(self,
                 board,
                 server_args,
                 recorder_args,
                 player_args,
                 log_args,
                 excute_args=None,
                 ):
        self.board = board
        # server_args
        self.server_args = server_args
        # recorder_args
        self.recorder_args = recorder_args
        # player_args
        self.player_args = player_args
        # excute_args
        self.excute_args = excute_args
        # log_args
        self.log_args = log_args

    def process_init(self):
        # multiprocessing queues shared between the processes
        manager = multiprocessing.Manager()
        self.trigger_queue = manager.Queue()
        self.client_queue = manager.Queue()
        self.audio_play_queue = manager.Queue()
        self.excute_queue = manager.Queue()
        # wake-up events
        self.wakeup_event = manager.Event()
        self.sleep_event = manager.Event()
        # interrupt event
        # self.interrupt_event = manager.Event()
        # listening event
        self.listening_event = manager.Event()
        # speaking (playback) event
        self.speaking_event = manager.Event()
        processes = [
            multiprocessing.Process(target=self.audio_process),
            multiprocessing.Process(target=self.web_socket_client_process),
            multiprocessing.Process(target=self.audio_play_process),
        ]
        if self.excute_args and self.excute_args.get('enable', False):
            processes.append(
                multiprocessing.Process(target=self.excute_process),
            )
        for process in processes:
            time.sleep(0.5)
            # time.sleep(2)
            process.start()
        for process in processes:
            process.join()
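
    # Rough data flow wired up by process_init (a summary of the methods below):
    #
    #   audio_process --('audio', stream_data) / None via client_queue--> web_socket_client_process
    #   web_socket_client_process --('audio_json', ...) / ('text_audio_data', ...) via audio_play_queue--> audio_play_process
    #
    # wakeup_event, sleep_event, listening_event and speaking_event are shared Events
    # that move the three processes between the wake-up, listening, speaking and
    # sleep states.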

    def audio_process(self):
        """audio_process
        Args:
            trigger_queue: multiprocessing.Queue, trigger queue
            client_queue: multiprocessing.Queue, client queue
        """
        min_stream_record_time = self.recorder_args.pop('min_stream_record_time')
        voice_trigger = self.recorder_args.pop('voice_trigger')
        max_slience_time = self.recorder_args.pop('max_slience_time')
        if voice_trigger:
            recorder = PicovoiceRecorder(**self.recorder_args)
        else:
            voice_keys = ['access_key', 'keywords', 'keyword_paths', 'model_path', 'sensitivities', 'library_path']
            for key in voice_keys:
                self.recorder_args.pop(key)
            recorder = HDRecorder(**self.recorder_args)
        recorder.min_stream_record_time = min_stream_record_time
        recorder.max_slience_time = max_slience_time
        # self.hardware = recorder.hardware
        # self.long_power_status = recorder.hardware.long_power_status
        print(f"recorder.min_act_time: {recorder.min_act_time}")
        print(f"recorder.max_slience_time: {recorder.max_slience_time}")
        print("Audio Process started.")
        # create threads
        threads = [threading.Thread(target=self.hardware_trigger_thread, args=(recorder,))]
        if voice_trigger:
            voice_threads = [
                threading.Thread(target=self.voice_trigger_thread, args=(recorder,)),
            ]
            threads.extend(voice_threads)
        for thread in threads:
            thread.start()
        print("Audio threads started.")
        while True:
            for thread in threads:
                thread.join()
            print("audio process exit")
            exit()

    def hardware_trigger_thread(self, recorder):
        """hardware_trigger_thread
        Args:
            recorder: takway.audio_utils.Recorder, recorder object
        """
        print("Hardware trigger thread started.")
        # last_short_power_status = False
        board = self.board
        while True:
            # the power switch is held down
            if recorder.hardware.long_power_status:
                self.wakeup_event.set()
                '''
                # a short press interrupts the conversation
                if recorder.hardware.short_power_status and not last_short_power_status:
                    # self.interrupt_event.set()
                    recorder.hardware.short_power_status = False
                    # print("Interrupt conversation.")
                last_short_power_status = recorder.hardware.short_power_status
                '''
            else:
                self.wakeup_event.clear()
            time.sleep(0.01)

    def voice_trigger_thread(self, recorder):
        """voice_trigger_thread
        Args:
            recorder: takway.audio_utils.Recorder, recorder object
        """
        board = self.board
        print("Waiting for wake up...")
        while True:
            data = recorder.record_chunk_voice(
                CHUNK=recorder.porcupine.frame_length,
                return_type=None,
                exception_on_overflow=False,
                queue=None)
            record_chunk_size = recorder.vad_chunk_size
            # woken up either by the power button or by a keyword
            if self.wakeup_event.is_set():
                print(f"{datetime.now()}: Wake up by button.")
            else:
                if self.sleep_event.is_set():
                    print(f"{datetime.now()}: stay sleep mode.")
                    recorder.hardware.long_power_status = False
                    self.sleep_event.clear()
                    continue
                if recorder.is_wakeup(data):
                    recorder.hardware.long_power_status = True
                    self.wakeup_event.set()
                    print(f"{datetime.now()}: wake up by voice.")
                else:
                    recorder.hardware.long_power_status = False
                    continue
            # switch to listening mode
            self.listening_event.set()
            self.speaking_event.clear()
            # wake up
            is_bgn = True
            is_end = False
            frames = []
            # status buffer
            single_chat_finish = False
            silence_bgn_t = time.time()
            silence_time = 0
            print("Start recording...")
            # conversation loop while awake
            while True:
                if self.sleep_event.is_set():
                    print(f"{datetime.now()}: sleep mode.")
                    recorder.hardware.long_power_status = False
                    self.sleep_event.clear()
                    break
                if not self.wakeup_event.is_set():
                    break
                if self.listening_event.is_set():
                    # voice activity detection
                    data = recorder.record_chunk_voice(
                        CHUNK=record_chunk_size,
                        return_type=None,
                        exception_on_overflow=False)
                    is_speech = recorder.is_speech(data)
                    # update the silence timer
                    if is_speech:
                        # print(f"{datetime.now()}: valid voice")
                        silence_bgn_t = time.time()
                    frames.append(data)
                    silence_time = time.time() - silence_bgn_t
                    # Long silence turns off the wake-up state: if nothing is said for
                    # too long after wake-up (or the power switch is released), the
                    # session is treated as finished.
                    if silence_time > recorder.max_slience_time:
                        recorder.hardware.long_power_status = False
                        self.wakeup_event.clear()
                        break
                    # Short silence ends a single utterance: once silence exceeds the
                    # threshold (around 0.5 s), the buffered data is sent.
                    if silence_time > recorder.min_act_time:
                        single_chat_finish = True
                    if single_chat_finish:
                        is_end = True
                        is_bgn = False
                    # print(f"{datetime.now()}: silence_time: {silence_time}")
                    # stream the buffered audio to the client queue
                    stream_reset_status = self.stream_record_process(
                        bytes_frames=recorder.write_wave_bytes(frames),
                        frames_size=len(frames),
                        record_chunk_size=record_chunk_size,
                        sample_rate=recorder.RATE,
                        min_stream_record_time=recorder.min_stream_record_time,
                        is_bgn=is_bgn,
                        is_end=is_end)
                    # print(f"stream_reset_status: {stream_reset_status}, is_bgn: {is_bgn}, is_end: {is_end}, frame: {len(frames)}")
                    if stream_reset_status:
                        frames.clear()
                        is_bgn = False
                    if single_chat_finish:
                        is_bgn = True
                        is_end = False
                        single_chat_finish = False
                        print(f"{datetime.now()}: single conversation finish, reset frames.")
                elif self.speaking_event.is_set():
                    print(f"{datetime.now()}: wait for speaking finished and listening started or sleep mode.")
                    if board == 'orangepi':
                        recorder.hardware.set_led_on("blue")
                    while self.wakeup_event.is_set():
                        # sleep or listening state ends the wait
                        if self.sleep_event.is_set() or self.listening_event.is_set():
                            break
                        # if self.interrupt_event.is_set():
                        #     print(f"{datetime.now()}: button interrupt (input).")
                        #     break
                    if board == 'orangepi':
                        recorder.hardware.set_led_off("blue")
                    # restart the silence timer
                    silence_bgn_t = time.time()
                    print(f"{datetime.now()}: restart listening.")

    def stream_record_process(self,
                              bytes_frames: bytes,
                              frames_size: int,
                              record_chunk_size: int,
                              sample_rate: int,
                              min_stream_record_time: int,
                              is_bgn: bool,
                              is_end: bool):
        '''
        Args:
            bytes_frames: bytes, audio data
            frames_size: int, number of buffered chunks
            record_chunk_size: int, samples per chunk
            sample_rate: int, sample rate in Hz
            min_stream_record_time: int, minimum buffered duration (seconds) before sending
            is_bgn: bool, is begin of stream
            is_end: bool, is end of stream
        Returns:
            bool, whether the frame buffer should be reset
        '''
        if len(bytes_frames) == 0:
            return False
        if frames_size*record_chunk_size >= min_stream_record_time*sample_rate or is_end:
            if is_bgn and is_end:
                return False
            stream_data = dict(
                frames=bytes_frames,
                frames_size=frames_size,
                chunk_size=record_chunk_size,
                is_bgn=is_bgn,
                is_end=is_end)
            self.client_queue.put(('audio', stream_data))
            if is_end:
                # print("put None to client queue.")
                self.client_queue.put(None)
            return True
        else:
            return False
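
    # A worked example of the flush condition above, with illustrative values that
    # are assumptions (not read from this file):
    #
    #   sample_rate = 16000           # Hz
    #   record_chunk_size = 512       # samples per chunk
    #   min_stream_record_time = 1    # second
    #
    # Audio is sent once frames_size * 512 >= 1 * 16000, i.e. after roughly 32
    # buffered chunks (about one second of audio), or immediately when is_end is True.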

    def web_socket_client_process(self):
        client = BaseWebSocketClient(self.server_args['server_url'], self.server_args['session_id'])
        print("Web socket client process started.")
        while True:
            if self.client_queue.empty():
                continue
            print(f"{datetime.now()}: start setup web socket connection.")
            # stage 1: only connect to the server in the wake-up state
            if self.wakeup_event.is_set():
                client.wakeup_client()
            else:
                print("not wake up, skip setup web socket connection.")
                self.client_queue.get(block=False)
                continue
            # do not connect while audio is being played
            if self.speaking_event.is_set():
                print("speaking, skip setup web socket connection.")
                self.client_queue.get(block=False)
                continue
            # send data
            for queue_data in QueueIterator(self.client_queue):
                # send audio data
                if queue_data[0] == 'audio':
                    # exit the loop once the wake-up state is cleared
                    if not self.wakeup_event.is_set():
                        self.listening_event.clear()
                        self.speaking_event.clear()
                        client.close_client()
                        break
                    # no data may be sent while playing; discard the message
                    if self.speaking_event.is_set() and not self.listening_event.is_set():
                        client.close_client()
                        break
                    audio_dict = queue_data[1]
                    try:
                        client.send_per_data(
                            audio=audio_dict['frames'],
                            stream=True,
                            voice_synthesize=True,
                            is_end=audio_dict['is_end'],
                            encoding='base64',
                        )
                    except BrokenPipeError:
                        print(f"{datetime.now()}: web socket connection broken, skip send data.")
                        self.listening_event.set()
                        self.speaking_event.clear()
                        client.close_client()
                        break
                    # after is_end, switch to playback mode
                    if audio_dict['is_end']:
                        self.listening_event.clear()
                        self.speaking_event.clear()
            print(f"{datetime.now()}: Finished send data.")
            if not self.wakeup_event.is_set():
                continue
            if self.listening_event.is_set():
                print(f"{datetime.now()}: listening or speaking, skip setup web socket connection.")
                continue
            # receive data
            while True:
                # exit the loop once the wake-up state is cleared
                if not self.wakeup_event.is_set():
                    self.listening_event.clear()
                    self.speaking_event.clear()
                    client.close_client()
                    break
                print(f"{datetime.now()}: wait for receive data.")
                response, data_type = client.receive_per_data()
                if data_type == dict:
                    print(f"{datetime.now()}: receive json data: {response}")  # print the received message
                    # 501-545: terminal error codes, resume recording
                    if 501 <= response['code'] <= 545:
                        self.listening_event.set()
                        self.speaking_event.clear()
                        break
                    # 200: normal response
                    elif response['code'] == 200:
                        self.audio_play_queue.put(('audio_json', response))
                        if response['type'] == 'close':
                            break
                    # 201: switch to sleep mode
                    elif response['code'] == 201:
                        self.listening_event.clear()
                        self.speaking_event.clear()
                        self.sleep_event.set()  # sleep mode turns off the wake-up state
                        self.wakeup_event.clear()
                elif data_type == list:
                    print(f"{datetime.now()}: receive text_audio_data")
                    # switch to playback mode
                    self.listening_event.clear()
                    self.speaking_event.set()
                    self.audio_play_queue.put(('text_audio_data', response))
                elif data_type is None:
                    print(f"{datetime.now()}: receive None data, break loop.")
                    # switch back to recording mode
                    self.listening_event.set()
                    self.speaking_event.clear()
                    # print(f"listening_event: {self.listening_event.is_set()}, speaking_event: {self.speaking_event.is_set()}")
                    break  # exit the loop if no message was received
            client.close_client()
            print(f"{datetime.now()}: web socket client closed.")

    def audio_play_process(self):
        '''
        Args:
            audio_play_queue: multiprocessing.Queue, audio play queue
            share_time_dict: multiprocessing.Manager.dict, shared time dict
        '''
        audio_player = AudioPlayer(**self.player_args)
        print("Audio play process started.")
        audio_list = []
        while True:
            print("wait for audio data.")
            item = self.audio_play_queue.get()
            # only play while in the wake-up state
            if not self.wakeup_event.is_set():
                continue
            if item[0] == 'text_audio_data':
                # TODO: detect whether these bytes are the last chunk; if so, switch
                # back to listening mode once playback has finished
                audio_text, audio_data = item[1]
                print(f"{datetime.now()}: start playing audio.")
                print(f"{datetime.now()}: audio text: {audio_text}")
                if self.listening_event.is_set():
                    continue
                if not self.speaking_event.is_set():
                    continue
                audio_list.append(audio_data)
                # play the audio
                try:
                    tts_audio = audio_list[0]  # take the first audio clip
                    # tts_audio = audio_player.check_audio_type(tts_audio, return_type=None)
                    for i in range(0, len(tts_audio), audio_player.CHUNK):
                        audio_player.stream.write(tts_audio[i:i+audio_player.CHUNK])
                        # print("Playing {} data...{}/{}".format(item[0], i, len(tts_audio)))
                        # stop playback if the wake-up state has been cleared
                        if not self.wakeup_event.is_set():
                            print(f"{datetime.now()}: microphone and speaker close.")
                            self.listening_event.clear()
                            self.speaking_event.clear()
                            self.sleep_event.set()  # sleep mode turns off the wake-up state
                            break
                    if self.speaking_event.is_set():
                        # play the remaining tail of the audio
                        audio_player.stream.write(tts_audio[i+audio_player.CHUNK:])
                        audio_list = []  # clear the audio list
                        # audio_list.pop(0)  # pop the played audio
                        print(f"{datetime.now()}: audio data played.")
                    else:
                        audio_list = []  # clear the audio list
                        print(f"{datetime.now()}: audio data clear.")
                        continue
                except TypeError as e:
                    print(f"audio play error: {e}")
                    continue
            elif item[0] == 'audio_json':
                data_type = item[1]['type']
                print(f"data_type: {data_type}")
                if self.wakeup_event.is_set():
                    if data_type == 'text':
                        if self.wakeup_event.is_set():
                            # switch to playback mode
                            self.listening_event.clear()
                            self.speaking_event.set()
                    if data_type in ['close', 'end']:
                        # Playback may not have finished even though execution reaches
                        # this point; the short delay avoids cutting it off.
                        time.sleep(0.8)
                        if self.wakeup_event.is_set():
                            # switch back to listening mode
                            self.speaking_event.clear()
                            self.listening_event.set()
"""
def excute_process(self):
'''
Args:
excute_queue: multiprocessing.Queue, excute display queue
'''
print("Excute process started.")
while True:
if self.excute_queue.empty():
continue
if self.speaker_active_set.is_set():
instruct, content = self.excute_queue.get()
print(f"Got speaker info: {instruct, content}")
print(f"Playing {instruct} {content}...")
print(f"play {instruct} time: {datetime.now()}")
self.audio_play_queue.put((instruct, content))
self.speaker_active_set.clear()
"""