TakwayBoard/takway/clients/web_socket_client_utils_v1.py

464 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# basic
import io
import os
import sys
import time
import json
import random
from collections import deque
from datetime import datetime
# log
import logging
import warnings
# multiprocessing
import queue
import threading
import multiprocessing
# web request
import requests
import pyaudio
# hot words detection
import pvporcupine
from takway.apps.data_struct import QueueIterator
from takway.common_utils import *
from takway.audio_utils import PicovoiceRecorder, HDRecorder
from takway.clients.client_utils import BaseWebSocketClient
from takway.audio_utils import AudioPlayer
class WebSocketClinet:
    """Voice-assistant board client built from cooperating processes.

    ``process_init`` spawns one process each for audio capture, the web
    socket exchange with the server and audio playback, plus an optional
    "excute" process that forwards instructions to the player. The
    processes talk to each other through ``multiprocessing.Manager``
    queues and events created in ``process_init``.

    NOTE: the class name keeps its historical spelling because callers
    import it under this name.
    """

    # Pause between polls of a flag/queue, so waiting loops do not burn a
    # full CPU core (and, for threads, do not fight the recorder for the GIL).
    _POLL_INTERVAL = 0.01

    def __init__(self,
                 board,
                 server_args,
                 recorder_args,
                 player_args,
                 log_args,
                 excute_args=None,
                 ):
        """Store the configuration; nothing is started until ``process_init``.

        Args:
            board: board identifier; ``'orangepi'`` enables LED feedback in
                the voice trigger thread.
            server_args: mapping read for ``'server_url'`` and ``'session_id'``.
            recorder_args: keyword arguments for the recorder, plus the keys
                ``'min_stream_record_time'``, ``'voice_trigger'`` and
                ``'press_type'`` that ``audio_process`` consumes itself.
            player_args: keyword arguments passed to ``AudioPlayer``.
            log_args: logging configuration (stored, not used by this class).
            excute_args: optional mapping; ``'enable'`` switches the excute
                process on. ``None`` means disabled.
        """
        self.board = board
        self.server_args = server_args
        self.recorder_args = recorder_args
        self.player_args = player_args
        self.excute_args = excute_args
        self.log_args = log_args

    def process_init(self):
        """Create the shared queues/events, start every worker process and
        block until all of them have exited."""
        manager = multiprocessing.Manager()
        self.trigger_queue = manager.Queue()
        self.client_queue = manager.Queue()
        self.audio_play_queue = manager.Queue()
        self.excute_queue = manager.Queue()
        # Cross-process flags.
        self.mircophone_active_set = manager.Event()
        self.speaker_active_set = manager.Event()

        processes = [
            multiprocessing.Process(target=self.audio_process),
            multiprocessing.Process(target=self.web_socket_client_process),
            multiprocessing.Process(target=self.audio_play_process),
        ]
        # excute_args defaults to None, so guard before calling .get().
        excute_args = self.excute_args or {}
        if excute_args.get('enable', False):
            processes.append(
                multiprocessing.Process(target=self.excute_process),
            )

        for process in processes:
            # Stagger the start-up of the workers.
            time.sleep(0.5)
            process.start()
        for process in processes:
            process.join()

    def audio_process(self):
        """Process entry point: build the recorder and run the trigger threads.

        Starts the hardware (button) trigger thread and, when
        ``recorder_args['voice_trigger']`` is truthy, the wake-word trigger
        thread as well. Exits the process once all threads have finished.
        """
        # Work on a copy so the configuration held by the instance is not
        # consumed by the pops below.
        recorder_args = dict(self.recorder_args)
        min_stream_record_time = recorder_args.pop('min_stream_record_time')
        voice_trigger = recorder_args.pop('voice_trigger')
        press_type = recorder_args.pop('press_type')

        if voice_trigger:
            recorder = PicovoiceRecorder(**recorder_args)
        else:
            # The plain recorder does not take the wake-word options; drop
            # them if present (they may legitimately be absent).
            voice_keys = ['access_key', 'keywords', 'keyword_paths',
                          'model_path', 'sensitivities', 'library_path']
            for key in voice_keys:
                recorder_args.pop(key, None)
            recorder = HDRecorder(**recorder_args)
        recorder.min_stream_record_time = min_stream_record_time
        recorder.press_type = press_type

        # State shared between the trigger threads of this process.
        self.shared_waiting = False
        self.shared_lock = threading.Lock()
        self.shared_data_lock = threading.Lock()

        threads = [
            threading.Thread(target=self.hardware_trigger_thread,
                             args=(recorder,)),
        ]
        if voice_trigger:
            threads.append(
                threading.Thread(target=self.voice_trigger_thread,
                                 args=(recorder,)),
            )
        for thread in threads:
            thread.start()
        print("Audio Process started.")

        for thread in threads:
            thread.join()
        print(f"audio process exit")
        sys.exit()

    def hardware_trigger_thread(self, recorder):
        """Record while the hardware button is held and stream the audio.

        Args:
            recorder: recorder object providing ``hd_chunk_size``, ``RATE``,
                ``min_stream_record_time``, ``wait_for_hardware_pressed``,
                ``is_hardware_pressed``, ``record_chunk_voice`` and
                ``write_wave_bytes``.
        """
        print("Hardware trigger thread started.")
        record_chunk_size = recorder.hd_chunk_size

        while True:
            if self.shared_waiting:
                time.sleep(self._POLL_INTERVAL)
                continue

            # Reset per-recording state.
            is_bgn = True
            frames = []
            self.mircophone_active_set.clear()

            print("Waiting for button press...")
            recorder.wait_for_hardware_pressed()
            print("Button pressed.")
            self.mircophone_active_set.set()

            # shared_waiting == True puts the other trigger thread on
            # standby; False lets it work again.
            with self.shared_data_lock:
                self.shared_waiting = True

            print("Start recording...")
            while True:
                data = recorder.record_chunk_voice(
                    CHUNK=record_chunk_size,
                    return_type=None,
                    exception_on_overflow=False)
                frames.append(data)

                if not recorder.is_hardware_pressed:
                    print(f"button rlse time: {datetime.now()}")
                    break

                stream_reset_status = self.stream_record_process(
                    bytes_frames=recorder.write_wave_bytes(frames),
                    frames_size=len(frames),
                    record_chunk_size=record_chunk_size,
                    sample_rate=recorder.RATE,
                    min_stream_record_time=recorder.min_stream_record_time,
                    is_bgn=is_bgn,
                    is_end=False)
                if stream_reset_status:
                    # The buffered audio was handed over; start a new segment.
                    frames.clear()
                    is_bgn = False

            # Flush whatever is left as the final segment.
            self.stream_record_process(
                bytes_frames=recorder.write_wave_bytes(frames),
                frames_size=len(frames),
                record_chunk_size=record_chunk_size,
                sample_rate=recorder.RATE,
                min_stream_record_time=recorder.min_stream_record_time,
                is_bgn=is_bgn,
                is_end=True)

            # Let the voice trigger thread resume.
            with self.shared_data_lock:
                self.shared_waiting = False

    def voice_trigger_thread(self, recorder):
        """Wait for the wake word, then stream audio until speech stops.

        Recording ends once the last ``recorder.vad_buffer_size`` chunks
        were all classified as non-speech and more than
        ``recorder.min_act_time`` seconds have passed since the wake-up.

        Args:
            recorder: recorder object providing ``porcupine``,
                ``vad_chunk_size``, ``vad_buffer_size``, ``min_act_time``,
                ``RATE``, ``min_stream_record_time``, ``is_wakeup``,
                ``is_speech``, ``record_chunk_voice`` and
                ``write_wave_bytes`` (and ``hardware`` on the orangepi board).
        """
        print("voice record thread started.")
        print("Waiting for wake up...")

        while True:
            if self.shared_waiting:
                # The hardware trigger owns the microphone right now.
                time.sleep(self._POLL_INTERVAL)
                continue

            data = recorder.record_chunk_voice(
                CHUNK=recorder.porcupine.frame_length,
                return_type=None,
                exception_on_overflow=False,
                queue=None)

            self.mircophone_active_set.clear()
            if not recorder.is_wakeup(data):
                continue

            # Woken up.
            if self.board == 'orangepi':
                recorder.hardware.set_led2_on()
            self.mircophone_active_set.set()

            record_chunk_size = recorder.vad_chunk_size
            is_bgn = True
            full_frames = []
            # Sliding window holding the most recent speech/non-speech flags;
            # maxlen makes the deque drop the oldest flag automatically.
            active_buffer = deque(maxlen=recorder.vad_buffer_size)
            bg_t = time.time()
            print("Start recording...")

            while True:
                data = recorder.record_chunk_voice(
                    CHUNK=record_chunk_size,
                    return_type=None,
                    exception_on_overflow=False)
                if data is None:
                    continue

                is_speech = recorder.is_speech(data)
                full_frames.append(data)

                # Hand the buffered audio to the client once it is long enough.
                stream_reset_status = self.stream_record_process(
                    bytes_frames=recorder.write_wave_bytes(full_frames),
                    frames_size=len(full_frames),
                    record_chunk_size=record_chunk_size,
                    sample_rate=recorder.RATE,
                    min_stream_record_time=recorder.min_stream_record_time,
                    is_bgn=is_bgn,
                    is_end=False)
                if stream_reset_status:
                    full_frames.clear()
                    is_bgn = False

                active_buffer.append(bool(is_speech))
                if active_buffer.count(False) != active_buffer.maxlen:
                    # Window not yet full of silence: keep recording.
                    continue

                if time.time() - bg_t > recorder.min_act_time:
                    # End of the utterance: flush the final segment.
                    self.stream_record_process(
                        bytes_frames=recorder.write_wave_bytes(full_frames),
                        frames_size=len(full_frames),
                        record_chunk_size=record_chunk_size,
                        sample_rate=recorder.RATE,
                        min_stream_record_time=recorder.min_stream_record_time,
                        is_bgn=is_bgn,
                        is_end=True)
                    break

            if self.board == 'orangepi':
                recorder.hardware.set_led2_off()

    def stream_record_process(self,
                              bytes_frames: bytes,
                              frames_size: int,
                              record_chunk_size: int,
                              sample_rate: int,
                              min_stream_record_time: int,
                              is_bgn: bool,
                              is_end: bool):
        '''Queue one audio segment for the web socket process when it is due.

        A segment is due when it holds at least ``min_stream_record_time``
        seconds of samples or when it is the final one (``is_end``). A
        segment that is both first and last is discarded without being
        queued. After a final segment a ``None`` sentinel is queued as well.

        Args:
            bytes_frames: bytes, audio data
            frames_size: int, number of chunks contained in the audio data
            record_chunk_size: int, samples per chunk
            sample_rate: int, samples per second
            min_stream_record_time: int, minimum segment length in seconds
            is_bgn: bool, is begin of stream
            is_end: bool, is end of stream
        Returns:
            bool, True if the segment was queued and the caller should reset
            its buffer, False otherwise
        '''
        if len(bytes_frames) == 0:
            return False

        long_enough = (frames_size * record_chunk_size
                       >= min_stream_record_time * sample_rate)
        if not (long_enough or is_end):
            return False
        if is_bgn and is_end:
            # The whole recording fits in one short segment: drop it.
            return False

        stream_data = dict(
            frames=bytes_frames,
            frames_size=frames_size,
            chunk_size=record_chunk_size,
            is_bgn=is_bgn,
            is_end=is_end)
        self.client_queue.put(('audio', stream_data))
        if is_end:
            # Sentinel marking the end of this utterance for the consumer.
            self.client_queue.put(None)
        return True

    def web_socket_client_process(self):
        """Process entry point: send queued audio to the server and route
        the replies (audio bytes go to the playback queue)."""
        client = BaseWebSocketClient(self.server_args['server_url'],
                                     self.server_args['session_id'])
        print("Web socket client process started.")

        while True:
            if self.client_queue.empty():
                # Nothing to send yet; back off instead of spinning.
                time.sleep(self._POLL_INTERVAL)
                continue

            # Wake the connection up.
            client.wakeup_client()

            # Send the queued segments.
            # NOTE(review): QueueIterator presumably stops at the None
            # sentinel queued after the final segment - confirm.
            for queue_data in QueueIterator(self.client_queue):
                if queue_data[0] == 'audio':
                    audio_dict = queue_data[1]
                    client.send_per_data(
                        audio=audio_dict['frames'],
                        stream=True,
                        voice_synthesize=True,
                        is_end=audio_dict['is_end'],
                        encoding='base64',
                    )

            # Receive the replies.
            while True:
                response, data_type = client.receive_per_data()
                if data_type is dict:
                    print(response)  # show the received message
                    # NOTE: forwarding instructions to the excute process is
                    # currently disabled. The previous logic decoded
                    # response['msg'] as JSON and, when it had a 'content'
                    # key, put (response['instruct'], response['content'])
                    # on self.excute_queue.
                elif data_type is bytes:
                    self.audio_play_queue.put(('audio_bytes', response))
                elif data_type is None:
                    break  # no more messages: leave the receive loop

    def audio_play_process(self):
        '''Process entry point: play items taken from ``audio_play_queue``.

        Items are ``(kind, payload)`` tuples. ``'audio_bytes'`` carries audio
        to play directly, ``'story'`` carries the name of a story file and
        ``'music'`` plays a fixed file. Playback is chunked so it can stop
        as soon as the microphone becomes active.
        '''
        audio_player = AudioPlayer(**self.player_args)
        print("Audio play process started.")

        while True:
            item = self.audio_play_queue.get()
            if item[0] == 'audio_bytes':
                print("Playing audio...")
                tts_audio = item[1]
                print(f"tts_audio len: {len(tts_audio)}")
                print(f"play audio time: {datetime.now()}")
                try:
                    self.speaker_active_set.set()
                    tts_audio = audio_player.check_audio_type(tts_audio, return_type=None)
                    for i in range(0, len(tts_audio), audio_player.CHUNK):
                        audio_player.stream.write(tts_audio[i:i+audio_player.CHUNK])
                        print("Playing {} data...{}/{}".format(item[0], i, len(tts_audio)))
                        if self.mircophone_active_set.is_set():
                            print("mirophone is active.")
                            self.mircophone_active_set.wait()
                            break
                    # No write of the remaining audio here: doing so after an
                    # interruption would keep playing over the user, and on
                    # empty audio the loop index would not exist.

                    # Append the raw bytes to a dump file.
                    with open("chat_audio.txt", 'ab') as f:
                        f.write(tts_audio)
                    print("Audio saved.")
                    print(f"audio data played.")
                except TypeError as e:
                    print(f"audio play error: {e}")
                    continue
            else:
                if item[0] == 'story':
                    audio_data = audio_player.load_audio_file(f"/home/orangepi/story_22050/{item[1]}.wav")
                elif item[0] == 'music':
                    audio_data = audio_player.load_audio_file("/home/orangepi/music_22050/1.wav")
                else:
                    # Unknown instruction: there is nothing to play. Without
                    # this guard audio_data would be unbound or stale.
                    continue

                self.speaker_active_set.set()
                audio_data = audio_player.check_audio_type(audio_data, return_type=None)
                time.sleep(0.5)
                for i in range(0, len(audio_data), audio_player.CHUNK):
                    audio_player.stream.write(audio_data[i:i+audio_player.CHUNK])
                    print("Playing {} data...{}/{}".format(item[0], i, len(audio_data)))
                    if self.mircophone_active_set.is_set():
                        # Interrupted: rebuild the player before going on.
                        audio_player.close()
                        print("Reinit audio player.")
                        print("mirophone is active.")
                        self.mircophone_active_set.wait()
                        time.sleep(0.5)
                        audio_player = AudioPlayer(**self.player_args)
                        break

    def excute_process(self):
        '''Process entry point: forward instructions to the playback queue.

        Each ``(instruct, content)`` pair taken from ``excute_queue`` is
        forwarded once ``speaker_active_set`` is set, and the flag is
        cleared afterwards.
        '''
        print("Excute process started.")
        while True:
            # Block for the next instruction, then for the speaker flag,
            # instead of polling both in a tight loop.
            instruct, content = self.excute_queue.get()
            self.speaker_active_set.wait()
            print(f"Got speaker info: {instruct, content}")
            print(f"Playing {instruct} {content}...")
            print(f"play {instruct} time: {datetime.now()}")
            self.audio_play_queue.put((instruct, content))
            self.speaker_active_set.clear()