TakwayBoard/takway/clients/web_socket_client_utils.py

330 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# basic
import io
import os
import sys
import time
import json
import random
from collections import deque
from datetime import datetime
# log
import logging
import warnings
# multiprocessing
import queue
import threading
import multiprocessing
# web request
import requests
import pyaudio
# hot words detection
import pvporcupine
from takway.apps.data_struct import QueueIterator
from takway.common_utils import *
from takway.audio_utils import PicovoiceRecorder, HDRecorder
from takway.clients.client_utils import BaseWebSocketClient
from takway.audio_utils import AudioPlayer
class WebSocketClinet:
def __init__(self,
board,
server_args,
recorder_args,
player_args,
log_args,
excute_args=None,
):
self.board = board
# server_args
self.server_args = server_args
# recorder_args
self.recorder_args = recorder_args
# player_args
self.player_args = player_args
# excute_args
self.excute_args = excute_args
# log_args
self.log_args = log_args
def process_init(self):
# multiprocessing
manager = multiprocessing.Manager()
self.trigger_queue = manager.Queue()
self.client_queue = manager.Queue()
self.audio_play_queue = manager.Queue()
self.excute_queue = manager.Queue()
# 多进程标志为
self.mircophone_active_set = manager.Event()
self.speaker_active_set = manager.Event()
processes = [
multiprocessing.Process(target=self.audio_process),
multiprocessing.Process(target=self.web_socket_client_process),
multiprocessing.Process(target=self.audio_play_process),
]
if self.excute_args.get('enable', False):
processes.append(
multiprocessing.Process(target=self.excute_process),
)
for process in processes:
time.sleep(0.5)
process.start()
for process in processes:
process.join()
def audio_process(self):
"""audio_process
Args:
trigger_queue: multiprocessing.Queue, trigger queue
client_queue: multiprocessing.Queue, client queue
"""
min_stream_record_time = self.recorder_args.pop('min_stream_record_time')
voice_trigger = self.recorder_args.pop('voice_trigger')
# TODO:
press_type = self.recorder_args.pop('press_type')
max_slience_time = self.recorder_args.pop('max_slience_time')
if voice_trigger:
recorder = PicovoiceRecorder(**self.recorder_args)
else:
voice_keys = ['access_key', 'keywords', 'keyword_paths', 'model_path','sensitivities', 'library_path']
for key in voice_keys:
self.recorder_args.pop(key)
recorder = HDRecorder(**self.recorder_args)
recorder.min_stream_record_time = min_stream_record_time
# TODO:
recorder.press_type = press_type
recorder.max_slience_time = max_slience_time
print("Audio Process started.")
print("Waiting for wake up...")
# recorder.hardware.set_led_on("green")
while True:
if self.shared_waiting:
continue
data = recorder.record_chunk_voice(
CHUNK=recorder.porcupine.frame_length,
return_type=None,
exception_on_overflow=False,
queue=None)
record_chunk_size = recorder.vad_chunk_size
# 开关按键被按下或被关键词唤醒
if recorder.hardware.power_status or recorder.is_wakeup(data):
# recorder.hardware.set_led_on("blue")
pass
else:
continue
# wake up
is_bgn = True
is_end = False
frames = []
# status buffer
slience_bgn_t = time.time()
slience_time = 0
print("Start recording...")
# 准备对话状态
while True:
# 语音活动检测
data = recorder.record_chunk_voice(
CHUNK=record_chunk_size,
return_type=None,
exception_on_overflow=False)
is_speech = recorder.is_speech(data)
# 判断状态
if is_speech:
print("valid voice")
slience_bgn_t = time.time()
frames.append(data)
else:
slience_time = time.time() - slience_bgn_t
# 长时沉默关闭唤醒状态:如果唤醒后超过一定时间没有说话/关闭按键被按下,则认为是结束
if slience_time > recorder.max_slience_time or not recorder.hardware.power_status:
break
# 短时沉默结束单次对话沉默时间超过一定时间段0.5s左右),则发送数据
if slience_time > recorder.min_act_time:
is_end = True
is_bgn = False
if not is_speech:
continue
# 流式发送数据
stream_reset_status = self.stream_record_process(
bytes_frames=recorder.write_wave_bytes(frames),
frames_size=len(frames),
record_chunk_size=record_chunk_size,
sample_rate=recorder.RATE,
min_stream_record_time=recorder.min_stream_record_time,
is_bgn=is_bgn,
is_end=is_end)
if stream_reset_status:
frames.clear()
is_bgn = False
# print(f"Tatal frames: {_total_frames*record_chunk_size}, valid frame: {_frames*record_chunk_size}, valid RATE: {_frames/_total_frames*100:.2f}%, {_frames*record_chunk_size/recorder.RATE} sec.")
# print("End recording.")
# recorder.hardware.set_led_off("blue")
def stream_record_process(self,
bytes_frames: bytes,
frames_size: int,
record_chunk_size: int,
sample_rate: int,
min_stream_record_time: int,
is_bgn: bool,
is_end: bool):
'''
Args:
bytes_frames: bytes, audio data
frames_size: int, audio data size
record_chunk_size: int, audio data chunk size
is_bgn: bool, is begin of stream
is_end: bool, is end of stream
Returns:
bool, if stream reset status
'''
if len(bytes_frames) == 0:
return False
if frames_size*record_chunk_size >= min_stream_record_time*sample_rate or is_end:
if is_bgn and is_end:
return False
stream_data = dict(
frames=bytes_frames,
frames_size=frames_size,
chunk_size=record_chunk_size,
is_bgn=is_bgn,
is_end=is_end)
self.client_queue.put(('audio', stream_data))
if is_end:
# print("put None to client queue.")
self.client_queue.put(None)
return True
else:
return False
def web_socket_client_process(self):
client = BaseWebSocketClient(self.server_args['server_url'], self.server_args['session_id'])
print("Web socket client process started.")
# print("Web socket client process started.")
while True:
if self.client_queue.empty():
continue
# print(f"init skt time: {datetime.now()}")
# 唤醒
client.wakeup_client()
# 发送数据
for queue_data in QueueIterator(self.client_queue):
if queue_data[0] == 'audio':
audio_dict = queue_data[1]
client.send_per_data(
audio=audio_dict['frames'],
stream=True,
voice_synthesize=True,
is_end=audio_dict['is_end'],
encoding='base64',
)
# print(f"send skt time: {datetime.now()}")
# print(f"fnsh skt time: {datetime.now()}")
# 接收数据
while True:
response, data_type = client.receive_per_data()
if data_type == dict:
print(response) # 打印接收到的消息
'''
try:
response = json.loads(response['msg'])
if 'content' in response.keys():
self.excute_queue.put((response['instruct'], response['content']))
except json.JSONDecodeError as e:
print(f"json decode error: {e}")
continue
# print(f"recv json time: {datetime.now()}")
'''
elif data_type == bytes:
# print(f"recv bytes time: {datetime.now()}")
self.audio_play_queue.put(('audio_bytes', response))
elif data_type == None:
break # 如果没有接收到消息,则退出循环
# print("接收完毕:", datetime.now())
def audio_play_process(self):
'''
Args:
audio_play_queue: multiprocessing.Queue, audio play queue
share_time_dict: multiprocessing.Manager.dict, shared time dict
'''
audio_player = AudioPlayer(**self.player_args)
print("Audio play process started.")
while True:
item = self.audio_play_queue.get()
# 播放音频
print("Playing audio...")
tts_audio = item[1]
print(f"tts_audio len: {len(tts_audio)}")
print(f"play audio time: {datetime.now()}")
try:
# 播放
self.speaker_active_set.set()
tts_audio = audio_player.check_audio_type(tts_audio, return_type=None)
for i in range(0, len(tts_audio), audio_player.CHUNK):
audio_player.stream.write(tts_audio[i:i+audio_player.CHUNK])
# print("Playing {} data...{}/{}".format(item[0], i, len(tts_audio)))
if self.mircophone_active_set.is_set():
print("mirophone is active.")
self.mircophone_active_set.wait()
break
audio_player.stream.write(tts_audio[i+audio_player.CHUNK:])
print(f"audio data played.")
except TypeError as e:
print(f"audio play error: {e}")
continue
# audio_player.stream.write(audio_data[i+audio_player.CHUNK:])
# print(f"{item[0]} data played.")
def excute_process(self):
'''
Args:
excute_queue: multiprocessing.Queue, excute display queue
'''
print("Excute process started.")
while True:
if self.excute_queue.empty():
continue
if self.speaker_active_set.is_set():
instruct, content = self.excute_queue.get()
print(f"Got speaker info: {instruct, content}")
print(f"Playing {instruct} {content}...")
print(f"play {instruct} time: {datetime.now()}")
self.audio_play_queue.put((instruct, content))
self.speaker_active_set.clear()